distribution/xfer: un-export Transfer interface

The only implementations and uses are internal to this package.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
Author: Sebastiaan van Stijn
Date:   2022-01-22 13:11:24 +01:00
parent 765844e419
commit 96a750aab4
GPG Key ID: 76698F39D527CE8C
4 changed files with 60 additions and 60 deletions
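
In Go, an identifier is exported only if it begins with an upper-case letter, so renaming the Transfer interface to transfer removes it from the package's public API while leaving all in-package code compiling. Because the unexported struct that implemented it already occupied the name transfer, that struct is renamed to xfer in the same change. A trimmed sketch of the resulting shape, using the names from this diff but with the method set cut down to a single method for brevity:

package xfer

// transfer is unexported: only code inside this package can name the
// interface, though its methods may themselves be exported.
type transfer interface {
    Done() <-chan struct{}
}

// xfer is the unexported concrete type behind newTransfer.
type xfer struct {
    done chan struct{}
}

func (t *xfer) Done() <-chan struct{} { return t.done }

// newTransfer still works unchanged inside the package, even though
// its return type is no longer visible to importers.
func newTransfer() transfer {
    return &xfer{done: make(chan struct{})}
}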

diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go
--- a/distribution/xfer/download.go
+++ b/distribution/xfer/download.go

@@ -56,7 +56,7 @@ func WithMaxDownloadAttempts(max int) func(*LayerDownloadManager) {
 }
 
 type downloadTransfer struct {
-    Transfer
+    transfer
 
     layerStore layer.Store
     layer      layer.Layer
@@ -148,10 +148,10 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
         // Does this layer have the same data as a previous layer in
         // the stack? If so, avoid downloading it more than once.
-        var topDownloadUncasted Transfer
+        var topDownloadUncasted transfer
         if existingDownload, ok := downloadsByKey[key]; ok {
             xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload)
-            defer topDownload.Transfer.Release(watcher)
+            defer topDownload.transfer.Release(watcher)
             topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
             topDownload = topDownloadUncasted.(*downloadTransfer)
             continue
         }
@@ -163,7 +163,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
         var xferFunc DoFunc
         if topDownload != nil {
             xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload)
-            defer topDownload.Transfer.Release(watcher)
+            defer topDownload.transfer.Release(watcher)
         } else {
             xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil)
         }
@@ -192,7 +192,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
     select {
     case <-ctx.Done():
-        topDownload.Transfer.Release(watcher)
+        topDownload.transfer.Release(watcher)
         return rootFS, func() {}, ctx.Err()
     case <-topDownload.Done():
         break
     }
@@ -200,7 +200,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 
     l, err := topDownload.result()
     if err != nil {
-        topDownload.Transfer.Release(watcher)
+        topDownload.transfer.Release(watcher)
         return rootFS, func() {}, err
     }
 
@@ -208,13 +208,13 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
     // base layer on Windows.
     for range layers {
         if l == nil {
-            topDownload.Transfer.Release(watcher)
+            topDownload.transfer.Release(watcher)
             return rootFS, func() {}, errors.New("internal error: too few parent layers")
         }
         rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
         l = l.Parent()
     }
 
-    return rootFS, func() { topDownload.Transfer.Release(watcher) }, err
+    return rootFS, func() { topDownload.transfer.Release(watcher) }, err
 }
 
 // makeDownloadFunc returns a function that performs the layer download and
@@ -223,9 +223,9 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 // on top of parentDownload's resulting layer. Otherwise, it registers the
 // layer on top of the ChainID given by parentLayer.
 func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) DoFunc {
-    return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
+    return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
         d := &downloadTransfer{
-            Transfer:   newTransfer(),
+            transfer:   newTransfer(),
             layerStore: ldm.layerStore,
         }
 
@@ -267,7 +267,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
             defer descriptor.Close()
 
             for {
-                downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput)
+                downloadReader, size, err = descriptor.Download(d.transfer.Context(), progressOutput)
                 if err == nil {
                     break
                 }
@@ -275,7 +275,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
                 // If an error was returned because the context
                 // was cancelled, we shouldn't retry.
                 select {
-                case <-d.Transfer.Context().Done():
+                case <-d.transfer.Context().Done():
                     d.err = err
                     return
                 default:
@@ -302,7 +302,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
                         ticker.Stop()
                         break selectLoop
                     }
-                case <-d.Transfer.Context().Done():
+                case <-d.transfer.Context().Done():
                     ticker.Stop()
                     d.err = errors.New("download cancelled during retry delay")
                     return
@@ -315,7 +315,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 
             if parentDownload != nil {
                 select {
-                case <-d.Transfer.Context().Done():
+                case <-d.transfer.Context().Done():
                     d.err = errors.New("layer registration cancelled")
                     downloadReader.Close()
                     return
@@ -331,7 +331,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
                 parentLayer = l.ChainID()
             }
 
-            reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
+            reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
             defer reader.Close()
 
             inflatedLayerData, err := archive.DecompressStream(reader)
@@ -351,7 +351,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
             }
             if err != nil {
                 select {
-                case <-d.Transfer.Context().Done():
+                case <-d.transfer.Context().Done():
                     d.err = errors.New("layer registration cancelled")
                 default:
                     d.err = fmt.Errorf("failed to register layer: %v", err)
@@ -368,7 +368,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
             // Doesn't actually need to be its own goroutine, but
             // done like this so we can defer close(c).
             go func() {
-                <-d.Transfer.Released()
+                <-d.transfer.Released()
                 if d.layer != nil {
                     layer.ReleaseAndLog(d.layerStore, d.layer)
                 }
@@ -387,9 +387,9 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 // interfere with the progress reporting for sourceDownload, which has the same
 // Key.
 func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) DoFunc {
-    return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
+    return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
         d := &downloadTransfer{
-            Transfer:   newTransfer(),
+            transfer:   newTransfer(),
             layerStore: ldm.layerStore,
         }
 
@@ -403,7 +403,7 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
             close(inactive)
 
             select {
-            case <-d.Transfer.Context().Done():
+            case <-d.transfer.Context().Done():
                 d.err = errors.New("layer registration cancelled")
                 return
             case <-parentDownload.Done():
@@ -420,7 +420,7 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
             // parentDownload finished, but wait for it explicitly
             // to be sure.
             select {
-            case <-d.Transfer.Context().Done():
+            case <-d.transfer.Context().Done():
                 d.err = errors.New("layer registration cancelled")
                 return
             case <-sourceDownload.Done():
@@ -461,7 +461,7 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
             // Doesn't actually need to be its own goroutine, but
             // done like this so we can defer close(c).
             go func() {
-                <-d.Transfer.Released()
+                <-d.transfer.Released()
                 if d.layer != nil {
                     layer.ReleaseAndLog(d.layerStore, d.layer)
                 }
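
The hunks above repeatedly touch the retry loop in makeDownloadFunc: every wait point selects on d.transfer.Context().Done() so that a cancelled transfer stops retrying promptly instead of sleeping through its backoff. A self-contained sketch of that pattern; retryWithCancel, op, and maxAttempts are illustrative names, not part of the moby code:

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// retryWithCancel retries op until it succeeds, the context is
// cancelled, or maxAttempts is exhausted -- the same shape as the
// download loop above, minus the progress reporting.
func retryWithCancel(ctx context.Context, maxAttempts int, op func(context.Context) error) error {
    var err error
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        if err = op(ctx); err == nil {
            return nil
        }
        // If the failure was caused by cancellation, don't retry.
        select {
        case <-ctx.Done():
            return err
        default:
        }
        // Crude stand-in for the ticker-based retry delay in the diff.
        select {
        case <-time.After(time.Duration(attempt) * 100 * time.Millisecond):
        case <-ctx.Done():
            return errors.New("download cancelled during retry delay")
        }
    }
    return err
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    err := retryWithCancel(ctx, 5, func(context.Context) error {
        return errors.New("transient failure")
    })
    fmt.Println(err) // download cancelled during retry delay
}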

diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go
--- a/distribution/xfer/transfer.go
+++ b/distribution/xfer/transfer.go

@@ -34,8 +34,8 @@ type Watcher struct {
     running chan struct{}
 }
 
-// Transfer represents an in-progress transfer.
-type Transfer interface {
+// transfer represents an in-progress transfer.
+type transfer interface {
     Watch(progressOutput progress.Output) *Watcher
     Release(*Watcher)
     Context() context.Context
@@ -45,7 +45,7 @@ type Transfer interface {
     Broadcast(mainProgressChan <-chan progress.Progress)
 }
 
-type transfer struct {
+type xfer struct {
     mu sync.Mutex
 
     ctx    context.Context
@@ -78,8 +78,8 @@ type transfer struct {
 }
 
 // newTransfer creates a new transfer.
-func newTransfer() Transfer {
-    t := &transfer{
+func newTransfer() transfer {
+    t := &xfer{
         watchers: make(map[chan struct{}]*Watcher),
         running:  make(chan struct{}),
         released: make(chan struct{}),
@@ -95,7 +95,7 @@ func newTransfer() Transfer {
 }
 
 // Broadcast copies the progress and error output to all viewers.
-func (t *transfer) Broadcast(mainProgressChan <-chan progress.Progress) {
+func (t *xfer) Broadcast(mainProgressChan <-chan progress.Progress) {
     for {
         var (
             p progress.Progress
@@ -137,7 +137,7 @@ func (t *transfer) Broadcast(mainProgressChan <-chan progress.Progress) {
 
 // Watch adds a watcher to the transfer. The supplied channel gets progress
 // updates and is closed when the transfer finishes.
-func (t *transfer) Watch(progressOutput progress.Output) *Watcher {
+func (t *xfer) Watch(progressOutput progress.Output) *Watcher {
     t.mu.Lock()
     defer t.mu.Unlock()
@@ -205,7 +205,7 @@ func (t *transfer) Watch(progressOutput progress.Output) *Watcher {
 // to be notified about the progress of the transfer. All calls to Watch must
 // be paired with later calls to Release so that the lifecycle of the transfer
 // is properly managed.
-func (t *transfer) Release(watcher *Watcher) {
+func (t *xfer) Release(watcher *Watcher) {
     t.mu.Lock()
     delete(t.watchers, watcher.releaseChan)
 
@@ -233,7 +233,7 @@ func (t *transfer) Release(watcher *Watcher) {
 
 // Done returns a channel which is closed if the transfer completes or is
 // cancelled. Note that having 0 watchers causes a transfer to be cancelled.
-func (t *transfer) Done() <-chan struct{} {
+func (t *xfer) Done() <-chan struct{} {
     // Note that this doesn't return t.ctx.Done() because that channel will
     // be closed the moment Cancel is called, and we need to return a
     // channel that blocks until a cancellation is actually acknowledged by
@@ -243,18 +243,18 @@ func (t *transfer) Done() <-chan struct{} {
 
 // Released returns a channel which is closed once all watchers release the
 // transfer AND the transfer is no longer tracked by the transferManager.
-func (t *transfer) Released() <-chan struct{} {
+func (t *xfer) Released() <-chan struct{} {
     return t.released
 }
 
 // Context returns the context associated with the transfer.
-func (t *transfer) Context() context.Context {
+func (t *xfer) Context() context.Context {
     return t.ctx
 }
 
 // Close is called by the transferManager when the transfer is no longer
 // being tracked.
-func (t *transfer) Close() {
+func (t *xfer) Close() {
     t.mu.Lock()
     t.closed = true
     if len(t.watchers) == 0 {
@@ -269,7 +269,7 @@ func (t *transfer) Close() {
 // signals to the transferManager that the job is no longer actively moving
 // data - for example, it may be waiting for a dependent transfer to finish.
 // This prevents it from taking up a slot.
-type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer
+type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer
 
 // transferManager is used by LayerDownloadManager and LayerUploadManager to
 // schedule and deduplicate transfers. It is up to the transferManager
@@ -279,7 +279,7 @@ type transferManager struct {
     mu sync.Mutex
 
     concurrencyLimit int
     activeTransfers  int
-    transfers        map[string]Transfer
+    transfers        map[string]transfer
     waitingTransfers []chan struct{}
 }
@@ -287,7 +287,7 @@ type transferManager struct {
 
 func newTransferManager(concurrencyLimit int) *transferManager {
     return &transferManager{
         concurrencyLimit: concurrencyLimit,
-        transfers:        make(map[string]Transfer),
+        transfers:        make(map[string]transfer),
     }
 }
@@ -298,10 +298,10 @@ func (tm *transferManager) setConcurrency(concurrency int) {
     tm.mu.Unlock()
 }
 
-// transfer checks if a Transfer matching the given key is in progress. If not,
+// transfer checks if a transfer matching the given key is in progress. If not,
 // it starts one by calling xferFunc. The caller supplies a channel which
 // receives progress output from the transfer.
-func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) {
+func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (transfer, *Watcher) {
     tm.mu.Lock()
     defer tm.mu.Unlock()
@@ -310,7 +310,7 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput
         if !present {
             break
         }
-        // Transfer is already in progress.
+        // transfer is already in progress.
         watcher := xfer.Watch(progressOutput)
 
         select {
@@ -322,7 +322,7 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput
             // The goroutine that removes this transfer from the
             // map is also waiting for xfer.Done(), so yield to it.
             // This could be avoided by adding a Closed method
-            // to Transfer to allow explicitly waiting for it to be
+            // to transfer to allow explicitly waiting for it to be
             // removed the map, but forcing a scheduling round in
             // this very rare case seems better than bloating the
             // interface definition.
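
The transfer method above is the deduplication point: a second caller passing the same key gets a Watcher attached to the already-running transfer rather than a second invocation of its DoFunc. A sketch of in-package usage; this has to live inside distribution/xfer now that transfer and newTransferManager are unexported, and the key string and DoFunc body are illustrative:

// Both calls pass the same key, so xferFunc runs once; the second call
// only attaches another Watcher to the in-flight transfer.
func exampleDeduplicatedTransfer() {
    tm := newTransferManager(5)

    xferFunc := func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
        t := newTransfer()
        go func() {
            <-start             // wait for a concurrency slot
            close(progressChan) // done: Broadcast then closes the watchers' channels
        }()
        return t
    }

    out := progress.ChanOutput(make(chan progress.Progress, 100))
    first, w1 := tm.transfer("layer:sha256:abc", xferFunc, out)
    second, w2 := tm.transfer("layer:sha256:abc", xferFunc, out)
    // first == second: the manager found the existing transfer by key.

    first.Release(w1)
    second.Release(w2)
}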

diff --git a/distribution/xfer/transfer_test.go b/distribution/xfer/transfer_test.go
--- a/distribution/xfer/transfer_test.go
+++ b/distribution/xfer/transfer_test.go

@@ -10,7 +10,7 @@ import (
 
 func TestTransfer(t *testing.T) {
     makeXferFunc := func(id string) DoFunc {
-        return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
+        return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
             select {
             case <-start:
             default:
@@ -47,7 +47,7 @@ func TestTransfer(t *testing.T) {
 
     // Start a few transfers
     ids := []string{"id1", "id2", "id3"}
-    xfers := make([]Transfer, len(ids))
+    xfers := make([]transfer, len(ids))
     watchers := make([]*Watcher, len(ids))
     for i, id := range ids {
         xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
@@ -72,7 +72,7 @@ func TestConcurrencyLimit(t *testing.T) {
     var runningJobs int32
     makeXferFunc := func(id string) DoFunc {
-        return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
+        return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
             xfer := newTransfer()
             go func() {
                 <-start
@@ -105,7 +105,7 @@ func TestConcurrencyLimit(t *testing.T) {
 
     // Start more transfers than the concurrency limit
     ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
-    xfers := make([]Transfer, len(ids))
+    xfers := make([]transfer, len(ids))
     watchers := make([]*Watcher, len(ids))
     for i, id := range ids {
         xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
@@ -131,7 +131,7 @@ func TestInactiveJobs(t *testing.T) {
     testDone := make(chan struct{})
     makeXferFunc := func(id string) DoFunc {
-        return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
+        return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
             xfer := newTransfer()
             go func() {
                 <-start
@@ -166,7 +166,7 @@ func TestInactiveJobs(t *testing.T) {
 
     // Start more transfers than the concurrency limit
     ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
-    xfers := make([]Transfer, len(ids))
+    xfers := make([]transfer, len(ids))
     watchers := make([]*Watcher, len(ids))
     for i, id := range ids {
         xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
@@ -191,7 +191,7 @@ func TestWatchRelease(t *testing.T) {
     ready := make(chan struct{})
     makeXferFunc := func(id string) DoFunc {
-        return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
+        return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
             xfer := newTransfer()
             go func() {
                 defer func() {
@@ -233,7 +233,7 @@ func TestWatchRelease(t *testing.T) {
 
     // Start a transfer
     watchers := make([]watcherInfo, 5)
-    var xfer Transfer
+    var xfer transfer
     watchers[0].progressChan = make(chan progress.Progress)
     watchers[0].progressDone = make(chan struct{})
     watchers[0].receivedFirstProgress = make(chan struct{})
@@ -280,7 +280,7 @@ func TestWatchRelease(t *testing.T) {
 
 func TestWatchFinishedTransfer(t *testing.T) {
     makeXferFunc := func(id string) DoFunc {
-        return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer {
+        return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) transfer {
             xfer := newTransfer()
             go func() {
                 // Finish immediately
@@ -294,7 +294,7 @@ func TestWatchFinishedTransfer(t *testing.T) {
 
     // Start a transfer
     watchers := make([]*Watcher, 3)
-    var xfer Transfer
+    var xfer transfer
     xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
 
     // Give it a watcher immediately
@@ -322,7 +322,7 @@ func TestDuplicateTransfer(t *testing.T) {
     var xferFuncCalls int32
     makeXferFunc := func(id string) DoFunc {
-        return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer {
+        return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) transfer {
             atomic.AddInt32(&xferFuncCalls, 1)
             xfer := newTransfer()
             go func() {
@@ -346,7 +346,7 @@ func TestDuplicateTransfer(t *testing.T) {
     tm := newTransferManager(5)
 
     type transferInfo struct {
-        xfer         Transfer
+        xfer         transfer
         watcher      *Watcher
         progressChan chan progress.Progress
         progressDone chan struct{}

diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go
--- a/distribution/xfer/upload.go
+++ b/distribution/xfer/upload.go

@@ -38,7 +38,7 @@ func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadMan
 }
 
 type uploadTransfer struct {
-    Transfer
+    transfer
 
     remoteDescriptor distribution.Descriptor
     err              error
@@ -89,7 +89,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
         select {
         case <-ctx.Done():
             return ctx.Err()
-        case <-upload.Transfer.Done():
+        case <-upload.transfer.Done():
             if upload.err != nil {
                 return upload.err
             }
@@ -103,9 +103,9 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
 }
 
 func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
-    return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
+    return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
         u := &uploadTransfer{
-            Transfer: newTransfer(),
+            transfer: newTransfer(),
         }
 
         go func() {
@@ -124,7 +124,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
             retries := 0
             for {
-                remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
+                remoteDescriptor, err := descriptor.Upload(u.transfer.Context(), progressOutput)
                 if err == nil {
                     u.remoteDescriptor = remoteDescriptor
                     break
                 }
@@ -133,7 +133,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
                 // If an error was returned because the context
                 // was cancelled, we shouldn't retry.
                 select {
-                case <-u.Transfer.Context().Done():
+                case <-u.transfer.Context().Done():
                     u.err = err
                     return
                 default:
@@ -160,7 +160,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
                         ticker.Stop()
                         break selectLoop
                     }
-                case <-u.Transfer.Context().Done():
+                case <-u.transfer.Context().Done():
                     ticker.Stop()
                     u.err = errors.New("upload cancelled during retry delay")
                     return
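
One language detail makes the rename ripple through every call site in these files: uploadTransfer and downloadTransfer embed the interface, and an embedded field in Go is named after its unqualified type name. Un-exporting the type therefore also renames (and un-exports) the field, which is what turns u.Transfer.Context() into u.transfer.Context(). A compact, runnable illustration; stub and the key value are invented for the example:

package main

import "fmt"

type transfer interface {
    Key() string
}

// stub is a throwaway implementation for the example.
type stub struct{ key string }

func (s stub) Key() string { return s.key }

// uploadTransfer embeds the interface; the embedded field takes the
// type's name, so un-exporting the type also un-exports the field.
type uploadTransfer struct {
    transfer
}

func main() {
    u := uploadTransfer{transfer: stub{key: "layer:sha256:abc"}}
    fmt.Println(u.transfer.Key()) // explicit selector, as in the diff
    fmt.Println(u.Key())          // the promoted method still works
}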