diff --git a/pkg/ioutils/readers.go b/pkg/ioutils/readers.go index 22f46fbd92..58ff1af639 100644 --- a/pkg/ioutils/readers.go +++ b/pkg/ioutils/readers.go @@ -2,8 +2,11 @@ package ioutils import ( "bytes" + "crypto/rand" "io" + "math/big" "sync" + "time" ) type readCloserWrapper struct { @@ -42,20 +45,40 @@ func NewReaderErrWrapper(r io.Reader, closer func()) io.Reader { } } +// bufReader allows the underlying reader to continue to produce +// output by pre-emptively reading from the wrapped reader. +// This is achieved by buffering this data in bufReader's +// expanding buffer. type bufReader struct { sync.Mutex - buf *bytes.Buffer - reader io.Reader - err error - wait sync.Cond - drainBuf []byte + buf *bytes.Buffer + reader io.Reader + err error + wait sync.Cond + drainBuf []byte + reuseBuf []byte + maxReuse int64 + resetTimeout time.Duration + bufLenResetThreshold int64 + maxReadDataReset int64 } func NewBufReader(r io.Reader) *bufReader { + var timeout int + if randVal, err := rand.Int(rand.Reader, big.NewInt(120)); err == nil { + timeout = int(randVal.Int64()) + 180 + } else { + timeout = 300 + } reader := &bufReader{ - buf: &bytes.Buffer{}, - drainBuf: make([]byte, 1024), - reader: r, + buf: &bytes.Buffer{}, + drainBuf: make([]byte, 1024), + reuseBuf: make([]byte, 4096), + maxReuse: 1000, + resetTimeout: time.Second * time.Duration(timeout), + bufLenResetThreshold: 100 * 1024, + maxReadDataReset: 10 * 1024 * 1024, + reader: r, } reader.wait.L = &reader.Mutex go reader.drain() @@ -74,14 +97,94 @@ func NewBufReaderWithDrainbufAndBuffer(r io.Reader, drainBuffer []byte, buffer * } func (r *bufReader) drain() { + var ( + duration time.Duration + lastReset time.Time + now time.Time + reset bool + bufLen int64 + dataSinceReset int64 + maxBufLen int64 + reuseBufLen int64 + reuseCount int64 + ) + reuseBufLen = int64(len(r.reuseBuf)) + lastReset = time.Now() for { n, err := r.reader.Read(r.drainBuf) + dataSinceReset += int64(n) r.Lock() + bufLen = int64(r.buf.Len()) + if bufLen > maxBufLen { + maxBufLen = bufLen + } + + // Avoid unbounded growth of the buffer over time. + // This has been discovered to be the only non-intrusive + // solution to the unbounded growth of the buffer. + // Alternative solutions such as compression, multiple + // buffers, channels and other similar pieces of code + // were reducing throughput, overall Docker performance + // or simply crashed Docker. + // This solution releases the buffer when specific + // conditions are met to avoid the continuous resizing + // of the buffer for long lived containers. + // + // Move data to the front of the buffer if it's + // smaller than what reuseBuf can store + if bufLen > 0 && reuseBufLen >= bufLen { + n, _ := r.buf.Read(r.reuseBuf) + r.buf.Write(r.reuseBuf[0:n]) + // Take action if the buffer has been reused too many + // times and if there's data in the buffer. + // The timeout is also used as means to avoid doing + // these operations more often or less often than + // required. + // The various conditions try to detect heavy activity + // in the buffer which might be indicators of heavy + // growth of the buffer. + } else if reuseCount >= r.maxReuse && bufLen > 0 { + now = time.Now() + duration = now.Sub(lastReset) + timeoutReached := duration >= r.resetTimeout + + // The timeout has been reached and the + // buffered data couldn't be moved to the front + // of the buffer, so the buffer gets reset. + if timeoutReached && bufLen > reuseBufLen { + reset = true + } + // The amount of buffered data is too high now, + // reset the buffer. + if timeoutReached && maxBufLen >= r.bufLenResetThreshold { + reset = true + } + // Reset the buffer if a certain amount of + // data has gone through the buffer since the + // last reset. + if timeoutReached && dataSinceReset >= r.maxReadDataReset { + reset = true + } + // The buffered data is moved to a fresh buffer, + // swap the old buffer with the new one and + // reset all counters. + if reset { + newbuf := &bytes.Buffer{} + newbuf.ReadFrom(r.buf) + r.buf = newbuf + lastReset = now + reset = false + dataSinceReset = 0 + maxBufLen = 0 + reuseCount = 0 + } + } if err != nil { r.err = err } else { r.buf.Write(r.drainBuf[0:n]) } + reuseCount++ r.wait.Signal() r.Unlock() if err != nil {