Commit 572ce80230 (https://github.com/moby/moby.git)

This commit adds a transfer manager which deduplicates and schedules transfers, along with an upload manager and a download manager that build on top of the transfer manager to provide high-level interfaces for uploads and downloads. The push and pull code is modified to use these building blocks. Some benefits of the changes:

- Simplification of push/pull code
- Pushes can upload layers concurrently
- Failed downloads and uploads are retried after backoff delays
- Cancellation is supported, but an individual transfer is only cancelled if all pushes or pulls using it are cancelled
- The distribution code is decoupled from Docker Engine packages and API conventions (i.e. streamformatter), which will make it easier to split out

This commit also includes unit tests for the new distribution/xfer package. The tests cover 87.8% of the statements in the package.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
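For orientation, here is a minimal sketch of how a caller might drive the transfer manager, based only on the exported API exercised by the tests below (NewTransferManager, Transfer, DoFunc, Watcher, Done, Release). The transfer ID "layer-id" is illustrative, and the real transfer work and error handling are elided.

	package main

	import (
		"github.com/docker/docker/distribution/xfer"
		"github.com/docker/docker/pkg/progress"
	)

	func main() {
		// Allow at most 5 transfers to run concurrently.
		tm := xfer.NewTransferManager(5)

		// Consume progress updates forwarded by the manager.
		progressChan := make(chan progress.Progress)
		progressDone := make(chan struct{})
		go func() {
			for p := range progressChan {
				_ = p // render p.ID, p.Current, p.Total...
			}
			close(progressDone)
		}()

		// The DoFunc performs the actual work once a concurrency slot
		// is free (signalled on the start channel), and closes its
		// progress channel when finished.
		doFunc := func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) xfer.Transfer {
			tf := xfer.NewTransfer()
			go func() {
				defer close(progressChan)
				<-start
				// ...do the download or upload here, writing
				// progress.Progress values to progressChan...
			}()
			return tf
		}

		// Transfers are deduplicated by ID: concurrent calls with the
		// same ID share one transfer, and each caller gets a watcher.
		tf, watcher := tm.Transfer("layer-id", doFunc, progress.ChanOutput(progressChan))
		<-tf.Done()         // wait for the transfer to finish
		tf.Release(watcher) // the transfer is cancelled once all watchers are released
		close(progressChan)
		<-progressDone
	}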
package xfer

import (
	"sync/atomic"
	"testing"
	"time"

	"github.com/docker/docker/pkg/progress"
)
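// TestTransfer runs a few transfers under the concurrency limit and verifies
// that each one starts immediately and reports a strictly increasing progress
// sequence from 0 to 10.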
func TestTransfer(t *testing.T) {
	makeXferFunc := func(id string) DoFunc {
		return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
			select {
			case <-start:
			default:
				t.Fatalf("transfer function not started even though concurrency limit not reached")
			}

			xfer := NewTransfer()
			go func() {
				for i := 0; i <= 10; i++ {
					progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
					time.Sleep(10 * time.Millisecond)
				}
				close(progressChan)
			}()
			return xfer
		}
	}

	tm := NewTransferManager(5)
	progressChan := make(chan progress.Progress)
	progressDone := make(chan struct{})
	receivedProgress := make(map[string]int64)

	go func() {
		for p := range progressChan {
			val, present := receivedProgress[p.ID]
			if !present {
				if p.Current != 0 {
					t.Fatalf("got unexpected progress value: %d (expected 0)", p.Current)
				}
			} else if p.Current == 10 {
				// Special case: the last progress output may be
				// repeated, because the transfer finishing causes
				// the latest progress output to be written to the
				// channel again (in case the watcher missed it).
				// The previous value must therefore be 9 or 10.
				if val != 9 && val != 10 {
					t.Fatalf("got unexpected progress value: %d (expected 9 or 10)", val)
				}
			} else if p.Current != val+1 {
				t.Fatalf("got unexpected progress value: %d (expected %d)", p.Current, val+1)
			}
			receivedProgress[p.ID] = p.Current
		}
		close(progressDone)
	}()

	// Start a few transfers
	ids := []string{"id1", "id2", "id3"}
	xfers := make([]Transfer, len(ids))
	watchers := make([]*Watcher, len(ids))
	for i, id := range ids {
		xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
	}

	for i, xfer := range xfers {
		<-xfer.Done()
		xfer.Release(watchers[i])
	}
	close(progressChan)
	<-progressDone

	for _, id := range ids {
		if receivedProgress[id] != 10 {
			t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
		}
	}
}
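// TestConcurrencyLimit starts more transfers than the configured concurrency
// limit allows and verifies that the number of simultaneously running jobs
// never exceeds that limit.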
func TestConcurrencyLimit(t *testing.T) {
	concurrencyLimit := 3
	var runningJobs int32

	makeXferFunc := func(id string) DoFunc {
		return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
			xfer := NewTransfer()
			go func() {
				<-start
				totalJobs := atomic.AddInt32(&runningJobs, 1)
				if int(totalJobs) > concurrencyLimit {
					t.Fatalf("too many jobs running")
				}
				for i := 0; i <= 10; i++ {
					progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
					time.Sleep(10 * time.Millisecond)
				}
				atomic.AddInt32(&runningJobs, -1)
				close(progressChan)
			}()
			return xfer
		}
	}

	tm := NewTransferManager(concurrencyLimit)
	progressChan := make(chan progress.Progress)
	progressDone := make(chan struct{})
	receivedProgress := make(map[string]int64)

	go func() {
		for p := range progressChan {
			receivedProgress[p.ID] = p.Current
		}
		close(progressDone)
	}()

	// Start more transfers than the concurrency limit
	ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
	xfers := make([]Transfer, len(ids))
	watchers := make([]*Watcher, len(ids))
	for i, id := range ids {
		xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
	}

	for i, xfer := range xfers {
		<-xfer.Done()
		xfer.Release(watchers[i])
	}
	close(progressChan)
	<-progressDone

	for _, id := range ids {
		if receivedProgress[id] != 10 {
			t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
		}
	}
}
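// TestInactiveJobs verifies that a transfer which signals inactivity (by
// closing its inactive channel) gives up its concurrency slot, allowing
// queued transfers to start even though it has not finished.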
func TestInactiveJobs(t *testing.T) {
	concurrencyLimit := 3
	var runningJobs int32
	testDone := make(chan struct{})

	makeXferFunc := func(id string) DoFunc {
		return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
			xfer := NewTransfer()
			go func() {
				<-start
				totalJobs := atomic.AddInt32(&runningJobs, 1)
				if int(totalJobs) > concurrencyLimit {
					t.Fatalf("too many jobs running")
				}
				for i := 0; i <= 10; i++ {
					progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
					time.Sleep(10 * time.Millisecond)
				}
				atomic.AddInt32(&runningJobs, -1)
				close(inactive)
				<-testDone
				close(progressChan)
			}()
			return xfer
		}
	}

	tm := NewTransferManager(concurrencyLimit)
	progressChan := make(chan progress.Progress)
	progressDone := make(chan struct{})
	receivedProgress := make(map[string]int64)

	go func() {
		for p := range progressChan {
			receivedProgress[p.ID] = p.Current
		}
		close(progressDone)
	}()

	// Start more transfers than the concurrency limit
	ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
	xfers := make([]Transfer, len(ids))
	watchers := make([]*Watcher, len(ids))
	for i, id := range ids {
		xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
	}

	close(testDone)
	for i, xfer := range xfers {
		<-xfer.Done()
		xfer.Release(watchers[i])
	}
	close(progressChan)
	<-progressDone

	for _, id := range ids {
		if receivedProgress[id] != 10 {
			t.Fatalf("final progress value %d instead of 10", receivedProgress[id])
		}
	}
}
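// TestWatchRelease attaches several watchers to a single transfer, then
// releases them one by one and verifies that the transfer is cancelled once
// the last watcher is gone.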
func TestWatchRelease(t *testing.T) {
	ready := make(chan struct{})

	makeXferFunc := func(id string) DoFunc {
		return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
			xfer := NewTransfer()
			go func() {
				defer func() {
					close(progressChan)
				}()
				<-ready
				for i := int64(0); ; i++ {
					select {
					case <-time.After(10 * time.Millisecond):
					case <-xfer.Context().Done():
						return
					}
					progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
				}
			}()
			return xfer
		}
	}

	tm := NewTransferManager(5)

	type watcherInfo struct {
		watcher               *Watcher
		progressChan          chan progress.Progress
		progressDone          chan struct{}
		receivedFirstProgress chan struct{}
	}

	progressConsumer := func(w watcherInfo) {
		first := true
		for range w.progressChan {
			if first {
				close(w.receivedFirstProgress)
			}
			first = false
		}
		close(w.progressDone)
	}

	// Start a transfer
	watchers := make([]watcherInfo, 5)
	var xfer Transfer
	watchers[0].progressChan = make(chan progress.Progress)
	watchers[0].progressDone = make(chan struct{})
	watchers[0].receivedFirstProgress = make(chan struct{})
	xfer, watchers[0].watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
	go progressConsumer(watchers[0])

	// Give it multiple watchers
	for i := 1; i != len(watchers); i++ {
		watchers[i].progressChan = make(chan progress.Progress)
		watchers[i].progressDone = make(chan struct{})
		watchers[i].receivedFirstProgress = make(chan struct{})
		watchers[i].watcher = xfer.Watch(progress.ChanOutput(watchers[i].progressChan))
		go progressConsumer(watchers[i])
	}

	// Now that the watchers are set up, allow the transfer goroutine to
	// proceed.
	close(ready)

	// Confirm that each watcher gets progress output.
	for _, w := range watchers {
		<-w.receivedFirstProgress
	}

	// Release one watcher every 5ms
	for _, w := range watchers {
		xfer.Release(w.watcher)
		<-time.After(5 * time.Millisecond)
	}

	// Now that all watchers have been released, Released() should
	// return a closed channel.
	<-xfer.Released()

	// Done() should return a closed channel because the xfer func returned
	// due to cancellation.
	<-xfer.Done()

	for _, w := range watchers {
		close(w.progressChan)
		<-w.progressDone
	}
}
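// TestDuplicateTransfer requests the same transfer ID several times and
// verifies that the transfer function runs only once, with each caller
// receiving its own watcher on the shared transfer.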
func TestDuplicateTransfer(t *testing.T) {
	ready := make(chan struct{})

	var xferFuncCalls int32

	makeXferFunc := func(id string) DoFunc {
		return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
			atomic.AddInt32(&xferFuncCalls, 1)
			xfer := NewTransfer()
			go func() {
				defer func() {
					close(progressChan)
				}()
				<-ready
				for i := int64(0); ; i++ {
					select {
					case <-time.After(10 * time.Millisecond):
					case <-xfer.Context().Done():
						return
					}
					progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
				}
			}()
			return xfer
		}
	}

	tm := NewTransferManager(5)

	type transferInfo struct {
		xfer                  Transfer
		watcher               *Watcher
		progressChan          chan progress.Progress
		progressDone          chan struct{}
		receivedFirstProgress chan struct{}
	}

	progressConsumer := func(ti transferInfo) {
		first := true
		for range ti.progressChan {
			if first {
				close(ti.receivedFirstProgress)
			}
			first = false
		}
		close(ti.progressDone)
	}

	// Try to start multiple transfers with the same ID
	transfers := make([]transferInfo, 5)
	for i := range transfers {
		ti := &transfers[i]
		ti.progressChan = make(chan progress.Progress)
		ti.progressDone = make(chan struct{})
		ti.receivedFirstProgress = make(chan struct{})
		ti.xfer, ti.watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(ti.progressChan))
		go progressConsumer(*ti)
	}

	// Allow the transfer goroutine to proceed.
	close(ready)

	// Confirm that each watcher gets progress output.
	for _, ti := range transfers {
		<-ti.receivedFirstProgress
	}

	// Confirm that the transfer function was called exactly once.
	if atomic.LoadInt32(&xferFuncCalls) != 1 {
		t.Fatal("transfer function wasn't called exactly once")
	}

	// Release one watcher every 5ms
	for _, ti := range transfers {
		ti.xfer.Release(ti.watcher)
		<-time.After(5 * time.Millisecond)
	}

	for _, ti := range transfers {
		// Now that all watchers have been released, Released() should
		// return a closed channel.
		<-ti.xfer.Released()
		// Done() should return a closed channel because the xfer func
		// returned due to cancellation.
		<-ti.xfer.Done()
	}

	for _, ti := range transfers {
		close(ti.progressChan)
		<-ti.progressDone
	}
}