mirror of
				https://github.com/moby/moby.git
				synced 2022-11-09 12:21:53 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			165 lines
		
	
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			165 lines
		
	
	
	
		
			4.1 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
package events // import "github.com/docker/docker/daemon/events"
 | 
						|
 | 
						|
import (
 | 
						|
	"sync"
 | 
						|
	"time"
 | 
						|
 | 
						|
	eventtypes "github.com/docker/docker/api/types/events"
 | 
						|
	"github.com/docker/docker/pkg/pubsub"
 | 
						|
)
 | 
						|
 | 
						|
const (
	// eventsLimit is the capacity New gives to the in-memory buffer of
	// recent events.
	eventsLimit = 256
	// bufferSize is handed to pubsub.NewPublisher by New.
	// NOTE(review): presumably the per-subscriber channel buffer — confirm
	// against the pubsub package.
	bufferSize = 1024
)
 | 
						|
 | 
						|
// Events is pubsub channel for events generated by the engine.
 | 
						|
type Events struct {
 | 
						|
	mu     sync.Mutex
 | 
						|
	events []eventtypes.Message
 | 
						|
	pub    *pubsub.Publisher
 | 
						|
}
 | 
						|
 | 
						|
// New returns new *Events instance
 | 
						|
func New() *Events {
 | 
						|
	return &Events{
 | 
						|
		events: make([]eventtypes.Message, 0, eventsLimit),
 | 
						|
		pub:    pubsub.NewPublisher(100*time.Millisecond, bufferSize),
 | 
						|
	}
 | 
						|
}
 | 
						|
 | 
						|
// Subscribe adds new listener to events, returns slice of 256 stored
 | 
						|
// last events, a channel in which you can expect new events (in form
 | 
						|
// of interface{}, so you need type assertion), and a function to call
 | 
						|
// to stop the stream of events.
 | 
						|
func (e *Events) Subscribe() ([]eventtypes.Message, chan interface{}, func()) {
 | 
						|
	eventSubscribers.Inc()
 | 
						|
	e.mu.Lock()
 | 
						|
	current := make([]eventtypes.Message, len(e.events))
 | 
						|
	copy(current, e.events)
 | 
						|
	l := e.pub.Subscribe()
 | 
						|
	e.mu.Unlock()
 | 
						|
 | 
						|
	cancel := func() {
 | 
						|
		e.Evict(l)
 | 
						|
	}
 | 
						|
	return current, l, cancel
 | 
						|
}
 | 
						|
 | 
						|
// SubscribeTopic adds new listener to events, returns slice of 256 stored
 | 
						|
// last events, a channel in which you can expect new events (in form
 | 
						|
// of interface{}, so you need type assertion).
 | 
						|
func (e *Events) SubscribeTopic(since, until time.Time, ef *Filter) ([]eventtypes.Message, chan interface{}) {
 | 
						|
	eventSubscribers.Inc()
 | 
						|
	e.mu.Lock()
 | 
						|
 | 
						|
	var topic func(m interface{}) bool
 | 
						|
	if ef != nil && ef.filter.Len() > 0 {
 | 
						|
		topic = func(m interface{}) bool { return ef.Include(m.(eventtypes.Message)) }
 | 
						|
	}
 | 
						|
 | 
						|
	buffered := e.loadBufferedEvents(since, until, topic)
 | 
						|
 | 
						|
	var ch chan interface{}
 | 
						|
	if topic != nil {
 | 
						|
		ch = e.pub.SubscribeTopic(topic)
 | 
						|
	} else {
 | 
						|
		// Subscribe to all events if there are no filters
 | 
						|
		ch = e.pub.Subscribe()
 | 
						|
	}
 | 
						|
 | 
						|
	e.mu.Unlock()
 | 
						|
	return buffered, ch
 | 
						|
}
 | 
						|
 | 
						|
// Evict evicts listener from pubsub
 | 
						|
func (e *Events) Evict(l chan interface{}) {
 | 
						|
	eventSubscribers.Dec()
 | 
						|
	e.pub.Evict(l)
 | 
						|
}
 | 
						|
 | 
						|
// Log creates a local scope message and publishes it
 | 
						|
func (e *Events) Log(action, eventType string, actor eventtypes.Actor) {
 | 
						|
	now := time.Now().UTC()
 | 
						|
	jm := eventtypes.Message{
 | 
						|
		Action:   action,
 | 
						|
		Type:     eventType,
 | 
						|
		Actor:    actor,
 | 
						|
		Scope:    "local",
 | 
						|
		Time:     now.Unix(),
 | 
						|
		TimeNano: now.UnixNano(),
 | 
						|
	}
 | 
						|
 | 
						|
	// fill deprecated fields for container and images
 | 
						|
	switch eventType {
 | 
						|
	case eventtypes.ContainerEventType:
 | 
						|
		jm.ID = actor.ID
 | 
						|
		jm.Status = action
 | 
						|
		jm.From = actor.Attributes["image"]
 | 
						|
	case eventtypes.ImageEventType:
 | 
						|
		jm.ID = actor.ID
 | 
						|
		jm.Status = action
 | 
						|
	}
 | 
						|
 | 
						|
	e.PublishMessage(jm)
 | 
						|
}
 | 
						|
 | 
						|
// PublishMessage broadcasts event to listeners. Each listener has 100 milliseconds to
 | 
						|
// receive the event or it will be skipped.
 | 
						|
func (e *Events) PublishMessage(jm eventtypes.Message) {
 | 
						|
	eventsCounter.Inc()
 | 
						|
 | 
						|
	e.mu.Lock()
 | 
						|
	if len(e.events) == cap(e.events) {
 | 
						|
		// discard oldest event
 | 
						|
		copy(e.events, e.events[1:])
 | 
						|
		e.events[len(e.events)-1] = jm
 | 
						|
	} else {
 | 
						|
		e.events = append(e.events, jm)
 | 
						|
	}
 | 
						|
	e.mu.Unlock()
 | 
						|
	e.pub.Publish(jm)
 | 
						|
}
 | 
						|
 | 
						|
// SubscribersCount returns number of event listeners
 | 
						|
func (e *Events) SubscribersCount() int {
 | 
						|
	return e.pub.Len()
 | 
						|
}
 | 
						|
 | 
						|
// loadBufferedEvents iterates over the cached events in the buffer
 | 
						|
// and returns those that were emitted between two specific dates.
 | 
						|
// It uses `time.Unix(seconds, nanoseconds)` to generate valid dates with those arguments.
 | 
						|
// It filters those buffered messages with a topic function if it's not nil, otherwise it adds all messages.
 | 
						|
func (e *Events) loadBufferedEvents(since, until time.Time, topic func(interface{}) bool) []eventtypes.Message {
 | 
						|
	var buffered []eventtypes.Message
 | 
						|
	if since.IsZero() && until.IsZero() {
 | 
						|
		return buffered
 | 
						|
	}
 | 
						|
 | 
						|
	var sinceNanoUnix int64
 | 
						|
	if !since.IsZero() {
 | 
						|
		sinceNanoUnix = since.UnixNano()
 | 
						|
	}
 | 
						|
 | 
						|
	var untilNanoUnix int64
 | 
						|
	if !until.IsZero() {
 | 
						|
		untilNanoUnix = until.UnixNano()
 | 
						|
	}
 | 
						|
 | 
						|
	for i := len(e.events) - 1; i >= 0; i-- {
 | 
						|
		ev := e.events[i]
 | 
						|
 | 
						|
		if ev.TimeNano < sinceNanoUnix {
 | 
						|
			break
 | 
						|
		}
 | 
						|
 | 
						|
		if untilNanoUnix > 0 && ev.TimeNano > untilNanoUnix {
 | 
						|
			continue
 | 
						|
		}
 | 
						|
 | 
						|
		if topic == nil || topic(ev) {
 | 
						|
			buffered = append([]eventtypes.Message{ev}, buffered...)
 | 
						|
		}
 | 
						|
	}
 | 
						|
	return buffered
 | 
						|
}
 |