diff --git a/daemon/container.go b/daemon/container.go
index d9c809cee5..5b41438680 100644
--- a/daemon/container.go
+++ b/daemon/container.go
@@ -53,7 +53,7 @@ type Container struct {
     Args []string

     Config *runconfig.Config
-    State  State
+    State  *State
     Image  string

     NetworkSettings *NetworkSettings
@@ -74,8 +74,7 @@ type Container struct {
     daemon                   *Daemon
     MountLabel, ProcessLabel string

-    waitLock chan struct{}
-    Volumes  map[string]string
+    Volumes map[string]string
     // Store rw/ro in a separate structure to preserve reverse-compatibility on-disk.
     // Easier than migrating older container configs :)
     VolumesRW map[string]bool
@@ -284,7 +283,6 @@ func (container *Container) Start() (err error) {
     if err := container.startLoggingToDisk(); err != nil {
         return err
     }
-    container.waitLock = make(chan struct{})

     return container.waitForStart()
 }
@@ -293,7 +291,7 @@ func (container *Container) Run() error {
     if err := container.Start(); err != nil {
         return err
     }
-    container.Wait()
+    container.State.WaitStop(-1 * time.Second)
     return nil
 }
@@ -307,7 +305,7 @@ func (container *Container) Output() (output []byte, err error) {
         return nil, err
     }
     output, err = ioutil.ReadAll(pipe)
-    container.Wait()
+    container.State.WaitStop(-1 * time.Second)
     return output, err
 }
@@ -467,6 +465,7 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
     if err != nil {
         utils.Errorf("Error running container: %s", err)
     }
+    container.State.SetStopped(exitCode)

     // Cleanup
     container.cleanup()
@@ -475,28 +474,17 @@ func (container *Container) monitor(callback execdriver.StartCallback) error {
     if container.Config.OpenStdin {
         container.stdin, container.stdinPipe = io.Pipe()
     }
-
     if container.daemon != nil && container.daemon.srv != nil {
         container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image))
     }
-
-    close(container.waitLock)
-
     if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
-        container.State.SetStopped(exitCode)
-
-        // FIXME: there is a race condition here which causes this to fail during the unit tests.
-        // If another goroutine was waiting for Wait() to return before removing the container's root
-        // from the filesystem... At this point it may already have done so.
-        // This is because State.setStopped() has already been called, and has caused Wait()
-        // to return.
-        // FIXME: why are we serializing running state to disk in the first place?
-        //log.Printf("%s: Failed to dump configuration to the disk: %s", container.ID, err)
+        // FIXME: there is a race condition between two RUN instructions in a
+        // Dockerfile, because they share the same runconfig and mutate the
+        // image. Must be fixed in server/buildfile.go.
         if err := container.ToDisk(); err != nil {
-            utils.Errorf("Error dumping container state to disk: %s\n", err)
+            utils.Errorf("Error dumping container %s state to disk: %s\n", container.ID, err)
         }
     }
-
     return err
 }
@@ -532,6 +520,7 @@ func (container *Container) cleanup() {
 }

 func (container *Container) KillSig(sig int) error {
+    utils.Debugf("Sending %d to %s", sig, container.ID)
     container.Lock()
     defer container.Unlock()
@@ -577,9 +566,9 @@ func (container *Container) Kill() error {
     }

     // 2. Wait for the process to die, in last resort, try to kill the process directly
-    if err := container.WaitTimeout(10 * time.Second); err != nil {
+    if _, err := container.State.WaitStop(10 * time.Second); err != nil {
         // Ensure that we don't kill ourselves
-        if pid := container.State.Pid; pid != 0 {
+        if pid := container.State.GetPid(); pid != 0 {
             log.Printf("Container %s failed to exit within 10 seconds of kill - trying direct SIGKILL", utils.TruncateID(container.ID))
             if err := syscall.Kill(pid, 9); err != nil {
                 return err
@@ -587,7 +576,7 @@ func (container *Container) Kill() error {
         }
     }

-    container.Wait()
+    container.State.WaitStop(-1 * time.Second)
     return nil
 }
@@ -605,11 +594,11 @@ func (container *Container) Stop(seconds int) error {
     }

     // 2. Wait for the process to exit on its own
-    if err := container.WaitTimeout(time.Duration(seconds) * time.Second); err != nil {
+    if _, err := container.State.WaitStop(time.Duration(seconds) * time.Second); err != nil {
         log.Printf("Container %v failed to exit within %d seconds of SIGTERM - using the force", container.ID, seconds)
         // 3. If it doesn't, then send SIGKILL
         if err := container.Kill(); err != nil {
-            container.Wait()
+            container.State.WaitStop(-1 * time.Second)
             return err
         }
     }
@@ -630,12 +619,6 @@ func (container *Container) Restart(seconds int) error {
     return container.Start()
 }

-// Wait blocks until the container stops running, then returns its exit code.
-func (container *Container) Wait() int {
-    <-container.waitLock
-    return container.State.GetExitCode()
-}
-
 func (container *Container) Resize(h, w int) error {
     return container.command.Terminal.Resize(h, w)
 }
@@ -678,21 +661,6 @@ func (container *Container) Export() (archive.Archive, error) {
         nil
 }

-func (container *Container) WaitTimeout(timeout time.Duration) error {
-    done := make(chan bool, 1)
-    go func() {
-        container.Wait()
-        done <- true
-    }()
-
-    select {
-    case <-time.After(timeout):
-        return fmt.Errorf("Timed Out")
-    case <-done:
-        return nil
-    }
-}
-
 func (container *Container) Mount() error {
     return container.daemon.Mount(container)
 }
@@ -1103,9 +1071,7 @@ func (container *Container) startLoggingToDisk() error {
 }

 func (container *Container) waitForStart() error {
-    callbackLock := make(chan struct{})
     callback := func(command *execdriver.Command) {
-        container.State.SetRunning(command.Pid())
         if command.Tty {
             // The callback is called after the process Start()
             // so we are in the parent process. In TTY mode, stdin/out/err is the PtySlace
@@ -1117,16 +1083,23 @@ func (container *Container) waitForStart() error {
         if err := container.ToDisk(); err != nil {
             utils.Debugf("%s", err)
         }
-        close(callbackLock)
+        container.State.SetRunning(command.Pid())
     }

     // We use a callback here instead of a goroutine and an chan for
     // syncronization purposes
     cErr := utils.Go(func() error { return container.monitor(callback) })

+    waitStart := make(chan struct{})
+
+    go func() {
+        container.State.WaitRunning(-1 * time.Second)
+        close(waitStart)
+    }()
+
     // Start should not return until the process is actually running
     select {
-    case <-callbackLock:
+    case <-waitStart:
     case err := <-cErr:
         return err
     }
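For call sites, the migration above is mechanical: Container.Wait() becomes State.WaitStop(-1 * time.Second), where a negative timeout means "wait forever" and the first return value is the exit code, while Container.WaitTimeout(d) becomes State.WaitStop(d), where a non-nil error means the timeout fired before the container stopped. A compilable sketch of the two calling conventions — stopWaiter and fakeState are hypothetical stand-ins for *daemon.State, not part of the patch:

    package main

    import (
        "fmt"
        "time"
    )

    // stopWaiter is a hypothetical stand-in for the part of *daemon.State
    // that call sites use after this patch.
    type stopWaiter interface {
        WaitStop(timeout time.Duration) (int, error)
    }

    // fakeState "stops" immediately with a fixed exit code so the example runs.
    type fakeState struct{ code int }

    func (f fakeState) WaitStop(timeout time.Duration) (int, error) {
        return f.code, nil
    }

    func main() {
        var s stopWaiter = fakeState{code: 2}

        // Old: exitCode := container.Wait()
        // New: a negative timeout means "wait forever"; the int is the exit code.
        exitCode, _ := s.WaitStop(-1 * time.Second)
        fmt.Println("exit code:", exitCode)

        // Old: err := container.WaitTimeout(10 * time.Second)
        // New: a non-nil error means the timeout fired before the stop.
        if _, err := s.WaitStop(10 * time.Second); err != nil {
            fmt.Println("still running:", err)
        }
    }

Collapsing both helpers into one method is what lets Kill() and Stop() above treat "didn't stop in time" as an ordinary error value instead of a dedicated code path.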
diff --git a/daemon/daemon.go b/daemon/daemon.go
index 3536678d22..aa8d2bad3f 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -138,7 +138,7 @@ func (daemon *Daemon) containerRoot(id string) string {
 // Load reads the contents of a container from disk
 // This is typically done at startup.
 func (daemon *Daemon) load(id string) (*Container, error) {
-    container := &Container{root: daemon.containerRoot(id)}
+    container := &Container{root: daemon.containerRoot(id), State: NewState()}
     if err := container.FromDisk(); err != nil {
         return nil, err
     }
@@ -236,12 +236,6 @@ func (daemon *Daemon) register(container *Container, updateSuffixarray bool, con
             }
         }
     }
-    } else {
-        // When the container is not running, we still initialize the waitLock
-        // chan and close it. Receiving on nil chan blocks whereas receiving on a
-        // closed chan does not. In this case we do not want to block.
-        container.waitLock = make(chan struct{})
-        close(container.waitLock)
     }
     return nil
 }
@@ -588,6 +582,7 @@ func (daemon *Daemon) newContainer(name string, config *runconfig.Config, img *i
         Name:       name,
         Driver:     daemon.driver.String(),
         ExecDriver: daemon.execDriver.Name(),
+        State:      NewState(),
     }
     container.root = daemon.containerRoot(container.ID)

@@ -900,7 +895,7 @@ func (daemon *Daemon) shutdown() error {
             if err := c.KillSig(15); err != nil {
                 utils.Debugf("kill 15 error for %s - %s", c.ID, err)
             }
-            c.Wait()
+            c.State.WaitStop(-1 * time.Second)
             utils.Debugf("container stopped %s", c.ID)
         }()
     }
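Note why both construction paths (load and newContainer) now route through NewState(): a zero-value State would carry a nil waitChan, and in Go receiving from a nil channel blocks forever while closing one panics — the same hazard the old code worked around by initializing and closing waitLock for non-running containers in register. A hypothetical illustration (local state type, not the daemon's):

    package main

    import "time"

    // state is a local model, not the daemon's type. Its zero value has a
    // nil waitChan, which is exactly what NewState() exists to prevent.
    type state struct {
        waitChan chan struct{}
    }

    func main() {
        s := &state{} // constructed without the NewState() equivalent
        select {
        case <-s.waitChan:
            // never reached: receiving from a nil channel blocks forever
        case <-time.After(100 * time.Millisecond):
            println("a waiter on a zero-value state would hang; timeout fired instead")
        }
    }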
diff --git a/daemon/state.go b/daemon/state.go
index 7ee8fc48c3..3f904d7829 100644
--- a/daemon/state.go
+++ b/daemon/state.go
@@ -16,6 +16,13 @@ type State struct {
     ExitCode   int
     StartedAt  time.Time
     FinishedAt time.Time
+    waitChan   chan struct{}
+}
+
+func NewState() *State {
+    return &State{
+        waitChan: make(chan struct{}),
+    }
 }

 // String returns a human-readable description of the state
@@ -35,56 +42,118 @@ func (s *State) String() string {
     return fmt.Sprintf("Exited (%d) %s ago", s.ExitCode, units.HumanDuration(time.Now().UTC().Sub(s.FinishedAt)))
 }

+func wait(waitChan <-chan struct{}, timeout time.Duration) error {
+    if timeout < 0 {
+        <-waitChan
+        return nil
+    }
+    select {
+    case <-time.After(timeout):
+        return fmt.Errorf("Timed out: %v", timeout)
+    case <-waitChan:
+        return nil
+    }
+}
+
+// WaitRunning waits until the state is running. If the state is already
+// running, it returns immediately. To wait forever, supply a negative
+// timeout. Returns the pid that was passed to SetRunning.
+func (s *State) WaitRunning(timeout time.Duration) (int, error) {
+    s.RLock()
+    if s.Running {
+        pid := s.Pid
+        s.RUnlock()
+        return pid, nil
+    }
+    waitChan := s.waitChan
+    s.RUnlock()
+    if err := wait(waitChan, timeout); err != nil {
+        return -1, err
+    }
+    return s.GetPid(), nil
+}
+
+// WaitStop waits until the state is stopped. If the state is already
+// stopped, it returns immediately. To wait forever, supply a negative
+// timeout. Returns the exit code that was passed to SetStopped.
+func (s *State) WaitStop(timeout time.Duration) (int, error) {
+    s.RLock()
+    if !s.Running {
+        exitCode := s.ExitCode
+        s.RUnlock()
+        return exitCode, nil
+    }
+    waitChan := s.waitChan
+    s.RUnlock()
+    if err := wait(waitChan, timeout); err != nil {
+        return -1, err
+    }
+    return s.GetExitCode(), nil
+}
+
 func (s *State) IsRunning() bool {
     s.RLock()
-    defer s.RUnlock()
+    res := s.Running
+    s.RUnlock()
+    return res
+}

-    return s.Running
+func (s *State) GetPid() int {
+    s.RLock()
+    res := s.Pid
+    s.RUnlock()
+    return res
 }

 func (s *State) GetExitCode() int {
     s.RLock()
-    defer s.RUnlock()
-
-    return s.ExitCode
+    res := s.ExitCode
+    s.RUnlock()
+    return res
 }

 func (s *State) SetRunning(pid int) {
     s.Lock()
-    defer s.Unlock()
-
-    s.Running = true
-    s.Paused = false
-    s.ExitCode = 0
-    s.Pid = pid
-    s.StartedAt = time.Now().UTC()
+    if !s.Running {
+        s.Running = true
+        s.Paused = false
+        s.ExitCode = 0
+        s.Pid = pid
+        s.StartedAt = time.Now().UTC()
+        close(s.waitChan) // fire waiters for start
+        s.waitChan = make(chan struct{})
+    }
+    s.Unlock()
 }

 func (s *State) SetStopped(exitCode int) {
     s.Lock()
-    defer s.Unlock()
-
-    s.Running = false
-    s.Pid = 0
-    s.FinishedAt = time.Now().UTC()
-    s.ExitCode = exitCode
+    if s.Running {
+        s.Running = false
+        s.Pid = 0
+        s.FinishedAt = time.Now().UTC()
+        s.ExitCode = exitCode
+        close(s.waitChan) // fire waiters for stop
+        s.waitChan = make(chan struct{})
+    }
+    s.Unlock()
 }

 func (s *State) SetPaused() {
     s.Lock()
-    defer s.Unlock()
     s.Paused = true
+    s.Unlock()
 }

 func (s *State) SetUnpaused() {
     s.Lock()
-    defer s.Unlock()
     s.Paused = false
+    s.Unlock()
 }

 func (s *State) IsPaused() bool {
     s.RLock()
-    defer s.RUnlock()
-
-    return s.Paused
+    res := s.Paused
+    s.RUnlock()
+    return res
 }
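The heart of the patch is the close-and-swap in SetRunning/SetStopped: closing waitChan wakes every goroutine blocked in wait() at once (receives from a closed channel never block), and installing a fresh channel immediately re-arms the State, which is what lets one container cycle through start/stop repeatedly. A compilable standalone model of just that mechanism — miniState and its methods are illustrative names, not the patched code:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    // miniState models daemon.State's waitChan mechanics in isolation.
    type miniState struct {
        sync.RWMutex
        running  bool
        exitCode int
        waitChan chan struct{}
    }

    func newMiniState() *miniState {
        return &miniState{waitChan: make(chan struct{})}
    }

    func (s *miniState) setRunning() {
        s.Lock()
        if !s.running {
            s.running = true
            close(s.waitChan)                // broadcast: wake all current waiters
            s.waitChan = make(chan struct{}) // re-arm for the stop transition
        }
        s.Unlock()
    }

    func (s *miniState) setStopped(code int) {
        s.Lock()
        if s.running {
            s.running = false
            s.exitCode = code
            close(s.waitChan)
            s.waitChan = make(chan struct{})
        }
        s.Unlock()
    }

    func (s *miniState) waitStop(timeout time.Duration) (int, error) {
        s.RLock()
        if !s.running {
            code := s.exitCode
            s.RUnlock()
            return code, nil
        }
        ch := s.waitChan // snapshot under the lock, exactly as the patch does
        s.RUnlock()
        if timeout >= 0 {
            select {
            case <-time.After(timeout):
                return -1, fmt.Errorf("timed out after %v", timeout)
            case <-ch:
            }
        } else {
            <-ch
        }
        s.RLock()
        code := s.exitCode
        s.RUnlock()
        return code, nil
    }

    func main() {
        s := newMiniState()
        s.setRunning()
        go func() {
            time.Sleep(50 * time.Millisecond)
            s.setStopped(3)
        }()
        fmt.Println(s.waitStop(-1 * time.Second)) // prints: 3 <nil>
    }

Snapshotting waitChan into a local under the read lock matters: once the transition fires, s.waitChan already points at the new, still-open channel, so a waiter that re-read the field would sleep through the very event it was waiting for.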
diff --git a/daemon/state_test.go b/daemon/state_test.go
new file mode 100644
index 0000000000..7b02f3aeac
--- /dev/null
+++ b/daemon/state_test.go
@@ -0,0 +1,102 @@
+package daemon
+
+import (
+    "sync/atomic"
+    "testing"
+    "time"
+)
+
+func TestStateRunStop(t *testing.T) {
+    s := NewState()
+    for i := 1; i < 3; i++ { // full lifecycle two times
+        started := make(chan struct{})
+        var pid int64
+        go func() {
+            runPid, _ := s.WaitRunning(-1 * time.Second)
+            atomic.StoreInt64(&pid, int64(runPid))
+            close(started)
+        }()
+        s.SetRunning(i + 100)
+        if !s.IsRunning() {
+            t.Fatal("State not running")
+        }
+        if s.Pid != i+100 {
+            t.Fatalf("Pid %v, expected %v", s.Pid, i+100)
+        }
+        if s.ExitCode != 0 {
+            t.Fatalf("ExitCode %v, expected 0", s.ExitCode)
+        }
+        select {
+        case <-time.After(100 * time.Millisecond):
+            t.Fatal("Start callback didn't fire within 100 milliseconds")
+        case <-started:
+            t.Log("Start callback fired")
+        }
+        runPid := int(atomic.LoadInt64(&pid))
+        if runPid != i+100 {
+            t.Fatalf("Pid %v, expected %v", runPid, i+100)
+        }
+        if pid, err := s.WaitRunning(-1 * time.Second); err != nil || pid != i+100 {
+            t.Fatalf("WaitRunning returned pid: %v, err: %v, expected pid: %v, err: %v", pid, err, i+100, nil)
+        }
+
+        stopped := make(chan struct{})
+        var exit int64
+        go func() {
+            exitCode, _ := s.WaitStop(-1 * time.Second)
+            atomic.StoreInt64(&exit, int64(exitCode))
+            close(stopped)
+        }()
+        s.SetStopped(i)
+        if s.IsRunning() {
+            t.Fatal("State is running")
+        }
+        if s.ExitCode != i {
+            t.Fatalf("ExitCode %v, expected %v", s.ExitCode, i)
+        }
+        if s.Pid != 0 {
+            t.Fatalf("Pid %v, expected 0", s.Pid)
+        }
+        select {
+        case <-time.After(100 * time.Millisecond):
+            t.Fatal("Stop callback didn't fire within 100 milliseconds")
+        case <-stopped:
+            t.Log("Stop callback fired")
+        }
+        exitCode := int(atomic.LoadInt64(&exit))
+        if exitCode != i {
+            t.Fatalf("ExitCode %v, expected %v", exitCode, i)
+        }
+        if exitCode, err := s.WaitStop(-1 * time.Second); err != nil || exitCode != i {
+            t.Fatalf("WaitStop returned exitCode: %v, err: %v, expected exitCode: %v, err: %v", exitCode, err, i, nil)
+        }
+    }
+}
+
+func TestStateTimeoutWait(t *testing.T) {
+    s := NewState()
+    started := make(chan struct{})
+    go func() {
+        s.WaitRunning(100 * time.Millisecond)
+        close(started)
+    }()
+    select {
+    case <-time.After(200 * time.Millisecond):
+        t.Fatal("WaitRunning timeout didn't fire within 200 milliseconds")
+    case <-started:
+        t.Log("WaitRunning timed out as expected")
+    }
+    s.SetRunning(42)
+    stopped := make(chan struct{})
+    go func() {
+        s.WaitRunning(100 * time.Millisecond)
+        close(stopped)
+    }()
+    select {
+    case <-time.After(200 * time.Millisecond):
+        t.Fatal("WaitRunning on a running state didn't return within 200 milliseconds")
+    case <-stopped:
+        t.Log("WaitRunning returned immediately for a running state")
+    }
+}
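These are plain unit tests in the daemon package and need no running daemon; assuming a checkout where that package builds on its own, they can be exercised in isolation with:

    go test -v -run TestState ./daemon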
callback fired") + } + exitCode := int(atomic.LoadInt64(&exit)) + if exitCode != i { + t.Fatalf("ExitCode %v, expected %v", exitCode, i) + } + if exitCode, err := s.WaitStop(-1 * time.Second); err != nil || exitCode != i { + t.Fatal("WaitStop returned exitCode: %v, err: %v, expected exitCode: %v, err: %v", exitCode, err, i, nil) + } + } +} + +func TestStateTimeoutWait(t *testing.T) { + s := NewState() + started := make(chan struct{}) + go func() { + s.WaitRunning(100 * time.Millisecond) + close(started) + }() + select { + case <-time.After(200 * time.Millisecond): + t.Fatal("Start callback doesn't fire in 100 milliseconds") + case <-started: + t.Log("Start callback fired") + } + s.SetRunning(42) + stopped := make(chan struct{}) + go func() { + s.WaitRunning(100 * time.Millisecond) + close(stopped) + }() + select { + case <-time.After(200 * time.Millisecond): + t.Fatal("Start callback doesn't fire in 100 milliseconds") + case <-stopped: + t.Log("Start callback fired") + } + +} diff --git a/integration/commands_test.go b/integration/commands_test.go index 55c822bbeb..47e9860052 100644 --- a/integration/commands_test.go +++ b/integration/commands_test.go @@ -224,7 +224,7 @@ func TestRunDisconnect(t *testing.T) { // cause /bin/cat to exit. setTimeout(t, "Waiting for /bin/cat to exit timed out", 2*time.Second, func() { container := globalDaemon.List()[0] - container.Wait() + container.State.WaitStop(-1 * time.Second) if container.State.IsRunning() { t.Fatalf("/bin/cat is still running after closing stdin") } @@ -276,7 +276,7 @@ func TestRunDisconnectTty(t *testing.T) { // In tty mode, we expect the process to stay alive even after client's stdin closes. // Give some time to monitor to do his thing - container.WaitTimeout(500 * time.Millisecond) + container.State.WaitStop(500 * time.Millisecond) if !container.State.IsRunning() { t.Fatalf("/bin/cat should still be running after closing stdin (tty mode)") } @@ -535,7 +535,7 @@ func TestAttachDisconnect(t *testing.T) { // We closed stdin, expect /bin/cat to still be running // Wait a little bit to make sure container.monitor() did his thing - err := container.WaitTimeout(500 * time.Millisecond) + _, err := container.State.WaitStop(500 * time.Millisecond) if err == nil || !container.State.IsRunning() { t.Fatalf("/bin/cat is not running after closing stdin") } @@ -543,7 +543,7 @@ func TestAttachDisconnect(t *testing.T) { // Try to avoid the timeout in destroy. 
diff --git a/integration/runtime_test.go b/integration/runtime_test.go
index 96df15be60..754146c5f8 100644
--- a/integration/runtime_test.go
+++ b/integration/runtime_test.go
@@ -496,7 +496,7 @@ func startEchoServerContainer(t *testing.T, proto string) (*daemon.Daemon, *daem
     })

     // Even if the state is running, lets give some time to lxc to spawn the process
-    container.WaitTimeout(500 * time.Millisecond)
+    container.State.WaitStop(500 * time.Millisecond)

     strPort = container.NetworkSettings.Ports[p][0].HostPort
     return daemon, container, strPort
@@ -611,7 +611,7 @@ func TestRestore(t *testing.T) {
     // Simulate a crash/manual quit of dockerd: process dies, states stays 'Running'
     cStdin, _ := container2.StdinPipe()
     cStdin.Close()
-    if err := container2.WaitTimeout(2 * time.Second); err != nil {
+    if _, err := container2.State.WaitStop(2 * time.Second); err != nil {
         t.Fatal(err)
     }
     container2.State.SetRunning(42)
diff --git a/integration/utils_test.go b/integration/utils_test.go
index d8101dfb1d..7be7f13eee 100644
--- a/integration/utils_test.go
+++ b/integration/utils_test.go
@@ -96,11 +96,13 @@ func containerAttach(eng *engine.Engine, id string, t utils.Fataler) (io.WriteCl
 }

 func containerWait(eng *engine.Engine, id string, t utils.Fataler) int {
-    return getContainer(eng, id, t).Wait()
+    ex, _ := getContainer(eng, id, t).State.WaitStop(-1 * time.Second)
+    return ex
 }

 func containerWaitTimeout(eng *engine.Engine, id string, t utils.Fataler) error {
-    return getContainer(eng, id, t).WaitTimeout(500 * time.Millisecond)
+    _, err := getContainer(eng, id, t).State.WaitStop(500 * time.Millisecond)
+    return err
 }

 func containerKill(eng *engine.Engine, id string, t utils.Fataler) {
@@ -307,7 +309,7 @@ func runContainer(eng *engine.Engine, r *daemon.Daemon, args []string, t *testin
         return "", err
     }

-    container.Wait()
+    container.State.WaitStop(-1 * time.Second)
     data, err := ioutil.ReadAll(stdout)
     if err != nil {
         return "", err
diff --git a/server/buildfile.go b/server/buildfile.go
index f10cc0dbc5..f5ef6e0d2a 100644
--- a/server/buildfile.go
+++ b/server/buildfile.go
@@ -17,6 +17,7 @@ import (
     "sort"
     "strings"
     "syscall"
+    "time"

     "github.com/dotcloud/docker/archive"
     "github.com/dotcloud/docker/daemon"
@@ -696,7 +697,7 @@ func (b *buildFile) run(c *daemon.Container) error {
     }

     // Wait for it to finish
-    if ret := c.Wait(); ret != 0 {
+    if ret, _ := c.State.WaitStop(-1 * time.Second); ret != 0 {
         err := &utils.JSONError{
             Message: fmt.Sprintf("The command %v returned a non-zero code: %d", b.config.Cmd, ret),
             Code:    ret,
diff --git a/server/server.go b/server/server.go
index 76b3a83971..0bbb9f31ca 100644
--- a/server/server.go
+++ b/server/server.go
@@ -2115,7 +2115,7 @@ func (srv *Server) ContainerWait(job *engine.Job) engine.Status {
     }
     name := job.Args[0]
     if container := srv.daemon.Get(name); container != nil {
-        status := container.Wait()
+        status, _ := container.State.WaitStop(-1 * time.Second)
         job.Printf("%d\n", status)
         return engine.StatusOK
     }
@@ -2328,7 +2328,7 @@ func (srv *Server) ContainerAttach(job *engine.Job) engine.Status {
         // If we are in stdinonce mode, wait for the process to end
         // otherwise, simply return
         if container.Config.StdinOnce && !container.Config.Tty {
-            container.Wait()
+            container.State.WaitStop(-1 * time.Second)
         }
     }
     return engine.StatusOK
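A closing note on the -1 * time.Second call sites above: discarding WaitStop's error there is safe, because wait() can only return an error when a non-negative timeout fires; with a negative timeout it blocks until the channel closes and always returns nil.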