diff --git a/builder/builder.go b/builder/builder.go index 7a94f2793a..fb344e97e3 100644 --- a/builder/builder.go +++ b/builder/builder.go @@ -684,6 +684,11 @@ func (b *buildFile) run(c *daemon.Container) error { var errCh chan error if b.verbose { errCh = utils.Go(func() error { + // FIXME: call the 'attach' job so that daemon.Attach can be made private + // + // FIXME (LK4D4): Also, maybe makes sense to call "logs" job, it is like attach + // but without hijacking for stdin. Also, with attach there can be race + // condition because of some output already was printed before it. return <-b.daemon.Attach(c, nil, nil, b.outStream, b.errStream) }) } diff --git a/daemon/attach.go b/daemon/attach.go index 665d4a20af..ad8649f4e1 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -1,11 +1,122 @@ package daemon import ( + "encoding/json" + "fmt" "io" + "os" + "time" + "github.com/docker/docker/engine" "github.com/docker/docker/utils" ) +func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status { + if len(job.Args) != 1 { + return job.Errorf("Usage: %s CONTAINER\n", job.Name) + } + + var ( + name = job.Args[0] + logs = job.GetenvBool("logs") + stream = job.GetenvBool("stream") + stdin = job.GetenvBool("stdin") + stdout = job.GetenvBool("stdout") + stderr = job.GetenvBool("stderr") + ) + + container := daemon.Get(name) + if container == nil { + return job.Errorf("No such container: %s", name) + } + + //logs + if logs { + cLog, err := container.ReadLog("json") + if err != nil && os.IsNotExist(err) { + // Legacy logs + utils.Debugf("Old logs format") + if stdout { + cLog, err := container.ReadLog("stdout") + if err != nil { + utils.Errorf("Error reading logs (stdout): %s", err) + } else if _, err := io.Copy(job.Stdout, cLog); err != nil { + utils.Errorf("Error streaming logs (stdout): %s", err) + } + } + if stderr { + cLog, err := container.ReadLog("stderr") + if err != nil { + utils.Errorf("Error reading logs (stderr): %s", err) + } else if _, err := io.Copy(job.Stderr, cLog); err != nil { + utils.Errorf("Error streaming logs (stderr): %s", err) + } + } + } else if err != nil { + utils.Errorf("Error reading logs (json): %s", err) + } else { + dec := json.NewDecoder(cLog) + for { + l := &utils.JSONLog{} + + if err := dec.Decode(l); err == io.EOF { + break + } else if err != nil { + utils.Errorf("Error streaming logs: %s", err) + break + } + if l.Stream == "stdout" && stdout { + fmt.Fprintf(job.Stdout, "%s", l.Log) + } + if l.Stream == "stderr" && stderr { + fmt.Fprintf(job.Stderr, "%s", l.Log) + } + } + } + } + + //stream + if stream { + var ( + cStdin io.ReadCloser + cStdout, cStderr io.Writer + cStdinCloser io.Closer + ) + + if stdin { + r, w := io.Pipe() + go func() { + defer w.Close() + defer utils.Debugf("Closing buffered stdin pipe") + io.Copy(w, job.Stdin) + }() + cStdin = r + cStdinCloser = job.Stdin + } + if stdout { + cStdout = job.Stdout + } + if stderr { + cStderr = job.Stderr + } + + <-daemon.Attach(container, cStdin, cStdinCloser, cStdout, cStderr) + + // If we are in stdinonce mode, wait for the process to end + // otherwise, simply return + if container.Config.StdinOnce && !container.Config.Tty { + container.State.WaitStop(-1 * time.Second) + } + } + return engine.StatusOK +} + +// FIXME: this should be private, and every outside subsystem +// should go through the "container_attach" job. But that would require +// that job to be properly documented, as well as the relationship betweem +// Attach and ContainerAttach. +// +// This method is in use by builder/builder.go. func (daemon *Daemon) Attach(container *Container, stdin io.ReadCloser, stdinCloser io.Closer, stdout io.Writer, stderr io.Writer) chan error { var ( cStdout, cStderr io.ReadCloser diff --git a/daemon/daemon.go b/daemon/daemon.go index 1d0f6748af..e4304b2a48 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -105,7 +105,19 @@ type Daemon struct { // Install installs daemon capabilities to eng. func (daemon *Daemon) Install(eng *engine.Engine) error { - return eng.Register("container_inspect", daemon.ContainerInspect) + if err := eng.Register("container_inspect", daemon.ContainerInspect); err != nil { + return err + } + if err := eng.Register("attach", daemon.ContainerAttach); err != nil { + return err + } + if err := eng.Register("pause", daemon.ContainerPause); err != nil { + return err + } + if err := eng.Register("unpause", daemon.ContainerUnpause); err != nil { + return err + } + return nil } // List returns an array of all containers registered in the daemon. diff --git a/daemon/pause.go b/daemon/pause.go new file mode 100644 index 0000000000..72e5cee020 --- /dev/null +++ b/daemon/pause.go @@ -0,0 +1,37 @@ +package daemon + +import ( + "github.com/docker/docker/engine" +) + +func (daemon *Daemon) ContainerPause(job *engine.Job) engine.Status { + if len(job.Args) != 1 { + return job.Errorf("Usage: %s CONTAINER", job.Name) + } + name := job.Args[0] + container := daemon.Get(name) + if container == nil { + return job.Errorf("No such container: %s", name) + } + if err := container.Pause(); err != nil { + return job.Errorf("Cannot pause container %s: %s", name, err) + } + job.Eng.Job("log", "pause", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + return engine.StatusOK +} + +func (daemon *Daemon) ContainerUnpause(job *engine.Job) engine.Status { + if n := len(job.Args); n < 1 || n > 2 { + return job.Errorf("Usage: %s CONTAINER", job.Name) + } + name := job.Args[0] + container := daemon.Get(name) + if container == nil { + return job.Errorf("No such container: %s", name) + } + if err := container.Unpause(); err != nil { + return job.Errorf("Cannot unpause container %s: %s", name, err) + } + job.Eng.Job("log", "unpause", container.ID, daemon.Repositories().ImageName(container.Image)).Run() + return engine.StatusOK +} diff --git a/server/container.go b/server/container.go index 3e76086a1a..b37fa521f5 100644 --- a/server/container.go +++ b/server/container.go @@ -31,38 +31,6 @@ import ( "github.com/docker/docker/utils" ) -func (srv *Server) ContainerPause(job *engine.Job) engine.Status { - if len(job.Args) != 1 { - return job.Errorf("Usage: %s CONTAINER", job.Name) - } - name := job.Args[0] - container := srv.daemon.Get(name) - if container == nil { - return job.Errorf("No such container: %s", name) - } - if err := container.Pause(); err != nil { - return job.Errorf("Cannot pause container %s: %s", name, err) - } - srv.LogEvent("pause", container.ID, srv.daemon.Repositories().ImageName(container.Image)) - return engine.StatusOK -} - -func (srv *Server) ContainerUnpause(job *engine.Job) engine.Status { - if n := len(job.Args); n < 1 || n > 2 { - return job.Errorf("Usage: %s CONTAINER", job.Name) - } - name := job.Args[0] - container := srv.daemon.Get(name) - if container == nil { - return job.Errorf("No such container: %s", name) - } - if err := container.Unpause(); err != nil { - return job.Errorf("Cannot unpause container %s: %s", name, err) - } - srv.LogEvent("unpause", container.ID, srv.daemon.Repositories().ImageName(container.Image)) - return engine.StatusOK -} - // ContainerKill send signal to the container // If no signal is given (sig 0), then Kill with SIGKILL and wait // for the container to exit. @@ -798,106 +766,6 @@ func (srv *Server) ContainerLogs(job *engine.Job) engine.Status { return engine.StatusOK } -func (srv *Server) ContainerAttach(job *engine.Job) engine.Status { - if len(job.Args) != 1 { - return job.Errorf("Usage: %s CONTAINER\n", job.Name) - } - - var ( - name = job.Args[0] - logs = job.GetenvBool("logs") - stream = job.GetenvBool("stream") - stdin = job.GetenvBool("stdin") - stdout = job.GetenvBool("stdout") - stderr = job.GetenvBool("stderr") - ) - - container := srv.daemon.Get(name) - if container == nil { - return job.Errorf("No such container: %s", name) - } - - //logs - if logs { - cLog, err := container.ReadLog("json") - if err != nil && os.IsNotExist(err) { - // Legacy logs - utils.Debugf("Old logs format") - if stdout { - cLog, err := container.ReadLog("stdout") - if err != nil { - utils.Errorf("Error reading logs (stdout): %s", err) - } else if _, err := io.Copy(job.Stdout, cLog); err != nil { - utils.Errorf("Error streaming logs (stdout): %s", err) - } - } - if stderr { - cLog, err := container.ReadLog("stderr") - if err != nil { - utils.Errorf("Error reading logs (stderr): %s", err) - } else if _, err := io.Copy(job.Stderr, cLog); err != nil { - utils.Errorf("Error streaming logs (stderr): %s", err) - } - } - } else if err != nil { - utils.Errorf("Error reading logs (json): %s", err) - } else { - dec := json.NewDecoder(cLog) - for { - l := &utils.JSONLog{} - - if err := dec.Decode(l); err == io.EOF { - break - } else if err != nil { - utils.Errorf("Error streaming logs: %s", err) - break - } - if l.Stream == "stdout" && stdout { - fmt.Fprintf(job.Stdout, "%s", l.Log) - } - if l.Stream == "stderr" && stderr { - fmt.Fprintf(job.Stderr, "%s", l.Log) - } - } - } - } - - //stream - if stream { - var ( - cStdin io.ReadCloser - cStdout, cStderr io.Writer - cStdinCloser io.Closer - ) - - if stdin { - r, w := io.Pipe() - go func() { - defer w.Close() - defer utils.Debugf("Closing buffered stdin pipe") - io.Copy(w, job.Stdin) - }() - cStdin = r - cStdinCloser = job.Stdin - } - if stdout { - cStdout = job.Stdout - } - if stderr { - cStderr = job.Stderr - } - - <-srv.daemon.Attach(container, cStdin, cStdinCloser, cStdout, cStderr) - - // If we are in stdinonce mode, wait for the process to end - // otherwise, simply return - if container.Config.StdinOnce && !container.Config.Tty { - container.State.WaitStop(-1 * time.Second) - } - } - return engine.StatusOK -} - func (srv *Server) ContainerCopy(job *engine.Job) engine.Status { if len(job.Args) != 2 { return job.Errorf("Usage: %s CONTAINER RESOURCE\n", job.Name) diff --git a/server/events.go b/server/events.go index 0f13d0885e..214dd69e04 100644 --- a/server/events.go +++ b/server/events.go @@ -71,6 +71,16 @@ func (srv *Server) Events(job *engine.Job) engine.Status { } } +// FIXME: this is a shim to allow breaking up other parts of Server without +// dragging the sphagetti dependency along. +func (srv *Server) Log(job *engine.Job) engine.Status { + if len(job.Args) != 3 { + return job.Errorf("usage: %s ACTION ID FROM", job.Name) + } + srv.LogEvent(job.Args[0], job.Args[1], job.Args[2]) + return engine.StatusOK +} + func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage { now := time.Now().UTC().Unix() jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now} diff --git a/server/init.go b/server/init.go index dfb24c0e96..9b6033b6c0 100644 --- a/server/init.go +++ b/server/init.go @@ -92,8 +92,6 @@ func InitServer(job *engine.Job) engine.Status { "restart": srv.ContainerRestart, "start": srv.ContainerStart, "kill": srv.ContainerKill, - "pause": srv.ContainerPause, - "unpause": srv.ContainerUnpause, "wait": srv.ContainerWait, "tag": srv.ImageTag, // FIXME merge with "image_tag" "resize": srv.ContainerResize, @@ -105,7 +103,7 @@ func InitServer(job *engine.Job) engine.Status { "history": srv.ImageHistory, "viz": srv.ImagesViz, "container_copy": srv.ContainerCopy, - "attach": srv.ContainerAttach, + "log": srv.Log, "logs": srv.ContainerLogs, "changes": srv.ContainerChanges, "top": srv.ContainerTop,