diff --git a/pkg/ioutils/bytespipe.go b/pkg/ioutils/bytespipe.go index 87514b643d..d1dfdae0cc 100644 --- a/pkg/ioutils/bytespipe.go +++ b/pkg/ioutils/bytespipe.go @@ -50,12 +50,12 @@ func NewBytesPipe() *BytesPipe { // It can allocate new []byte slices in a process of writing. func (bp *BytesPipe) Write(p []byte) (int, error) { bp.mu.Lock() + defer bp.mu.Unlock() written := 0 loop0: for { if bp.closeErr != nil { - bp.mu.Unlock() return written, ErrClosed } @@ -72,7 +72,6 @@ loop0: // errBufferFull is an error we expect to get if the buffer is full if err != nil && err != errBufferFull { bp.wait.Broadcast() - bp.mu.Unlock() return written, err } @@ -100,7 +99,6 @@ loop0: bp.buf = append(bp.buf, getBuffer(nextCap)) } bp.wait.Broadcast() - bp.mu.Unlock() return written, nil } @@ -126,17 +124,14 @@ func (bp *BytesPipe) Close() error { // Data could be read only once. func (bp *BytesPipe) Read(p []byte) (n int, err error) { bp.mu.Lock() + defer bp.mu.Unlock() if bp.bufLen == 0 { if bp.closeErr != nil { - err := bp.closeErr - bp.mu.Unlock() - return 0, err + return 0, bp.closeErr } bp.wait.Wait() if bp.bufLen == 0 && bp.closeErr != nil { - err := bp.closeErr - bp.mu.Unlock() - return 0, err + return 0, bp.closeErr } } @@ -161,7 +156,6 @@ func (bp *BytesPipe) Read(p []byte) (n int, err error) { } bp.wait.Broadcast() - bp.mu.Unlock() return }