Make it explicit raw|multiplexed stream implementation being used

fix #35761

Signed-off-by: Nicolas De Loof <nicolas.deloof@gmail.com>
This commit is contained in:
Nicolas De Loof 2019-08-28 13:46:32 +02:00
parent 7c69b6dc08
commit af5d83a641
No known key found for this signature in database
GPG Key ID: 9858809D6F8F6E7E
14 changed files with 151 additions and 28 deletions

View File

@ -153,6 +153,12 @@ func (s *containerRouter) getContainersLogs(ctx context.Context, w http.Response
return err
}
contentType := types.MediaTypeRawStream
if !tty && versions.GreaterThanOrEqualTo(httputils.VersionFromContext(ctx), "1.42") {
contentType = types.MediaTypeMultiplexedStream
}
w.Header().Set("Content-Type", contentType)
// if has a tty, we're not muxing streams. if it doesn't, we are. simple.
// this is the point of no return for writing a response. once we call
// WriteLogStream, the response has been started and errors will be
@ -598,7 +604,8 @@ func (s *containerRouter) postContainersAttach(ctx context.Context, w http.Respo
return errdefs.InvalidParameter(errors.Errorf("error attaching to container %s, hijack connection missing", containerName))
}
setupStreams := func() (io.ReadCloser, io.Writer, io.Writer, error) {
contentType := types.MediaTypeRawStream
setupStreams := func(multiplexed bool) (io.ReadCloser, io.Writer, io.Writer, error) {
conn, _, err := hijacker.Hijack()
if err != nil {
return nil, nil, nil, err
@ -608,7 +615,10 @@ func (s *containerRouter) postContainersAttach(ctx context.Context, w http.Respo
conn.Write([]byte{})
if upgrade {
fmt.Fprintf(conn, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
if multiplexed && versions.GreaterThanOrEqualTo(httputils.VersionFromContext(ctx), "1.42") {
contentType = types.MediaTypeMultiplexedStream
}
fmt.Fprintf(conn, "HTTP/1.1 101 UPGRADED\r\nContent-Type: "+contentType+"\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n\r\n")
} else {
fmt.Fprintf(conn, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
}
@ -632,16 +642,16 @@ func (s *containerRouter) postContainersAttach(ctx context.Context, w http.Respo
}
if err = s.backend.ContainerAttach(containerName, attachConfig); err != nil {
logrus.Errorf("Handler for %s %s returned error: %v", r.Method, r.URL.Path, err)
logrus.WithError(err).Errorf("Handler for %s %s returned error", r.Method, r.URL.Path)
// Remember to close stream if error happens
conn, _, errHijack := hijacker.Hijack()
if errHijack == nil {
if errHijack != nil {
logrus.WithError(err).Errorf("Handler for %s %s: unable to close stream; error when hijacking connection", r.Method, r.URL.Path)
} else {
statusCode := httpstatus.FromError(err)
statusText := http.StatusText(statusCode)
fmt.Fprintf(conn, "HTTP/1.1 %d %s\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n%s\r\n", statusCode, statusText, err.Error())
fmt.Fprintf(conn, "HTTP/1.1 %d %s\r\nContent-Type: %s\r\n\r\n%s\r\n", statusCode, statusText, contentType, err.Error())
httputils.CloseStreams(conn)
} else {
logrus.Errorf("Error Hijacking: %v", err)
}
}
return nil
@ -661,7 +671,7 @@ func (s *containerRouter) wsContainersAttach(ctx context.Context, w http.Respons
version := httputils.VersionFromContext(ctx)
setupStreams := func() (io.ReadCloser, io.Writer, io.Writer, error) {
setupStreams := func(multiplexed bool) (io.ReadCloser, io.Writer, io.Writer, error) {
wsChan := make(chan *websocket.Conn)
h := func(conn *websocket.Conn) {
wsChan <- conn

View File

@ -98,7 +98,11 @@ func (s *containerRouter) postContainerExecStart(ctx context.Context, w http.Res
defer httputils.CloseStreams(inStream, outStream)
if _, ok := r.Header["Upgrade"]; ok {
fmt.Fprint(outStream, "HTTP/1.1 101 UPGRADED\r\nContent-Type: application/vnd.docker.raw-stream\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n")
contentType := types.MediaTypeRawStream
if !execStartCheck.Tty && versions.GreaterThanOrEqualTo(httputils.VersionFromContext(ctx), "1.42") {
contentType = types.MediaTypeMultiplexedStream
}
fmt.Fprint(outStream, "HTTP/1.1 101 UPGRADED\r\nContent-Type: "+contentType+"\r\nConnection: Upgrade\r\nUpgrade: tcp\r\n")
} else {
fmt.Fprint(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n")
}

View File

@ -3,7 +3,6 @@ package swarm // import "github.com/docker/docker/api/server/router/swarm"
import (
"context"
"fmt"
"io"
"net/http"
"github.com/docker/docker/api/server/httputils"
@ -15,7 +14,7 @@ import (
// swarmLogs takes an http response, request, and selector, and writes the logs
// specified by the selector to the response
func (sr *swarmRouter) swarmLogs(ctx context.Context, w io.Writer, r *http.Request, selector *backend.LogSelector) error {
func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r *http.Request, selector *backend.LogSelector) error {
// Args are validated before the stream starts because when it starts we're
// sending HTTP 200 by writing an empty chunk of data to tell the client that
// daemon is going to stream. By sending this initial HTTP 200 we can't report
@ -63,6 +62,11 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w io.Writer, r *http.Reque
return err
}
contentType := basictypes.MediaTypeRawStream
if !tty && versions.GreaterThanOrEqualTo(httputils.VersionFromContext(ctx), "1.42") {
contentType = basictypes.MediaTypeMultiplexedStream
}
w.Header().Set("Content-Type", contentType)
httputils.WriteLogStream(ctx, w, msgs, logsConfig, !tty)
return nil
}

View File

@ -6478,6 +6478,9 @@ paths:
Note: This endpoint works only for containers with the `json-file` or
`journald` logging driver.
produces:
- "application/vnd.docker.raw-stream"
- "application/vnd.docker.multiplexed-stream"
operationId: "ContainerLogs"
responses:
200:
@ -7189,7 +7192,8 @@ paths:
### Stream format
When the TTY setting is disabled in [`POST /containers/create`](#operation/ContainerCreate),
the stream over the hijacked connected is multiplexed to separate out
the HTTP Content-Type header is set to application/vnd.docker.multiplexed-stream
and the stream over the hijacked connected is multiplexed to separate out
`stdout` and `stderr`. The stream consists of a series of frames, each
containing a header and a payload.
@ -7233,6 +7237,7 @@ paths:
operationId: "ContainerAttach"
produces:
- "application/vnd.docker.raw-stream"
- "application/vnd.docker.multiplexed-stream"
responses:
101:
description: "no error, hints proxy about hijacking"
@ -9015,6 +9020,7 @@ paths:
- "application/json"
produces:
- "application/vnd.docker.raw-stream"
- "application/vnd.docker.multiplexed-stream"
responses:
200:
description: "No error"
@ -10913,6 +10919,9 @@ paths:
**Note**: This endpoint works only for services with the `local`,
`json-file` or `journald` logging drivers.
produces:
- "application/vnd.docker.raw-stream"
- "application/vnd.docker.multiplexed-stream"
operationId: "ServiceLogs"
responses:
200:
@ -11168,6 +11177,9 @@ paths:
**Note**: This endpoint works only for services with the `local`,
`json-file` or `journald` logging drivers.
operationId: "TaskLogs"
produces:
- "application/vnd.docker.raw-stream"
- "application/vnd.docker.multiplexed-stream"
responses:
200:
description: "logs returned as a stream in response body"

View File

@ -10,7 +10,7 @@ import (
// ContainerAttachConfig holds the streams to use when connecting to a container to view logs.
type ContainerAttachConfig struct {
GetStreams func() (io.ReadCloser, io.Writer, io.Writer, error)
GetStreams func(multiplexed bool) (io.ReadCloser, io.Writer, io.Writer, error)
UseStdin bool
UseStdout bool
UseStderr bool

View File

@ -112,10 +112,16 @@ type NetworkListOptions struct {
Filters filters.Args
}
// NewHijackedResponse intializes a HijackedResponse type
func NewHijackedResponse(conn net.Conn, mediaType string) HijackedResponse {
return HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn), mediaType: mediaType}
}
// HijackedResponse holds connection information for a hijacked request.
type HijackedResponse struct {
Conn net.Conn
Reader *bufio.Reader
mediaType string
Conn net.Conn
Reader *bufio.Reader
}
// Close closes the hijacked connection and reader.
@ -123,6 +129,15 @@ func (h *HijackedResponse) Close() {
h.Conn.Close()
}
// MediaType let client know if HijackedResponse hold a raw or multiplexed stream.
// returns false if HTTP Content-Type is not relevant, and container must be inspected
func (h *HijackedResponse) MediaType() (string, bool) {
if h.mediaType == "" {
return "", false
}
return h.mediaType, true
}
// CloseWriter is an interface that implements structs
// that close input streams to prevent from writing.
type CloseWriter interface {

View File

@ -18,6 +18,14 @@ import (
"github.com/docker/go-connections/nat"
)
const (
// MediaTypeRawStream is vendor specific MIME-Type set for raw TTY streams
MediaTypeRawStream = "application/vnd.docker.raw-stream"
// MediaTypeMultiplexedStream is vendor specific MIME-Type set for stdin/stdout/stderr multiplexed streams
MediaTypeMultiplexedStream = "application/vnd.docker.multiplexed-stream"
)
// RootFS returns Image's RootFS description including the layer IDs.
type RootFS struct {
Type string `json:",omitempty"`

View File

@ -52,6 +52,8 @@ func (cli *Client) ContainerAttach(ctx context.Context, container string, option
query.Set("logs", "1")
}
headers := map[string][]string{"Content-Type": {"text/plain"}}
headers := map[string][]string{
"Content-Type": {"text/plain"},
}
return cli.postHijacked(ctx, "/containers/"+container+"/attach", query, nil, headers)
}

View File

@ -36,7 +36,9 @@ func (cli *Client) ContainerExecStart(ctx context.Context, execID string, config
// and the a reader to get output. It's up to the called to close
// the hijacked connection by calling types.HijackedResponse.Close.
func (cli *Client) ContainerExecAttach(ctx context.Context, execID string, config types.ExecStartCheck) (types.HijackedResponse, error) {
headers := map[string][]string{"Content-Type": {"application/json"}}
headers := map[string][]string{
"Content-Type": {"application/json"},
}
return cli.postHijacked(ctx, "/exec/"+execID+"/start", nil, config, headers)
}

View File

@ -12,6 +12,7 @@ import (
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/versions"
"github.com/docker/go-connections/sockets"
"github.com/pkg/errors"
)
@ -30,12 +31,12 @@ func (cli *Client) postHijacked(ctx context.Context, path string, query url.Valu
}
req = cli.addHeaders(req, headers)
conn, err := cli.setupHijackConn(ctx, req, "tcp")
conn, mediaType, err := cli.setupHijackConn(ctx, req, "tcp")
if err != nil {
return types.HijackedResponse{}, err
}
return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err
return types.NewHijackedResponse(conn, mediaType), err
}
// DialHijack returns a hijacked connection with negotiated protocol proto.
@ -46,7 +47,8 @@ func (cli *Client) DialHijack(ctx context.Context, url, proto string, meta map[s
}
req = cli.addHeaders(req, meta)
return cli.setupHijackConn(ctx, req, proto)
conn, _, err := cli.setupHijackConn(ctx, req, proto)
return conn, err
}
// fallbackDial is used when WithDialer() was not called.
@ -61,7 +63,7 @@ func fallbackDial(proto, addr string, tlsConfig *tls.Config) (net.Conn, error) {
return net.Dial(proto, addr)
}
func (cli *Client) setupHijackConn(ctx context.Context, req *http.Request, proto string) (net.Conn, error) {
func (cli *Client) setupHijackConn(ctx context.Context, req *http.Request, proto string) (net.Conn, string, error) {
req.Host = cli.addr
req.Header.Set("Connection", "Upgrade")
req.Header.Set("Upgrade", proto)
@ -69,7 +71,7 @@ func (cli *Client) setupHijackConn(ctx context.Context, req *http.Request, proto
dialer := cli.Dialer()
conn, err := dialer(ctx)
if err != nil {
return nil, errors.Wrap(err, "cannot connect to the Docker daemon. Is 'docker daemon' running on this host?")
return nil, "", errors.Wrap(err, "cannot connect to the Docker daemon. Is 'docker daemon' running on this host?")
}
// When we set up a TCP connection for hijack, there could be long periods
@ -91,18 +93,18 @@ func (cli *Client) setupHijackConn(ctx context.Context, req *http.Request, proto
//nolint:staticcheck // ignore SA1019 for connecting to old (pre go1.8) daemons
if err != httputil.ErrPersistEOF {
if err != nil {
return nil, err
return nil, "", err
}
if resp.StatusCode != http.StatusSwitchingProtocols {
resp.Body.Close()
return nil, fmt.Errorf("unable to upgrade to %s, received %d", proto, resp.StatusCode)
return nil, "", fmt.Errorf("unable to upgrade to %s, received %d", proto, resp.StatusCode)
}
}
c, br := clientconn.Hijack()
if br.Buffered() > 0 {
// If there is buffered content, wrap the connection. We return an
// object that implements CloseWrite iff the underlying connection
// object that implements CloseWrite if the underlying connection
// implements it.
if _, ok := c.(types.CloseWriter); ok {
c = &hijackedConnCloseWriter{&hijackedConn{c, br}}
@ -113,7 +115,13 @@ func (cli *Client) setupHijackConn(ctx context.Context, req *http.Request, proto
br.Reset(nil)
}
return c, nil
var mediaType string
if versions.GreaterThanOrEqualTo(cli.ClientVersion(), "1.42") {
// Prior to 1.42, Content-Type is always set to raw-stream and not relevant
mediaType = resp.Header.Get("Content-Type")
}
return c, mediaType, nil
}
// hijackedConn wraps a net.Conn and is returned by setupHijackConn in the case

View File

@ -50,13 +50,14 @@ func (daemon *Daemon) ContainerAttach(prefixOrName string, c *backend.ContainerA
}
ctr.StreamConfig.AttachStreams(&cfg)
inStream, outStream, errStream, err := c.GetStreams()
multiplexed := !ctr.Config.Tty && c.MuxStreams
inStream, outStream, errStream, err := c.GetStreams(multiplexed)
if err != nil {
return err
}
defer inStream.Close()
if !ctr.Config.Tty && c.MuxStreams {
if multiplexed {
errStream = stdcopy.NewStdWriter(errStream, stdcopy.Stderr)
outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
}

View File

@ -73,6 +73,10 @@ keywords: "API, Docker, rcli, REST, documentation"
syntax, `<IDType>://<ID>` is now recognised. Support for specific `<IDType>` values
depends on the underlying implementation and Windows version. This change is not
versioned, and affects all API versions if the daemon has this patch.
* `GET /containers/{id}/attach`, `GET /exec/{id}/start`, `GET /containers/{id}/logs`
`GET /services/{id}/logs` and `GET /tasks/{id}/logs` now set Content-Type header
to `application/vnd.docker.multiplexed-stream` when a multiplexed stdout/stderr
stream is sent to client, `application/vnd.docker.raw-stream` otherwise.
## v1.41 API changes

View File

@ -193,6 +193,9 @@ func (s *DockerSuite) TestPostContainersAttach(c *testing.T) {
resp, err := client.ContainerAttach(context.Background(), cid, attachOpts)
assert.NilError(c, err)
mediaType, b := resp.MediaType()
assert.Check(c, b)
assert.Equal(c, mediaType, types.MediaTypeMultiplexedStream)
expectSuccess(resp.Conn, resp.Reader, "stdout", false)
// Make sure we do see "hello" if Logs is true

View File

@ -0,0 +1,50 @@
package container // import "github.com/docker/docker/integration/container"
import (
"context"
"testing"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"gotest.tools/v3/assert"
)
func TestAttachWithTTY(t *testing.T) {
testAttach(t, true, types.MediaTypeRawStream)
}
func TestAttachWithoutTTy(t *testing.T) {
testAttach(t, false, types.MediaTypeMultiplexedStream)
}
func testAttach(t *testing.T, tty bool, expected string) {
defer setupTest(t)()
client := testEnv.APIClient()
resp, err := client.ContainerCreate(context.Background(),
&container.Config{
Image: "busybox",
Cmd: []string{"echo", "hello"},
Tty: tty,
},
&container.HostConfig{},
&network.NetworkingConfig{},
nil,
"",
)
assert.NilError(t, err)
container := resp.ID
defer client.ContainerRemove(context.Background(), container, types.ContainerRemoveOptions{
Force: true,
})
attach, err := client.ContainerAttach(context.Background(), container, types.ContainerAttachOptions{
Stdout: true,
Stderr: true,
})
assert.NilError(t, err)
mediaType, ok := attach.MediaType()
assert.Check(t, ok)
assert.Check(t, mediaType == expected)
}