package events import ( "sync" "time" "github.com/docker/docker/pkg/pubsub" eventtypes "github.com/docker/engine-api/types/events" ) const ( eventsLimit = 64 bufferSize = 1024 ) // Events is pubsub channel for events generated by the engine. type Events struct { mu sync.Mutex events []eventtypes.Message pub *pubsub.Publisher } // New returns new *Events instance func New() *Events { return &Events{ events: make([]eventtypes.Message, 0, eventsLimit), pub: pubsub.NewPublisher(100*time.Millisecond, bufferSize), } } // Subscribe adds new listener to events, returns slice of 64 stored // last events, a channel in which you can expect new events (in form // of interface{}, so you need type assertion), and a function to call // to stop the stream of events. func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) { e.mu.Lock() current := make([]eventtypes.Message, len(e.events)) copy(current, e.events) l := e.pub.Subscribe() e.mu.Unlock() cancel := func() { e.Evict(l) } return current, l, cancel } // SubscribeTopic adds new listener to events, returns slice of 64 stored // last events, a channel in which you can expect new events (in form // of interface{}, so you need type assertion). func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]eventtypes.Message, chan interface{}) { e.mu.Lock() var topic func(m interface{}) bool if ef != nil && ef.filter.Len() > 0 { topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) } } buffered := e.loadBufferedEvents(since, sinceNano, topic) var ch chan interface{} if topic != nil { ch = e.pub.SubscribeTopic(topic) } else { // Subscribe to all events if there are no filters ch = e.pub.Subscribe() } e.mu.Unlock() return buffered, ch } // Evict evicts listener from pubsub func (e *Events) Evict(l chan interface{}) { e.pub.Evict(l) } // Log broadcasts event to listeners. Each listener has 100 millisecond for // receiving event or it will be skipped. func (e *Events) Log(action, eventType string, actor eventtypes.Actor) { now := time.Now().UTC() jm := eventtypes.Message{ Action: action, Type: eventType, Actor: actor, Time: now.Unix(), TimeNano: now.UnixNano(), } // fill deprecated fields for container and images switch eventType { case eventtypes.ContainerEventType: jm.ID = actor.ID jm.Status = action jm.From = actor.Attributes["image"] case eventtypes.ImageEventType: jm.ID = actor.ID jm.Status = action } e.mu.Lock() if len(e.events) == cap(e.events) { // discard oldest event copy(e.events, e.events[1:]) e.events[len(e.events)-1] = jm } else { e.events = append(e.events, jm) } e.mu.Unlock() e.pub.Publish(jm) } // SubscribersCount returns number of event listeners func (e *Events) SubscribersCount() int { return e.pub.Len() } // loadBufferedEvents iterates over the cached events in the buffer // and returns those that were emitted before a specific date. // The date is splitted in two values: // - the `since` argument is a date timestamp without nanoseconds, or -1 to return an empty slice. // - the `sinceNano` argument is the nanoseconds offset from the timestamp. // It uses `time.Unix(seconds, nanoseconds)` to generate a valid date with those two first arguments. // It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages. func (e *Events) loadBufferedEvents(since, sinceNano int64, topic func(interface{}) bool) []eventtypes.Message { var buffered []eventtypes.Message if since == -1 { return buffered } sinceNanoUnix := time.Unix(since, sinceNano).UnixNano() for i := len(e.events) - 1; i >= 0; i-- { ev := e.events[i] if ev.TimeNano < sinceNanoUnix { break } if topic == nil || topic(ev) { buffered = append([]eventtypes.Message{ev}, buffered...) } } return buffered }