From 2b0776c883c41cda0b86808fe86e4df6e1aa1cd5 Mon Sep 17 00:00:00 2001
From: Michael Crosby
Date: Mon, 4 Aug 2014 17:05:56 -0700
Subject: [PATCH] Refactor container monitor into type

Signed-off-by: Michael Crosby
---
 daemon/container.go | 142 ++++-----------------------------
 daemon/monitor.go   | 189 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 204 insertions(+), 127 deletions(-)
 create mode 100644 daemon/monitor.go

diff --git a/daemon/container.go b/daemon/container.go
index 3383276e6d..681852a17d 100644
--- a/daemon/container.go
+++ b/daemon/container.go
@@ -7,7 +7,6 @@ import (
 	"io"
 	"io/ioutil"
 	"os"
-	"os/exec"
 	"path"
 	"path/filepath"
 	"strings"
@@ -84,8 +83,8 @@ type Container struct {
 	VolumesRW  map[string]bool
 	hostConfig *runconfig.HostConfig
 
-	activeLinks   map[string]*links.Link
-	requestedStop bool
+	activeLinks map[string]*links.Link
+	monitor     *containerMonitor
 }
 
 func (container *Container) FromDisk() error {
@@ -496,110 +495,6 @@ func (container *Container) releaseNetwork() {
 	container.NetworkSettings = &NetworkSettings{}
 }
 
-func (container *Container) monitor(callback execdriver.StartCallback) error {
-	var (
-		err       error
-		exitCode  int
-		failCount int
-		exit      bool
-
-		policy = container.hostConfig.RestartPolicy
-	)
-
-	if err := container.startLoggingToDisk(); err != nil {
-		// TODO: crosbymichael cleanup IO, network, and mounts
-		return err
-	}
-
-	// reset the restart count
-	container.RestartCount = -1
-	container.requestedStop = false
-
-	for {
-		container.RestartCount++
-
-		pipes := execdriver.NewPipes(container.stdin, container.stdout, container.stderr, container.Config.OpenStdin)
-
-		if exitCode, err = container.daemon.Run(container, pipes, callback); err != nil {
-			failCount++
-
-			if failCount == policy.MaximumRetryCount {
-				exit = true
-			}
-
-			utils.Errorf("Error running container: %s", err)
-		}
-
-		// We still wait to set the state as stopped and ensure that the locks were released
-		container.State.SetStopped(exitCode)
-
-		if container.Config.OpenStdin {
-			if err := container.stdin.Close(); err != nil {
-				utils.Errorf("%s: Error close stdin: %s", container.ID, err)
-			}
-		}
-
-		if err := container.stdout.Clean(); err != nil {
-			utils.Errorf("%s: Error close stdout: %s", container.ID, err)
-		}
-
-		if err := container.stderr.Clean(); err != nil {
-			utils.Errorf("%s: Error close stderr: %s", container.ID, err)
-		}
-
-		if container.command != nil && container.command.Terminal != nil {
-			if err := container.command.Terminal.Close(); err != nil {
-				utils.Errorf("%s: Error closing terminal: %s", container.ID, err)
-			}
-		}
-
-		// Re-create a brand new stdin pipe once the container exited
-		if container.Config.OpenStdin {
-			container.stdin, container.stdinPipe = io.Pipe()
-		}
-
-		if container.daemon != nil && container.daemon.srv != nil {
-			container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image))
-		}
-
-		if (policy.Name == "always" || (policy.Name == "on-failure" && exitCode != 0)) && !container.requestedStop || !exit {
-			container.command.Cmd = copyCmd(&container.command.Cmd)
-
-			time.Sleep(1 * time.Second)
-
-			continue
-		}
-
-		// Cleanup networking and mounts
-		container.cleanup()
-
-		if container.daemon != nil && container.daemon.srv != nil && container.daemon.srv.IsRunning() {
-			// FIXME: here is race condition between two RUN instructions in Dockerfile
-			// because they share same runconfig and change image. Must be fixed
-			// in builder/builder.go
-			if err := container.toDisk(); err != nil {
-				utils.Errorf("Error dumping container %s state to disk: %s\n", container.ID, err)
-			}
-		}
-
-		return err
-	}
-}
-
-func copyCmd(c *exec.Cmd) exec.Cmd {
-	return exec.Cmd{
-		Stdin:       c.Stdin,
-		Stdout:      c.Stdout,
-		Stderr:      c.Stderr,
-		Path:        c.Path,
-		Env:         c.Env,
-		ExtraFiles:  c.ExtraFiles,
-		Args:        c.Args,
-		Dir:         c.Dir,
-		SysProcAttr: c.SysProcAttr,
-	}
-}
-
 // cleanup releases any network resources allocated to the container along with any rules
 // around how containers are linked together. It also unmounts the container's root filesystem.
 func (container *Container) cleanup() {
@@ -630,7 +525,10 @@ func (container *Container) KillSig(sig int) error {
 	if !container.State.IsRunning() {
 		return nil
 	}
-	container.requestedStop = true
+
+	// signal to the monitor that it should not restart the container
+	// after we send the kill signal
+	container.monitor.ExitOnNext()
 	return container.daemon.Kill(container, sig)
 }
 
@@ -1174,27 +1072,17 @@ func (container *Container) startLoggingToDisk() error {
 }
 
 func (container *Container) waitForStart() error {
-	waitStart := make(chan struct{})
-	callback := func(command *execdriver.Command) {
-		if command.Tty {
-			// The callback is called after the process Start()
-			// so we are in the parent process. In TTY mode, stdin/out/err is the PtySlace
-			// which we close here.
-			if c, ok := command.Stdout.(io.Closer); ok {
-				c.Close()
-			}
-		}
+	container.monitor = newContainerMonitor(container, container.hostConfig.RestartPolicy)
 
-		container.State.SetRunning(command.Pid())
-		if err := container.toDisk(); err != nil {
-			log.Debugf("%s", err)
-		}
+	var (
+		cErr      = utils.Go(container.monitor.Start)
+		waitStart = make(chan struct{})
+	)
+
+	go func() {
+		container.State.WaitRunning(-1 * time.Second)
 		close(waitStart)
-	}
-
-	// We use a callback here instead of a goroutine and an chan for
-	// syncronization purposes
-	cErr := utils.Go(func() error { return container.monitor(callback) })
+	}()
 
 	// Start should not return until the process is actually running
 	select {
diff --git a/daemon/monitor.go b/daemon/monitor.go
new file mode 100644
index 0000000000..94d7f2b004
--- /dev/null
+++ b/daemon/monitor.go
@@ -0,0 +1,189 @@
+package daemon
+
+import (
+	"io"
+	"os/exec"
+	"sync"
+	"time"
+
+	"github.com/docker/docker/daemon/execdriver"
+	"github.com/docker/docker/runconfig"
+	"github.com/docker/docker/utils"
+)
+
+// containerMonitor monitors the execution of a container's main process.
+// If a restart policy is specified for the container the monitor will ensure that the
+// process is restarted based on the rules of the policy. When the container is finally stopped
+// the monitor will reset and clean up any of the container's resources such as networking allocations
+// and the rootfs
+type containerMonitor struct {
+	mux sync.Mutex
+
+	container     *Container
+	restartPolicy runconfig.RestartPolicy
+	failureCount  int
+	shouldStop    bool
+}
+
+func newContainerMonitor(container *Container, policy runconfig.RestartPolicy) *containerMonitor {
+	return &containerMonitor{
+		container:     container,
+		restartPolicy: policy,
+	}
+}
+
+// ExitOnNext signals to the container monitor that it should stop monitoring the container
+// for exits the next time the process dies
+func (m *containerMonitor) ExitOnNext() {
+	m.mux.Lock()
+	m.shouldStop = true
+	m.mux.Unlock()
+}
+
+// Close closes the container's resources such as networking allocations and
+// unmounts the container's root filesystem
+func (m *containerMonitor) Close() error {
+	// Cleanup networking and mounts
+	m.container.cleanup()
+
+	if m.container.daemon != nil && m.container.daemon.srv != nil && m.container.daemon.srv.IsRunning() {
+		// FIXME: here is race condition between two RUN instructions in Dockerfile
+		// because they share same runconfig and change image. Must be fixed
+		// in builder/builder.go
+		if err := m.container.toDisk(); err != nil {
+			utils.Errorf("Error dumping container %s state to disk: %s\n", m.container.ID, err)
+
+			return err
+		}
+	}
+
+	return nil
+}
+
+// reset resets the container's IO and ensures that the command is able to be executed again
+// by copying the data into a new struct
+func (m *containerMonitor) reset() {
+	container := m.container
+
+	if container.Config.OpenStdin {
+		if err := container.stdin.Close(); err != nil {
+			utils.Errorf("%s: Error close stdin: %s", container.ID, err)
+		}
+	}
+
+	if err := container.stdout.Clean(); err != nil {
+		utils.Errorf("%s: Error close stdout: %s", container.ID, err)
+	}
+
+	if err := container.stderr.Clean(); err != nil {
+		utils.Errorf("%s: Error close stderr: %s", container.ID, err)
+	}
+
+	if container.command != nil && container.command.Terminal != nil {
+		if err := container.command.Terminal.Close(); err != nil {
+			utils.Errorf("%s: Error closing terminal: %s", container.ID, err)
+		}
+	}
+
+	// Re-create a brand new stdin pipe once the container exited
+	if container.Config.OpenStdin {
+		container.stdin, container.stdinPipe = io.Pipe()
+	}
+
+	if container.daemon != nil && container.daemon.srv != nil {
+		container.daemon.srv.LogEvent("die", container.ID, container.daemon.repositories.ImageName(container.Image))
+	}
+
+	c := container.command.Cmd
+
+	container.command.Cmd = exec.Cmd{
+		Stdin:       c.Stdin,
+		Stdout:      c.Stdout,
+		Stderr:      c.Stderr,
+		Path:        c.Path,
+		Env:         c.Env,
+		ExtraFiles:  c.ExtraFiles,
+		Args:        c.Args,
+		Dir:         c.Dir,
+		SysProcAttr: c.SysProcAttr,
+	}
+}
+
+// Start starts the container's process and monitors it according to the restart policy
+func (m *containerMonitor) Start() error {
+	var (
+		err      error
+		exitCode int
+	)
+	defer m.Close()
+
+	// reset the restart count
+	m.container.RestartCount = -1
+
+	for !m.shouldStop {
+		m.container.RestartCount++
+		if err := m.container.startLoggingToDisk(); err != nil {
+			m.reset()
+
+			return err
+		}
+
+		pipes := execdriver.NewPipes(m.container.stdin, m.container.stdout, m.container.stderr, m.container.Config.OpenStdin)
+
+		if exitCode, err = m.container.daemon.Run(m.container, pipes, m.callback); err != nil {
+			m.failureCount++
+
+			if m.failureCount == m.restartPolicy.MaximumRetryCount {
+				m.ExitOnNext()
+			}
+
+			utils.Errorf("Error running container: %s", err)
+		}
+
+		// We still wait to set the state as stopped and ensure that the locks were released
+		m.container.State.SetStopped(exitCode)
+
+		m.reset()
+
+		if m.shouldRestart(exitCode) {
+			time.Sleep(1 * time.Second)
+
+			continue
+		}
+
+		break
+	}
+
+	return err
+}
+
+func (m *containerMonitor) shouldRestart(exitCode int) bool {
+	m.mux.Lock()
+
+	shouldRestart := (m.restartPolicy.Name == "always" ||
+		(m.restartPolicy.Name == "on-failure" && exitCode != 0)) &&
+		!m.shouldStop
+
+	m.mux.Unlock()
+
+	return shouldRestart
+}
+
+// callback ensures that the container's state is properly updated after we
+// receive an ack from the execution drivers
+func (m *containerMonitor) callback(command *execdriver.Command) {
+	if command.Tty {
+		// The callback is called after the process Start()
+		// so we are in the parent process. In TTY mode, stdin/out/err is the PtySlave
+		// which we close here.
+		if c, ok := command.Stdout.(io.Closer); ok {
+			c.Close()
+		}
+	}
+
+	m.container.State.SetRunning(command.Pid())
+
+	if err := m.container.ToDisk(); err != nil {
+		utils.Debugf("%s", err)
+	}
+}