diff --git a/api.go b/api.go index 9d0348b608..0a7f5744b1 100644 --- a/api.go +++ b/api.go @@ -218,24 +218,55 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques } func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { - events := make(chan utils.JSONMessage) - srv.Lock() - srv.events[r.RemoteAddr] = events - srv.Unlock() - w.Header().Set("Content-Type", "application/json") - wf := utils.NewWriteFlusher(w) - for { - event := <-events + sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) (bool, error) { b, err := json.Marshal(event) if err != nil { - continue + return true, nil } _, err = wf.Write(b) if err != nil { utils.Debugf("%s", err) srv.Lock() - delete(srv.events, r.RemoteAddr) + delete(srv.listeners, r.RemoteAddr) srv.Unlock() + return false, err + } + return false, nil + } + + if err := parseForm(r); err != nil { + return err + } + listener := make(chan utils.JSONMessage) + srv.Lock() + srv.listeners[r.RemoteAddr] = listener + srv.Unlock() + since, err := strconv.ParseInt(r.Form.Get("since"), 10, 0) + if err != nil { + since = 0 + } + w.Header().Set("Content-Type", "application/json") + wf := utils.NewWriteFlusher(w) + if since != 0 { + for _, event := range srv.events { + if event.Time >= since { + skip, err := sendEvent(wf, &event) + if skip { + continue + } + if err != nil { + return err + } + } + } + } + for { + event := <-listener + skip, err := sendEvent(wf, &event) + if skip { + continue + } + if err != nil { return err } } diff --git a/commands.go b/commands.go index 12647feead..2ab107bb77 100644 --- a/commands.go +++ b/commands.go @@ -1058,7 +1058,8 @@ func (cli *DockerCli) CmdCommit(args ...string) error { } func (cli *DockerCli) CmdEvents(args ...string) error { - cmd := Subcmd("events", "", "Get real time events from the server") + cmd := Subcmd("events", "[OPTIONS]", "Get real time events from the server") + since := cmd.String("since", "", "Show events previously created (used for polling).") if err := cmd.Parse(args); err != nil { return nil } @@ -1068,7 +1069,12 @@ func (cli *DockerCli) CmdEvents(args ...string) error { return nil } - if err := cli.stream("GET", "/events", nil, cli.out); err != nil { + v := url.Values{} + if *since != "" { + v.Set("since", *since) + } + + if err := cli.stream("GET", "/events?"+v.Encode(), nil, cli.out); err != nil { return err } return nil diff --git a/server.go b/server.go index 8ba90c69e3..7efb850882 100644 --- a/server.go +++ b/server.go @@ -35,7 +35,7 @@ func (srv *Server) ContainerKill(name string) error { if err := container.Kill(); err != nil { return fmt.Errorf("Error killing container %s: %s", name, err) } - srv.SendEvent("kill", name) + srv.LogEvent("kill", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -54,7 +54,7 @@ func (srv *Server) ContainerExport(name string, out io.Writer) error { if _, err := io.Copy(out, data); err != nil { return err } - srv.SendEvent("export", name) + srv.LogEvent("export", name) return nil } return fmt.Errorf("No such container: %s", name) @@ -814,7 +814,7 @@ func (srv *Server) ContainerCreate(config *Config) (string, error) { } return "", err } - srv.SendEvent("create", container.ShortID()) + srv.LogEvent("create", container.ShortID()) return container.ShortID(), nil } @@ -823,7 +823,7 @@ func (srv *Server) ContainerRestart(name string, t int) error { if err := container.Restart(t); err != nil { return fmt.Errorf("Error restarting container %s: %s", name, err) } - srv.SendEvent("restart", name) + srv.LogEvent("restart", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -843,7 +843,7 @@ func (srv *Server) ContainerDestroy(name string, removeVolume bool) error { if err := srv.runtime.Destroy(container); err != nil { return fmt.Errorf("Error destroying container %s: %s", name, err) } - srv.SendEvent("destroy", name) + srv.LogEvent("destroy", name) if removeVolume { // Retrieve all volumes from all remaining containers @@ -910,7 +910,7 @@ func (srv *Server) deleteImageAndChildren(id string, imgs *[]APIRmi) error { return err } *imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)}) - srv.SendEvent("delete", utils.TruncateID(id)) + srv.LogEvent("delete", utils.TruncateID(id)) return nil } return nil @@ -954,7 +954,7 @@ func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, erro } if tagDeleted { imgs = append(imgs, APIRmi{Untagged: img.ShortID()}) - srv.SendEvent("untag", img.ShortID()) + srv.LogEvent("untag", img.ShortID()) } if len(srv.runtime.repositories.ByID()[img.ID]) == 0 { if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil { @@ -1027,7 +1027,7 @@ func (srv *Server) ContainerStart(name string, hostConfig *HostConfig) error { if err := container.Start(hostConfig); err != nil { return fmt.Errorf("Error starting container %s: %s", name, err) } - srv.SendEvent("start", name) + srv.LogEvent("start", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -1039,7 +1039,7 @@ func (srv *Server) ContainerStop(name string, t int) error { if err := container.Stop(t); err != nil { return fmt.Errorf("Error stopping container %s: %s", name, err) } - srv.SendEvent("stop", name) + srv.LogEvent("stop", name) } else { return fmt.Errorf("No such container: %s", name) } @@ -1173,15 +1173,19 @@ func NewServer(flGraphPath string, autoRestart, enableCors bool, dns ListOpts) ( enableCors: enableCors, pullingPool: make(map[string]struct{}), pushingPool: make(map[string]struct{}), - events: make(map[string]chan utils.JSONMessage), + events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events + listeners: make(map[string]chan utils.JSONMessage), } runtime.srv = srv return srv, nil } -func (srv *Server) SendEvent(action, id string) { - for _, c := range srv.events { - c <- utils.JSONMessage{Status: action, ID: id, Time: time.Now().Unix()} +func (srv *Server) LogEvent(action, id string) { + now := time.Now().Unix() + jm := utils.JSONMessage{Status: action, ID: id, Time: now} + srv.events = append(srv.events, jm) + for _, c := range srv.listeners { + c <- jm } } @@ -1191,5 +1195,6 @@ type Server struct { enableCors bool pullingPool map[string]struct{} pushingPool map[string]struct{} - events map[string]chan utils.JSONMessage + events []utils.JSONMessage + listeners map[string]chan utils.JSONMessage }