diff --git a/daemon/import.go b/daemon/import.go index 26f8ce66fe..e74f2cf962 100644 --- a/daemon/import.go +++ b/daemon/import.go @@ -97,7 +97,7 @@ func (daemon *Daemon) ImportImage(src string, newRef reference.Named, msg string } } - outStream.Write(sf.FormatStatus("", id.String())) daemon.EventsService.Log("import", id.String(), "") + outStream.Write(sf.FormatStatus("", id.String())) return nil } diff --git a/integration-cli/docker_cli_build_test.go b/integration-cli/docker_cli_build_test.go index e9b557382f..b8972844d2 100644 --- a/integration-cli/docker_cli_build_test.go +++ b/integration-cli/docker_cli_build_test.go @@ -7,7 +7,6 @@ import ( "encoding/json" "fmt" "io/ioutil" - "net/http" "os" "os/exec" "path/filepath" @@ -1882,45 +1881,14 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { } defer ctx.Close() - finish := make(chan struct{}) - defer close(finish) - eventStart := make(chan struct{}) eventDie := make(chan struct{}) - containerID := make(chan string) - startEpoch := daemonTime(c).Unix() - // Watch for events since epoch. - eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(startEpoch, 10)) - stdout, err := eventsCmd.StdoutPipe() - if err != nil { - c.Fatal(err) - } - if err := eventsCmd.Start(); err != nil { - c.Fatal(err) - } - defer eventsCmd.Process.Kill() - - // Goroutine responsible for watching start/die events from `docker events` - go func() { - cid := <-containerID - - matchStart := regexp.MustCompile(cid + `(.*) start$`) - matchDie := regexp.MustCompile(cid + `(.*) die$`) - - // - // Read lines of `docker events` looking for container start and stop. - // - scanner := bufio.NewScanner(stdout) - for scanner.Scan() { - switch { - case matchStart.MatchString(scanner.Text()): - close(eventStart) - case matchDie.MatchString(scanner.Text()): - close(eventDie) - } - } - }() + observer, err := newEventObserver(c) + c.Assert(err, checker.IsNil) + err = observer.Start() + c.Assert(err, checker.IsNil) + defer observer.Stop() buildCmd := exec.Command(dockerBinary, "build", "-t", name, ".") buildCmd.Dir = ctx.Dir @@ -1932,17 +1900,39 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { matchCID := regexp.MustCompile("Running in (.+)") scanner := bufio.NewScanner(stdoutBuild) + + outputBuffer := new(bytes.Buffer) + var buildID string for scanner.Scan() { line := scanner.Text() + outputBuffer.WriteString(line) + outputBuffer.WriteString("\n") if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 { - containerID <- matches[1] + buildID = matches[1] break } } + if buildID == "" { + c.Fatalf("Unable to find build container id in build output:\n%s", outputBuffer.String()) + } + + matchStart := regexp.MustCompile(buildID + `.* start\z`) + matchDie := regexp.MustCompile(buildID + `.* die\z`) + + matcher := func(text string) { + switch { + case matchStart.MatchString(text): + close(eventStart) + case matchDie.MatchString(text): + close(eventDie) + } + } + go observer.Match(matcher) + select { - case <-time.After(5 * time.Second): - c.Fatal("failed to observe build container start in timely fashion") + case <-time.After(10 * time.Second): + c.Fatal(observer.TimeoutError(buildID, "start")) case <-eventStart: // Proceeds from here when we see the container fly past in the // output of "docker events". @@ -1961,9 +1951,9 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { } select { - case <-time.After(5 * time.Second): + case <-time.After(10 * time.Second): // If we don't get here in a timely fashion, it wasn't killed. - c.Fatal("container cancel did not succeed") + c.Fatal(observer.TimeoutError(buildID, "die")) case <-eventDie: // We saw the container shut down in the `docker events` stream, // as expected. @@ -6498,22 +6488,12 @@ func (s *DockerSuite) TestBuildNoNamedVolume(c *check.C) { func (s *DockerSuite) TestBuildTagEvent(c *check.C) { testRequires(c, DaemonIsLinux) - resp, rc, err := sockRequestRaw("GET", `/events?filters={"event":["tag"]}`, nil, "application/json") - c.Assert(err, check.IsNil) - defer rc.Close() - c.Assert(resp.StatusCode, check.Equals, http.StatusOK) - type event struct { - Status string `json:"status"` - ID string `json:"id"` - } - ch := make(chan event) - go func() { - ev := event{} - if err := json.NewDecoder(rc).Decode(&ev); err == nil { - ch <- ev - } - }() + observer, err := newEventObserver(c, "--filter", "event=tag") + c.Assert(err, check.IsNil) + err = observer.Start() + c.Assert(err, check.IsNil) + defer observer.Stop() dockerFile := `FROM busybox RUN echo events @@ -6521,12 +6501,20 @@ func (s *DockerSuite) TestBuildTagEvent(c *check.C) { _, err = buildImage("test", dockerFile, false) c.Assert(err, check.IsNil) + matchTag := regexp.MustCompile("test:latest") + eventTag := make(chan bool) + matcher := func(text string) { + if matchTag.MatchString(text) { + close(eventTag) + } + } + go observer.Match(matcher) + select { - case ev := <-ch: - c.Assert(ev.Status, check.Equals, "tag") - c.Assert(ev.ID, check.Equals, "test:latest") - case <-time.After(5 * time.Second): - c.Fatal("The 'tag' event not heard from the server") + case <-time.After(10 * time.Second): + c.Fatal(observer.TimeoutError("test:latest", "tag")) + case <-eventTag: + // We saw the tag event as expected. } } diff --git a/integration-cli/docker_cli_events_test.go b/integration-cli/docker_cli_events_test.go index 23028a5b2d..c9647ec182 100644 --- a/integration-cli/docker_cli_events_test.go +++ b/integration-cli/docker_cli_events_test.go @@ -2,7 +2,6 @@ package main import ( "bufio" - "bytes" "fmt" "io/ioutil" "net/http" @@ -216,27 +215,13 @@ func (s *DockerSuite) TestEventsImagePull(c *check.C) { func (s *DockerSuite) TestEventsImageImport(c *check.C) { testRequires(c, DaemonIsLinux) - since := daemonTime(c).Unix() - id := make(chan string) - eventImport := make(chan struct{}) - eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(since, 10)) - stdout, err := eventsCmd.StdoutPipe() + observer, err := newEventObserver(c) c.Assert(err, checker.IsNil) - c.Assert(eventsCmd.Start(), checker.IsNil) - defer eventsCmd.Process.Kill() - go func() { - containerID := <-id - - matchImport := regexp.MustCompile(containerID + `: import$`) - scanner := bufio.NewScanner(stdout) - for scanner.Scan() { - if matchImport.MatchString(scanner.Text()) { - close(eventImport) - } - } - }() + err = observer.Start() + c.Assert(err, checker.IsNil) + defer observer.Stop() out, _ := dockerCmd(c, "run", "-d", "busybox", "true") cleanedContainerID := strings.TrimSpace(out) @@ -246,12 +231,20 @@ func (s *DockerSuite) TestEventsImageImport(c *check.C) { exec.Command(dockerBinary, "import", "-"), ) c.Assert(err, checker.IsNil, check.Commentf("import failed with output: %q", out)) - newContainerID := strings.TrimSpace(out) - id <- newContainerID + imageRef := strings.TrimSpace(out) + + eventImport := make(chan bool) + matchImport := regexp.MustCompile(imageRef + `: import\z`) + matcher := func(text string) { + if matchImport.MatchString(text) { + close(eventImport) + } + } + go observer.Match(matcher) select { case <-time.After(5 * time.Second): - c.Fatal("failed to observe image import in timely fashion") + c.Fatal(observer.TimeoutError(imageRef, "import")) case <-eventImport: // ignore, done } @@ -421,76 +414,65 @@ func (s *DockerSuite) TestEventsFilterContainer(c *check.C) { func (s *DockerSuite) TestEventsStreaming(c *check.C) { testRequires(c, DaemonIsLinux) - start := daemonTime(c).Unix() - id := make(chan string) eventCreate := make(chan struct{}) eventStart := make(chan struct{}) eventDie := make(chan struct{}) eventDestroy := make(chan struct{}) - eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(start, 10)) - stdout, err := eventsCmd.StdoutPipe() + observer, err := newEventObserver(c) c.Assert(err, checker.IsNil) - c.Assert(eventsCmd.Start(), checker.IsNil, check.Commentf("failed to start 'docker events'")) - defer eventsCmd.Process.Kill() - - buffer := new(bytes.Buffer) - go func() { - containerID := <-id - - matchCreate := regexp.MustCompile(containerID + `: \(from busybox:latest\) create\z`) - matchStart := regexp.MustCompile(containerID + `: \(from busybox:latest\) start\z`) - matchDie := regexp.MustCompile(containerID + `: \(from busybox:latest\) die\z`) - matchDestroy := regexp.MustCompile(containerID + `: \(from busybox:latest\) destroy\z`) - - scanner := bufio.NewScanner(stdout) - for scanner.Scan() { - text := scanner.Text() - buffer.WriteString(text + "\n") - switch { - case matchCreate.MatchString(text): - close(eventCreate) - case matchStart.MatchString(text): - close(eventStart) - case matchDie.MatchString(text): - close(eventDie) - case matchDestroy.MatchString(text): - close(eventDestroy) - } - } - }() + err = observer.Start() + c.Assert(err, checker.IsNil) + defer observer.Stop() out, _ := dockerCmd(c, "run", "-d", "busybox:latest", "true") - cleanedContainerID := strings.TrimSpace(out) - id <- cleanedContainerID + containerID := strings.TrimSpace(out) + matchCreate := regexp.MustCompile(containerID + `: \(from busybox:latest\) create\z`) + matchStart := regexp.MustCompile(containerID + `: \(from busybox:latest\) start\z`) + matchDie := regexp.MustCompile(containerID + `: \(from busybox:latest\) die\z`) + matchDestroy := regexp.MustCompile(containerID + `: \(from busybox:latest\) destroy\z`) + + matcher := func(text string) { + switch { + case matchCreate.MatchString(text): + close(eventCreate) + case matchStart.MatchString(text): + close(eventStart) + case matchDie.MatchString(text): + close(eventDie) + case matchDestroy.MatchString(text): + close(eventDestroy) + } + } + go observer.Match(matcher) select { case <-time.After(5 * time.Second): - c.Fatal("failed to observe container create in timely fashion", "\n", buffer.String()) + c.Fatal(observer.TimeoutError(containerID, "create")) case <-eventCreate: // ignore, done } select { case <-time.After(5 * time.Second): - c.Fatal("failed to observe container start in timely fashion", "\n", buffer.String()) + c.Fatal(observer.TimeoutError(containerID, "start")) case <-eventStart: // ignore, done } select { case <-time.After(5 * time.Second): - c.Fatal("failed to observe container die in timely fashion", "\n", buffer.String()) + c.Fatal(observer.TimeoutError(containerID, "die")) case <-eventDie: // ignore, done } - dockerCmd(c, "rm", cleanedContainerID) + dockerCmd(c, "rm", containerID) select { case <-time.After(5 * time.Second): - c.Fatal("failed to observe container destroy in timely fashion", "\n", buffer.String()) + c.Fatal(observer.TimeoutError(containerID, "destroy")) case <-eventDestroy: // ignore, done } diff --git a/integration-cli/events_utils.go b/integration-cli/events_utils.go new file mode 100644 index 0000000000..faff4fc6f8 --- /dev/null +++ b/integration-cli/events_utils.go @@ -0,0 +1,78 @@ +package main + +import ( + "bufio" + "bytes" + "fmt" + "io" + "os/exec" + "strconv" + + "github.com/go-check/check" +) + +// eventMatcher is a function that tries to match an event input. +type eventMatcher func(text string) + +// eventObserver runs an events commands and observes its output. +type eventObserver struct { + buffer *bytes.Buffer + command *exec.Cmd + stdout io.Reader +} + +// newEventObserver creates the observer and initializes the command +// without running it. Users must call `eventObserver.Start` to start the command. +func newEventObserver(c *check.C, args ...string) (*eventObserver, error) { + since := daemonTime(c).Unix() + + cmdArgs := []string{"events", "--since", strconv.FormatInt(since, 10)} + if len(args) > 0 { + cmdArgs = append(cmdArgs, args...) + } + eventsCmd := exec.Command(dockerBinary, cmdArgs...) + stdout, err := eventsCmd.StdoutPipe() + if err != nil { + return nil, err + } + + return &eventObserver{ + buffer: new(bytes.Buffer), + command: eventsCmd, + stdout: stdout, + }, nil +} + +// Start starts the events command. +func (e *eventObserver) Start() error { + return e.command.Start() +} + +// Stop stops the events command. +func (e *eventObserver) Stop() { + e.command.Process.Kill() +} + +// Match tries to match the events output with a given matcher. +func (e *eventObserver) Match(match eventMatcher) { + scanner := bufio.NewScanner(e.stdout) + + for scanner.Scan() { + text := scanner.Text() + e.buffer.WriteString(text) + e.buffer.WriteString("\n") + + match(text) + } +} + +// TimeoutError generates an error for a given containerID and event type. +// It attaches the events command output to the error. +func (e *eventObserver) TimeoutError(id, event string) error { + return fmt.Errorf("failed to observe event `%s` for %s\n%v", event, id, e.output()) +} + +// output returns the events command output read until now by the Match goroutine. +func (e *eventObserver) output() string { + return e.buffer.String() +}