diff --git a/daemon/archive.go b/daemon/archive.go index f9a9cb8afc..61196c91f5 100644 --- a/daemon/archive.go +++ b/daemon/archive.go @@ -219,7 +219,7 @@ func (daemon *Daemon) containerArchivePath(container *Container, path string) (c return err }) - container.logEvent("archive-path") + daemon.LogContainerEvent(container, "archive-path") return content, stat, nil } @@ -318,7 +318,7 @@ func (daemon *Daemon) containerExtractToDir(container *Container, path string, n return err } - container.logEvent("extract-to-dir") + daemon.LogContainerEvent(container, "extract-to-dir") return nil } @@ -384,6 +384,6 @@ func (daemon *Daemon) containerCopy(container *Container, resource string) (rc i container.Unlock() return err }) - daemon.logContainerEvent(container, "copy") + daemon.LogContainerEvent(container, "copy") return reader, nil } diff --git a/daemon/attach.go b/daemon/attach.go index 44198788ce..64a1223a4d 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -2,7 +2,10 @@ package daemon import ( "io" + "time" + "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/logger" "github.com/docker/docker/pkg/stdcopy" ) @@ -43,7 +46,7 @@ func (daemon *Daemon) ContainerAttachWithLogs(prefixOrName string, c *ContainerA stderr = errStream } - return container.attachWithLogs(stdin, stdout, stderr, c.Logs, c.Stream) + return daemon.attachWithLogs(container, stdin, stdout, stderr, c.Logs, c.Stream) } // ContainerWsAttachWithLogsConfig attach with websockets, since all @@ -60,5 +63,61 @@ func (daemon *Daemon) ContainerWsAttachWithLogs(prefixOrName string, c *Containe if err != nil { return err } - return container.attachWithLogs(c.InStream, c.OutStream, c.ErrStream, c.Logs, c.Stream) + return daemon.attachWithLogs(container, c.InStream, c.OutStream, c.ErrStream, c.Logs, c.Stream) +} + +func (daemon *Daemon) attachWithLogs(container *Container, stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error { + if logs { + logDriver, err := container.getLogger() + if err != nil { + return err + } + cLog, ok := logDriver.(logger.LogReader) + if !ok { + return logger.ErrReadLogsNotSupported + } + logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) + + LogLoop: + for { + select { + case msg, ok := <-logs.Msg: + if !ok { + break LogLoop + } + if msg.Source == "stdout" && stdout != nil { + stdout.Write(msg.Line) + } + if msg.Source == "stderr" && stderr != nil { + stderr.Write(msg.Line) + } + case err := <-logs.Err: + logrus.Errorf("Error streaming logs: %v", err) + break LogLoop + } + } + } + + daemon.LogContainerEvent(container, "attach") + + //stream + if stream { + var stdinPipe io.ReadCloser + if stdin != nil { + r, w := io.Pipe() + go func() { + defer w.Close() + defer logrus.Debugf("Closing buffered stdin pipe") + io.Copy(w, stdin) + }() + stdinPipe = r + } + <-container.Attach(stdinPipe, stdout, stderr) + // If we are in stdinonce mode, wait for the process to end + // otherwise, simply return + if container.Config.StdinOnce && !container.Config.Tty { + container.WaitStop(-1 * time.Second) + } + } + return nil } diff --git a/daemon/commit.go b/daemon/commit.go index fd11fc4683..35759a032c 100644 --- a/daemon/commit.go +++ b/daemon/commit.go @@ -48,7 +48,8 @@ func (daemon *Daemon) Commit(container *Container, c *ContainerCommitConfig) (*i return img, err } } - container.logEvent("commit") + + daemon.LogContainerEvent(container, "commit") return img, nil } diff --git a/daemon/container.go b/daemon/container.go index 9955b2123b..8a5b0fd266 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -172,15 +172,6 @@ func (container *Container) writeHostConfig() error { return json.NewEncoder(f).Encode(&container.hostConfig) } -func (container *Container) logEvent(action string) { - d := container.daemon - d.EventsService.Log( - action, - container.ID, - container.Config.Image, - ) -} - // GetResourcePath evaluates `path` in the scope of the container's basefs, with proper path // sanitisation. Symlinks are all scoped to the basefs of the container, as // though the container's basefs was `/`. @@ -278,7 +269,6 @@ func (container *Container) Resize(h, w int) error { if err := container.command.ProcessConfig.Terminal.Resize(h, w); err != nil { return err } - container.logEvent("resize") return nil } @@ -380,20 +370,6 @@ func (container *Container) startLogging() error { return nil } -func (container *Container) waitForStart() error { - container.monitor = newContainerMonitor(container, container.hostConfig.RestartPolicy) - - // block until we either receive an error from the initial start of the container's - // process or until the process is running in the container - select { - case <-container.monitor.startSignal: - case err := <-promise.Go(container.monitor.Start): - return err - } - - return nil -} - func (container *Container) getProcessLabel() string { // even if we have a process label return "" if we are running // in privileged mode @@ -424,62 +400,6 @@ func (container *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr return attach(&container.streamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, stdin, stdout, stderr) } -func (container *Container) attachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error { - if logs { - logDriver, err := container.getLogger() - if err != nil { - return err - } - cLog, ok := logDriver.(logger.LogReader) - if !ok { - return logger.ErrReadLogsNotSupported - } - logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) - - LogLoop: - for { - select { - case msg, ok := <-logs.Msg: - if !ok { - break LogLoop - } - if msg.Source == "stdout" && stdout != nil { - stdout.Write(msg.Line) - } - if msg.Source == "stderr" && stderr != nil { - stderr.Write(msg.Line) - } - case err := <-logs.Err: - logrus.Errorf("Error streaming logs: %v", err) - break LogLoop - } - } - } - - container.logEvent("attach") - - //stream - if stream { - var stdinPipe io.ReadCloser - if stdin != nil { - r, w := io.Pipe() - go func() { - defer w.Close() - defer logrus.Debugf("Closing buffered stdin pipe") - io.Copy(w, stdin) - }() - stdinPipe = r - } - <-container.Attach(stdinPipe, stdout, stderr) - // If we are in stdinonce mode, wait for the process to end - // otherwise, simply return - if container.Config.StdinOnce && !container.Config.Tty { - container.WaitStop(-1 * time.Second) - } - } - return nil -} - func attach(streamConfig *streamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { var ( cStdout, cStderr io.ReadCloser diff --git a/daemon/create.go b/daemon/create.go index 75fa775c53..b27a20eed4 100644 --- a/daemon/create.go +++ b/daemon/create.go @@ -127,7 +127,7 @@ func (daemon *Daemon) create(params *ContainerCreateConfig) (retC *Container, re logrus.Errorf("Error saving new container to disk: %v", err) return nil, err } - daemon.logContainerEvent(container, "create") + daemon.LogContainerEvent(container, "create") return container, nil } diff --git a/daemon/delete.go b/daemon/delete.go index bf751ec2da..1de9ba9c1d 100644 --- a/daemon/delete.go +++ b/daemon/delete.go @@ -111,7 +111,7 @@ func (daemon *Daemon) rm(container *Container, forceRemove bool) (err error) { daemon.idIndex.Delete(container.ID) daemon.containers.Delete(container.ID) os.RemoveAll(container.root) - container.logEvent("destroy") + daemon.LogContainerEvent(container, "destroy") } }() @@ -140,7 +140,7 @@ func (daemon *Daemon) rm(container *Container, forceRemove bool) (err error) { daemon.idIndex.Delete(container.ID) daemon.containers.Delete(container.ID) - container.logEvent("destroy") + daemon.LogContainerEvent(container, "destroy") return nil } diff --git a/daemon/events.go b/daemon/events.go index be6d0c5b67..85ac25c0e6 100644 --- a/daemon/events.go +++ b/daemon/events.go @@ -1,7 +1,7 @@ package daemon -// logContainerEvent generates an event related to a container. -func (daemon *Daemon) logContainerEvent(container *Container, action string) { +// LogContainerEvent generates an event related to a container. +func (daemon *Daemon) LogContainerEvent(container *Container, action string) { daemon.EventsService.Log( action, container.ID, diff --git a/daemon/exec.go b/daemon/exec.go index c1ef8f58d9..6ebacd6643 100644 --- a/daemon/exec.go +++ b/daemon/exec.go @@ -188,7 +188,7 @@ func (d *Daemon) ContainerExecCreate(config *runconfig.ExecConfig) (string, erro d.registerExecCommand(ExecConfig) - container.logEvent("exec_create: " + ExecConfig.ProcessConfig.Entrypoint + " " + strings.Join(ExecConfig.ProcessConfig.Arguments, " ")) + d.LogContainerEvent(container, "exec_create: "+ExecConfig.ProcessConfig.Entrypoint+" "+strings.Join(ExecConfig.ProcessConfig.Arguments, " ")) return ExecConfig.ID, nil } @@ -216,7 +216,7 @@ func (d *Daemon) ContainerExecStart(name string, stdin io.ReadCloser, stdout io. logrus.Debugf("starting exec command %s in container %s", ec.ID, ec.Container.ID) container := ec.Container - container.logEvent("exec_start: " + ec.ProcessConfig.Entrypoint + " " + strings.Join(ec.ProcessConfig.Arguments, " ")) + d.LogContainerEvent(container, "exec_start: "+ec.ProcessConfig.Entrypoint+" "+strings.Join(ec.ProcessConfig.Arguments, " ")) if ec.OpenStdin { r, w := io.Pipe() diff --git a/daemon/export.go b/daemon/export.go index 84b83c5fa5..0a246ed5af 100644 --- a/daemon/export.go +++ b/daemon/export.go @@ -49,6 +49,6 @@ func (daemon *Daemon) containerExport(container *Container) (archive.Archive, er daemon.Unmount(container) return err }) - daemon.logContainerEvent(container, "export") + daemon.LogContainerEvent(container, "export") return arch, err } diff --git a/daemon/kill.go b/daemon/kill.go index 91b16ad389..8b94fa3a88 100644 --- a/daemon/kill.go +++ b/daemon/kill.go @@ -71,7 +71,7 @@ func (daemon *Daemon) killWithSignal(container *Container, sig int) error { return err } - daemon.logContainerEvent(container, "kill") + daemon.LogContainerEvent(container, "kill") return nil } diff --git a/daemon/monitor.go b/daemon/monitor.go index 4af0d2a2fd..4a24b7d94b 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -17,6 +17,12 @@ const ( loggerCloseTimeout = 10 * time.Second ) +// containerSupervisor defines the interface that a supervisor must implement +type containerSupervisor interface { + // LogContainerEvent generates events related to a given container + LogContainerEvent(*Container, string) +} + // containerMonitor monitors the execution of a container's main process. // If a restart policy is specified for the container the monitor will ensure that the // process is restarted based on the rules of the policy. When the container is finally stopped @@ -25,6 +31,9 @@ const ( type containerMonitor struct { mux sync.Mutex + // supervisor keeps track of the container and the events it generates + supervisor containerSupervisor + // container is the container being monitored container *Container @@ -57,8 +66,9 @@ type containerMonitor struct { // newContainerMonitor returns an initialized containerMonitor for the provided container // honoring the provided restart policy -func newContainerMonitor(container *Container, policy runconfig.RestartPolicy) *containerMonitor { +func (daemon *Daemon) newContainerMonitor(container *Container, policy runconfig.RestartPolicy) *containerMonitor { return &containerMonitor{ + supervisor: daemon, container: container, restartPolicy: policy, timeIncrement: defaultTimeIncrement, @@ -138,7 +148,7 @@ func (m *containerMonitor) Start() error { pipes := execdriver.NewPipes(m.container.stdin, m.container.stdout, m.container.stderr, m.container.Config.OpenStdin) - m.container.logEvent("start") + m.logEvent("start") m.lastStartTime = time.Now() @@ -162,7 +172,7 @@ func (m *containerMonitor) Start() error { if m.shouldRestart(exitStatus.ExitCode) { m.container.setRestarting(&exitStatus) - m.container.logEvent("die") + m.logEvent("die") m.resetContainer(true) // sleep with a small time increment between each restart to help avoid issues cased by quickly @@ -177,7 +187,7 @@ func (m *containerMonitor) Start() error { continue } - m.container.logEvent("die") + m.logEvent("die") m.resetContainer(true) return err } @@ -249,7 +259,7 @@ func (m *containerMonitor) callback(processConfig *execdriver.ProcessConfig, pid go func() { _, ok := <-chOOM if ok { - m.container.logEvent("oom") + m.logEvent("oom") } }() @@ -345,3 +355,7 @@ func (m *containerMonitor) resetContainer(lock bool) { SysProcAttr: c.SysProcAttr, } } + +func (m *containerMonitor) logEvent(action string) { + m.supervisor.LogContainerEvent(m.container, action) +} diff --git a/daemon/pause.go b/daemon/pause.go index 4182e92e2f..be7f63394b 100644 --- a/daemon/pause.go +++ b/daemon/pause.go @@ -38,6 +38,6 @@ func (daemon *Daemon) containerPause(container *Container) error { return err } container.Paused = true - daemon.logContainerEvent(container, "pause") + daemon.LogContainerEvent(container, "pause") return nil } diff --git a/daemon/rename.go b/daemon/rename.go index 1aab1f9303..36421dcd7d 100644 --- a/daemon/rename.go +++ b/daemon/rename.go @@ -1,10 +1,11 @@ package daemon import ( + "strings" + "github.com/Sirupsen/logrus" derr "github.com/docker/docker/errors" "github.com/docker/libnetwork" - "strings" ) // ContainerRename changes the name of a container, using the oldName @@ -54,7 +55,7 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error { } if !container.Running { - container.logEvent("rename") + daemon.LogContainerEvent(container, "rename") return nil } @@ -78,6 +79,6 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error { return err } - container.logEvent("rename") + daemon.LogContainerEvent(container, "rename") return nil } diff --git a/daemon/resize.go b/daemon/resize.go index 199bc48b43..7fa5b6652a 100644 --- a/daemon/resize.go +++ b/daemon/resize.go @@ -8,7 +8,10 @@ func (daemon *Daemon) ContainerResize(name string, height, width int) error { return err } - return container.Resize(height, width) + if err = container.Resize(height, width); err == nil { + daemon.LogContainerEvent(container, "resize") + } + return err } // ContainerExecResize changes the size of the TTY of the process diff --git a/daemon/restart.go b/daemon/restart.go index a237f3a62c..fcfe6128fa 100644 --- a/daemon/restart.go +++ b/daemon/restart.go @@ -41,6 +41,6 @@ func (daemon *Daemon) containerRestart(container *Container, seconds int) error return err } - daemon.logContainerEvent(container, "restart") + daemon.LogContainerEvent(container, "restart") return nil } diff --git a/daemon/start.go b/daemon/start.go index 9326d584f5..de86b9e476 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -4,6 +4,7 @@ import ( "runtime" derr "github.com/docker/docker/errors" + "github.com/docker/docker/pkg/promise" "github.com/docker/docker/runconfig" "github.com/docker/docker/utils" ) @@ -83,7 +84,7 @@ func (daemon *Daemon) containerStart(container *Container) (err error) { } container.toDisk() container.cleanup() - daemon.logContainerEvent(container, "die") + daemon.LogContainerEvent(container, "die") } }() @@ -123,5 +124,19 @@ func (daemon *Daemon) containerStart(container *Container) (err error) { mounts = append(mounts, container.ipcMounts()...) container.command.Mounts = mounts - return container.waitForStart() + return daemon.waitForStart(container) +} + +func (daemon *Daemon) waitForStart(container *Container) error { + container.monitor = daemon.newContainerMonitor(container, container.hostConfig.RestartPolicy) + + // block until we either receive an error from the initial start of the container's + // process or until the process is running in the container + select { + case <-container.monitor.startSignal: + case err := <-promise.Go(container.monitor.Start): + return err + } + + return nil } diff --git a/daemon/stop.go b/daemon/stop.go index 17629acdd6..c97781b8fa 100644 --- a/daemon/stop.go +++ b/daemon/stop.go @@ -55,6 +55,6 @@ func (daemon *Daemon) containerStop(container *Container, seconds int) error { } } - daemon.logContainerEvent(container, "stop") + daemon.LogContainerEvent(container, "stop") return nil } diff --git a/daemon/top_unix.go b/daemon/top_unix.go index 4112fbcbb1..36ace121e6 100644 --- a/daemon/top_unix.go +++ b/daemon/top_unix.go @@ -76,6 +76,6 @@ func (daemon *Daemon) ContainerTop(name string, psArgs string) (*types.Container } } } - container.logEvent("top") + daemon.LogContainerEvent(container, "top") return procList, nil } diff --git a/daemon/unpause.go b/daemon/unpause.go index 4112494fc4..3397f44b45 100644 --- a/daemon/unpause.go +++ b/daemon/unpause.go @@ -38,6 +38,6 @@ func (daemon *Daemon) containerUnpause(container *Container) error { } container.Paused = false - daemon.logContainerEvent(container, "unpause") + daemon.LogContainerEvent(container, "unpause") return nil }