diff --git a/api.go b/api.go
index aadd79e3c8..1708420e12 100644
--- a/api.go
+++ b/api.go
@@ -1,6 +1,8 @@
 package docker

 import (
+	"bufio"
+	"bytes"
 	"code.google.com/p/go.net/websocket"
 	"encoding/base64"
 	"encoding/json"
@@ -565,12 +567,18 @@ func postContainersCreate(srv *Server, version float64, w http.ResponseWriter, r
 		job.SetenvList("Dns", defaultDns)
 	}
 	// Read container ID from the first line of stdout
-	job.StdoutParseString(&out.ID)
+	job.Stdout.AddString(&out.ID)
 	// Read warnings from stderr
-	job.StderrParseLines(&out.Warnings, 0)
+	warnings := &bytes.Buffer{}
+	job.Stderr.Add(warnings)
 	if err := job.Run(); err != nil {
 		return err
 	}
+	// Parse warnings from stderr
+	scanner := bufio.NewScanner(warnings)
+	for scanner.Scan() {
+		out.Warnings = append(out.Warnings, scanner.Text())
+	}
 	if job.GetenvInt("Memory") > 0 && !srv.runtime.capabilities.MemoryLimit {
 		log.Println("WARNING: Your kernel does not support memory limit capabilities. Limitation discarded.")
 		out.Warnings = append(out.Warnings, "Your kernel does not support memory limit capabilities. Limitation discarded.")
diff --git a/engine/engine.go b/engine/engine.go
index edd04fc5c0..5a411e8cc2 100644
--- a/engine/engine.go
+++ b/engine/engine.go
@@ -9,7 +9,7 @@ import (
 	"strings"
 )

-type Handler func(*Job) string
+type Handler func(*Job) Status

 var globalHandlers map[string]Handler

@@ -99,10 +99,12 @@ func (eng *Engine) Job(name string, args ...string) *Job {
 		Eng:    eng,
 		Name:   name,
 		Args:   args,
-		Stdin:  os.Stdin,
-		Stdout: os.Stdout,
-		Stderr: os.Stderr,
+		Stdin:  NewInput(),
+		Stdout: NewOutput(),
+		Stderr: NewOutput(),
 	}
+	job.Stdout.Add(utils.NopWriteCloser(os.Stdout))
+	job.Stderr.Add(utils.NopWriteCloser(os.Stderr))
 	handler, exists := eng.handlers[name]
 	if exists {
 		job.handler = handler
diff --git a/engine/engine_test.go b/engine/engine_test.go
index fdc0b0ec7f..f877a3e4d5 100644
--- a/engine/engine_test.go
+++ b/engine/engine_test.go
@@ -38,8 +38,9 @@ func TestJob(t *testing.T) {
 		t.Fatalf("job1.handler should be empty")
 	}

-	h := func(j *Job) string {
-		return j.Name
+	h := func(j *Job) Status {
+		j.Printf("%s\n", j.Name)
+		return 42
 	}

 	eng.Register("dummy2", h)
@@ -49,7 +50,7 @@ func TestJob(t *testing.T) {
 		t.Fatalf("job2.handler shouldn't be nil")
 	}

-	if job2.handler(job2) != job2.Name {
+	if job2.handler(job2) != 42 {
 		t.Fatalf("handler dummy2 was not found in job2")
 	}
 }
diff --git a/engine/helpers_test.go b/engine/helpers_test.go
index 5b1c0baf60..488529fc7f 100644
--- a/engine/helpers_test.go
+++ b/engine/helpers_test.go
@@ -1,32 +1,18 @@
 package engine

 import (
-	"fmt"
 	"github.com/dotcloud/docker/utils"
-	"io/ioutil"
-	"runtime"
-	"strings"
 	"testing"
 )

 var globalTestID string

 func newTestEngine(t *testing.T) *Engine {
-	// Use the caller function name as a prefix.
-	// This helps trace temp directories back to their test.
-	pc, _, _, _ := runtime.Caller(1)
-	callerLongName := runtime.FuncForPC(pc).Name()
-	parts := strings.Split(callerLongName, ".")
-	callerShortName := parts[len(parts)-1]
-	if globalTestID == "" {
-		globalTestID = utils.RandomString()[:4]
-	}
-	prefix := fmt.Sprintf("docker-test%s-%s-", globalTestID, callerShortName)
-	root, err := ioutil.TempDir("", prefix)
+	tmp, err := utils.TestDirectory("")
 	if err != nil {
 		t.Fatal(err)
 	}
-	eng, err := New(root)
+	eng, err := New(tmp)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/engine/job.go b/engine/job.go
index 365c94e06b..066a144664 100644
--- a/engine/job.go
+++ b/engine/job.go
@@ -1,16 +1,13 @@
 package engine

 import (
-	"bufio"
 	"bytes"
 	"encoding/json"
 	"fmt"
 	"io"
-	"io/ioutil"
-	"os"
 	"strconv"
 	"strings"
-	"sync"
+	"time"
 )

 // A job is the fundamental unit of work in the docker engine.
@@ -31,126 +28,75 @@ type Job struct {
 	Name    string
 	Args    []string
 	env     []string
-	Stdin   io.Reader
-	Stdout  io.Writer
-	Stderr  io.Writer
-	handler func(*Job) string
-	status  string
+	Stdout  *Output
+	Stderr  *Output
+	Stdin   *Input
+	handler Handler
+	status  Status
+	end     time.Time
 	onExit  []func()
 }

+type Status int
+
+const (
+	StatusOK       Status = 0
+	StatusErr      Status = 1
+	StatusNotFound Status = 127
+)
+
 // Run executes the job and blocks until the job completes.
 // If the job returns a failure status, an error is returned
 // which includes the status.
 func (job *Job) Run() error {
-	defer func() {
-		var wg sync.WaitGroup
-		for _, f := range job.onExit {
-			wg.Add(1)
-			go func(f func()) {
-				f()
-				wg.Done()
-			}(f)
-		}
-		wg.Wait()
-	}()
-	if job.Stdout != nil && job.Stdout != os.Stdout {
-		job.Stdout = io.MultiWriter(job.Stdout, os.Stdout)
-	}
-	if job.Stderr != nil && job.Stderr != os.Stderr {
-		job.Stderr = io.MultiWriter(job.Stderr, os.Stderr)
+	// FIXME: make this thread-safe
+	// FIXME: implement wait
+	if !job.end.IsZero() {
+		return fmt.Errorf("%s: job has already completed", job.Name)
 	}
+	// Log beginning and end of the job
 	job.Eng.Logf("+job %s", job.CallString())
 	defer func() {
 		job.Eng.Logf("-job %s%s", job.CallString(), job.StatusString())
 	}()
+	var errorMessage string
+	job.Stderr.AddString(&errorMessage)
 	if job.handler == nil {
-		job.status = "command not found"
+		job.Errorf("%s: command not found", job.Name)
+		job.status = 127
 	} else {
 		job.status = job.handler(job)
+		job.end = time.Now()
 	}
-	if job.status != "0" {
-		return fmt.Errorf("%s: %s", job.Name, job.status)
+	// Wait for all background tasks to complete
+	if err := job.Stdout.Close(); err != nil {
+		return err
+	}
+	if err := job.Stderr.Close(); err != nil {
+		return err
+	}
+	if job.status != 0 {
+		return fmt.Errorf("%s: %s", job.Name, errorMessage)
 	}
 	return nil
 }

-func (job *Job) StdoutParseLines(dst *[]string, limit int) {
-	job.parseLines(job.StdoutPipe(), dst, limit)
-}
-
-func (job *Job) StderrParseLines(dst *[]string, limit int) {
-	job.parseLines(job.StderrPipe(), dst, limit)
-}
-
-func (job *Job) parseLines(src io.Reader, dst *[]string, limit int) {
-	var wg sync.WaitGroup
-	wg.Add(1)
-	go func() {
-		defer wg.Done()
-		scanner := bufio.NewScanner(src)
-		for scanner.Scan() {
-			// If the limit is reached, flush the rest of the source and return
-			if limit > 0 && len(*dst) >= limit {
-				io.Copy(ioutil.Discard, src)
-				return
-			}
-			line := scanner.Text()
-			// Append the line (with delimitor removed)
-			*dst = append(*dst, line)
-		}
-	}()
-	job.onExit = append(job.onExit, wg.Wait)
-}
-
-func (job *Job) StdoutParseString(dst *string) {
-	lines := make([]string, 0, 1)
-	job.StdoutParseLines(&lines, 1)
-	job.onExit = append(job.onExit, func() {
-		if len(lines) >= 1 {
-			*dst = lines[0]
-		}
-	})
-}
-
-func (job *Job) StderrParseString(dst *string) {
-	lines := make([]string, 0, 1)
-	job.StderrParseLines(&lines, 1)
-	job.onExit = append(job.onExit, func() { *dst = lines[0] })
-}
-
-func (job *Job) StdoutPipe() io.ReadCloser {
-	r, w := io.Pipe()
-	job.Stdout = w
-	job.onExit = append(job.onExit, func() { w.Close() })
-	return r
-}
-
-func (job *Job) StderrPipe() io.ReadCloser {
-	r, w := io.Pipe()
-	job.Stderr = w
-	job.onExit = append(job.onExit, func() { w.Close() })
-	return r
-}
-
 func (job *Job) CallString() string {
 	return fmt.Sprintf("%s(%s)", job.Name, strings.Join(job.Args, ", "))
 }

 func (job *Job) StatusString() string {
-	// FIXME: if a job returns the empty string, it will be printed
-	// as not having returned.
-	// (this only affects String which is a convenience function).
-	if job.status != "" {
-		var okerr string
-		if job.status == "0" {
-			okerr = "OK"
-		} else {
-			okerr = "ERR"
-		}
-		return fmt.Sprintf(" = %s (%s)", okerr, job.status)
+	// If the job hasn't completed, status string is empty
+	if job.end.IsZero() {
+		return ""
 	}
-	return ""
+	var okerr string
+	if job.status == StatusOK {
+		okerr = "OK"
+	} else {
+		okerr = "ERR"
+	}
+	return fmt.Sprintf(" = %s (%d)", okerr, job.status)
 }

 // String returns a human-readable description of `job`
@@ -338,5 +284,8 @@ func (job *Job) Printf(format string, args ...interface{}) (n int, err error) {

 func (job *Job) Errorf(format string, args ...interface{}) (n int, err error) {
 	return fmt.Fprintf(job.Stderr, format, args...)
-
+}
+
+func (job *Job) Error(err error) (int, error) {
+	return fmt.Fprintf(job.Stderr, "%s", err)
 }
diff --git a/engine/streams.go b/engine/streams.go
new file mode 100644
index 0000000000..697407f1f4
--- /dev/null
+++ b/engine/streams.go
@@ -0,0 +1,166 @@
+package engine
+
+import (
+	"bufio"
+	"container/ring"
+	"fmt"
+	"io"
+	"sync"
+)
+
+type Output struct {
+	sync.Mutex
+	dests []io.Writer
+	tasks sync.WaitGroup
+}
+
+// NewOutput returns a new Output object with no destinations attached.
+// Writing to an empty Output will cause the written data to be discarded.
+func NewOutput() *Output {
+	return &Output{}
+}
+
+// Add attaches a new destination to the Output. Any data subsequently written
+// to the output will be written to the new destination in addition to all the others.
+// This method is thread-safe.
+// FIXME: Add cannot fail
+func (o *Output) Add(dst io.Writer) error {
+	o.Mutex.Lock()
+	defer o.Mutex.Unlock()
+	o.dests = append(o.dests, dst)
+	return nil
+}
+
+// AddPipe creates an in-memory pipe with io.Pipe(), adds its writing end as a destination,
+// and returns its reading end for consumption by the caller.
+// This is roughly equivalent to Cmd.StdoutPipe() in the standard os/exec package.
+// This method is thread-safe.
+func (o *Output) AddPipe() (io.Reader, error) {
+	r, w := io.Pipe()
+	o.Add(w)
+	return r, nil
+}
+
+// AddTail starts a new goroutine which will read all subsequent data written to the output,
+// line by line, and append the last `n` lines to `dst`.
+func (o *Output) AddTail(dst *[]string, n int) error {
+	src, err := o.AddPipe()
+	if err != nil {
+		return err
+	}
+	o.tasks.Add(1)
+	go func() {
+		defer o.tasks.Done()
+		Tail(src, n, dst)
+	}()
+	return nil
+}
+
+// AddString starts a new goroutine which will read all subsequent data written to the output,
+// line by line, and store the last line into `dst`.
+func (o *Output) AddString(dst *string) error {
+	src, err := o.AddPipe()
+	if err != nil {
+		return err
+	}
+	o.tasks.Add(1)
+	go func() {
+		defer o.tasks.Done()
+		lines := make([]string, 0, 1)
+		Tail(src, 1, &lines)
+		if len(lines) == 0 {
+			*dst = ""
+		} else {
+			*dst = lines[0]
+		}
+	}()
+	return nil
+}
+
+// Write writes the same data to all registered destinations.
+// This method is thread-safe.
+func (o *Output) Write(p []byte) (n int, err error) {
+	o.Mutex.Lock()
+	defer o.Mutex.Unlock()
+	var firstErr error
+	for _, dst := range o.dests {
+		_, err := dst.Write(p)
+		if err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	return len(p), firstErr
+}
+
+// Close unregisters all destinations and waits for all background
+// AddTail and AddString tasks to complete.
+// The Close method of each destination is called if it exists.
+func (o *Output) Close() error {
+	o.Mutex.Lock()
+	defer o.Mutex.Unlock()
+	var firstErr error
+	for _, dst := range o.dests {
+		if closer, ok := dst.(io.WriteCloser); ok {
+			err := closer.Close()
+			if err != nil && firstErr == nil {
+				firstErr = err
+			}
+		}
+	}
+	o.tasks.Wait()
+	return firstErr
+}
+
+type Input struct {
+	src io.Reader
+	sync.Mutex
+}
+
+// NewInput returns a new Input object with no source attached.
+// Reading from an empty Input will return io.EOF.
+func NewInput() *Input {
+	return &Input{}
+}
+
+// Read reads from the input in a thread-safe way.
+func (i *Input) Read(p []byte) (n int, err error) {
+	i.Mutex.Lock()
+	defer i.Mutex.Unlock()
+	if i.src == nil {
+		return 0, io.EOF
+	}
+	return i.src.Read(p)
+}
+
+// Add attaches a new source to the input.
+// Add can only be called once per input. Subsequent calls will
+// return an error.
+func (i *Input) Add(src io.Reader) error {
+	i.Mutex.Lock()
+	defer i.Mutex.Unlock()
+	if i.src != nil {
+		return fmt.Errorf("Maximum number of sources reached: 1")
+	}
+	i.src = src
+	return nil
+}
+
+// Tail reads from `src` line by line, and returns the last `n` lines as an array.
+// A ring buffer is used to only store `n` lines at any time.
+func Tail(src io.Reader, n int, dst *[]string) {
+	scanner := bufio.NewScanner(src)
+	r := ring.New(n)
+	for scanner.Scan() {
+		if n == 0 {
+			continue
+		}
+		r.Value = scanner.Text()
+		r = r.Next()
+	}
+	r.Do(func(v interface{}) {
+		if v == nil {
+			return
+		}
+		*dst = append(*dst, v.(string))
+	})
+}
diff --git a/integration/api_test.go b/integration/api_test.go
index 3d5a2e42e0..896831c3d0 100644
--- a/integration/api_test.go
+++ b/integration/api_test.go
@@ -304,6 +304,10 @@ func TestGetContainersJSON(t *testing.T) {
 		Cmd:   []string{"echo", "test"},
 	}, t)

+	if containerID == "" {
+		t.Fatalf("Received empty container ID")
+	}
+
 	req, err := http.NewRequest("GET", "/containers/json?all=1", nil)
 	if err != nil {
 		t.Fatal(err)
diff --git a/integration/container_test.go b/integration/container_test.go
index 05eb48728c..6cd72c8608 100644
--- a/integration/container_test.go
+++ b/integration/container_test.go
@@ -499,7 +499,7 @@ func TestCreateVolume(t *testing.T) {
 		t.Fatal(err)
 	}
 	var id string
-	jobCreate.StdoutParseString(&id)
+	jobCreate.Stdout.AddString(&id)
 	if err := jobCreate.Run(); err != nil {
 		t.Fatal(err)
 	}
@@ -1502,7 +1502,7 @@ func TestOnlyLoopbackExistsWhenUsingDisableNetworkOption(t *testing.T) {
 		t.Fatal(err)
 	}
 	var id string
-	jobCreate.StdoutParseString(&id)
+	jobCreate.Stdout.AddString(&id)
 	if err := jobCreate.Run(); err != nil {
 		t.Fatal(err)
 	}
diff --git a/integration/runtime_test.go b/integration/runtime_test.go
index 1ab6d0a080..7074a14ce9 100644
--- a/integration/runtime_test.go
+++ b/integration/runtime_test.go
@@ -390,7 +390,7 @@ func startEchoServerContainer(t *testing.T, proto string) (*docker.Runtime, *doc
 	jobCreate.SetenvList("Cmd", []string{"sh", "-c", cmd})
 	jobCreate.SetenvList("PortSpecs", []string{fmt.Sprintf("%s/%s", strPort, proto)})
 	jobCreate.SetenvJson("ExposedPorts", ep)
-	jobCreate.StdoutParseString(&id)
+	jobCreate.Stdout.AddString(&id)
 	if err := jobCreate.Run(); err != nil {
 		t.Fatal(err)
 	}
diff --git a/integration/server_test.go b/integration/server_test.go
index 494e23fef3..3e85effe8f 100644
--- a/integration/server_test.go
+++ b/integration/server_test.go
@@ -224,7 +224,7 @@ func TestRunWithTooLowMemoryLimit(t *testing.T) {
 	job.Setenv("CpuShares", "1000")
 	job.SetenvList("Cmd", []string{"/bin/cat"})
 	var id string
-	job.StdoutParseString(&id)
+	job.Stdout.AddString(&id)
 	if err := job.Run(); err == nil {
 		t.Errorf("Memory limit is smaller than the allowed limit. Container creation should've failed!")
 	}
diff --git a/integration/utils_test.go b/integration/utils_test.go
index 1f47c45382..2feaf25396 100644
--- a/integration/utils_test.go
+++ b/integration/utils_test.go
@@ -46,7 +46,7 @@ func createNamedTestContainer(eng *engine.Engine, config *docker.Config, f utils
 	if err := job.ImportEnv(config); err != nil {
 		f.Fatal(err)
 	}
-	job.StdoutParseString(&shortId)
+	job.Stdout.AddString(&shortId)
 	if err := job.Run(); err != nil {
 		f.Fatal(err)
 	}
diff --git a/server.go b/server.go
index 725b964a70..ae6ccd4f40 100644
--- a/server.go
+++ b/server.go
@@ -39,15 +39,18 @@ func init() {

 // jobInitApi runs the remote api server `srv` as a daemon,
 // Only one api server can run at the same time - this is enforced by a pidfile.
 // The signals SIGINT and SIGTERM are intercepted for cleanup.
-func jobInitApi(job *engine.Job) string {
+func jobInitApi(job *engine.Job) engine.Status {
 	job.Logf("Creating server")
+	// FIXME: ImportEnv deprecates ConfigFromJob
 	srv, err := NewServer(job.Eng, ConfigFromJob(job))
 	if err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	if srv.runtime.config.Pidfile != "" {
 		job.Logf("Creating pidfile")
 		if err := utils.CreatePidFile(srv.runtime.config.Pidfile); err != nil {
+			// FIXME: do we need fatal here instead of returning a job error?
 			log.Fatal(err)
 		}
 	}
@@ -68,18 +71,21 @@ func jobInitApi(job *engine.Job) string {
 		job.Eng.Hack_SetGlobalVar("httpapi.bridgeIP", srv.runtime.networkManager.bridgeNetwork.IP)
 	}
 	if err := job.Eng.Register("create", srv.ContainerCreate); err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	if err := job.Eng.Register("start", srv.ContainerStart); err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	if err := job.Eng.Register("serveapi", srv.ListenAndServe); err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
-	return "0"
+	return engine.StatusOK
 }

-func (srv *Server) ListenAndServe(job *engine.Job) string {
+func (srv *Server) ListenAndServe(job *engine.Job) engine.Status {
 	protoAddrs := job.Args
 	chErrors := make(chan error, len(protoAddrs))
 	for _, protoAddr := range protoAddrs {
@@ -94,7 +100,8 @@ func (srv *Server) ListenAndServe(job *engine.Job) string {
 				log.Println("/!\\ DON'T BIND ON ANOTHER IP ADDRESS THAN 127.0.0.1 IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\")
 			}
 		default:
-			return "Invalid protocol format."
+			job.Errorf("Invalid protocol format.")
+			return engine.StatusErr
 		}
 		go func() {
 			// FIXME: merge Server.ListenAndServe with ListenAndServe
@@ -104,10 +111,11 @@
 	for i := 0; i < len(protoAddrs); i += 1 {
 		err := <-chErrors
 		if err != nil {
-			return err.Error()
+			job.Error(err)
+			return engine.StatusErr
 		}
 	}
-	return "0"
+	return engine.StatusOK
 }

 func (srv *Server) DockerVersion() APIVersion {
@@ -1260,19 +1268,22 @@ func (srv *Server) ImageImport(src, repo, tag string, in io.Reader, out io.Write
 	return nil
 }

-func (srv *Server) ContainerCreate(job *engine.Job) string {
+func (srv *Server) ContainerCreate(job *engine.Job) engine.Status {
 	var name string
 	if len(job.Args) == 1 {
 		name = job.Args[0]
 	} else if len(job.Args) > 1 {
-		return fmt.Sprintf("Usage: %s ", job.Name)
+		job.Printf("Usage: %s", job.Name)
+		return engine.StatusErr
 	}
 	var config Config
 	if err := job.ExportEnv(&config); err != nil {
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	if config.Memory != 0 && config.Memory < 524288 {
-		return "Minimum memory limit allowed is 512k"
+		job.Errorf("Minimum memory limit allowed is 512k")
+		return engine.StatusErr
 	}
 	if config.Memory > 0 && !srv.runtime.capabilities.MemoryLimit {
 		config.Memory = 0
@@ -1287,9 +1298,11 @@ func (srv *Server) ContainerCreate(job *engine.Job) string {
 			if tag == "" {
 				tag = DEFAULTTAG
 			}
-			return fmt.Sprintf("No such image: %s (tag: %s)", config.Image, tag)
+			job.Errorf("No such image: %s (tag: %s)", config.Image, tag)
+			return engine.StatusErr
 		}
-		return err.Error()
+		job.Error(err)
+		return engine.StatusErr
 	}
 	srv.LogEvent("create", container.ID, srv.runtime.repositories.ImageName(container.Image))
 	// FIXME: this is necessary because runtime.Create might return a nil container
@@ -1301,7 +1314,7 @@ func (srv *Server) ContainerCreate(job *engine.Job) string {
 	for _, warning := range buildWarnings {
job.Errorf("%s\n", warning) } - return "0" + return engine.StatusOK } func (srv *Server) ContainerRestart(name string, t int) error { @@ -1619,22 +1632,25 @@ func (srv *Server) RegisterLinks(name string, hostConfig *HostConfig) error { return nil } -func (srv *Server) ContainerStart(job *engine.Job) string { +func (srv *Server) ContainerStart(job *engine.Job) engine.Status { if len(job.Args) < 1 { - return fmt.Sprintf("Usage: %s container_id", job.Name) + job.Errorf("Usage: %s container_id", job.Name) + return engine.StatusErr } name := job.Args[0] runtime := srv.runtime container := runtime.Get(name) if container == nil { - return fmt.Sprintf("No such container: %s", name) + job.Errorf("No such container: %s", name) + return engine.StatusErr } // If no environment was set, then no hostconfig was passed. if len(job.Environ()) > 0 { var hostConfig HostConfig if err := job.ExportEnv(&hostConfig); err != nil { - return err.Error() + job.Error(err) + return engine.StatusErr } // Validate the HostConfig binds. Make sure that: // 1) the source of a bind mount isn't / @@ -1647,29 +1663,33 @@ func (srv *Server) ContainerStart(job *engine.Job) string { // refuse to bind mount "/" to the container if source == "/" { - return fmt.Sprintf("Invalid bind mount '%s' : source can't be '/'", bind) + job.Errorf("Invalid bind mount '%s' : source can't be '/'", bind) + return engine.StatusErr } // ensure the source exists on the host _, err := os.Stat(source) if err != nil && os.IsNotExist(err) { - return fmt.Sprintf("Invalid bind mount '%s' : source doesn't exist", bind) + job.Errorf("Invalid bind mount '%s' : source doesn't exist", bind) + return engine.StatusErr } } // Register any links from the host config before starting the container // FIXME: we could just pass the container here, no need to lookup by name again. if err := srv.RegisterLinks(name, &hostConfig); err != nil { - return err.Error() + job.Error(err) + return engine.StatusErr } container.hostConfig = &hostConfig container.ToDisk() } if err := container.Start(); err != nil { - return fmt.Sprintf("Cannot start container %s: %s", name, err) + job.Errorf("Cannot start container %s: %s", name, err) + return engine.StatusErr } srv.LogEvent("start", container.ID, runtime.repositories.ImageName(container.Image)) - return "0" + return engine.StatusOK } func (srv *Server) ContainerStop(name string, t int) error {