1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #12120 from LK4D4/split_attach

Remove engine usage from attach
This commit is contained in:
Tibor Vass 2015-04-07 18:13:35 -04:00
commit dccda72a41
5 changed files with 44 additions and 91 deletions

View file

@ -904,16 +904,12 @@ func postContainersAttach(eng *engine.Engine, version version.Version, w http.Re
return fmt.Errorf("Missing parameter") return fmt.Errorf("Missing parameter")
} }
var ( d := getDaemon(eng)
job = eng.Job("container_inspect", vars["name"])
c, err = job.Stdout.AddEnv() cont, err := d.Get(vars["name"])
)
if err != nil { if err != nil {
return err return err
} }
if err = job.Run(); err != nil {
return err
}
inStream, outStream, err := hijackServer(w) inStream, outStream, err := hijackServer(w)
if err != nil { if err != nil {
@ -929,25 +925,17 @@ func postContainersAttach(eng *engine.Engine, version version.Version, w http.Re
fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n") fmt.Fprintf(outStream, "HTTP/1.1 200 OK\r\nContent-Type: application/vnd.docker.raw-stream\r\n\r\n")
} }
if c.GetSubEnv("Config") != nil && !c.GetSubEnv("Config").GetBool("Tty") && version.GreaterThanOrEqualTo("1.6") { if !cont.Config.Tty && version.GreaterThanOrEqualTo("1.6") {
errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr) errStream = stdcopy.NewStdWriter(outStream, stdcopy.Stderr)
outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout) outStream = stdcopy.NewStdWriter(outStream, stdcopy.Stdout)
} else { } else {
errStream = outStream errStream = outStream
} }
logs := r.Form.Get("logs") != ""
stream := r.Form.Get("stream") != ""
job = eng.Job("attach", vars["name"]) if err := cont.AttachWithLogs(inStream, outStream, errStream, logs, stream); err != nil {
job.Setenv("logs", r.Form.Get("logs"))
job.Setenv("stream", r.Form.Get("stream"))
job.Setenv("stdin", r.Form.Get("stdin"))
job.Setenv("stdout", r.Form.Get("stdout"))
job.Setenv("stderr", r.Form.Get("stderr"))
job.Stdin.Add(inStream)
job.Stdout.Add(outStream)
job.Stderr.Set(errStream)
if err := job.Run(); err != nil {
fmt.Fprintf(outStream, "Error attaching: %s\n", err) fmt.Fprintf(outStream, "Error attaching: %s\n", err)
} }
return nil return nil
} }
@ -959,23 +947,19 @@ func wsContainersAttach(eng *engine.Engine, version version.Version, w http.Resp
if vars == nil { if vars == nil {
return fmt.Errorf("Missing parameter") return fmt.Errorf("Missing parameter")
} }
d := getDaemon(eng)
if err := eng.Job("container_inspect", vars["name"]).Run(); err != nil { cont, err := d.Get(vars["name"])
if err != nil {
return err return err
} }
h := websocket.Handler(func(ws *websocket.Conn) { h := websocket.Handler(func(ws *websocket.Conn) {
defer ws.Close() defer ws.Close()
job := eng.Job("attach", vars["name"]) logs := r.Form.Get("logs") != ""
job.Setenv("logs", r.Form.Get("logs")) stream := r.Form.Get("stream") != ""
job.Setenv("stream", r.Form.Get("stream"))
job.Setenv("stdin", r.Form.Get("stdin")) if err := cont.AttachWithLogs(ws, ws, ws, logs, stream); err != nil {
job.Setenv("stdout", r.Form.Get("stdout"))
job.Setenv("stderr", r.Form.Get("stderr"))
job.Stdin.Add(ws)
job.Stdout.Add(ws)
job.Stderr.Set(ws)
if err := job.Run(); err != nil {
logrus.Errorf("Error attaching websocket: %s", err) logrus.Errorf("Error attaching websocket: %s", err)
} }
}) })

View file

@ -573,7 +573,7 @@ func (b *Builder) create() (*daemon.Container, error) {
func (b *Builder) run(c *daemon.Container) error { func (b *Builder) run(c *daemon.Container) error {
var errCh chan error var errCh chan error
if b.Verbose { if b.Verbose {
errCh = b.Daemon.Attach(&c.StreamConfig, c.Config.OpenStdin, c.Config.StdinOnce, c.Config.Tty, nil, b.OutStream, b.ErrStream) errCh = c.Attach(nil, b.OutStream, b.ErrStream)
} }
//start the container //start the container

View file

@ -2,57 +2,36 @@ package daemon
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io" "io"
"os" "os"
"sync" "sync"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/docker/engine"
"github.com/docker/docker/pkg/jsonlog" "github.com/docker/docker/pkg/jsonlog"
"github.com/docker/docker/pkg/promise" "github.com/docker/docker/pkg/promise"
"github.com/docker/docker/utils" "github.com/docker/docker/utils"
) )
func (daemon *Daemon) ContainerAttach(job *engine.Job) error { func (c *Container) AttachWithLogs(stdin io.ReadCloser, stdout, stderr io.Writer, logs, stream bool) error {
if len(job.Args) != 1 {
return fmt.Errorf("Usage: %s CONTAINER\n", job.Name)
}
var (
name = job.Args[0]
logs = job.GetenvBool("logs")
stream = job.GetenvBool("stream")
stdin = job.GetenvBool("stdin")
stdout = job.GetenvBool("stdout")
stderr = job.GetenvBool("stderr")
)
container, err := daemon.Get(name)
if err != nil {
return err
}
//logs
if logs { if logs {
cLog, err := container.ReadLog("json") cLog, err := c.ReadLog("json")
if err != nil && os.IsNotExist(err) { if err != nil && os.IsNotExist(err) {
// Legacy logs // Legacy logs
logrus.Debugf("Old logs format") logrus.Debugf("Old logs format")
if stdout { if stdout != nil {
cLog, err := container.ReadLog("stdout") cLog, err := c.ReadLog("stdout")
if err != nil { if err != nil {
logrus.Errorf("Error reading logs (stdout): %s", err) logrus.Errorf("Error reading logs (stdout): %s", err)
} else if _, err := io.Copy(job.Stdout, cLog); err != nil { } else if _, err := io.Copy(stdout, cLog); err != nil {
logrus.Errorf("Error streaming logs (stdout): %s", err) logrus.Errorf("Error streaming logs (stdout): %s", err)
} }
} }
if stderr { if stderr != nil {
cLog, err := container.ReadLog("stderr") cLog, err := c.ReadLog("stderr")
if err != nil { if err != nil {
logrus.Errorf("Error reading logs (stderr): %s", err) logrus.Errorf("Error reading logs (stderr): %s", err)
} else if _, err := io.Copy(job.Stderr, cLog); err != nil { } else if _, err := io.Copy(stderr, cLog); err != nil {
logrus.Errorf("Error streaming logs (stderr): %s", err) logrus.Errorf("Error streaming logs (stderr): %s", err)
} }
} }
@ -69,11 +48,11 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) error {
logrus.Errorf("Error streaming logs: %s", err) logrus.Errorf("Error streaming logs: %s", err)
break break
} }
if l.Stream == "stdout" && stdout { if l.Stream == "stdout" && stdout != nil {
io.WriteString(job.Stdout, l.Log) io.WriteString(stdout, l.Log)
} }
if l.Stream == "stderr" && stderr { if l.Stream == "stderr" && stderr != nil {
io.WriteString(job.Stderr, l.Log) io.WriteString(stderr, l.Log)
} }
} }
} }
@ -81,38 +60,29 @@ func (daemon *Daemon) ContainerAttach(job *engine.Job) error {
//stream //stream
if stream { if stream {
var ( var stdinPipe io.ReadCloser
cStdin io.ReadCloser r, w := io.Pipe()
cStdout, cStderr io.Writer go func() {
) defer w.Close()
defer logrus.Debugf("Closing buffered stdin pipe")
if stdin { io.Copy(w, stdin)
r, w := io.Pipe() }()
go func() { stdinPipe = r
defer w.Close() <-c.Attach(stdinPipe, stdout, stderr)
defer logrus.Debugf("Closing buffered stdin pipe")
io.Copy(w, job.Stdin)
}()
cStdin = r
}
if stdout {
cStdout = job.Stdout
}
if stderr {
cStderr = job.Stderr
}
<-daemon.Attach(&container.StreamConfig, container.Config.OpenStdin, container.Config.StdinOnce, container.Config.Tty, cStdin, cStdout, cStderr)
// If we are in stdinonce mode, wait for the process to end // If we are in stdinonce mode, wait for the process to end
// otherwise, simply return // otherwise, simply return
if container.Config.StdinOnce && !container.Config.Tty { if c.Config.StdinOnce && !c.Config.Tty {
container.WaitStop(-1 * time.Second) c.WaitStop(-1 * time.Second)
} }
} }
return nil return nil
} }
func (daemon *Daemon) Attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error { func (c *Container) Attach(stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
return attach(&c.StreamConfig, c.Config.OpenStdin, c.Config.StdinOnce, c.Config.Tty, stdin, stdout, stderr)
}
func attach(streamConfig *StreamConfig, openStdin, stdinOnce, tty bool, stdin io.ReadCloser, stdout io.Writer, stderr io.Writer) chan error {
var ( var (
cStdout, cStderr io.ReadCloser cStdout, cStderr io.ReadCloser
cStdin io.WriteCloser cStdin io.WriteCloser

View file

@ -115,7 +115,6 @@ type Daemon struct {
func (daemon *Daemon) Install(eng *engine.Engine) error { func (daemon *Daemon) Install(eng *engine.Engine) error {
// FIXME: remove ImageDelete's dependency on Daemon, then move to graph/ // FIXME: remove ImageDelete's dependency on Daemon, then move to graph/
for name, method := range map[string]engine.Handler{ for name, method := range map[string]engine.Handler{
"attach": daemon.ContainerAttach,
"commit": daemon.ContainerCommit, "commit": daemon.ContainerCommit,
"container_changes": daemon.ContainerChanges, "container_changes": daemon.ContainerChanges,
"container_copy": daemon.ContainerCopy, "container_copy": daemon.ContainerCopy,

View file

@ -218,7 +218,7 @@ func (d *Daemon) ContainerExecStart(job *engine.Job) error {
execConfig.StreamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin execConfig.StreamConfig.stdinPipe = ioutils.NopWriteCloser(ioutil.Discard) // Silently drop stdin
} }
attachErr := d.Attach(&execConfig.StreamConfig, execConfig.OpenStdin, true, execConfig.ProcessConfig.Tty, cStdin, cStdout, cStderr) attachErr := attach(&execConfig.StreamConfig, execConfig.OpenStdin, true, execConfig.ProcessConfig.Tty, cStdin, cStdout, cStderr)
execErr := make(chan error) execErr := make(chan error)