Merge pull request #19682 from aaronlehmann/transfer-release-race

Fix watching a released transfer
This commit is contained in:
Sebastiaan van Stijn 2016-01-25 15:37:47 -08:00
commit 31adcc96d0
2 changed files with 98 additions and 15 deletions

View File

@ -1,6 +1,7 @@
package xfer package xfer
import ( import (
"runtime"
"sync" "sync"
"github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/progress"
@ -38,7 +39,7 @@ type Transfer interface {
Watch(progressOutput progress.Output) *Watcher Watch(progressOutput progress.Output) *Watcher
Release(*Watcher) Release(*Watcher)
Context() context.Context Context() context.Context
Cancel() Close()
Done() <-chan struct{} Done() <-chan struct{}
Released() <-chan struct{} Released() <-chan struct{}
Broadcast(masterProgressChan <-chan progress.Progress) Broadcast(masterProgressChan <-chan progress.Progress)
@ -61,11 +62,14 @@ type transfer struct {
// running remains open as long as the transfer is in progress. // running remains open as long as the transfer is in progress.
running chan struct{} running chan struct{}
// hasWatchers stays open until all watchers release the transfer. // released stays open until all watchers release the transfer and
hasWatchers chan struct{} // the transfer is no longer tracked by the transfer manager.
released chan struct{}
// broadcastDone is true if the master progress channel has closed. // broadcastDone is true if the master progress channel has closed.
broadcastDone bool broadcastDone bool
// closed is true if Close has been called
closed bool
// broadcastSyncChan allows watchers to "ping" the broadcasting // broadcastSyncChan allows watchers to "ping" the broadcasting
// goroutine to wait for it for deplete its input channel. This ensures // goroutine to wait for it for deplete its input channel. This ensures
// a detaching watcher won't miss an event that was sent before it // a detaching watcher won't miss an event that was sent before it
@ -78,7 +82,7 @@ func NewTransfer() Transfer {
t := &transfer{ t := &transfer{
watchers: make(map[chan struct{}]*Watcher), watchers: make(map[chan struct{}]*Watcher),
running: make(chan struct{}), running: make(chan struct{}),
hasWatchers: make(chan struct{}), released: make(chan struct{}),
broadcastSyncChan: make(chan struct{}), broadcastSyncChan: make(chan struct{}),
} }
@ -144,13 +148,13 @@ func (t *transfer) Watch(progressOutput progress.Output) *Watcher {
running: make(chan struct{}), running: make(chan struct{}),
} }
t.watchers[w.releaseChan] = w
if t.broadcastDone { if t.broadcastDone {
close(w.running) close(w.running)
return w return w
} }
t.watchers[w.releaseChan] = w
go func() { go func() {
defer func() { defer func() {
close(w.running) close(w.running)
@ -202,8 +206,19 @@ func (t *transfer) Release(watcher *Watcher) {
delete(t.watchers, watcher.releaseChan) delete(t.watchers, watcher.releaseChan)
if len(t.watchers) == 0 { if len(t.watchers) == 0 {
close(t.hasWatchers) if t.closed {
t.cancel() // released may have been closed already if all
// watchers were released, then another one was added
// while waiting for a previous watcher goroutine to
// finish.
select {
case <-t.released:
default:
close(t.released)
}
} else {
t.cancel()
}
} }
t.mu.Unlock() t.mu.Unlock()
@ -223,9 +238,9 @@ func (t *transfer) Done() <-chan struct{} {
} }
// Released returns a channel which is closed once all watchers release the // Released returns a channel which is closed once all watchers release the
// transfer. // transfer AND the transfer is no longer tracked by the transfer manager.
func (t *transfer) Released() <-chan struct{} { func (t *transfer) Released() <-chan struct{} {
return t.hasWatchers return t.released
} }
// Context returns the context associated with the transfer. // Context returns the context associated with the transfer.
@ -233,9 +248,15 @@ func (t *transfer) Context() context.Context {
return t.ctx return t.ctx
} }
// Cancel cancels the context associated with the transfer. // Close is called by the transfer manager when the transfer is no longer
func (t *transfer) Cancel() { // being tracked.
t.cancel() func (t *transfer) Close() {
t.mu.Lock()
t.closed = true
if len(t.watchers) == 0 {
close(t.released)
}
t.mu.Unlock()
} }
// DoFunc is a function called by the transfer manager to actually perform // DoFunc is a function called by the transfer manager to actually perform
@ -280,10 +301,33 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
tm.mu.Lock() tm.mu.Lock()
defer tm.mu.Unlock() defer tm.mu.Unlock()
if xfer, present := tm.transfers[key]; present { for {
xfer, present := tm.transfers[key]
if !present {
break
}
// Transfer is already in progress. // Transfer is already in progress.
watcher := xfer.Watch(progressOutput) watcher := xfer.Watch(progressOutput)
return xfer, watcher
select {
case <-xfer.Context().Done():
// We don't want to watch a transfer that has been cancelled.
// Wait for it to be removed from the map and try again.
xfer.Release(watcher)
tm.mu.Unlock()
// The goroutine that removes this transfer from the
// map is also waiting for xfer.Done(), so yield to it.
// This could be avoided by adding a Closed method
// to Transfer to allow explicitly waiting for it to be
// removed the map, but forcing a scheduling round in
// this very rare case seems better than bloating the
// interface definition.
runtime.Gosched()
<-xfer.Done()
tm.mu.Lock()
default:
return xfer, watcher
}
} }
start := make(chan struct{}) start := make(chan struct{})
@ -318,6 +362,7 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
} }
delete(tm.transfers, key) delete(tm.transfers, key)
tm.mu.Unlock() tm.mu.Unlock()
xfer.Close()
return return
} }
} }

View File

@ -291,6 +291,44 @@ func TestWatchRelease(t *testing.T) {
} }
} }
func TestWatchFinishedTransfer(t *testing.T) {
makeXferFunc := func(id string) DoFunc {
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
xfer := NewTransfer()
go func() {
// Finish immediately
close(progressChan)
}()
return xfer
}
}
tm := NewTransferManager(5)
// Start a transfer
watchers := make([]*Watcher, 3)
var xfer Transfer
xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
// Give it a watcher immediately
watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
// Wait for the transfer to complete
<-xfer.Done()
// Set up another watcher
watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
// Release the watchers
for _, w := range watchers {
xfer.Release(w)
}
// Now that all watchers have been released, Released() should
// return a closed channel.
<-xfer.Released()
}
func TestDuplicateTransfer(t *testing.T) { func TestDuplicateTransfer(t *testing.T) {
ready := make(chan struct{}) ready := make(chan struct{})