diff --git a/api/server/router/local/info.go b/api/server/router/local/info.go index 297bea43be..9b87f5ae7c 100644 --- a/api/server/router/local/info.go +++ b/api/server/router/local/info.go @@ -88,12 +88,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R } enc := buildOutputEncoder(w) - d := s.daemon - es := d.EventsService - current, l := es.Subscribe() - defer es.Evict(l) - eventFilter := d.GetEventFilter(ef) + current, l, cancel := s.daemon.SubscribeToEvents() + defer cancel() + + eventFilter := s.daemon.GetEventFilter(ef) handleEvent := func(ev *jsonmessage.JSONMessage) error { if eventFilter.Include(ev) { if err := enc.Encode(ev); err != nil { diff --git a/daemon/daemon.go b/daemon/daemon.go index c0d91bf077..91af20ac66 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -39,6 +39,7 @@ import ( "github.com/docker/docker/pkg/graphdb" "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/namesgenerator" "github.com/docker/docker/pkg/nat" "github.com/docker/docker/pkg/parsers/filters" @@ -548,6 +549,11 @@ func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter { return events.NewFilter(filter, daemon.GetLabels) } +// SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events. +func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) { + return daemon.EventsService.Subscribe() +} + // GetLabels for a container or image id func (daemon *Daemon) GetLabels(id string) map[string]string { // TODO: TestCase diff --git a/daemon/events/events.go b/daemon/events/events.go index 0334dece47..996c856f42 100644 --- a/daemon/events/events.go +++ b/daemon/events/events.go @@ -25,16 +25,21 @@ func New() *Events { } } -// Subscribe adds new listener to events, returns slice of 64 stored last events -// channel in which you can expect new events in form of interface{}, so you -// need type assertion. -func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) { +// Subscribe adds new listener to events, returns slice of 64 stored +// last events, a channel in which you can expect new events (in form +// of interface{}, so you need type assertion), and a function to call +// to stop the stream of events. +func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func()) { e.mu.Lock() current := make([]*jsonmessage.JSONMessage, len(e.events)) copy(current, e.events) l := e.pub.Subscribe() e.mu.Unlock() - return current, l + + cancel := func() { + e.Evict(l) + } + return current, l, cancel } // Evict evicts listener from pubsub diff --git a/daemon/events/events_test.go b/daemon/events/events_test.go index 7aa8d9facc..cf3c28aac5 100644 --- a/daemon/events/events_test.go +++ b/daemon/events/events_test.go @@ -10,8 +10,8 @@ import ( func TestEventsLog(t *testing.T) { e := New() - _, l1 := e.Subscribe() - _, l2 := e.Subscribe() + _, l1, _ := e.Subscribe() + _, l2, _ := e.Subscribe() defer e.Evict(l1) defer e.Evict(l2) count := e.SubscribersCount() @@ -65,7 +65,7 @@ func TestEventsLog(t *testing.T) { func TestEventsLogTimeout(t *testing.T) { e := New() - _, l := e.Subscribe() + _, l, _ := e.Subscribe() defer e.Evict(l) c := make(chan struct{}) @@ -91,7 +91,7 @@ func TestLogEvents(t *testing.T) { e.Log(action, id, from) } time.Sleep(50 * time.Millisecond) - current, l := e.Subscribe() + current, l, _ := e.Subscribe() for i := 0; i < 10; i++ { num := i + eventsLimit + 16 action := fmt.Sprintf("action_%d", num)