mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
3e2b50ccaa
Things could go wrong if Watch was called after the last existing watcher was released. The call to Watch would succeed even though it was not really adding a watcher, and the corresponding call to Release would close hasWatchers a second time.

The fix for this is twofold:

1. We allow transfers to gain new watchers after the watcher count has touched zero. This means that the channel returned by Released should not be closed until all watchers have been released AND the transfer is no longer tracked by the transfer manager, meaning it won't be possible for additional calls to Watch to race with closing the channel returned by Released. The Transfer interface has a new method called Close so the transfer can know when the transfer manager no longer references it. Remove the Cancel method. It's not used and should not be exported.

2. Even though (1) makes it possible to add watchers after all the previous watchers have been released, we want to avoid doing this in practice. A transfer that has had all its watchers released is in the process of being cancelled, and attaching to one of these will never be the correct behavior. Add a check if a watcher is attaching to a cancelled transfer. In this case, wait for the transfer to be removed from the map and try again. This will ensure correct behavior when a watcher tries to attach during the race window.

Either (1) or (2) should be sufficient to fix the race involved here, but the combination is the most correct approach. (1) fixes the low-level plumbing to be resilient to the race condition, and (2) avoids using it in a racy way.

Fixes #19606

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
423 lines
11 KiB
Go
423 lines
11 KiB
Go
package xfer
|
|
|
|
import (
|
|
"sync/atomic"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/docker/docker/pkg/progress"
|
|
)
|
|
|
|
func TestTransfer(t *testing.T) {
|
|
makeXferFunc := func(id string) DoFunc {
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
|
|
select {
|
|
case <-start:
|
|
default:
|
|
t.Fatalf("transfer function not started even though concurrency limit not reached")
|
|
}
|
|
|
|
xfer := NewTransfer()
|
|
go func() {
|
|
for i := 0; i <= 10; i++ {
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
close(progressChan)
|
|
}()
|
|
return xfer
|
|
}
|
|
}
|
|
|
|
tm := NewTransferManager(5)
|
|
progressChan := make(chan progress.Progress)
|
|
progressDone := make(chan struct{})
|
|
receivedProgress := make(map[string]int64)
|
|
|
|
go func() {
|
|
for p := range progressChan {
|
|
val, present := receivedProgress[p.ID]
|
|
if !present {
|
|
if p.Current != 0 {
|
|
t.Fatalf("got unexpected progress value: %d (expected 0)", p.Current)
|
|
}
|
|
} else if p.Current == 10 {
|
|
// Special case: last progress output may be
|
|
// repeated because the transfer finishing
|
|
// causes the latest progress output to be
|
|
// written to the channel (in case the watcher
|
|
// missed it).
|
|
if p.Current != 9 && p.Current != 10 {
|
|
t.Fatalf("got unexpected progress value: %d (expected %d)", p.Current, val+1)
|
|
}
|
|
} else if p.Current != val+1 {
|
|
t.Fatalf("got unexpected progress value: %d (expected %d)", p.Current, val+1)
|
|
}
|
|
receivedProgress[p.ID] = p.Current
|
|
}
|
|
close(progressDone)
|
|
}()
|
|
|
|
// Start a few transfers
|
|
ids := []string{"id1", "id2", "id3"}
|
|
xfers := make([]Transfer, len(ids))
|
|
watchers := make([]*Watcher, len(ids))
|
|
for i, id := range ids {
|
|
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
|
|
}
|
|
|
|
for i, xfer := range xfers {
|
|
<-xfer.Done()
|
|
xfer.Release(watchers[i])
|
|
}
|
|
close(progressChan)
|
|
<-progressDone
|
|
|
|
for _, id := range ids {
|
|
if receivedProgress[id] != 10 {
|
|
t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestConcurrencyLimit(t *testing.T) {
|
|
concurrencyLimit := 3
|
|
var runningJobs int32
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
|
|
xfer := NewTransfer()
|
|
go func() {
|
|
<-start
|
|
totalJobs := atomic.AddInt32(&runningJobs, 1)
|
|
if int(totalJobs) > concurrencyLimit {
|
|
t.Fatalf("too many jobs running")
|
|
}
|
|
for i := 0; i <= 10; i++ {
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
atomic.AddInt32(&runningJobs, -1)
|
|
close(progressChan)
|
|
}()
|
|
return xfer
|
|
}
|
|
}
|
|
|
|
tm := NewTransferManager(concurrencyLimit)
|
|
progressChan := make(chan progress.Progress)
|
|
progressDone := make(chan struct{})
|
|
receivedProgress := make(map[string]int64)
|
|
|
|
go func() {
|
|
for p := range progressChan {
|
|
receivedProgress[p.ID] = p.Current
|
|
}
|
|
close(progressDone)
|
|
}()
|
|
|
|
// Start more transfers than the concurrency limit
|
|
ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
|
|
xfers := make([]Transfer, len(ids))
|
|
watchers := make([]*Watcher, len(ids))
|
|
for i, id := range ids {
|
|
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
|
|
}
|
|
|
|
for i, xfer := range xfers {
|
|
<-xfer.Done()
|
|
xfer.Release(watchers[i])
|
|
}
|
|
close(progressChan)
|
|
<-progressDone
|
|
|
|
for _, id := range ids {
|
|
if receivedProgress[id] != 10 {
|
|
t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestInactiveJobs(t *testing.T) {
|
|
concurrencyLimit := 3
|
|
var runningJobs int32
|
|
testDone := make(chan struct{})
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
|
|
xfer := NewTransfer()
|
|
go func() {
|
|
<-start
|
|
totalJobs := atomic.AddInt32(&runningJobs, 1)
|
|
if int(totalJobs) > concurrencyLimit {
|
|
t.Fatalf("too many jobs running")
|
|
}
|
|
for i := 0; i <= 10; i++ {
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
|
|
time.Sleep(10 * time.Millisecond)
|
|
}
|
|
atomic.AddInt32(&runningJobs, -1)
|
|
close(inactive)
|
|
<-testDone
|
|
close(progressChan)
|
|
}()
|
|
return xfer
|
|
}
|
|
}
|
|
|
|
tm := NewTransferManager(concurrencyLimit)
|
|
progressChan := make(chan progress.Progress)
|
|
progressDone := make(chan struct{})
|
|
receivedProgress := make(map[string]int64)
|
|
|
|
go func() {
|
|
for p := range progressChan {
|
|
receivedProgress[p.ID] = p.Current
|
|
}
|
|
close(progressDone)
|
|
}()
|
|
|
|
// Start more transfers than the concurrency limit
|
|
ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
|
|
xfers := make([]Transfer, len(ids))
|
|
watchers := make([]*Watcher, len(ids))
|
|
for i, id := range ids {
|
|
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
|
|
}
|
|
|
|
close(testDone)
|
|
for i, xfer := range xfers {
|
|
<-xfer.Done()
|
|
xfer.Release(watchers[i])
|
|
}
|
|
close(progressChan)
|
|
<-progressDone
|
|
|
|
for _, id := range ids {
|
|
if receivedProgress[id] != 10 {
|
|
t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestWatchRelease(t *testing.T) {
|
|
ready := make(chan struct{})
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
|
|
xfer := NewTransfer()
|
|
go func() {
|
|
defer func() {
|
|
close(progressChan)
|
|
}()
|
|
<-ready
|
|
for i := int64(0); ; i++ {
|
|
select {
|
|
case <-time.After(10 * time.Millisecond):
|
|
case <-xfer.Context().Done():
|
|
return
|
|
}
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
|
|
}
|
|
}()
|
|
return xfer
|
|
}
|
|
}
|
|
|
|
tm := NewTransferManager(5)
|
|
|
|
type watcherInfo struct {
|
|
watcher *Watcher
|
|
progressChan chan progress.Progress
|
|
progressDone chan struct{}
|
|
receivedFirstProgress chan struct{}
|
|
}
|
|
|
|
progressConsumer := func(w watcherInfo) {
|
|
first := true
|
|
for range w.progressChan {
|
|
if first {
|
|
close(w.receivedFirstProgress)
|
|
}
|
|
first = false
|
|
}
|
|
close(w.progressDone)
|
|
}
|
|
|
|
// Start a transfer
|
|
watchers := make([]watcherInfo, 5)
|
|
var xfer Transfer
|
|
watchers[0].progressChan = make(chan progress.Progress)
|
|
watchers[0].progressDone = make(chan struct{})
|
|
watchers[0].receivedFirstProgress = make(chan struct{})
|
|
xfer, watchers[0].watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
|
|
go progressConsumer(watchers[0])
|
|
|
|
// Give it multiple watchers
|
|
for i := 1; i != len(watchers); i++ {
|
|
watchers[i].progressChan = make(chan progress.Progress)
|
|
watchers[i].progressDone = make(chan struct{})
|
|
watchers[i].receivedFirstProgress = make(chan struct{})
|
|
watchers[i].watcher = xfer.Watch(progress.ChanOutput(watchers[i].progressChan))
|
|
go progressConsumer(watchers[i])
|
|
}
|
|
|
|
// Now that the watchers are set up, allow the transfer goroutine to
|
|
// proceed.
|
|
close(ready)
|
|
|
|
// Confirm that each watcher gets progress output.
|
|
for _, w := range watchers {
|
|
<-w.receivedFirstProgress
|
|
}
|
|
|
|
// Release one watcher every 5ms
|
|
for _, w := range watchers {
|
|
xfer.Release(w.watcher)
|
|
<-time.After(5 * time.Millisecond)
|
|
}
|
|
|
|
// Now that all watchers have been released, Released() should
|
|
// return a closed channel.
|
|
<-xfer.Released()
|
|
|
|
// Done() should return a closed channel because the xfer func returned
|
|
// due to cancellation.
|
|
<-xfer.Done()
|
|
|
|
for _, w := range watchers {
|
|
close(w.progressChan)
|
|
<-w.progressDone
|
|
}
|
|
}
|
|
|
|
func TestWatchFinishedTransfer(t *testing.T) {
|
|
makeXferFunc := func(id string) DoFunc {
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
|
|
xfer := NewTransfer()
|
|
go func() {
|
|
// Finish immediately
|
|
close(progressChan)
|
|
}()
|
|
return xfer
|
|
}
|
|
}
|
|
|
|
tm := NewTransferManager(5)
|
|
|
|
// Start a transfer
|
|
watchers := make([]*Watcher, 3)
|
|
var xfer Transfer
|
|
xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
|
|
|
|
// Give it a watcher immediately
|
|
watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
|
|
|
|
// Wait for the transfer to complete
|
|
<-xfer.Done()
|
|
|
|
// Set up another watcher
|
|
watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
|
|
|
|
// Release the watchers
|
|
for _, w := range watchers {
|
|
xfer.Release(w)
|
|
}
|
|
|
|
// Now that all watchers have been released, Released() should
|
|
// return a closed channel.
|
|
<-xfer.Released()
|
|
}
|
|
|
|
func TestDuplicateTransfer(t *testing.T) {
|
|
ready := make(chan struct{})
|
|
|
|
var xferFuncCalls int32
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
|
|
atomic.AddInt32(&xferFuncCalls, 1)
|
|
xfer := NewTransfer()
|
|
go func() {
|
|
defer func() {
|
|
close(progressChan)
|
|
}()
|
|
<-ready
|
|
for i := int64(0); ; i++ {
|
|
select {
|
|
case <-time.After(10 * time.Millisecond):
|
|
case <-xfer.Context().Done():
|
|
return
|
|
}
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
|
|
}
|
|
}()
|
|
return xfer
|
|
}
|
|
}
|
|
|
|
tm := NewTransferManager(5)
|
|
|
|
type transferInfo struct {
|
|
xfer Transfer
|
|
watcher *Watcher
|
|
progressChan chan progress.Progress
|
|
progressDone chan struct{}
|
|
receivedFirstProgress chan struct{}
|
|
}
|
|
|
|
progressConsumer := func(t transferInfo) {
|
|
first := true
|
|
for range t.progressChan {
|
|
if first {
|
|
close(t.receivedFirstProgress)
|
|
}
|
|
first = false
|
|
}
|
|
close(t.progressDone)
|
|
}
|
|
|
|
// Try to start multiple transfers with the same ID
|
|
transfers := make([]transferInfo, 5)
|
|
for i := range transfers {
|
|
t := &transfers[i]
|
|
t.progressChan = make(chan progress.Progress)
|
|
t.progressDone = make(chan struct{})
|
|
t.receivedFirstProgress = make(chan struct{})
|
|
t.xfer, t.watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan))
|
|
go progressConsumer(*t)
|
|
}
|
|
|
|
// Allow the transfer goroutine to proceed.
|
|
close(ready)
|
|
|
|
// Confirm that each watcher gets progress output.
|
|
for _, t := range transfers {
|
|
<-t.receivedFirstProgress
|
|
}
|
|
|
|
// Confirm that the transfer function was called exactly once.
|
|
if xferFuncCalls != 1 {
|
|
t.Fatal("transfer function wasn't called exactly once")
|
|
}
|
|
|
|
// Release one watcher every 5ms
|
|
for _, t := range transfers {
|
|
t.xfer.Release(t.watcher)
|
|
<-time.After(5 * time.Millisecond)
|
|
}
|
|
|
|
for _, t := range transfers {
|
|
// Now that all watchers have been released, Released() should
|
|
// return a closed channel.
|
|
<-t.xfer.Released()
|
|
// Done() should return a closed channel because the xfer func returned
|
|
// due to cancellation.
|
|
<-t.xfer.Done()
|
|
}
|
|
|
|
for _, t := range transfers {
|
|
close(t.progressChan)
|
|
<-t.progressDone
|
|
}
|
|
}
|