From e273445dd407df6803d7b80863b644a6cfa2c1f5 Mon Sep 17 00:00:00 2001 From: Aaron Lehmann Date: Fri, 29 Jan 2016 14:34:50 -0800 Subject: [PATCH] Fix panic on network timeout during push `Upload` already closes the reader returned by `compress` and the progressreader passed into it, before returning. But even so, the io.Copy inside compress' goroutine needs to attempt a read from the progressreader to notice that it's closed, and this read has a side effect of outputting a progress message. If this happens after `Upload` returns, it can result in a write to a closed channel. Change `compress` to return a channel that allows the caller to wait for its goroutine to finish before freeing any resources connected to the reader that was passed to it. Signed-off-by: Aaron Lehmann --- distribution/push.go | 12 ++++++++++-- distribution/push_v2.go | 7 +++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/distribution/push.go b/distribution/push.go index 113aa14e1c..d0622f82c9 100644 --- a/distribution/push.go +++ b/distribution/push.go @@ -170,7 +170,14 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo // argument so that it can be used with httpBlobWriter's ReadFrom method. // Using httpBlobWriter's Write method would send a PATCH request for every // Write call. -func compress(in io.Reader) io.ReadCloser { +// +// The second return value is a channel that gets closed when the goroutine +// is finished. This allows the caller to make sure the goroutine finishes +// before it releases any resources connected with the reader that was +// passed in. +func compress(in io.Reader) (io.ReadCloser, chan struct{}) { + compressionDone := make(chan struct{}) + pipeReader, pipeWriter := io.Pipe() // Use a bufio.Writer to avoid excessive chunking in HTTP request. bufWriter := bufio.NewWriterSize(pipeWriter, compressionBufSize) @@ -189,7 +196,8 @@ func compress(in io.Reader) io.ReadCloser { } else { pipeWriter.Close() } + close(compressionDone) }() - return pipeReader + return pipeReader, compressionDone } diff --git a/distribution/push_v2.go b/distribution/push_v2.go index 76d4347047..e5f9de81f6 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -345,8 +345,11 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. size, _ := pd.layer.DiffSize() reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing") - defer reader.Close() - compressedReader := compress(reader) + compressedReader, compressionDone := compress(reader) + defer func() { + reader.Close() + <-compressionDone + }() digester := digest.Canonical.New() tee := io.TeeReader(compressedReader, digester.Hash())