Merge pull request #7344 from shykes/cleanup-server-attach

Move container-related jobs out of deprecated server/ package
This commit is contained in:
Victor Vieux 2014-07-31 15:56:48 -07:00
commit 90dadea84d
7 changed files with 177 additions and 136 deletions

View File

@ -684,6 +684,11 @@ func (b *buildFile) run(c *daemon.Container) error {
var errCh chan error
if b.verbose {
errCh = utils.Go(func() error {
// FIXME: call the 'attach' job so that daemon.Attach can be made private
//
// FIXME (LK4D4): Also, maybe makes sense to call "logs" job, it is like attach
// but without hijacking for stdin. Also, with attach there can be race
// condition because of some output already was printed before it.
return <-b.daemon.Attach(c, nil, nil, b.outStream, b.errStream)
})
}

View File

@ -1,11 +1,122 @@
package daemon
import (
"encoding/json"
"fmt"
"io"
"os"
"time"
"github.com/docker/docker/engine"
"github.com/docker/docker/utils"
)
func (daemon *Daemon) ContainerAttach(job *engine.Job) engine.Status {
if len(job.Args) != 1 {
return job.Errorf("Usage: %s CONTAINER\n", job.Name)
}
var (
name = job.Args[0]
logs = job.GetenvBool("logs")
stream = job.GetenvBool("stream")
stdin = job.GetenvBool("stdin")
stdout = job.GetenvBool("stdout")
stderr = job.GetenvBool("stderr")
)
container := daemon.Get(name)
if container == nil {
return job.Errorf("No such container: %s", name)
}
//logs
if logs {
cLog, err := container.ReadLog("json")
if err != nil && os.IsNotExist(err) {
// Legacy logs
utils.Debugf("Old logs format")
if stdout {
cLog, err := container.ReadLog("stdout")
if err != nil {
utils.Errorf("Error reading logs (stdout): %s", err)
} else if _, err := io.Copy(job.Stdout, cLog); err != nil {
utils.Errorf("Error streaming logs (stdout): %s", err)
}
}
if stderr {
cLog, err := container.ReadLog("stderr")
if err != nil {
utils.Errorf("Error reading logs (stderr): %s", err)
} else if _, err := io.Copy(job.Stderr, cLog); err != nil {
utils.Errorf("Error streaming logs (stderr): %s", err)
}
}
} else if err != nil {
utils.Errorf("Error reading logs (json): %s", err)
} else {
dec := json.NewDecoder(cLog)
for {
l := &utils.JSONLog{}
if err := dec.Decode(l); err == io.EOF {
break
} else if err != nil {
utils.Errorf("Error streaming logs: %s", err)
break
}
if l.Stream == "stdout" && stdout {
fmt.Fprintf(job.Stdout, "%s", l.Log)
}
if l.Stream == "stderr" && stderr {
fmt.Fprintf(job.Stderr, "%s", l.Log)
}
}
}
}
//stream
if stream {
var (
cStdin io.ReadCloser
cStdout, cStderr io.Writer
cStdinCloser io.Closer
)
if stdin {
r, w := io.Pipe()
go func() {
defer w.Close()
defer utils.Debugf("Closing buffered stdin pipe")
io.Copy(w, job.Stdin)
}()
cStdin = r
cStdinCloser = job.Stdin
}
if stdout {
cStdout = job.Stdout
}
if stderr {
cStderr = job.Stderr
}
<-daemon.Attach(container, cStdin, cStdinCloser, cStdout, cStderr)
// If we are in stdinonce mode, wait for the process to end
// otherwise, simply return
if container.Config.StdinOnce && !container.Config.Tty {
container.State.WaitStop(-1 * time.Second)
}
}
return engine.StatusOK
}
// FIXME: this should be private, and every outside subsystem
// should go through the "container_attach" job. But that would require
// that job to be properly documented, as well as the relationship betweem
// Attach and ContainerAttach.
//
// This method is in use by builder/builder.go.
func (daemon *Daemon) Attach(container *Container, stdin io.ReadCloser, stdinCloser io.Closer, stdout io.Writer, stderr io.Writer) chan error {
var (
cStdout, cStderr io.ReadCloser

View File

@ -105,7 +105,19 @@ type Daemon struct {
// Install installs daemon capabilities to eng.
func (daemon *Daemon) Install(eng *engine.Engine) error {
return eng.Register("container_inspect", daemon.ContainerInspect)
if err := eng.Register("container_inspect", daemon.ContainerInspect); err != nil {
return err
}
if err := eng.Register("attach", daemon.ContainerAttach); err != nil {
return err
}
if err := eng.Register("pause", daemon.ContainerPause); err != nil {
return err
}
if err := eng.Register("unpause", daemon.ContainerUnpause); err != nil {
return err
}
return nil
}
// List returns an array of all containers registered in the daemon.

37
daemon/pause.go Normal file
View File

@ -0,0 +1,37 @@
package daemon
import (
"github.com/docker/docker/engine"
)
func (daemon *Daemon) ContainerPause(job *engine.Job) engine.Status {
if len(job.Args) != 1 {
return job.Errorf("Usage: %s CONTAINER", job.Name)
}
name := job.Args[0]
container := daemon.Get(name)
if container == nil {
return job.Errorf("No such container: %s", name)
}
if err := container.Pause(); err != nil {
return job.Errorf("Cannot pause container %s: %s", name, err)
}
job.Eng.Job("log", "pause", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
return engine.StatusOK
}
func (daemon *Daemon) ContainerUnpause(job *engine.Job) engine.Status {
if n := len(job.Args); n < 1 || n > 2 {
return job.Errorf("Usage: %s CONTAINER", job.Name)
}
name := job.Args[0]
container := daemon.Get(name)
if container == nil {
return job.Errorf("No such container: %s", name)
}
if err := container.Unpause(); err != nil {
return job.Errorf("Cannot unpause container %s: %s", name, err)
}
job.Eng.Job("log", "unpause", container.ID, daemon.Repositories().ImageName(container.Image)).Run()
return engine.StatusOK
}

View File

@ -31,38 +31,6 @@ import (
"github.com/docker/docker/utils"
)
func (srv *Server) ContainerPause(job *engine.Job) engine.Status {
if len(job.Args) != 1 {
return job.Errorf("Usage: %s CONTAINER", job.Name)
}
name := job.Args[0]
container := srv.daemon.Get(name)
if container == nil {
return job.Errorf("No such container: %s", name)
}
if err := container.Pause(); err != nil {
return job.Errorf("Cannot pause container %s: %s", name, err)
}
srv.LogEvent("pause", container.ID, srv.daemon.Repositories().ImageName(container.Image))
return engine.StatusOK
}
func (srv *Server) ContainerUnpause(job *engine.Job) engine.Status {
if n := len(job.Args); n < 1 || n > 2 {
return job.Errorf("Usage: %s CONTAINER", job.Name)
}
name := job.Args[0]
container := srv.daemon.Get(name)
if container == nil {
return job.Errorf("No such container: %s", name)
}
if err := container.Unpause(); err != nil {
return job.Errorf("Cannot unpause container %s: %s", name, err)
}
srv.LogEvent("unpause", container.ID, srv.daemon.Repositories().ImageName(container.Image))
return engine.StatusOK
}
// ContainerKill send signal to the container
// If no signal is given (sig 0), then Kill with SIGKILL and wait
// for the container to exit.
@ -798,106 +766,6 @@ func (srv *Server) ContainerLogs(job *engine.Job) engine.Status {
return engine.StatusOK
}
func (srv *Server) ContainerAttach(job *engine.Job) engine.Status {
if len(job.Args) != 1 {
return job.Errorf("Usage: %s CONTAINER\n", job.Name)
}
var (
name = job.Args[0]
logs = job.GetenvBool("logs")
stream = job.GetenvBool("stream")
stdin = job.GetenvBool("stdin")
stdout = job.GetenvBool("stdout")
stderr = job.GetenvBool("stderr")
)
container := srv.daemon.Get(name)
if container == nil {
return job.Errorf("No such container: %s", name)
}
//logs
if logs {
cLog, err := container.ReadLog("json")
if err != nil && os.IsNotExist(err) {
// Legacy logs
utils.Debugf("Old logs format")
if stdout {
cLog, err := container.ReadLog("stdout")
if err != nil {
utils.Errorf("Error reading logs (stdout): %s", err)
} else if _, err := io.Copy(job.Stdout, cLog); err != nil {
utils.Errorf("Error streaming logs (stdout): %s", err)
}
}
if stderr {
cLog, err := container.ReadLog("stderr")
if err != nil {
utils.Errorf("Error reading logs (stderr): %s", err)
} else if _, err := io.Copy(job.Stderr, cLog); err != nil {
utils.Errorf("Error streaming logs (stderr): %s", err)
}
}
} else if err != nil {
utils.Errorf("Error reading logs (json): %s", err)
} else {
dec := json.NewDecoder(cLog)
for {
l := &utils.JSONLog{}
if err := dec.Decode(l); err == io.EOF {
break
} else if err != nil {
utils.Errorf("Error streaming logs: %s", err)
break
}
if l.Stream == "stdout" && stdout {
fmt.Fprintf(job.Stdout, "%s", l.Log)
}
if l.Stream == "stderr" && stderr {
fmt.Fprintf(job.Stderr, "%s", l.Log)
}
}
}
}
//stream
if stream {
var (
cStdin io.ReadCloser
cStdout, cStderr io.Writer
cStdinCloser io.Closer
)
if stdin {
r, w := io.Pipe()
go func() {
defer w.Close()
defer utils.Debugf("Closing buffered stdin pipe")
io.Copy(w, job.Stdin)
}()
cStdin = r
cStdinCloser = job.Stdin
}
if stdout {
cStdout = job.Stdout
}
if stderr {
cStderr = job.Stderr
}
<-srv.daemon.Attach(container, cStdin, cStdinCloser, cStdout, cStderr)
// If we are in stdinonce mode, wait for the process to end
// otherwise, simply return
if container.Config.StdinOnce && !container.Config.Tty {
container.State.WaitStop(-1 * time.Second)
}
}
return engine.StatusOK
}
func (srv *Server) ContainerCopy(job *engine.Job) engine.Status {
if len(job.Args) != 2 {
return job.Errorf("Usage: %s CONTAINER RESOURCE\n", job.Name)

View File

@ -71,6 +71,16 @@ func (srv *Server) Events(job *engine.Job) engine.Status {
}
}
// FIXME: this is a shim to allow breaking up other parts of Server without
// dragging the sphagetti dependency along.
func (srv *Server) Log(job *engine.Job) engine.Status {
if len(job.Args) != 3 {
return job.Errorf("usage: %s ACTION ID FROM", job.Name)
}
srv.LogEvent(job.Args[0], job.Args[1], job.Args[2])
return engine.StatusOK
}
func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage {
now := time.Now().UTC().Unix()
jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now}

View File

@ -92,8 +92,6 @@ func InitServer(job *engine.Job) engine.Status {
"restart": srv.ContainerRestart,
"start": srv.ContainerStart,
"kill": srv.ContainerKill,
"pause": srv.ContainerPause,
"unpause": srv.ContainerUnpause,
"wait": srv.ContainerWait,
"tag": srv.ImageTag, // FIXME merge with "image_tag"
"resize": srv.ContainerResize,
@ -105,7 +103,7 @@ func InitServer(job *engine.Job) engine.Status {
"history": srv.ImageHistory,
"viz": srv.ImagesViz,
"container_copy": srv.ContainerCopy,
"attach": srv.ContainerAttach,
"log": srv.Log,
"logs": srv.ContainerLogs,
"changes": srv.ContainerChanges,
"top": srv.ContainerTop,