From 317a5462e428fab6248fe3b028822250c8c9ff7f Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Fri, 28 Aug 2015 10:09:00 -0700 Subject: [PATCH] Make the broadcaster write messages to the observers in the same units they were written to the broadcaster This means the writing to a WriteFlusher will flush in the same places as it would if the broadcaster wasn't sitting in front of it. Signed-off-by: Aaron Lehmann --- pkg/progressreader/broadcaster.go | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/pkg/progressreader/broadcaster.go b/pkg/progressreader/broadcaster.go index 4b08ce405d..429b1d0f1b 100644 --- a/pkg/progressreader/broadcaster.go +++ b/pkg/progressreader/broadcaster.go @@ -1,7 +1,6 @@ package progressreader import ( - "bytes" "errors" "io" "sync" @@ -19,8 +18,10 @@ type Broadcaster struct { // new data available. cond *sync.Cond // history is a buffer of the progress output so far, so a new observer - // can catch up. - history bytes.Buffer + // can catch up. The history is stored as a slice of separate byte + // slices, so that if the writer is a WriteFlusher, the flushes will + // happen in the right places. + history [][]byte // wg is a WaitGroup used to wait for all writes to finish on Close wg sync.WaitGroup // isClosed is set to true when Close is called to avoid closing c @@ -58,19 +59,20 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { // The condition variable wait is at the end of this loop, so that the // first iteration will write the history so far. for { - newData := broadcaster.history.Bytes()[n:] + newData := broadcaster.history[n:] // Make a copy of newData so we can release the lock - sendData := make([]byte, len(newData), len(newData)) + sendData := make([][]byte, len(newData), len(newData)) copy(sendData, newData) broadcaster.Unlock() - if len(sendData) > 0 { - written, err := observer.Write(sendData) + for len(sendData) > 0 { + _, err := observer.Write(sendData[0]) if err != nil { broadcaster.wg.Done() return } - n += written + n++ + sendData = sendData[1:] } broadcaster.Lock() @@ -82,7 +84,7 @@ func (broadcaster *Broadcaster) receiveWrites(observer io.Writer) { return } - if broadcaster.history.Len() == n { + if len(broadcaster.history) == n { broadcaster.cond.Wait() } @@ -101,7 +103,11 @@ func (broadcaster *Broadcaster) Write(p []byte) (n int, err error) { return 0, errors.New("attempted write to closed progressreader Broadcaster") } - broadcaster.history.Write(p) + // Add message in p to the history slice + newEntry := make([]byte, len(p), len(p)) + copy(newEntry, p) + broadcaster.history = append(broadcaster.history, newEntry) + broadcaster.cond.Broadcast() return len(p), nil