diff --git a/api/client/events.go b/api/client/events.go index 54b88f054b..ad38204368 100644 --- a/api/client/events.go +++ b/api/client/events.go @@ -6,10 +6,12 @@ import ( "io" "sort" "strings" + "sync" "time" "golang.org/x/net/context" + "github.com/Sirupsen/logrus" Cli "github.com/docker/docker/cli" "github.com/docker/docker/opts" "github.com/docker/docker/pkg/jsonlog" @@ -115,3 +117,31 @@ func printOutput(event eventtypes.Message, output io.Writer) { } fmt.Fprint(output, "\n") } + +type eventHandler struct { + handlers map[string]func(eventtypes.Message) + mu sync.Mutex + closed bool +} + +func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) { + w.mu.Lock() + w.handlers[action] = h + w.mu.Unlock() +} + +// Watch ranges over the passed in event chan and processes the events based on the +// handlers created for a given action. +// To stop watching, close the event chan. +func (w *eventHandler) Watch(c <-chan eventtypes.Message) { + for e := range c { + w.mu.Lock() + h, exists := w.handlers[e.Action] + w.mu.Unlock() + if !exists { + continue + } + logrus.Debugf("event handler: received event: %v", e) + go h(e) + } +} diff --git a/api/client/stats.go b/api/client/stats.go index ccb924de7e..ff45f4d418 100644 --- a/api/client/stats.go +++ b/api/client/stats.go @@ -1,11 +1,9 @@ package client import ( - "encoding/json" "fmt" "io" "strings" - "sync" "text/tabwriter" "time" @@ -15,134 +13,8 @@ import ( "github.com/docker/engine-api/types" "github.com/docker/engine-api/types/events" "github.com/docker/engine-api/types/filters" - "github.com/docker/go-units" ) -type containerStats struct { - Name string - CPUPercentage float64 - Memory float64 - MemoryLimit float64 - MemoryPercentage float64 - NetworkRx float64 - NetworkTx float64 - BlockRead float64 - BlockWrite float64 - mu sync.RWMutex - err error -} - -type stats struct { - mu sync.Mutex - cs []*containerStats -} - -func (s *stats) isKnownContainer(cid string) bool { - for _, c := range s.cs { - if c.Name == cid { - return true - } - } - return false -} - -func (s *containerStats) Collect(cli *DockerCli, streamStats bool) { - responseBody, err := cli.client.ContainerStats(context.Background(), s.Name, streamStats) - if err != nil { - s.mu.Lock() - s.err = err - s.mu.Unlock() - return - } - defer responseBody.Close() - - var ( - previousCPU uint64 - previousSystem uint64 - dec = json.NewDecoder(responseBody) - u = make(chan error, 1) - ) - go func() { - for { - var v *types.StatsJSON - if err := dec.Decode(&v); err != nil { - u <- err - return - } - - var memPercent = 0.0 - var cpuPercent = 0.0 - - // MemoryStats.Limit will never be 0 unless the container is not running and we haven't - // got any data from cgroup - if v.MemoryStats.Limit != 0 { - memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 - } - - previousCPU = v.PreCPUStats.CPUUsage.TotalUsage - previousSystem = v.PreCPUStats.SystemUsage - cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v) - blkRead, blkWrite := calculateBlockIO(v.BlkioStats) - s.mu.Lock() - s.CPUPercentage = cpuPercent - s.Memory = float64(v.MemoryStats.Usage) - s.MemoryLimit = float64(v.MemoryStats.Limit) - s.MemoryPercentage = memPercent - s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks) - s.BlockRead = float64(blkRead) - s.BlockWrite = float64(blkWrite) - s.mu.Unlock() - u <- nil - if !streamStats { - return - } - } - }() - for { - select { - case <-time.After(2 * time.Second): - // zero out the values if we have not received an update within - // the specified duration. - s.mu.Lock() - s.CPUPercentage = 0 - s.Memory = 0 - s.MemoryPercentage = 0 - s.MemoryLimit = 0 - s.NetworkRx = 0 - s.NetworkTx = 0 - s.BlockRead = 0 - s.BlockWrite = 0 - s.mu.Unlock() - case err := <-u: - if err != nil { - s.mu.Lock() - s.err = err - s.mu.Unlock() - return - } - } - if !streamStats { - return - } - } -} - -func (s *containerStats) Display(w io.Writer) error { - s.mu.RLock() - defer s.mu.RUnlock() - if s.err != nil { - return s.err - } - fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n", - s.Name, - s.CPUPercentage, - units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit), - s.MemoryPercentage, - units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx), - units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite)) - return nil -} - // CmdStats displays a live stream of resource usage statistics for one or more containers. // // This shows real-time information on CPU usage, memory usage, and network I/O. @@ -157,27 +29,11 @@ func (cli *DockerCli) CmdStats(args ...string) error { names := cmd.Args() showAll := len(names) == 0 - - // The containerChan is the central synchronization piece for this function, - // and all messages to either add or remove an element to the list of - // monitored containers go through this. - // - // - When watching all containers, a goroutine subscribes to the events - // API endpoint and messages this channel accordingly. - // - When watching a particular subset of containers, we feed the - // requested list of containers to this channel. - // - For both codepaths, a goroutine is responsible for watching this - // channel and subscribing to the stats API for containers. - type containerEvent struct { - id string - event string - err error - } - containerChan := make(chan containerEvent) + closeChan := make(chan error) // monitorContainerEvents watches for container creation and removal (only // used when calling `docker stats` without arguments). - monitorContainerEvents := func(started chan<- struct{}, c chan<- containerEvent) { + monitorContainerEvents := func(started chan<- struct{}, c chan events.Message) { f := filters.NewArgs() f.Add("type", "container") options := types.EventsOptions{ @@ -188,91 +44,82 @@ func (cli *DockerCli) CmdStats(args ...string) error { // unblock the main goroutine. close(started) if err != nil { - c <- containerEvent{err: err} + closeChan <- err return } defer resBody.Close() + decodeEvents(resBody, func(event events.Message, err error) error { if err != nil { - c <- containerEvent{"", "", err} - } else { - c <- containerEvent{event.ID[:12], event.Action, err} + closeChan <- err + return nil } + c <- event return nil }) } + cStats := stats{} // getContainerList simulates creation event for all previously existing // containers (only used when calling `docker stats` without arguments). - getContainerList := func(c chan<- containerEvent) { + getContainerList := func() { options := types.ContainerListOptions{ All: *all, } cs, err := cli.client.ContainerList(options) if err != nil { - containerChan <- containerEvent{"", "", err} + closeChan <- err } - for _, c := range cs { - containerChan <- containerEvent{c.ID[:12], "create", nil} + for _, container := range cs { + s := &containerStats{Name: container.ID[:12]} + cStats.add(s) + go s.Collect(cli.client, !*noStream) } } - // Monitor the containerChan and start collection for each container. - cStats := stats{} - closeChan := make(chan error) - go func(stopChan chan<- error, c <-chan containerEvent) { - for { - event := <-c - if event.err != nil { - stopChan <- event.err - return - } - switch event.event { - case "create": - cStats.mu.Lock() - if !cStats.isKnownContainer(event.id) { - s := &containerStats{Name: event.id} - cStats.cs = append(cStats.cs, s) - go s.Collect(cli, !*noStream) - } - cStats.mu.Unlock() - case "stop": - case "die": - if !*all { - var remove int - // cStats cannot be O(1) with a map cause ranging over it would cause - // containers in stats to move up and down in the list...:( - cStats.mu.Lock() - for i, s := range cStats.cs { - if s.Name == event.id { - remove = i - break - } - } - cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...) - cStats.mu.Unlock() - } - } - } - }(closeChan, containerChan) - if showAll { // If no names were specified, start a long running goroutine which // monitors container events. We make sure we're subscribed before // retrieving the list of running containers to avoid a race where we // would "miss" a creation. started := make(chan struct{}) - go monitorContainerEvents(started, containerChan) + eh := eventHandler{handlers: make(map[string]func(events.Message))} + eh.Handle("create", func(e events.Message) { + if *all { + s := &containerStats{Name: e.ID[:12]} + cStats.add(s) + go s.Collect(cli.client, !*noStream) + } + }) + + eh.Handle("start", func(e events.Message) { + s := &containerStats{Name: e.ID[:12]} + cStats.add(s) + go s.Collect(cli.client, !*noStream) + }) + + eh.Handle("die", func(e events.Message) { + if !*all { + cStats.remove(e.ID[:12]) + } + }) + + eventChan := make(chan events.Message) + go eh.Watch(eventChan) + go monitorContainerEvents(started, eventChan) + defer close(eventChan) <-started // Start a short-lived goroutine to retrieve the initial list of // containers. - go getContainerList(containerChan) + go getContainerList() } else { // Artificially send creation events for the containers we were asked to // monitor (same code path than we use when monitoring all containers). for _, name := range names { - containerChan <- containerEvent{name, "create", nil} + s := &containerStats{Name: name} + cStats.add(s) + go s.Collect(cli.client, !*noStream) } // We don't expect any asynchronous errors: closeChan can be closed. @@ -304,6 +151,7 @@ func (cli *DockerCli) CmdStats(args ...string) error { } io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n") } + for range time.Tick(500 * time.Millisecond) { printHeader() toRemove := []int{} @@ -343,40 +191,3 @@ func (cli *DockerCli) CmdStats(args ...string) error { } return nil } - -func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 { - var ( - cpuPercent = 0.0 - // calculate the change for the cpu usage of the container in between readings - cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU) - // calculate the change for the entire system between readings - systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem) - ) - - if systemDelta > 0.0 && cpuDelta > 0.0 { - cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0 - } - return cpuPercent -} - -func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) { - for _, bioEntry := range blkio.IoServiceBytesRecursive { - switch strings.ToLower(bioEntry.Op) { - case "read": - blkRead = blkRead + bioEntry.Value - case "write": - blkWrite = blkWrite + bioEntry.Value - } - } - return -} - -func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) { - var rx, tx float64 - - for _, v := range network { - rx += float64(v.RxBytes) - tx += float64(v.TxBytes) - } - return rx, tx -} diff --git a/api/client/stats_helpers.go b/api/client/stats_helpers.go new file mode 100644 index 0000000000..985a83fb0e --- /dev/null +++ b/api/client/stats_helpers.go @@ -0,0 +1,193 @@ +package client + +import ( + "encoding/json" + "fmt" + "io" + "strings" + "sync" + "time" + + "github.com/docker/engine-api/client" + "github.com/docker/engine-api/types" + "github.com/docker/go-units" + "golang.org/x/net/context" +) + +type containerStats struct { + Name string + CPUPercentage float64 + Memory float64 + MemoryLimit float64 + MemoryPercentage float64 + NetworkRx float64 + NetworkTx float64 + BlockRead float64 + BlockWrite float64 + mu sync.RWMutex + err error +} + +type stats struct { + mu sync.Mutex + cs []*containerStats +} + +func (s *stats) add(cs *containerStats) { + s.mu.Lock() + if _, exists := s.isKnownContainer(cs.Name); !exists { + s.cs = append(s.cs, cs) + } + s.mu.Unlock() +} + +func (s *stats) remove(id string) { + s.mu.Lock() + if i, exists := s.isKnownContainer(id); exists { + s.cs = append(s.cs[:i], s.cs[i+1:]...) + } + s.mu.Unlock() +} + +func (s *stats) isKnownContainer(cid string) (int, bool) { + for i, c := range s.cs { + if c.Name == cid { + return i, true + } + } + return -1, false +} + +func (s *containerStats) Collect(cli client.APIClient, streamStats bool) { + responseBody, err := cli.ContainerStats(context.Background(), s.Name, streamStats) + if err != nil { + s.mu.Lock() + s.err = err + s.mu.Unlock() + return + } + defer responseBody.Close() + + var ( + previousCPU uint64 + previousSystem uint64 + dec = json.NewDecoder(responseBody) + u = make(chan error, 1) + ) + go func() { + for { + var v *types.StatsJSON + if err := dec.Decode(&v); err != nil { + u <- err + return + } + + var memPercent = 0.0 + var cpuPercent = 0.0 + + // MemoryStats.Limit will never be 0 unless the container is not running and we haven't + // got any data from cgroup + if v.MemoryStats.Limit != 0 { + memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0 + } + + previousCPU = v.PreCPUStats.CPUUsage.TotalUsage + previousSystem = v.PreCPUStats.SystemUsage + cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v) + blkRead, blkWrite := calculateBlockIO(v.BlkioStats) + s.mu.Lock() + s.CPUPercentage = cpuPercent + s.Memory = float64(v.MemoryStats.Usage) + s.MemoryLimit = float64(v.MemoryStats.Limit) + s.MemoryPercentage = memPercent + s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks) + s.BlockRead = float64(blkRead) + s.BlockWrite = float64(blkWrite) + s.mu.Unlock() + u <- nil + if !streamStats { + return + } + } + }() + for { + select { + case <-time.After(2 * time.Second): + // zero out the values if we have not received an update within + // the specified duration. + s.mu.Lock() + s.CPUPercentage = 0 + s.Memory = 0 + s.MemoryPercentage = 0 + s.MemoryLimit = 0 + s.NetworkRx = 0 + s.NetworkTx = 0 + s.BlockRead = 0 + s.BlockWrite = 0 + s.mu.Unlock() + case err := <-u: + if err != nil { + s.mu.Lock() + s.err = err + s.mu.Unlock() + return + } + } + if !streamStats { + return + } + } +} + +func (s *containerStats) Display(w io.Writer) error { + s.mu.RLock() + defer s.mu.RUnlock() + if s.err != nil { + return s.err + } + fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n", + s.Name, + s.CPUPercentage, + units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit), + s.MemoryPercentage, + units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx), + units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite)) + return nil +} + +func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 { + var ( + cpuPercent = 0.0 + // calculate the change for the cpu usage of the container in between readings + cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU) + // calculate the change for the entire system between readings + systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem) + ) + + if systemDelta > 0.0 && cpuDelta > 0.0 { + cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0 + } + return cpuPercent +} + +func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) { + for _, bioEntry := range blkio.IoServiceBytesRecursive { + switch strings.ToLower(bioEntry.Op) { + case "read": + blkRead = blkRead + bioEntry.Value + case "write": + blkWrite = blkWrite + bioEntry.Value + } + } + return +} + +func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) { + var rx, tx float64 + + for _, v := range network { + rx += float64(v.RxBytes) + tx += float64(v.TxBytes) + } + return rx, tx +}