diff --git a/daemon/attach.go b/daemon/attach.go index f2cc6bd469..599b272472 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -83,7 +83,6 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status { var ( cStdin io.ReadCloser cStdout, cStderr io.Writer - cStdinCloser io.Closer ) if stdin { @@ -94,7 +93,6 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status { io.Copy(w, job.Stdin) }() cStdin = r - cStdinCloser = job.Stdin } if stdout { cStdout = job.Stdout @@ -103,7 +101,7 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status { cStderr = job.Stderr } - <-daemon.attach(&container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, cStdin, cStdinCloser, cStdout, cStderr) + <-daemon.attach(&container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, cStdin, cStdout, cStderr) // If we are in stdinonce mode, wait for the process to end // otherwise, simply return if container.Config.StdinOnce && !container.Config.Tty { @@ -113,7 +111,7 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status { return engine.StatusOK } -func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdinCloser io.Closer, stdout io.Writer, stderr io.Writer) chan error { +func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { var ( cStdout, cStderr io.ReadCloser nJobs int @@ -130,10 +128,10 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t go func() { log.Debugf("attach: stdin: begin") defer log.Debugf("attach: stdin: end") - // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr if stdinOnce && !tty { defer cStdin.Close() } else { + // No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr defer func() { if cStdout != nil { cStdout.Close() @@ -173,9 +171,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t if stdinOnce && stdin != nil { defer stdin.Close() } - if stdinCloser != nil { - defer stdinCloser.Close() - } _, err := io.Copy(stdout, cStdout) if err == io.ErrClosedPipe { err = nil @@ -189,9 +184,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t } else { // Point stdout of container to a no-op writer. go func() { - if stdinCloser != nil { - defer stdinCloser.Close() - } if cStdout, err := streamConfig.StdoutPipe(); err != nil { log.Errorf("attach: stdout pipe: %s", err) } else { @@ -213,9 +205,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t if stdinOnce && stdin != nil { defer stdin.Close() } - if stdinCloser != nil { - defer stdinCloser.Close() - } _, err := io.Copy(stderr, cStderr) if err == io.ErrClosedPipe { err = nil @@ -229,10 +218,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t } else { // Point stderr at a no-op writer. go func() { - if stdinCloser != nil { - defer stdinCloser.Close() - } - if cStderr, err := streamConfig.StderrPipe(); err != nil { log.Errorf("attach: stdout pipe: %s", err) } else { @@ -251,8 +236,6 @@ func (daemon *Daemon) attach(streamConfig *StreamConfig, openStdin, stdinOnce, t } }() - // FIXME: how to clean up the stdin goroutine without the unwanted side effect - // of closing the passed stdin? Add an intermediary io.Pipe? for i := 0; i < nJobs; i++ { log.Debugf("attach: waiting for job %d/%d", i+1, nJobs) if err := <-errors; err != nil { diff --git a/daemon/exec.go b/daemon/exec.go index eae096e680..058b712625 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -155,7 +155,6 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status { var ( cStdin io.ReadCloser cStdout, cStderr io.Writer - cStdinCloser io.Closer execName = job.Args[0] ) @@ -183,10 +182,10 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status { r, w := io.Pipe() go func() { defer w.Close() + defer log.Debugf("Closing buffered stdin pipe") io.Copy(w, job.Stdin) }() cStdin = r - cStdinCloser = job.Stdin } if execConfig.OpenStdout { cStdout = job.Stdout @@ -204,7 +203,7 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) engine.Status { execConfig.StreamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin } - attachErr := d.attach(&execConfig.StreamConfig, execConfig.OpenStdin, false, execConfig.ProcessConfig.Tty, cStdin, cStdinCloser, cStdout, cStderr) + attachErr := d.attach(&execConfig.StreamConfig, execConfig.OpenStdin, false, execConfig.ProcessConfig.Tty, cStdin, cStdout, cStderr) execErr := make(chan error) diff --git a/integration-cli/docker_cli_run_test.go b/integration-cli/docker_cli_run_test.go index 6f1c846150..072c6f6b44 100644 --- a/integration-cli/docker_cli_run_test.go +++ b/integration-cli/docker_cli_run_test.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "fmt" + "io" "io/ioutil" "net" "os" @@ -2446,3 +2447,46 @@ func TestRunVolumesCleanPaths(t *testing.T) { logDone("run - volume paths are cleaned") } + +// Regression test for #3631 +func TestRunSlowStdoutConsumer(t *testing.T) { + defer deleteAllContainers() + + c := exec.Command("/bin/bash", "-c", dockerBinary+` run --rm -i busybox /bin/sh -c "dd if=/dev/zero of=/foo bs=1024 count=2000 &>/dev/null; catv /foo"`) + + stdout, err := c.StdoutPipe() + if err != nil { + t.Fatal(err) + } + + if err := c.Start(); err != nil { + t.Fatal(err) + } + n, err := consumeSlow(stdout, 10000, 5*time.Millisecond) + if err != nil { + t.Fatal(err) + } + + expected := 2 * 1024 * 2000 + if n != expected { + t.Fatalf("Expected %d, got %d", expected, n) + } + + logDone("run - slow consumer") +} + +func consumeSlow(reader io.Reader, chunkSize int, interval time.Duration) (n int, err error) { + buffer := make([]byte, chunkSize) + for { + var readBytes int + readBytes, err = reader.Read(buffer) + n += readBytes + if err != nil { + if err == io.EOF { + err = nil + } + return + } + time.Sleep(interval) + } +}