diff --git a/daemon/daemon.go b/daemon/daemon.go index bec0b34be5..9024eff64c 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -65,6 +65,7 @@ import ( "github.com/sirupsen/logrus" "go.etcd.io/bbolt" "golang.org/x/sync/semaphore" + "golang.org/x/sync/singleflight" "google.golang.org/grpc" "google.golang.org/grpc/backoff" ) @@ -117,10 +118,11 @@ type Daemon struct { seccompProfile []byte seccompProfilePath string - diskUsageRunning int32 - pruneRunning int32 - hosts map[string]bool // hosts stores the addresses the daemon is listening on - startupDone chan struct{} + usage singleflight.Group + + pruneRunning int32 + hosts map[string]bool // hosts stores the addresses the daemon is listening on + startupDone chan struct{} attachmentStore network.AttachmentStore attachableNetworkLock *locker.Locker diff --git a/daemon/disk_usage.go b/daemon/disk_usage.go index bc333cd5e3..b96e626f45 100644 --- a/daemon/disk_usage.go +++ b/daemon/disk_usage.go @@ -3,36 +3,47 @@ package daemon // import "github.com/docker/docker/daemon" import ( "context" "fmt" - "sync/atomic" "github.com/docker/docker/api/server/router/system" "github.com/docker/docker/api/types" - "github.com/docker/docker/api/types/filters" "golang.org/x/sync/errgroup" ) -// SystemDiskUsage returns information about the daemon data disk usage -func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsageOptions) (*types.DiskUsage, error) { - if !atomic.CompareAndSwapInt32(&daemon.diskUsageRunning, 0, 1) { - return nil, fmt.Errorf("a disk usage operation is already running") +// ContainerDiskUsage returns information about container data disk usage. +func (daemon *Daemon) ContainerDiskUsage(ctx context.Context) ([]*types.Container, error) { + ch := daemon.usage.DoChan("ContainerDiskUsage", func() (interface{}, error) { + // Retrieve container list + containers, err := daemon.Containers(&types.ContainerListOptions{ + Size: true, + All: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to retrieve container list: %v", err) + } + return containers, nil + }) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-ch: + if res.Err != nil { + return nil, res.Err + } + return res.Val.([]*types.Container), nil } - defer atomic.StoreInt32(&daemon.diskUsageRunning, 0) +} +// SystemDiskUsage returns information about the daemon data disk usage. +// Callers must not mutate contents of the returned fields. +func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsageOptions) (*types.DiskUsage, error) { eg, ctx := errgroup.WithContext(ctx) var containers []*types.Container if opts.Containers { eg.Go(func() error { var err error - // Retrieve container list - containers, err = daemon.Containers(&types.ContainerListOptions{ - Size: true, - All: true, - }) - if err != nil { - return fmt.Errorf("failed to retrieve container list: %v", err) - } - return nil + containers, err = daemon.ContainerDiskUsage(ctx) + return err }) } @@ -43,16 +54,8 @@ func (daemon *Daemon) SystemDiskUsage(ctx context.Context, opts system.DiskUsage if opts.Images { eg.Go(func() error { var err error - // Get all top images with extra attributes - images, err = daemon.imageService.Images(ctx, types.ImageListOptions{ - Filters: filters.NewArgs(), - SharedSize: true, - ContainerCount: true, - }) - if err != nil { - return fmt.Errorf("failed to retrieve image list: %v", err) - } - return nil + images, err = daemon.imageService.ImageDiskUsage(ctx) + return err }) eg.Go(func() error { var err error diff --git a/daemon/images/service.go b/daemon/images/service.go index 17144a6f6e..85c3bd5d40 100644 --- a/daemon/images/service.go +++ b/daemon/images/service.go @@ -2,10 +2,13 @@ package images // import "github.com/docker/docker/daemon/images" import ( "context" + "fmt" "os" "github.com/containerd/containerd/content" "github.com/containerd/containerd/leases" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" "github.com/docker/docker/container" daemonevents "github.com/docker/docker/daemon/events" "github.com/docker/docker/distribution" @@ -19,6 +22,7 @@ import ( digest "github.com/opencontainers/go-digest" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" ) type containerStore interface { @@ -86,6 +90,7 @@ type ImageService struct { leases leases.Manager content content.Store contentNamespace string + usage singleflight.Group } // DistributionServices provides daemon image storage services @@ -195,25 +200,36 @@ func (i *ImageService) ReleaseLayer(rwlayer layer.RWLayer, containerOS string) e // LayerDiskUsage returns the number of bytes used by layer stores // called from disk_usage.go func (i *ImageService) LayerDiskUsage(ctx context.Context) (int64, error) { - var allLayersSize int64 - layerRefs := i.getLayerRefs() - allLayers := i.layerStore.Map() - for _, l := range allLayers { - select { - case <-ctx.Done(): - return allLayersSize, ctx.Err() - default: - size, err := l.DiffSize() - if err == nil { - if _, ok := layerRefs[l.ChainID()]; ok { - allLayersSize += size + ch := i.usage.DoChan("LayerDiskUsage", func() (interface{}, error) { + var allLayersSize int64 + layerRefs := i.getLayerRefs() + allLayers := i.layerStore.Map() + for _, l := range allLayers { + select { + case <-ctx.Done(): + return allLayersSize, ctx.Err() + default: + size, err := l.DiffSize() + if err == nil { + if _, ok := layerRefs[l.ChainID()]; ok { + allLayersSize += size + } + } else { + logrus.Warnf("failed to get diff size for layer %v", l.ChainID()) } - } else { - logrus.Warnf("failed to get diff size for layer %v", l.ChainID()) } } + return allLayersSize, nil + }) + select { + case <-ctx.Done(): + return 0, ctx.Err() + case res := <-ch: + if res.Err != nil { + return 0, res.Err + } + return res.Val.(int64), nil } - return allLayersSize, nil } func (i *ImageService) getLayerRefs() map[layer.ChainID]int { @@ -237,6 +253,31 @@ func (i *ImageService) getLayerRefs() map[layer.ChainID]int { return layerRefs } +// ImageDiskUsage returns information about image data disk usage. +func (i *ImageService) ImageDiskUsage(ctx context.Context) ([]*types.ImageSummary, error) { + ch := i.usage.DoChan("ImageDiskUsage", func() (interface{}, error) { + // Get all top images with extra attributes + images, err := i.Images(ctx, types.ImageListOptions{ + Filters: filters.NewArgs(), + SharedSize: true, + ContainerCount: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to retrieve image list: %v", err) + } + return images, nil + }) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-ch: + if res.Err != nil { + return nil, res.Err + } + return res.Val.([]*types.ImageSummary), nil + } +} + // UpdateConfig values // // called from reload.go diff --git a/vendor/golang.org/x/sync/singleflight/singleflight.go b/vendor/golang.org/x/sync/singleflight/singleflight.go new file mode 100644 index 0000000000..690eb85013 --- /dev/null +++ b/vendor/golang.org/x/sync/singleflight/singleflight.go @@ -0,0 +1,212 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package singleflight provides a duplicate function call suppression +// mechanism. +package singleflight // import "golang.org/x/sync/singleflight" + +import ( + "bytes" + "errors" + "fmt" + "runtime" + "runtime/debug" + "sync" +) + +// errGoexit indicates the runtime.Goexit was called in +// the user given function. +var errGoexit = errors.New("runtime.Goexit was called") + +// A panicError is an arbitrary value recovered from a panic +// with the stack trace during the execution of given function. +type panicError struct { + value interface{} + stack []byte +} + +// Error implements error interface. +func (p *panicError) Error() string { + return fmt.Sprintf("%v\n\n%s", p.value, p.stack) +} + +func newPanicError(v interface{}) error { + stack := debug.Stack() + + // The first line of the stack trace is of the form "goroutine N [status]:" + // but by the time the panic reaches Do the goroutine may no longer exist + // and its status will have changed. Trim out the misleading line. + if line := bytes.IndexByte(stack[:], '\n'); line >= 0 { + stack = stack[line+1:] + } + return &panicError{value: v, stack: stack} +} + +// call is an in-flight or completed singleflight.Do call +type call struct { + wg sync.WaitGroup + + // These fields are written once before the WaitGroup is done + // and are only read after the WaitGroup is done. + val interface{} + err error + + // forgotten indicates whether Forget was called with this call's key + // while the call was still in flight. + forgotten bool + + // These fields are read and written with the singleflight + // mutex held before the WaitGroup is done, and are read but + // not written after the WaitGroup is done. + dups int + chans []chan<- Result +} + +// Group represents a class of work and forms a namespace in +// which units of work can be executed with duplicate suppression. +type Group struct { + mu sync.Mutex // protects m + m map[string]*call // lazily initialized +} + +// Result holds the results of Do, so they can be passed +// on a channel. +type Result struct { + Val interface{} + Err error + Shared bool +} + +// Do executes and returns the results of the given function, making +// sure that only one execution is in-flight for a given key at a +// time. If a duplicate comes in, the duplicate caller waits for the +// original to complete and receives the same results. +// The return value shared indicates whether v was given to multiple callers. +func (g *Group) Do(key string, fn func() (interface{}, error)) (v interface{}, err error, shared bool) { + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + g.mu.Unlock() + c.wg.Wait() + + if e, ok := c.err.(*panicError); ok { + panic(e) + } else if c.err == errGoexit { + runtime.Goexit() + } + return c.val, c.err, true + } + c := new(call) + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + g.doCall(c, key, fn) + return c.val, c.err, c.dups > 0 +} + +// DoChan is like Do but returns a channel that will receive the +// results when they are ready. +// +// The returned channel will not be closed. +func (g *Group) DoChan(key string, fn func() (interface{}, error)) <-chan Result { + ch := make(chan Result, 1) + g.mu.Lock() + if g.m == nil { + g.m = make(map[string]*call) + } + if c, ok := g.m[key]; ok { + c.dups++ + c.chans = append(c.chans, ch) + g.mu.Unlock() + return ch + } + c := &call{chans: []chan<- Result{ch}} + c.wg.Add(1) + g.m[key] = c + g.mu.Unlock() + + go g.doCall(c, key, fn) + + return ch +} + +// doCall handles the single call for a key. +func (g *Group) doCall(c *call, key string, fn func() (interface{}, error)) { + normalReturn := false + recovered := false + + // use double-defer to distinguish panic from runtime.Goexit, + // more details see https://golang.org/cl/134395 + defer func() { + // the given function invoked runtime.Goexit + if !normalReturn && !recovered { + c.err = errGoexit + } + + c.wg.Done() + g.mu.Lock() + defer g.mu.Unlock() + if !c.forgotten { + delete(g.m, key) + } + + if e, ok := c.err.(*panicError); ok { + // In order to prevent the waiting channels from being blocked forever, + // needs to ensure that this panic cannot be recovered. + if len(c.chans) > 0 { + go panic(e) + select {} // Keep this goroutine around so that it will appear in the crash dump. + } else { + panic(e) + } + } else if c.err == errGoexit { + // Already in the process of goexit, no need to call again + } else { + // Normal return + for _, ch := range c.chans { + ch <- Result{c.val, c.err, c.dups > 0} + } + } + }() + + func() { + defer func() { + if !normalReturn { + // Ideally, we would wait to take a stack trace until we've determined + // whether this is a panic or a runtime.Goexit. + // + // Unfortunately, the only way we can distinguish the two is to see + // whether the recover stopped the goroutine from terminating, and by + // the time we know that, the part of the stack trace relevant to the + // panic has been discarded. + if r := recover(); r != nil { + c.err = newPanicError(r) + } + } + }() + + c.val, c.err = fn() + normalReturn = true + }() + + if !normalReturn { + recovered = true + } +} + +// Forget tells the singleflight to forget about a key. Future calls +// to Do for this key will call the function rather than waiting for +// an earlier call to complete. +func (g *Group) Forget(key string) { + g.mu.Lock() + if c, ok := g.m[key]; ok { + c.forgotten = true + } + delete(g.m, key) + g.mu.Unlock() +} diff --git a/volume/service/service.go b/volume/service/service.go index 3dcd9b336c..43d963c177 100644 --- a/volume/service/service.go +++ b/volume/service/service.go @@ -17,6 +17,7 @@ import ( "github.com/docker/docker/volume/service/opts" "github.com/pkg/errors" "github.com/sirupsen/logrus" + "golang.org/x/sync/singleflight" ) type ds interface { @@ -36,6 +37,7 @@ type VolumesService struct { ds ds pruneRunning int32 eventLogger VolumeEventLogger + usage singleflight.Group } // NewVolumeService creates a new volume service @@ -182,14 +184,25 @@ var acceptedListFilters = map[string]bool{ // volumes with mount options are not really local even if they are using the // local driver. func (s *VolumesService) LocalVolumesSize(ctx context.Context) ([]*types.Volume, error) { - ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool { - dv, ok := v.(volume.DetailedVolume) - return ok && len(dv.Options()) == 0 - }))) - if err != nil { - return nil, err + ch := s.usage.DoChan("LocalVolumesSize", func() (interface{}, error) { + ls, _, err := s.vs.Find(ctx, And(ByDriver(volume.DefaultDriverName), CustomFilter(func(v volume.Volume) bool { + dv, ok := v.(volume.DetailedVolume) + return ok && len(dv.Options()) == 0 + }))) + if err != nil { + return nil, err + } + return s.volumesToAPI(ctx, ls, calcSize(true)), nil + }) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case res := <-ch: + if res.Err != nil { + return nil, res.Err + } + return res.Val.([]*types.Volume), nil } - return s.volumesToAPI(ctx, ls, calcSize(true)), nil } // Prune removes (local) volumes which match the past in filter arguments.