package events import ( "sync" "time" "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/pubsub" ) const eventsLimit = 64 // Events is pubsub channel for *jsonmessage.JSONMessage type Events struct { mu sync.Mutex events []*jsonmessage.JSONMessage pub *pubsub.Publisher } // New returns new *Events instance func New() *Events { return &Events{ events: make([]*jsonmessage.JSONMessage, 0, eventsLimit), pub: pubsub.NewPublisher(100*time.Millisecond, 1024), } } // Subscribe adds new listener to events, returns slice of 64 stored last events // channel in which you can expect new events in form of interface{}, so you // need type assertion. func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) { e.mu.Lock() current := make([]*jsonmessage.JSONMessage, len(e.events)) copy(current, e.events) l := e.pub.Subscribe() e.mu.Unlock() return current, l } // Evict evicts listener from pubsub func (e *Events) Evict(l chan interface{}) { e.pub.Evict(l) } // Log broadcasts event to listeners. Each listener has 100 millisecond for // receiving event or it will be skipped. func (e *Events) Log(action, id, from string) { go func() { e.mu.Lock() jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()} if len(e.events) == cap(e.events) { // discard oldest event copy(e.events, e.events[1:]) e.events[len(e.events)-1] = jm } else { e.events = append(e.events, jm) } e.mu.Unlock() e.pub.Publish(jm) }() } // SubscribersCount returns number of event listeners func (e *Events) SubscribersCount() int { return e.pub.Len() }