diff --git a/api/client/stats.go b/api/client/stats.go
index 0bfbe43f9d..ccb924de7e 100644
--- a/api/client/stats.go
+++ b/api/client/stats.go
@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"sort"
 	"strings"
 	"sync"
 	"text/tabwriter"
@@ -38,6 +37,15 @@ type stats struct {
 	cs []*containerStats
 }
 
+func (s *stats) isKnownContainer(cid string) bool {
+	for _, c := range s.cs {
+		if c.Name == cid {
+			return true
+		}
+	}
+	return false
+}
+
 func (s *containerStats) Collect(cli *DockerCli, streamStats bool) {
 	responseBody, err := cli.client.ContainerStats(context.Background(), s.Name, streamStats)
 	if err != nil {
@@ -150,27 +158,145 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 	names := cmd.Args()
 	showAll := len(names) == 0
 
-	if showAll {
+	// The containerChan is the central synchronization piece for this function,
+	// and all messages to either add or remove an element to the list of
+	// monitored containers go through this.
+	//
+	// - When watching all containers, a goroutine subscribes to the events
+	//   API endpoint and messages this channel accordingly.
+	// - When watching a particular subset of containers, we feed the
+	//   requested list of containers to this channel.
+	// - For both codepaths, a goroutine is responsible for watching this
+	//   channel and subscribing to the stats API for containers.
+	type containerEvent struct {
+		id    string
+		event string
+		err   error
+	}
+	containerChan := make(chan containerEvent)
+
+	// monitorContainerEvents watches for container creation and removal (only
+	// used when calling `docker stats` without arguments).
+	monitorContainerEvents := func(started chan<- struct{}, c chan<- containerEvent) {
+		f := filters.NewArgs()
+		f.Add("type", "container")
+		options := types.EventsOptions{
+			Filters: f,
+		}
+		resBody, err := cli.client.Events(context.Background(), options)
+		// Whether we successfully subscribed to events or not, we can now
+		// unblock the main goroutine.
+		close(started)
+		if err != nil {
+			c <- containerEvent{err: err}
+			return
+		}
+		defer resBody.Close()
+		decodeEvents(resBody, func(event events.Message, err error) error {
+			if err != nil {
+				c <- containerEvent{"", "", err}
+			} else {
+				c <- containerEvent{event.ID[:12], event.Action, err}
+			}
+			return nil
+		})
+	}
+
+	// getContainerList simulates creation events for all previously existing
+	// containers (only used when calling `docker stats` without arguments).
+	getContainerList := func(c chan<- containerEvent) {
 		options := types.ContainerListOptions{
 			All: *all,
 		}
 		cs, err := cli.client.ContainerList(options)
 		if err != nil {
-			return err
+			containerChan <- containerEvent{"", "", err}
 		}
 		for _, c := range cs {
-			names = append(names, c.ID[:12])
+			containerChan <- containerEvent{c.ID[:12], "create", nil}
 		}
 	}
-	if len(names) == 0 && !showAll {
-		return fmt.Errorf("No containers found")
-	}
-	sort.Strings(names)
-
-	var (
-		cStats = stats{}
-		w      = tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
-	)
+	// Monitor the containerChan and start collection for each container.
+	cStats := stats{}
+	closeChan := make(chan error)
+	go func(stopChan chan<- error, c <-chan containerEvent) {
+		for {
+			event := <-c
+			if event.err != nil {
+				stopChan <- event.err
+				return
+			}
+			switch event.event {
+			case "create":
+				cStats.mu.Lock()
+				if !cStats.isKnownContainer(event.id) {
+					s := &containerStats{Name: event.id}
+					cStats.cs = append(cStats.cs, s)
+					go s.Collect(cli, !*noStream)
+				}
+				cStats.mu.Unlock()
+			case "stop":
+			case "die":
+				if !*all {
+					var remove int
+					// cStats cannot be O(1) with a map because ranging over it would cause
+					// containers in stats to move up and down in the list...:(
+					cStats.mu.Lock()
+					for i, s := range cStats.cs {
+						if s.Name == event.id {
+							remove = i
+							break
+						}
+					}
+					cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
+					cStats.mu.Unlock()
+				}
+			}
+		}
+	}(closeChan, containerChan)
+
+	if showAll {
+		// If no names were specified, start a long running goroutine which
+		// monitors container events. We make sure we're subscribed before
+		// retrieving the list of running containers to avoid a race where we
+		// would "miss" a creation.
+		started := make(chan struct{})
+		go monitorContainerEvents(started, containerChan)
+		<-started
+
+		// Start a short-lived goroutine to retrieve the initial list of
+		// containers.
+		go getContainerList(containerChan)
+	} else {
+		// Artificially send creation events for the containers we were asked
+		// to monitor (same code path as when monitoring all containers).
+		for _, name := range names {
+			containerChan <- containerEvent{name, "create", nil}
+		}
+
+		// We don't expect any asynchronous errors: closeChan can be closed.
+		close(closeChan)
+
+		// Do a quick pause to detect any error with the provided list of
+		// container names.
+		time.Sleep(1500 * time.Millisecond)
+		var errs []string
+		cStats.mu.Lock()
+		for _, c := range cStats.cs {
+			c.mu.Lock()
+			if c.err != nil {
+				errs = append(errs, fmt.Sprintf("%s: %v", c.Name, c.err))
+			}
+			c.mu.Unlock()
+		}
+		cStats.mu.Unlock()
+		if len(errs) > 0 {
+			return fmt.Errorf("%s", strings.Join(errs, ", "))
+		}
+	}
+
+	w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
 	printHeader := func() {
 		if !*noStream {
 			fmt.Fprint(cli.out, "\033[2J")
@@ -178,96 +304,6 @@ func (cli *DockerCli) CmdStats(args ...string) error {
 		}
 		io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n")
 	}
-	for _, n := range names {
-		s := &containerStats{Name: n}
-		// no need to lock here since only the main goroutine is running here
-		cStats.cs = append(cStats.cs, s)
-		go s.Collect(cli, !*noStream)
-	}
-	closeChan := make(chan error)
-	if showAll {
-		type watch struct {
-			cid   string
-			event string
-			err   error
-		}
-		getNewContainers := func(c chan<- watch) {
-			f := filters.NewArgs()
-			f.Add("type", "container")
-			options := types.EventsOptions{
-				Filters: f,
-			}
-			resBody, err := cli.client.Events(context.Background(), options)
-			if err != nil {
-				c <- watch{err: err}
-				return
-			}
-			defer resBody.Close()
-
-			decodeEvents(resBody, func(event events.Message, err error) error {
-				if err != nil {
-					c <- watch{err: err}
-					return nil
-				}
-
-				c <- watch{event.ID[:12], event.Action, nil}
-				return nil
-			})
-		}
-		go func(stopChan chan<- error) {
-			cChan := make(chan watch)
-			go getNewContainers(cChan)
-			for {
-				c := <-cChan
-				if c.err != nil {
-					stopChan <- c.err
-					return
-				}
-				switch c.event {
-				case "create":
-					s := &containerStats{Name: c.cid}
-					cStats.mu.Lock()
-					cStats.cs = append(cStats.cs, s)
-					cStats.mu.Unlock()
-					go s.Collect(cli, !*noStream)
case "stop": - case "die": - if !*all { - var remove int - // cStats cannot be O(1) with a map cause ranging over it would cause - // containers in stats to move up and down in the list...:( - cStats.mu.Lock() - for i, s := range cStats.cs { - if s.Name == c.cid { - remove = i - break - } - } - cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...) - cStats.mu.Unlock() - } - } - } - }(closeChan) - } else { - close(closeChan) - } - // do a quick pause so that any failed connections for containers that do not exist are able to be - // evicted before we display the initial or default values. - time.Sleep(1500 * time.Millisecond) - var errs []string - cStats.mu.Lock() - for _, c := range cStats.cs { - c.mu.Lock() - if c.err != nil { - errs = append(errs, fmt.Sprintf("%s: %v", c.Name, c.err)) - } - c.mu.Unlock() - } - cStats.mu.Unlock() - if len(errs) > 0 { - return fmt.Errorf("%s", strings.Join(errs, ", ")) - } for range time.Tick(500 * time.Millisecond) { printHeader() toRemove := []int{}