From c808940c041ca8c7f9f9c5e0fa93e37ebfdd412a Mon Sep 17 00:00:00 2001 From: Solomon Hykes Date: Mon, 1 Apr 2013 23:52:20 -0700 Subject: [PATCH] Refactored CmdRun and CmdAttach to use Container.Attach --- commands.go | 110 ++++++++------------------------------------------- container.go | 81 +++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+), 93 deletions(-) diff --git a/commands.go b/commands.go index 0c896f29ba..ee481fd282 100644 --- a/commands.go +++ b/commands.go @@ -13,7 +13,6 @@ import ( "runtime" "strconv" "strings" - "sync" "text/tabwriter" "time" "unicode" @@ -533,6 +532,7 @@ func (srv *Server) CmdPull(stdin io.ReadCloser, stdout io.Writer, args ...string return nil } + // FIXME: CmdPull should be a wrapper around Runtime.Pull() if srv.runtime.graph.LookupRemoteImage(remote, srv.runtime.authConfig) { if err := srv.runtime.graph.PullImage(stdout, remote, srv.runtime.authConfig); err != nil { return err @@ -796,56 +796,7 @@ func (srv *Server) CmdAttach(stdin io.ReadCloser, stdout io.Writer, args ...stri if container == nil { return fmt.Errorf("No such container: %s", name) } - - cStdout, err := container.StdoutPipe() - if err != nil { - return err - } - cStderr, err := container.StderrPipe() - if err != nil { - return err - } - - var wg sync.WaitGroup - if container.Config.OpenStdin { - cStdin, err := container.StdinPipe() - if err != nil { - return err - } - wg.Add(1) - go func() { - Debugf("Begin stdin pipe [attach]") - io.Copy(cStdin, stdin) - - // When stdin get closed, it means the client has been detached - // Make sure all pipes are closed. - if err := cStdout.Close(); err != nil { - Debugf("Error closing stdin pipe: %s", err) - } - if err := cStderr.Close(); err != nil { - Debugf("Error closing stderr pipe: %s", err) - } - - wg.Add(-1) - Debugf("End of stdin pipe [attach]") - }() - } - wg.Add(1) - go func() { - Debugf("Begin stdout pipe [attach]") - io.Copy(stdout, cStdout) - wg.Add(-1) - Debugf("End of stdout pipe [attach]") - }() - wg.Add(1) - go func() { - Debugf("Begin stderr pipe [attach]") - io.Copy(stdout, cStderr) - wg.Add(-1) - Debugf("End of stderr pipe [attach]") - }() - wg.Wait() - return nil + return <-container.Attach(stdin, stdout, stdout) } // Ports type - Used to parse multiple -p flags @@ -919,55 +870,28 @@ func (srv *Server) CmdRun(stdin io.ReadCloser, stdout io.Writer, args ...string) return err } } - if config.OpenStdin { - cmdStdin, err := container.StdinPipe() - if err != nil { - return err - } - if !config.Detach { - Go(func() error { - _, err := io.Copy(cmdStdin, stdin) - cmdStdin.Close() - return err - }) - } - } // Run the container if !config.Detach { - cmdStderr, err := container.StderrPipe() - if err != nil { - return err - } - cmdStdout, err := container.StdoutPipe() - if err != nil { - return err + var attachErr chan error + if config.OpenStdin { + config.StdinOnce = true + Debugf("Attaching with stdin\n") + attachErr = container.Attach(stdin, stdout, stdout) + } else { + Debugf("Attaching without stdin\n") + attachErr = container.Attach(nil, stdout, nil) } + Debugf("Starting\n") if err := container.Start(); err != nil { return err } - sendingStdout := Go(func() error { - _, err := io.Copy(stdout, cmdStdout) - return err - }) - sendingStderr := Go(func() error { - _, err := io.Copy(stdout, cmdStderr) - return err - }) - errSendingStdout := <-sendingStdout - errSendingStderr := <-sendingStderr - if errSendingStdout != nil { - return errSendingStdout - } - if errSendingStderr != nil { - return errSendingStderr - } - container.Wait() - } else { - if err := container.Start(); err != nil { - return err - } - fmt.Fprintln(stdout, container.ShortId()) + Debugf("Waiting for attach to return\n") + return <-attachErr } + if err := container.Start(); err != nil { + return err + } + fmt.Fprintln(stdout, container.ShortId()) return nil } diff --git a/container.go b/container.go index 8f993a35e1..4f320f32fd 100644 --- a/container.go +++ b/container.go @@ -56,6 +56,7 @@ type Config struct { Ports []int Tty bool // Attach standard streams to a tty, including stdin if it is not closed. OpenStdin bool // Open stdin + StdinOnce bool // If true, close stdin after the 1 attached client disconnects. Env []string Cmd []string Image string // Name of the image as it was passed by the operator (eg. could be symbolic) @@ -229,6 +230,86 @@ func (container *Container) start() error { return container.cmd.Start() } +func (container *Container) Attach(stdin io.Reader, stdout io.Writer, stderr io.Writer) chan error { + var cStdout io.ReadCloser + var cStderr io.ReadCloser + var nJobs int + errors := make(chan error, 3) + if stdin != nil && container.Config.OpenStdin { + nJobs += 1 + if cStdin, err := container.StdinPipe(); err != nil { + errors <- err + } else { + go func() { + Debugf("[start] attach stdin\n") + defer Debugf("[end] attach stdin\n") + if container.Config.StdinOnce { + defer cStdin.Close() + } + _, err := io.Copy(cStdin, stdin) + if err != nil { + Debugf("[error] attach stdout: %s\n", err) + } + errors <- err + }() + } + } + if stdout != nil { + nJobs += 1 + if p, err := container.StdoutPipe(); err != nil { + errors <- err + } else { + cStdout = p + go func() { + Debugf("[start] attach stdout\n") + defer Debugf("[end] attach stdout\n") + _, err := io.Copy(stdout, cStdout) + if err != nil { + Debugf("[error] attach stdout: %s\n", err) + } + errors <- err + }() + } + } + if stderr != nil { + nJobs += 1 + if p, err := container.StderrPipe(); err != nil { + errors <- err + } else { + cStderr = p + go func() { + Debugf("[start] attach stderr\n") + defer Debugf("[end] attach stderr\n") + _, err := io.Copy(stderr, cStderr) + if err != nil { + Debugf("[error] attach stderr: %s\n", err) + } + errors <- err + }() + } + } + return Go(func() error { + if cStdout != nil { + defer cStdout.Close() + } + if cStderr != nil { + defer cStderr.Close() + } + // FIXME: how do clean up the stdin goroutine without the unwanted side effect + // of closing the passed stdin? Add an intermediary io.Pipe? + for i := 0; i < nJobs; i += 1 { + Debugf("Waiting for job %d/%d\n", i+1, nJobs) + if err := <-errors; err != nil { + Debugf("Job %d returned error %s. Aborting all jobs\n", i+1, err) + return err + } + Debugf("Job %d completed successfully\n", i+1) + } + Debugf("All jobs completed successfully\n") + return nil + }) +} + func (container *Container) Start() error { if container.State.Running { return fmt.Errorf("The container %s is already running.", container.Id)