1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #21048 from LK4D4/fix_attach_leak

daemon: fix hanging attaches on initial start failures
This commit is contained in:
David Calavera 2016-03-14 10:16:45 -07:00
commit 8e74cf59d0
4 changed files with 127 additions and 37 deletions

View file

@ -13,6 +13,8 @@ import (
"syscall"
"time"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/exec"
"github.com/docker/docker/daemon/execdriver"
@ -78,6 +80,7 @@ type CommonContainer struct {
// logDriver for closing
LogDriver logger.Logger `json:"-"`
LogCopier *logger.Copier `json:"-"`
attachContext *attachContext
}
// NewBaseContainer creates a new container with its
@ -91,6 +94,7 @@ func NewBaseContainer(id, root string) *Container {
Root: root,
MountPoints: make(map[string]*volume.MountPoint),
StreamConfig: runconfig.NewStreamConfig(),
attachContext: &attachContext{},
},
}
}
@ -359,12 +363,13 @@ func (container *Container) GetExecIDs() []string {
// Attach connects to the container's TTY, delegating to standard
// streams or websockets depending on the configuration.
func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
return AttachStreams(container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr, keys)
ctx := container.InitAttachContext()
return AttachStreams(ctx, container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr, keys)
}
// AttachStreams connects streams to a TTY.
// Used by exec too. Should this move somewhere else?
func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
func AttachStreams(ctx context.Context, streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer, keys []byte) chan error {
var (
cStdout, cStderr io.ReadCloser
cStdin io.WriteCloser
@ -393,21 +398,6 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t
return
}
logrus.Debugf("attach: stdin: begin")
defer func() {
if stdinOnce && !tty {
cStdin.Close()
} else {
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
if cStdout != nil {
cStdout.Close()
}
if cStderr != nil {
cStderr.Close()
}
}
wg.Done()
logrus.Debugf("attach: stdin: end")
}()
var err error
if tty {
@ -422,23 +412,26 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t
if err != nil {
logrus.Errorf("attach: stdin: %s", err)
errors <- err
return
}
if stdinOnce && !tty {
cStdin.Close()
} else {
// No matter what, when stdin is closed (io.Copy unblock), close stdout and stderr
if cStdout != nil {
cStdout.Close()
}
if cStderr != nil {
cStderr.Close()
}
}
logrus.Debugf("attach: stdin: end")
wg.Done()
}()
attachStream := func(name string, stream io.Writer, streamPipe io.ReadCloser) {
if stream == nil {
return
}
defer func() {
// Make sure stdin gets closed
if stdin != nil {
stdin.Close()
}
streamPipe.Close()
wg.Done()
logrus.Debugf("attach: %s: end", name)
}()
logrus.Debugf("attach: %s: begin", name)
_, err := io.Copy(stream, streamPipe)
@ -449,13 +442,39 @@ func AttachStreams(streamConfig *runconfig.StreamConfig, openStdin, stdinOnce, t
logrus.Errorf("attach: %s: %v", name, err)
errors <- err
}
// Make sure stdin gets closed
if stdin != nil {
stdin.Close()
}
streamPipe.Close()
logrus.Debugf("attach: %s: end", name)
wg.Done()
}
go attachStream("stdout", stdout, cStdout)
go attachStream("stderr", stderr, cStderr)
return promise.Go(func() error {
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
case <-ctx.Done():
// close all pipes
if cStdin != nil {
cStdin.Close()
}
if cStdout != nil {
cStdout.Close()
}
if cStderr != nil {
cStderr.Close()
}
<-done
}
close(errors)
for err := range errors {
if err != nil {
@ -898,3 +917,31 @@ func (container *Container) UpdateMonitor(restartPolicy containertypes.RestartPo
}
monitor.mux.Unlock()
}
type attachContext struct {
ctx context.Context
cancel context.CancelFunc
mu sync.Mutex
}
// InitAttachContext initialize or returns existing context for attach calls to
// track container liveness.
func (container *Container) InitAttachContext() context.Context {
container.attachContext.mu.Lock()
defer container.attachContext.mu.Unlock()
if container.attachContext.ctx == nil {
container.attachContext.ctx, container.attachContext.cancel = context.WithCancel(context.Background())
}
return container.attachContext.ctx
}
// CancelAttachContext cancel attach context. All attach calls should detach
// after this call.
func (container *Container) CancelAttachContext() {
container.attachContext.mu.Lock()
if container.attachContext.ctx != nil {
container.attachContext.cancel()
container.attachContext.ctx = nil
}
container.attachContext.mu.Unlock()
}

View file

@ -6,6 +6,8 @@ import (
"strings"
"time"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/container"
"github.com/docker/docker/daemon/exec"
@ -181,7 +183,7 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io.
ec.NewNopInputPipe()
}
attachErr := container.AttachStreams(ec.StreamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
attachErr := container.AttachStreams(context.Background(), ec.StreamConfig, ec.OpenStdin, true, ec.ProcessConfig.Tty, cStdin, cStdout, cStderr, ec.DetachKeys)
execErr := make(chan error)

View file

@ -176,4 +176,5 @@ func (daemon *Daemon) Cleanup(container *container.Container) {
if err := container.UnmountVolumes(false, daemon.LogVolumeEvent); err != nil {
logrus.Warnf("%s cleanup: Failed to umount volumes: %v", container.ID, err)
}
container.CancelAttachContext()
}

View file

@ -3,6 +3,7 @@ package main
import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io/ioutil"
"net"
@ -4220,3 +4221,42 @@ func (s *DockerSuite) TestRunNamedVolumesFromNotRemoved(c *check.C) {
out, _ := dockerCmd(c, "volume", "ls", "-q")
c.Assert(strings.TrimSpace(out), checker.Equals, "test")
}
func (s *DockerSuite) TestRunAttachFailedNoLeak(c *check.C) {
type info struct {
NGoroutines int
}
getNGoroutines := func() int {
var i info
status, b, err := sockRequest("GET", "/info", nil)
c.Assert(err, checker.IsNil)
c.Assert(status, checker.Equals, 200)
c.Assert(json.Unmarshal(b, &i), checker.IsNil)
return i.NGoroutines
}
nroutines := getNGoroutines()
runSleepingContainer(c, "--name=test", "-p", "8000:8000")
out, _, err := dockerCmdWithError("run", "-p", "8000:8000", "busybox", "true")
c.Assert(err, checker.NotNil)
// check for windows error as well
c.Assert(strings.Contains(string(out), "port is already allocated") || strings.Contains(string(out), "were not connected because a duplicate name exists"), checker.Equals, true, check.Commentf("Output: %s", out))
dockerCmd(c, "rm", "-f", "test")
// NGoroutines is not updated right away, so we need to wait before failing
t := time.After(30 * time.Second)
for {
select {
case <-t:
n := getNGoroutines()
c.Assert(n <= nroutines, checker.Equals, true, check.Commentf("leaked goroutines: expected less than or equal to %d, got: %d", nroutines, n))
default:
if n := getNGoroutines(); n <= nroutines {
return
}
time.Sleep(200 * time.Millisecond)
}
}
}