From 868f56e0839cb47223a7d988f21ae623d4ea9c6e Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Fri, 3 Apr 2015 13:58:56 -0700 Subject: [PATCH 1/4] New package daemon/events Signed-off-by: Alexander Morozov --- daemon/events/events.go | 66 +++++++++++++++++ daemon/events/events_test.go | 135 +++++++++++++++++++++++++++++++++++ 2 files changed, 201 insertions(+) create mode 100644 daemon/events/events.go create mode 100644 daemon/events/events_test.go diff --git a/daemon/events/events.go b/daemon/events/events.go new file mode 100644 index 0000000000..07ee29a346 --- /dev/null +++ b/daemon/events/events.go @@ -0,0 +1,66 @@ +package events + +import ( + "sync" + "time" + + "github.com/docker/docker/pkg/jsonmessage" + "github.com/docker/docker/pkg/pubsub" +) + +const eventsLimit = 64 + +// Events is pubsub channel for *jsonmessage.JSONMessage +type Events struct { + mu sync.Mutex + events []*jsonmessage.JSONMessage + pub *pubsub.Publisher +} + +// New returns new *Events instance +func New() *Events { + return &Events{ + events: make([]*jsonmessage.JSONMessage, 0, eventsLimit), + pub: pubsub.NewPublisher(100*time.Millisecond, 1024), + } +} + +// Subscribe adds new listener to events, returns slice of 64 stored last events +// channel in which you can expect new events in form of interface{}, so you +// need type assertion. +func (e *Events) Subscribe() ([]*jsonmessage.JSONMessage, chan interface{}) { + e.mu.Lock() + current := make([]*jsonmessage.JSONMessage, len(e.events)) + copy(current, e.events) + l := e.pub.Subscribe() + e.mu.Unlock() + return current, l +} + +// Evict evicts listener from pubsub +func (e *Events) Evict(l chan interface{}) { + e.pub.Evict(l) +} + +// Log broadcasts event to listeners. Each listener has 100 millisecond for +// receiving event or it will be skipped. +func (e *Events) Log(action, id, from string) { + go func() { + e.mu.Lock() + jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: time.Now().UTC().Unix()} + if len(e.events) == cap(e.events) { + // discard oldest event + copy(e.events, e.events[1:]) + e.events[len(e.events)-1] = jm + } else { + e.events = append(e.events, jm) + } + e.mu.Unlock() + e.pub.Publish(jm) + }() +} + +// SubscribersCount returns number of event listeners +func (e *Events) SubscribersCount() int { + return e.pub.Len() +} diff --git a/daemon/events/events_test.go b/daemon/events/events_test.go new file mode 100644 index 0000000000..7aa8d9facc --- /dev/null +++ b/daemon/events/events_test.go @@ -0,0 +1,135 @@ +package events + +import ( + "fmt" + "testing" + "time" + + "github.com/docker/docker/pkg/jsonmessage" +) + +func TestEventsLog(t *testing.T) { + e := New() + _, l1 := e.Subscribe() + _, l2 := e.Subscribe() + defer e.Evict(l1) + defer e.Evict(l2) + count := e.SubscribersCount() + if count != 2 { + t.Fatalf("Must be 2 subscribers, got %d", count) + } + e.Log("test", "cont", "image") + select { + case msg := <-l1: + jmsg, ok := msg.(*jsonmessage.JSONMessage) + if !ok { + t.Fatalf("Unexpected type %T", msg) + } + if len(e.events) != 1 { + t.Fatalf("Must be only one event, got %d", len(e.events)) + } + if jmsg.Status != "test" { + t.Fatalf("Status should be test, got %s", jmsg.Status) + } + if jmsg.ID != "cont" { + t.Fatalf("ID should be cont, got %s", jmsg.ID) + } + if jmsg.From != "image" { + t.Fatalf("From should be image, got %s", jmsg.From) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for broadcasted message") + } + select { + case msg := <-l2: + jmsg, ok := msg.(*jsonmessage.JSONMessage) + if !ok { + t.Fatalf("Unexpected type %T", msg) + } + if len(e.events) != 1 { + t.Fatalf("Must be only one event, got %d", len(e.events)) + } + if jmsg.Status != "test" { + t.Fatalf("Status should be test, got %s", jmsg.Status) + } + if jmsg.ID != "cont" { + t.Fatalf("ID should be cont, got %s", jmsg.ID) + } + if jmsg.From != "image" { + t.Fatalf("From should be image, got %s", jmsg.From) + } + case <-time.After(1 * time.Second): + t.Fatal("Timeout waiting for broadcasted message") + } +} + +func TestEventsLogTimeout(t *testing.T) { + e := New() + _, l := e.Subscribe() + defer e.Evict(l) + + c := make(chan struct{}) + go func() { + e.Log("test", "cont", "image") + close(c) + }() + + select { + case <-c: + case <-time.After(time.Second): + t.Fatal("Timeout publishing message") + } +} + +func TestLogEvents(t *testing.T) { + e := New() + + for i := 0; i < eventsLimit+16; i++ { + action := fmt.Sprintf("action_%d", i) + id := fmt.Sprintf("cont_%d", i) + from := fmt.Sprintf("image_%d", i) + e.Log(action, id, from) + } + time.Sleep(50 * time.Millisecond) + current, l := e.Subscribe() + for i := 0; i < 10; i++ { + num := i + eventsLimit + 16 + action := fmt.Sprintf("action_%d", num) + id := fmt.Sprintf("cont_%d", num) + from := fmt.Sprintf("image_%d", num) + e.Log(action, id, from) + } + if len(e.events) != eventsLimit { + t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events)) + } + + var msgs []*jsonmessage.JSONMessage + for len(msgs) < 10 { + m := <-l + jm, ok := (m).(*jsonmessage.JSONMessage) + if !ok { + t.Fatalf("Unexpected type %T", m) + } + msgs = append(msgs, jm) + } + if len(current) != eventsLimit { + t.Fatalf("Must be %d events, got %d", eventsLimit, len(current)) + } + first := current[0] + if first.Status != "action_16" { + t.Fatalf("First action is %s, must be action_16", first.Status) + } + last := current[len(current)-1] + if last.Status != "action_79" { + t.Fatalf("Last action is %s, must be action_79", last.Status) + } + + firstC := msgs[0] + if firstC.Status != "action_80" { + t.Fatalf("First action is %s, must be action_80", firstC.Status) + } + lastC := msgs[len(msgs)-1] + if lastC.Status != "action_89" { + t.Fatalf("Last action is %s, must be action_89", lastC.Status) + } +} From c9eb37f9752d72d9a4280d703368e5e73adfffa1 Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Fri, 3 Apr 2015 15:17:49 -0700 Subject: [PATCH 2/4] Remove engine usage for events Signed-off-by: Alexander Morozov --- api/server/server.go | 106 +++++++++++++++++++++++++++++++-- api/server/server_unit_test.go | 41 ------------- builtins/builtins.go | 4 -- daemon/container.go | 8 ++- daemon/daemon.go | 6 +- daemon/image_delete.go | 3 +- daemon/info.go | 7 +-- graph/import.go | 6 +- graph/pull.go | 8 +-- graph/tags.go | 5 +- graph/tags_unit_test.go | 3 +- 11 files changed, 123 insertions(+), 74 deletions(-) diff --git a/api/server/server.go b/api/server/server.go index 502f1ae9e1..795ca080e4 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -3,6 +3,7 @@ package server import ( "bufio" "bytes" + "time" "encoding/base64" "encoding/json" @@ -23,7 +24,9 @@ import ( "github.com/docker/docker/daemon" "github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/engine" + "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/parsers" + "github.com/docker/docker/pkg/parsers/filters" "github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/version" @@ -324,13 +327,104 @@ func getEvents(eng *engine.Engine, version version.Version, w http.ResponseWrite if err := parseForm(r); err != nil { return err } + var since int64 = -1 + if r.Form.Get("since") != "" { + s, err := strconv.ParseInt(r.Form.Get("since"), 10, 64) + if err != nil { + return err + } + since = s + } - var job = eng.Job("events") - streamJSON(job, w, true) - job.Setenv("since", r.Form.Get("since")) - job.Setenv("until", r.Form.Get("until")) - job.Setenv("filters", r.Form.Get("filters")) - return job.Run() + var until int64 = -1 + if r.Form.Get("until") != "" { + u, err := strconv.ParseInt(r.Form.Get("until"), 10, 64) + if err != nil { + return err + } + until = u + } + timer := time.NewTimer(0) + timer.Stop() + if until > 0 { + dur := time.Unix(until, 0).Sub(time.Now()) + timer = time.NewTimer(dur) + } + + ef, err := filters.FromParam(r.Form.Get("filters")) + if err != nil { + return err + } + + isFiltered := func(field string, filter []string) bool { + if len(filter) == 0 { + return false + } + for _, v := range filter { + if v == field { + return false + } + if strings.Contains(field, ":") { + image := strings.Split(field, ":") + if image[0] == v { + return false + } + } + } + return true + } + + d := getDaemon(eng) + es := d.EventsService + w.Header().Set("Content-Type", "application/json") + enc := json.NewEncoder(utils.NewWriteFlusher(w)) + + getContainerId := func(cn string) string { + c, err := d.Get(cn) + if err != nil { + return "" + } + return c.ID + } + + sendEvent := func(ev *jsonmessage.JSONMessage) error { + //incoming container filter can be name,id or partial id, convert and replace as a full container id + for i, cn := range ef["container"] { + ef["container"][i] = getContainerId(cn) + } + + if isFiltered(ev.Status, ef["event"]) || isFiltered(ev.From, ef["image"]) || + isFiltered(ev.ID, ef["container"]) { + return nil + } + + return enc.Encode(ev) + } + + current, l := es.Subscribe() + defer es.Evict(l) + for _, ev := range current { + if ev.Time < since { + continue + } + if err := sendEvent(ev); err != nil { + return err + } + } + for { + select { + case ev := <-l: + jev, ok := ev.(*jsonmessage.JSONMessage) + if !ok { + continue + } + if err := sendEvent(jev); err != nil { + return err + } + case <-timer.C: + return nil + } + } } func getImagesHistory(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { diff --git a/api/server/server_unit_test.go b/api/server/server_unit_test.go index f83b5cc54f..8e8409ffa7 100644 --- a/api/server/server_unit_test.go +++ b/api/server/server_unit_test.go @@ -250,47 +250,6 @@ func TestGetContainersByName(t *testing.T) { } } -func TestGetEvents(t *testing.T) { - eng := engine.New() - var called bool - eng.Register("events", func(job *engine.Job) error { - called = true - since := job.Getenv("since") - if since != "1" { - t.Fatalf("'since' should be 1, found %#v instead", since) - } - until := job.Getenv("until") - if until != "0" { - t.Fatalf("'until' should be 0, found %#v instead", until) - } - v := &engine.Env{} - v.Set("since", since) - v.Set("until", until) - if _, err := v.WriteTo(job.Stdout); err != nil { - return err - } - return nil - }) - r := serveRequest("GET", "/events?since=1&until=0", nil, eng, t) - if !called { - t.Fatal("handler was not called") - } - assertContentType(r, "application/json", t) - var stdoutJSON struct { - Since int - Until int - } - if err := json.Unmarshal(r.Body.Bytes(), &stdoutJSON); err != nil { - t.Fatal(err) - } - if stdoutJSON.Since != 1 { - t.Errorf("since != 1: %#v", stdoutJSON.Since) - } - if stdoutJSON.Until != 0 { - t.Errorf("until != 0: %#v", stdoutJSON.Until) - } -} - func TestLogs(t *testing.T) { eng := engine.New() var inspect bool diff --git a/builtins/builtins.go b/builtins/builtins.go index d87bdb87ad..149d350090 100644 --- a/builtins/builtins.go +++ b/builtins/builtins.go @@ -8,7 +8,6 @@ import ( "github.com/docker/docker/autogen/dockerversion" "github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/engine" - "github.com/docker/docker/events" "github.com/docker/docker/pkg/parsers/kernel" ) @@ -19,9 +18,6 @@ func Register(eng *engine.Engine) error { if err := remote(eng); err != nil { return err } - if err := events.New().Install(eng); err != nil { - return err - } if err := eng.Register("version", dockerVersion); err != nil { return err } diff --git a/daemon/container.go b/daemon/container.go index 228944d2d4..2d9487ea68 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -200,9 +200,11 @@ func (container *Container) WriteHostConfig() error { func (container *Container) LogEvent(action string) { d := container.daemon - if err := d.eng.Job("log", action, container.ID, d.Repositories().ImageName(container.ImageID)).Run(); err != nil { - logrus.Errorf("Error logging event %s for %s: %s", action, container.ID, err) - } + d.EventsService.Log( + action, + container.ID, + d.Repositories().ImageName(container.ImageID), + ) } func (container *Container) getResourcePath(path string) (string, error) { diff --git a/daemon/daemon.go b/daemon/daemon.go index a072fc34a4..460625c144 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -19,6 +19,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/api" "github.com/docker/docker/autogen/dockerversion" + "github.com/docker/docker/daemon/events" "github.com/docker/docker/daemon/execdriver" "github.com/docker/docker/daemon/execdriver/execdrivers" "github.com/docker/docker/daemon/execdriver/lxc" @@ -109,6 +110,7 @@ type Daemon struct { statsCollector *statsCollector defaultLogConfig runconfig.LogConfig RegistryService *registry.Service + EventsService *events.Events } // Install installs daemon capabilities to eng. @@ -932,8 +934,9 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService return nil, err } + eventsService := events.New() logrus.Debug("Creating repository list") - repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService) + repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService, eventsService) if err != nil { return nil, fmt.Errorf("Couldn't create Tag store: %s", err) } @@ -1025,6 +1028,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService statsCollector: newStatsCollector(1 * time.Second), defaultLogConfig: config.LogConfig, RegistryService: registryService, + EventsService: eventsService, } eng.OnShutdown(func() { diff --git a/daemon/image_delete.go b/daemon/image_delete.go index bf3d7ba9ce..6323d323e4 100644 --- a/daemon/image_delete.go +++ b/daemon/image_delete.go @@ -108,7 +108,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, list *[]types *list = append(*list, types.ImageDelete{ Untagged: utils.ImageReference(repoName, tag), }) - eng.Job("log", "untag", img.ID, "").Run() + daemon.EventsService.Log("untag", img.ID, "") } } tags = daemon.Repositories().ByID()[img.ID] @@ -123,6 +123,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, list *[]types *list = append(*list, types.ImageDelete{ Deleted: img.ID, }) + daemon.EventsService.Log("delete", img.ID, "") eng.Job("log", "delete", img.ID, "").Run() if img.Parent != "" && !noprune { err := daemon.DeleteImage(eng, img.Parent, list, false, force, noprune) diff --git a/daemon/info.go b/daemon/info.go index 687937e3e2..183a9e68bb 100644 --- a/daemon/info.go +++ b/daemon/info.go @@ -51,11 +51,6 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error { initPath = daemon.SystemInitPath() } - cjob := job.Eng.Job("subscribers_count") - env, _ := cjob.Stdout.AddEnv() - if err := cjob.Run(); err != nil { - return err - } v := &engine.Env{} v.SetJson("ID", daemon.ID) v.SetInt("Containers", len(daemon.List())) @@ -71,7 +66,7 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error { v.Set("SystemTime", time.Now().Format(time.RFC3339Nano)) v.Set("ExecutionDriver", daemon.ExecutionDriver().Name()) v.Set("LoggingDriver", daemon.defaultLogConfig.Type) - v.SetInt("NEventsListener", env.GetInt("count")) + v.SetInt("NEventsListener", daemon.EventsService.SubscribersCount()) v.Set("KernelVersion", kernelVersion) v.Set("OperatingSystem", operatingSystem) v.Set("IndexServerAddress", registry.IndexServerAddress()) diff --git a/graph/import.go b/graph/import.go index 8b99188969..eb63af0b60 100644 --- a/graph/import.go +++ b/graph/import.go @@ -7,7 +7,6 @@ import ( "net/http" "net/url" - "github.com/Sirupsen/logrus" "github.com/docker/docker/engine" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/progressreader" @@ -92,8 +91,7 @@ func (s *TagStore) CmdImport(job *engine.Job) error { if tag != "" { logID = utils.ImageReference(logID, tag) } - if err = job.Eng.Job("log", "import", logID, "").Run(); err != nil { - logrus.Errorf("Error logging event 'import' for %s: %s", logID, err) - } + + s.eventsService.Log("import", logID, "") return nil } diff --git a/graph/pull.go b/graph/pull.go index 08b688cb21..13c69858fd 100644 --- a/graph/pull.go +++ b/graph/pull.go @@ -85,9 +85,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error { logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName) if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil { - if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil { - logrus.Errorf("Error logging event 'pull' for %s: %s", logName, err) - } + s.eventsService.Log("pull", logName, "") return nil } else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable { logrus.Errorf("Error from V2 registry: %s", err) @@ -101,9 +99,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error { return err } - if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil { - logrus.Errorf("Error logging event 'pull' for %s: %s", logName, err) - } + s.eventsService.Log("pull", logName, "") return nil } diff --git a/graph/tags.go b/graph/tags.go index b6a7987ff8..6346ea8b50 100644 --- a/graph/tags.go +++ b/graph/tags.go @@ -13,6 +13,7 @@ import ( "strings" "sync" + "github.com/docker/docker/daemon/events" "github.com/docker/docker/image" "github.com/docker/docker/pkg/parsers" "github.com/docker/docker/pkg/stringid" @@ -40,6 +41,7 @@ type TagStore struct { pullingPool map[string]chan struct{} pushingPool map[string]chan struct{} registryService *registry.Service + eventsService *events.Events } type Repository map[string]string @@ -62,7 +64,7 @@ func (r Repository) Contains(u Repository) bool { return true } -func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service) (*TagStore, error) { +func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service, eventsService *events.Events) (*TagStore, error) { abspath, err := filepath.Abs(path) if err != nil { return nil, err @@ -76,6 +78,7 @@ func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registrySer pullingPool: make(map[string]chan struct{}), pushingPool: make(map[string]chan struct{}), registryService: registryService, + eventsService: eventsService, } // Load the json file if it exists, otherwise create it. if err := store.reload(); os.IsNotExist(err) { diff --git a/graph/tags_unit_test.go b/graph/tags_unit_test.go index 001a10527d..be5624245c 100644 --- a/graph/tags_unit_test.go +++ b/graph/tags_unit_test.go @@ -7,6 +7,7 @@ import ( "path" "testing" + "github.com/docker/docker/daemon/events" "github.com/docker/docker/daemon/graphdriver" _ "github.com/docker/docker/daemon/graphdriver/vfs" // import the vfs driver so it is used in the tests "github.com/docker/docker/image" @@ -59,7 +60,7 @@ func mkTestTagStore(root string, t *testing.T) *TagStore { if err != nil { t.Fatal(err) } - store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil) + store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil, events.New()) if err != nil { t.Fatal(err) } From d487ca03e6e897e4bb5f9ba28b268450f059fc0d Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Fri, 3 Apr 2015 15:18:12 -0700 Subject: [PATCH 3/4] Remove events package Signed-off-by: Alexander Morozov --- events/events.go | 231 ------------------------------------------ events/events_test.go | 154 ---------------------------- 2 files changed, 385 deletions(-) delete mode 100644 events/events.go delete mode 100644 events/events_test.go diff --git a/events/events.go b/events/events.go deleted file mode 100644 index 93ea9a0391..0000000000 --- a/events/events.go +++ /dev/null @@ -1,231 +0,0 @@ -package events - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "strings" - "sync" - "time" - - "github.com/docker/docker/engine" - "github.com/docker/docker/pkg/jsonmessage" - "github.com/docker/docker/pkg/parsers/filters" -) - -const eventsLimit = 64 - -type listener chan<- *jsonmessage.JSONMessage - -type Events struct { - mu sync.RWMutex - events []*jsonmessage.JSONMessage - subscribers []listener -} - -func New() *Events { - return &Events{ - events: make([]*jsonmessage.JSONMessage, 0, eventsLimit), - } -} - -// Install installs events public api in docker engine -func (e *Events) Install(eng *engine.Engine) error { - // Here you should describe public interface - jobs := map[string]engine.Handler{ - "events": e.Get, - "log": e.Log, - "subscribers_count": e.SubscribersCount, - } - for name, job := range jobs { - if err := eng.Register(name, job); err != nil { - return err - } - } - return nil -} - -func (e *Events) Get(job *engine.Job) error { - var ( - since = job.GetenvInt64("since") - until = job.GetenvInt64("until") - timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now())) - ) - - eventFilters, err := filters.FromParam(job.Getenv("filters")) - if err != nil { - return err - } - - // If no until, disable timeout - if job.Getenv("until") == "" { - timeout.Stop() - } - - listener := make(chan *jsonmessage.JSONMessage) - e.subscribe(listener) - defer e.unsubscribe(listener) - - job.Stdout.Write(nil) - - // Resend every event in the [since, until] time interval. - if job.Getenv("since") != "" { - if err := e.writeCurrent(job, since, until, eventFilters); err != nil { - return err - } - } - - for { - select { - case event, ok := <-listener: - if !ok { - return nil - } - if err := writeEvent(job, event, eventFilters); err != nil { - return err - } - case <-timeout.C: - return nil - } - } -} - -func (e *Events) Log(job *engine.Job) error { - if len(job.Args) != 3 { - return fmt.Errorf("usage: %s ACTION ID FROM", job.Name) - } - // not waiting for receivers - go e.log(job.Args[0], job.Args[1], job.Args[2]) - return nil -} - -func (e *Events) SubscribersCount(job *engine.Job) error { - ret := &engine.Env{} - ret.SetInt("count", e.subscribersCount()) - ret.WriteTo(job.Stdout) - return nil -} - -func writeEvent(job *engine.Job, event *jsonmessage.JSONMessage, eventFilters filters.Args) error { - isFiltered := func(field string, filter []string) bool { - if len(filter) == 0 { - return false - } - for _, v := range filter { - if v == field { - return false - } - if strings.Contains(field, ":") { - image := strings.Split(field, ":") - if image[0] == v { - return false - } - } - } - return true - } - - //incoming container filter can be name,id or partial id, convert and replace as a full container id - for i, cn := range eventFilters["container"] { - eventFilters["container"][i] = GetContainerId(job.Eng, cn) - } - - if isFiltered(event.Status, eventFilters["event"]) || isFiltered(event.From, eventFilters["image"]) || - isFiltered(event.ID, eventFilters["container"]) { - return nil - } - - // When sending an event JSON serialization errors are ignored, but all - // other errors lead to the eviction of the listener. - if b, err := json.Marshal(event); err == nil { - if _, err = job.Stdout.Write(b); err != nil { - return err - } - } - return nil -} - -func (e *Events) writeCurrent(job *engine.Job, since, until int64, eventFilters filters.Args) error { - e.mu.RLock() - for _, event := range e.events { - if event.Time >= since && (event.Time <= until || until == 0) { - if err := writeEvent(job, event, eventFilters); err != nil { - e.mu.RUnlock() - return err - } - } - } - e.mu.RUnlock() - return nil -} - -func (e *Events) subscribersCount() int { - e.mu.RLock() - c := len(e.subscribers) - e.mu.RUnlock() - return c -} - -func (e *Events) log(action, id, from string) { - e.mu.Lock() - now := time.Now().UTC().Unix() - jm := &jsonmessage.JSONMessage{Status: action, ID: id, From: from, Time: now} - if len(e.events) == cap(e.events) { - // discard oldest event - copy(e.events, e.events[1:]) - e.events[len(e.events)-1] = jm - } else { - e.events = append(e.events, jm) - } - for _, s := range e.subscribers { - // We give each subscriber a 100ms time window to receive the event, - // after which we move to the next. - select { - case s <- jm: - case <-time.After(100 * time.Millisecond): - } - } - e.mu.Unlock() -} - -func (e *Events) subscribe(l listener) { - e.mu.Lock() - e.subscribers = append(e.subscribers, l) - e.mu.Unlock() -} - -// unsubscribe closes and removes the specified listener from the list of -// previously registed ones. -// It returns a boolean value indicating if the listener was successfully -// found, closed and unregistered. -func (e *Events) unsubscribe(l listener) bool { - e.mu.Lock() - for i, subscriber := range e.subscribers { - if subscriber == l { - close(l) - e.subscribers = append(e.subscribers[:i], e.subscribers[i+1:]...) - e.mu.Unlock() - return true - } - } - e.mu.Unlock() - return false -} - -func GetContainerId(eng *engine.Engine, name string) string { - var buf bytes.Buffer - job := eng.Job("container_inspect", name) - - var outStream io.Writer - - outStream = &buf - job.Stdout.Set(outStream) - - if err := job.Run(); err != nil { - return "" - } - var out struct{ ID string } - json.NewDecoder(&buf).Decode(&out) - return out.ID -} diff --git a/events/events_test.go b/events/events_test.go deleted file mode 100644 index a232576fe5..0000000000 --- a/events/events_test.go +++ /dev/null @@ -1,154 +0,0 @@ -package events - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "testing" - "time" - - "github.com/docker/docker/engine" - "github.com/docker/docker/pkg/jsonmessage" -) - -func TestEventsPublish(t *testing.T) { - e := New() - l1 := make(chan *jsonmessage.JSONMessage) - l2 := make(chan *jsonmessage.JSONMessage) - e.subscribe(l1) - e.subscribe(l2) - count := e.subscribersCount() - if count != 2 { - t.Fatalf("Must be 2 subscribers, got %d", count) - } - go e.log("test", "cont", "image") - select { - case msg := <-l1: - if len(e.events) != 1 { - t.Fatalf("Must be only one event, got %d", len(e.events)) - } - if msg.Status != "test" { - t.Fatalf("Status should be test, got %s", msg.Status) - } - if msg.ID != "cont" { - t.Fatalf("ID should be cont, got %s", msg.ID) - } - if msg.From != "image" { - t.Fatalf("From should be image, got %s", msg.From) - } - case <-time.After(1 * time.Second): - t.Fatal("Timeout waiting for broadcasted message") - } - select { - case msg := <-l2: - if len(e.events) != 1 { - t.Fatalf("Must be only one event, got %d", len(e.events)) - } - if msg.Status != "test" { - t.Fatalf("Status should be test, got %s", msg.Status) - } - if msg.ID != "cont" { - t.Fatalf("ID should be cont, got %s", msg.ID) - } - if msg.From != "image" { - t.Fatalf("From should be image, got %s", msg.From) - } - case <-time.After(1 * time.Second): - t.Fatal("Timeout waiting for broadcasted message") - } -} - -func TestEventsPublishTimeout(t *testing.T) { - e := New() - l := make(chan *jsonmessage.JSONMessage) - e.subscribe(l) - - c := make(chan struct{}) - go func() { - e.log("test", "cont", "image") - close(c) - }() - - select { - case <-c: - case <-time.After(time.Second): - t.Fatal("Timeout publishing message") - } -} - -func TestLogEvents(t *testing.T) { - e := New() - eng := engine.New() - if err := e.Install(eng); err != nil { - t.Fatal(err) - } - - for i := 0; i < eventsLimit+16; i++ { - action := fmt.Sprintf("action_%d", i) - id := fmt.Sprintf("cont_%d", i) - from := fmt.Sprintf("image_%d", i) - job := eng.Job("log", action, id, from) - if err := job.Run(); err != nil { - t.Fatal(err) - } - } - time.Sleep(50 * time.Millisecond) - if len(e.events) != eventsLimit { - t.Fatalf("Must be %d events, got %d", eventsLimit, len(e.events)) - } - - job := eng.Job("events") - job.SetenvInt64("since", 1) - job.SetenvInt64("until", time.Now().Unix()) - buf := bytes.NewBuffer(nil) - job.Stdout.Add(buf) - if err := job.Run(); err != nil { - t.Fatal(err) - } - buf = bytes.NewBuffer(buf.Bytes()) - dec := json.NewDecoder(buf) - var msgs []jsonmessage.JSONMessage - for { - var jm jsonmessage.JSONMessage - if err := dec.Decode(&jm); err != nil { - if err == io.EOF { - break - } - t.Fatal(err) - } - msgs = append(msgs, jm) - } - if len(msgs) != eventsLimit { - t.Fatalf("Must be %d events, got %d", eventsLimit, len(msgs)) - } - first := msgs[0] - if first.Status != "action_16" { - t.Fatalf("First action is %s, must be action_15", first.Status) - } - last := msgs[len(msgs)-1] - if last.Status != "action_79" { - t.Fatalf("First action is %s, must be action_79", first.Status) - } -} - -func TestEventsCountJob(t *testing.T) { - e := New() - eng := engine.New() - if err := e.Install(eng); err != nil { - t.Fatal(err) - } - l1 := make(chan *jsonmessage.JSONMessage) - l2 := make(chan *jsonmessage.JSONMessage) - e.subscribe(l1) - e.subscribe(l2) - job := eng.Job("subscribers_count") - env, _ := job.Stdout.AddEnv() - if err := job.Run(); err != nil { - t.Fatal(err) - } - count := env.GetInt("count") - if count != 2 { - t.Fatalf("There must be 2 subscribers, got %d", count) - } -} From 8fd2b52146b443dd464df5199d79c69047c81eea Mon Sep 17 00:00:00 2001 From: Alexander Morozov Date: Tue, 7 Apr 2015 08:54:00 -0700 Subject: [PATCH 4/4] Fix fail message in TestEventsImageImport Signed-off-by: Alexander Morozov --- integration-cli/docker_cli_events_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-cli/docker_cli_events_test.go b/integration-cli/docker_cli_events_test.go index 7076ab198b..97e3095137 100644 --- a/integration-cli/docker_cli_events_test.go +++ b/integration-cli/docker_cli_events_test.go @@ -237,7 +237,7 @@ func TestEventsImageImport(t *testing.T) { event := strings.TrimSpace(events[len(events)-1]) if !strings.HasSuffix(event, ": import") { - t.Fatalf("Missing pull event - got:%q", event) + t.Fatalf("Missing import event - got:%q", event) } logDone("events - image import is logged")