add since for polling, rename some vars

This commit is contained in:
Victor Vieux 2013-07-12 16:29:23 +00:00
parent b8d52ec266
commit 2e4d4c9f60
3 changed files with 68 additions and 26 deletions

51
api.go
View File

@ -218,24 +218,55 @@ func getInfo(srv *Server, version float64, w http.ResponseWriter, r *http.Reques
} }
func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error { func getEvents(srv *Server, version float64, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
events := make(chan utils.JSONMessage) sendEvent := func(wf *utils.WriteFlusher, event *utils.JSONMessage) (bool, error) {
srv.Lock()
srv.events[r.RemoteAddr] = events
srv.Unlock()
w.Header().Set("Content-Type", "application/json")
wf := utils.NewWriteFlusher(w)
for {
event := <-events
b, err := json.Marshal(event) b, err := json.Marshal(event)
if err != nil { if err != nil {
continue return true, nil
} }
_, err = wf.Write(b) _, err = wf.Write(b)
if err != nil { if err != nil {
utils.Debugf("%s", err) utils.Debugf("%s", err)
srv.Lock() srv.Lock()
delete(srv.events, r.RemoteAddr) delete(srv.listeners, r.RemoteAddr)
srv.Unlock() srv.Unlock()
return false, err
}
return false, nil
}
if err := parseForm(r); err != nil {
return err
}
listener := make(chan utils.JSONMessage)
srv.Lock()
srv.listeners[r.RemoteAddr] = listener
srv.Unlock()
since, err := strconv.ParseInt(r.Form.Get("since"), 10, 0)
if err != nil {
since = 0
}
w.Header().Set("Content-Type", "application/json")
wf := utils.NewWriteFlusher(w)
if since != 0 {
for _, event := range srv.events {
if event.Time >= since {
skip, err := sendEvent(wf, &event)
if skip {
continue
}
if err != nil {
return err
}
}
}
}
for {
event := <-listener
skip, err := sendEvent(wf, &event)
if skip {
continue
}
if err != nil {
return err return err
} }
} }

View File

@ -1058,7 +1058,8 @@ func (cli *DockerCli) CmdCommit(args ...string) error {
} }
func (cli *DockerCli) CmdEvents(args ...string) error { func (cli *DockerCli) CmdEvents(args ...string) error {
cmd := Subcmd("events", "", "Get real time events from the server") cmd := Subcmd("events", "[OPTIONS]", "Get real time events from the server")
since := cmd.String("since", "", "Show events previously created (used for polling).")
if err := cmd.Parse(args); err != nil { if err := cmd.Parse(args); err != nil {
return nil return nil
} }
@ -1068,7 +1069,12 @@ func (cli *DockerCli) CmdEvents(args ...string) error {
return nil return nil
} }
if err := cli.stream("GET", "/events", nil, cli.out); err != nil { v := url.Values{}
if *since != "" {
v.Set("since", *since)
}
if err := cli.stream("GET", "/events?"+v.Encode(), nil, cli.out); err != nil {
return err return err
} }
return nil return nil

View File

@ -35,7 +35,7 @@ func (srv *Server) ContainerKill(name string) error {
if err := container.Kill(); err != nil { if err := container.Kill(); err != nil {
return fmt.Errorf("Error killing container %s: %s", name, err) return fmt.Errorf("Error killing container %s: %s", name, err)
} }
srv.SendEvent("kill", name) srv.LogEvent("kill", name)
} else { } else {
return fmt.Errorf("No such container: %s", name) return fmt.Errorf("No such container: %s", name)
} }
@ -54,7 +54,7 @@ func (srv *Server) ContainerExport(name string, out io.Writer) error {
if _, err := io.Copy(out, data); err != nil { if _, err := io.Copy(out, data); err != nil {
return err return err
} }
srv.SendEvent("export", name) srv.LogEvent("export", name)
return nil return nil
} }
return fmt.Errorf("No such container: %s", name) return fmt.Errorf("No such container: %s", name)
@ -814,7 +814,7 @@ func (srv *Server) ContainerCreate(config *Config) (string, error) {
} }
return "", err return "", err
} }
srv.SendEvent("create", container.ShortID()) srv.LogEvent("create", container.ShortID())
return container.ShortID(), nil return container.ShortID(), nil
} }
@ -823,7 +823,7 @@ func (srv *Server) ContainerRestart(name string, t int) error {
if err := container.Restart(t); err != nil { if err := container.Restart(t); err != nil {
return fmt.Errorf("Error restarting container %s: %s", name, err) return fmt.Errorf("Error restarting container %s: %s", name, err)
} }
srv.SendEvent("restart", name) srv.LogEvent("restart", name)
} else { } else {
return fmt.Errorf("No such container: %s", name) return fmt.Errorf("No such container: %s", name)
} }
@ -843,7 +843,7 @@ func (srv *Server) ContainerDestroy(name string, removeVolume bool) error {
if err := srv.runtime.Destroy(container); err != nil { if err := srv.runtime.Destroy(container); err != nil {
return fmt.Errorf("Error destroying container %s: %s", name, err) return fmt.Errorf("Error destroying container %s: %s", name, err)
} }
srv.SendEvent("destroy", name) srv.LogEvent("destroy", name)
if removeVolume { if removeVolume {
// Retrieve all volumes from all remaining containers // Retrieve all volumes from all remaining containers
@ -910,7 +910,7 @@ func (srv *Server) deleteImageAndChildren(id string, imgs *[]APIRmi) error {
return err return err
} }
*imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)}) *imgs = append(*imgs, APIRmi{Deleted: utils.TruncateID(id)})
srv.SendEvent("delete", utils.TruncateID(id)) srv.LogEvent("delete", utils.TruncateID(id))
return nil return nil
} }
return nil return nil
@ -954,7 +954,7 @@ func (srv *Server) deleteImage(img *Image, repoName, tag string) ([]APIRmi, erro
} }
if tagDeleted { if tagDeleted {
imgs = append(imgs, APIRmi{Untagged: img.ShortID()}) imgs = append(imgs, APIRmi{Untagged: img.ShortID()})
srv.SendEvent("untag", img.ShortID()) srv.LogEvent("untag", img.ShortID())
} }
if len(srv.runtime.repositories.ByID()[img.ID]) == 0 { if len(srv.runtime.repositories.ByID()[img.ID]) == 0 {
if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil { if err := srv.deleteImageAndChildren(img.ID, &imgs); err != nil {
@ -1027,7 +1027,7 @@ func (srv *Server) ContainerStart(name string, hostConfig *HostConfig) error {
if err := container.Start(hostConfig); err != nil { if err := container.Start(hostConfig); err != nil {
return fmt.Errorf("Error starting container %s: %s", name, err) return fmt.Errorf("Error starting container %s: %s", name, err)
} }
srv.SendEvent("start", name) srv.LogEvent("start", name)
} else { } else {
return fmt.Errorf("No such container: %s", name) return fmt.Errorf("No such container: %s", name)
} }
@ -1039,7 +1039,7 @@ func (srv *Server) ContainerStop(name string, t int) error {
if err := container.Stop(t); err != nil { if err := container.Stop(t); err != nil {
return fmt.Errorf("Error stopping container %s: %s", name, err) return fmt.Errorf("Error stopping container %s: %s", name, err)
} }
srv.SendEvent("stop", name) srv.LogEvent("stop", name)
} else { } else {
return fmt.Errorf("No such container: %s", name) return fmt.Errorf("No such container: %s", name)
} }
@ -1173,15 +1173,19 @@ func NewServer(flGraphPath string, autoRestart, enableCors bool, dns ListOpts) (
enableCors: enableCors, enableCors: enableCors,
pullingPool: make(map[string]struct{}), pullingPool: make(map[string]struct{}),
pushingPool: make(map[string]struct{}), pushingPool: make(map[string]struct{}),
events: make(map[string]chan utils.JSONMessage), events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events
listeners: make(map[string]chan utils.JSONMessage),
} }
runtime.srv = srv runtime.srv = srv
return srv, nil return srv, nil
} }
func (srv *Server) SendEvent(action, id string) { func (srv *Server) LogEvent(action, id string) {
for _, c := range srv.events { now := time.Now().Unix()
c <- utils.JSONMessage{Status: action, ID: id, Time: time.Now().Unix()} jm := utils.JSONMessage{Status: action, ID: id, Time: now}
srv.events = append(srv.events, jm)
for _, c := range srv.listeners {
c <- jm
} }
} }
@ -1191,5 +1195,6 @@ type Server struct {
enableCors bool enableCors bool
pullingPool map[string]struct{} pullingPool map[string]struct{}
pushingPool map[string]struct{} pushingPool map[string]struct{}
events map[string]chan utils.JSONMessage events []utils.JSONMessage
listeners map[string]chan utils.JSONMessage
} }