mirror of
				https://github.com/moby/moby.git
				synced 2022-11-09 12:21:53 -05:00 
			
		
		
		
	Handle blocked I/O of exec'd processes
This is the second part to https://github.com/containerd/containerd/pull/3361 and will help process delete not block forever when the process exists but the I/O was inherited by a subprocess that lives on. Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
This commit is contained in:
		
							parent
							
								
									384c782721
								
							
						
					
					
						commit
						b5f28865ef
					
				
					 6 changed files with 43 additions and 114 deletions
				
			
		| 
						 | 
				
			
			@ -730,7 +730,7 @@ func (i *rio) Close() error {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (i *rio) Wait() {
 | 
			
		||||
	i.sc.Wait()
 | 
			
		||||
	i.sc.Wait(context.Background())
 | 
			
		||||
 | 
			
		||||
	i.IO.Wait()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,7 @@
 | 
			
		|||
package stream // import "github.com/docker/docker/container/stream"
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"io/ioutil"
 | 
			
		||||
| 
						 | 
				
			
			@ -24,11 +25,12 @@ import (
 | 
			
		|||
// copied and delivered to all StdoutPipe and StderrPipe consumers, using
 | 
			
		||||
// a kind of "broadcaster".
 | 
			
		||||
type Config struct {
 | 
			
		||||
	sync.WaitGroup
 | 
			
		||||
	wg        sync.WaitGroup
 | 
			
		||||
	stdout    *broadcaster.Unbuffered
 | 
			
		||||
	stderr    *broadcaster.Unbuffered
 | 
			
		||||
	stdin     io.ReadCloser
 | 
			
		||||
	stdinPipe io.WriteCloser
 | 
			
		||||
	dio       *cio.DirectIO
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewConfig creates a stream config and initializes
 | 
			
		||||
| 
						 | 
				
			
			@ -115,14 +117,15 @@ func (c *Config) CloseStreams() error {
 | 
			
		|||
 | 
			
		||||
// CopyToPipe connects streamconfig with a libcontainerd.IOPipe
 | 
			
		||||
func (c *Config) CopyToPipe(iop *cio.DirectIO) {
 | 
			
		||||
	c.dio = iop
 | 
			
		||||
	copyFunc := func(w io.Writer, r io.ReadCloser) {
 | 
			
		||||
		c.Add(1)
 | 
			
		||||
		c.wg.Add(1)
 | 
			
		||||
		go func() {
 | 
			
		||||
			if _, err := pools.Copy(w, r); err != nil {
 | 
			
		||||
				logrus.Errorf("stream copy error: %v", err)
 | 
			
		||||
			}
 | 
			
		||||
			r.Close()
 | 
			
		||||
			c.Done()
 | 
			
		||||
			c.wg.Done()
 | 
			
		||||
		}()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -144,3 +147,23 @@ func (c *Config) CopyToPipe(iop *cio.DirectIO) {
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Wait for the stream to close
 | 
			
		||||
// Wait supports timeouts via the context to unblock and forcefully
 | 
			
		||||
// close the io streams
 | 
			
		||||
func (c *Config) Wait(ctx context.Context) {
 | 
			
		||||
	done := make(chan struct{}, 1)
 | 
			
		||||
	go func() {
 | 
			
		||||
		c.wg.Wait()
 | 
			
		||||
		close(done)
 | 
			
		||||
	}()
 | 
			
		||||
	select {
 | 
			
		||||
	case <-done:
 | 
			
		||||
	case <-ctx.Done():
 | 
			
		||||
		if c.dio != nil {
 | 
			
		||||
			c.dio.Cancel()
 | 
			
		||||
			c.dio.Wait()
 | 
			
		||||
			c.dio.Close()
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,6 +1,7 @@
 | 
			
		|||
package exec // import "github.com/docker/docker/daemon/exec"
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -58,7 +59,7 @@ func (i *rio) Close() error {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func (i *rio) Wait() {
 | 
			
		||||
	i.sc.Wait()
 | 
			
		||||
	i.sc.Wait(context.Background())
 | 
			
		||||
 | 
			
		||||
	i.IO.Wait()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -55,8 +55,9 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 | 
			
		|||
			if err != nil {
 | 
			
		||||
				logrus.WithError(err).Warnf("failed to delete container %s from containerd", c.ID)
 | 
			
		||||
			}
 | 
			
		||||
			ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
 | 
			
		||||
 | 
			
		||||
			c.StreamConfig.Wait()
 | 
			
		||||
			c.StreamConfig.Wait(ctx)
 | 
			
		||||
			c.Reset(false)
 | 
			
		||||
 | 
			
		||||
			exitStatus := container.ExitStatus{
 | 
			
		||||
| 
						 | 
				
			
			@ -123,7 +124,10 @@ func (daemon *Daemon) ProcessEvent(id string, e libcontainerdtypes.EventType, ei
 | 
			
		|||
			defer execConfig.Unlock()
 | 
			
		||||
			execConfig.ExitCode = &ec
 | 
			
		||||
			execConfig.Running = false
 | 
			
		||||
			execConfig.StreamConfig.Wait()
 | 
			
		||||
 | 
			
		||||
			ctx, _ := context.WithTimeout(context.Background(), 2*time.Second)
 | 
			
		||||
			execConfig.StreamConfig.Wait(ctx)
 | 
			
		||||
 | 
			
		||||
			if err := execConfig.CloseStreams(); err != nil {
 | 
			
		||||
				logrus.Errorf("failed to cleanup exec %s streams: %s", c.ID, err)
 | 
			
		||||
			}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,7 +11,6 @@ import (
 | 
			
		|||
	"reflect"
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"sync"
 | 
			
		||||
	"time"
 | 
			
		||||
| 
						 | 
				
			
			@ -19,7 +18,6 @@ import (
 | 
			
		|||
	"github.com/docker/docker/client"
 | 
			
		||||
	"github.com/docker/docker/integration-cli/cli"
 | 
			
		||||
	"github.com/docker/docker/integration-cli/cli/build"
 | 
			
		||||
	"github.com/docker/docker/pkg/parsers/kernel"
 | 
			
		||||
	"github.com/go-check/check"
 | 
			
		||||
	"gotest.tools/assert"
 | 
			
		||||
	is "gotest.tools/assert/cmp"
 | 
			
		||||
| 
						 | 
				
			
			@ -534,100 +532,3 @@ func (s *DockerSuite) TestExecEnvLinksHost(c *check.C) {
 | 
			
		|||
	assert.Check(c, is.Contains(out, "HOSTNAME=myhost"))
 | 
			
		||||
	assert.Check(c, is.Contains(out, "DB_NAME=/bar/db"))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *DockerSuite) TestExecWindowsOpenHandles(c *check.C) {
 | 
			
		||||
	testRequires(c, DaemonIsWindows)
 | 
			
		||||
 | 
			
		||||
	if runtime.GOOS == "windows" {
 | 
			
		||||
		v, err := kernel.GetKernelVersion()
 | 
			
		||||
		assert.NilError(c, err)
 | 
			
		||||
		build, _ := strconv.Atoi(strings.Split(strings.SplitN(v.String(), " ", 3)[2][1:], ".")[0])
 | 
			
		||||
		if build >= 17743 {
 | 
			
		||||
			c.Skip("Temporarily disabled on RS5 17743+ builds due to platform bug")
 | 
			
		||||
 | 
			
		||||
			// This is being tracked internally. @jhowardmsft. Summary of failure
 | 
			
		||||
			// from an email in early July 2018 below:
 | 
			
		||||
			//
 | 
			
		||||
			// Platform regression. In cmd.exe by the look of it. I can repro
 | 
			
		||||
			// it outside of CI.  It fails the same on 17681, 17676 and even as
 | 
			
		||||
			// far back as 17663, over a month old. From investigating, I can see
 | 
			
		||||
			// what's happening in the container, but not the reason. The test
 | 
			
		||||
			// starts a long-running container based on the Windows busybox image.
 | 
			
		||||
			// It then adds another process (docker exec) to that container to
 | 
			
		||||
			// sleep. It loops waiting for two instances of busybox.exe running,
 | 
			
		||||
			// and cmd.exe to quit. What's actually happening is that the second
 | 
			
		||||
			// exec hangs indefinitely, and from docker top, I can see
 | 
			
		||||
			// "OpenWith.exe" running.
 | 
			
		||||
 | 
			
		||||
			//Manual repro would be
 | 
			
		||||
			//# Start the first long-running container
 | 
			
		||||
			//docker run --rm -d --name test busybox sleep 300
 | 
			
		||||
 | 
			
		||||
			//# In another window, docker top test. There should be a single instance of busybox.exe running
 | 
			
		||||
			//# In a third window, docker exec test cmd /c start sleep 10  NOTE THIS HANGS UNTIL 5 MIN TIMEOUT
 | 
			
		||||
			//# In the second window, run docker top test. Note that OpenWith.exe is running, one cmd.exe and only one busybox. I would expect no "OpenWith" and two busybox.exe's.
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	runSleepingContainer(c, "-d", "--name", "test")
 | 
			
		||||
	exec := make(chan bool)
 | 
			
		||||
	go func() {
 | 
			
		||||
		dockerCmd(c, "exec", "test", "cmd", "/c", "start sleep 10")
 | 
			
		||||
		exec <- true
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	count := 0
 | 
			
		||||
	for {
 | 
			
		||||
		top := make(chan string)
 | 
			
		||||
		var out string
 | 
			
		||||
		go func() {
 | 
			
		||||
			out, _ := dockerCmd(c, "top", "test")
 | 
			
		||||
			top <- out
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
		select {
 | 
			
		||||
		case <-time.After(time.Second * 5):
 | 
			
		||||
			c.Fatal("timed out waiting for top while exec is exiting")
 | 
			
		||||
		case out = <-top:
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		if strings.Count(out, "busybox.exe") == 2 && !strings.Contains(out, "cmd.exe") {
 | 
			
		||||
			// The initial exec process (cmd.exe) has exited, and both sleeps are currently running
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
		count++
 | 
			
		||||
		if count >= 30 {
 | 
			
		||||
			c.Fatal("too many retries")
 | 
			
		||||
		}
 | 
			
		||||
		time.Sleep(1 * time.Second)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	inspect := make(chan bool)
 | 
			
		||||
	go func() {
 | 
			
		||||
		dockerCmd(c, "inspect", "test")
 | 
			
		||||
		inspect <- true
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case <-time.After(time.Second * 5):
 | 
			
		||||
		c.Fatal("timed out waiting for inspect while exec is exiting")
 | 
			
		||||
	case <-inspect:
 | 
			
		||||
		break
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Ensure the background sleep is still running
 | 
			
		||||
	out, _ := dockerCmd(c, "top", "test")
 | 
			
		||||
	assert.Equal(c, strings.Count(out, "busybox.exe"), 2)
 | 
			
		||||
 | 
			
		||||
	// The exec should exit when the background sleep exits
 | 
			
		||||
	select {
 | 
			
		||||
	case <-time.After(time.Second * 15):
 | 
			
		||||
		c.Fatal("timed out waiting for async exec to exit")
 | 
			
		||||
	case <-exec:
 | 
			
		||||
		// Ensure the background sleep has actually exited
 | 
			
		||||
		out, _ := dockerCmd(c, "top", "test")
 | 
			
		||||
		assert.Equal(c, strings.Count(out, "busybox.exe"), 1)
 | 
			
		||||
		break
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -652,13 +652,6 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
 | 
			
		|||
					}).Error("exit event")
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
			_, err = p.Delete(context.Background())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				c.logger.WithError(err).WithFields(logrus.Fields{
 | 
			
		||||
					"container": ei.ContainerID,
 | 
			
		||||
					"process":   ei.ProcessID,
 | 
			
		||||
				}).Warn("failed to delete process")
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			ctr, err := c.getContainer(ctx, ei.ContainerID)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -672,11 +665,18 @@ func (c *client) processEvent(ctx context.Context, et libcontainerdtypes.EventTy
 | 
			
		|||
					c.logger.WithFields(logrus.Fields{
 | 
			
		||||
						"container": ei.ContainerID,
 | 
			
		||||
						"error":     err,
 | 
			
		||||
					}).Error("failed to find container")
 | 
			
		||||
					}).Error("failed to get container labels")
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				newFIFOSet(labels[DockerContainerBundlePath], ei.ProcessID, true, false).Close()
 | 
			
		||||
			}
 | 
			
		||||
			_, err = p.Delete(context.Background())
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				c.logger.WithError(err).WithFields(logrus.Fields{
 | 
			
		||||
					"container": ei.ContainerID,
 | 
			
		||||
					"process":   ei.ProcessID,
 | 
			
		||||
				}).Warn("failed to delete process")
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue