// Copyright 2016 Google Inc. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // Package bundler supports bundling (batching) of items. Bundling amortizes an // action with fixed costs over multiple items. For example, if an API provides // an RPC that accepts a list of items as input, but clients would prefer // adding items one at a time, then a Bundler can accept individual items from // the client and bundle many of them into a single RPC. // // This package is experimental and subject to change without notice. package bundler import ( "errors" "reflect" "sync" "time" ) const ( DefaultDelayThreshold = time.Second DefaultBundleCountThreshold = 10 DefaultBundleByteThreshold = 1e6 // 1M DefaultBufferedByteLimit = 1e9 // 1G ) var ( // ErrOverflow indicates that Bundler's stored bytes exceeds its BufferedByteLimit. ErrOverflow = errors.New("bundler reached buffered byte limit") // ErrOversizedItem indicates that an item's size exceeds the maximum bundle size. ErrOversizedItem = errors.New("item size exceeds bundle byte limit") ) // A Bundler collects items added to it into a bundle until the bundle // exceeds a given size, then calls a user-provided function to handle the bundle. type Bundler struct { // Starting from the time that the first message is added to a bundle, once // this delay has passed, handle the bundle. The default is DefaultDelayThreshold. DelayThreshold time.Duration // Once a bundle has this many items, handle the bundle. Since only one // item at a time is added to a bundle, no bundle will exceed this // threshold, so it also serves as a limit. The default is // DefaultBundleCountThreshold. BundleCountThreshold int // Once the number of bytes in current bundle reaches this threshold, handle // the bundle. The default is DefaultBundleByteThreshold. This triggers handling, // but does not cap the total size of a bundle. BundleByteThreshold int // The maximum size of a bundle, in bytes. Zero means unlimited. BundleByteLimit int // The maximum number of bytes that the Bundler will keep in memory before // returning ErrOverflow. The default is DefaultBufferedByteLimit. BufferedByteLimit int handler func(interface{}) // called to handle a bundle itemSliceZero reflect.Value // nil (zero value) for slice of items donec chan struct{} // closed when the Bundler is closed handlec chan int // sent to when a bundle is ready for handling timer *time.Timer // implements DelayThreshold mu sync.Mutex bufferedSize int // total bytes buffered closedBundles []bundle // bundles waiting to be handled curBundle bundle // incoming items added to this bundle calledc chan struct{} // closed and re-created after handler is called } type bundle struct { items reflect.Value // slice of item type size int // size in bytes of all items } // NewBundler creates a new Bundler. When you are finished with a Bundler, call // its Close method. // // itemExample is a value of the type that will be bundled. For example, if you // want to create bundles of *Entry, you could pass &Entry{} for itemExample. // // handler is a function that will be called on each bundle. If itemExample is // of type T, the argument to handler is of type []T. handler is always called // sequentially for each bundle, and never in parallel. func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler { b := &Bundler{ DelayThreshold: DefaultDelayThreshold, BundleCountThreshold: DefaultBundleCountThreshold, BundleByteThreshold: DefaultBundleByteThreshold, BufferedByteLimit: DefaultBufferedByteLimit, handler: handler, itemSliceZero: reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample))), donec: make(chan struct{}), handlec: make(chan int, 1), calledc: make(chan struct{}), timer: time.NewTimer(1000 * time.Hour), // harmless initial timeout } b.curBundle.items = b.itemSliceZero go b.background() return b } // Add adds item to the current bundle. It marks the bundle for handling and // starts a new one if any of the thresholds or limits are exceeded. // // If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then // the item can never be handled. Add returns ErrOversizedItem in this case. // // If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit), // Add returns ErrOverflow. // // Add never blocks. func (b *Bundler) Add(item interface{}, size int) error { // If this item exceeds the maximum size of a bundle, // we can never send it. if b.BundleByteLimit > 0 && size > b.BundleByteLimit { return ErrOversizedItem } b.mu.Lock() defer b.mu.Unlock() // If adding this item would exceed our allotted memory // footprint, we can't accept it. if b.bufferedSize+size > b.BufferedByteLimit { return ErrOverflow } // If adding this item to the current bundle would cause it to exceed the // maximum bundle size, close the current bundle and start a new one. if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit { b.closeAndHandleBundle() } // Add the item. b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item)) b.curBundle.size += size b.bufferedSize += size // If this is the first item in the bundle, restart the timer. if b.curBundle.items.Len() == 1 { b.timer.Reset(b.DelayThreshold) } // If the current bundle equals the count threshold, close it. if b.curBundle.items.Len() == b.BundleCountThreshold { b.closeAndHandleBundle() } // If the current bundle equals or exceeds the byte threshold, close it. if b.curBundle.size >= b.BundleByteThreshold { b.closeAndHandleBundle() } return nil } // Flush waits until all items in the Bundler have been handled (that is, // until the last invocation of handler has returned). func (b *Bundler) Flush() { b.mu.Lock() b.closeBundle() // Unconditionally trigger the handling goroutine, to ensure calledc is closed // even if there are no outstanding bundles. select { case b.handlec <- 1: default: } calledc := b.calledc // remember locally, because it may change b.mu.Unlock() <-calledc } // Close calls Flush, then shuts down the Bundler. Close should always be // called on a Bundler when it is no longer needed. You must wait for all calls // to Add to complete before calling Close. Calling Add concurrently with Close // may result in the added items being ignored. func (b *Bundler) Close() { b.Flush() b.mu.Lock() b.timer.Stop() b.mu.Unlock() close(b.donec) } func (b *Bundler) closeAndHandleBundle() { if b.closeBundle() { // We have created a closed bundle. // Send to handlec without blocking. select { case b.handlec <- 1: default: } } } // closeBundle finishes the current bundle, adds it to the list of closed // bundles and informs the background goroutine that there are bundles ready // for processing. // // This should always be called with b.mu held. func (b *Bundler) closeBundle() bool { if b.curBundle.items.Len() == 0 { return false } b.closedBundles = append(b.closedBundles, b.curBundle) b.curBundle.items = b.itemSliceZero b.curBundle.size = 0 return true } // background runs in a separate goroutine, waiting for events and handling // bundles. func (b *Bundler) background() { done := false for { timedOut := false // Wait for something to happen. select { case <-b.handlec: case <-b.donec: done = true case <-b.timer.C: timedOut = true } // Handle closed bundles. b.mu.Lock() if timedOut { b.closeBundle() } buns := b.closedBundles b.closedBundles = nil // Closing calledc means we've sent all bundles. We need // a new channel for the next set of bundles, which may start // accumulating as soon as we release the lock. calledc := b.calledc b.calledc = make(chan struct{}) b.mu.Unlock() for i, bun := range buns { b.handler(bun.items.Interface()) // Drop the bundle's items, reducing our memory footprint. buns[i].items = reflect.Value{} // buns[i] because bun is a copy // Note immediately that we have more space, so Adds that occur // during this loop will have a chance of succeeding. b.mu.Lock() b.bufferedSize -= bun.size b.mu.Unlock() } // Signal that we've sent all outstanding bundles. close(calledc) if done { break } } }