Evict stopped containers

Signed-off-by: Michael Crosby <crosbymichael@gmail.com>
Michael Crosby 2015-01-07 18:02:08 -08:00
parent 2640a10bca
commit 4f174aa792
14 changed files with 192 additions and 95 deletions


@@ -16,15 +16,16 @@ import (
"path"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
"sync"
"text/tabwriter"
"text/template"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/api"
"github.com/docker/docker/api/stats"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/engine"
"github.com/docker/docker/graph"
@@ -43,7 +44,6 @@ import (
"github.com/docker/docker/pkg/urlutil"
"github.com/docker/docker/registry"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/stats"
"github.com/docker/docker/utils"
"github.com/docker/libtrust"
)
@@ -2625,25 +2625,10 @@ type containerStats struct {
Name string
CpuPercentage float64
Memory float64
MemoryLimit float64
MemoryPercentage float64
NetworkRx int
NetworkTx int
}
type statSorter struct {
stats []containerStats
}
func (s *statSorter) Len() int {
return len(s.stats)
}
func (s *statSorter) Swap(i, j int) {
s.stats[i], s.stats[j] = s.stats[j], s.stats[i]
}
func (s *statSorter) Less(i, j int) bool {
return s.stats[i].Name < s.stats[j].Name
NetworkRx float64
NetworkTx float64
}
func (cli *DockerCli) CmdStats(args ...string) error {
@@ -2651,40 +2636,49 @@ func (cli *DockerCli) CmdStats(args ...string) error {
cmd.Require(flag.Min, 1)
utils.ParseFlags(cmd, args, true)
m := &sync.Mutex{}
cStats := map[string]containerStats{}
for _, name := range cmd.Args() {
go cli.streamStats(name, cStats)
go cli.streamStats(name, cStats, m)
}
w := tabwriter.NewWriter(cli.out, 20, 1, 3, ' ', 0)
for _ = range time.Tick(1000 * time.Millisecond) {
for _ = range time.Tick(500 * time.Millisecond) {
fmt.Fprint(cli.out, "\033[2J")
fmt.Fprint(cli.out, "\033[H")
fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM\tMEM %\tNET I/O")
sStats := []containerStats{}
for _, s := range cStats {
sStats = append(sStats, s)
}
sorter := &statSorter{sStats}
sort.Sort(sorter)
for _, s := range sStats {
fmt.Fprintf(w, "%s\t%f%%\t%s\t%f%%\t%d/%d\n",
fmt.Fprintln(w, "CONTAINER\tCPU %\tMEM USAGE/LIMIT\tMEM %\tNET I/O")
m.Lock()
ss := sortStatsByName(cStats)
m.Unlock()
for _, s := range ss {
fmt.Fprintf(w, "%s\t%.2f%%\t%s/%s\t%.2f%%\t%s/%s\n",
s.Name,
s.CpuPercentage,
units.HumanSize(s.Memory),
units.BytesSize(s.Memory), units.BytesSize(s.MemoryLimit),
s.MemoryPercentage,
s.NetworkRx, s.NetworkTx)
units.BytesSize(s.NetworkRx), units.BytesSize(s.NetworkTx))
}
w.Flush()
}
return nil
}
func (cli *DockerCli) streamStats(name string, data map[string]containerStats) error {
func (cli *DockerCli) streamStats(name string, data map[string]containerStats, m *sync.Mutex) error {
m.Lock()
data[name] = containerStats{
Name: name,
}
m.Unlock()
stream, _, err := cli.call("GET", "/containers/"+name+"/stats", nil, false)
if err != nil {
return err
}
defer func() {
stream.Close()
m.Lock()
delete(data, name)
m.Unlock()
}()
var (
previousCpu uint64
previousSystem uint64
@@ -2696,30 +2690,37 @@ func (cli *DockerCli) streamStats(name string, data map[string]containerStats) error {
if err := dec.Decode(&v); err != nil {
return err
}
memPercent := float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
cpuPercent := 0.0
var (
memPercent = float64(v.MemoryStats.Usage) / float64(v.MemoryStats.Limit) * 100.0
cpuPercent = 0.0
)
if !start {
cpuDelta := float64(v.CpuStats.CpuUsage.TotalUsage) - float64(previousCpu)
systemDelta := float64(int(v.CpuStats.SystemUsage)/v.ClockTicks) - float64(int(previousSystem)/v.ClockTicks)
if systemDelta > 0.0 {
cpuPercent = (cpuDelta / systemDelta) * float64(v.ClockTicks*len(v.CpuStats.CpuUsage.PercpuUsage))
}
cpuPercent = calculateCpuPercent(previousCpu, previousSystem, v)
}
start = false
m.Lock()
d := data[name]
d.Name = name
d.CpuPercentage = cpuPercent
d.Memory = float64(v.MemoryStats.Usage)
d.MemoryLimit = float64(v.MemoryStats.Limit)
d.MemoryPercentage = memPercent
d.NetworkRx = int(v.Network.RxBytes)
d.NetworkTx = int(v.Network.TxBytes)
d.NetworkRx = float64(v.Network.RxBytes)
d.NetworkTx = float64(v.Network.TxBytes)
data[name] = d
m.Unlock()
previousCpu = v.CpuStats.CpuUsage.TotalUsage
previousSystem = v.CpuStats.SystemUsage
}
return nil
}
func calculateCpuPercent(previousCpu, previousSystem uint64, v *stats.Stats) float64 {
cpuPercent := 0.0
cpuDelta := float64(v.CpuStats.CpuUsage.TotalUsage) - float64(previousCpu)
systemDelta := float64(int(v.CpuStats.SystemUsage)/v.ClockTicks) - float64(int(previousSystem)/v.ClockTicks)
if systemDelta > 0.0 {
cpuPercent = (cpuDelta / systemDelta) * float64(v.ClockTicks*len(v.CpuStats.CpuUsage.PercpuUsage))
}
return cpuPercent
}
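The helper above works on deltas between consecutive samples: the container's CPU time over the window, divided by the host's total CPU time over the same window, scaled by the CPU count so a saturated 4-core host can read 400%. A minimal standalone sketch of that idea with hypothetical numbers (it deliberately leaves out the ClockTicks scaling the commit folds into both sides):

package main

import "fmt"

// cpuPercent mirrors the delta math above in simplified form: the
// container's share of total host CPU time between two samples, scaled
// by the CPU count so a saturated 4-core host can read 400%.
func cpuPercent(prevCpu, curCpu, prevSystem, curSystem uint64, ncpu int) float64 {
	cpuDelta := float64(curCpu) - float64(prevCpu)          // container ns
	systemDelta := float64(curSystem) - float64(prevSystem) // whole-host ns
	if systemDelta > 0.0 && cpuDelta >= 0.0 {
		return (cpuDelta / systemDelta) * float64(ncpu) * 100.0
	}
	return 0.0
}

func main() {
	// Hypothetical samples taken 1s apart on a 4-core host: the container
	// used 0.5s of CPU while the host accrued 4s of CPU time in total.
	fmt.Println(cpuPercent(0, 500000000, 0, 4000000000, 4)) // 50
}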

api/client/sort.go (new file, 29 lines added)

@@ -0,0 +1,29 @@
package client
import "sort"
func sortStatsByName(cStats map[string]containerStats) []containerStats {
sStats := []containerStats{}
for _, s := range cStats {
sStats = append(sStats, s)
}
sorter := &statSorter{sStats}
sort.Sort(sorter)
return sStats
}
type statSorter struct {
stats []containerStats
}
func (s *statSorter) Len() int {
return len(s.stats)
}
func (s *statSorter) Swap(i, j int) {
s.stats[i], s.stats[j] = s.stats[j], s.stats[i]
}
func (s *statSorter) Less(i, j int) bool {
return s.stats[i].Name < s.stats[j].Name
}
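A brief usage sketch for the new helper, with hypothetical data; a dedicated sorter type driven by sort.Sort was the standard idiom at the time (sort.Slice only arrived in Go 1.8):

package client

import "fmt"

// printSortedStats is illustrative only (not in the commit): it feeds a
// hypothetical stats map through sortStatsByName and prints the rows in
// the stable, name-sorted order that CmdStats relies on.
func printSortedStats() {
	cStats := map[string]containerStats{
		"web": {Name: "web", CpuPercentage: 12.5},
		"db":  {Name: "db", CpuPercentage: 3.1},
	}
	for _, s := range sortStatsByName(cStats) {
		fmt.Printf("%s\t%.2f%%\n", s.Name, s.CpuPercentage) // db first, then web
	}
}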


@@ -16,7 +16,7 @@ type ThrottlingData struct {
ThrottledTime uint64 `json:"throttled_time,omitempty"`
}
// All CPU stats are aggregate since container inception.
// All CPU stats are aggregated since container inception.
type CpuUsage struct {
// Total CPU time consumed.
// Units: nanoseconds.
@@ -91,6 +91,8 @@ type Stats struct {
BlkioStats BlkioStats `json:"blkio_stats,omitempty"`
}
// ToStats converts the libcontainer.ContainerStats to the API-specific
// structs. This is done to preserve API compatibility and versioning.
func ToStats(ls *libcontainer.ContainerStats) *Stats {
s := &Stats{}
if ls.NetworkStats != nil {


@@ -1416,8 +1416,5 @@ func (container *Container) getNetworkedContainer() (*Container, error) {
}
func (container *Container) Stats() (*execdriver.ResourceStats, error) {
if !container.IsRunning() {
return nil, fmt.Errorf("cannot collect stats on a non running container")
}
return container.daemon.Stats(container)
}


@@ -1099,7 +1099,7 @@ func (daemon *Daemon) Stats(c *Container) (*execdriver.ResourceStats, error) {
return daemon.execDriver.Stats(c.ID)
}
func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver.ResourceStats, error) {
func (daemon *Daemon) SubscribeToContainerStats(name string) (chan *execdriver.ResourceStats, error) {
c := daemon.Get(name)
if c == nil {
return nil, fmt.Errorf("no such container")
@@ -1108,6 +1108,15 @@ func (daemon *Daemon) SubscribeToContainerStats(name string) (<-chan *execdriver.ResourceStats, error) {
return ch, nil
}
func (daemon *Daemon) UnsubscribeToContainerStats(name string, ch chan *execdriver.ResourceStats) error {
c := daemon.Get(name)
if c == nil {
return fmt.Errorf("no such container")
}
daemon.statsCollector.unsubscribe(c, ch)
return nil
}
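Note the signature change above: SubscribeToContainerStats now returns a bidirectional chan rather than <-chan, so callers can hand the same channel value back to UnsubscribeToContainerStats. A hypothetical daemon-side caller (not part of the commit) showing the pairing:

// consumeStats is a hypothetical caller: it subscribes to a container's
// stats, reads up to n samples, then unsubscribes with the same channel
// value, which is why a bidirectional chan is returned.
func consumeStats(daemon *Daemon, name string, n int) error {
	ch, err := daemon.SubscribeToContainerStats(name)
	if err != nil {
		return err
	}
	for i := 0; i < n; i++ {
		update, ok := <-ch
		if !ok { // the collector evicted the container and closed the channel
			return nil
		}
		_ = update // process the *execdriver.ResourceStats sample
	}
	return daemon.UnsubscribeToContainerStats(name, ch)
}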
// Nuke kills all containers then removes all content
// from the content root, including images, volumes and
// container filesystems.


@@ -49,6 +49,9 @@ func (daemon *Daemon) ContainerRm(job *engine.Job) engine.Status {
}
if container != nil {
// stop stats collection for the container, regardless of whether
// stats are currently being collected.
daemon.statsCollector.stopCollection(container)
if container.IsRunning() {
if forceRemove {
if err := container.Kill(); err != nil {


@@ -16,7 +16,7 @@ import (
type Context map[string]string
var (
ErrNotRunning = errors.New("Process could not be started")
ErrNotRunning = errors.New("Container is not running")
ErrWaitTimeoutReached = errors.New("Wait timeout reached")
ErrDriverAlreadyRegistered = errors.New("A driver already registered this docker init function")
ErrDriverNotFound = errors.New("The requested docker init has not been found")


@@ -24,7 +24,7 @@ func NewDriver(name, root, initPath string, sysInfo *sysinfo.SysInfo) (execdrive
// to be backwards compatible
return lxc.NewDriver(root, initPath, sysInfo.AppArmor)
case "native":
return native.NewDriver(path.Join(root, "execdriver", "native"), initPath, meminfo.MemTotal/1000)
return native.NewDriver(path.Join(root, "execdriver", "native"), initPath, meminfo.MemTotal)
}
return nil, fmt.Errorf("unknown exec driver %s", name)
}


@@ -526,6 +526,6 @@ func (d *driver) Exec(c *execdriver.Command, processConfig *execdriver.ProcessCo
}
func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) {
return nil, fmt.Errorf("container stats are not support with LXC")
return nil, fmt.Errorf("container stats are not supported with LXC")
}


@@ -284,6 +284,9 @@ func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) {
c := d.activeContainers[id]
state, err := libcontainer.GetState(filepath.Join(d.root, id))
if err != nil {
if os.IsNotExist(err) {
return nil, execdriver.ErrNotRunning
}
return nil, err
}
now := time.Now()
@@ -292,13 +295,15 @@ func (d *driver) Stats(id string) (*execdriver.ResourceStats, error) {
return nil, err
}
memoryLimit := c.container.Cgroups.Memory
// if the container does not have any memory limit specified, set the
// limit to the machine's total memory
if memoryLimit == 0 {
memoryLimit = d.machineMemory
}
return &execdriver.ResourceStats{
Read: now,
ContainerStats: stats,
ClockTicks: system.GetClockTicks(),
Read: now,
MemoryLimit: memoryLimit,
}, nil
}


@@ -1,14 +1,12 @@
package daemon
import (
"encoding/json"
"fmt"
"os"
"strings"
"github.com/docker/docker/engine"
"github.com/docker/docker/runconfig"
"github.com/docker/docker/stats"
)
func (daemon *Daemon) ContainerStart(job *engine.Job) engine.Status {
@@ -79,26 +77,3 @@ func (daemon *Daemon) setHostConfig(container *Container, hostConfig *runconfig.
return nil
}
func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status {
s, err := daemon.SubscribeToContainerStats(job.Args[0])
if err != nil {
return job.Error(err)
}
enc := json.NewEncoder(job.Stdout)
for update := range s {
ss := stats.ToStats(update.ContainerStats)
ss.MemoryStats.Limit = uint64(update.MemoryLimit)
ss.Read = update.Read
ss.ClockTicks = update.ClockTicks
ss.CpuStats.SystemUsage = update.SystemUsage
if err := enc.Encode(ss); err != nil {
return job.Error(err)
}
}
return engine.StatusOK
}
func mapToAPIStats() {
}

daemon/stats.go (new file, 29 lines added)

@@ -0,0 +1,29 @@
package daemon
import (
"encoding/json"
"github.com/docker/docker/api/stats"
"github.com/docker/docker/engine"
)
func (daemon *Daemon) ContainerStats(job *engine.Job) engine.Status {
s, err := daemon.SubscribeToContainerStats(job.Args[0])
if err != nil {
return job.Error(err)
}
enc := json.NewEncoder(job.Stdout)
for update := range s {
ss := stats.ToStats(update.ContainerStats)
ss.MemoryStats.Limit = uint64(update.MemoryLimit)
ss.Read = update.Read
ss.ClockTicks = update.ClockTicks
ss.CpuStats.SystemUsage = update.SystemUsage
if err := enc.Encode(ss); err != nil {
// TODO: handle the specific broken pipe
daemon.UnsubscribeToContainerStats(job.Args[0], s)
return job.Error(err)
}
}
return engine.StatusOK
}
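The Encode error path is what ties a client disconnect back to the collector: when the HTTP connection drops, the write fails, the job unsubscribes, and the collector closes the channel instead of buffering into the void. A minimal sketch of the same subscribe/stream/unsubscribe shape, with placeholder types:

package daemon

import (
	"encoding/json"
	"io"
)

// streamUpdates drains a subscription channel into a JSON encoder and
// tears the subscription down on the first write error, mirroring the
// ContainerStats job above. The update type is a placeholder.
func streamUpdates(updates chan interface{}, w io.Writer, unsubscribe func(chan interface{})) error {
	enc := json.NewEncoder(w)
	for u := range updates {
		if err := enc.Encode(u); err != nil {
			// typically a broken pipe after the client disconnects
			unsubscribe(updates) // the producer closes the channel, ending the range
			return err
		}
	}
	return nil
}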


@@ -13,16 +13,20 @@ import (
"github.com/docker/docker/daemon/execdriver"
)
// newStatsCollector returns a new statsCollector that collects
// network and cgroup stats for a registered container at the specified
// interval. The collector allows non-running containers to be added
// and will start processing stats when they are started.
func newStatsCollector(interval time.Duration) *statsCollector {
s := &statsCollector{
interval: interval,
containers: make(map[string]*statsCollectorData),
containers: make(map[string]*statsData),
}
s.start()
return s
}
type statsCollectorData struct {
type statsData struct {
c *Container
lastStats *execdriver.ResourceStats
subs []chan *execdriver.ResourceStats
@@ -32,43 +36,86 @@ type statsCollectorData struct {
type statsCollector struct {
m sync.Mutex
interval time.Duration
containers map[string]*statsCollectorData
containers map[string]*statsData
}
func (s *statsCollector) collect(c *Container) <-chan *execdriver.ResourceStats {
// collect registers the container with the collector and adds it to
// the event loop for collection on the specified interval, returning
// a channel for the subscriber to receive on.
func (s *statsCollector) collect(c *Container) chan *execdriver.ResourceStats {
s.m.Lock()
defer s.m.Unlock()
ch := make(chan *execdriver.ResourceStats, 1024)
s.containers[c.ID] = &statsCollectorData{
if _, exists := s.containers[c.ID]; exists {
s.containers[c.ID].subs = append(s.containers[c.ID].subs, ch)
return ch
}
s.containers[c.ID] = &statsData{
c: c,
subs: []chan *execdriver.ResourceStats{
ch,
},
}
s.m.Unlock()
return ch
}
// stopCollection closes the channels for all subscribers and removes
// the container from metrics collection.
func (s *statsCollector) stopCollection(c *Container) {
s.m.Lock()
defer s.m.Unlock()
d := s.containers[c.ID]
if d == nil {
return
}
for _, sub := range d.subs {
close(sub)
}
delete(s.containers, c.ID)
}
// unsubscribe removes a specific subscriber from receiving updates for a
// container's stats.
func (s *statsCollector) unsubscribe(c *Container, ch chan *execdriver.ResourceStats) {
s.m.Lock()
cd := s.containers[c.ID]
for i, sub := range cd.subs {
if ch == sub {
cd.subs = append(cd.subs[:i], cd.subs[i+1:]...)
close(ch)
}
}
// if there are no more subscribers then remove the entire container
// from collection.
if len(cd.subs) == 0 {
delete(s.containers, c.ID)
}
s.m.Unlock()
}
func (s *statsCollector) start() {
go func() {
for _ = range time.Tick(s.interval) {
log.Debugf("starting collection of container stats")
s.m.Lock()
for id, d := range s.containers {
systemUsage, err := getSystemCpuUsage()
systemUsage, err := s.getSystemCpuUsage()
if err != nil {
log.Errorf("collecting system cpu usage for %s: %v", id, err)
continue
}
stats, err := d.c.Stats()
if err != nil {
// TODO: @crosbymichael evict container depending on error
if err == execdriver.ErrNotRunning {
continue
}
// any error other than ErrNotRunning means the container should be
// evicted from the collector, closing the channel for any subscribers
// currently waiting on changes.
log.Errorf("collecting stats for %s: %v", id, err)
for _, sub := range s.containers[id].subs {
close(sub)
}
delete(s.containers, id)
continue
}
stats.SystemUsage = systemUsage
@@ -81,14 +128,14 @@ func (s *statsCollector) start() {
}()
}
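The hunk elides the publish step at the bottom of the tick loop. Each subscriber channel is buffered to 1024 entries (see collect above), so a plain send normally suffices; a non-blocking send is the defensive variant if a reader ever stalls. A sketch of that option, not the commit's code:

// publish fans one sample out to every subscriber without ever blocking
// the collection loop: a subscriber whose 1024-entry buffer is full
// simply misses this sample instead of stalling every other container.
func (s *statsCollector) publish(subs []chan *execdriver.ResourceStats, stats *execdriver.ResourceStats) {
	for _, sub := range subs {
		select {
		case sub <- stats:
		default: // reader is too slow; drop rather than block the ticker
		}
	}
}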
// returns value in nanoseconds
func getSystemCpuUsage() (uint64, error) {
// getSystemCpuUsage returns the host system's CPU usage
// in nanoseconds.
func (s *statsCollector) getSystemCpuUsage() (uint64, error) {
f, err := os.Open("/proc/stat")
if err != nil {
return 0, err
}
defer f.Close()
sc := bufio.NewScanner(f)
for sc.Scan() {
parts := strings.Fields(sc.Text())
@@ -97,7 +144,6 @@ func getSystemCpuUsage() (uint64, error) {
if len(parts) < 8 {
return 0, fmt.Errorf("invalid number of cpu fields")
}
var total uint64
for _, i := range parts[1:8] {
v, err := strconv.ParseUint(i, 10, 64)

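For context, the aggregate "cpu" line this function scans reports cumulative clock ticks, and the seven summed fields (parts[1:8]) are user, nice, system, idle, iowait, irq and softirq; the total then has to be scaled from ticks to nanoseconds. A standalone sketch with hypothetical values, assuming the common USER_HZ of 100:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

func main() {
	// A hypothetical aggregate line from /proc/stat. Field order:
	//      user   nice system idle    iowait irq  softirq steal ...
	line := "cpu  79242 0    74306  842486  756    2718 3182    0 0 0"
	parts := strings.Fields(line)
	var total uint64
	for _, f := range parts[1:8] { // user..softirq, in clock ticks
		v, _ := strconv.ParseUint(f, 10, 64)
		total += v
	}
	// With the common USER_HZ of 100, one tick is 10ms, i.e. 1e7 ns.
	const nsPerTick = 10000000
	fmt.Println(total * nsPerTick) // host CPU time in nanoseconds
}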

@@ -98,6 +98,7 @@ func init() {
{"save", "Save an image to a tar archive"},
{"search", "Search for an image on the Docker Hub"},
{"start", "Start a stopped container"},
{"stats", "Receive container stats"},
{"stop", "Stop a running container"},
{"tag", "Tag an image into a repository"},
{"top", "Lookup the running processes of a container"},