2018-02-05 16:05:59 -05:00
|
|
|
package xfer // import "github.com/docker/docker/distribution/xfer"
|
2015-11-13 19:59:01 -05:00
|
|
|
|
|
|
|
import (
|
|
|
|
"sync/atomic"
|
|
|
|
"testing"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/docker/docker/pkg/progress"
|
|
|
|
)
|
|
|
|
|
|
|
|
func TestTransfer(t *testing.T) {
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
|
2015-11-13 19:59:01 -05:00
|
|
|
select {
|
|
|
|
case <-start:
|
|
|
|
default:
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
t.Errorf("%s: transfer function not started even though concurrency limit not reached", id)
|
2015-11-13 19:59:01 -05:00
|
|
|
}
|
|
|
|
|
|
|
|
xfer := NewTransfer()
|
|
|
|
go func() {
|
|
|
|
for i := 0; i <= 10; i++ {
|
|
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
}
|
|
|
|
close(progressChan)
|
|
|
|
}()
|
|
|
|
return xfer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
tm := NewTransferManager(5)
|
|
|
|
progressChan := make(chan progress.Progress)
|
|
|
|
progressDone := make(chan struct{})
|
|
|
|
receivedProgress := make(map[string]int64)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for p := range progressChan {
|
|
|
|
val, present := receivedProgress[p.ID]
|
2016-03-28 21:27:29 -04:00
|
|
|
if present && p.Current <= val {
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
t.Errorf("%s: got unexpected progress value: %d (expected <= %d)", p.ID, p.Current, val)
|
2015-11-13 19:59:01 -05:00
|
|
|
}
|
|
|
|
receivedProgress[p.ID] = p.Current
|
|
|
|
}
|
|
|
|
close(progressDone)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Start a few transfers
|
|
|
|
ids := []string{"id1", "id2", "id3"}
|
|
|
|
xfers := make([]Transfer, len(ids))
|
|
|
|
watchers := make([]*Watcher, len(ids))
|
|
|
|
for i, id := range ids {
|
|
|
|
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, xfer := range xfers {
|
|
|
|
<-xfer.Done()
|
|
|
|
xfer.Release(watchers[i])
|
|
|
|
}
|
|
|
|
close(progressChan)
|
|
|
|
<-progressDone
|
|
|
|
|
|
|
|
for _, id := range ids {
|
|
|
|
if receivedProgress[id] != 10 {
|
|
|
|
t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestConcurrencyLimit(t *testing.T) {
|
|
|
|
concurrencyLimit := 3
|
|
|
|
var runningJobs int32
|
|
|
|
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
|
2015-11-13 19:59:01 -05:00
|
|
|
xfer := NewTransfer()
|
|
|
|
go func() {
|
|
|
|
<-start
|
|
|
|
totalJobs := atomic.AddInt32(&runningJobs, 1)
|
|
|
|
if int(totalJobs) > concurrencyLimit {
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
t.Errorf("%s: too many jobs running (%d > %d)", id, totalJobs, concurrencyLimit)
|
2015-11-13 19:59:01 -05:00
|
|
|
}
|
|
|
|
for i := 0; i <= 10; i++ {
|
|
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
}
|
|
|
|
atomic.AddInt32(&runningJobs, -1)
|
|
|
|
close(progressChan)
|
|
|
|
}()
|
|
|
|
return xfer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
tm := NewTransferManager(concurrencyLimit)
|
|
|
|
progressChan := make(chan progress.Progress)
|
|
|
|
progressDone := make(chan struct{})
|
|
|
|
receivedProgress := make(map[string]int64)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for p := range progressChan {
|
|
|
|
receivedProgress[p.ID] = p.Current
|
|
|
|
}
|
|
|
|
close(progressDone)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Start more transfers than the concurrency limit
|
|
|
|
ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
|
|
|
|
xfers := make([]Transfer, len(ids))
|
|
|
|
watchers := make([]*Watcher, len(ids))
|
|
|
|
for i, id := range ids {
|
|
|
|
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
|
|
|
|
}
|
|
|
|
|
|
|
|
for i, xfer := range xfers {
|
|
|
|
<-xfer.Done()
|
|
|
|
xfer.Release(watchers[i])
|
|
|
|
}
|
|
|
|
close(progressChan)
|
|
|
|
<-progressDone
|
|
|
|
|
|
|
|
for _, id := range ids {
|
|
|
|
if receivedProgress[id] != 10 {
|
|
|
|
t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestInactiveJobs(t *testing.T) {
|
|
|
|
concurrencyLimit := 3
|
|
|
|
var runningJobs int32
|
|
|
|
testDone := make(chan struct{})
|
|
|
|
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
|
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
|
|
|
|
xfer := NewTransfer()
|
|
|
|
go func() {
|
|
|
|
<-start
|
|
|
|
totalJobs := atomic.AddInt32(&runningJobs, 1)
|
|
|
|
if int(totalJobs) > concurrencyLimit {
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
t.Errorf("%s: too many jobs running (%d > %d)", id, totalJobs, concurrencyLimit)
|
2015-11-13 19:59:01 -05:00
|
|
|
}
|
|
|
|
for i := 0; i <= 10; i++ {
|
|
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
|
|
}
|
|
|
|
atomic.AddInt32(&runningJobs, -1)
|
|
|
|
close(inactive)
|
|
|
|
<-testDone
|
|
|
|
close(progressChan)
|
|
|
|
}()
|
|
|
|
return xfer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
tm := NewTransferManager(concurrencyLimit)
|
|
|
|
progressChan := make(chan progress.Progress)
|
|
|
|
progressDone := make(chan struct{})
|
|
|
|
receivedProgress := make(map[string]int64)
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for p := range progressChan {
|
|
|
|
receivedProgress[p.ID] = p.Current
|
|
|
|
}
|
|
|
|
close(progressDone)
|
|
|
|
}()
|
|
|
|
|
|
|
|
// Start more transfers than the concurrency limit
|
|
|
|
ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
|
|
|
|
xfers := make([]Transfer, len(ids))
|
|
|
|
watchers := make([]*Watcher, len(ids))
|
|
|
|
for i, id := range ids {
|
|
|
|
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
|
|
|
|
}
|
|
|
|
|
|
|
|
close(testDone)
|
|
|
|
for i, xfer := range xfers {
|
|
|
|
<-xfer.Done()
|
|
|
|
xfer.Release(watchers[i])
|
|
|
|
}
|
|
|
|
close(progressChan)
|
|
|
|
<-progressDone
|
|
|
|
|
|
|
|
for _, id := range ids {
|
|
|
|
if receivedProgress[id] != 10 {
|
|
|
|
t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func TestWatchRelease(t *testing.T) {
|
|
|
|
ready := make(chan struct{})
|
|
|
|
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
|
2015-11-13 19:59:01 -05:00
|
|
|
xfer := NewTransfer()
|
|
|
|
go func() {
|
|
|
|
defer func() {
|
|
|
|
close(progressChan)
|
|
|
|
}()
|
|
|
|
<-ready
|
|
|
|
for i := int64(0); ; i++ {
|
|
|
|
select {
|
|
|
|
case <-time.After(10 * time.Millisecond):
|
|
|
|
case <-xfer.Context().Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
return xfer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
tm := NewTransferManager(5)
|
|
|
|
|
|
|
|
type watcherInfo struct {
|
|
|
|
watcher *Watcher
|
|
|
|
progressChan chan progress.Progress
|
|
|
|
progressDone chan struct{}
|
|
|
|
receivedFirstProgress chan struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
progressConsumer := func(w watcherInfo) {
|
|
|
|
first := true
|
|
|
|
for range w.progressChan {
|
|
|
|
if first {
|
|
|
|
close(w.receivedFirstProgress)
|
|
|
|
}
|
|
|
|
first = false
|
|
|
|
}
|
|
|
|
close(w.progressDone)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Start a transfer
|
|
|
|
watchers := make([]watcherInfo, 5)
|
|
|
|
var xfer Transfer
|
|
|
|
watchers[0].progressChan = make(chan progress.Progress)
|
|
|
|
watchers[0].progressDone = make(chan struct{})
|
|
|
|
watchers[0].receivedFirstProgress = make(chan struct{})
|
|
|
|
xfer, watchers[0].watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
|
|
|
|
go progressConsumer(watchers[0])
|
|
|
|
|
|
|
|
// Give it multiple watchers
|
|
|
|
for i := 1; i != len(watchers); i++ {
|
|
|
|
watchers[i].progressChan = make(chan progress.Progress)
|
|
|
|
watchers[i].progressDone = make(chan struct{})
|
|
|
|
watchers[i].receivedFirstProgress = make(chan struct{})
|
|
|
|
watchers[i].watcher = xfer.Watch(progress.ChanOutput(watchers[i].progressChan))
|
|
|
|
go progressConsumer(watchers[i])
|
|
|
|
}
|
|
|
|
|
|
|
|
// Now that the watchers are set up, allow the transfer goroutine to
|
|
|
|
// proceed.
|
|
|
|
close(ready)
|
|
|
|
|
|
|
|
// Confirm that each watcher gets progress output.
|
|
|
|
for _, w := range watchers {
|
|
|
|
<-w.receivedFirstProgress
|
|
|
|
}
|
|
|
|
|
|
|
|
// Release one watcher every 5ms
|
|
|
|
for _, w := range watchers {
|
|
|
|
xfer.Release(w.watcher)
|
|
|
|
<-time.After(5 * time.Millisecond)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Now that all watchers have been released, Released() should
|
|
|
|
// return a closed channel.
|
|
|
|
<-xfer.Released()
|
|
|
|
|
|
|
|
// Done() should return a closed channel because the xfer func returned
|
|
|
|
// due to cancellation.
|
|
|
|
<-xfer.Done()
|
|
|
|
|
|
|
|
for _, w := range watchers {
|
|
|
|
close(w.progressChan)
|
|
|
|
<-w.progressDone
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-01-25 14:37:09 -05:00
|
|
|
func TestWatchFinishedTransfer(t *testing.T) {
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer {
|
2016-01-25 14:37:09 -05:00
|
|
|
xfer := NewTransfer()
|
|
|
|
go func() {
|
|
|
|
// Finish immediately
|
|
|
|
close(progressChan)
|
|
|
|
}()
|
|
|
|
return xfer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
tm := NewTransferManager(5)
|
|
|
|
|
|
|
|
// Start a transfer
|
|
|
|
watchers := make([]*Watcher, 3)
|
|
|
|
var xfer Transfer
|
|
|
|
xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
|
|
|
|
|
|
|
|
// Give it a watcher immediately
|
|
|
|
watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
|
|
|
|
|
|
|
|
// Wait for the transfer to complete
|
|
|
|
<-xfer.Done()
|
|
|
|
|
|
|
|
// Set up another watcher
|
|
|
|
watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
|
|
|
|
|
|
|
|
// Release the watchers
|
|
|
|
for _, w := range watchers {
|
|
|
|
xfer.Release(w)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Now that all watchers have been released, Released() should
|
|
|
|
// return a closed channel.
|
|
|
|
<-xfer.Released()
|
|
|
|
}
|
|
|
|
|
2015-11-13 19:59:01 -05:00
|
|
|
func TestDuplicateTransfer(t *testing.T) {
|
|
|
|
ready := make(chan struct{})
|
|
|
|
|
|
|
|
var xferFuncCalls int32
|
|
|
|
|
|
|
|
makeXferFunc := func(id string) DoFunc {
|
TestTransfer*: don't call t.Fatal from a goroutine
staticcheck go linter warns:
> distribution/xfer/transfer_test.go:37:2: SA2002: the goroutine calls T.Fatalf, which must be called in the same goroutine as the test (staticcheck)
What it doesn't say is why. The reason is, t.Fatalf() calls t.FailNow(),
which is expected to stop test execution right now. It does so by
calling runtime.Goexit(), which, unless called from a main goroutine,
does not stop test execution.
Anyway, long story short, if we don't care much about stopping the test
case immediately, we can just replace t.Fatalf() with t.Errorf() which
still marks the test case as failed, but won't stop it immediately.
This patch was tested to check that the test fails if any of the
goroutines call t.Errorf():
1. Failure in DoFunc ("transfer function not started ...") was tested by
decreading the NewTransferManager() argument:
- tm := NewTransferManager(5)
+ tm := NewTransferManager(2)
2. Failure "got unexpected progress value" was tested by injecting a random:
- if present && p.Current <= val {
+ if present && p.Current <= val || rand.Intn(100) > 80 {
3. Failure in DoFunc ("too many jobs running") was tested by increasing
the NewTransferManager() argument:
- tm := NewTransferManager(concurrencyLimit)
+ tm := NewTransferManager(concurrencyLimit + 1)
While at it:
* fix/amend some error messages
* use _ for unused arguments of DoFunc
Signed-off-by: Kir Kolyshkin <kolyshkin@gmail.com>
2019-08-06 18:37:07 -04:00
|
|
|
return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer {
|
2015-11-13 19:59:01 -05:00
|
|
|
atomic.AddInt32(&xferFuncCalls, 1)
|
|
|
|
xfer := NewTransfer()
|
|
|
|
go func() {
|
|
|
|
defer func() {
|
|
|
|
close(progressChan)
|
|
|
|
}()
|
|
|
|
<-ready
|
|
|
|
for i := int64(0); ; i++ {
|
|
|
|
select {
|
|
|
|
case <-time.After(10 * time.Millisecond):
|
|
|
|
case <-xfer.Context().Done():
|
|
|
|
return
|
|
|
|
}
|
|
|
|
progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
return xfer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
tm := NewTransferManager(5)
|
|
|
|
|
|
|
|
type transferInfo struct {
|
|
|
|
xfer Transfer
|
|
|
|
watcher *Watcher
|
|
|
|
progressChan chan progress.Progress
|
|
|
|
progressDone chan struct{}
|
|
|
|
receivedFirstProgress chan struct{}
|
|
|
|
}
|
|
|
|
|
|
|
|
progressConsumer := func(t transferInfo) {
|
|
|
|
first := true
|
|
|
|
for range t.progressChan {
|
|
|
|
if first {
|
|
|
|
close(t.receivedFirstProgress)
|
|
|
|
}
|
|
|
|
first = false
|
|
|
|
}
|
|
|
|
close(t.progressDone)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Try to start multiple transfers with the same ID
|
|
|
|
transfers := make([]transferInfo, 5)
|
|
|
|
for i := range transfers {
|
|
|
|
t := &transfers[i]
|
|
|
|
t.progressChan = make(chan progress.Progress)
|
|
|
|
t.progressDone = make(chan struct{})
|
|
|
|
t.receivedFirstProgress = make(chan struct{})
|
|
|
|
t.xfer, t.watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan))
|
|
|
|
go progressConsumer(*t)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Allow the transfer goroutine to proceed.
|
|
|
|
close(ready)
|
|
|
|
|
|
|
|
// Confirm that each watcher gets progress output.
|
|
|
|
for _, t := range transfers {
|
|
|
|
<-t.receivedFirstProgress
|
|
|
|
}
|
|
|
|
|
|
|
|
// Confirm that the transfer function was called exactly once.
|
|
|
|
if xferFuncCalls != 1 {
|
|
|
|
t.Fatal("transfer function wasn't called exactly once")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Release one watcher every 5ms
|
|
|
|
for _, t := range transfers {
|
|
|
|
t.xfer.Release(t.watcher)
|
|
|
|
<-time.After(5 * time.Millisecond)
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, t := range transfers {
|
|
|
|
// Now that all watchers have been released, Released() should
|
|
|
|
// return a closed channel.
|
|
|
|
<-t.xfer.Released()
|
|
|
|
// Done() should return a closed channel because the xfer func returned
|
|
|
|
// due to cancellation.
|
|
|
|
<-t.xfer.Done()
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, t := range transfers {
|
|
|
|
close(t.progressChan)
|
|
|
|
<-t.progressDone
|
|
|
|
}
|
|
|
|
}
|