diff --git a/api/server/router/local/info.go b/api/server/router/local/info.go index 1ebcae91b7..00314212bd 100644 --- a/api/server/router/local/info.go +++ b/api/server/router/local/info.go @@ -92,27 +92,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R enc := json.NewEncoder(output) - current, l, cancel := s.daemon.SubscribeToEvents() - defer cancel() + buffered, l := s.daemon.SubscribeToEvents(since, sinceNano, ef) + defer s.daemon.UnsubscribeFromEvents(l) - eventFilter := s.daemon.GetEventFilter(ef) - handleEvent := func(ev *jsonmessage.JSONMessage) error { - if eventFilter.Include(ev) { - if err := enc.Encode(ev); err != nil { - return err - } - } - return nil - } - - if since == -1 { - current = nil - } - for _, ev := range current { - if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) { - continue - } - if err := handleEvent(ev); err != nil { + for _, ev := range buffered { + if err := enc.Encode(ev); err != nil { return err } } @@ -129,7 +113,7 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R if !ok { continue } - if err := handleEvent(jev); err != nil { + if err := enc.Encode(jev); err != nil { return err } case <-timer.C: diff --git a/daemon/daemon.go b/daemon/daemon.go index 6211de2e8e..d5e5540aff 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -532,8 +532,8 @@ func (daemon *Daemon) GetByName(name string) (*Container, error) { return e, nil } -// GetEventFilter returns a filters.Filter for a set of filters -func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter { +// getEventFilter returns a filters.Filter for a set of filters +func (daemon *Daemon) getEventFilter(filter filters.Args) *events.Filter { // incoming container filter can be name, id or partial id, convert to // a full container id for _, cn := range filter.Get("container") { @@ -547,8 +547,15 @@ func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter { } // SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events. -func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) { - return daemon.EventsService.Subscribe() +func (daemon *Daemon) SubscribeToEvents(since, sinceNano int64, filter filters.Args) ([]*jsonmessage.JSONMessage, chan interface{}) { + ef := daemon.getEventFilter(filter) + return daemon.EventsService.SubscribeTopic(since, sinceNano, ef) +} + +// UnsubscribeFromEvents stops the event subscription for a client by closing the +// channel where the daemon sends events to. +func (daemon *Daemon) UnsubscribeFromEvents(listener chan interface{}) { + daemon.EventsService.Evict(listener) } // GetLabels for a container or image id diff --git a/daemon/events/events.go b/daemon/events/events.go index 996c856f42..3674170fe3 100644 --- a/daemon/events/events.go +++ b/daemon/events/events.go @@ -8,7 +8,10 @@ import ( "github.com/docker/docker/pkg/pubsub" ) -const eventsLimit = 64 +const ( + eventsLimit = 64 + bufferSize = 1024 +) // Events is pubsub channel for *jsonmessage.JSONMessage type Events struct { @@ -21,7 +24,7 @@ type Events struct { func New() *Events { return &Events{ events: make([]*jsonmessage.JSONMessage, 0, eventsLimit), - pub: pubsub.NewPublisher(100*time.Millisecond, 1024), + pub: pubsub.NewPublisher(100*time.Millisecond, bufferSize), } } @@ -42,6 +45,41 @@ func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func return current, l, cancel } +// SubscribeTopic adds new listener to events, returns slice of 64 stored +// last events, a channel in which you can expect new events (in form +// of interface{}, so you need type assertion). +func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]*jsonmessage.JSONMessage, chan interface{}) { + e.mu.Lock() + defer e.mu.Unlock() + + var buffered []*jsonmessage.JSONMessage + topic := func(m interface{}) bool { + return ef.Include(m.(*jsonmessage.JSONMessage)) + } + + if since != -1 { + for i := len(e.events) - 1; i >= 0; i-- { + ev := e.events[i] + if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) { + break + } + if ef.filter.Len() == 0 || topic(ev) { + buffered = append([]*jsonmessage.JSONMessage{ev}, buffered...) + } + } + } + + var ch chan interface{} + if ef.filter.Len() > 0 { + ch = e.pub.SubscribeTopic(topic) + } else { + // Subscribe to all events if there are no filters + ch = e.pub.Subscribe() + } + + return buffered, ch +} + // Evict evicts listener from pubsub func (e *Events) Evict(l chan interface{}) { e.pub.Evict(l) diff --git a/integration-cli/docker_cli_build_test.go b/integration-cli/docker_cli_build_test.go index d4b31bd38f..1707680dba 100644 --- a/integration-cli/docker_cli_build_test.go +++ b/integration-cli/docker_cli_build_test.go @@ -1891,9 +1891,7 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { startEpoch := daemonTime(c).Unix() // Watch for events since epoch. - eventsCmd := exec.Command( - dockerBinary, "events", - "--since", strconv.FormatInt(startEpoch, 10)) + eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(startEpoch, 10)) stdout, err := eventsCmd.StdoutPipe() if err != nil { c.Fatal(err) @@ -1932,12 +1930,12 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { c.Fatalf("failed to run build: %s", err) } - matchCID := regexp.MustCompile("Running in ") + matchCID := regexp.MustCompile("Running in (.+)") scanner := bufio.NewScanner(stdoutBuild) for scanner.Scan() { line := scanner.Text() - if ok := matchCID.MatchString(line); ok { - containerID <- line[len(line)-12:] + if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 { + containerID <- matches[1] break } } diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index ab457cfba9..8529ffa322 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -13,11 +13,12 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, - subscribers: make(map[subscriber]struct{}), + subscribers: make(map[subscriber]topicFunc), } } type subscriber chan interface{} +type topicFunc func(v interface{}) bool // Publisher is basic pub/sub structure. Allows to send events and subscribe // to them. Can be safely used from multiple goroutines. @@ -25,7 +26,7 @@ type Publisher struct { m sync.RWMutex buffer int timeout time.Duration - subscribers map[subscriber]struct{} + subscribers map[subscriber]topicFunc } // Len returns the number of subscribers for the publisher @@ -38,9 +39,14 @@ func (p *Publisher) Len() int { // Subscribe adds a new subscriber to the publisher returning the channel. func (p *Publisher) Subscribe() chan interface{} { + return p.SubscribeTopic(nil) +} + +// SubscribeTopic adds a new subscriber that filters messages sent by a topic. +func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() - p.subscribers[ch] = struct{}{} + p.subscribers[ch] = topic p.m.Unlock() return ch } @@ -56,20 +62,13 @@ func (p *Publisher) Evict(sub chan interface{}) { // Publish sends the data in v to all subscribers currently registered with the publisher. func (p *Publisher) Publish(v interface{}) { p.m.RLock() - for sub := range p.subscribers { - // send under a select as to not block if the receiver is unavailable - if p.timeout > 0 { - select { - case sub <- v: - case <-time.After(p.timeout): - } - continue - } - select { - case sub <- v: - default: - } + wg := new(sync.WaitGroup) + for sub, topic := range p.subscribers { + wg.Add(1) + + go p.sendTopic(sub, topic, v, wg) } + wg.Wait() p.m.RUnlock() } @@ -82,3 +81,24 @@ func (p *Publisher) Close() { } p.m.Unlock() } + +func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { + defer wg.Done() + if topic != nil && !topic(v) { + return + } + + // send under a select as to not block if the receiver is unavailable + if p.timeout > 0 { + select { + case sub <- v: + case <-time.After(p.timeout): + } + return + } + + select { + case sub <- v: + default: + } +}