mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #18831 from calavera/test_event_observer
Extract event processing to a common function for testing.
This commit is contained in:
commit
eacedcbe21
4 changed files with 172 additions and 124 deletions
|
@ -97,7 +97,7 @@ func (daemon *Daemon) ImportImage(src string, newRef reference.Named, msg string
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
outStream.Write(sf.FormatStatus("", id.String()))
|
|
||||||
daemon.EventsService.Log("import", id.String(), "")
|
daemon.EventsService.Log("import", id.String(), "")
|
||||||
|
outStream.Write(sf.FormatStatus("", id.String()))
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
@ -1882,45 +1881,14 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
|
||||||
}
|
}
|
||||||
defer ctx.Close()
|
defer ctx.Close()
|
||||||
|
|
||||||
finish := make(chan struct{})
|
|
||||||
defer close(finish)
|
|
||||||
|
|
||||||
eventStart := make(chan struct{})
|
eventStart := make(chan struct{})
|
||||||
eventDie := make(chan struct{})
|
eventDie := make(chan struct{})
|
||||||
containerID := make(chan string)
|
|
||||||
|
|
||||||
startEpoch := daemonTime(c).Unix()
|
observer, err := newEventObserver(c)
|
||||||
// Watch for events since epoch.
|
c.Assert(err, checker.IsNil)
|
||||||
eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(startEpoch, 10))
|
err = observer.Start()
|
||||||
stdout, err := eventsCmd.StdoutPipe()
|
c.Assert(err, checker.IsNil)
|
||||||
if err != nil {
|
defer observer.Stop()
|
||||||
c.Fatal(err)
|
|
||||||
}
|
|
||||||
if err := eventsCmd.Start(); err != nil {
|
|
||||||
c.Fatal(err)
|
|
||||||
}
|
|
||||||
defer eventsCmd.Process.Kill()
|
|
||||||
|
|
||||||
// Goroutine responsible for watching start/die events from `docker events`
|
|
||||||
go func() {
|
|
||||||
cid := <-containerID
|
|
||||||
|
|
||||||
matchStart := regexp.MustCompile(cid + `(.*) start$`)
|
|
||||||
matchDie := regexp.MustCompile(cid + `(.*) die$`)
|
|
||||||
|
|
||||||
//
|
|
||||||
// Read lines of `docker events` looking for container start and stop.
|
|
||||||
//
|
|
||||||
scanner := bufio.NewScanner(stdout)
|
|
||||||
for scanner.Scan() {
|
|
||||||
switch {
|
|
||||||
case matchStart.MatchString(scanner.Text()):
|
|
||||||
close(eventStart)
|
|
||||||
case matchDie.MatchString(scanner.Text()):
|
|
||||||
close(eventDie)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
buildCmd := exec.Command(dockerBinary, "build", "-t", name, ".")
|
buildCmd := exec.Command(dockerBinary, "build", "-t", name, ".")
|
||||||
buildCmd.Dir = ctx.Dir
|
buildCmd.Dir = ctx.Dir
|
||||||
|
@ -1932,17 +1900,39 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
|
||||||
|
|
||||||
matchCID := regexp.MustCompile("Running in (.+)")
|
matchCID := regexp.MustCompile("Running in (.+)")
|
||||||
scanner := bufio.NewScanner(stdoutBuild)
|
scanner := bufio.NewScanner(stdoutBuild)
|
||||||
|
|
||||||
|
outputBuffer := new(bytes.Buffer)
|
||||||
|
var buildID string
|
||||||
for scanner.Scan() {
|
for scanner.Scan() {
|
||||||
line := scanner.Text()
|
line := scanner.Text()
|
||||||
|
outputBuffer.WriteString(line)
|
||||||
|
outputBuffer.WriteString("\n")
|
||||||
if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 {
|
if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 {
|
||||||
containerID <- matches[1]
|
buildID = matches[1]
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if buildID == "" {
|
||||||
|
c.Fatalf("Unable to find build container id in build output:\n%s", outputBuffer.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
matchStart := regexp.MustCompile(buildID + `.* start\z`)
|
||||||
|
matchDie := regexp.MustCompile(buildID + `.* die\z`)
|
||||||
|
|
||||||
|
matcher := func(text string) {
|
||||||
|
switch {
|
||||||
|
case matchStart.MatchString(text):
|
||||||
|
close(eventStart)
|
||||||
|
case matchDie.MatchString(text):
|
||||||
|
close(eventDie)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go observer.Match(matcher)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(10 * time.Second):
|
||||||
c.Fatal("failed to observe build container start in timely fashion")
|
c.Fatal(observer.TimeoutError(buildID, "start"))
|
||||||
case <-eventStart:
|
case <-eventStart:
|
||||||
// Proceeds from here when we see the container fly past in the
|
// Proceeds from here when we see the container fly past in the
|
||||||
// output of "docker events".
|
// output of "docker events".
|
||||||
|
@ -1961,9 +1951,9 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(10 * time.Second):
|
||||||
// If we don't get here in a timely fashion, it wasn't killed.
|
// If we don't get here in a timely fashion, it wasn't killed.
|
||||||
c.Fatal("container cancel did not succeed")
|
c.Fatal(observer.TimeoutError(buildID, "die"))
|
||||||
case <-eventDie:
|
case <-eventDie:
|
||||||
// We saw the container shut down in the `docker events` stream,
|
// We saw the container shut down in the `docker events` stream,
|
||||||
// as expected.
|
// as expected.
|
||||||
|
@ -6498,22 +6488,12 @@ func (s *DockerSuite) TestBuildNoNamedVolume(c *check.C) {
|
||||||
|
|
||||||
func (s *DockerSuite) TestBuildTagEvent(c *check.C) {
|
func (s *DockerSuite) TestBuildTagEvent(c *check.C) {
|
||||||
testRequires(c, DaemonIsLinux)
|
testRequires(c, DaemonIsLinux)
|
||||||
resp, rc, err := sockRequestRaw("GET", `/events?filters={"event":["tag"]}`, nil, "application/json")
|
|
||||||
c.Assert(err, check.IsNil)
|
|
||||||
defer rc.Close()
|
|
||||||
c.Assert(resp.StatusCode, check.Equals, http.StatusOK)
|
|
||||||
|
|
||||||
type event struct {
|
observer, err := newEventObserver(c, "--filter", "event=tag")
|
||||||
Status string `json:"status"`
|
c.Assert(err, check.IsNil)
|
||||||
ID string `json:"id"`
|
err = observer.Start()
|
||||||
}
|
c.Assert(err, check.IsNil)
|
||||||
ch := make(chan event)
|
defer observer.Stop()
|
||||||
go func() {
|
|
||||||
ev := event{}
|
|
||||||
if err := json.NewDecoder(rc).Decode(&ev); err == nil {
|
|
||||||
ch <- ev
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
dockerFile := `FROM busybox
|
dockerFile := `FROM busybox
|
||||||
RUN echo events
|
RUN echo events
|
||||||
|
@ -6521,12 +6501,20 @@ func (s *DockerSuite) TestBuildTagEvent(c *check.C) {
|
||||||
_, err = buildImage("test", dockerFile, false)
|
_, err = buildImage("test", dockerFile, false)
|
||||||
c.Assert(err, check.IsNil)
|
c.Assert(err, check.IsNil)
|
||||||
|
|
||||||
|
matchTag := regexp.MustCompile("test:latest")
|
||||||
|
eventTag := make(chan bool)
|
||||||
|
matcher := func(text string) {
|
||||||
|
if matchTag.MatchString(text) {
|
||||||
|
close(eventTag)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go observer.Match(matcher)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case ev := <-ch:
|
case <-time.After(10 * time.Second):
|
||||||
c.Assert(ev.Status, check.Equals, "tag")
|
c.Fatal(observer.TimeoutError("test:latest", "tag"))
|
||||||
c.Assert(ev.ID, check.Equals, "test:latest")
|
case <-eventTag:
|
||||||
case <-time.After(5 * time.Second):
|
// We saw the tag event as expected.
|
||||||
c.Fatal("The 'tag' event not heard from the server")
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,7 +2,6 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bufio"
|
"bufio"
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
@ -216,27 +215,13 @@ func (s *DockerSuite) TestEventsImagePull(c *check.C) {
|
||||||
|
|
||||||
func (s *DockerSuite) TestEventsImageImport(c *check.C) {
|
func (s *DockerSuite) TestEventsImageImport(c *check.C) {
|
||||||
testRequires(c, DaemonIsLinux)
|
testRequires(c, DaemonIsLinux)
|
||||||
since := daemonTime(c).Unix()
|
|
||||||
|
|
||||||
id := make(chan string)
|
observer, err := newEventObserver(c)
|
||||||
eventImport := make(chan struct{})
|
|
||||||
eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(since, 10))
|
|
||||||
stdout, err := eventsCmd.StdoutPipe()
|
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
c.Assert(eventsCmd.Start(), checker.IsNil)
|
|
||||||
defer eventsCmd.Process.Kill()
|
|
||||||
|
|
||||||
go func() {
|
err = observer.Start()
|
||||||
containerID := <-id
|
c.Assert(err, checker.IsNil)
|
||||||
|
defer observer.Stop()
|
||||||
matchImport := regexp.MustCompile(containerID + `: import$`)
|
|
||||||
scanner := bufio.NewScanner(stdout)
|
|
||||||
for scanner.Scan() {
|
|
||||||
if matchImport.MatchString(scanner.Text()) {
|
|
||||||
close(eventImport)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
|
|
||||||
out, _ := dockerCmd(c, "run", "-d", "busybox", "true")
|
out, _ := dockerCmd(c, "run", "-d", "busybox", "true")
|
||||||
cleanedContainerID := strings.TrimSpace(out)
|
cleanedContainerID := strings.TrimSpace(out)
|
||||||
|
@ -246,12 +231,20 @@ func (s *DockerSuite) TestEventsImageImport(c *check.C) {
|
||||||
exec.Command(dockerBinary, "import", "-"),
|
exec.Command(dockerBinary, "import", "-"),
|
||||||
)
|
)
|
||||||
c.Assert(err, checker.IsNil, check.Commentf("import failed with output: %q", out))
|
c.Assert(err, checker.IsNil, check.Commentf("import failed with output: %q", out))
|
||||||
newContainerID := strings.TrimSpace(out)
|
imageRef := strings.TrimSpace(out)
|
||||||
id <- newContainerID
|
|
||||||
|
eventImport := make(chan bool)
|
||||||
|
matchImport := regexp.MustCompile(imageRef + `: import\z`)
|
||||||
|
matcher := func(text string) {
|
||||||
|
if matchImport.MatchString(text) {
|
||||||
|
close(eventImport)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
go observer.Match(matcher)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
c.Fatal("failed to observe image import in timely fashion")
|
c.Fatal(observer.TimeoutError(imageRef, "import"))
|
||||||
case <-eventImport:
|
case <-eventImport:
|
||||||
// ignore, done
|
// ignore, done
|
||||||
}
|
}
|
||||||
|
@ -421,33 +414,26 @@ func (s *DockerSuite) TestEventsFilterContainer(c *check.C) {
|
||||||
|
|
||||||
func (s *DockerSuite) TestEventsStreaming(c *check.C) {
|
func (s *DockerSuite) TestEventsStreaming(c *check.C) {
|
||||||
testRequires(c, DaemonIsLinux)
|
testRequires(c, DaemonIsLinux)
|
||||||
start := daemonTime(c).Unix()
|
|
||||||
|
|
||||||
id := make(chan string)
|
|
||||||
eventCreate := make(chan struct{})
|
eventCreate := make(chan struct{})
|
||||||
eventStart := make(chan struct{})
|
eventStart := make(chan struct{})
|
||||||
eventDie := make(chan struct{})
|
eventDie := make(chan struct{})
|
||||||
eventDestroy := make(chan struct{})
|
eventDestroy := make(chan struct{})
|
||||||
|
|
||||||
eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(start, 10))
|
observer, err := newEventObserver(c)
|
||||||
stdout, err := eventsCmd.StdoutPipe()
|
|
||||||
c.Assert(err, checker.IsNil)
|
c.Assert(err, checker.IsNil)
|
||||||
c.Assert(eventsCmd.Start(), checker.IsNil, check.Commentf("failed to start 'docker events'"))
|
err = observer.Start()
|
||||||
defer eventsCmd.Process.Kill()
|
c.Assert(err, checker.IsNil)
|
||||||
|
defer observer.Stop()
|
||||||
buffer := new(bytes.Buffer)
|
|
||||||
go func() {
|
|
||||||
containerID := <-id
|
|
||||||
|
|
||||||
|
out, _ := dockerCmd(c, "run", "-d", "busybox:latest", "true")
|
||||||
|
containerID := strings.TrimSpace(out)
|
||||||
matchCreate := regexp.MustCompile(containerID + `: \(from busybox:latest\) create\z`)
|
matchCreate := regexp.MustCompile(containerID + `: \(from busybox:latest\) create\z`)
|
||||||
matchStart := regexp.MustCompile(containerID + `: \(from busybox:latest\) start\z`)
|
matchStart := regexp.MustCompile(containerID + `: \(from busybox:latest\) start\z`)
|
||||||
matchDie := regexp.MustCompile(containerID + `: \(from busybox:latest\) die\z`)
|
matchDie := regexp.MustCompile(containerID + `: \(from busybox:latest\) die\z`)
|
||||||
matchDestroy := regexp.MustCompile(containerID + `: \(from busybox:latest\) destroy\z`)
|
matchDestroy := regexp.MustCompile(containerID + `: \(from busybox:latest\) destroy\z`)
|
||||||
|
|
||||||
scanner := bufio.NewScanner(stdout)
|
matcher := func(text string) {
|
||||||
for scanner.Scan() {
|
|
||||||
text := scanner.Text()
|
|
||||||
buffer.WriteString(text + "\n")
|
|
||||||
switch {
|
switch {
|
||||||
case matchCreate.MatchString(text):
|
case matchCreate.MatchString(text):
|
||||||
close(eventCreate)
|
close(eventCreate)
|
||||||
|
@ -459,38 +445,34 @@ func (s *DockerSuite) TestEventsStreaming(c *check.C) {
|
||||||
close(eventDestroy)
|
close(eventDestroy)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
go observer.Match(matcher)
|
||||||
|
|
||||||
out, _ := dockerCmd(c, "run", "-d", "busybox:latest", "true")
|
|
||||||
cleanedContainerID := strings.TrimSpace(out)
|
|
||||||
id <- cleanedContainerID
|
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
c.Fatal("failed to observe container create in timely fashion", "\n", buffer.String())
|
c.Fatal(observer.TimeoutError(containerID, "create"))
|
||||||
case <-eventCreate:
|
case <-eventCreate:
|
||||||
// ignore, done
|
// ignore, done
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
c.Fatal("failed to observe container start in timely fashion", "\n", buffer.String())
|
c.Fatal(observer.TimeoutError(containerID, "start"))
|
||||||
case <-eventStart:
|
case <-eventStart:
|
||||||
// ignore, done
|
// ignore, done
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
c.Fatal("failed to observe container die in timely fashion", "\n", buffer.String())
|
c.Fatal(observer.TimeoutError(containerID, "die"))
|
||||||
case <-eventDie:
|
case <-eventDie:
|
||||||
// ignore, done
|
// ignore, done
|
||||||
}
|
}
|
||||||
|
|
||||||
dockerCmd(c, "rm", cleanedContainerID)
|
dockerCmd(c, "rm", containerID)
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-time.After(5 * time.Second):
|
case <-time.After(5 * time.Second):
|
||||||
c.Fatal("failed to observe container destroy in timely fashion", "\n", buffer.String())
|
c.Fatal(observer.TimeoutError(containerID, "destroy"))
|
||||||
case <-eventDestroy:
|
case <-eventDestroy:
|
||||||
// ignore, done
|
// ignore, done
|
||||||
}
|
}
|
||||||
|
|
78
integration-cli/events_utils.go
Normal file
78
integration-cli/events_utils.go
Normal file
|
@ -0,0 +1,78 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bufio"
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"os/exec"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
|
"github.com/go-check/check"
|
||||||
|
)
|
||||||
|
|
||||||
|
// eventMatcher is a function that tries to match an event input.
|
||||||
|
type eventMatcher func(text string)
|
||||||
|
|
||||||
|
// eventObserver runs an events commands and observes its output.
|
||||||
|
type eventObserver struct {
|
||||||
|
buffer *bytes.Buffer
|
||||||
|
command *exec.Cmd
|
||||||
|
stdout io.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
// newEventObserver creates the observer and initializes the command
|
||||||
|
// without running it. Users must call `eventObserver.Start` to start the command.
|
||||||
|
func newEventObserver(c *check.C, args ...string) (*eventObserver, error) {
|
||||||
|
since := daemonTime(c).Unix()
|
||||||
|
|
||||||
|
cmdArgs := []string{"events", "--since", strconv.FormatInt(since, 10)}
|
||||||
|
if len(args) > 0 {
|
||||||
|
cmdArgs = append(cmdArgs, args...)
|
||||||
|
}
|
||||||
|
eventsCmd := exec.Command(dockerBinary, cmdArgs...)
|
||||||
|
stdout, err := eventsCmd.StdoutPipe()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return &eventObserver{
|
||||||
|
buffer: new(bytes.Buffer),
|
||||||
|
command: eventsCmd,
|
||||||
|
stdout: stdout,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start starts the events command.
|
||||||
|
func (e *eventObserver) Start() error {
|
||||||
|
return e.command.Start()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop stops the events command.
|
||||||
|
func (e *eventObserver) Stop() {
|
||||||
|
e.command.Process.Kill()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Match tries to match the events output with a given matcher.
|
||||||
|
func (e *eventObserver) Match(match eventMatcher) {
|
||||||
|
scanner := bufio.NewScanner(e.stdout)
|
||||||
|
|
||||||
|
for scanner.Scan() {
|
||||||
|
text := scanner.Text()
|
||||||
|
e.buffer.WriteString(text)
|
||||||
|
e.buffer.WriteString("\n")
|
||||||
|
|
||||||
|
match(text)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TimeoutError generates an error for a given containerID and event type.
|
||||||
|
// It attaches the events command output to the error.
|
||||||
|
func (e *eventObserver) TimeoutError(id, event string) error {
|
||||||
|
return fmt.Errorf("failed to observe event `%s` for %s\n%v", event, id, e.output())
|
||||||
|
}
|
||||||
|
|
||||||
|
// output returns the events command output read until now by the Match goroutine.
|
||||||
|
func (e *eventObserver) output() string {
|
||||||
|
return e.buffer.String()
|
||||||
|
}
|
Loading…
Reference in a new issue