From 4f174aa79276c12a1b2b98df2f02d6bee36b7a93 Mon Sep 17 00:00:00 2001
From: Michael Crosby
Date: Wed, 7 Jan 2015 18:02:08 -0800
Subject: [PATCH] Evict stopped containers

Signed-off-by: Michael Crosby
---
 api/client/commands.go                       | 97 ++++++++++----------
 api/client/sort.go                           | 29 ++++++
 {stats => api/stats}/stats.go                |  4 +-
 daemon/container.go                          |  3 -
 daemon/daemon.go                             | 11 ++-
 daemon/delete.go                             |  3 +
 daemon/execdriver/driver.go                  |  2 +-
 daemon/execdriver/execdrivers/execdrivers.go |  2 +-
 daemon/execdriver/lxc/driver.go              |  2 +-
 daemon/execdriver/native/driver.go           |  7 +-
 daemon/start.go                              | 25 -----
 daemon/stats.go                              | 29 ++++++
 daemon/stats_collector.go                    | 72 ++++++++++++---
 docker/flags.go                              |  1 +
 14 files changed, 192 insertions(+), 95 deletions(-)
 create mode 100644 api/client/sort.go
 rename {stats => api/stats}/stats.go (96%)
 create mode 100644 daemon/stats.go

diff --git a/api/client/commands.go b/api/client/commands.go
index 6c6595c243..34ca32c29d 100644
--- a/api/client/commands.go
+++ b/api/client/commands.go
@@ -16,15 +16,16 @@ import (
     "path"
     "path/filepath"
     "runtime"
-    "sort"
     "strconv"
     "strings"
+    "sync"
     "text/tabwriter"
     "text/template"
     "time"
 
     log "github.com/Sirupsen/logrus"
     "github.com/docker/docker/api"
+    "github.com/docker/docker/api/stats"
     "github.com/docker/docker/dockerversion"
     "github.com/docker/docker/engine"
     "github.com/docker/docker/graph"
@@ -43,7 +44,6 @@ import (
     "github.com/docker/docker/pkg/urlutil"
     "github.com/docker/docker/registry"
     "github.com/docker/docker/runconfig"
-    "github.com/docker/docker/stats"
     "github.com/docker/docker/utils"
     "github.com/docker/libtrust"
 )
@@ -2625,25 +2625,10 @@ type containerStats struct {
     Name             string
     CpuPercentage    float64
     Memory           float64
+    MemoryLimit      float64
     MemoryPercentage float64
-    NetworkRx        int
-    NetworkTx        int
-}
-
-type statSorter struct {
-    stats []containerStats
-}
-
-func (s *statSorter) Len() int {
-    return len(s.stats)
-}
-
-func (s *statSorter) Swap(i, j int) {
-    s.stats[i], s.stats[j] = s.stats[j], s.stats[i]
-}
-
-func (s *statSorter) Less(i, j int) bool {
-    return s.stats[i].Name < s.stats[j].Name
+    NetworkRx        float64
+    NetworkTx        float64
 }
 
 func (cli *DockerCli) CmdStats(args ...string) error {
@@ -2651,40 +2636,49 @@ func (cli *DockerCli) CmdStats(args ...string) error {
     cmd.Require(flag.Min, 1)
     utils.ParseFlags(cmd, args, true)
 
+    m := &sync.Mutex{}
     cStats := map[string]containerStats{}
     for _, name := range cmd.Args() {
-        go cli.streamStats(name, cStats)
+        go cli.streamStats(name, cStats, m)
     }
     w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
-    for _ = range time.Tick(1000 * time.Millisecond) {
+    for _ = range time.Tick(500 * time.Millisecond) {
         fmt.Fprint(cli.out, "\033[2J")
         fmt.Fprint(cli.out, "\033[H")
-        fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM\tMEM %\tNET I/O")
-        sStats := []containerStats{}
-        for _, s := range cStats {
-            sStats = append(sStats, s)
-        }
-        sorter := &statSorter{sStats}
-        sort.Sort(sorter)
-        for _, s := range sStats {
-            fmt.Fprintf(w, "%s\t%f%%\t%s\t%f%%\t%d/%d\n",
+        fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM USAGE/LIMIT\tMEM %\tNET I/O")
+        m.Lock()
+        ss := sortStatsByName(cStats)
+        m.Unlock()
+        for _, s := range ss {
+            fmt.Fprintf(w, "%s\t%.2f%%\t%s/%s\t%.2f%%\t%s/%s\n",
                 s.Name,
                 s.CpuPercentage,
-                units.HumanSize(s.Memory),
+                units.BytesSize(s.Memory), units.BytesSize(s.MemoryLimit),
                 s.MemoryPercentage,
-                s.NetworkRx, s.NetworkTx)
+                units.BytesSize(s.NetworkRx), units.BytesSize(s.NetworkTx))
         }
         w.Flush()
     }
     return nil
 }
 
-func (cli *DockerCli) streamStats(name string, data map[string]containerStats) error {
+func (cli *DockerCli) streamStats(name string, data map[string]containerStats, m *sync.Mutex) error {
+    m.Lock()
+    data[name] = containerStats{
+        Name: name,
+    }
+    m.Unlock()
+
     stream, _, err := cli.call("GET", "/containers/"+name+"/stats", nil, false)
     if err != nil {
         return err
     }
-
+    defer func() {
+        stream.Close()
+        m.Lock()
+        delete(data, name)
+        m.Unlock()
+    }()
     var (
         previousCpu    uint64
         previousSystem uint64
@@ -2696,30 +2690,37 @@ func (cli *DockerCli) streamStats(name string, data map[string]containerStats) e
         if err := dec.Decode(&v); err != nil {
             return err
         }
-        memPercent := float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
-        cpuPercent := 0.0
-
+        var (
+            memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
+            cpuPercent = 0.0
+        )
         if !start {
-            cpuDelta := float64(v.CpuStats.CpuUsage.TotalUsage) - float64(previousCpu)
-            systemDelta := float64(int(v.CpuStats.SystemUsage)/v.ClockTicks) - float64(int(previousSystem)/v.ClockTicks)
-
-            if systemDelta > 0.0 {
-                cpuPercent = (cpuDelta / systemDelta) * float64(v.ClockTicks*len(v.CpuStats.CpuUsage.PercpuUsage))
-            }
+            cpuPercent = calculateCpuPercent(previousCpu, previousSystem, v)
         }
         start = false
+        m.Lock()
         d := data[name]
-        d.Name = name
         d.CpuPercentage = cpuPercent
         d.Memory = float64(v.MemoryStats.Usage)
+        d.MemoryLimit = float64(v.MemoryStats.Limit)
         d.MemoryPercentage = memPercent
-        d.NetworkRx = int(v.Network.RxBytes)
-        d.NetworkTx = int(v.Network.TxBytes)
+        d.NetworkRx = float64(v.Network.RxBytes)
+        d.NetworkTx = float64(v.Network.TxBytes)
         data[name] = d
+        m.Unlock()
         previousCpu = v.CpuStats.CpuUsage.TotalUsage
         previousSystem = v.CpuStats.SystemUsage
     }
     return nil
-
+}
+
+func calculateCpuPercent(previousCpu, previousSystem uint64, v *stats.Stats) float64 {
+    cpuPercent := 0.0
+    cpuDelta := float64(v.CpuStats.CpuUsage.TotalUsage) - float64(previousCpu)
+    systemDelta := float64(int(v.CpuStats.SystemUsage)/v.ClockTicks) - float64(int(previousSystem)/v.ClockTicks)
+    if systemDelta > 0.0 {
+        cpuPercent = (cpuDelta / systemDelta) * float64(v.ClockTicks*len(v.CpuStats.CpuUsage.PercpuUsage))
+    }
+    return cpuPercent
+}
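A note on the CmdStats rendering above: the table refresh is just text/tabwriter output preceded by the ANSI "clear screen" and "cursor home" escapes, redrawn on every tick. A minimal standalone sketch of that loop, not part of the patch, with hard-coded values standing in for real container stats:

package main

import (
    "fmt"
    "os"
    "text/tabwriter"
    "time"
)

func main() {
    // same column settings CmdStats uses: min width 20, padding 3
    w := tabwriter.NewWriter(os.Stdout, 20, 1, 3, ' ', 0)
    for _ = range time.Tick(500 * time.Millisecond) {
        fmt.Print("\033[2J") // clear the screen
        fmt.Print("\033[H")  // move the cursor back to the top-left corner
        fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM USAGE/LIMIT\tMEM %\tNET I/O")
        // hard-coded stand-ins for one container's stats
        fmt.Fprintf(w, "%s\t%.2f%%\t%s/%s\t%.2f%%\t%s/%s\n",
            "redis", 1.23, "8 MiB", "64 MiB", 12.50, "1.2 KiB", "3.4 KiB")
        w.Flush()
    }
}

Because tabwriter buffers until Flush, the whole table is written in one shot after the screen is cleared, which keeps the redraw from flickering column by column.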
diff --git a/api/client/sort.go b/api/client/sort.go
new file mode 100644
index 0000000000..1b8232c3f8
--- /dev/null
+++ b/api/client/sort.go
@@ -0,0 +1,29 @@
+package client
+
+import "sort"
+
+func sortStatsByName(cStats map[string]containerStats) []containerStats {
+    sStats := []containerStats{}
+    for _, s := range cStats {
+        sStats = append(sStats, s)
+    }
+    sorter := &statSorter{sStats}
+    sort.Sort(sorter)
+    return sStats
+}
+
+type statSorter struct {
+    stats []containerStats
+}
+
+func (s *statSorter) Len() int {
+    return len(s.stats)
+}
+
+func (s *statSorter) Swap(i, j int) {
+    s.stats[i], s.stats[j] = s.stats[j], s.stats[i]
+}
+
+func (s *statSorter) Less(i, j int) bool {
+    return s.stats[i].Name < s.stats[j].Name
+}
diff --git a/stats/stats.go b/api/stats/stats.go
similarity index 96%
rename from stats/stats.go
rename to api/stats/stats.go
index e151014f3b..b2820f2439 100644
--- a/stats/stats.go
+++ b/api/stats/stats.go
@@ -16,7 +16,7 @@ type ThrottlingData struct {
     ThrottledTime uint64 `json:"throttled_time,omitempty"`
 }
 
-// All CPU stats are aggregate since container inception.
+// All CPU stats are aggregated since container inception.
 type CpuUsage struct {
     // Total CPU time consumed.
     // Units: nanoseconds.
@@ -91,6 +91,8 @@ type Stats struct {
     BlkioStats  BlkioStats  `json:"blkio_stats,omitempty"`
 }
 
+// ToStats converts the libcontainer.ContainerStats to the api specific
+// structs. This is done to preserve API compatibility and versioning.
 func ToStats(ls *libcontainer.ContainerStats) *Stats {
     s := &Stats{}
     if ls.NetworkStats != nil {
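The new GET /containers/<name>/stats endpoint streams one JSON document per sample, which is why the client's streamStats wraps the response body in a json.Decoder and calls Decode in a loop. A self-contained sketch of that consumption pattern; the sample type and its field names are illustrative stand-ins, not the real api/stats.Stats wire format:

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "strings"
)

// sample is a simplified stand-in for the decoded stats document.
type sample struct {
    Usage uint64 `json:"usage"`
    Limit uint64 `json:"limit"`
}

// consume decodes documents from the stream until it is closed,
// the same loop shape streamStats uses against the daemon.
func consume(r io.Reader) error {
    dec := json.NewDecoder(r)
    for {
        var s sample
        if err := dec.Decode(&s); err != nil {
            if err == io.EOF {
                return nil // stream closed by the sender
            }
            return err
        }
        fmt.Printf("%.2f%%\n", float64(s.Usage)/float64(s.Limit)*100.0)
    }
}

func main() {
    stream := strings.NewReader(`{"usage":1048576,"limit":4194304}
{"usage":2097152,"limit":4194304}`)
    if err := consume(stream); err != nil {
        fmt.Println("decode error:", err)
    }
}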
diff --git a/daemon/container.go b/daemon/container.go
index 4a02328783..046ec71e8a 100644
--- a/daemon/container.go
+++ b/daemon/container.go
@@ -1416,8 +1416,5 @@ func (container *Container) getNetworkedContainer() (*Container, error) {
 }
 
 func (container *Container) Stats() (*execdriver.ResourceStats, error) {
-    if !container.IsRunning() {
-        return nil, fmt.Errorf("cannot collect stats on a non running container")
-    }
     return container.daemon.Stats(container)
 }
diff --git a/daemon/daemon.go b/daemon/daemon.go
index 01d8245dee..82d2bc757b 100644
--- a/daemon/daemon.go
+++ b/daemon/daemon.go
@@ -1099,7 +1099,7 @@ func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) {
     return daemon.execDriver.Stats(c.ID)
 }
 
-func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver.ResourceStats, error) {
+func (daemon *Daemon) SubscribeToContainerStats(name string) (chan *execdriver.ResourceStats, error) {
     c := daemon.Get(name)
     if c == nil {
         return nil, fmt.Errorf("no such container")
@@ -1108,6 +1108,15 @@ func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver
     return ch, nil
 }
 
+func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan *execdriver.ResourceStats) error {
+    c := daemon.Get(name)
+    if c == nil {
+        return fmt.Errorf("no such container")
+    }
+    daemon.statsCollector.unsubscribe(c, ch)
+    return nil
+}
+
 // Nuke kills all containers then removes all content
 // from the content root, including images, volumes and
 // container filesystems.
diff --git a/daemon/delete.go b/daemon/delete.go
index 990e4b448a..59c7651785 100644
--- a/daemon/delete.go
+++ b/daemon/delete.go
@@ -49,6 +49,9 @@ func (daemon *Daemon) ContainerRm(job *engine.Job) engine.Status {
     }
 
     if container != nil {
+        // stop collection of stats for the container regardless of whether
+        // stats are currently being collected.
+        daemon.statsCollector.stopCollection(container)
         if container.IsRunning() {
             if forceRemove {
                 if err := container.Kill(); err != nil {
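SubscribeToContainerStats and UnsubscribeToContainerStats are thin wrappers around the collector's per-subscriber channels: every caller gets its own buffered channel, and closing that channel is what ends the caller's range loop. A condensed, self-contained sketch of that pattern, with an int payload and a plain publisher type standing in for *execdriver.ResourceStats and the statsCollector:

package main

import (
    "fmt"
    "sync"
)

type publisher struct {
    mu   sync.Mutex
    subs []chan int
}

// subscribe hands each caller its own buffered channel.
func (p *publisher) subscribe() chan int {
    p.mu.Lock()
    defer p.mu.Unlock()
    ch := make(chan int, 1024)
    p.subs = append(p.subs, ch)
    return ch
}

// unsubscribe removes one channel and closes it so the receiver's
// range loop terminates once buffered values are drained.
func (p *publisher) unsubscribe(ch chan int) {
    p.mu.Lock()
    defer p.mu.Unlock()
    for i, sub := range p.subs {
        if sub == ch {
            p.subs = append(p.subs[:i], p.subs[i+1:]...)
            close(ch)
            return
        }
    }
}

// publish fans a value out to every current subscriber.
func (p *publisher) publish(v int) {
    p.mu.Lock()
    defer p.mu.Unlock()
    for _, sub := range p.subs {
        sub <- v
    }
}

func main() {
    p := &publisher{}
    ch := p.subscribe()
    p.publish(1)
    p.publish(2)
    p.unsubscribe(ch)
    for v := range ch { // drains the buffer, then exits on close
        fmt.Println(v)
    }
}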
diff --git a/daemon/execdriver/driver.go b/daemon/execdriver/driver.go
index f33f1671db..f6e0ac7284 100644
--- a/daemon/execdriver/driver.go
+++ b/daemon/execdriver/driver.go
@@ -16,7 +16,7 @@ import (
 type Context map[string]string
 
 var (
-    ErrNotRunning              = errors.New("Process could not be started")
+    ErrNotRunning              = errors.New("Container is not running")
     ErrWaitTimeoutReached      = errors.New("Wait timeout reached")
     ErrDriverAlreadyRegistered = errors.New("A driver already registered this docker init function")
     ErrDriverNotFound          = errors.New("The requested docker init has not been found")
diff --git a/daemon/execdriver/execdrivers/execdrivers.go b/daemon/execdriver/execdrivers/execdrivers.go
index b7dd98cf3d..a665985d10 100644
--- a/daemon/execdriver/execdrivers/execdrivers.go
+++ b/daemon/execdriver/execdrivers/execdrivers.go
@@ -24,7 +24,7 @@ func NewDriver(name, root, initPath string, sysInfo *sysinfo.SysInfo) (execdrive
         // to be backwards compatible
         return lxc.NewDriver(root, initPath, sysInfo.AppArmor)
     case "native":
-        return native.NewDriver(path.Join(root, "execdriver", "native"), initPath, meminfo.MemTotal/1000)
+        return native.NewDriver(path.Join(root, "execdriver", "native"), initPath, meminfo.MemTotal)
     }
     return nil, fmt.Errorf("unknown exec driver %s", name)
 }
diff --git a/daemon/execdriver/lxc/driver.go b/daemon/execdriver/lxc/driver.go
index 7dca19d762..44942b1fe0 100644
--- a/daemon/execdriver/lxc/driver.go
+++ b/daemon/execdriver/lxc/driver.go
@@ -526,6 +526,6 @@ func (d *driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo
 }
 
 func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) {
-    return nil, fmt.Errorf("container stats are not support with LXC")
+    return nil, fmt.Errorf("container stats are not supported with LXC")
 }
 
diff --git a/daemon/execdriver/native/driver.go b/daemon/execdriver/native/driver.go
index 83e07f392e..450d7e5f3d 100644
--- a/daemon/execdriver/native/driver.go
+++ b/daemon/execdriver/native/driver.go
@@ -284,6 +284,9 @@ func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) {
     c := d.activeContainers[id]
     state, err := libcontainer.GetState(filepath.Join(d.root, id))
     if err != nil {
+        if os.IsNotExist(err) {
+            return nil, execdriver.ErrNotRunning
+        }
         return nil, err
     }
     now := time.Now()
@@ -292,13 +295,15 @@ func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) {
         return nil, err
     }
     memoryLimit := c.container.Cgroups.Memory
+    // if the container does not have any memory limit specified, set the
+    // limit to the machine's memory
     if memoryLimit == 0 {
         memoryLimit = d.machineMemory
     }
     return &execdriver.ResourceStats{
+        Read:           now,
         ContainerStats: stats,
         ClockTicks:     system.GetClockTicks(),
-        Read:           now,
         MemoryLimit:    memoryLimit,
     }, nil
 }
diff --git a/daemon/start.go b/daemon/start.go
index 150a87f575..363461080f 100644
--- a/daemon/start.go
+++ b/daemon/start.go
@@ -1,14 +1,12 @@
 package daemon
 
 import (
-    "encoding/json"
     "fmt"
     "os"
     "strings"
 
     "github.com/docker/docker/engine"
     "github.com/docker/docker/runconfig"
-    "github.com/docker/docker/stats"
 )
 
 func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status {
@@ -79,26 +77,3 @@ func (daemon *Daemon) setHostConfig(container *Container, hostConfig *runconfig.
     return nil
 }
-
-func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status {
-    s, err := daemon.SubscribeToContainerStats(job.Args[0])
-    if err != nil {
-        return job.Error(err)
-    }
-    enc := json.NewEncoder(job.Stdout)
-    for update := range s {
-        ss := stats.ToStats(update.ContainerStats)
-        ss.MemoryStats.Limit = uint64(update.MemoryLimit)
-        ss.Read = update.Read
-        ss.ClockTicks = update.ClockTicks
-        ss.CpuStats.SystemUsage = update.SystemUsage
-        if err := enc.Encode(ss); err != nil {
-            return job.Error(err)
-        }
-    }
-    return engine.StatusOK
-}
-
-func mapToAPIStats() {
-
-}
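The native driver change above turns a missing libcontainer state file into the shared execdriver.ErrNotRunning value, so callers can distinguish "the container simply is not running" from a genuine failure. A small sketch of that sentinel-error pattern; the path and the local error variable are illustrative only:

package main

import (
    "errors"
    "fmt"
    "io/ioutil"
    "os"
)

var errNotRunning = errors.New("Container is not running")

// readState translates a missing state file into the sentinel error,
// mirroring the os.IsNotExist check added to the native driver's Stats.
func readState(path string) ([]byte, error) {
    b, err := ioutil.ReadFile(path)
    if err != nil {
        if os.IsNotExist(err) {
            return nil, errNotRunning
        }
        return nil, err
    }
    return b, nil
}

func main() {
    // illustrative path; a real caller would use the driver's state directory
    if _, err := readState("/var/run/example-container/state.json"); err != nil {
        if err == errNotRunning {
            fmt.Println("container not running, skipping stats collection")
            return
        }
        fmt.Println("unexpected error:", err)
    }
}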
diff --git a/daemon/stats.go b/daemon/stats.go
new file mode 100644
index 0000000000..5db1cf6081
--- /dev/null
+++ b/daemon/stats.go
@@ -0,0 +1,29 @@
+package daemon
+
+import (
+    "encoding/json"
+
+    "github.com/docker/docker/api/stats"
+    "github.com/docker/docker/engine"
+)
+
+func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status {
+    s, err := daemon.SubscribeToContainerStats(job.Args[0])
+    if err != nil {
+        return job.Error(err)
+    }
+    enc := json.NewEncoder(job.Stdout)
+    for update := range s {
+        ss := stats.ToStats(update.ContainerStats)
+        ss.MemoryStats.Limit = uint64(update.MemoryLimit)
+        ss.Read = update.Read
+        ss.ClockTicks = update.ClockTicks
+        ss.CpuStats.SystemUsage = update.SystemUsage
+        if err := enc.Encode(ss); err != nil {
+            // TODO: handle the specific broken pipe
+            daemon.UnsubscribeToContainerStats(job.Args[0], s)
+            return job.Error(err)
+        }
+    }
+    return engine.StatusOK
+}
diff --git a/daemon/stats_collector.go b/daemon/stats_collector.go
index a21092a856..8b5662db14 100644
--- a/daemon/stats_collector.go
+++ b/daemon/stats_collector.go
@@ -13,16 +13,20 @@ import (
     "github.com/docker/docker/daemon/execdriver"
 )
 
+// newStatsCollector returns a new statsCollector that collects
+// network and cgroup stats for a registered container at the specified
+// interval. The collector allows non-running containers to be added
+// and will start processing stats when they are started.
 func newStatsCollector(interval time.Duration) *statsCollector {
     s := &statsCollector{
         interval:   interval,
-        containers: make(map[string]*statsCollectorData),
+        containers: make(map[string]*statsData),
     }
     s.start()
     return s
 }
 
-type statsCollectorData struct {
+type statsData struct {
     c         *Container
     lastStats *execdriver.ResourceStats
     subs      []chan *execdriver.ResourceStats
@@ -32,43 +36,86 @@ type statsCollectorData struct {
     m          sync.Mutex
     interval   time.Duration
-    containers map[string]*statsCollectorData
+    containers map[string]*statsData
 }
 
-func (s *statsCollector) collect(c *Container) <-chan *execdriver.ResourceStats {
+// collect registers the container with the collector and adds it to
+// the event loop for collection on the specified interval, returning
+// a channel for the subscriber to receive on.
+func (s *statsCollector) collect(c *Container) chan *execdriver.ResourceStats {
     s.m.Lock()
+    defer s.m.Unlock()
     ch := make(chan *execdriver.ResourceStats, 1024)
-    s.containers[c.ID] = &statsCollectorData{
+    if _, exists := s.containers[c.ID]; exists {
+        s.containers[c.ID].subs = append(s.containers[c.ID].subs, ch)
+        return ch
+    }
+    s.containers[c.ID] = &statsData{
         c: c,
         subs: []chan *execdriver.ResourceStats{
             ch,
         },
     }
-    s.m.Unlock()
     return ch
 }
 
+// stopCollection closes the channels for all subscribers and removes
+// the container from metrics collection.
 func (s *statsCollector) stopCollection(c *Container) {
     s.m.Lock()
+    defer s.m.Unlock()
+    d := s.containers[c.ID]
+    if d == nil {
+        return
+    }
+    for _, sub := range d.subs {
+        close(sub)
+    }
     delete(s.containers, c.ID)
+}
+
+// unsubscribe removes a specific subscriber from receiving updates for a
+// container's stats.
+func (s *statsCollector) unsubscribe(c *Container, ch chan *execdriver.ResourceStats) {
+    s.m.Lock()
+    cd := s.containers[c.ID]
+    for i, sub := range cd.subs {
+        if ch == sub {
+            cd.subs = append(cd.subs[:i], cd.subs[i+1:]...)
+            close(ch)
+        }
+    }
+    // if there are no more subscribers then remove the entire container
+    // from collection.
+    if len(cd.subs) == 0 {
+        delete(s.containers, c.ID)
+    }
     s.m.Unlock()
 }
 
 func (s *statsCollector) start() {
     go func() {
         for _ = range time.Tick(s.interval) {
-            log.Debugf("starting collection of container stats")
             s.m.Lock()
             for id, d := range s.containers {
-                systemUsage, err := getSystemCpuUsage()
+                systemUsage, err := s.getSystemCpuUsage()
                 if err != nil {
                     log.Errorf("collecting system cpu usage for %s: %v", id, err)
                     continue
                 }
                 stats, err := d.c.Stats()
                 if err != nil {
-                    // TODO: @crosbymichael evict container depending on error
+                    if err == execdriver.ErrNotRunning {
+                        continue
+                    }
+                    // for any other error, evict the container from the
+                    // collector and close the channel for any subscribers
+                    // currently waiting on changes.
                     log.Errorf("collecting stats for %s: %v", id, err)
+                    for _, sub := range s.containers[id].subs {
+                        close(sub)
+                    }
+                    delete(s.containers, id)
                     continue
                 }
                 stats.SystemUsage = systemUsage
@@ -81,14 +128,14 @@ func (s *statsCollector) start() {
     }()
 }
 
-// returns value in nanoseconds
-func getSystemCpuUsage() (uint64, error) {
+// getSystemCpuUsage returns the host system's cpu usage
+// in nanoseconds.
+func (s *statsCollector) getSystemCpuUsage() (uint64, error) {
     f, err := os.Open("/proc/stat")
     if err != nil {
         return 0, err
     }
     defer f.Close()
-
     sc := bufio.NewScanner(f)
     for sc.Scan() {
         parts := strings.Fields(sc.Text())
@@ -97,7 +144,6 @@ func getSystemCpuUsage() (uint64, error) {
         if len(parts) < 8 {
             return 0, fmt.Errorf("invalid number of cpu fields")
         }
-
        var total uint64
        for _, i := range parts[1:8] {
            v, err := strconv.ParseUint(i, 10, 64)
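getSystemCpuUsage reads the aggregate "cpu" line of /proc/stat, sums the first seven jiffy counters, and converts the total to nanoseconds. A standalone approximation of that routine; the fixed 100 Hz tick rate below is an assumption for illustration, whereas the daemon asks the system for its real clock tick value via system.GetClockTicks:

package main

import (
    "bufio"
    "fmt"
    "os"
    "strconv"
    "strings"
)

// assumed USER_HZ; usually 100 on Linux, but the daemon queries it at runtime
const clockTicksPerSecond = 100

func systemCpuUsage() (uint64, error) {
    f, err := os.Open("/proc/stat")
    if err != nil {
        return 0, err
    }
    defer f.Close()
    sc := bufio.NewScanner(f)
    for sc.Scan() {
        parts := strings.Fields(sc.Text())
        // only the aggregate "cpu" line, not "cpu0", "cpu1", ...
        if len(parts) < 8 || parts[0] != "cpu" {
            continue
        }
        var total uint64
        for _, i := range parts[1:8] {
            v, err := strconv.ParseUint(i, 10, 64)
            if err != nil {
                return 0, err
            }
            total += v
        }
        // jiffies -> nanoseconds
        return (total * 1e9) / clockTicksPerSecond, nil
    }
    return 0, fmt.Errorf("no cpu line found in /proc/stat")
}

func main() {
    usage, err := systemCpuUsage()
    if err != nil {
        fmt.Println("error:", err)
        return
    }
    fmt.Println("system cpu usage (ns):", usage)
}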
diff --git a/docker/flags.go b/docker/flags.go
index 719acbe93f..8db636e5ce 100644
--- a/docker/flags.go
+++ b/docker/flags.go
@@ -98,6 +98,7 @@ func init() {
         {"save", "Save an image to a tar archive"},
         {"search", "Search for an image on the Docker Hub"},
         {"start", "Start a stopped container"},
+        {"stats", "Receive container stats"},
         {"stop", "Stop a running container"},
         {"tag", "Tag an image into a repository"},
         {"top", "Lookup the running processes of a container"},