diff --git a/daemon/events/events.go b/daemon/events/events.go index a8cd66fd99..ac1c98cd46 100644 --- a/daemon/events/events.go +++ b/daemon/events/events.go @@ -50,33 +50,23 @@ func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) { // of interface{}, so you need type assertion). func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]eventtypes.Message, chan interface{}) { e.mu.Lock() - defer e.mu.Unlock() - var buffered []eventtypes.Message - topic := func(m interface{}) bool { - return ef.Include(m.(eventtypes.Message)) + var topic func(m interface{}) bool + if ef != nil && ef.filter.Len() > 0 { + topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) } } - if since != -1 { - for i := len(e.events) - 1; i >= 0; i-- { - ev := e.events[i] - if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) { - break - } - if ef.filter.Len() == 0 || topic(ev) { - buffered = append([]eventtypes.Message{ev}, buffered...) - } - } - } + buffered := e.loadBufferedEvents(since, sinceNano, topic) var ch chan interface{} - if ef.filter.Len() > 0 { + if topic != nil { ch = e.pub.SubscribeTopic(topic) } else { // Subscribe to all events if there are no filters ch = e.pub.Subscribe() } + e.mu.Unlock() return buffered, ch } @@ -124,3 +114,29 @@ func (e *Events) Log(action, eventType string, actor eventtypes.Actor) { func (e *Events) SubscribersCount() int { return e.pub.Len() } + +// loadBufferedEvents iterates over the cached events in the buffer +// and returns those that were emitted before a specific date. +// The date is splitted in two values: +// - the `since` argument is a date timestamp without nanoseconds, or -1 to return an empty slice. +// - the `sinceNano` argument is the nanoseconds offset from the timestamp. +// It uses `time.Unix(seconds, nanoseconds)` to generate a valid date with those two first arguments. +// It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages. +func (e *Events) loadBufferedEvents(since, sinceNano int64, topic func(interface{}) bool) []eventtypes.Message { + var buffered []eventtypes.Message + if since == -1 { + return buffered + } + + sinceNanoUnix := time.Unix(since, sinceNano).UnixNano() + for i := len(e.events) - 1; i >= 0; i-- { + ev := e.events[i] + if ev.TimeNano < sinceNanoUnix { + break + } + if topic == nil || topic(ev) { + buffered = append([]eventtypes.Message{ev}, buffered...) + } + } + return buffered +} diff --git a/daemon/events/events_test.go b/daemon/events/events_test.go index fc3b84bb85..5fd577b992 100644 --- a/daemon/events/events_test.go +++ b/daemon/events/events_test.go @@ -5,7 +5,9 @@ import ( "testing" "time" + "github.com/docker/docker/daemon/events/testutils" "github.com/docker/engine-api/types/events" + timetypes "github.com/docker/engine-api/types/time" ) func TestEventsLog(t *testing.T) { @@ -150,3 +152,45 @@ func TestLogEvents(t *testing.T) { t.Fatalf("Last action is %s, must be action_89", lastC.Status) } } + +// https://github.com/docker/docker/issues/20999 +// Fixtures: +// +//2016-03-07T17:28:03.022433271+02:00 container die 0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079 (image=ubuntu, name=small_hoover) +//2016-03-07T17:28:03.091719377+02:00 network disconnect 19c5ed41acb798f26b751e0035cd7821741ab79e2bbd59a66b5fd8abf954eaa0 (type=bridge, container=0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079, name=bridge) +//2016-03-07T17:28:03.129014751+02:00 container destroy 0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079 (image=ubuntu, name=small_hoover) +func TestLoadBufferedEvents(t *testing.T) { + now := time.Now() + f, err := timetypes.GetTimestamp("2016-03-07T17:28:03.100000000+02:00", now) + if err != nil { + t.Fatal(err) + } + since, sinceNano, err := timetypes.ParseTimestamps(f, -1) + if err != nil { + t.Fatal(err) + } + + m1, err := eventstestutils.Scan("2016-03-07T17:28:03.022433271+02:00 container die 0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079 (image=ubuntu, name=small_hoover)") + if err != nil { + t.Fatal(err) + } + m2, err := eventstestutils.Scan("2016-03-07T17:28:03.091719377+02:00 network disconnect 19c5ed41acb798f26b751e0035cd7821741ab79e2bbd59a66b5fd8abf954eaa0 (type=bridge, container=0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079, name=bridge)") + if err != nil { + t.Fatal(err) + } + m3, err := eventstestutils.Scan("2016-03-07T17:28:03.129014751+02:00 container destroy 0b863f2a26c18557fc6cdadda007c459f9ec81b874780808138aea78a3595079 (image=ubuntu, name=small_hoover)") + if err != nil { + t.Fatal(err) + } + + buffered := []events.Message{*m1, *m2, *m3} + + events := &Events{ + events: buffered, + } + + out := events.loadBufferedEvents(since, sinceNano, nil) + if len(out) != 1 { + t.Fatalf("expected 1 message, got %d: %v", len(out), out) + } +} diff --git a/daemon/events/testutils/testutils.go b/daemon/events/testutils/testutils.go new file mode 100644 index 0000000000..c84418a9e7 --- /dev/null +++ b/daemon/events/testutils/testutils.go @@ -0,0 +1,76 @@ +package eventstestutils + +import ( + "fmt" + "regexp" + "strings" + "time" + + "github.com/docker/engine-api/types/events" + timetypes "github.com/docker/engine-api/types/time" +) + +var ( + reTimestamp = `(?P\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{9}(:?(:?(:?-|\+)\d{2}:\d{2})|Z))` + reEventType = `(?P\w+)` + reAction = `(?P\w+)` + reID = `(?P[^\s]+)` + reAttributes = `(\s\((?P[^\)]+)\))?` + reString = fmt.Sprintf(`\A%s\s%s\s%s\s%s%s\z`, reTimestamp, reEventType, reAction, reID, reAttributes) + + // eventCliRegexp is a regular expression that matches all possible event outputs in the cli + eventCliRegexp = regexp.MustCompile(reString) +) + +// ScanMap turns an event string like the default ones formatted in the cli output +// and turns it into map. +func ScanMap(text string) map[string]string { + matches := eventCliRegexp.FindAllStringSubmatch(text, -1) + md := map[string]string{} + if len(matches) == 0 { + return md + } + + names := eventCliRegexp.SubexpNames() + for i, n := range matches[0] { + md[names[i]] = n + } + return md +} + +// Scan turns an event string like the default ones formatted in the cli output +// and turns it into an event message. +func Scan(text string) (*events.Message, error) { + md := ScanMap(text) + if len(md) == 0 { + return nil, fmt.Errorf("text is not an event: %s", text) + } + + f, err := timetypes.GetTimestamp(md["timestamp"], time.Now()) + if err != nil { + return nil, err + } + + t, tn, err := timetypes.ParseTimestamps(f, -1) + if err != nil { + return nil, err + } + + attrs := make(map[string]string) + for _, a := range strings.SplitN(md["attributes"], ", ", -1) { + kv := strings.SplitN(a, "=", 2) + attrs[kv[0]] = kv[1] + } + + tu := time.Unix(t, tn) + return &events.Message{ + Time: t, + TimeNano: tu.UnixNano(), + Type: md["eventType"], + Action: md["action"], + Actor: events.Actor{ + ID: md["id"], + Attributes: attrs, + }, + }, nil +} diff --git a/integration-cli/docker_cli_events_test.go b/integration-cli/docker_cli_events_test.go index 878e2c103f..f426650c2b 100644 --- a/integration-cli/docker_cli_events_test.go +++ b/integration-cli/docker_cli_events_test.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/docker/docker/daemon/events/testutils" "github.com/docker/docker/pkg/integration/checker" "github.com/go-check/check" ) @@ -152,7 +153,7 @@ func (s *DockerSuite) TestEventsContainerEventsAttrSort(c *check.C) { c.Assert(nEvents, checker.GreaterOrEqualThan, 3) //Missing expected event matchedEvents := 0 for _, event := range events { - matches := parseEventText(event) + matches := eventstestutils.ScanMap(event) if matches["id"] != containerID { continue } @@ -201,7 +202,7 @@ func (s *DockerSuite) TestEventsImageTag(c *check.C) { c.Assert(events, checker.HasLen, 1, check.Commentf("was expecting 1 event. out=%s", out)) event := strings.TrimSpace(events[0]) - matches := parseEventText(event) + matches := eventstestutils.ScanMap(event) c.Assert(matchEventID(matches, image), checker.True, check.Commentf("matches: %v\nout:\n%s", matches, out)) c.Assert(matches["action"], checker.Equals, "tag") } @@ -220,7 +221,7 @@ func (s *DockerSuite) TestEventsImagePull(c *check.C) { events := strings.Split(strings.TrimSpace(out), "\n") event := strings.TrimSpace(events[len(events)-1]) - matches := parseEventText(event) + matches := eventstestutils.ScanMap(event) c.Assert(matches["id"], checker.Equals, "hello-world:latest") c.Assert(matches["action"], checker.Equals, "pull") @@ -245,7 +246,7 @@ func (s *DockerSuite) TestEventsImageImport(c *check.C) { out, _ = dockerCmd(c, "events", fmt.Sprintf("--since=%d", since), fmt.Sprintf("--until=%d", daemonTime(c).Unix()), "--filter", "event=import") events := strings.Split(strings.TrimSpace(out), "\n") c.Assert(events, checker.HasLen, 1) - matches := parseEventText(events[0]) + matches := eventstestutils.ScanMap(events[0]) c.Assert(matches["id"], checker.Equals, imageRef, check.Commentf("matches: %v\nout:\n%s\n", matches, out)) c.Assert(matches["action"], checker.Equals, "import", check.Commentf("matches: %v\nout:\n%s\n", matches, out)) } @@ -370,7 +371,7 @@ func (s *DockerSuite) TestEventsFilterContainer(c *check.C) { return fmt.Errorf("expected 4 events, got %v", events) } for _, event := range events { - matches := parseEventText(event) + matches := eventstestutils.ScanMap(event) if !matchEventID(matches, id) { return fmt.Errorf("expected event for container id %s: %s - parsed container id: %s", id, event, matches["id"]) } diff --git a/integration-cli/events_utils.go b/integration-cli/events_utils.go index 7089be2dc8..cdd3106b8d 100644 --- a/integration-cli/events_utils.go +++ b/integration-cli/events_utils.go @@ -3,7 +3,6 @@ package main import ( "bufio" "bytes" - "fmt" "io" "os/exec" "regexp" @@ -11,22 +10,11 @@ import ( "strings" "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/events/testutils" "github.com/docker/docker/pkg/integration/checker" "github.com/go-check/check" ) -var ( - reTimestamp = `\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.\d{9}(:?(:?(:?-|\+)\d{2}:\d{2})|Z)` - reEventType = `(?P\w+)` - reAction = `(?P\w+)` - reID = `(?P[^\s]+)` - reAttributes = `(\s\((?P[^\)]+)\))?` - reString = fmt.Sprintf(`\A%s\s%s\s%s\s%s%s\z`, reTimestamp, reEventType, reAction, reID, reAttributes) - - // eventCliRegexp is a regular expression that matches all possible event outputs in the cli - eventCliRegexp = regexp.MustCompile(reString) -) - // eventMatcher is a function that tries to match an event input. // It returns true if the event matches and a map with // a set of key/value to identify the match. @@ -131,7 +119,7 @@ func (e *eventObserver) CheckEventError(c *check.C, id, event string, match even // It returns an empty map and false if there is no match. func matchEventLine(id, eventType string, actions map[string]chan bool) eventMatcher { return func(text string) (map[string]string, bool) { - matches := parseEventText(text) + matches := eventstestutils.ScanMap(text) if len(matches) == 0 { return matches, false } @@ -154,26 +142,10 @@ func processEventMatch(actions map[string]chan bool) eventMatchProcessor { } } -// parseEventText parses a line of events coming from the cli and returns -// the matchers in a map. -func parseEventText(text string) map[string]string { - matches := eventCliRegexp.FindAllStringSubmatch(text, -1) - md := map[string]string{} - if len(matches) == 0 { - return md - } - - names := eventCliRegexp.SubexpNames() - for i, n := range matches[0] { - md[names[i]] = n - } - return md -} - // parseEventAction parses an event text and returns the action. // It fails if the text is not in the event format. func parseEventAction(c *check.C, text string) string { - matches := parseEventText(text) + matches := eventstestutils.ScanMap(text) return matches["action"] } @@ -182,7 +154,7 @@ func parseEventAction(c *check.C, text string) string { func eventActionsByIDAndType(c *check.C, events []string, id, eventType string) []string { var filtered []string for _, event := range events { - matches := parseEventText(event) + matches := eventstestutils.ScanMap(event) c.Assert(matches, checker.Not(checker.IsNil)) if matchIDAndEventType(matches, id, eventType) { filtered = append(filtered, matches["action"]) @@ -214,7 +186,7 @@ func matchEventID(matches map[string]string, id string) bool { func parseEvents(c *check.C, out, match string) { events := strings.Split(strings.TrimSpace(out), "\n") for _, event := range events { - matches := parseEventText(event) + matches := eventstestutils.ScanMap(event) matched, err := regexp.MatchString(match, matches["action"]) c.Assert(err, checker.IsNil) c.Assert(matched, checker.True, check.Commentf("Matcher: %s did not match %s", match, matches["action"])) @@ -224,7 +196,7 @@ func parseEvents(c *check.C, out, match string) { func parseEventsWithID(c *check.C, out, match, id string) { events := strings.Split(strings.TrimSpace(out), "\n") for _, event := range events { - matches := parseEventText(event) + matches := eventstestutils.ScanMap(event) c.Assert(matchEventID(matches, id), checker.True) matched, err := regexp.MatchString(match, matches["action"])