Merge pull request #20753 from icecrime/debugging_win2lin

Fix client-side race in `docker stats`
This commit is contained in:
Antonio Murdaca 2016-02-29 08:20:17 +01:00
commit 5ef74c6595
1 changed file with 138 additions and 102 deletions

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"sync"
"text/tabwriter"
@@ -38,6 +37,15 @@ type stats struct {
cs []*containerStats
}
// isKnownContainer reports whether a container with the given ID is
// already present in the monitored set. Callers are expected to hold
// s.mu while invoking it, since it reads s.cs without locking.
func (s *stats) isKnownContainer(cid string) bool {
	known := false
	for i := range s.cs {
		if s.cs[i].Name == cid {
			known = true
			break
		}
	}
	return known
}
func (s *containerStats) Collect(cli *DockerCli, streamStats bool) {
responseBody, err := cli.client.ContainerStats(context.Background(), s.Name, streamStats)
if err != nil {
@@ -150,27 +158,145 @@ func (cli *DockerCli) CmdStats(args ...string) error {
names := cmd.Args()
showAll := len(names) == 0
if showAll {
// The containerChan is the central synchronization piece for this function,
// and all messages to either add or remove an element to the list of
// monitored containers go through this.
//
// - When watching all containers, a goroutine subscribes to the events
// API endpoint and messages this channel accordingly.
// - When watching a particular subset of containers, we feed the
// requested list of containers to this channel.
// - For both codepaths, a goroutine is responsible for watching this
// channel and subscribing to the stats API for containers.
type containerEvent struct {
id string
event string
err error
}
containerChan := make(chan containerEvent)
// monitorContainerEvents watches for container creation and removal (only
// used when calling `docker stats` without arguments).
monitorContainerEvents := func(started chan<- struct{}, c chan<- containerEvent) {
f := filters.NewArgs()
f.Add("type", "container")
options := types.EventsOptions{
Filters: f,
}
resBody, err := cli.client.Events(context.Background(), options)
// Whether we successfully subscribed to events or not, we can now
// unblock the main goroutine.
close(started)
if err != nil {
c <- containerEvent{err: err}
return
}
defer resBody.Close()
decodeEvents(resBody, func(event events.Message, err error) error {
if err != nil {
c <- containerEvent{"", "", err}
} else {
c <- containerEvent{event.ID[:12], event.Action, err}
}
return nil
})
}
// getContainerList simulates creation event for all previously existing
// containers (only used when calling `docker stats` without arguments).
getContainerList := func(c chan<- containerEvent) {
options := types.ContainerListOptions{
All: *all,
}
cs, err := cli.client.ContainerList(options)
if err != nil {
return err
containerChan <- containerEvent{"", "", err}
}
for _, c := range cs {
names = append(names, c.ID[:12])
containerChan <- containerEvent{c.ID[:12], "create", nil}
}
}
if len(names) == 0 && !showAll {
return fmt.Errorf("No containers found")
}
sort.Strings(names)
var (
cStats = stats{}
w = tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
)
// Monitor the containerChan and start collection for each container.
cStats := stats{}
closeChan := make(chan error)
go func(stopChan chan<- error, c <-chan containerEvent) {
for {
event := <-c
if event.err != nil {
stopChan <- event.err
return
}
switch event.event {
case "create":
cStats.mu.Lock()
if !cStats.isKnownContainer(event.id) {
s := &containerStats{Name: event.id}
cStats.cs = append(cStats.cs, s)
go s.Collect(cli, !*noStream)
}
cStats.mu.Unlock()
case "stop":
case "die":
if !*all {
var remove int
// cStats cannot be O(1) with a map cause ranging over it would cause
// containers in stats to move up and down in the list...:(
cStats.mu.Lock()
for i, s := range cStats.cs {
if s.Name == event.id {
remove = i
break
}
}
cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
cStats.mu.Unlock()
}
}
}
}(closeChan, containerChan)
if showAll {
// If no names were specified, start a long running goroutine which
// monitors container events. We make sure we're subscribed before
// retrieving the list of running containers to avoid a race where we
// would "miss" a creation.
started := make(chan struct{})
go monitorContainerEvents(started, containerChan)
<-started
// Start a short-lived goroutine to retrieve the initial list of
// containers.
go getContainerList(containerChan)
} else {
// Artificially send creation events for the containers we were asked to
// monitor (same code path than we use when monitoring all containers).
for _, name := range names {
containerChan <- containerEvent{name, "create", nil}
}
// We don't expect any asynchronous errors: closeChan can be closed.
close(closeChan)
// Do a quick pause to detect any error with the provided list of
// container names.
time.Sleep(1500 * time.Millisecond)
var errs []string
cStats.mu.Lock()
for _, c := range cStats.cs {
c.mu.Lock()
if c.err != nil {
errs = append(errs, fmt.Sprintf("%s: %v", c.Name, c.err))
}
c.mu.Unlock()
}
cStats.mu.Unlock()
if len(errs) > 0 {
return fmt.Errorf("%s", strings.Join(errs, ", "))
}
}
w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
printHeader := func() {
if !*noStream {
fmt.Fprint(cli.out, "\033[2J")
@@ -178,96 +304,6 @@ func (cli *DockerCli) CmdStats(args ...string) error {
}
io.WriteString(w, "CONTAINER\tCPU %\tMEM USAGE / LIMIT\tMEM %\tNET I/O\tBLOCK I/O\n")
}
for _, n := range names {
s := &containerStats{Name: n}
// no need to lock here since only the main goroutine is running here
cStats.cs = append(cStats.cs, s)
go s.Collect(cli, !*noStream)
}
closeChan := make(chan error)
if showAll {
type watch struct {
cid string
event string
err error
}
getNewContainers := func(c chan<- watch) {
f := filters.NewArgs()
f.Add("type", "container")
options := types.EventsOptions{
Filters: f,
}
resBody, err := cli.client.Events(context.Background(), options)
if err != nil {
c <- watch{err: err}
return
}
defer resBody.Close()
decodeEvents(resBody, func(event events.Message, err error) error {
if err != nil {
c <- watch{err: err}
return nil
}
c <- watch{event.ID[:12], event.Action, nil}
return nil
})
}
go func(stopChan chan<- error) {
cChan := make(chan watch)
go getNewContainers(cChan)
for {
c := <-cChan
if c.err != nil {
stopChan <- c.err
return
}
switch c.event {
case "create":
s := &containerStats{Name: c.cid}
cStats.mu.Lock()
cStats.cs = append(cStats.cs, s)
cStats.mu.Unlock()
go s.Collect(cli, !*noStream)
case "stop":
case "die":
if !*all {
var remove int
// cStats cannot be O(1) with a map cause ranging over it would cause
// containers in stats to move up and down in the list...:(
cStats.mu.Lock()
for i, s := range cStats.cs {
if s.Name == c.cid {
remove = i
break
}
}
cStats.cs = append(cStats.cs[:remove], cStats.cs[remove+1:]...)
cStats.mu.Unlock()
}
}
}
}(closeChan)
} else {
close(closeChan)
}
// do a quick pause so that any failed connections for containers that do not exist are able to be
// evicted before we display the initial or default values.
time.Sleep(1500 * time.Millisecond)
var errs []string
cStats.mu.Lock()
for _, c := range cStats.cs {
c.mu.Lock()
if c.err != nil {
errs = append(errs, fmt.Sprintf("%s: %v", c.Name, c.err))
}
c.mu.Unlock()
}
cStats.mu.Unlock()
if len(errs) > 0 {
return fmt.Errorf("%s", strings.Join(errs, ", "))
}
for range time.Tick(500 * time.Millisecond) {
printHeader()
toRemove := []int{}