From 93d1dd8036d57f5cf1e5cbbbad875ae9a6fa6180 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Wed, 25 Nov 2015 20:27:11 -0500 Subject: [PATCH 1/2] Make filtering a linear operation. Improves the current filtering implementation complixity. Currently, the best case is O(N) and worst case O(N^2) for key-value filtering. In the new implementation, the best case is O(1) and worst case O(N), again for key-value filtering. Signed-off-by: David Calavera --- api/client/events.go | 4 +- api/client/images.go | 4 +- api/client/ps.go | 4 +- api/client/volume.go | 4 +- api/server/router/network/network_routes.go | 30 +- daemon/daemon.go | 9 +- daemon/events/filter.go | 39 +-- daemon/images.go | 29 +- daemon/list.go | 89 +++--- integration-cli/docker_api_network_test.go | 4 +- integration-cli/docker_cli_images_test.go | 2 +- pkg/parsers/filters/parse.go | 169 +++++++++-- pkg/parsers/filters/parse_test.go | 295 ++++++++++++++------ 13 files changed, 444 insertions(+), 238 deletions(-) diff --git a/api/client/events.go b/api/client/events.go index 86ed0269b5..fa84eac518 100644 --- a/api/client/events.go +++ b/api/client/events.go @@ -26,7 +26,7 @@ func (cli *DockerCli) CmdEvents(args ...string) error { var ( v = url.Values{} - eventFilterArgs = filters.Args{} + eventFilterArgs = filters.NewArgs() ) // Consolidate all filter flags, and sanity check them early. @@ -53,7 +53,7 @@ func (cli *DockerCli) CmdEvents(args ...string) error { } v.Set("until", ts) } - if len(eventFilterArgs) > 0 { + if eventFilterArgs.Len() > 0 { filterJSON, err := filters.ToParam(eventFilterArgs) if err != nil { return err diff --git a/api/client/images.go b/api/client/images.go index ba26512a2d..26e7c9706e 100644 --- a/api/client/images.go +++ b/api/client/images.go @@ -36,7 +36,7 @@ func (cli *DockerCli) CmdImages(args ...string) error { // Consolidate all filter flags, and sanity check them early. // They'll get process in the daemon/server. - imageFilterArgs := filters.Args{} + imageFilterArgs := filters.NewArgs() for _, f := range flFilter.GetAll() { var err error imageFilterArgs, err = filters.ParseFlag(f, imageFilterArgs) @@ -47,7 +47,7 @@ func (cli *DockerCli) CmdImages(args ...string) error { matchName := cmd.Arg(0) v := url.Values{} - if len(imageFilterArgs) > 0 { + if imageFilterArgs.Len() > 0 { filterJSON, err := filters.ToParam(imageFilterArgs) if err != nil { return err diff --git a/api/client/ps.go b/api/client/ps.go index 76a00214a9..5bcf4bb2ee 100644 --- a/api/client/ps.go +++ b/api/client/ps.go @@ -20,7 +20,7 @@ func (cli *DockerCli) CmdPs(args ...string) error { var ( err error - psFilterArgs = filters.Args{} + psFilterArgs = filters.NewArgs() v = url.Values{} cmd = Cli.Subcmd("ps", nil, Cli.DockerCommands["ps"].Description, true) @@ -72,7 +72,7 @@ func (cli *DockerCli) CmdPs(args ...string) error { } } - if len(psFilterArgs) > 0 { + if psFilterArgs.Len() > 0 { filterJSON, err := filters.ToParam(psFilterArgs) if err != nil { return err diff --git a/api/client/volume.go b/api/client/volume.go index 1dc0ea2d04..08d83f5a70 100644 --- a/api/client/volume.go +++ b/api/client/volume.go @@ -54,7 +54,7 @@ func (cli *DockerCli) CmdVolumeLs(args ...string) error { cmd.Require(flag.Exact, 0) cmd.ParseFlags(args, true) - volFilterArgs := filters.Args{} + volFilterArgs := filters.NewArgs() for _, f := range flFilter.GetAll() { var err error volFilterArgs, err = filters.ParseFlag(f, volFilterArgs) @@ -64,7 +64,7 @@ func (cli *DockerCli) CmdVolumeLs(args ...string) error { } v := url.Values{} - if len(volFilterArgs) > 0 { + if volFilterArgs.Len() > 0 { filterJSON, err := filters.ToParam(volFilterArgs) if err != nil { return err diff --git a/api/server/router/network/network_routes.go b/api/server/router/network/network_routes.go index 5b70c4f31d..f639481514 100644 --- a/api/server/router/network/network_routes.go +++ b/api/server/router/network/network_routes.go @@ -29,27 +29,23 @@ func (n *networkRouter) getNetworksList(ctx context.Context, w http.ResponseWrit } list := []*types.NetworkResource{} - var nameFilter, idFilter bool - var names, ids []string - if names, nameFilter = netFilters["name"]; nameFilter { - for _, name := range names { - if nw, err := n.backend.GetNetwork(name, daemon.NetworkByName); err == nil { - list = append(list, buildNetworkResource(nw)) - } else { - logrus.Errorf("failed to get network for filter=%s : %v", name, err) - } + netFilters.WalkValues("name", func(name string) error { + if nw, err := n.backend.GetNetwork(name, daemon.NetworkByName); err == nil { + list = append(list, buildNetworkResource(nw)) + } else { + logrus.Errorf("failed to get network for filter=%s : %v", name, err) } - } + return nil + }) - if ids, idFilter = netFilters["id"]; idFilter { - for _, id := range ids { - for _, nw := range n.backend.GetNetworksByID(id) { - list = append(list, buildNetworkResource(nw)) - } + netFilters.WalkValues("id", func(id string) error { + for _, nw := range n.backend.GetNetworksByID(id) { + list = append(list, buildNetworkResource(nw)) } - } + return nil + }) - if !nameFilter && !idFilter { + if !netFilters.Include("name") && !netFilters.Include("id") { nwList := n.backend.GetNetworksByID("") for _, nw := range nwList { list = append(list, buildNetworkResource(nw)) diff --git a/daemon/daemon.go b/daemon/daemon.go index aea0eb5553..6211de2e8e 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -536,12 +536,11 @@ func (daemon *Daemon) GetByName(name string) (*Container, error) { func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter { // incoming container filter can be name, id or partial id, convert to // a full container id - for i, cn := range filter["container"] { + for _, cn := range filter.Get("container") { c, err := daemon.Get(cn) - if err != nil { - filter["container"][i] = "" - } else { - filter["container"][i] = c.ID + filter.Del("container", cn) + if err == nil { + filter.Add("container", c.ID) } } return events.NewFilter(filter, daemon.GetLabels) diff --git a/daemon/events/filter.go b/daemon/events/filter.go index ae7fba93d1..e15ca436d4 100644 --- a/daemon/events/filter.go +++ b/daemon/events/filter.go @@ -19,14 +19,14 @@ func NewFilter(filter filters.Args, getLabels func(id string) map[string]string) // Include returns true when the event ev is included by the filters func (ef *Filter) Include(ev *jsonmessage.JSONMessage) bool { - return isFieldIncluded(ev.Status, ef.filter["event"]) && - isFieldIncluded(ev.ID, ef.filter["container"]) && + return ef.filter.ExactMatch("event", ev.Status) && + ef.filter.ExactMatch("container", ev.ID) && ef.isImageIncluded(ev.ID, ev.From) && ef.isLabelFieldIncluded(ev.ID) } func (ef *Filter) isLabelFieldIncluded(id string) bool { - if _, ok := ef.filter["label"]; !ok { + if !ef.filter.Include("label") { return true } return ef.filter.MatchKVList("label", ef.getLabels(id)) @@ -37,31 +37,16 @@ func (ef *Filter) isLabelFieldIncluded(id string) bool { // from an image will be included in the image events. Also compare both // against the stripped repo name without any tags. func (ef *Filter) isImageIncluded(eventID string, eventFrom string) bool { - stripTag := func(image string) string { - ref, err := reference.ParseNamed(image) - if err != nil { - return image - } - return ref.Name() - } - - return isFieldIncluded(eventID, ef.filter["image"]) || - isFieldIncluded(eventFrom, ef.filter["image"]) || - isFieldIncluded(stripTag(eventID), ef.filter["image"]) || - isFieldIncluded(stripTag(eventFrom), ef.filter["image"]) + return ef.filter.ExactMatch("image", eventID) || + ef.filter.ExactMatch("image", eventFrom) || + ef.filter.ExactMatch("image", stripTag(eventID)) || + ef.filter.ExactMatch("image", stripTag(eventFrom)) } -func isFieldIncluded(field string, filter []string) bool { - if len(field) == 0 { - return true +func stripTag(image string) string { + ref, err := reference.ParseNamed(image) + if err != nil { + return image } - if len(filter) == 0 { - return true - } - for _, v := range filter { - if v == field { - return true - } - } - return false + return ref.Name() } diff --git a/daemon/images.go b/daemon/images.go index b4c506ee05..a2d3d7bac9 100644 --- a/daemon/images.go +++ b/daemon/images.go @@ -4,7 +4,6 @@ import ( "fmt" "path" "sort" - "strings" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" @@ -13,9 +12,9 @@ import ( "github.com/docker/docker/pkg/parsers/filters" ) -var acceptedImageFilterTags = map[string]struct{}{ - "dangling": {}, - "label": {}, +var acceptedImageFilterTags = map[string]bool{ + "dangling": true, + "label": true, } // byCreated is a temporary type used to sort a list of images by creation @@ -47,19 +46,15 @@ func (daemon *Daemon) Images(filterArgs, filter string, all bool) ([]*types.Imag if err != nil { return nil, err } - for name := range imageFilters { - if _, ok := acceptedImageFilterTags[name]; !ok { - return nil, fmt.Errorf("Invalid filter '%s'", name) - } + if err := imageFilters.Validate(acceptedImageFilterTags); err != nil { + return nil, err } - if i, ok := imageFilters["dangling"]; ok { - for _, value := range i { - if v := strings.ToLower(value); v == "true" { - danglingOnly = true - } else if v != "false" { - return nil, fmt.Errorf("Invalid filter 'dangling=%s'", v) - } + if imageFilters.Include("dangling") { + if imageFilters.ExactMatch("dangling", "true") { + danglingOnly = true + } else if !imageFilters.ExactMatch("dangling", "false") { + return nil, fmt.Errorf("Invalid filter 'dangling=%s'", imageFilters.Get("dangling")) } } @@ -82,9 +77,9 @@ func (daemon *Daemon) Images(filterArgs, filter string, all bool) ([]*types.Imag } for id, img := range allImages { - if _, ok := imageFilters["label"]; ok { + if imageFilters.Include("label") { + // Very old image that do not have image.Config (or even labels) if img.Config == nil { - // Very old image that do not have image.Config (or even labels) continue } // We are now sure image.Config is not nil diff --git a/daemon/list.go b/daemon/list.go index b64103a0b7..f289f2a66a 100644 --- a/daemon/list.go +++ b/daemon/list.go @@ -8,7 +8,6 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/api/types" - derr "github.com/docker/docker/errors" "github.com/docker/docker/image" "github.com/docker/docker/pkg/graphdb" "github.com/docker/docker/pkg/nat" @@ -136,64 +135,65 @@ func (daemon *Daemon) foldFilter(config *ContainersConfig) (*listContext, error) } var filtExited []int - if i, ok := psFilters["exited"]; ok { - for _, value := range i { - code, err := strconv.Atoi(value) - if err != nil { - return nil, err - } - filtExited = append(filtExited, code) + err = psFilters.WalkValues("exited", func(value string) error { + code, err := strconv.Atoi(value) + if err != nil { + return err } + filtExited = append(filtExited, code) + return nil + }) + if err != nil { + return nil, err } - if i, ok := psFilters["status"]; ok { - for _, value := range i { - if !isValidStateString(value) { - return nil, errors.New("Unrecognised filter value for status") - } - - config.All = true + err = psFilters.WalkValues("status", func(value string) error { + if !isValidStateString(value) { + return fmt.Errorf("Unrecognised filter value for status: %s", value) } + + config.All = true + return nil + }) + if err != nil { + return nil, err } var beforeContFilter, sinceContFilter *Container - if i, ok := psFilters["before"]; ok { - for _, value := range i { - beforeContFilter, err = daemon.Get(value) - if err != nil { - return nil, err - } - } + err = psFilters.WalkValues("before", func(value string) error { + beforeContFilter, err = daemon.Get(value) + return err + }) + if err != nil { + return nil, err } - if i, ok := psFilters["since"]; ok { - for _, value := range i { - sinceContFilter, err = daemon.Get(value) - if err != nil { - return nil, err - } - } + err = psFilters.WalkValues("since", func(value string) error { + sinceContFilter, err = daemon.Get(value) + return err + }) + if err != nil { + return nil, err } imagesFilter := map[image.ID]bool{} var ancestorFilter bool - if ancestors, ok := psFilters["ancestor"]; ok { + if psFilters.Include("ancestor") { ancestorFilter = true - // The idea is to walk the graph down the most "efficient" way. - for _, ancestor := range ancestors { - // First, get the imageId of the ancestor filter (yay) + psFilters.WalkValues("ancestor", func(ancestor string) error { id, err := daemon.GetImageID(ancestor) if err != nil { logrus.Warnf("Error while looking up for image %v", ancestor) - continue + return nil } if imagesFilter[id] { // Already seen this ancestor, skip it - continue + return nil } // Then walk down the graph and put the imageIds in imagesFilter populateImageFilterByParents(imagesFilter, id, daemon.imageStore.Children) - } + return nil + }) } names := make(map[string][]string) @@ -202,14 +202,14 @@ func (daemon *Daemon) foldFilter(config *ContainersConfig) (*listContext, error) return nil }, 1) - if config.Before != "" { + if config.Before != "" && beforeContFilter == nil { beforeContFilter, err = daemon.Get(config.Before) if err != nil { return nil, err } } - if config.Since != "" { + if config.Since != "" && sinceContFilter == nil { sinceContFilter, err = daemon.Get(config.Since) if err != nil { return nil, err @@ -397,17 +397,8 @@ func (daemon *Daemon) Volumes(filter string) ([]*types.Volume, error) { return nil, err } - filterUsed := false - if i, ok := volFilters["dangling"]; ok { - if len(i) > 1 { - return nil, derr.ErrorCodeDanglingOne - } - - filterValue := i[0] - if strings.ToLower(filterValue) == "true" || filterValue == "1" { - filterUsed = true - } - } + filterUsed := volFilters.Include("dangling") && + (volFilters.ExactMatch("dangling", "true") || volFilters.ExactMatch("dangling", "1")) volumes := daemon.volumes.List() for _, v := range volumes { diff --git a/integration-cli/docker_api_network_test.go b/integration-cli/docker_api_network_test.go index 2180cd10e4..bd0b5e490b 100644 --- a/integration-cli/docker_api_network_test.go +++ b/integration-cli/docker_api_network_test.go @@ -230,9 +230,9 @@ func isNetworkAvailable(c *check.C, name string) bool { func getNetworkIDByName(c *check.C, name string) string { var ( v = url.Values{} - filterArgs = filters.Args{} + filterArgs = filters.NewArgs() ) - filterArgs["name"] = []string{name} + filterArgs.Add("name", name) filterJSON, err := filters.ToParam(filterArgs) c.Assert(err, checker.IsNil) v.Set("filters", filterJSON) diff --git a/integration-cli/docker_cli_images_test.go b/integration-cli/docker_cli_images_test.go index 027d8fe53c..9ed7762b0f 100644 --- a/integration-cli/docker_cli_images_test.go +++ b/integration-cli/docker_cli_images_test.go @@ -74,7 +74,7 @@ func (s *DockerSuite) TestImagesErrorWithInvalidFilterNameTest(c *check.C) { c.Assert(out, checker.Contains, "Invalid filter") } -func (s *DockerSuite) TestImagesFilterLabel(c *check.C) { +func (s *DockerSuite) TestImagesFilterLabelMatch(c *check.C) { testRequires(c, DaemonIsLinux) imageName1 := "images_filter_test1" imageName2 := "images_filter_test2" diff --git a/pkg/parsers/filters/parse.go b/pkg/parsers/filters/parse.go index 6c394f1607..7444201fae 100644 --- a/pkg/parsers/filters/parse.go +++ b/pkg/parsers/filters/parse.go @@ -5,6 +5,7 @@ package filters import ( "encoding/json" "errors" + "fmt" "regexp" "strings" ) @@ -15,7 +16,14 @@ import ( // in an slice. // e.g given -f 'label=label1=1' -f 'label=label2=2' -f 'image.name=ubuntu' // the args will be {'label': {'label1=1','label2=2'}, 'image.name', {'ubuntu'}} -type Args map[string][]string +type Args struct { + fields map[string]map[string]bool +} + +// NewArgs initializes a new Args struct. +func NewArgs() Args { + return Args{fields: map[string]map[string]bool{}} +} // ParseFlag parses the argument to the filter flag. Like // @@ -25,9 +33,6 @@ type Args map[string][]string // map is created. func ParseFlag(arg string, prev Args) (Args, error) { filters := prev - if prev == nil { - filters = Args{} - } if len(arg) == 0 { return filters, nil } @@ -37,9 +42,11 @@ func ParseFlag(arg string, prev Args) (Args, error) { } f := strings.SplitN(arg, "=", 2) + name := strings.ToLower(strings.TrimSpace(f[0])) value := strings.TrimSpace(f[1]) - filters[name] = append(filters[name], value) + + filters.Add(name, value) return filters, nil } @@ -50,11 +57,11 @@ var ErrBadFormat = errors.New("bad format of filter (expected name=value)") // ToParam packs the Args into an string for easy transport from client to server. func ToParam(a Args) (string, error) { // this way we don't URL encode {}, just empty space - if len(a) == 0 { + if a.Len() == 0 { return "", nil } - buf, err := json.Marshal(a) + buf, err := json.Marshal(a.fields) if err != nil { return "", err } @@ -63,23 +70,71 @@ func ToParam(a Args) (string, error) { // FromParam unpacks the filter Args. func FromParam(p string) (Args, error) { - args := Args{} if len(p) == 0 { - return args, nil + return NewArgs(), nil } - if err := json.NewDecoder(strings.NewReader(p)).Decode(&args); err != nil { - return nil, err + + r := strings.NewReader(p) + d := json.NewDecoder(r) + + m := map[string]map[string]bool{} + if err := d.Decode(&m); err != nil { + r.Seek(0, 0) + + // Allow parsing old arguments in slice format. + // Because other libraries might be sending them in this format. + deprecated := map[string][]string{} + if deprecatedErr := d.Decode(&deprecated); deprecatedErr == nil { + m = deprecatedArgs(deprecated) + } else { + return NewArgs(), err + } } - return args, nil + return Args{m}, nil +} + +// Get returns the list of values associates with a field. +// It returns a slice of strings to keep backwards compatibility with old code. +func (filters Args) Get(field string) []string { + values := filters.fields[field] + if values == nil { + return make([]string, 0) + } + slice := make([]string, 0, len(values)) + for key := range values { + slice = append(slice, key) + } + return slice +} + +// Add adds a new value to a filter field. +func (filters Args) Add(name, value string) { + if _, ok := filters.fields[name]; ok { + filters.fields[name][value] = true + } else { + filters.fields[name] = map[string]bool{value: true} + } +} + +// Del removes a value from a filter field. +func (filters Args) Del(name, value string) { + if _, ok := filters.fields[name]; ok { + delete(filters.fields[name], value) + } +} + +// Len returns the number of fields in the arguments. +func (filters Args) Len() int { + return len(filters.fields) } // MatchKVList returns true if the values for the specified field maches the ones // from the sources. // e.g. given Args are {'label': {'label1=1','label2=1'}, 'image.name', {'ubuntu'}}, -// field is 'label' and sources are {'label':{'label1=1','label2=2','label3=3'}} +// field is 'label' and sources are {'label1': '1', 'label2': '2'} // it returns true. func (filters Args) MatchKVList(field string, sources map[string]string) bool { - fieldValues := filters[field] + fieldValues := filters.fields[field] //do not filter if there is no filter set or cannot determine filter if len(fieldValues) == 0 { @@ -90,21 +145,16 @@ func (filters Args) MatchKVList(field string, sources map[string]string) bool { return false } -outer: - for _, name2match := range fieldValues { + for name2match := range fieldValues { testKV := strings.SplitN(name2match, "=", 2) - for k, v := range sources { - if len(testKV) == 1 { - if k == testKV[0] { - continue outer - } - } else if k == testKV[0] && v == testKV[1] { - continue outer - } + v, ok := sources[testKV[0]] + if !ok { + return false + } + if len(testKV) == 2 && testKV[1] != v { + return false } - - return false } return true @@ -115,13 +165,12 @@ outer: // field is 'image.name' and source is 'ubuntu' // it returns true. func (filters Args) Match(field, source string) bool { - fieldValues := filters[field] - - //do not filter if there is no filter set or cannot determine filter - if len(fieldValues) == 0 { + if filters.ExactMatch(field, source) { return true } - for _, name2match := range fieldValues { + + fieldValues := filters.fields[field] + for name2match := range fieldValues { match, err := regexp.MatchString(name2match, source) if err != nil { continue @@ -132,3 +181,61 @@ func (filters Args) Match(field, source string) bool { } return false } + +// ExactMatch returns true if the source matches exactly one of the filters. +func (filters Args) ExactMatch(field, source string) bool { + fieldValues, ok := filters.fields[field] + //do not filter if there is no filter set or cannot determine filter + if !ok || len(fieldValues) == 0 { + return true + } + + // try to march full name value to avoid O(N) regular expression matching + if fieldValues[source] { + return true + } + return false +} + +// Include returns true if the name of the field to filter is in the filters. +func (filters Args) Include(field string) bool { + _, ok := filters.fields[field] + return ok +} + +// Validate ensures that all the fields in the filter are valid. +// It returns an error as soon as it finds an invalid field. +func (filters Args) Validate(accepted map[string]bool) error { + for name := range filters.fields { + if !accepted[name] { + return fmt.Errorf("Invalid filter '%s'", name) + } + } + return nil +} + +// WalkValues iterates over the list of filtered values for a field. +// It stops the iteration if it finds an error and it returns that error. +func (filters Args) WalkValues(field string, op func(value string) error) error { + if _, ok := filters.fields[field]; !ok { + return nil + } + for v := range filters.fields[field] { + if err := op(v); err != nil { + return err + } + } + return nil +} + +func deprecatedArgs(d map[string][]string) map[string]map[string]bool { + m := map[string]map[string]bool{} + for k, v := range d { + values := map[string]bool{} + for _, vv := range v { + values[vv] = true + } + m[k] = values + } + return m +} diff --git a/pkg/parsers/filters/parse_test.go b/pkg/parsers/filters/parse_test.go index eb9fcef90f..308d1bcdb8 100644 --- a/pkg/parsers/filters/parse_test.go +++ b/pkg/parsers/filters/parse_test.go @@ -1,7 +1,7 @@ package filters import ( - "sort" + "fmt" "testing" ) @@ -13,7 +13,7 @@ func TestParseArgs(t *testing.T) { "image.name=*untu", } var ( - args = Args{} + args = NewArgs() err error ) for i := range flagArgs { @@ -22,10 +22,10 @@ func TestParseArgs(t *testing.T) { t.Errorf("failed to parse %s: %s", flagArgs[i], err) } } - if len(args["created"]) != 1 { + if len(args.Get("created")) != 1 { t.Errorf("failed to set this arg") } - if len(args["image.name"]) != 2 { + if len(args.Get("image.name")) != 2 { t.Errorf("the args should have collapsed") } } @@ -36,7 +36,7 @@ func TestParseArgsEdgeCase(t *testing.T) { if err != nil { t.Fatal(err) } - if args == nil || len(args) != 0 { + if args.Len() != 0 { t.Fatalf("Expected an empty Args (map), got %v", args) } if args, err = ParseFlag("anything", args); err == nil || err != ErrBadFormat { @@ -45,10 +45,11 @@ func TestParseArgsEdgeCase(t *testing.T) { } func TestToParam(t *testing.T) { - a := Args{ - "created": []string{"today"}, - "image.name": []string{"ubuntu*", "*untu"}, + fields := map[string]map[string]bool{ + "created": {"today": true}, + "image.name": {"ubuntu*": true, "*untu": true}, } + a := Args{fields: fields} _, err := ToParam(a) if err != nil { @@ -63,42 +64,48 @@ func TestFromParam(t *testing.T) { "{'key': 'value'}", `{"key": "value"}`, } - valids := map[string]Args{ - `{"key": ["value"]}`: { - "key": {"value"}, + valid := map[*Args][]string{ + &Args{fields: map[string]map[string]bool{"key": {"value": true}}}: { + `{"key": ["value"]}`, + `{"key": {"value": true}}`, }, - `{"key": ["value1", "value2"]}`: { - "key": {"value1", "value2"}, + &Args{fields: map[string]map[string]bool{"key": {"value1": true, "value2": true}}}: { + `{"key": ["value1", "value2"]}`, + `{"key": {"value1": true, "value2": true}}`, }, - `{"key1": ["value1"], "key2": ["value2"]}`: { - "key1": {"value1"}, - "key2": {"value2"}, + &Args{fields: map[string]map[string]bool{"key1": {"value1": true}, "key2": {"value2": true}}}: { + `{"key1": ["value1"], "key2": ["value2"]}`, + `{"key1": {"value1": true}, "key2": {"value2": true}}`, }, } + for _, invalid := range invalids { if _, err := FromParam(invalid); err == nil { t.Fatalf("Expected an error with %v, got nothing", invalid) } } - for json, expectedArgs := range valids { - args, err := FromParam(json) - if err != nil { - t.Fatal(err) - } - if len(args) != len(expectedArgs) { - t.Fatalf("Expected %v, go %v", expectedArgs, args) - } - for key, expectedValues := range expectedArgs { - values := args[key] - sort.Strings(values) - sort.Strings(expectedValues) - if len(values) != len(expectedValues) { + + for expectedArgs, matchers := range valid { + for _, json := range matchers { + args, err := FromParam(json) + if err != nil { + t.Fatal(err) + } + if args.Len() != expectedArgs.Len() { t.Fatalf("Expected %v, go %v", expectedArgs, args) } - for index, expectedValue := range expectedValues { - if values[index] != expectedValue { + for key, expectedValues := range expectedArgs.fields { + values := args.Get(key) + + if len(values) != len(expectedValues) { t.Fatalf("Expected %v, go %v", expectedArgs, args) } + + for _, v := range values { + if !expectedValues[v] { + t.Fatalf("Expected %v, go %v", expectedArgs, args) + } + } } } } @@ -114,54 +121,63 @@ func TestEmpty(t *testing.T) { if err != nil { t.Errorf("%s", err) } - if len(a) != len(v1) { + if a.Len() != v1.Len() { t.Errorf("these should both be empty sets") } } -func TestArgsMatchKVList(t *testing.T) { - // empty sources - args := Args{ - "created": []string{"today"}, +func TestArgsMatchKVListEmptySources(t *testing.T) { + args := NewArgs() + if !args.MatchKVList("created", map[string]string{}) { + t.Fatalf("Expected true for (%v,created), got true", args) } + + args = Args{map[string]map[string]bool{"created": {"today": true}}} if args.MatchKVList("created", map[string]string{}) { t.Fatalf("Expected false for (%v,created), got true", args) } +} + +func TestArgsMatchKVList(t *testing.T) { // Not empty sources sources := map[string]string{ "key1": "value1", "key2": "value2", "key3": "value3", } + matches := map[*Args]string{ &Args{}: "field", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1": true}}, }: "labels", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1=value1"}, - }: "labels", - } - differs := map[*Args]string{ - &Args{ - "created": []string{"today"}, - }: "created", - &Args{ - "created": []string{"today"}, - "labels": []string{"key4"}, - }: "labels", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1=value3"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1=value1": true}}, }: "labels", } + for args, field := range matches { if args.MatchKVList(field, sources) != true { t.Fatalf("Expected true for %v on %v, got false", sources, args) } } + + differs := map[*Args]string{ + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key4": true}}, + }: "labels", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}, + "labels": map[string]bool{"key1=value3": true}}, + }: "labels", + } + for args, field := range differs { if args.MatchKVList(field, sources) != false { t.Fatalf("Expected false for %v on %v, got true", sources, args) @@ -171,48 +187,165 @@ func TestArgsMatchKVList(t *testing.T) { func TestArgsMatch(t *testing.T) { source := "today" + matches := map[*Args]string{ &Args{}: "field", - &Args{ - "created": []string{"today"}, - "labels": []string{"key1"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today": true}}, }: "today", - &Args{ - "created": []string{"to*"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to*": true}}, }: "created", - &Args{ - "created": []string{"to(.*)"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to(.*)": true}}, }: "created", - &Args{ - "created": []string{"tod"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tod": true}}, }: "created", - &Args{ - "created": []string{"anything", "to*"}, - }: "created", - } - differs := map[*Args]string{ - &Args{ - "created": []string{"tomorrow"}, - }: "created", - &Args{ - "created": []string{"to(day"}, - }: "created", - &Args{ - "created": []string{"tom(.*)"}, - }: "created", - &Args{ - "created": []string{"today1"}, - "labels": []string{"today"}, + &Args{map[string]map[string]bool{ + "created": map[string]bool{"anyting": true, "to*": true}}, }: "created", } + for args, field := range matches { if args.Match(field, source) != true { t.Fatalf("Expected true for %v on %v, got false", source, args) } } + + differs := map[*Args]string{ + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tomorrow": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"to(day": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tom(.*)": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"tom": true}}, + }: "created", + &Args{map[string]map[string]bool{ + "created": map[string]bool{"today1": true}, + "labels": map[string]bool{"today": true}}, + }: "created", + } + for args, field := range differs { if args.Match(field, source) != false { t.Fatalf("Expected false for %v on %v, got true", source, args) } } } + +func TestAdd(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + v := f.fields["status"] + if len(v) != 1 || !v["running"] { + t.Fatalf("Expected to include a running status, got %v", v) + } + + f.Add("status", "paused") + if len(v) != 2 || !v["paused"] { + t.Fatalf("Expected to include a paused status, got %v", v) + } +} + +func TestDel(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Del("status", "running") + v := f.fields["status"] + if v["running"] { + t.Fatalf("Expected to not include a running status filter, got true") + } +} + +func TestLen(t *testing.T) { + f := NewArgs() + if f.Len() != 0 { + t.Fatalf("Expected to not include any field") + } + f.Add("status", "running") + if f.Len() != 1 { + t.Fatalf("Expected to include one field") + } +} + +func TestExactMatch(t *testing.T) { + f := NewArgs() + + if !f.ExactMatch("status", "running") { + t.Fatalf("Expected to match `running` when there are no filters, got false") + } + + f.Add("status", "running") + f.Add("status", "pause*") + + if !f.ExactMatch("status", "running") { + t.Fatalf("Expected to match `running` with one of the filters, got false") + } + + if f.ExactMatch("status", "paused") { + t.Fatalf("Expected to not match `paused` with one of the filters, got true") + } +} + +func TestInclude(t *testing.T) { + f := NewArgs() + if f.Include("status") { + t.Fatalf("Expected to not include a status key, got true") + } + f.Add("status", "running") + if !f.Include("status") { + t.Fatalf("Expected to include a status key, got false") + } +} + +func TestValidate(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + + valid := map[string]bool{ + "status": true, + "dangling": true, + } + + if err := f.Validate(valid); err != nil { + t.Fatal(err) + } + + f.Add("bogus", "running") + if err := f.Validate(valid); err == nil { + t.Fatalf("Expected to return an error, got nil") + } +} + +func TestWalkValues(t *testing.T) { + f := NewArgs() + f.Add("status", "running") + f.Add("status", "paused") + + f.WalkValues("status", func(value string) error { + if value != "running" && value != "paused" { + t.Fatalf("Unexpected value %s", value) + } + return nil + }) + + err := f.WalkValues("status", func(value string) error { + return fmt.Errorf("return") + }) + if err == nil { + t.Fatalf("Expected to get an error, got nil") + } + + err = f.WalkValues("foo", func(value string) error { + return fmt.Errorf("return") + }) + if err != nil { + t.Fatalf("Expected to not iterate when the field doesn't exist, got %v", err) + } +} From 434d2e8745696255a204d9eefc6a2854ff74e5c2 Mon Sep 17 00:00:00 2001 From: David Calavera Date: Wed, 25 Nov 2015 21:03:10 -0500 Subject: [PATCH 2/2] Add PubSub topics. A TopicFunc is an interface to let the pubisher decide whether it needs to send a message to a subscriber or not. It returns true if the publisher must send the message and false otherwise. Users of the pubsub package can create a subscriber with a topic function by calling `pubsub.SubscribeTopic`. Message delivery has also been modified to use concurrent channels per subscriber. That way, topic verification and message delivery is not o(N+M) anymore, based on the number of subscribers and topic verification complexity. Using pubsub topics, the API stops controlling the message delivery, delegating that function to a topic generated with the filtering provided by the user. The publisher sends every message to the subscriber if there is no filter, but the api doesn't have to select messages to return anymore. Signed-off-by: David Calavera --- api/server/router/local/info.go | 26 +++--------- daemon/daemon.go | 15 +++++-- daemon/events/events.go | 42 ++++++++++++++++++- integration-cli/docker_cli_build_test.go | 10 ++--- pkg/pubsub/publisher.go | 52 ++++++++++++++++-------- 5 files changed, 96 insertions(+), 49 deletions(-) diff --git a/api/server/router/local/info.go b/api/server/router/local/info.go index 1ebcae91b7..00314212bd 100644 --- a/api/server/router/local/info.go +++ b/api/server/router/local/info.go @@ -92,27 +92,11 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R enc := json.NewEncoder(output) - current, l, cancel := s.daemon.SubscribeToEvents() - defer cancel() + buffered, l := s.daemon.SubscribeToEvents(since, sinceNano, ef) + defer s.daemon.UnsubscribeFromEvents(l) - eventFilter := s.daemon.GetEventFilter(ef) - handleEvent := func(ev *jsonmessage.JSONMessage) error { - if eventFilter.Include(ev) { - if err := enc.Encode(ev); err != nil { - return err - } - } - return nil - } - - if since == -1 { - current = nil - } - for _, ev := range current { - if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) { - continue - } - if err := handleEvent(ev); err != nil { + for _, ev := range buffered { + if err := enc.Encode(ev); err != nil { return err } } @@ -129,7 +113,7 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R if !ok { continue } - if err := handleEvent(jev); err != nil { + if err := enc.Encode(jev); err != nil { return err } case <-timer.C: diff --git a/daemon/daemon.go b/daemon/daemon.go index 6211de2e8e..d5e5540aff 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -532,8 +532,8 @@ func (daemon *Daemon) GetByName(name string) (*Container, error) { return e, nil } -// GetEventFilter returns a filters.Filter for a set of filters -func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter { +// getEventFilter returns a filters.Filter for a set of filters +func (daemon *Daemon) getEventFilter(filter filters.Args) *events.Filter { // incoming container filter can be name, id or partial id, convert to // a full container id for _, cn := range filter.Get("container") { @@ -547,8 +547,15 @@ func (daemon *Daemon) GetEventFilter(filter filters.Args) *events.Filter { } // SubscribeToEvents returns the currently record of events, a channel to stream new events from, and a function to cancel the stream of events. -func (daemon *Daemon) SubscribeToEvents() ([]*jsonmessage.JSONMessage, chan interface{}, func()) { - return daemon.EventsService.Subscribe() +func (daemon *Daemon) SubscribeToEvents(since, sinceNano int64, filter filters.Args) ([]*jsonmessage.JSONMessage, chan interface{}) { + ef := daemon.getEventFilter(filter) + return daemon.EventsService.SubscribeTopic(since, sinceNano, ef) +} + +// UnsubscribeFromEvents stops the event subscription for a client by closing the +// channel where the daemon sends events to. +func (daemon *Daemon) UnsubscribeFromEvents(listener chan interface{}) { + daemon.EventsService.Evict(listener) } // GetLabels for a container or image id diff --git a/daemon/events/events.go b/daemon/events/events.go index 996c856f42..3674170fe3 100644 --- a/daemon/events/events.go +++ b/daemon/events/events.go @@ -8,7 +8,10 @@ import ( "github.com/docker/docker/pkg/pubsub" ) -const eventsLimit = 64 +const ( + eventsLimit = 64 + bufferSize = 1024 +) // Events is pubsub channel for *jsonmessage.JSONMessage type Events struct { @@ -21,7 +24,7 @@ type Events struct { func New() *Events { return &Events{ events: make([]*jsonmessage.JSONMessage, 0, eventsLimit), - pub: pubsub.NewPublisher(100*time.Millisecond, 1024), + pub: pubsub.NewPublisher(100*time.Millisecond, bufferSize), } } @@ -42,6 +45,41 @@ func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}, func return current, l, cancel } +// SubscribeTopic adds new listener to events, returns slice of 64 stored +// last events, a channel in which you can expect new events (in form +// of interface{}, so you need type assertion). +func (e *Events) SubscribeTopic(since, sinceNano int64, ef *Filter) ([]*jsonmessage.JSONMessage, chan interface{}) { + e.mu.Lock() + defer e.mu.Unlock() + + var buffered []*jsonmessage.JSONMessage + topic := func(m interface{}) bool { + return ef.Include(m.(*jsonmessage.JSONMessage)) + } + + if since != -1 { + for i := len(e.events) - 1; i >= 0; i-- { + ev := e.events[i] + if ev.Time < since || ((ev.Time == since) && (ev.TimeNano < sinceNano)) { + break + } + if ef.filter.Len() == 0 || topic(ev) { + buffered = append([]*jsonmessage.JSONMessage{ev}, buffered...) + } + } + } + + var ch chan interface{} + if ef.filter.Len() > 0 { + ch = e.pub.SubscribeTopic(topic) + } else { + // Subscribe to all events if there are no filters + ch = e.pub.Subscribe() + } + + return buffered, ch +} + // Evict evicts listener from pubsub func (e *Events) Evict(l chan interface{}) { e.pub.Evict(l) diff --git a/integration-cli/docker_cli_build_test.go b/integration-cli/docker_cli_build_test.go index d4b31bd38f..1707680dba 100644 --- a/integration-cli/docker_cli_build_test.go +++ b/integration-cli/docker_cli_build_test.go @@ -1891,9 +1891,7 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { startEpoch := daemonTime(c).Unix() // Watch for events since epoch. - eventsCmd := exec.Command( - dockerBinary, "events", - "--since", strconv.FormatInt(startEpoch, 10)) + eventsCmd := exec.Command(dockerBinary, "events", "--since", strconv.FormatInt(startEpoch, 10)) stdout, err := eventsCmd.StdoutPipe() if err != nil { c.Fatal(err) @@ -1932,12 +1930,12 @@ func (s *DockerSuite) TestBuildCancellationKillsSleep(c *check.C) { c.Fatalf("failed to run build: %s", err) } - matchCID := regexp.MustCompile("Running in ") + matchCID := regexp.MustCompile("Running in (.+)") scanner := bufio.NewScanner(stdoutBuild) for scanner.Scan() { line := scanner.Text() - if ok := matchCID.MatchString(line); ok { - containerID <- line[len(line)-12:] + if matches := matchCID.FindStringSubmatch(line); len(matches) > 0 { + containerID <- matches[1] break } } diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index ab457cfba9..8529ffa322 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -13,11 +13,12 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { return &Publisher{ buffer: buffer, timeout: publishTimeout, - subscribers: make(map[subscriber]struct{}), + subscribers: make(map[subscriber]topicFunc), } } type subscriber chan interface{} +type topicFunc func(v interface{}) bool // Publisher is basic pub/sub structure. Allows to send events and subscribe // to them. Can be safely used from multiple goroutines. @@ -25,7 +26,7 @@ type Publisher struct { m sync.RWMutex buffer int timeout time.Duration - subscribers map[subscriber]struct{} + subscribers map[subscriber]topicFunc } // Len returns the number of subscribers for the publisher @@ -38,9 +39,14 @@ func (p *Publisher) Len() int { // Subscribe adds a new subscriber to the publisher returning the channel. func (p *Publisher) Subscribe() chan interface{} { + return p.SubscribeTopic(nil) +} + +// SubscribeTopic adds a new subscriber that filters messages sent by a topic. +func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { ch := make(chan interface{}, p.buffer) p.m.Lock() - p.subscribers[ch] = struct{}{} + p.subscribers[ch] = topic p.m.Unlock() return ch } @@ -56,20 +62,13 @@ func (p *Publisher) Evict(sub chan interface{}) { // Publish sends the data in v to all subscribers currently registered with the publisher. func (p *Publisher) Publish(v interface{}) { p.m.RLock() - for sub := range p.subscribers { - // send under a select as to not block if the receiver is unavailable - if p.timeout > 0 { - select { - case sub <- v: - case <-time.After(p.timeout): - } - continue - } - select { - case sub <- v: - default: - } + wg := new(sync.WaitGroup) + for sub, topic := range p.subscribers { + wg.Add(1) + + go p.sendTopic(sub, topic, v, wg) } + wg.Wait() p.m.RUnlock() } @@ -82,3 +81,24 @@ func (p *Publisher) Close() { } p.m.Unlock() } + +func (p *Publisher) sendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) { + defer wg.Done() + if topic != nil && !topic(v) { + return + } + + // send under a select as to not block if the receiver is unavailable + if p.timeout > 0 { + select { + case sub <- v: + case <-time.After(p.timeout): + } + return + } + + select { + case sub <- v: + default: + } +}