From df95474885aeaaa77eeea4c4b31a92b58fc2a67c Mon Sep 17 00:00:00 2001
From: Brian Goff
Date: Mon, 29 Feb 2016 14:24:51 -0500
Subject: [PATCH] Fixes issue with stats on start event

When a client is invoked like `docker stats`, with no arguments or
flags, a container that was already created but not yet started is
never added to the stats list when it is subsequently started, even
though it should be.

Also splits some of the stats helpers into a separate file, since the
stats CLI file is already quite long.

Signed-off-by: Brian Goff
---
 api/client/events.go        |  30 ++++
 api/client/stats.go         | 277 ++++++------------------------------
 api/client/stats_helpers.go | 193 +++++++++++++++++++++++++
 3 files changed, 267 insertions(+), 233 deletions(-)
 create mode 100644 api/client/stats_helpers.go

diff --git a/api/client/events.go b/api/client/events.go
index 54b88f054b..ad38204368 100644
--- a/api/client/events.go
+++ b/api/client/events.go
@@ -6,10 +6,12 @@ import (
 	"io"
 	"sort"
 	"strings"
+	"sync"
 	"time"
 
 	"golang.org/x/net/context"
 
+	"github.com/Sirupsen/logrus"
 	Cli "github.com/docker/docker/cli"
 	"github.com/docker/docker/opts"
 	"github.com/docker/docker/pkg/jsonlog"
@@ -115,3 +117,31 @@ func printOutput(event eventtypes.Message, output io.Writer) {
 	}
 	fmt.Fprint(output, "\n")
 }
+
+type eventHandler struct {
+	handlers map[string]func(eventtypes.Message)
+	mu       sync.Mutex
+	closed   bool
+}
+
+func (w *eventHandler) Handle(action string, h func(eventtypes.Message)) {
+	w.mu.Lock()
+	w.handlers[action] = h
+	w.mu.Unlock()
+}
+
+// Watch ranges over the passed in event chan and processes the events based on the
+// handlers created for a given action.
+// To stop watching, close the event chan.
+func (w *eventHandler) Watch(c <-chan eventtypes.Message) {
+	for e := range c {
+		w.mu.Lock()
+		h, exists := w.handlers[e.Action]
+		w.mu.Unlock()
+		if !exists {
+			continue
+		}
+		logrus.Debugf("event handler: received event: %v", e)
+		go h(e)
+	}
+}
diff --git a/api/client/stats.go b/api/client/stats.go
index ccb924de7e..ff45f4d418 100644
--- a/api/client/stats.go
+++ b/api/client/stats.go
@@ -1,11 +1,9 @@
 package client
 
 import (
-	"encoding/json"
 	"fmt"
 	"io"
 	"strings"
-	"sync"
 	"text/tabwriter"
 	"time"
 
@@ -15,134 +13,8 @@ import (
 	"github.com/docker/engine-api/types"
 	"github.com/docker/engine-api/types/events"
 	"github.com/docker/engine-api/types/filters"
-	"github.com/docker/go-units"
 )
 
-type containerStats struct {
-	Name             string
-	CPUPercentage    float64
-	Memory           float64
-	MemoryLimit      float64
-	MemoryPercentage float64
-	NetworkRx        float64
-	NetworkTx        float64
-	BlockRead        float64
-	BlockWrite       float64
-	mu               sync.RWMutex
-	err              error
-}
-
-type stats struct {
-	mu sync.Mutex
-	cs []*containerStats
-}
-
-func (s *stats) isKnownContainer(cid string) bool {
-	for _, c := range s.cs {
-		if c.Name == cid {
-			return true
-		}
-	}
-	return false
-}
-
-func (s *containerStats) Collect(cli *DockerCli, streamStats bool) {
-	responseBody, err := cli.client.ContainerStats(context.Background(), s.Name, streamStats)
-	if err != nil {
-		s.mu.Lock()
-		s.err = err
-		s.mu.Unlock()
-		return
-	}
-	defer responseBody.Close()
-
-	var (
-		previousCPU    uint64
-		previousSystem uint64
-		dec            = json.NewDecoder(responseBody)
-		u              = make(chan error, 1)
-	)
-	go func() {
-		for {
-			var v *types.StatsJSON
-			if err := dec.Decode(&v); err != nil {
-				u <- err
-				return
-			}
-
-			var memPercent = 0.0
-			var cpuPercent = 0.0
-
-			// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
-			// got any data from cgroup
-			if v.MemoryStats.Limit != 0 {
-				memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
-			}
-
-			previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
-			previousSystem = v.PreCPUStats.SystemUsage
-			cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v)
-			blkRead, blkWrite := calculateBlockIO(v.BlkioStats)
-			s.mu.Lock()
-			s.CPUPercentage = cpuPercent
-			s.Memory = float64(v.MemoryStats.Usage)
-			s.MemoryLimit = float64(v.MemoryStats.Limit)
-			s.MemoryPercentage = memPercent
-			s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks)
-			s.BlockRead = float64(blkRead)
-			s.BlockWrite = float64(blkWrite)
-			s.mu.Unlock()
-			u <- nil
-			if !streamStats {
-				return
-			}
-		}
-	}()
-	for {
-		select {
-		case <-time.After(2 * time.Second):
-			// zero out the values if we have not received an update within
-			// the specified duration.
-			s.mu.Lock()
-			s.CPUPercentage = 0
-			s.Memory = 0
-			s.MemoryPercentage = 0
-			s.MemoryLimit = 0
-			s.NetworkRx = 0
-			s.NetworkTx = 0
-			s.BlockRead = 0
-			s.BlockWrite = 0
-			s.mu.Unlock()
-		case err := <-u:
-			if err != nil {
-				s.mu.Lock()
-				s.err = err
-				s.mu.Unlock()
-				return
-			}
-		}
-		if !streamStats {
-			return
-		}
-	}
-}
-
-func (s *containerStats) Display(w io.Writer) error {
-	s.mu.RLock()
-	defer s.mu.RUnlock()
-	if s.err != nil {
-		return s.err
-	}
-	fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n",
-		s.Name,
-		s.CPUPercentage,
-		units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit),
-		s.MemoryPercentage,
-		units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx),
-		units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite))
-	return nil
-}
-
 // CmdStats displays a live stream of resource usage statistics for one or more containers.
 //
 // This shows real-time information on CPU usage, memory usage, and network I/O.
@@ -157,27 +29,11 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 	names := cmd.Args()
 	showAll := len(names) == 0
-
-	// The containerChan is the central synchronization piece for this function,
-	// and all messages to either add or remove an element to the list of
-	// monitored containers go through this.
-	//
-	// - When watching all containers, a goroutine subscribes to the events
-	//   API endpoint and messages this channel accordingly.
-	// - When watching a particular subset of containers, we feed the
-	//   requested list of containers to this channel.
-	// - For both codepaths, a goroutine is responsible for watching this
-	//   channel and subscribing to the stats API for containers.
-	type containerEvent struct {
-		id    string
-		event string
-		err   error
-	}
-	containerChan := make(chan containerEvent)
+	closeChan := make(chan error)
 
 	// monitorContainerEvents watches for container creation and removal (only
 	// used when calling `docker stats` without arguments).
-	monitorContainerEvents := func(started chan<- struct{}, c chan<- containerEvent) {
+	monitorContainerEvents := func(started chan<- struct{}, c chan events.Message) {
 		f := filters.NewArgs()
 		f.Add("type", "container")
 		options := types.EventsOptions{
@@ -188,91 +44,82 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 		// unblock the main goroutine.
 		close(started)
 		if err != nil {
-			c <- containerEvent{err: err}
+			closeChan <- err
 			return
 		}
 		defer resBody.Close()
+
 		decodeEvents(resBody, func(event events.Message, err error) error {
 			if err != nil {
-				c <- containerEvent{"", "", err}
-			} else {
-				c <- containerEvent{event.ID[:12], event.Action, err}
+				closeChan <- err
+				return nil
 			}
+			c <- event
 			return nil
 		})
 	}
 
+	cStats := stats{}
 	// getContainerList simulates a creation event for all previously existing
 	// containers (only used when calling `docker stats` without arguments).
-	getContainerList := func(c chan<- containerEvent) {
+	getContainerList := func() {
 		options := types.ContainerListOptions{
 			All: *all,
 		}
 		cs, err := cli.client.ContainerList(options)
 		if err != nil {
-			containerChan <- containerEvent{"", "", err}
+			closeChan <- err
 		}
-		for _, c := range cs {
-			containerChan <- containerEvent{c.ID[:12], "create", nil}
+		for _, container := range cs {
+			s := &containerStats{Name: container.ID[:12]}
+			cStats.add(s)
+			go s.Collect(cli.client, !*noStream)
 		}
 	}
 
-	// Monitor the containerChan and start collection for each container.
-	cStats := stats{}
-	closeChan := make(chan error)
-	go func(stopChan chan<- error, c <-chan containerEvent) {
-		for {
-			event := <-c
-			if event.err != nil {
-				stopChan <- event.err
-				return
-			}
-			switch event.event {
-			case "create":
-				cStats.mu.Lock()
-				if !cStats.isKnownContainer(event.id) {
-					s := &containerStats{Name: event.id}
-					cStats.cs = append(cStats.cs, s)
-					go s.Collect(cli, !*noStream)
-				}
-				cStats.mu.Unlock()
-			case "stop":
-			case "die":
-				if !*all {
-					var remove int
-					// cStats cannot be O(1) with a map cause ranging over it would cause
-					// containers in stats to move up and down in the list...:(
-					cStats.mu.Lock()
-					for i, s := range cStats.cs {
-						if s.Name == event.id {
-							remove = i
-							break
-						}
-					}
-					cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
-					cStats.mu.Unlock()
-				}
-			}
-		}
-	}(closeChan, containerChan)
-
 	if showAll {
 		// If no names were specified, start a long running goroutine which
 		// monitors container events. We make sure we're subscribed before
 		// retrieving the list of running containers to avoid a race where we
 		// would "miss" a creation.
 		started := make(chan struct{})
+		eh := eventHandler{handlers: make(map[string]func(events.Message))}
+		eh.Handle("create", func(e events.Message) {
+			if *all {
+				s := &containerStats{Name: e.ID[:12]}
+				cStats.add(s)
+				go s.Collect(cli.client, !*noStream)
+			}
+		})
+
+		eh.Handle("start", func(e events.Message) {
+			s := &containerStats{Name: e.ID[:12]}
+			cStats.add(s)
+			go s.Collect(cli.client, !*noStream)
+		})
+
+		eh.Handle("die", func(e events.Message) {
+			if !*all {
+				cStats.remove(e.ID[:12])
+			}
+		})
+
+		eventChan := make(chan events.Message)
+		go eh.Watch(eventChan)
+		go monitorContainerEvents(started, eventChan)
+		defer close(eventChan)
 		<-started
 
 		// Start a short-lived goroutine to retrieve the initial list of
 		// containers.
-		go getContainerList(containerChan)
+		go getContainerList()
 	} else {
 		// Artificially send creation events for the containers we were asked to
 		// monitor (same code path as we use when monitoring all containers).
 		for _, name := range names {
-			containerChan <- containerEvent{name, "create", nil}
+			s := &containerStats{Name: name}
+			cStats.add(s)
+			go s.Collect(cli.client, !*noStream)
 		}
 
 		// We don't expect any asynchronous errors: closeChan can be closed.
@@ -304,6 +151,7 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 		}
 		io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n")
 	}
+
 	for range time.Tick(500 * time.Millisecond) {
 		printHeader()
 		toRemove := []int{}
@@ -343,40 +191,3 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 	}
 	return nil
 }
-
-func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
-	var (
-		cpuPercent = 0.0
-		// calculate the change for the cpu usage of the container in between readings
-		cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
-		// calculate the change for the entire system between readings
-		systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
-	)
-
-	if systemDelta > 0.0 && cpuDelta > 0.0 {
-		cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
-	}
-	return cpuPercent
-}
-
-func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) {
-	for _, bioEntry := range blkio.IoServiceBytesRecursive {
-		switch strings.ToLower(bioEntry.Op) {
-		case "read":
-			blkRead = blkRead + bioEntry.Value
-		case "write":
-			blkWrite = blkWrite + bioEntry.Value
-		}
-	}
-	return
-}
-
-func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
-	var rx, tx float64
-
-	for _, v := range network {
-		rx += float64(v.RxBytes)
-		tx += float64(v.TxBytes)
-	}
-	return rx, tx
-}
diff --git a/api/client/stats_helpers.go b/api/client/stats_helpers.go
new file mode 100644
index 0000000000..985a83fb0e
--- /dev/null
+++ b/api/client/stats_helpers.go
@@ -0,0 +1,193 @@
+package client
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/docker/engine-api/client"
+	"github.com/docker/engine-api/types"
+	"github.com/docker/go-units"
+	"golang.org/x/net/context"
+)
+
+type containerStats struct {
+	Name             string
+	CPUPercentage    float64
+	Memory           float64
+	MemoryLimit      float64
+	MemoryPercentage float64
+	NetworkRx        float64
+	NetworkTx        float64
+	BlockRead        float64
+	BlockWrite       float64
+	mu               sync.RWMutex
+	err              error
+}
+
+type stats struct {
+	mu sync.Mutex
+	cs []*containerStats
+}
+
+func (s *stats) add(cs *containerStats) {
+	s.mu.Lock()
+	if _, exists := s.isKnownContainer(cs.Name); !exists {
+		s.cs = append(s.cs, cs)
+	}
+	s.mu.Unlock()
+}
+
+func (s *stats) remove(id string) {
+	s.mu.Lock()
+	if i, exists := s.isKnownContainer(id); exists {
+		s.cs = append(s.cs[:i], s.cs[i+1:]...)
+	}
+	s.mu.Unlock()
+}
+
+func (s *stats) isKnownContainer(cid string) (int, bool) {
+	for i, c := range s.cs {
+		if c.Name == cid {
+			return i, true
+		}
+	}
+	return -1, false
+}
+
+func (s *containerStats) Collect(cli client.APIClient, streamStats bool) {
+	responseBody, err := cli.ContainerStats(context.Background(), s.Name, streamStats)
+	if err != nil {
+		s.mu.Lock()
+		s.err = err
+		s.mu.Unlock()
+		return
+	}
+	defer responseBody.Close()
+
+	var (
+		previousCPU    uint64
+		previousSystem uint64
+		dec            = json.NewDecoder(responseBody)
+		u              = make(chan error, 1)
+	)
+	go func() {
+		for {
+			var v *types.StatsJSON
+			if err := dec.Decode(&v); err != nil {
+				u <- err
+				return
+			}
+
+			var memPercent = 0.0
+			var cpuPercent = 0.0
+
+			// MemoryStats.Limit will never be 0 unless the container is not running and we haven't
+			// got any data from cgroup
+			if v.MemoryStats.Limit != 0 {
+				memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
+			}
+
+			previousCPU = v.PreCPUStats.CPUUsage.TotalUsage
+			previousSystem = v.PreCPUStats.SystemUsage
+			cpuPercent = calculateCPUPercent(previousCPU, previousSystem, v)
+			blkRead, blkWrite := calculateBlockIO(v.BlkioStats)
+			s.mu.Lock()
+			s.CPUPercentage = cpuPercent
+			s.Memory = float64(v.MemoryStats.Usage)
+			s.MemoryLimit = float64(v.MemoryStats.Limit)
+			s.MemoryPercentage = memPercent
+			s.NetworkRx, s.NetworkTx = calculateNetwork(v.Networks)
+			s.BlockRead = float64(blkRead)
+			s.BlockWrite = float64(blkWrite)
+			s.mu.Unlock()
+			u <- nil
+			if !streamStats {
+				return
+			}
+		}
+	}()
+	for {
+		select {
+		case <-time.After(2 * time.Second):
+			// zero out the values if we have not received an update within
+			// the specified duration.
+			s.mu.Lock()
+			s.CPUPercentage = 0
+			s.Memory = 0
+			s.MemoryPercentage = 0
+			s.MemoryLimit = 0
+			s.NetworkRx = 0
+			s.NetworkTx = 0
+			s.BlockRead = 0
+			s.BlockWrite = 0
+			s.mu.Unlock()
+		case err := <-u:
+			if err != nil {
+				s.mu.Lock()
+				s.err = err
+				s.mu.Unlock()
+				return
+			}
+		}
+		if !streamStats {
+			return
+		}
+	}
+}
+
+func (s *containerStats) Display(w io.Writer) error {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	if s.err != nil {
+		return s.err
+	}
+	fmt.Fprintf(w, "%s\t%.2f%%\t%s / %s\t%.2f%%\t%s / %s\t%s / %s\n",
+		s.Name,
+		s.CPUPercentage,
+		units.HumanSize(s.Memory), units.HumanSize(s.MemoryLimit),
+		s.MemoryPercentage,
+		units.HumanSize(s.NetworkRx), units.HumanSize(s.NetworkTx),
+		units.HumanSize(s.BlockRead), units.HumanSize(s.BlockWrite))
+	return nil
+}
+
+func calculateCPUPercent(previousCPU, previousSystem uint64, v *types.StatsJSON) float64 {
+	var (
+		cpuPercent = 0.0
+		// calculate the change for the cpu usage of the container in between readings
+		cpuDelta = float64(v.CPUStats.CPUUsage.TotalUsage) - float64(previousCPU)
+		// calculate the change for the entire system between readings
+		systemDelta = float64(v.CPUStats.SystemUsage) - float64(previousSystem)
+	)
+
+	if systemDelta > 0.0 && cpuDelta > 0.0 {
+		cpuPercent = (cpuDelta / systemDelta) * float64(len(v.CPUStats.CPUUsage.PercpuUsage)) * 100.0
+	}
+	return cpuPercent
+}
+
+func calculateBlockIO(blkio types.BlkioStats) (blkRead uint64, blkWrite uint64) {
+	for _, bioEntry := range blkio.IoServiceBytesRecursive {
+		switch strings.ToLower(bioEntry.Op) {
+		case "read":
+			blkRead = blkRead + bioEntry.Value
+		case "write":
+			blkWrite = blkWrite + bioEntry.Value
+		}
+	}
+	return
+}
+
+func calculateNetwork(network map[string]types.NetworkStats) (float64, float64) {
+	var rx, tx float64
+
+	for _, v := range network {
+		rx += float64(v.RxBytes)
+		tx += float64(v.TxBytes)
+	}
+	return rx, tx
+}
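
Note on the eventHandler pattern introduced above: the following is a minimal, self-contained sketch of how Handle and Watch cooperate. The message type, the container ID, and the Printf bodies are illustrative stand-ins for engine-api's events.Message and the real collection logic; none of this is part of the patch itself.

package main

import (
	"fmt"
	"sync"
	"time"
)

// message stands in for engine-api's events.Message.
type message struct {
	ID     string
	Action string
}

// eventHandler mirrors the type added to api/client/events.go.
type eventHandler struct {
	handlers map[string]func(message)
	mu       sync.Mutex
}

// Handle registers a callback for a given event action.
func (w *eventHandler) Handle(action string, h func(message)) {
	w.mu.Lock()
	w.handlers[action] = h
	w.mu.Unlock()
}

// Watch dispatches each received event to its registered handler, if any.
// Closing the channel stops the loop.
func (w *eventHandler) Watch(c <-chan message) {
	for e := range c {
		w.mu.Lock()
		h, exists := w.handlers[e.Action]
		w.mu.Unlock()
		if !exists {
			continue
		}
		go h(e)
	}
}

func main() {
	eh := eventHandler{handlers: make(map[string]func(message))}

	// The core of the fix: "start" now triggers collection, so a container
	// that was created before monitoring began is still picked up the
	// moment it starts.
	eh.Handle("start", func(e message) {
		fmt.Printf("would start collecting stats for %s\n", e.ID)
	})
	eh.Handle("die", func(e message) {
		fmt.Printf("would remove %s from the stats list\n", e.ID)
	})

	c := make(chan message)
	go eh.Watch(c)

	c <- message{ID: "4e6f74206121", Action: "start"}
	c <- message{ID: "4e6f74206121", Action: "die"}
	c <- message{ID: "4e6f74206121", Action: "destroy"} // no handler registered; ignored
	close(c)

	time.Sleep(100 * time.Millisecond) // let the handler goroutines print
}

Dispatching each matched handler in its own goroutine keeps one slow handler (such as a stats subscription that blocks) from stalling the event loop; the trade-off is that handlers can run concurrently, which is why the patch guards the shared container list with stats.mu in add and remove.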