1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/daemon/stats/collector.go
Sebastiaan van Stijn 481b8e54b4
Fix stats collector spinning CPU if no stats are collected
Commit fd0e24b718 changed
the stats collection loop to use a `sleep()` instead
of `time.Tick()` in the for-loop.

This change caused a regression in situations where
no stats are being collected, or an error is hit
in the loop (in which case the loop would `continue`,
and the `sleep()` is not hit).

This patch puts the sleep at the start of the loop
to guarantee it's always hit.

This will delay the sampling, which is similar to the
behavior before fd0e24b718.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
2018-03-15 17:56:15 +01:00

159 lines
4.2 KiB
Go

package stats // import "github.com/docker/docker/daemon/stats"
import (
"bufio"
"sync"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/container"
"github.com/docker/docker/pkg/pubsub"
"github.com/sirupsen/logrus"
)
// Collector manages and provides container resource stats
type Collector struct {
	// m is held around every access to publishers in this file.
	m sync.Mutex
	// supervisor is queried by Run for each container's stats.
	supervisor supervisor
	// interval is how long Run sleeps at the start of each collection round.
	interval time.Duration
	// publishers maps each container being collected to the publisher its
	// stats are sent on.
	publishers map[*container.Container]*pubsub.Publisher
	// bufReader is created by NewCollector with a 128-byte buffer and no
	// underlying reader; it is not used in this file.
	// NOTE(review): presumably consumed by platform-specific code — confirm.
	bufReader *bufio.Reader
	// The following fields are not set on Windows currently.
	clockTicksPerSecond uint64
}
// NewCollector builds a stats Collector that polls the given supervisor,
// pausing for interval between collection rounds. The collector starts with
// no registered containers; collection itself only happens once Run is called.
func NewCollector(supervisor supervisor, interval time.Duration) *Collector {
	collector := new(Collector)
	collector.supervisor = supervisor
	collector.interval = interval
	collector.publishers = map[*container.Container]*pubsub.Publisher{}
	// 128-byte buffered reader with no underlying source attached yet.
	collector.bufReader = bufio.NewReaderSize(nil, 128)

	// platformNewStatsCollector is defined elsewhere in the package and is
	// handed the collector before it is returned to the caller.
	platformNewStatsCollector(collector)
	return collector
}
// supervisor is what the Collector polls for per-container stats; Run calls
// GetContainerStats once per registered container in every collection round.
type supervisor interface {
	// GetContainerStats collects all the stats related to a container
	GetContainerStats(container *container.Container) (*types.StatsJSON, error)
}
// Collect subscribes the caller to stats for container c and returns the
// channel on which they will be delivered. The first subscription for a
// container also registers it with the collection loop by creating its
// publisher; later calls reuse that publisher.
func (s *Collector) Collect(c *container.Container) chan interface{} {
	s.m.Lock()
	defer s.m.Unlock()

	// Lazily create the publisher the first time this container is seen.
	if _, registered := s.publishers[c]; !registered {
		s.publishers[c] = pubsub.NewPublisher(100*time.Millisecond, 1024)
	}
	return s.publishers[c].Subscribe()
}
// StopCollection closes the channels for all subscribers and removes
// the container from metrics collection. It is a no-op if the container
// is not currently being collected.
func (s *Collector) StopCollection(c *container.Container) {
	s.m.Lock()
	// Deferred so the mutex is released even if closing the publisher
	// panics; this also matches how Collect handles the lock.
	defer s.m.Unlock()

	publisher, exists := s.publishers[c]
	if !exists {
		return
	}
	publisher.Close()
	delete(s.publishers, c)
}
// Unsubscribe removes a specific subscriber from receiving updates for a
// container's stats. When the evicted subscriber was the last one, the
// container is dropped from the collection loop as well. It is a no-op if
// the container has no publisher registered.
func (s *Collector) Unsubscribe(c *container.Container, ch chan interface{}) {
	s.m.Lock()
	// Deferred so the mutex is released even if the publisher calls below
	// panic; this also matches how Collect handles the lock.
	defer s.m.Unlock()

	publisher := s.publishers[c]
	if publisher == nil {
		return
	}
	publisher.Evict(ch)
	if publisher.Len() == 0 {
		// Nobody is listening any more, so stop collecting for c.
		delete(s.publishers, c)
	}
}
// Run is the collection loop. Once per interval it snapshots the registered
// containers, asks the supervisor for each container's stats and publishes
// the result to that container's subscribers. The loop has no exit condition,
// so Run does not return.
func (s *Collector) Run() {
	// subscription ties a container to the publisher its stats go out on.
	type subscription struct {
		ctr *container.Container
		pub *pubsub.Publisher
	}

	// No capacity is known up front; the slice grows to the required size
	// the first time it is filled and is reused afterwards.
	var active []subscription

	for {
		// Sleep first: every path through the body (including the
		// `continue`s below) then waits one interval, so the loop can
		// never spin when nothing is collected or an error occurs.
		time.Sleep(s.interval)

		// Truncating is pointless on the very first pass, but lets later
		// passes reuse the backing array instead of allocating.
		active = active[:0]

		s.m.Lock()
		for ctr, pub := range s.publishers {
			// Only copy the pointers so the lock is held as briefly as possible.
			active = append(active, subscription{ctr: ctr, pub: pub})
		}
		s.m.Unlock()

		if len(active) == 0 {
			continue
		}

		cpus, cpuErr := s.getNumberOnlineCPUs()
		if cpuErr != nil {
			logrus.Errorf("collecting system online cpu count: %v", cpuErr)
			continue
		}

		for _, sub := range active {
			containerStats, statsErr := s.supervisor.GetContainerStats(sub.ctr)

			switch statsErr.(type) {
			case nil:
				// Read system CPU usage right after the container's own
				// stats so the two samples are taken close together and
				// metric calculations stay free of noise.
				usage, usageErr := s.getSystemCPUUsage()
				if usageErr != nil {
					logrus.WithError(usageErr).WithField("container_id", sub.ctr.ID).Errorf("collecting system cpu usage")
					continue
				}

				// FIXME: move to containerd on Linux (not Windows)
				containerStats.CPUStats.SystemUsage = usage
				containerStats.CPUStats.OnlineCPUs = cpus

				sub.pub.Publish(*containerStats)

			case notRunningErr, notFoundErr:
				// The container is not running or was not found: publish
				// stats that carry nothing but its name and ID.
				placeholder := types.StatsJSON{
					Name: sub.ctr.Name,
					ID:   sub.ctr.ID,
				}
				sub.pub.Publish(placeholder)

			default:
				// Any other error is only logged; nothing is published
				// for this container in this round.
				logrus.Errorf("collecting stats for %s: %v", sub.ctr.ID, statsErr)
			}
		}
	}
}
// notRunningErr is matched by errors that also implement a Conflict() method.
// Run treats such an error from the supervisor as "container not running" and
// publishes stats containing only the container's name and ID.
type notRunningErr interface {
	error
	Conflict()
}
// notFoundErr is matched by errors that also implement a NotFound() method.
// Run treats such an error from the supervisor as "container not found" and
// publishes stats containing only the container's name and ID.
type notFoundErr interface {
	error
	NotFound()
}