mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
00ba5bdb98
...in preparation for upgrading containerd. Signed-off-by: Cory Snider <csnider@mirantis.com>
407 lines
13 KiB
Go
407 lines
13 KiB
Go
// Copyright 2016 Google LLC.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package bundler supports bundling (batching) of items. Bundling amortizes an
// action with fixed costs over multiple items. For example, if an API provides
// an RPC that accepts a list of items as input, but clients would prefer
// adding items one at a time, then a Bundler can accept individual items from
// the client and bundle many of them into a single RPC.
//
// This package is experimental and subject to change without notice.
package bundler
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"reflect"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/sync/semaphore"
|
|
)
|
|
|
|
// mode records which of the two mutually exclusive entry points (Add or
// AddWait) a Bundler has been used with.
type mode int

// Default values applied to a Bundler by NewBundler.
const (
	// DefaultDelayThreshold is the default for Bundler.DelayThreshold.
	DefaultDelayThreshold = time.Second

	// DefaultBundleCountThreshold is the default for Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10

	// DefaultBundleByteThreshold is the default for Bundler.BundleByteThreshold.
	DefaultBundleByteThreshold = 1e6 // 1M

	// DefaultBufferedByteLimit is the default for Bundler.BufferedByteLimit.
	DefaultBufferedByteLimit = 1e9 // 1G
)

// Possible values of a Bundler's mode.
const (
	// none means neither Add nor AddWait has been called yet.
	none mode = iota

	// add means the Bundler is being used through Add.
	add

	// addWait means the Bundler is being used through AddWait.
	addWait
)
|
|
|
|
// Errors returned by Add and AddWait.
var (
	// ErrOverflow is returned when the bytes held by the Bundler would
	// exceed its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem is returned when a single item is larger than the
	// maximum bundle size.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")

	// errMixedMethods is returned when the mutually exclusive methods Add
	// and AddWait have both been called on the same Bundler.
	errMixedMethods = errors.New("calls to Add and AddWait cannot be mixed")
)
|
|
|
|
// A Bundler collects items added to it into a bundle until the bundle
|
|
// exceeds a given size, then calls a user-provided function to handle the
|
|
// bundle.
|
|
//
|
|
// The exported fields are only safe to modify prior to the first call to Add
|
|
// or AddWait.
|
|
type Bundler struct {
|
|
// Starting from the time that the first message is added to a bundle, once
|
|
// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
|
|
DelayThreshold time.Duration
|
|
|
|
// Once a bundle has this many items, handle the bundle. Since only one
|
|
// item at a time is added to a bundle, no bundle will exceed this
|
|
// threshold, so it also serves as a limit. The default is
|
|
// DefaultBundleCountThreshold.
|
|
BundleCountThreshold int
|
|
|
|
// Once the number of bytes in current bundle reaches this threshold, handle
|
|
// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
|
|
// but does not cap the total size of a bundle.
|
|
BundleByteThreshold int
|
|
|
|
// The maximum size of a bundle, in bytes. Zero means unlimited.
|
|
BundleByteLimit int
|
|
|
|
// The maximum number of bytes that the Bundler will keep in memory before
|
|
// returning ErrOverflow. The default is DefaultBufferedByteLimit.
|
|
BufferedByteLimit int
|
|
|
|
// The maximum number of handler invocations that can be running at once.
|
|
// The default is 1.
|
|
HandlerLimit int
|
|
|
|
handler func(interface{}) // called to handle a bundle
|
|
itemSliceZero reflect.Value // nil (zero value) for slice of items
|
|
|
|
mu sync.Mutex // guards access to fields below
|
|
flushTimer *time.Timer // implements DelayThreshold
|
|
handlerCount int // # of bundles currently being handled (i.e. handler is invoked on them)
|
|
sem *semaphore.Weighted // enforces BufferedByteLimit
|
|
semOnce sync.Once // guards semaphore initialization
|
|
// The current bundle we're adding items to. Not yet in the queue.
|
|
// Appended to the queue once the flushTimer fires or the bundle
|
|
// thresholds/limits are reached. If curBundle is nil and tail is
|
|
// not, we first try to add items to tail. Once tail is full or handled,
|
|
// we create a new curBundle for the incoming item.
|
|
curBundle *bundle
|
|
// The next bundle in the queue to be handled. Nil if the queue is
|
|
// empty.
|
|
head *bundle
|
|
// The last bundle in the queue to be handled. Nil if the queue is
|
|
// empty. If curBundle is nil and tail isn't, we attempt to add new
|
|
// items to the tail until if becomes full or has been passed to the
|
|
// handler.
|
|
tail *bundle
|
|
curFlush *sync.WaitGroup // counts outstanding bundles since last flush
|
|
prevFlush chan bool // signal used to wait for prior flush
|
|
|
|
// The first call to Add or AddWait, mode will be add or addWait respectively.
|
|
// If there wasn't call yet then mode is none.
|
|
mode mode
|
|
// TODO: consider alternative queue implementation for head/tail bundle. see:
|
|
// https://code-review.googlesource.com/c/google-api-go-client/+/47991/4/support/bundler/bundler.go#74
|
|
}
|
|
|
|
// A bundle groups items that were added one by one so that they can be handed
// to the handler together as a single slice.
type bundle struct {
	// items is the slice of T holding the bundled items.
	items reflect.Value

	// size is the combined size in bytes of all items.
	size int

	// next links to the following bundle; bundles are handled in order as a
	// linked-list queue.
	next *bundle

	// flush is the counter that tracks flush completion for this bundle.
	flush *sync.WaitGroup
}
|
|
|
|
// add appends item to this bundle and increments the total size. It requires
|
|
// that b.mu is locked.
|
|
func (bu *bundle) add(item interface{}, size int) {
|
|
bu.items = reflect.Append(bu.items, reflect.ValueOf(item))
|
|
bu.size += size
|
|
}
|
|
|
|
// NewBundler creates a new Bundler.
|
|
//
|
|
// itemExample is a value of the type that will be bundled. For example, if you
|
|
// want to create bundles of *Entry, you could pass &Entry{} for itemExample.
|
|
//
|
|
// handler is a function that will be called on each bundle. If itemExample is
|
|
// of type T, the argument to handler is of type []T. handler is always called
|
|
// sequentially for each bundle, and never in parallel.
|
|
//
|
|
// Configure the Bundler by setting its thresholds and limits before calling
|
|
// any of its methods.
|
|
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
|
|
b := &Bundler{
|
|
DelayThreshold: DefaultDelayThreshold,
|
|
BundleCountThreshold: DefaultBundleCountThreshold,
|
|
BundleByteThreshold: DefaultBundleByteThreshold,
|
|
BufferedByteLimit: DefaultBufferedByteLimit,
|
|
HandlerLimit: 1,
|
|
|
|
handler: handler,
|
|
itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
|
|
curFlush: &sync.WaitGroup{},
|
|
}
|
|
return b
|
|
}
|
|
|
|
func (b *Bundler) initSemaphores() {
|
|
// Create the semaphores lazily, because the user may set limits
|
|
// after NewBundler.
|
|
b.semOnce.Do(func() {
|
|
b.sem = semaphore.NewWeighted(int64(b.BufferedByteLimit))
|
|
})
|
|
}
|
|
|
|
// enqueueCurBundle moves curBundle to the end of the queue. The bundle may be
|
|
// handled immediately if we are below HandlerLimit. It requires that b.mu is
|
|
// locked.
|
|
func (b *Bundler) enqueueCurBundle() {
|
|
// We don't require callers to check if there is a pending bundle. It
|
|
// may have already been appended to the queue. If so, return early.
|
|
if b.curBundle == nil {
|
|
return
|
|
}
|
|
// If we are below the HandlerLimit, the queue must be empty. Handle
|
|
// immediately with a new goroutine.
|
|
if b.handlerCount < b.HandlerLimit {
|
|
b.handlerCount++
|
|
go b.handle(b.curBundle)
|
|
} else if b.tail != nil {
|
|
// There are bundles on the queue, so append to the end
|
|
b.tail.next = b.curBundle
|
|
b.tail = b.curBundle
|
|
} else {
|
|
// The queue is empty, so initialize the queue
|
|
b.head = b.curBundle
|
|
b.tail = b.curBundle
|
|
}
|
|
b.curBundle = nil
|
|
if b.flushTimer != nil {
|
|
b.flushTimer.Stop()
|
|
b.flushTimer = nil
|
|
}
|
|
}
|
|
|
|
// setMode sets the state of Bundler's mode. If mode was defined before
|
|
// and passed state is different from it then return an error.
|
|
func (b *Bundler) setMode(m mode) error {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if b.mode == m || b.mode == none {
|
|
b.mode = m
|
|
return nil
|
|
}
|
|
return errMixedMethods
|
|
}
|
|
|
|
// canFit returns true if bu can fit an additional item of size bytes based
|
|
// on the limits of Bundler b.
|
|
func (b *Bundler) canFit(bu *bundle, size int) bool {
|
|
return (b.BundleByteLimit <= 0 || bu.size+size <= b.BundleByteLimit) &&
|
|
(b.BundleCountThreshold <= 0 || bu.items.Len() < b.BundleCountThreshold)
|
|
}
|
|
|
|
// Add adds item to the current bundle. It marks the bundle for handling and
|
|
// starts a new one if any of the thresholds or limits are exceeded.
|
|
// The type of item must be assignable to the itemExample parameter of the NewBundler
|
|
// method, otherwise there will be a panic.
|
|
//
|
|
// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
|
|
// the item can never be handled. Add returns ErrOversizedItem in this case.
|
|
//
|
|
// If adding the item would exceed the maximum memory allowed
|
|
// (Bundler.BufferedByteLimit) or an AddWait call is blocked waiting for
|
|
// memory, Add returns ErrOverflow.
|
|
//
|
|
// Add never blocks.
|
|
func (b *Bundler) Add(item interface{}, size int) error {
|
|
if err := b.setMode(add); err != nil {
|
|
return err
|
|
}
|
|
// If this item exceeds the maximum size of a bundle,
|
|
// we can never send it.
|
|
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
|
|
return ErrOversizedItem
|
|
}
|
|
|
|
// If adding this item would exceed our allotted memory
|
|
// footprint, we can't accept it.
|
|
// (TryAcquire also returns false if anything is waiting on the semaphore,
|
|
// so calls to Add and AddWait shouldn't be mixed.)
|
|
b.initSemaphores()
|
|
if !b.sem.TryAcquire(int64(size)) {
|
|
return ErrOverflow
|
|
}
|
|
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.add(item, size)
|
|
}
|
|
|
|
// add adds item to the tail of the bundle queue or curBundle depending on space
|
|
// and nil-ness (see inline comments). It marks curBundle for handling (by
|
|
// appending it to the queue) if any of the thresholds or limits are exceeded.
|
|
// curBundle is lazily initialized. It requires that b.mu is locked.
|
|
func (b *Bundler) add(item interface{}, size int) error {
|
|
// If we don't have a curBundle, see if we can add to the queue tail.
|
|
if b.tail != nil && b.curBundle == nil && b.canFit(b.tail, size) {
|
|
b.tail.add(item, size)
|
|
return nil
|
|
}
|
|
|
|
// If we can't fit in the existing curBundle, move it onto the queue.
|
|
if b.curBundle != nil && !b.canFit(b.curBundle, size) {
|
|
b.enqueueCurBundle()
|
|
}
|
|
|
|
// Create a curBundle if we don't have one.
|
|
if b.curBundle == nil {
|
|
b.curFlush.Add(1)
|
|
b.curBundle = &bundle{
|
|
items: b.itemSliceZero,
|
|
flush: b.curFlush,
|
|
}
|
|
}
|
|
|
|
// Add the item.
|
|
b.curBundle.add(item, size)
|
|
|
|
// If curBundle is ready for handling, move it to the queue.
|
|
if b.curBundle.size >= b.BundleByteThreshold ||
|
|
b.curBundle.items.Len() == b.BundleCountThreshold {
|
|
b.enqueueCurBundle()
|
|
}
|
|
|
|
// If we created a new bundle and it wasn't immediately handled, set a timer
|
|
if b.curBundle != nil && b.flushTimer == nil {
|
|
b.flushTimer = time.AfterFunc(b.DelayThreshold, b.tryHandleBundles)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// tryHandleBundles is the timer callback that handles or queues any current
|
|
// bundle after DelayThreshold time, even if the bundle isn't completely full.
|
|
func (b *Bundler) tryHandleBundles() {
|
|
b.mu.Lock()
|
|
b.enqueueCurBundle()
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
// next returns the next bundle that is ready for handling and removes it from
|
|
// the internal queue. It requires that b.mu is locked.
|
|
func (b *Bundler) next() *bundle {
|
|
if b.head == nil {
|
|
return nil
|
|
}
|
|
out := b.head
|
|
b.head = b.head.next
|
|
if b.head == nil {
|
|
b.tail = nil
|
|
}
|
|
out.next = nil
|
|
return out
|
|
}
|
|
|
|
// handle calls the user-specified handler on the given bundle. handle is
|
|
// intended to be run as a goroutine. After the handler returns, we update the
|
|
// byte total. handle continues processing additional bundles that are ready.
|
|
// If no more bundles are ready, the handler count is decremented and the
|
|
// goroutine ends.
|
|
func (b *Bundler) handle(bu *bundle) {
|
|
for bu != nil {
|
|
b.handler(bu.items.Interface())
|
|
bu = b.postHandle(bu)
|
|
}
|
|
}
|
|
|
|
func (b *Bundler) postHandle(bu *bundle) *bundle {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
|
|
b.sem.Release(int64(bu.size))
|
|
bu.flush.Done()
|
|
|
|
bu = b.next()
|
|
if bu == nil {
|
|
b.handlerCount--
|
|
}
|
|
return bu
|
|
}
|
|
|
|
// AddWait adds item to the current bundle. It marks the bundle for handling and
|
|
// starts a new one if any of the thresholds or limits are exceeded.
|
|
//
|
|
// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
|
|
// the item can never be handled. AddWait returns ErrOversizedItem in this case.
|
|
//
|
|
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
|
|
// AddWait blocks until space is available or ctx is done.
|
|
//
|
|
// Calls to Add and AddWait should not be mixed on the same Bundler.
|
|
func (b *Bundler) AddWait(ctx context.Context, item interface{}, size int) error {
|
|
if err := b.setMode(addWait); err != nil {
|
|
return err
|
|
}
|
|
// If this item exceeds the maximum size of a bundle,
|
|
// we can never send it.
|
|
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
|
|
return ErrOversizedItem
|
|
}
|
|
// If adding this item would exceed our allotted memory footprint, block
|
|
// until space is available. The semaphore is FIFO, so there will be no
|
|
// starvation.
|
|
b.initSemaphores()
|
|
if err := b.sem.Acquire(ctx, int64(size)); err != nil {
|
|
return err
|
|
}
|
|
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
return b.add(item, size)
|
|
}
|
|
|
|
// Flush invokes the handler for all remaining items in the Bundler and waits
|
|
// for it to return.
|
|
func (b *Bundler) Flush() {
|
|
b.mu.Lock()
|
|
|
|
// If a curBundle is pending, move it to the queue.
|
|
b.enqueueCurBundle()
|
|
|
|
// Store a pointer to the WaitGroup that counts outstanding bundles
|
|
// in the current flush and create a new one to track the next flush.
|
|
wg := b.curFlush
|
|
b.curFlush = &sync.WaitGroup{}
|
|
|
|
// Flush must wait for all prior, outstanding flushes to complete.
|
|
// We use a channel to communicate completion between each flush in
|
|
// the sequence.
|
|
prev := b.prevFlush
|
|
next := make(chan bool)
|
|
b.prevFlush = next
|
|
|
|
b.mu.Unlock()
|
|
|
|
// Wait until the previous flush is finished.
|
|
if prev != nil {
|
|
<-prev
|
|
}
|
|
|
|
// Wait until this flush is finished.
|
|
wg.Wait()
|
|
|
|
// Allow the next flush to finish.
|
|
close(next)
|
|
}
|