mirror of
				https://github.com/moby/moby.git
				synced 2022-11-09 12:21:53 -05:00 
			
		
		
		
	Merge pull request #18266 from calavera/events_pub_sub
Event PubSub topics + linear filtering.
This commit is contained in:
		
						commit
						33ab2bb52c
					
				
					 17 changed files with 540 additions and 287 deletions
				
			
		| 
						 | 
				
			
			@ -26,7 +26,7 @@ func (cli *DockerCli) CmdEvents(args ...string) error {
 | 
			
		|||
 | 
			
		||||
	var (
 | 
			
		||||
		v               = url.Values{}
 | 
			
		||||
		eventFilterArgs = filters.Args{}
 | 
			
		||||
		eventFilterArgs = filters.NewArgs()
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	// Consolidate all filter flags, and sanity check them early.
 | 
			
		||||
| 
						 | 
				
			
			@ -53,7 +53,7 @@ func (cli *DockerCli) CmdEvents(args ...string) error {
 | 
			
		|||
		}
 | 
			
		||||
		v.Set("until", ts)
 | 
			
		||||
	}
 | 
			
		||||
	if len(eventFilterArgs) > 0 {
 | 
			
		||||
	if eventFilterArgs.Len() > 0 {
 | 
			
		||||
		filterJSON, err := filters.ToParam(eventFilterArgs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -36,7 +36,7 @@ func (cli *DockerCli) CmdImages(args ...string) error {
 | 
			
		|||
 | 
			
		||||
	// Consolidate all filter flags, and sanity check them early.
 | 
			
		||||
	// They'll get process in the daemon/server.
 | 
			
		||||
	imageFilterArgs := filters.Args{}
 | 
			
		||||
	imageFilterArgs := filters.NewArgs()
 | 
			
		||||
	for _, f := range flFilter.GetAll() {
 | 
			
		||||
		var err error
 | 
			
		||||
		imageFilterArgs, err = filters.ParseFlag(f, imageFilterArgs)
 | 
			
		||||
| 
						 | 
				
			
			@ -47,7 +47,7 @@ func (cli *DockerCli) CmdImages(args ...string) error {
 | 
			
		|||
 | 
			
		||||
	matchName := cmd.Arg(0)
 | 
			
		||||
	v := url.Values{}
 | 
			
		||||
	if len(imageFilterArgs) > 0 {
 | 
			
		||||
	if imageFilterArgs.Len() > 0 {
 | 
			
		||||
		filterJSON, err := filters.ToParam(imageFilterArgs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -20,7 +20,7 @@ func (cli *DockerCli) CmdPs(args ...string) error {
 | 
			
		|||
	var (
 | 
			
		||||
		err error
 | 
			
		||||
 | 
			
		||||
		psFilterArgs = filters.Args{}
 | 
			
		||||
		psFilterArgs = filters.NewArgs()
 | 
			
		||||
		v            = url.Values{}
 | 
			
		||||
 | 
			
		||||
		cmd      = Cli.Subcmd("ps", nil, Cli.DockerCommands["ps"].Description, true)
 | 
			
		||||
| 
						 | 
				
			
			@ -72,7 +72,7 @@ func (cli *DockerCli) CmdPs(args ...string) error {
 | 
			
		|||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if len(psFilterArgs) > 0 {
 | 
			
		||||
	if psFilterArgs.Len() > 0 {
 | 
			
		||||
		filterJSON, err := filters.ToParam(psFilterArgs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -54,7 +54,7 @@ func (cli *DockerCli) CmdVolumeLs(args ...string) error {
 | 
			
		|||
	cmd.Require(flag.Exact, 0)
 | 
			
		||||
	cmd.ParseFlags(args, true)
 | 
			
		||||
 | 
			
		||||
	volFilterArgs := filters.Args{}
 | 
			
		||||
	volFilterArgs := filters.NewArgs()
 | 
			
		||||
	for _, f := range flFilter.GetAll() {
 | 
			
		||||
		var err error
 | 
			
		||||
		volFilterArgs, err = filters.ParseFlag(f, volFilterArgs)
 | 
			
		||||
| 
						 | 
				
			
			@ -64,7 +64,7 @@ func (cli *DockerCli) CmdVolumeLs(args ...string) error {
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	v := url.Values{}
 | 
			
		||||
	if len(volFilterArgs) > 0 {
 | 
			
		||||
	if volFilterArgs.Len() > 0 {
 | 
			
		||||
		filterJSON, err := filters.ToParam(volFilterArgs)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -92,27 +92,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
 | 
			
		|||
 | 
			
		||||
	enc := json.NewEncoder(output)
 | 
			
		||||
 | 
			
		||||
	current, l, cancel := s.daemon.SubscribeToEvents()
 | 
			
		||||
	defer cancel()
 | 
			
		||||
	buffered, l := s.daemon.SubscribeToEvents(since, sinceNano, ef)
 | 
			
		||||
	defer s.daemon.UnsubscribeFromEvents(l)
 | 
			
		||||
 | 
			
		||||
	eventFilter := s.daemon.GetEventFilter(ef)
 | 
			
		||||
	handleEvent := func(ev *jsonmessage.JSONMessage) error {
 | 
			
		||||
		if eventFilter.Include(ev) {
 | 
			
		||||
			if err := enc.Encode(ev); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since == -1 {
 | 
			
		||||
		current = nil
 | 
			
		||||
	}
 | 
			
		||||
	for _, ev := range current {
 | 
			
		||||
		if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) {
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		if err := handleEvent(ev); err != nil {
 | 
			
		||||
	for _, ev := range buffered {
 | 
			
		||||
		if err := enc.Encode(ev); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -129,7 +113,7 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
 | 
			
		|||
			if !ok {
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			if err := handleEvent(jev); err != nil {
 | 
			
		||||
			if err := enc.Encode(jev); err != nil {
 | 
			
		||||
				return err
 | 
			
		||||
			}
 | 
			
		||||
		case <-timer.C:
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -29,27 +29,23 @@ func (n *networkRouter) getNetworksList(ctx context.Context, w http.ResponseWrit
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	list := []*types.NetworkResource{}
 | 
			
		||||
	var nameFilter, idFilter bool
 | 
			
		||||
	var names, ids []string
 | 
			
		||||
	if names, nameFilter = netFilters["name"]; nameFilter {
 | 
			
		||||
		for _, name := range names {
 | 
			
		||||
			if nw, err := n.backend.GetNetwork(name, daemon.NetworkByName); err == nil {
 | 
			
		||||
				list = append(list, buildNetworkResource(nw))
 | 
			
		||||
			} else {
 | 
			
		||||
				logrus.Errorf("failed to get network for filter=%s : %v", name, err)
 | 
			
		||||
			}
 | 
			
		||||
	netFilters.WalkValues("name", func(name string) error {
 | 
			
		||||
		if nw, err := n.backend.GetNetwork(name, daemon.NetworkByName); err == nil {
 | 
			
		||||
			list = append(list, buildNetworkResource(nw))
 | 
			
		||||
		} else {
 | 
			
		||||
			logrus.Errorf("failed to get network for filter=%s : %v", name, err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if ids, idFilter = netFilters["id"]; idFilter {
 | 
			
		||||
		for _, id := range ids {
 | 
			
		||||
			for _, nw := range n.backend.GetNetworksByID(id) {
 | 
			
		||||
				list = append(list, buildNetworkResource(nw))
 | 
			
		||||
			}
 | 
			
		||||
	netFilters.WalkValues("id", func(id string) error {
 | 
			
		||||
		for _, nw := range n.backend.GetNetworksByID(id) {
 | 
			
		||||
			list = append(list, buildNetworkResource(nw))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	if !nameFilter && !idFilter {
 | 
			
		||||
	if !netFilters.Include("name") && !netFilters.Include("id") {
 | 
			
		||||
		nwList := n.backend.GetNetworksByID("")
 | 
			
		||||
		for _, nw := range nwList {
 | 
			
		||||
			list = append(list, buildNetworkResource(nw))
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -532,24 +532,30 @@ func (daemon *Daemon) GetByName(name string) (*Container, error) {
 | 
			
		|||
	return e, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetEventFilter returns a filters.Filter for a set of filters
 | 
			
		||||
func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter {
 | 
			
		||||
// getEventFilter returns a filters.Filter for a set of filters
 | 
			
		||||
func (daemon *Daemon) getEventFilter(filter filters.Args) *events.Filter {
 | 
			
		||||
	// incoming container filter can be name, id or partial id, convert to
 | 
			
		||||
	// a full container id
 | 
			
		||||
	for i, cn := range filter["container"] {
 | 
			
		||||
	for _, cn := range filter.Get("container") {
 | 
			
		||||
		c, err := daemon.Get(cn)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			filter["container"][i] = ""
 | 
			
		||||
		} else {
 | 
			
		||||
			filter["container"][i] = c.ID
 | 
			
		||||
		filter.Del("container", cn)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			filter.Add("container", c.ID)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return events.NewFilter(filter, daemon.GetLabels)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events.
 | 
			
		||||
func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) {
 | 
			
		||||
	return daemon.EventsService.Subscribe()
 | 
			
		||||
func (daemon *Daemon) SubscribeToEvents(since, sinceNano int64, filter filters.Args) ([]*jsonmessage.JSONMessage, chan interface{}) {
 | 
			
		||||
	ef := daemon.getEventFilter(filter)
 | 
			
		||||
	return daemon.EventsService.SubscribeTopic(since, sinceNano, ef)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// UnsubscribeFromEvents stops the event subscription for a client by closing the
 | 
			
		||||
// channel where the daemon sends events to.
 | 
			
		||||
func (daemon *Daemon) UnsubscribeFromEvents(listener chan interface{}) {
 | 
			
		||||
	daemon.EventsService.Evict(listener)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GetLabels for a container or image id
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,7 +8,10 @@ import (
 | 
			
		|||
	"github.com/docker/docker/pkg/pubsub"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
const eventsLimit = 64
 | 
			
		||||
const (
 | 
			
		||||
	eventsLimit = 64
 | 
			
		||||
	bufferSize  = 1024
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Events is pubsub channel for *jsonmessage.JSONMessage
 | 
			
		||||
type Events struct {
 | 
			
		||||
| 
						 | 
				
			
			@ -21,7 +24,7 @@ type Events struct {
 | 
			
		|||
func New() *Events {
 | 
			
		||||
	return &Events{
 | 
			
		||||
		events: make([]*jsonmessage.JSONMessage, 0, eventsLimit),
 | 
			
		||||
		pub:    pubsub.NewPublisher(100*time.Millisecond, 1024),
 | 
			
		||||
		pub:    pubsub.NewPublisher(100*time.Millisecond, bufferSize),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -42,6 +45,41 @@ func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func
 | 
			
		|||
	return current, l, cancel
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SubscribeTopic adds new listener to events, returns slice of 64 stored
 | 
			
		||||
// last events, a channel in which you can expect new events (in form
 | 
			
		||||
// of interface{}, so you need type assertion).
 | 
			
		||||
func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]*jsonmessage.JSONMessage, chan interface{}) {
 | 
			
		||||
	e.mu.Lock()
 | 
			
		||||
	defer e.mu.Unlock()
 | 
			
		||||
 | 
			
		||||
	var buffered []*jsonmessage.JSONMessage
 | 
			
		||||
	topic := func(m interface{}) bool {
 | 
			
		||||
		return ef.Include(m.(*jsonmessage.JSONMessage))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if since != -1 {
 | 
			
		||||
		for i := len(e.events) - 1; i >= 0; i-- {
 | 
			
		||||
			ev := e.events[i]
 | 
			
		||||
			if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) {
 | 
			
		||||
				break
 | 
			
		||||
			}
 | 
			
		||||
			if ef.filter.Len() == 0 || topic(ev) {
 | 
			
		||||
				buffered = append([]*jsonmessage.JSONMessage{ev}, buffered...)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var ch chan interface{}
 | 
			
		||||
	if ef.filter.Len() > 0 {
 | 
			
		||||
		ch = e.pub.SubscribeTopic(topic)
 | 
			
		||||
	} else {
 | 
			
		||||
		// Subscribe to all events if there are no filters
 | 
			
		||||
		ch = e.pub.Subscribe()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return buffered, ch
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Evict evicts listener from pubsub
 | 
			
		||||
func (e *Events) Evict(l chan interface{}) {
 | 
			
		||||
	e.pub.Evict(l)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -19,14 +19,14 @@ func NewFilter(filter filters.Args, getLabels func(id string) map[string]string)
 | 
			
		|||
 | 
			
		||||
// Include returns true when the event ev is included by the filters
 | 
			
		||||
func (ef *Filter) Include(ev *jsonmessage.JSONMessage) bool {
 | 
			
		||||
	return isFieldIncluded(ev.Status, ef.filter["event"]) &&
 | 
			
		||||
		isFieldIncluded(ev.ID, ef.filter["container"]) &&
 | 
			
		||||
	return ef.filter.ExactMatch("event", ev.Status) &&
 | 
			
		||||
		ef.filter.ExactMatch("container", ev.ID) &&
 | 
			
		||||
		ef.isImageIncluded(ev.ID, ev.From) &&
 | 
			
		||||
		ef.isLabelFieldIncluded(ev.ID)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (ef *Filter) isLabelFieldIncluded(id string) bool {
 | 
			
		||||
	if _, ok := ef.filter["label"]; !ok {
 | 
			
		||||
	if !ef.filter.Include("label") {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	return ef.filter.MatchKVList("label", ef.getLabels(id))
 | 
			
		||||
| 
						 | 
				
			
			@ -37,31 +37,16 @@ func (ef *Filter) isLabelFieldIncluded(id string) bool {
 | 
			
		|||
// from an image will be included in the image events. Also compare both
 | 
			
		||||
// against the stripped repo name without any tags.
 | 
			
		||||
func (ef *Filter) isImageIncluded(eventID string, eventFrom string) bool {
 | 
			
		||||
	stripTag := func(image string) string {
 | 
			
		||||
		ref, err := reference.ParseNamed(image)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return image
 | 
			
		||||
		}
 | 
			
		||||
		return ref.Name()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return isFieldIncluded(eventID, ef.filter["image"]) ||
 | 
			
		||||
		isFieldIncluded(eventFrom, ef.filter["image"]) ||
 | 
			
		||||
		isFieldIncluded(stripTag(eventID), ef.filter["image"]) ||
 | 
			
		||||
		isFieldIncluded(stripTag(eventFrom), ef.filter["image"])
 | 
			
		||||
	return ef.filter.ExactMatch("image", eventID) ||
 | 
			
		||||
		ef.filter.ExactMatch("image", eventFrom) ||
 | 
			
		||||
		ef.filter.ExactMatch("image", stripTag(eventID)) ||
 | 
			
		||||
		ef.filter.ExactMatch("image", stripTag(eventFrom))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func isFieldIncluded(field string, filter []string) bool {
 | 
			
		||||
	if len(field) == 0 {
 | 
			
		||||
		return true
 | 
			
		||||
func stripTag(image string) string {
 | 
			
		||||
	ref, err := reference.ParseNamed(image)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return image
 | 
			
		||||
	}
 | 
			
		||||
	if len(filter) == 0 {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	for _, v := range filter {
 | 
			
		||||
		if v == field {
 | 
			
		||||
			return true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
	return ref.Name()
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -4,7 +4,6 @@ import (
 | 
			
		|||
	"fmt"
 | 
			
		||||
	"path"
 | 
			
		||||
	"sort"
 | 
			
		||||
	"strings"
 | 
			
		||||
 | 
			
		||||
	"github.com/docker/distribution/reference"
 | 
			
		||||
	"github.com/docker/docker/api/types"
 | 
			
		||||
| 
						 | 
				
			
			@ -13,9 +12,9 @@ import (
 | 
			
		|||
	"github.com/docker/docker/pkg/parsers/filters"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
var acceptedImageFilterTags = map[string]struct{}{
 | 
			
		||||
	"dangling": {},
 | 
			
		||||
	"label":    {},
 | 
			
		||||
var acceptedImageFilterTags = map[string]bool{
 | 
			
		||||
	"dangling": true,
 | 
			
		||||
	"label":    true,
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// byCreated is a temporary type used to sort a list of images by creation
 | 
			
		||||
| 
						 | 
				
			
			@ -47,19 +46,15 @@ func (daemon *Daemon) Images(filterArgs, filter string, all bool) ([]*types.Imag
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
	for name := range imageFilters {
 | 
			
		||||
		if _, ok := acceptedImageFilterTags[name]; !ok {
 | 
			
		||||
			return nil, fmt.Errorf("Invalid filter '%s'", name)
 | 
			
		||||
		}
 | 
			
		||||
	if err := imageFilters.Validate(acceptedImageFilterTags); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if i, ok := imageFilters["dangling"]; ok {
 | 
			
		||||
		for _, value := range i {
 | 
			
		||||
			if v := strings.ToLower(value); v == "true" {
 | 
			
		||||
				danglingOnly = true
 | 
			
		||||
			} else if v != "false" {
 | 
			
		||||
				return nil, fmt.Errorf("Invalid filter 'dangling=%s'", v)
 | 
			
		||||
			}
 | 
			
		||||
	if imageFilters.Include("dangling") {
 | 
			
		||||
		if imageFilters.ExactMatch("dangling", "true") {
 | 
			
		||||
			danglingOnly = true
 | 
			
		||||
		} else if !imageFilters.ExactMatch("dangling", "false") {
 | 
			
		||||
			return nil, fmt.Errorf("Invalid filter 'dangling=%s'", imageFilters.Get("dangling"))
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -82,9 +77,9 @@ func (daemon *Daemon) Images(filterArgs, filter string, all bool) ([]*types.Imag
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	for id, img := range allImages {
 | 
			
		||||
		if _, ok := imageFilters["label"]; ok {
 | 
			
		||||
		if imageFilters.Include("label") {
 | 
			
		||||
			// Very old image that do not have image.Config (or even labels)
 | 
			
		||||
			if img.Config == nil {
 | 
			
		||||
				// Very old image that do not have image.Config (or even labels)
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			// We are now sure image.Config is not nil
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -8,7 +8,6 @@ import (
 | 
			
		|||
 | 
			
		||||
	"github.com/Sirupsen/logrus"
 | 
			
		||||
	"github.com/docker/docker/api/types"
 | 
			
		||||
	derr "github.com/docker/docker/errors"
 | 
			
		||||
	"github.com/docker/docker/image"
 | 
			
		||||
	"github.com/docker/docker/pkg/graphdb"
 | 
			
		||||
	"github.com/docker/docker/pkg/nat"
 | 
			
		||||
| 
						 | 
				
			
			@ -136,64 +135,65 @@ func (daemon *Daemon) foldFilter(config *ContainersConfig) (*listContext, error)
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	var filtExited []int
 | 
			
		||||
	if i, ok := psFilters["exited"]; ok {
 | 
			
		||||
		for _, value := range i {
 | 
			
		||||
			code, err := strconv.Atoi(value)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
			filtExited = append(filtExited, code)
 | 
			
		||||
	err = psFilters.WalkValues("exited", func(value string) error {
 | 
			
		||||
		code, err := strconv.Atoi(value)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
		filtExited = append(filtExited, code)
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if i, ok := psFilters["status"]; ok {
 | 
			
		||||
		for _, value := range i {
 | 
			
		||||
			if !isValidStateString(value) {
 | 
			
		||||
				return nil, errors.New("Unrecognised filter value for status")
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			config.All = true
 | 
			
		||||
	err = psFilters.WalkValues("status", func(value string) error {
 | 
			
		||||
		if !isValidStateString(value) {
 | 
			
		||||
			return fmt.Errorf("Unrecognised filter value for status: %s", value)
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		config.All = true
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	var beforeContFilter, sinceContFilter *Container
 | 
			
		||||
	if i, ok := psFilters["before"]; ok {
 | 
			
		||||
		for _, value := range i {
 | 
			
		||||
			beforeContFilter, err = daemon.Get(value)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	err = psFilters.WalkValues("before", func(value string) error {
 | 
			
		||||
		beforeContFilter, err = daemon.Get(value)
 | 
			
		||||
		return err
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if i, ok := psFilters["since"]; ok {
 | 
			
		||||
		for _, value := range i {
 | 
			
		||||
			sinceContFilter, err = daemon.Get(value)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				return nil, err
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	err = psFilters.WalkValues("since", func(value string) error {
 | 
			
		||||
		sinceContFilter, err = daemon.Get(value)
 | 
			
		||||
		return err
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	imagesFilter := map[image.ID]bool{}
 | 
			
		||||
	var ancestorFilter bool
 | 
			
		||||
	if ancestors, ok := psFilters["ancestor"]; ok {
 | 
			
		||||
	if psFilters.Include("ancestor") {
 | 
			
		||||
		ancestorFilter = true
 | 
			
		||||
		// The idea is to walk the graph down the most "efficient" way.
 | 
			
		||||
		for _, ancestor := range ancestors {
 | 
			
		||||
			// First, get the imageId of the ancestor filter (yay)
 | 
			
		||||
		psFilters.WalkValues("ancestor", func(ancestor string) error {
 | 
			
		||||
			id, err := daemon.GetImageID(ancestor)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				logrus.Warnf("Error while looking up for image %v", ancestor)
 | 
			
		||||
				continue
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
			if imagesFilter[id] {
 | 
			
		||||
				// Already seen this ancestor, skip it
 | 
			
		||||
				continue
 | 
			
		||||
				return nil
 | 
			
		||||
			}
 | 
			
		||||
			// Then walk down the graph and put the imageIds in imagesFilter
 | 
			
		||||
			populateImageFilterByParents(imagesFilter, id, daemon.imageStore.Children)
 | 
			
		||||
		}
 | 
			
		||||
			return nil
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	names := make(map[string][]string)
 | 
			
		||||
| 
						 | 
				
			
			@ -202,14 +202,14 @@ func (daemon *Daemon) foldFilter(config *ContainersConfig) (*listContext, error)
 | 
			
		|||
		return nil
 | 
			
		||||
	}, 1)
 | 
			
		||||
 | 
			
		||||
	if config.Before != "" {
 | 
			
		||||
	if config.Before != "" && beforeContFilter == nil {
 | 
			
		||||
		beforeContFilter, err = daemon.Get(config.Before)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if config.Since != "" {
 | 
			
		||||
	if config.Since != "" && sinceContFilter == nil {
 | 
			
		||||
		sinceContFilter, err = daemon.Get(config.Since)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return nil, err
 | 
			
		||||
| 
						 | 
				
			
			@ -397,17 +397,8 @@ func (daemon *Daemon) Volumes(filter string) ([]*types.Volume, error) {
 | 
			
		|||
		return nil, err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	filterUsed := false
 | 
			
		||||
	if i, ok := volFilters["dangling"]; ok {
 | 
			
		||||
		if len(i) > 1 {
 | 
			
		||||
			return nil, derr.ErrorCodeDanglingOne
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		filterValue := i[0]
 | 
			
		||||
		if strings.ToLower(filterValue) == "true" || filterValue == "1" {
 | 
			
		||||
			filterUsed = true
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	filterUsed := volFilters.Include("dangling") &&
 | 
			
		||||
		(volFilters.ExactMatch("dangling", "true") || volFilters.ExactMatch("dangling", "1"))
 | 
			
		||||
 | 
			
		||||
	volumes := daemon.volumes.List()
 | 
			
		||||
	for _, v := range volumes {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -230,9 +230,9 @@ func isNetworkAvailable(c *check.C, name string) bool {
 | 
			
		|||
func getNetworkIDByName(c *check.C, name string) string {
 | 
			
		||||
	var (
 | 
			
		||||
		v          = url.Values{}
 | 
			
		||||
		filterArgs = filters.Args{}
 | 
			
		||||
		filterArgs = filters.NewArgs()
 | 
			
		||||
	)
 | 
			
		||||
	filterArgs["name"] = []string{name}
 | 
			
		||||
	filterArgs.Add("name", name)
 | 
			
		||||
	filterJSON, err := filters.ToParam(filterArgs)
 | 
			
		||||
	c.Assert(err, checker.IsNil)
 | 
			
		||||
	v.Set("filters", filterJSON)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1891,9 +1891,7 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
 | 
			
		|||
 | 
			
		||||
	startEpoch := daemonTime(c).Unix()
 | 
			
		||||
	// Watch for events since epoch.
 | 
			
		||||
	eventsCmd := exec.Command(
 | 
			
		||||
		dockerBinary, "events",
 | 
			
		||||
		"--since", strconv.FormatInt(startEpoch, 10))
 | 
			
		||||
	eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(startEpoch, 10))
 | 
			
		||||
	stdout, err := eventsCmd.StdoutPipe()
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		c.Fatal(err)
 | 
			
		||||
| 
						 | 
				
			
			@ -1932,12 +1930,12 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) {
 | 
			
		|||
		c.Fatalf("failed to run build: %s", err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	matchCID := regexp.MustCompile("Running in ")
 | 
			
		||||
	matchCID := regexp.MustCompile("Running in (.+)")
 | 
			
		||||
	scanner := bufio.NewScanner(stdoutBuild)
 | 
			
		||||
	for scanner.Scan() {
 | 
			
		||||
		line := scanner.Text()
 | 
			
		||||
		if ok := matchCID.MatchString(line); ok {
 | 
			
		||||
			containerID <- line[len(line)-12:]
 | 
			
		||||
		if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 {
 | 
			
		||||
			containerID <- matches[1]
 | 
			
		||||
			break
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -74,7 +74,7 @@ func (s *DockerSuite) TestImagesErrorWithInvalidFilterNameTest(c *check.C) {
 | 
			
		|||
	c.Assert(out, checker.Contains, "Invalid filter")
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (s *DockerSuite) TestImagesFilterLabel(c *check.C) {
 | 
			
		||||
func (s *DockerSuite) TestImagesFilterLabelMatch(c *check.C) {
 | 
			
		||||
	testRequires(c, DaemonIsLinux)
 | 
			
		||||
	imageName1 := "images_filter_test1"
 | 
			
		||||
	imageName2 := "images_filter_test2"
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -5,6 +5,7 @@ package filters
 | 
			
		|||
import (
 | 
			
		||||
	"encoding/json"
 | 
			
		||||
	"errors"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"regexp"
 | 
			
		||||
	"strings"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -15,7 +16,14 @@ import (
 | 
			
		|||
// in an slice.
 | 
			
		||||
// e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu'
 | 
			
		||||
// the args will be {'label': {'label1=1','label2=2'}, 'image.name', {'ubuntu'}}
 | 
			
		||||
type Args map[string][]string
 | 
			
		||||
type Args struct {
 | 
			
		||||
	fields map[string]map[string]bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// NewArgs initializes a new Args struct.
 | 
			
		||||
func NewArgs() Args {
 | 
			
		||||
	return Args{fields: map[string]map[string]bool{}}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ParseFlag parses the argument to the filter flag. Like
 | 
			
		||||
//
 | 
			
		||||
| 
						 | 
				
			
			@ -25,9 +33,6 @@ type Args map[string][]string
 | 
			
		|||
// map is created.
 | 
			
		||||
func ParseFlag(arg string, prev Args) (Args, error) {
 | 
			
		||||
	filters := prev
 | 
			
		||||
	if prev == nil {
 | 
			
		||||
		filters = Args{}
 | 
			
		||||
	}
 | 
			
		||||
	if len(arg) == 0 {
 | 
			
		||||
		return filters, nil
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -37,9 +42,11 @@ func ParseFlag(arg string, prev Args) (Args, error) {
 | 
			
		|||
	}
 | 
			
		||||
 | 
			
		||||
	f := strings.SplitN(arg, "=", 2)
 | 
			
		||||
 | 
			
		||||
	name := strings.ToLower(strings.TrimSpace(f[0]))
 | 
			
		||||
	value := strings.TrimSpace(f[1])
 | 
			
		||||
	filters[name] = append(filters[name], value)
 | 
			
		||||
 | 
			
		||||
	filters.Add(name, value)
 | 
			
		||||
 | 
			
		||||
	return filters, nil
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -50,11 +57,11 @@ var ErrBadFormat = errors.New("bad format of filter (expected name=value)")
 | 
			
		|||
// ToParam packs the Args into an string for easy transport from client to server.
 | 
			
		||||
func ToParam(a Args) (string, error) {
 | 
			
		||||
	// this way we don't URL encode {}, just empty space
 | 
			
		||||
	if len(a) == 0 {
 | 
			
		||||
	if a.Len() == 0 {
 | 
			
		||||
		return "", nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	buf, err := json.Marshal(a)
 | 
			
		||||
	buf, err := json.Marshal(a.fields)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return "", err
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -63,23 +70,71 @@ func ToParam(a Args) (string, error) {
 | 
			
		|||
 | 
			
		||||
// FromParam unpacks the filter Args.
 | 
			
		||||
func FromParam(p string) (Args, error) {
 | 
			
		||||
	args := Args{}
 | 
			
		||||
	if len(p) == 0 {
 | 
			
		||||
		return args, nil
 | 
			
		||||
		return NewArgs(), nil
 | 
			
		||||
	}
 | 
			
		||||
	if err := json.NewDecoder(strings.NewReader(p)).Decode(&args); err != nil {
 | 
			
		||||
		return nil, err
 | 
			
		||||
 | 
			
		||||
	r := strings.NewReader(p)
 | 
			
		||||
	d := json.NewDecoder(r)
 | 
			
		||||
 | 
			
		||||
	m := map[string]map[string]bool{}
 | 
			
		||||
	if err := d.Decode(&m); err != nil {
 | 
			
		||||
		r.Seek(0, 0)
 | 
			
		||||
 | 
			
		||||
		// Allow parsing old arguments in slice format.
 | 
			
		||||
		// Because other libraries might be sending them in this format.
 | 
			
		||||
		deprecated := map[string][]string{}
 | 
			
		||||
		if deprecatedErr := d.Decode(&deprecated); deprecatedErr == nil {
 | 
			
		||||
			m = deprecatedArgs(deprecated)
 | 
			
		||||
		} else {
 | 
			
		||||
			return NewArgs(), err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return args, nil
 | 
			
		||||
	return Args{m}, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Get returns the list of values associates with a field.
 | 
			
		||||
// It returns a slice of strings to keep backwards compatibility with old code.
 | 
			
		||||
func (filters Args) Get(field string) []string {
 | 
			
		||||
	values := filters.fields[field]
 | 
			
		||||
	if values == nil {
 | 
			
		||||
		return make([]string, 0)
 | 
			
		||||
	}
 | 
			
		||||
	slice := make([]string, 0, len(values))
 | 
			
		||||
	for key := range values {
 | 
			
		||||
		slice = append(slice, key)
 | 
			
		||||
	}
 | 
			
		||||
	return slice
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Add adds a new value to a filter field.
 | 
			
		||||
func (filters Args) Add(name, value string) {
 | 
			
		||||
	if _, ok := filters.fields[name]; ok {
 | 
			
		||||
		filters.fields[name][value] = true
 | 
			
		||||
	} else {
 | 
			
		||||
		filters.fields[name] = map[string]bool{value: true}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Del removes a value from a filter field.
 | 
			
		||||
func (filters Args) Del(name, value string) {
 | 
			
		||||
	if _, ok := filters.fields[name]; ok {
 | 
			
		||||
		delete(filters.fields[name], value)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Len returns the number of fields in the arguments.
 | 
			
		||||
func (filters Args) Len() int {
 | 
			
		||||
	return len(filters.fields)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MatchKVList returns true if the values for the specified field maches the ones
 | 
			
		||||
// from the sources.
 | 
			
		||||
// e.g. given Args are {'label': {'label1=1','label2=1'}, 'image.name', {'ubuntu'}},
 | 
			
		||||
//      field is 'label' and sources are {'label':{'label1=1','label2=2','label3=3'}}
 | 
			
		||||
//      field is 'label' and sources are {'label1': '1', 'label2': '2'}
 | 
			
		||||
//      it returns true.
 | 
			
		||||
func (filters Args) MatchKVList(field string, sources map[string]string) bool {
 | 
			
		||||
	fieldValues := filters[field]
 | 
			
		||||
	fieldValues := filters.fields[field]
 | 
			
		||||
 | 
			
		||||
	//do not filter if there is no filter set or cannot determine filter
 | 
			
		||||
	if len(fieldValues) == 0 {
 | 
			
		||||
| 
						 | 
				
			
			@ -90,21 +145,16 @@ func (filters Args) MatchKVList(field string, sources map[string]string) bool {
 | 
			
		|||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
outer:
 | 
			
		||||
	for _, name2match := range fieldValues {
 | 
			
		||||
	for name2match := range fieldValues {
 | 
			
		||||
		testKV := strings.SplitN(name2match, "=", 2)
 | 
			
		||||
 | 
			
		||||
		for k, v := range sources {
 | 
			
		||||
			if len(testKV) == 1 {
 | 
			
		||||
				if k == testKV[0] {
 | 
			
		||||
					continue outer
 | 
			
		||||
				}
 | 
			
		||||
			} else if k == testKV[0] && v == testKV[1] {
 | 
			
		||||
				continue outer
 | 
			
		||||
			}
 | 
			
		||||
		v, ok := sources[testKV[0]]
 | 
			
		||||
		if !ok {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
		if len(testKV) == 2 && testKV[1] != v {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		return false
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return true
 | 
			
		||||
| 
						 | 
				
			
			@ -115,13 +165,12 @@ outer:
 | 
			
		|||
//      field is 'image.name' and source is 'ubuntu'
 | 
			
		||||
//      it returns true.
 | 
			
		||||
func (filters Args) Match(field, source string) bool {
 | 
			
		||||
	fieldValues := filters[field]
 | 
			
		||||
 | 
			
		||||
	//do not filter if there is no filter set or cannot determine filter
 | 
			
		||||
	if len(fieldValues) == 0 {
 | 
			
		||||
	if filters.ExactMatch(field, source) {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	for _, name2match := range fieldValues {
 | 
			
		||||
 | 
			
		||||
	fieldValues := filters.fields[field]
 | 
			
		||||
	for name2match := range fieldValues {
 | 
			
		||||
		match, err := regexp.MatchString(name2match, source)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			continue
 | 
			
		||||
| 
						 | 
				
			
			@ -132,3 +181,61 @@ func (filters Args) Match(field, source string) bool {
 | 
			
		|||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ExactMatch returns true if the source matches exactly one of the filters.
 | 
			
		||||
func (filters Args) ExactMatch(field, source string) bool {
 | 
			
		||||
	fieldValues, ok := filters.fields[field]
 | 
			
		||||
	//do not filter if there is no filter set or cannot determine filter
 | 
			
		||||
	if !ok || len(fieldValues) == 0 {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// try to march full name value to avoid O(N) regular expression matching
 | 
			
		||||
	if fieldValues[source] {
 | 
			
		||||
		return true
 | 
			
		||||
	}
 | 
			
		||||
	return false
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Include returns true if the name of the field to filter is in the filters.
 | 
			
		||||
func (filters Args) Include(field string) bool {
 | 
			
		||||
	_, ok := filters.fields[field]
 | 
			
		||||
	return ok
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Validate ensures that all the fields in the filter are valid.
 | 
			
		||||
// It returns an error as soon as it finds an invalid field.
 | 
			
		||||
func (filters Args) Validate(accepted map[string]bool) error {
 | 
			
		||||
	for name := range filters.fields {
 | 
			
		||||
		if !accepted[name] {
 | 
			
		||||
			return fmt.Errorf("Invalid filter '%s'", name)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WalkValues iterates over the list of filtered values for a field.
 | 
			
		||||
// It stops the iteration if it finds an error and it returns that error.
 | 
			
		||||
func (filters Args) WalkValues(field string, op func(value string) error) error {
 | 
			
		||||
	if _, ok := filters.fields[field]; !ok {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
	for v := range filters.fields[field] {
 | 
			
		||||
		if err := op(v); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func deprecatedArgs(d map[string][]string) map[string]map[string]bool {
 | 
			
		||||
	m := map[string]map[string]bool{}
 | 
			
		||||
	for k, v := range d {
 | 
			
		||||
		values := map[string]bool{}
 | 
			
		||||
		for _, vv := range v {
 | 
			
		||||
			values[vv] = true
 | 
			
		||||
		}
 | 
			
		||||
		m[k] = values
 | 
			
		||||
	}
 | 
			
		||||
	return m
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,7 +1,7 @@
 | 
			
		|||
package filters
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"sort"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"testing"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -13,7 +13,7 @@ func TestParseArgs(t *testing.T) {
 | 
			
		|||
		"image.name=*untu",
 | 
			
		||||
	}
 | 
			
		||||
	var (
 | 
			
		||||
		args = Args{}
 | 
			
		||||
		args = NewArgs()
 | 
			
		||||
		err  error
 | 
			
		||||
	)
 | 
			
		||||
	for i := range flagArgs {
 | 
			
		||||
| 
						 | 
				
			
			@ -22,10 +22,10 @@ func TestParseArgs(t *testing.T) {
 | 
			
		|||
			t.Errorf("failed to parse %s: %s", flagArgs[i], err)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	if len(args["created"]) != 1 {
 | 
			
		||||
	if len(args.Get("created")) != 1 {
 | 
			
		||||
		t.Errorf("failed to set this arg")
 | 
			
		||||
	}
 | 
			
		||||
	if len(args["image.name"]) != 2 {
 | 
			
		||||
	if len(args.Get("image.name")) != 2 {
 | 
			
		||||
		t.Errorf("the args should have collapsed")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -36,7 +36,7 @@ func TestParseArgsEdgeCase(t *testing.T) {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
	if args == nil || len(args) != 0 {
 | 
			
		||||
	if args.Len() != 0 {
 | 
			
		||||
		t.Fatalf("Expected an empty Args (map), got %v", args)
 | 
			
		||||
	}
 | 
			
		||||
	if args, err = ParseFlag("anything", args); err == nil || err != ErrBadFormat {
 | 
			
		||||
| 
						 | 
				
			
			@ -45,10 +45,11 @@ func TestParseArgsEdgeCase(t *testing.T) {
 | 
			
		|||
}
 | 
			
		||||
 | 
			
		||||
func TestToParam(t *testing.T) {
 | 
			
		||||
	a := Args{
 | 
			
		||||
		"created":    []string{"today"},
 | 
			
		||||
		"image.name": []string{"ubuntu*", "*untu"},
 | 
			
		||||
	fields := map[string]map[string]bool{
 | 
			
		||||
		"created":    {"today": true},
 | 
			
		||||
		"image.name": {"ubuntu*": true, "*untu": true},
 | 
			
		||||
	}
 | 
			
		||||
	a := Args{fields: fields}
 | 
			
		||||
 | 
			
		||||
	_, err := ToParam(a)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			@ -63,42 +64,48 @@ func TestFromParam(t *testing.T) {
 | 
			
		|||
		"{'key': 'value'}",
 | 
			
		||||
		`{"key": "value"}`,
 | 
			
		||||
	}
 | 
			
		||||
	valids := map[string]Args{
 | 
			
		||||
		`{"key": ["value"]}`: {
 | 
			
		||||
			"key": {"value"},
 | 
			
		||||
	valid := map[*Args][]string{
 | 
			
		||||
		&Args{fields: map[string]map[string]bool{"key": {"value": true}}}: {
 | 
			
		||||
			`{"key": ["value"]}`,
 | 
			
		||||
			`{"key": {"value": true}}`,
 | 
			
		||||
		},
 | 
			
		||||
		`{"key": ["value1", "value2"]}`: {
 | 
			
		||||
			"key": {"value1", "value2"},
 | 
			
		||||
		&Args{fields: map[string]map[string]bool{"key": {"value1": true, "value2": true}}}: {
 | 
			
		||||
			`{"key": ["value1", "value2"]}`,
 | 
			
		||||
			`{"key": {"value1": true, "value2": true}}`,
 | 
			
		||||
		},
 | 
			
		||||
		`{"key1": ["value1"], "key2": ["value2"]}`: {
 | 
			
		||||
			"key1": {"value1"},
 | 
			
		||||
			"key2": {"value2"},
 | 
			
		||||
		&Args{fields: map[string]map[string]bool{"key1": {"value1": true}, "key2": {"value2": true}}}: {
 | 
			
		||||
			`{"key1": ["value1"], "key2": ["value2"]}`,
 | 
			
		||||
			`{"key1": {"value1": true}, "key2": {"value2": true}}`,
 | 
			
		||||
		},
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for _, invalid := range invalids {
 | 
			
		||||
		if _, err := FromParam(invalid); err == nil {
 | 
			
		||||
			t.Fatalf("Expected an error with %v, got nothing", invalid)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	for json, expectedArgs := range valids {
 | 
			
		||||
		args, err := FromParam(json)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			t.Fatal(err)
 | 
			
		||||
		}
 | 
			
		||||
		if len(args) != len(expectedArgs) {
 | 
			
		||||
			t.Fatalf("Expected %v, go %v", expectedArgs, args)
 | 
			
		||||
		}
 | 
			
		||||
		for key, expectedValues := range expectedArgs {
 | 
			
		||||
			values := args[key]
 | 
			
		||||
			sort.Strings(values)
 | 
			
		||||
			sort.Strings(expectedValues)
 | 
			
		||||
			if len(values) != len(expectedValues) {
 | 
			
		||||
 | 
			
		||||
	for expectedArgs, matchers := range valid {
 | 
			
		||||
		for _, json := range matchers {
 | 
			
		||||
			args, err := FromParam(json)
 | 
			
		||||
			if err != nil {
 | 
			
		||||
				t.Fatal(err)
 | 
			
		||||
			}
 | 
			
		||||
			if args.Len() != expectedArgs.Len() {
 | 
			
		||||
				t.Fatalf("Expected %v, go %v", expectedArgs, args)
 | 
			
		||||
			}
 | 
			
		||||
			for index, expectedValue := range expectedValues {
 | 
			
		||||
				if values[index] != expectedValue {
 | 
			
		||||
			for key, expectedValues := range expectedArgs.fields {
 | 
			
		||||
				values := args.Get(key)
 | 
			
		||||
 | 
			
		||||
				if len(values) != len(expectedValues) {
 | 
			
		||||
					t.Fatalf("Expected %v, go %v", expectedArgs, args)
 | 
			
		||||
				}
 | 
			
		||||
 | 
			
		||||
				for _, v := range values {
 | 
			
		||||
					if !expectedValues[v] {
 | 
			
		||||
						t.Fatalf("Expected %v, go %v", expectedArgs, args)
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
| 
						 | 
				
			
			@ -114,54 +121,63 @@ func TestEmpty(t *testing.T) {
 | 
			
		|||
	if err != nil {
 | 
			
		||||
		t.Errorf("%s", err)
 | 
			
		||||
	}
 | 
			
		||||
	if len(a) != len(v1) {
 | 
			
		||||
	if a.Len() != v1.Len() {
 | 
			
		||||
		t.Errorf("these should both be empty sets")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestArgsMatchKVList(t *testing.T) {
 | 
			
		||||
	// empty sources
 | 
			
		||||
	args := Args{
 | 
			
		||||
		"created": []string{"today"},
 | 
			
		||||
func TestArgsMatchKVListEmptySources(t *testing.T) {
 | 
			
		||||
	args := NewArgs()
 | 
			
		||||
	if !args.MatchKVList("created", map[string]string{}) {
 | 
			
		||||
		t.Fatalf("Expected true for (%v,created), got true", args)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	args = Args{map[string]map[string]bool{"created": {"today": true}}}
 | 
			
		||||
	if args.MatchKVList("created", map[string]string{}) {
 | 
			
		||||
		t.Fatalf("Expected false for (%v,created), got true", args)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestArgsMatchKVList(t *testing.T) {
 | 
			
		||||
	// Not empty sources
 | 
			
		||||
	sources := map[string]string{
 | 
			
		||||
		"key1": "value1",
 | 
			
		||||
		"key2": "value2",
 | 
			
		||||
		"key3": "value3",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	matches := map[*Args]string{
 | 
			
		||||
		&Args{}: "field",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"today"},
 | 
			
		||||
			"labels":  []string{"key1"},
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"today": true},
 | 
			
		||||
			"labels":  map[string]bool{"key1": true}},
 | 
			
		||||
		}: "labels",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"today"},
 | 
			
		||||
			"labels":  []string{"key1=value1"},
 | 
			
		||||
		}: "labels",
 | 
			
		||||
	}
 | 
			
		||||
	differs := map[*Args]string{
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"today"},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"today"},
 | 
			
		||||
			"labels":  []string{"key4"},
 | 
			
		||||
		}: "labels",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"today"},
 | 
			
		||||
			"labels":  []string{"key1=value3"},
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"today": true},
 | 
			
		||||
			"labels":  map[string]bool{"key1=value1": true}},
 | 
			
		||||
		}: "labels",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for args, field := range matches {
 | 
			
		||||
		if args.MatchKVList(field, sources) != true {
 | 
			
		||||
			t.Fatalf("Expected true for %v on %v, got false", sources, args)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	differs := map[*Args]string{
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"today": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"today": true},
 | 
			
		||||
			"labels":  map[string]bool{"key4": true}},
 | 
			
		||||
		}: "labels",
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"today": true},
 | 
			
		||||
			"labels":  map[string]bool{"key1=value3": true}},
 | 
			
		||||
		}: "labels",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for args, field := range differs {
 | 
			
		||||
		if args.MatchKVList(field, sources) != false {
 | 
			
		||||
			t.Fatalf("Expected false for %v on %v, got true", sources, args)
 | 
			
		||||
| 
						 | 
				
			
			@ -171,48 +187,165 @@ func TestArgsMatchKVList(t *testing.T) {
 | 
			
		|||
 | 
			
		||||
func TestArgsMatch(t *testing.T) {
 | 
			
		||||
	source := "today"
 | 
			
		||||
 | 
			
		||||
	matches := map[*Args]string{
 | 
			
		||||
		&Args{}: "field",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"today"},
 | 
			
		||||
			"labels":  []string{"key1"},
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"today": true}},
 | 
			
		||||
		}: "today",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"to*"},
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"to*": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"to(.*)"},
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"to(.*)": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"tod"},
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"tod": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"anything", "to*"},
 | 
			
		||||
		}: "created",
 | 
			
		||||
	}
 | 
			
		||||
	differs := map[*Args]string{
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"tomorrow"},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"to(day"},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"tom(.*)"},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{
 | 
			
		||||
			"created": []string{"today1"},
 | 
			
		||||
			"labels":  []string{"today"},
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"anyting": true, "to*": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for args, field := range matches {
 | 
			
		||||
		if args.Match(field, source) != true {
 | 
			
		||||
			t.Fatalf("Expected true for %v on %v, got false", source, args)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	differs := map[*Args]string{
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"tomorrow": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"to(day": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"tom(.*)": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"tom": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
		&Args{map[string]map[string]bool{
 | 
			
		||||
			"created": map[string]bool{"today1": true},
 | 
			
		||||
			"labels":  map[string]bool{"today": true}},
 | 
			
		||||
		}: "created",
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	for args, field := range differs {
 | 
			
		||||
		if args.Match(field, source) != false {
 | 
			
		||||
			t.Fatalf("Expected false for %v on %v, got true", source, args)
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestAdd(t *testing.T) {
 | 
			
		||||
	f := NewArgs()
 | 
			
		||||
	f.Add("status", "running")
 | 
			
		||||
	v := f.fields["status"]
 | 
			
		||||
	if len(v) != 1 || !v["running"] {
 | 
			
		||||
		t.Fatalf("Expected to include a running status, got %v", v)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	f.Add("status", "paused")
 | 
			
		||||
	if len(v) != 2 || !v["paused"] {
 | 
			
		||||
		t.Fatalf("Expected to include a paused status, got %v", v)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestDel(t *testing.T) {
 | 
			
		||||
	f := NewArgs()
 | 
			
		||||
	f.Add("status", "running")
 | 
			
		||||
	f.Del("status", "running")
 | 
			
		||||
	v := f.fields["status"]
 | 
			
		||||
	if v["running"] {
 | 
			
		||||
		t.Fatalf("Expected to not include a running status filter, got true")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestLen(t *testing.T) {
 | 
			
		||||
	f := NewArgs()
 | 
			
		||||
	if f.Len() != 0 {
 | 
			
		||||
		t.Fatalf("Expected to not include any field")
 | 
			
		||||
	}
 | 
			
		||||
	f.Add("status", "running")
 | 
			
		||||
	if f.Len() != 1 {
 | 
			
		||||
		t.Fatalf("Expected to include one field")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestExactMatch(t *testing.T) {
 | 
			
		||||
	f := NewArgs()
 | 
			
		||||
 | 
			
		||||
	if !f.ExactMatch("status", "running") {
 | 
			
		||||
		t.Fatalf("Expected to match `running` when there are no filters, got false")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	f.Add("status", "running")
 | 
			
		||||
	f.Add("status", "pause*")
 | 
			
		||||
 | 
			
		||||
	if !f.ExactMatch("status", "running") {
 | 
			
		||||
		t.Fatalf("Expected to match `running` with one of the filters, got false")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if f.ExactMatch("status", "paused") {
 | 
			
		||||
		t.Fatalf("Expected to not match `paused` with one of the filters, got true")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestInclude(t *testing.T) {
 | 
			
		||||
	f := NewArgs()
 | 
			
		||||
	if f.Include("status") {
 | 
			
		||||
		t.Fatalf("Expected to not include a status key, got true")
 | 
			
		||||
	}
 | 
			
		||||
	f.Add("status", "running")
 | 
			
		||||
	if !f.Include("status") {
 | 
			
		||||
		t.Fatalf("Expected to include a status key, got false")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestValidate(t *testing.T) {
 | 
			
		||||
	f := NewArgs()
 | 
			
		||||
	f.Add("status", "running")
 | 
			
		||||
 | 
			
		||||
	valid := map[string]bool{
 | 
			
		||||
		"status":   true,
 | 
			
		||||
		"dangling": true,
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if err := f.Validate(valid); err != nil {
 | 
			
		||||
		t.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	f.Add("bogus", "running")
 | 
			
		||||
	if err := f.Validate(valid); err == nil {
 | 
			
		||||
		t.Fatalf("Expected to return an error, got nil")
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func TestWalkValues(t *testing.T) {
 | 
			
		||||
	f := NewArgs()
 | 
			
		||||
	f.Add("status", "running")
 | 
			
		||||
	f.Add("status", "paused")
 | 
			
		||||
 | 
			
		||||
	f.WalkValues("status", func(value string) error {
 | 
			
		||||
		if value != "running" && value != "paused" {
 | 
			
		||||
			t.Fatalf("Unexpected value %s", value)
 | 
			
		||||
		}
 | 
			
		||||
		return nil
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	err := f.WalkValues("status", func(value string) error {
 | 
			
		||||
		return fmt.Errorf("return")
 | 
			
		||||
	})
 | 
			
		||||
	if err == nil {
 | 
			
		||||
		t.Fatalf("Expected to get an error, got nil")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	err = f.WalkValues("foo", func(value string) error {
 | 
			
		||||
		return fmt.Errorf("return")
 | 
			
		||||
	})
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		t.Fatalf("Expected to not iterate when the field doesn't exist, got %v", err)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,11 +13,12 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher {
 | 
			
		|||
	return &Publisher{
 | 
			
		||||
		buffer:      buffer,
 | 
			
		||||
		timeout:     publishTimeout,
 | 
			
		||||
		subscribers: make(map[subscriber]struct{}),
 | 
			
		||||
		subscribers: make(map[subscriber]topicFunc),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type subscriber chan interface{}
 | 
			
		||||
type topicFunc func(v interface{}) bool
 | 
			
		||||
 | 
			
		||||
// Publisher is basic pub/sub structure. Allows to send events and subscribe
 | 
			
		||||
// to them. Can be safely used from multiple goroutines.
 | 
			
		||||
| 
						 | 
				
			
			@ -25,7 +26,7 @@ type Publisher struct {
 | 
			
		|||
	m           sync.RWMutex
 | 
			
		||||
	buffer      int
 | 
			
		||||
	timeout     time.Duration
 | 
			
		||||
	subscribers map[subscriber]struct{}
 | 
			
		||||
	subscribers map[subscriber]topicFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Len returns the number of subscribers for the publisher
 | 
			
		||||
| 
						 | 
				
			
			@ -38,9 +39,14 @@ func (p *Publisher) Len() int {
 | 
			
		|||
 | 
			
		||||
// Subscribe adds a new subscriber to the publisher returning the channel.
 | 
			
		||||
func (p *Publisher) Subscribe() chan interface{} {
 | 
			
		||||
	return p.SubscribeTopic(nil)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// SubscribeTopic adds a new subscriber that filters messages sent by a topic.
 | 
			
		||||
func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
 | 
			
		||||
	ch := make(chan interface{}, p.buffer)
 | 
			
		||||
	p.m.Lock()
 | 
			
		||||
	p.subscribers[ch] = struct{}{}
 | 
			
		||||
	p.subscribers[ch] = topic
 | 
			
		||||
	p.m.Unlock()
 | 
			
		||||
	return ch
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -56,20 +62,13 @@ func (p *Publisher) Evict(sub chan interface{}) {
 | 
			
		|||
// Publish sends the data in v to all subscribers currently registered with the publisher.
 | 
			
		||||
func (p *Publisher) Publish(v interface{}) {
 | 
			
		||||
	p.m.RLock()
 | 
			
		||||
	for sub := range p.subscribers {
 | 
			
		||||
		// send under a select as to not block if the receiver is unavailable
 | 
			
		||||
		if p.timeout > 0 {
 | 
			
		||||
			select {
 | 
			
		||||
			case sub <- v:
 | 
			
		||||
			case <-time.After(p.timeout):
 | 
			
		||||
			}
 | 
			
		||||
			continue
 | 
			
		||||
		}
 | 
			
		||||
		select {
 | 
			
		||||
		case sub <- v:
 | 
			
		||||
		default:
 | 
			
		||||
		}
 | 
			
		||||
	wg := new(sync.WaitGroup)
 | 
			
		||||
	for sub, topic := range p.subscribers {
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
 | 
			
		||||
		go p.sendTopic(sub, topic, v, wg)
 | 
			
		||||
	}
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	p.m.RUnlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -82,3 +81,24 @@ func (p *Publisher) Close() {
 | 
			
		|||
	}
 | 
			
		||||
	p.m.Unlock()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
 | 
			
		||||
	defer wg.Done()
 | 
			
		||||
	if topic != nil && !topic(v) {
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// send under a select as to not block if the receiver is unavailable
 | 
			
		||||
	if p.timeout > 0 {
 | 
			
		||||
		select {
 | 
			
		||||
		case sub <- v:
 | 
			
		||||
		case <-time.After(p.timeout):
 | 
			
		||||
		}
 | 
			
		||||
		return
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	select {
 | 
			
		||||
	case sub <- v:
 | 
			
		||||
	default:
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue