diff --git a/api/server/server.go b/api/server/server.go index 502f1ae9e1..795ca080e4 100644 --- a/api/server/server.go +++ b/api/server/server.go @@ -3,6 +3,7 @@ package server import ( "bufio" "bytes" + "time" "encoding/base64" "encoding/json" @@ -23,7 +24,9 @@ import ( "github.com/docker/docker/daemon" "github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/engine" + "github.com/docker/docker/pkg/jsonmessage" "github.com/docker/docker/pkg/parsers" + "github.com/docker/docker/pkg/parsers/filters" "github.com/docker/docker/pkg/stdcopy" "github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/version" @@ -324,13 +327,104 @@ func getEvents(eng *engine.Engine, version version.Version, w http.ResponseWrite if err := parseForm(r); err != nil { return err } + var since int64 = -1 + if r.Form.Get("since") != "" { + s, err := strconv.ParseInt(r.Form.Get("since"), 10, 64) + if err != nil { + return err + } + since = s + } - var job = eng.Job("events") - streamJSON(job, w, true) - job.Setenv("since", r.Form.Get("since")) - job.Setenv("until", r.Form.Get("until")) - job.Setenv("filters", r.Form.Get("filters")) - return job.Run() + var until int64 = -1 + if r.Form.Get("until") != "" { + u, err := strconv.ParseInt(r.Form.Get("until"), 10, 64) + if err != nil { + return err + } + until = u + } + timer := time.NewTimer(0) + timer.Stop() + if until > 0 { + dur := time.Unix(until, 0).Sub(time.Now()) + timer = time.NewTimer(dur) + } + + ef, err := filters.FromParam(r.Form.Get("filters")) + if err != nil { + return err + } + + isFiltered := func(field string, filter []string) bool { + if len(filter) == 0 { + return false + } + for _, v := range filter { + if v == field { + return false + } + if strings.Contains(field, ":") { + image := strings.Split(field, ":") + if image[0] == v { + return false + } + } + } + return true + } + + d := getDaemon(eng) + es := d.EventsService + w.Header().Set("Content-Type", "application/json") + enc := json.NewEncoder(utils.NewWriteFlusher(w)) + + getContainerId := func(cn string) string { + c, err := d.Get(cn) + if err != nil { + return "" + } + return c.ID + } + + sendEvent := func(ev *jsonmessage.JSONMessage) error { + //incoming container filter can be name,id or partial id, convert and replace as a full container id + for i, cn := range ef["container"] { + ef["container"][i] = getContainerId(cn) + } + + if isFiltered(ev.Status, ef["event"]) || isFiltered(ev.From, ef["image"]) || + isFiltered(ev.ID, ef["container"]) { + return nil + } + + return enc.Encode(ev) + } + + current, l := es.Subscribe() + defer es.Evict(l) + for _, ev := range current { + if ev.Time < since { + continue + } + if err := sendEvent(ev); err != nil { + return err + } + } + for { + select { + case ev := <-l: + jev, ok := ev.(*jsonmessage.JSONMessage) + if !ok { + continue + } + if err := sendEvent(jev); err != nil { + return err + } + case <-timer.C: + return nil + } + } } func getImagesHistory(eng *engine.Engine, version version.Version, w http.ResponseWriter, r *http.Request, vars map[string]string) error { diff --git a/api/server/server_unit_test.go b/api/server/server_unit_test.go index f83b5cc54f..8e8409ffa7 100644 --- a/api/server/server_unit_test.go +++ b/api/server/server_unit_test.go @@ -250,47 +250,6 @@ func TestGetContainersByName(t *testing.T) { } } -func TestGetEvents(t *testing.T) { - eng := engine.New() - var called bool - eng.Register("events", func(job *engine.Job) error { - called = true - since := job.Getenv("since") - if since != "1" { - t.Fatalf("'since' should be 1, found %#v instead", since) - } - until := job.Getenv("until") - if until != "0" { - t.Fatalf("'until' should be 0, found %#v instead", until) - } - v := &engine.Env{} - v.Set("since", since) - v.Set("until", until) - if _, err := v.WriteTo(job.Stdout); err != nil { - return err - } - return nil - }) - r := serveRequest("GET", "/events?since=1&until=0", nil, eng, t) - if !called { - t.Fatal("handler was not called") - } - assertContentType(r, "application/json", t) - var stdoutJSON struct { - Since int - Until int - } - if err := json.Unmarshal(r.Body.Bytes(), &stdoutJSON); err != nil { - t.Fatal(err) - } - if stdoutJSON.Since != 1 { - t.Errorf("since != 1: %#v", stdoutJSON.Since) - } - if stdoutJSON.Until != 0 { - t.Errorf("until != 0: %#v", stdoutJSON.Until) - } -} - func TestLogs(t *testing.T) { eng := engine.New() var inspect bool diff --git a/builtins/builtins.go b/builtins/builtins.go index d87bdb87ad..149d350090 100644 --- a/builtins/builtins.go +++ b/builtins/builtins.go @@ -8,7 +8,6 @@ import ( "github.com/docker/docker/autogen/dockerversion" "github.com/docker/docker/daemon/networkdriver/bridge" "github.com/docker/docker/engine" - "github.com/docker/docker/events" "github.com/docker/docker/pkg/parsers/kernel" ) @@ -19,9 +18,6 @@ func Register(eng *engine.Engine) error { if err := remote(eng); err != nil { return err } - if err := events.New().Install(eng); err != nil { - return err - } if err := eng.Register("version", dockerVersion); err != nil { return err } diff --git a/daemon/container.go b/daemon/container.go index 228944d2d4..2d9487ea68 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -200,9 +200,11 @@ func (container *Container) WriteHostConfig() error { func (container *Container) LogEvent(action string) { d := container.daemon - if err := d.eng.Job("log", action, container.ID, d.Repositories().ImageName(container.ImageID)).Run(); err != nil { - logrus.Errorf("Error logging event %s for %s: %s", action, container.ID, err) - } + d.EventsService.Log( + action, + container.ID, + d.Repositories().ImageName(container.ImageID), + ) } func (container *Container) getResourcePath(path string) (string, error) { diff --git a/daemon/daemon.go b/daemon/daemon.go index a072fc34a4..460625c144 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -19,6 +19,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/api" "github.com/docker/docker/autogen/dockerversion" + "github.com/docker/docker/daemon/events" "github.com/docker/docker/daemon/execdriver" "github.com/docker/docker/daemon/execdriver/execdrivers" "github.com/docker/docker/daemon/execdriver/lxc" @@ -109,6 +110,7 @@ type Daemon struct { statsCollector *statsCollector defaultLogConfig runconfig.LogConfig RegistryService *registry.Service + EventsService *events.Events } // Install installs daemon capabilities to eng. @@ -932,8 +934,9 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService return nil, err } + eventsService := events.New() logrus.Debug("Creating repository list") - repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService) + repositories, err := graph.NewTagStore(path.Join(config.Root, "repositories-"+driver.String()), g, trustKey, registryService, eventsService) if err != nil { return nil, fmt.Errorf("Couldn't create Tag store: %s", err) } @@ -1025,6 +1028,7 @@ func NewDaemonFromDirectory(config *Config, eng *engine.Engine, registryService statsCollector: newStatsCollector(1 * time.Second), defaultLogConfig: config.LogConfig, RegistryService: registryService, + EventsService: eventsService, } eng.OnShutdown(func() { diff --git a/daemon/image_delete.go b/daemon/image_delete.go index bf3d7ba9ce..6323d323e4 100644 --- a/daemon/image_delete.go +++ b/daemon/image_delete.go @@ -108,7 +108,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, list *[]types *list = append(*list, types.ImageDelete{ Untagged: utils.ImageReference(repoName, tag), }) - eng.Job("log", "untag", img.ID, "").Run() + daemon.EventsService.Log("untag", img.ID, "") } } tags = daemon.Repositories().ByID()[img.ID] @@ -123,6 +123,7 @@ func (daemon *Daemon) DeleteImage(eng *engine.Engine, name string, list *[]types *list = append(*list, types.ImageDelete{ Deleted: img.ID, }) + daemon.EventsService.Log("delete", img.ID, "") eng.Job("log", "delete", img.ID, "").Run() if img.Parent != "" && !noprune { err := daemon.DeleteImage(eng, img.Parent, list, false, force, noprune) diff --git a/daemon/info.go b/daemon/info.go index 687937e3e2..183a9e68bb 100644 --- a/daemon/info.go +++ b/daemon/info.go @@ -51,11 +51,6 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error { initPath = daemon.SystemInitPath() } - cjob := job.Eng.Job("subscribers_count") - env, _ := cjob.Stdout.AddEnv() - if err := cjob.Run(); err != nil { - return err - } v := &engine.Env{} v.SetJson("ID", daemon.ID) v.SetInt("Containers", len(daemon.List())) @@ -71,7 +66,7 @@ func (daemon *Daemon) CmdInfo(job *engine.Job) error { v.Set("SystemTime", time.Now().Format(time.RFC3339Nano)) v.Set("ExecutionDriver", daemon.ExecutionDriver().Name()) v.Set("LoggingDriver", daemon.defaultLogConfig.Type) - v.SetInt("NEventsListener", env.GetInt("count")) + v.SetInt("NEventsListener", daemon.EventsService.SubscribersCount()) v.Set("KernelVersion", kernelVersion) v.Set("OperatingSystem", operatingSystem) v.Set("IndexServerAddress", registry.IndexServerAddress()) diff --git a/graph/import.go b/graph/import.go index 8b99188969..eb63af0b60 100644 --- a/graph/import.go +++ b/graph/import.go @@ -7,7 +7,6 @@ import ( "net/http" "net/url" - "github.com/Sirupsen/logrus" "github.com/docker/docker/engine" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/progressreader" @@ -92,8 +91,7 @@ func (s *TagStore) CmdImport(job *engine.Job) error { if tag != "" { logID = utils.ImageReference(logID, tag) } - if err = job.Eng.Job("log", "import", logID, "").Run(); err != nil { - logrus.Errorf("Error logging event 'import' for %s: %s", logID, err) - } + + s.eventsService.Log("import", logID, "") return nil } diff --git a/graph/pull.go b/graph/pull.go index 08b688cb21..13c69858fd 100644 --- a/graph/pull.go +++ b/graph/pull.go @@ -85,9 +85,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error { logrus.Debugf("pulling v2 repository with local name %q", repoInfo.LocalName) if err := s.pullV2Repository(job.Eng, r, job.Stdout, repoInfo, tag, sf, job.GetenvBool("parallel")); err == nil { - if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil { - logrus.Errorf("Error logging event 'pull' for %s: %s", logName, err) - } + s.eventsService.Log("pull", logName, "") return nil } else if err != registry.ErrDoesNotExist && err != ErrV2RegistryUnavailable { logrus.Errorf("Error from V2 registry: %s", err) @@ -101,9 +99,7 @@ func (s *TagStore) CmdPull(job *engine.Job) error { return err } - if err = job.Eng.Job("log", "pull", logName, "").Run(); err != nil { - logrus.Errorf("Error logging event 'pull' for %s: %s", logName, err) - } + s.eventsService.Log("pull", logName, "") return nil } diff --git a/graph/tags.go b/graph/tags.go index b6a7987ff8..6346ea8b50 100644 --- a/graph/tags.go +++ b/graph/tags.go @@ -13,6 +13,7 @@ import ( "strings" "sync" + "github.com/docker/docker/daemon/events" "github.com/docker/docker/image" "github.com/docker/docker/pkg/parsers" "github.com/docker/docker/pkg/stringid" @@ -40,6 +41,7 @@ type TagStore struct { pullingPool map[string]chan struct{} pushingPool map[string]chan struct{} registryService *registry.Service + eventsService *events.Events } type Repository map[string]string @@ -62,7 +64,7 @@ func (r Repository) Contains(u Repository) bool { return true } -func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service) (*TagStore, error) { +func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registryService *registry.Service, eventsService *events.Events) (*TagStore, error) { abspath, err := filepath.Abs(path) if err != nil { return nil, err @@ -76,6 +78,7 @@ func NewTagStore(path string, graph *Graph, key libtrust.PrivateKey, registrySer pullingPool: make(map[string]chan struct{}), pushingPool: make(map[string]chan struct{}), registryService: registryService, + eventsService: eventsService, } // Load the json file if it exists, otherwise create it. if err := store.reload(); os.IsNotExist(err) { diff --git a/graph/tags_unit_test.go b/graph/tags_unit_test.go index 001a10527d..be5624245c 100644 --- a/graph/tags_unit_test.go +++ b/graph/tags_unit_test.go @@ -7,6 +7,7 @@ import ( "path" "testing" + "github.com/docker/docker/daemon/events" "github.com/docker/docker/daemon/graphdriver" _ "github.com/docker/docker/daemon/graphdriver/vfs" // import the vfs driver so it is used in the tests "github.com/docker/docker/image" @@ -59,7 +60,7 @@ func mkTestTagStore(root string, t *testing.T) *TagStore { if err != nil { t.Fatal(err) } - store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil) + store, err := NewTagStore(path.Join(root, "tags"), graph, nil, nil, events.New()) if err != nil { t.Fatal(err) }