mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
262 lines
8.8 KiB
Go
262 lines
8.8 KiB
Go
|
// Copyright 2016 Google Inc. All Rights Reserved.
|
||
|
//
|
||
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
// you may not use this file except in compliance with the License.
|
||
|
// You may obtain a copy of the License at
|
||
|
//
|
||
|
// http://www.apache.org/licenses/LICENSE-2.0
|
||
|
//
|
||
|
// Unless required by applicable law or agreed to in writing, software
|
||
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
// See the License for the specific language governing permissions and
|
||
|
// limitations under the License.
|
||
|
|
||
|
// Package bundler supports bundling (batching) of items. Bundling amortizes an
|
||
|
// action with fixed costs over multiple items. For example, if an API provides
|
||
|
// an RPC that accepts a list of items as input, but clients would prefer
|
||
|
// adding items one at a time, then a Bundler can accept individual items from
|
||
|
// the client and bundle many of them into a single RPC.
|
||
|
//
|
||
|
// This package is experimental and subject to change without notice.
|
||
|
package bundler
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"reflect"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
const (
|
||
|
DefaultDelayThreshold = time.Second
|
||
|
DefaultBundleCountThreshold = 10
|
||
|
DefaultBundleByteThreshold = 1e6 // 1M
|
||
|
DefaultBufferedByteLimit = 1e9 // 1G
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
// ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit.
|
||
|
ErrOverflow = errors.New("bundler reached buffered byte limit")
|
||
|
|
||
|
// ErrOversizedItem indicates that an item's size exceeds the maximum bundle size.
|
||
|
ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
|
||
|
)
|
||
|
|
||
|
// A Bundler collects items added to it into a bundle until the bundle
|
||
|
// exceeds a given size, then calls a user-provided function to handle the bundle.
|
||
|
type Bundler struct {
|
||
|
// Starting from the time that the first message is added to a bundle, once
|
||
|
// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
|
||
|
DelayThreshold time.Duration
|
||
|
|
||
|
// Once a bundle has this many items, handle the bundle. Since only one
|
||
|
// item at a time is added to a bundle, no bundle will exceed this
|
||
|
// threshold, so it also serves as a limit. The default is
|
||
|
// DefaultBundleCountThreshold.
|
||
|
BundleCountThreshold int
|
||
|
|
||
|
// Once the number of bytes in current bundle reaches this threshold, handle
|
||
|
// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
|
||
|
// but does not cap the total size of a bundle.
|
||
|
BundleByteThreshold int
|
||
|
|
||
|
// The maximum size of a bundle, in bytes. Zero means unlimited.
|
||
|
BundleByteLimit int
|
||
|
|
||
|
// The maximum number of bytes that the Bundler will keep in memory before
|
||
|
// returning ErrOverflow. The default is DefaultBufferedByteLimit.
|
||
|
BufferedByteLimit int
|
||
|
|
||
|
handler func(interface{}) // called to handle a bundle
|
||
|
itemSliceZero reflect.Value // nil (zero value) for slice of items
|
||
|
donec chan struct{} // closed when the Bundler is closed
|
||
|
handlec chan int // sent to when a bundle is ready for handling
|
||
|
timer *time.Timer // implements DelayThreshold
|
||
|
|
||
|
mu sync.Mutex
|
||
|
bufferedSize int // total bytes buffered
|
||
|
closedBundles []bundle // bundles waiting to be handled
|
||
|
curBundle bundle // incoming items added to this bundle
|
||
|
calledc chan struct{} // closed and re-created after handler is called
|
||
|
}
|
||
|
|
||
|
type bundle struct {
|
||
|
items reflect.Value // slice of item type
|
||
|
size int // size in bytes of all items
|
||
|
}
|
||
|
|
||
|
// NewBundler creates a new Bundler. When you are finished with a Bundler, call
|
||
|
// its Close method.
|
||
|
//
|
||
|
// itemExample is a value of the type that will be bundled. For example, if you
|
||
|
// want to create bundles of *Entry, you could pass &Entry{} for itemExample.
|
||
|
//
|
||
|
// handler is a function that will be called on each bundle. If itemExample is
|
||
|
// of type T, the argument to handler is of type []T. handler is always called
|
||
|
// sequentially for each bundle, and never in parallel.
|
||
|
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
|
||
|
b := &Bundler{
|
||
|
DelayThreshold: DefaultDelayThreshold,
|
||
|
BundleCountThreshold: DefaultBundleCountThreshold,
|
||
|
BundleByteThreshold: DefaultBundleByteThreshold,
|
||
|
BufferedByteLimit: DefaultBufferedByteLimit,
|
||
|
|
||
|
handler: handler,
|
||
|
itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))),
|
||
|
donec: make(chan struct{}),
|
||
|
handlec: make(chan int, 1),
|
||
|
calledc: make(chan struct{}),
|
||
|
timer: time.NewTimer(1000 * time.Hour), // harmless initial timeout
|
||
|
}
|
||
|
b.curBundle.items = b.itemSliceZero
|
||
|
go b.background()
|
||
|
return b
|
||
|
}
|
||
|
|
||
|
// Add adds item to the current bundle. It marks the bundle for handling and
|
||
|
// starts a new one if any of the thresholds or limits are exceeded.
|
||
|
//
|
||
|
// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
|
||
|
// the item can never be handled. Add returns ErrOversizedItem in this case.
|
||
|
//
|
||
|
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
|
||
|
// Add returns ErrOverflow.
|
||
|
//
|
||
|
// Add never blocks.
|
||
|
func (b *Bundler) Add(item interface{}, size int) error {
|
||
|
// If this item exceeds the maximum size of a bundle,
|
||
|
// we can never send it.
|
||
|
if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
|
||
|
return ErrOversizedItem
|
||
|
}
|
||
|
b.mu.Lock()
|
||
|
defer b.mu.Unlock()
|
||
|
// If adding this item would exceed our allotted memory
|
||
|
// footprint, we can't accept it.
|
||
|
if b.bufferedSize+size > b.BufferedByteLimit {
|
||
|
return ErrOverflow
|
||
|
}
|
||
|
// If adding this item to the current bundle would cause it to exceed the
|
||
|
// maximum bundle size, close the current bundle and start a new one.
|
||
|
if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
|
||
|
b.closeAndHandleBundle()
|
||
|
}
|
||
|
// Add the item.
|
||
|
b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
|
||
|
b.curBundle.size += size
|
||
|
b.bufferedSize += size
|
||
|
// If this is the first item in the bundle, restart the timer.
|
||
|
if b.curBundle.items.Len() == 1 {
|
||
|
b.timer.Reset(b.DelayThreshold)
|
||
|
}
|
||
|
// If the current bundle equals the count threshold, close it.
|
||
|
if b.curBundle.items.Len() == b.BundleCountThreshold {
|
||
|
b.closeAndHandleBundle()
|
||
|
}
|
||
|
// If the current bundle equals or exceeds the byte threshold, close it.
|
||
|
if b.curBundle.size >= b.BundleByteThreshold {
|
||
|
b.closeAndHandleBundle()
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Flush waits until all items in the Bundler have been handled (that is,
|
||
|
// until the last invocation of handler has returned).
|
||
|
func (b *Bundler) Flush() {
|
||
|
b.mu.Lock()
|
||
|
b.closeBundle()
|
||
|
// Unconditionally trigger the handling goroutine, to ensure calledc is closed
|
||
|
// even if there are no outstanding bundles.
|
||
|
select {
|
||
|
case b.handlec <- 1:
|
||
|
default:
|
||
|
}
|
||
|
calledc := b.calledc // remember locally, because it may change
|
||
|
b.mu.Unlock()
|
||
|
<-calledc
|
||
|
}
|
||
|
|
||
|
// Close calls Flush, then shuts down the Bundler. Close should always be
|
||
|
// called on a Bundler when it is no longer needed. You must wait for all calls
|
||
|
// to Add to complete before calling Close. Calling Add concurrently with Close
|
||
|
// may result in the added items being ignored.
|
||
|
func (b *Bundler) Close() {
|
||
|
b.Flush()
|
||
|
b.mu.Lock()
|
||
|
b.timer.Stop()
|
||
|
b.mu.Unlock()
|
||
|
close(b.donec)
|
||
|
}
|
||
|
|
||
|
func (b *Bundler) closeAndHandleBundle() {
|
||
|
if b.closeBundle() {
|
||
|
// We have created a closed bundle.
|
||
|
// Send to handlec without blocking.
|
||
|
select {
|
||
|
case b.handlec <- 1:
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// closeBundle finishes the current bundle, adds it to the list of closed
|
||
|
// bundles and informs the background goroutine that there are bundles ready
|
||
|
// for processing.
|
||
|
//
|
||
|
// This should always be called with b.mu held.
|
||
|
func (b *Bundler) closeBundle() bool {
|
||
|
if b.curBundle.items.Len() == 0 {
|
||
|
return false
|
||
|
}
|
||
|
b.closedBundles = append(b.closedBundles, b.curBundle)
|
||
|
b.curBundle.items = b.itemSliceZero
|
||
|
b.curBundle.size = 0
|
||
|
return true
|
||
|
}
|
||
|
|
||
|
// background runs in a separate goroutine, waiting for events and handling
|
||
|
// bundles.
|
||
|
func (b *Bundler) background() {
|
||
|
done := false
|
||
|
for {
|
||
|
timedOut := false
|
||
|
// Wait for something to happen.
|
||
|
select {
|
||
|
case <-b.handlec:
|
||
|
case <-b.donec:
|
||
|
done = true
|
||
|
case <-b.timer.C:
|
||
|
timedOut = true
|
||
|
}
|
||
|
// Handle closed bundles.
|
||
|
b.mu.Lock()
|
||
|
if timedOut {
|
||
|
b.closeBundle()
|
||
|
}
|
||
|
buns := b.closedBundles
|
||
|
b.closedBundles = nil
|
||
|
// Closing calledc means we've sent all bundles. We need
|
||
|
// a new channel for the next set of bundles, which may start
|
||
|
// accumulating as soon as we release the lock.
|
||
|
calledc := b.calledc
|
||
|
b.calledc = make(chan struct{})
|
||
|
b.mu.Unlock()
|
||
|
for i, bun := range buns {
|
||
|
b.handler(bun.items.Interface())
|
||
|
// Drop the bundle's items, reducing our memory footprint.
|
||
|
buns[i].items = reflect.Value{} // buns[i] because bun is a copy
|
||
|
// Note immediately that we have more space, so Adds that occur
|
||
|
// during this loop will have a chance of succeeding.
|
||
|
b.mu.Lock()
|
||
|
b.bufferedSize -= bun.size
|
||
|
b.mu.Unlock()
|
||
|
}
|
||
|
// Signal that we've sent all outstanding bundles.
|
||
|
close(calledc)
|
||
|
if done {
|
||
|
break
|
||
|
}
|
||
|
}
|
||
|
}
|