diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go
index 6eddfac0dd..af1a3995d2 100644
--- a/distribution/xfer/download.go
+++ b/distribution/xfer/download.go
@@ -23,21 +23,21 @@ const maxDownloadAttempts = 5
 // layers.
 type LayerDownloadManager struct {
 	layerStore          layer.Store
-	tm                  TransferManager
+	tm                  *transferManager
 	waitDuration        time.Duration
 	maxDownloadAttempts int
 }
 
 // SetConcurrency sets the max concurrent downloads for each pull
 func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
-	ldm.tm.SetConcurrency(concurrency)
+	ldm.tm.setConcurrency(concurrency)
 }
 
 // NewLayerDownloadManager returns a new LayerDownloadManager.
-func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
+func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...DownloadOption) *LayerDownloadManager {
 	manager := LayerDownloadManager{
 		layerStore:          layerStore,
-		tm:                  NewTransferManager(concurrencyLimit),
+		tm:                  newTransferManager(concurrencyLimit),
 		waitDuration:        time.Second,
 		maxDownloadAttempts: maxDownloadAttempts,
 	}
@@ -47,16 +47,19 @@ func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, optio
 	return &manager
 }
 
+// DownloadOption sets options for the LayerDownloadManager.
+type DownloadOption func(*LayerDownloadManager)
+
 // WithMaxDownloadAttempts configures the maximum number of download
 // attempts for a download manager.
-func WithMaxDownloadAttempts(max int) func(*LayerDownloadManager) {
+func WithMaxDownloadAttempts(max int) DownloadOption {
 	return func(dlm *LayerDownloadManager) {
 		dlm.maxDownloadAttempts = max
 	}
 }
 
 type downloadTransfer struct {
-	Transfer
+	transfer
 
 	layerStore layer.Store
 	layer      layer.Layer
@@ -87,13 +90,17 @@ type DownloadDescriptor interface {
 	Close()
 }
 
-// DownloadDescriptorWithRegistered is a DownloadDescriptor that has an
-// additional Registered method which gets called after a downloaded layer is
-// registered. This allows the user of the download manager to know the DiffID
-// of each registered layer. This method is called if a cast to
-// DownloadDescriptorWithRegistered is successful.
-type DownloadDescriptorWithRegistered interface {
-	DownloadDescriptor
+// DigestRegisterer can be implemented by a DownloadDescriptor, and provides a
+// Registered method which gets called after a downloaded layer is registered.
+// This allows the user of the download manager to know the DiffID of each
+// registered layer. This method is called if a cast to DigestRegisterer is
+// successful.
+type DigestRegisterer interface {
+
+	// TODO existing implementations in distribution and builder-next swallow errors
+	// when registering the diffID. Consider changing the Registered signature
+	// to return the error.
 	Registered(diffID layer.DiffID)
 }
 
@@ -108,7 +115,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 	var (
 		topLayer       layer.Layer
 		topDownload    *downloadTransfer
-		watcher        *Watcher
+		watcher        *watcher
 		missingLayer   bool
 		transferKey    = ""
 		downloadsByKey = make(map[string]*downloadTransfer)
@@ -137,8 +144,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 				missingLayer = false
 				rootFS.Append(diffID)
 				// Register this repository as a source of this layer.
-				withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
-				if hasRegistered { // As layerstore may set the driver
+				if withRegistered, ok := descriptor.(DigestRegisterer); ok { // As layerstore may set the driver
 					withRegistered.Registered(diffID)
 				}
 				continue
 			}
 		}
 
@@ -148,11 +154,11 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 
 		// Does this layer have the same data as a previous layer in
 		// the stack? If so, avoid downloading it more than once.
-		var topDownloadUncasted Transfer
+		var topDownloadUncasted transfer
 		if existingDownload, ok := downloadsByKey[key]; ok {
 			xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload)
-			defer topDownload.Transfer.Release(watcher)
-			topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
+			defer topDownload.transfer.release(watcher)
+			topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
 			topDownload = topDownloadUncasted.(*downloadTransfer)
 			continue
 		}
@@ -160,14 +166,14 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 		// Layer is not known to exist - download and register it.
 		progress.Update(progressOutput, descriptor.ID(), "Pulling fs layer")
 
-		var xferFunc DoFunc
+		var xferFunc doFunc
 		if topDownload != nil {
 			xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload)
-			defer topDownload.Transfer.Release(watcher)
+			defer topDownload.transfer.release(watcher)
 		} else {
 			xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil)
 		}
-		topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput)
+		topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
 		topDownload = topDownloadUncasted.(*downloadTransfer)
 		downloadsByKey[key] = topDownload
 	}
@@ -192,15 +198,15 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 
 	select {
 	case <-ctx.Done():
-		topDownload.Transfer.Release(watcher)
+		topDownload.transfer.release(watcher)
 		return rootFS, func() {}, ctx.Err()
-	case <-topDownload.Done():
+	case <-topDownload.done():
 		break
 	}
 
 	l, err := topDownload.result()
 	if err != nil {
-		topDownload.Transfer.Release(watcher)
+		topDownload.transfer.release(watcher)
 		return rootFS, func() {}, err
 	}
 
@@ -208,13 +214,13 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 	// base layer on Windows.
 	for range layers {
 		if l == nil {
-			topDownload.Transfer.Release(watcher)
+			topDownload.transfer.release(watcher)
 			return rootFS, func() {}, errors.New("internal error: too few parent layers")
 		}
 		rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...)
 		l = l.Parent()
 	}
-	return rootFS, func() { topDownload.Transfer.Release(watcher) }, err
+	return rootFS, func() { topDownload.transfer.release(watcher) }, err
 }
 
 // makeDownloadFunc returns a function that performs the layer download and
@@ -222,10 +228,10 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
 // complete before the registration step, and registers the downloaded data
 // on top of parentDownload's resulting layer. Otherwise, it registers the
 // layer on top of the ChainID given by parentLayer.
-func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) DoFunc {
-	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
+func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) doFunc {
+	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
 		d := &downloadTransfer{
-			Transfer:   NewTransfer(),
+			transfer:   newTransfer(),
 			layerStore: ldm.layerStore,
 		}
 
@@ -247,7 +253,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 				// Did the parent download already fail or get
 				// cancelled?
 				select {
-				case <-parentDownload.Done():
+				case <-parentDownload.done():
 					_, err := parentDownload.result()
 					if err != nil {
 						d.err = err
@@ -267,7 +273,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 			defer descriptor.Close()
 
 			for {
-				downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput)
+				downloadReader, size, err = descriptor.Download(d.transfer.context(), progressOutput)
 				if err == nil {
 					break
 				}
@@ -275,7 +281,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 				// If an error was returned because the context
 				// was cancelled, we shouldn't retry.
 				select {
-				case <-d.Transfer.Context().Done():
+				case <-d.transfer.context().Done():
 					d.err = err
 					return
 				default:
@@ -302,7 +308,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 							ticker.Stop()
 							break selectLoop
 						}
-					case <-d.Transfer.Context().Done():
+					case <-d.transfer.context().Done():
 						ticker.Stop()
 						d.err = errors.New("download cancelled during retry delay")
 						return
@@ -315,11 +321,11 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 
 			if parentDownload != nil {
 				select {
-				case <-d.Transfer.Context().Done():
+				case <-d.transfer.context().Done():
 					d.err = errors.New("layer registration cancelled")
 					downloadReader.Close()
 					return
-				case <-parentDownload.Done():
+				case <-parentDownload.done():
 				}
 
 				l, err := parentDownload.result()
@@ -331,7 +337,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 				parentLayer = l.ChainID()
 			}
 
-			reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
+			reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting")
 			defer reader.Close()
 
 			inflatedLayerData, err := archive.DecompressStream(reader)
@@ -351,7 +357,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 			}
 			if err != nil {
 				select {
-				case <-d.Transfer.Context().Done():
+				case <-d.transfer.context().Done():
 					d.err = errors.New("layer registration cancelled")
 				default:
 					d.err = fmt.Errorf("failed to register layer: %v", err)
@@ -360,15 +366,15 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 			}
 
 			progress.Update(progressOutput, descriptor.ID(), "Pull complete")
-			withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
-			if hasRegistered {
+
+			if withRegistered, ok := descriptor.(DigestRegisterer); ok {
 				withRegistered.Registered(d.layer.DiffID())
 			}
 
 			// Doesn't actually need to be its own goroutine, but
 			// done like this so we can defer close(c).
 			go func() {
-				<-d.Transfer.Released()
+				<-d.transfer.released()
 				if d.layer != nil {
 					layer.ReleaseAndLog(d.layerStore, d.layer)
 				}
@@ -386,10 +392,10 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor,
 // parentDownload. This function does not log progress output because it would
 // interfere with the progress reporting for sourceDownload, which has the same
 // Key.
-func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) DoFunc {
-	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
+func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) doFunc {
+	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
 		d := &downloadTransfer{
-			Transfer:   NewTransfer(),
+			transfer:   newTransfer(),
 			layerStore: ldm.layerStore,
 		}
 
@@ -403,10 +409,10 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
 			close(inactive)
 
 			select {
-			case <-d.Transfer.Context().Done():
+			case <-d.transfer.context().Done():
 				d.err = errors.New("layer registration cancelled")
 				return
-			case <-parentDownload.Done():
+			case <-parentDownload.done():
 			}
 
 			l, err := parentDownload.result()
@@ -420,10 +426,10 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
 			// parentDownload finished, but wait for it explicitly
 			// to be sure.
 			select {
-			case <-d.Transfer.Context().Done():
+			case <-d.transfer.context().Done():
 				d.err = errors.New("layer registration cancelled")
 				return
-			case <-sourceDownload.Done():
+			case <-sourceDownload.done():
 			}
 
 			l, err = sourceDownload.result()
@@ -453,15 +459,14 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa
 				return
 			}
 
-			withRegistered, hasRegistered := descriptor.(DownloadDescriptorWithRegistered)
-			if hasRegistered {
+			if withRegistered, ok := descriptor.(DigestRegisterer); ok {
 				withRegistered.Registered(d.layer.DiffID())
 			}
 
 			// Doesn't actually need to be its own goroutine, but
 			// done like this so we can defer close(c).
 			go func() {
-				<-d.Transfer.Released()
+				<-d.transfer.released()
 				if d.layer != nil {
 					layer.ReleaseAndLog(d.layerStore, d.layer)
 				}
diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go
index 82df793ebe..d5f96c8aea 100644
--- a/distribution/xfer/transfer.go
+++ b/distribution/xfer/transfer.go
@@ -19,8 +19,8 @@ func (e DoNotRetry) Error() string {
 	return e.Err.Error()
 }
 
-// Watcher is returned by Watch and can be passed to Release to stop watching.
-type Watcher struct {
+// watcher is returned by Watch and can be passed to Release to stop watching.
+type watcher struct {
 	// signalChan is used to signal to the watcher goroutine that
 	// new progress information is available, or that the transfer
 	// has finished.
@@ -34,18 +34,18 @@ type Watcher struct {
 	running chan struct{}
 }
 
-// Transfer represents an in-progress transfer.
-type Transfer interface {
-	Watch(progressOutput progress.Output) *Watcher
-	Release(*Watcher)
-	Context() context.Context
-	Close()
-	Done() <-chan struct{}
-	Released() <-chan struct{}
-	Broadcast(mainProgressChan <-chan progress.Progress)
+// transfer represents an in-progress transfer.
+type transfer interface {
+	watch(progressOutput progress.Output) *watcher
+	release(*watcher)
+	context() context.Context
+	close()
+	done() <-chan struct{}
+	released() <-chan struct{}
+	broadcast(mainProgressChan <-chan progress.Progress)
 }
 
-type transfer struct {
+type xfer struct {
 	mu sync.Mutex
 
 	ctx    context.Context
@@ -53,7 +53,7 @@ type transfer struct {
 
 	// watchers keeps track of the goroutines monitoring progress output,
 	// indexed by the channels that release them.
-	watchers map[chan struct{}]*Watcher
+	watchers map[chan struct{}]*watcher
 
 	// lastProgress is the most recently received progress event.
 	lastProgress progress.Progress
@@ -62,9 +62,9 @@ type transfer struct {
 	// running remains open as long as the transfer is in progress.
 	running chan struct{}
 
-	// released stays open until all watchers release the transfer and
-	// the transfer is no longer tracked by the transfer manager.
-	released chan struct{}
+	// releasedChan stays open until all watchers release the transfer and
+	// the transfer is no longer tracked by the transferManager.
+	releasedChan chan struct{}
 
 	// broadcastDone is true if the main progress channel has closed.
 	broadcastDone bool
@@ -77,12 +77,12 @@ type transfer struct {
 	broadcastSyncChan chan struct{}
 }
 
-// NewTransfer creates a new transfer.
-func NewTransfer() Transfer {
-	t := &transfer{
-		watchers:          make(map[chan struct{}]*Watcher),
+// newTransfer creates a new transfer.
+func newTransfer() transfer {
+	t := &xfer{
+		watchers:          make(map[chan struct{}]*watcher),
 		running:           make(chan struct{}),
-		released:          make(chan struct{}),
+		releasedChan:      make(chan struct{}),
 		broadcastSyncChan: make(chan struct{}),
 	}
 
@@ -95,7 +95,7 @@ func NewTransfer() Transfer {
 }
 
 // Broadcast copies the progress and error output to all viewers.
-func (t *transfer) Broadcast(mainProgressChan <-chan progress.Progress) {
+func (t *xfer) broadcast(mainProgressChan <-chan progress.Progress) {
 	for {
 		var (
 			p  progress.Progress
@@ -137,11 +137,11 @@ func (t *transfer) Broadcast(mainProgressChan <-chan progress.Progress) {
 
 // Watch adds a watcher to the transfer. The supplied channel gets progress
 // updates and is closed when the transfer finishes.
-func (t *transfer) Watch(progressOutput progress.Output) *Watcher {
+func (t *xfer) watch(progressOutput progress.Output) *watcher {
 	t.mu.Lock()
 	defer t.mu.Unlock()
 
-	w := &Watcher{
+	w := &watcher{
 		releaseChan: make(chan struct{}),
 		signalChan:  make(chan struct{}),
 		running:     make(chan struct{}),
@@ -205,7 +205,7 @@ func (t *transfer) Watch(progressOutput progress.Output) *Watcher {
 // to be notified about the progress of the transfer. All calls to Watch must
 // be paired with later calls to Release so that the lifecycle of the transfer
 // is properly managed.
-func (t *transfer) Release(watcher *Watcher) {
+func (t *xfer) release(watcher *watcher) {
 	t.mu.Lock()
 	delete(t.watchers, watcher.releaseChan)
 
@@ -216,9 +216,9 @@ func (t *transfer) Release(watcher *Watcher) {
 			// while waiting for a previous watcher goroutine to
 			// finish.
 			select {
-			case <-t.released:
+			case <-t.releasedChan:
 			default:
-				close(t.released)
+				close(t.releasedChan)
 			}
 		} else {
 			t.cancel()
@@ -233,7 +233,7 @@
 
 // Done returns a channel which is closed if the transfer completes or is
 // cancelled. Note that having 0 watchers causes a transfer to be cancelled.
-func (t *transfer) Done() <-chan struct{} {
+func (t *xfer) done() <-chan struct{} {
 	// Note that this doesn't return t.ctx.Done() because that channel will
 	// be closed the moment Cancel is called, and we need to return a
 	// channel that blocks until a cancellation is actually acknowledged by
@@ -242,75 +242,66 @@ func (t *transfer) Done() <-chan struct{} {
 }
 
 // Released returns a channel which is closed once all watchers release the
-// transfer AND the transfer is no longer tracked by the transfer manager.
-func (t *transfer) Released() <-chan struct{} {
-	return t.released
+// transfer AND the transfer is no longer tracked by the transferManager.
+func (t *xfer) released() <-chan struct{} {
+	return t.releasedChan
 }
 
 // Context returns the context associated with the transfer.
-func (t *transfer) Context() context.Context {
+func (t *xfer) context() context.Context {
 	return t.ctx
 }
 
-// Close is called by the transfer manager when the transfer is no longer
+// Close is called by the transferManager when the transfer is no longer
 // being tracked.
-func (t *transfer) Close() {
+func (t *xfer) close() {
 	t.mu.Lock()
 	t.closed = true
 	if len(t.watchers) == 0 {
-		close(t.released)
+		close(t.releasedChan)
 	}
 	t.mu.Unlock()
 }
 
-// DoFunc is a function called by the transfer manager to actually perform
+// doFunc is a function called by the transferManager to actually perform
 // a transfer. It should be non-blocking. It should wait until the start channel
 // is closed before transferring any data. If the function closes inactive, that
-// signals to the transfer manager that the job is no longer actively moving
+// signals to the transferManager that the job is no longer actively moving
 // data - for example, it may be waiting for a dependent transfer to finish.
 // This prevents it from taking up a slot.
-type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer
-
-// TransferManager is used by LayerDownloadManager and LayerUploadManager to
-// schedule and deduplicate transfers. It is up to the TransferManager
-// implementation to make the scheduling and concurrency decisions.
-type TransferManager interface {
-	// Transfer checks if a transfer with the given key is in progress. If
-	// so, it returns progress and error output from that transfer.
-	// Otherwise, it will call xferFunc to initiate the transfer.
-	Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher)
-	// SetConcurrency set the concurrencyLimit so that it is adjustable daemon reload
-	SetConcurrency(concurrency int)
-}
+type doFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer
 
+// transferManager is used by LayerDownloadManager and LayerUploadManager to
+// schedule and deduplicate transfers. It is up to the transferManager
+// to make the scheduling and concurrency decisions.
 type transferManager struct {
 	mu sync.Mutex
 
 	concurrencyLimit int
 	activeTransfers  int
-	transfers        map[string]Transfer
+	transfers        map[string]transfer
 	waitingTransfers []chan struct{}
 }
 
-// NewTransferManager returns a new TransferManager.
-func NewTransferManager(concurrencyLimit int) TransferManager {
+// newTransferManager returns a new transferManager.
+func newTransferManager(concurrencyLimit int) *transferManager {
 	return &transferManager{
 		concurrencyLimit: concurrencyLimit,
-		transfers:        make(map[string]Transfer),
+		transfers:        make(map[string]transfer),
 	}
 }
 
-// SetConcurrency sets the concurrencyLimit
-func (tm *transferManager) SetConcurrency(concurrency int) {
+// setConcurrency sets the concurrencyLimit
+func (tm *transferManager) setConcurrency(concurrency int) {
 	tm.mu.Lock()
 	tm.concurrencyLimit = concurrency
 	tm.mu.Unlock()
 }
 
-// Transfer checks if a transfer matching the given key is in progress. If not,
+// transfer checks if a transfer matching the given key is in progress. If not,
 // it starts one by calling xferFunc. The caller supplies a channel which
 // receives progress output from the transfer.
-func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) {
+func (tm *transferManager) transfer(key string, xferFunc doFunc, progressOutput progress.Output) (transfer, *watcher) {
 	tm.mu.Lock()
 	defer tm.mu.Unlock()
 
@@ -319,24 +310,24 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
 		if !present {
 			break
 		}
-		// Transfer is already in progress.
-		watcher := xfer.Watch(progressOutput)
+		// transfer is already in progress.
+		watcher := xfer.watch(progressOutput)
 
 		select {
-		case <-xfer.Context().Done():
+		case <-xfer.context().Done():
 			// We don't want to watch a transfer that has been cancelled.
 			// Wait for it to be removed from the map and try again.
-			xfer.Release(watcher)
+			xfer.release(watcher)
 			tm.mu.Unlock()
 			// The goroutine that removes this transfer from the
 			// map is also waiting for xfer.Done(), so yield to it.
 			// This could be avoided by adding a Closed method
-			// to Transfer to allow explicitly waiting for it to be
+			// to transfer to allow explicitly waiting for it to be
 			// removed the map, but forcing a scheduling round in
 			// this very rare case seems better than bloating the
 			// interface definition.
 			runtime.Gosched()
-			<-xfer.Done()
+			<-xfer.done()
 			tm.mu.Lock()
 		default:
 			return xfer, watcher
@@ -355,8 +346,8 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
 
 	mainProgressChan := make(chan progress.Progress)
 	xfer := xferFunc(mainProgressChan, start, inactive)
-	watcher := xfer.Watch(progressOutput)
-	go xfer.Broadcast(mainProgressChan)
+	watcher := xfer.watch(progressOutput)
+	go xfer.broadcast(mainProgressChan)
 	tm.transfers[key] = xfer
 
 	// When the transfer is finished, remove from the map.
@@ -368,14 +359,14 @@ func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput
 				tm.inactivate(start)
 				tm.mu.Unlock()
 				inactive = nil
-			case <-xfer.Done():
+			case <-xfer.done():
 				tm.mu.Lock()
 				if inactive != nil {
 					tm.inactivate(start)
 				}
 				delete(tm.transfers, key)
 				tm.mu.Unlock()
-				xfer.Close()
+				xfer.close()
 				return
 			}
 		}
diff --git a/distribution/xfer/transfer_test.go b/distribution/xfer/transfer_test.go
index f8f8e96fe0..305bb91662 100644
--- a/distribution/xfer/transfer_test.go
+++ b/distribution/xfer/transfer_test.go
@@ -9,15 +9,15 @@ import (
 )
 
 func TestTransfer(t *testing.T) {
-	makeXferFunc := func(id string) DoFunc {
-		return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
+	makeXferFunc := func(id string) doFunc {
+		return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
 			select {
 			case <-start:
 			default:
 				t.Errorf("%s: transfer function not started even though concurrency limit not reached", id)
 			}
 
-			xfer := NewTransfer()
+			xfer := newTransfer()
 			go func() {
 				for i := 0; i <= 10; i++ {
 					progressChan <- progress.Progress{ID: id, Action: "testing", Current: int64(i), Total: 10}
@@ -29,7 +29,7 @@ func TestTransfer(t *testing.T) {
 		}
 	}
 
-	tm := NewTransferManager(5)
+	tm := newTransferManager(5)
 	progressChan := make(chan progress.Progress)
 	progressDone := make(chan struct{})
 	receivedProgress := make(map[string]int64)
@@ -47,15 +47,15 @@ func TestTransfer(t *testing.T) {
 
 	// Start a few transfers
 	ids := []string{"id1", "id2", "id3"}
-	xfers := make([]Transfer, len(ids))
-	watchers := make([]*Watcher, len(ids))
+	xfers := make([]transfer, len(ids))
+	watchers := make([]*watcher, len(ids))
 	for i, id := range ids {
-		xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
+		xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
 	}
 
 	for i, xfer := range xfers {
-		<-xfer.Done()
-		xfer.Release(watchers[i])
+		<-xfer.done()
+		xfer.release(watchers[i])
 	}
 	close(progressChan)
 	<-progressDone
@@ -68,12 +68,12 @@ func TestTransfer(t *testing.T) {
 }
 
 func TestConcurrencyLimit(t *testing.T) {
-	concurrencyLimit := 3
+	const concurrencyLimit = 3
 	var runningJobs int32
 
-	makeXferFunc := func(id string) DoFunc {
-		return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
-			xfer := NewTransfer()
+	makeXferFunc := func(id string) doFunc {
+		return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
+			xfer := newTransfer()
 			go func() {
 				<-start
 				totalJobs := atomic.AddInt32(&runningJobs, 1)
@@ -91,7 +91,7 @@ func TestConcurrencyLimit(t *testing.T) {
 		}
 	}
 
-	tm := NewTransferManager(concurrencyLimit)
+	tm := newTransferManager(concurrencyLimit)
 	progressChan := make(chan progress.Progress)
 	progressDone := make(chan struct{})
 	receivedProgress := make(map[string]int64)
@@ -105,15 +105,15 @@ func TestConcurrencyLimit(t *testing.T) {
 
 	// Start more transfers than the concurrency limit
 	ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
-	xfers := make([]Transfer, len(ids))
-	watchers := make([]*Watcher, len(ids))
+	xfers := make([]transfer, len(ids))
+	watchers := make([]*watcher, len(ids))
 	for i, id := range ids {
-		xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
+		xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
 	}
 
 	for i, xfer := range xfers {
-		<-xfer.Done()
-		xfer.Release(watchers[i])
+		<-xfer.done()
+		xfer.release(watchers[i])
 	}
 	close(progressChan)
 	<-progressDone
@@ -126,13 +126,13 @@ func TestConcurrencyLimit(t *testing.T) {
 }
 
 func TestInactiveJobs(t *testing.T) {
-	concurrencyLimit := 3
+	const concurrencyLimit = 3
 	var runningJobs int32
 	testDone := make(chan struct{})
 
-	makeXferFunc := func(id string) DoFunc {
-		return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
-			xfer := NewTransfer()
+	makeXferFunc := func(id string) doFunc {
+		return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
+			xfer := newTransfer()
 			go func() {
 				<-start
 				totalJobs := atomic.AddInt32(&runningJobs, 1)
@@ -152,7 +152,7 @@ func TestInactiveJobs(t *testing.T) {
 		}
 	}
 
-	tm := NewTransferManager(concurrencyLimit)
+	tm := newTransferManager(concurrencyLimit)
 	progressChan := make(chan progress.Progress)
 	progressDone := make(chan struct{})
 	receivedProgress := make(map[string]int64)
@@ -166,16 +166,16 @@ func TestInactiveJobs(t *testing.T) {
 
 	// Start more transfers than the concurrency limit
 	ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
-	xfers := make([]Transfer, len(ids))
-	watchers := make([]*Watcher, len(ids))
+	xfers := make([]transfer, len(ids))
+	watchers := make([]*watcher, len(ids))
 	for i, id := range ids {
-		xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
+		xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
 	}
 	close(testDone)
 
 	for i, xfer := range xfers {
-		<-xfer.Done()
-		xfer.Release(watchers[i])
+		<-xfer.done()
+		xfer.release(watchers[i])
 	}
 	close(progressChan)
 	<-progressDone
@@ -190,9 +190,9 @@ func TestInactiveJobs(t *testing.T) {
 func TestWatchRelease(t *testing.T) {
 	ready := make(chan struct{})
 
-	makeXferFunc := func(id string) DoFunc {
-		return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer {
-			xfer := NewTransfer()
+	makeXferFunc := func(id string) doFunc {
+		return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer {
+			xfer := newTransfer()
 			go func() {
 				defer func() {
 					close(progressChan)
@@ -201,7 +201,7 @@ func TestWatchRelease(t *testing.T) {
 				for i := int64(0); ; i++ {
 					select {
 					case <-time.After(10 * time.Millisecond):
-					case <-xfer.Context().Done():
+					case <-xfer.context().Done():
 						return
 					}
 					progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
@@ -211,10 +211,10 @@ func TestWatchRelease(t *testing.T) {
 		}
 	}
 
-	tm := NewTransferManager(5)
+	tm := newTransferManager(5)
 
 	type watcherInfo struct {
-		watcher               *Watcher
+		watcher               *watcher
 		progressChan          chan progress.Progress
 		progressDone          chan struct{}
 		receivedFirstProgress chan struct{}
@@ -233,11 +233,11 @@ func TestWatchRelease(t *testing.T) {
 
 	// Start a transfer
 	watchers := make([]watcherInfo, 5)
-	var xfer Transfer
+	var xfer transfer
 	watchers[0].progressChan = make(chan progress.Progress)
 	watchers[0].progressDone = make(chan struct{})
 	watchers[0].receivedFirstProgress = make(chan struct{})
-	xfer, watchers[0].watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
+	xfer, watchers[0].watcher = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
 	go progressConsumer(watchers[0])
 
 	// Give it multiple watchers
@@ -245,7 +245,7 @@ func TestWatchRelease(t *testing.T) {
 		watchers[i].progressChan = make(chan progress.Progress)
 		watchers[i].progressDone = make(chan struct{})
 		watchers[i].receivedFirstProgress = make(chan struct{})
-		watchers[i].watcher = xfer.Watch(progress.ChanOutput(watchers[i].progressChan))
+		watchers[i].watcher = xfer.watch(progress.ChanOutput(watchers[i].progressChan))
 		go progressConsumer(watchers[i])
 	}
 
@@ -260,17 +260,17 @@ func TestWatchRelease(t *testing.T) {
 
 	// Release one watcher every 5ms
 	for _, w := range watchers {
-		xfer.Release(w.watcher)
+		xfer.release(w.watcher)
 		<-time.After(5 * time.Millisecond)
 	}
 
 	// Now that all watchers have been released, Released() should
 	// return a closed channel.
-	<-xfer.Released()
+	<-xfer.released()
 
 	// Done() should return a closed channel because the xfer func returned
 	// due to cancellation.
-	<-xfer.Done()
+	<-xfer.done()
 
 	for _, w := range watchers {
 		close(w.progressChan)
@@ -279,9 +279,9 @@ func TestWatchRelease(t *testing.T) {
 }
 
 func TestWatchFinishedTransfer(t *testing.T) {
-	makeXferFunc := func(id string) DoFunc {
-		return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer {
-			xfer := NewTransfer()
+	makeXferFunc := func(id string) doFunc {
+		return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) transfer {
+			xfer := newTransfer()
 			go func() {
 				// Finish immediately
 				close(progressChan)
@@ -290,30 +290,30 @@ func TestWatchFinishedTransfer(t *testing.T) {
 		}
 	}
 
-	tm := NewTransferManager(5)
+	tm := newTransferManager(5)
 
 	// Start a transfer
-	watchers := make([]*Watcher, 3)
-	var xfer Transfer
-	xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
+	watchers := make([]*watcher, 3)
+	var xfer transfer
+	xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
 
 	// Give it a watcher immediately
-	watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
+	watchers[1] = xfer.watch(progress.ChanOutput(make(chan progress.Progress)))
 
 	// Wait for the transfer to complete
-	<-xfer.Done()
+	<-xfer.done()
 
 	// Set up another watcher
-	watchers[2] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
+	watchers[2] = xfer.watch(progress.ChanOutput(make(chan progress.Progress)))
 
 	// Release the watchers
 	for _, w := range watchers {
-		xfer.Release(w)
+		xfer.release(w)
 	}
 
 	// Now that all watchers have been released, Released() should
 	// return a closed channel.
-	<-xfer.Released()
+	<-xfer.released()
 }
 
 func TestDuplicateTransfer(t *testing.T) {
@@ -321,10 +321,10 @@ func TestDuplicateTransfer(t *testing.T) {
 
 	var xferFuncCalls int32
 
-	makeXferFunc := func(id string) DoFunc {
-		return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer {
+	makeXferFunc := func(id string) doFunc {
+		return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) transfer {
 			atomic.AddInt32(&xferFuncCalls, 1)
-			xfer := NewTransfer()
+			xfer := newTransfer()
 			go func() {
 				defer func() {
 					close(progressChan)
@@ -333,7 +333,7 @@ func TestDuplicateTransfer(t *testing.T) {
 				for i := int64(0); ; i++ {
 					select {
 					case <-time.After(10 * time.Millisecond):
-					case <-xfer.Context().Done():
+					case <-xfer.context().Done():
 						return
 					}
 					progressChan <- progress.Progress{ID: id, Action: "testing", Current: i, Total: 10}
@@ -343,11 +343,11 @@ func TestDuplicateTransfer(t *testing.T) {
 		}
 	}
 
-	tm := NewTransferManager(5)
+	tm := newTransferManager(5)
 
 	type transferInfo struct {
-		xfer                  Transfer
-		watcher               *Watcher
+		xfer                  transfer
+		watcher               *watcher
 		progressChan          chan progress.Progress
 		progressDone          chan struct{}
 		receivedFirstProgress chan struct{}
@@ -371,7 +371,7 @@ func TestDuplicateTransfer(t *testing.T) {
 		t.progressChan = make(chan progress.Progress)
 		t.progressDone = make(chan struct{})
 		t.receivedFirstProgress = make(chan struct{})
-		t.xfer, t.watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan))
+		t.xfer, t.watcher = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan))
 		go progressConsumer(*t)
 	}
 
@@ -390,17 +390,17 @@ func TestDuplicateTransfer(t *testing.T) {
 
 	// Release one watcher every 5ms
 	for _, t := range transfers {
-		t.xfer.Release(t.watcher)
+		t.xfer.release(t.watcher)
 		<-time.After(5 * time.Millisecond)
 	}
 
 	for _, t := range transfers {
 		// Now that all watchers have been released, Released() should
 		// return a closed channel.
-		<-t.xfer.Released()
+		<-t.xfer.released()
 		// Done() should return a closed channel because the xfer func returned
 		// due to cancellation.
-		<-t.xfer.Done()
+		<-t.xfer.done()
 	}
 
 	for _, t := range transfers {
diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go
index 33b45ad747..40705bad6c 100644
--- a/distribution/xfer/upload.go
+++ b/distribution/xfer/upload.go
@@ -16,19 +16,19 @@ const maxUploadAttempts = 5
 // LayerUploadManager provides task management and progress reporting for
 // uploads.
 type LayerUploadManager struct {
-	tm           TransferManager
+	tm           *transferManager
 	waitDuration time.Duration
 }
 
 // SetConcurrency sets the max concurrent uploads for each push
 func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
-	lum.tm.SetConcurrency(concurrency)
+	lum.tm.setConcurrency(concurrency)
 }
 
 // NewLayerUploadManager returns a new LayerUploadManager.
 func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager {
 	manager := LayerUploadManager{
-		tm:           NewTransferManager(concurrencyLimit),
+		tm:           newTransferManager(concurrencyLimit),
 		waitDuration: time.Second,
 	}
 	for _, option := range options {
@@ -38,7 +38,7 @@ func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadMan
 }
 
 type uploadTransfer struct {
-	Transfer
+	transfer
 
 	remoteDescriptor distribution.Descriptor
 	err              error
@@ -79,8 +79,8 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
 		}
 
 		xferFunc := lum.makeUploadFunc(descriptor)
-		upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
-		defer upload.Release(watcher)
+		upload, watcher := lum.tm.transfer(descriptor.Key(), xferFunc, progressOutput)
+		defer upload.release(watcher)
 		uploads = append(uploads, upload.(*uploadTransfer))
 		dedupDescriptors[key] = upload.(*uploadTransfer)
 	}
@@ -89,7 +89,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
-		case <-upload.Transfer.Done():
+		case <-upload.transfer.done():
 			if upload.err != nil {
 				return upload.err
 			}
@@ -102,10 +102,10 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
 	return nil
 }
 
-func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
-	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
+func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) doFunc {
+	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer {
 		u := &uploadTransfer{
-			Transfer: NewTransfer(),
+			transfer: newTransfer(),
 		}
 
 		go func() {
@@ -124,7 +124,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
 
 			retries := 0
 			for {
-				remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
+				remoteDescriptor, err := descriptor.Upload(u.transfer.context(), progressOutput)
 				if err == nil {
 					u.remoteDescriptor = remoteDescriptor
 					break
@@ -133,7 +133,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
 				// If an error was returned because the context
 				// was cancelled, we shouldn't retry.
 				select {
-				case <-u.Transfer.Context().Done():
+				case <-u.transfer.context().Done():
 					u.err = err
 					return
 				default:
@@ -160,7 +160,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
 							ticker.Stop()
 							break selectLoop
 						}
-					case <-u.Transfer.Context().Done():
+					case <-u.transfer.context().Done():
 						ticker.Stop()
 						u.err = errors.New("upload cancelled during retry delay")
 						return
diff --git a/distribution/xfer/upload_test.go b/distribution/xfer/upload_test.go
index 4507feac7b..9150a63d7a 100644
--- a/distribution/xfer/upload_test.go
+++ b/distribution/xfer/upload_test.go
@@ -69,12 +69,12 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
 
 func uploadDescriptors(currentUploads *int32) []UploadDescriptor {
 	return []UploadDescriptor{
-		&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"), 0},
-		&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:1515325234325236634634608943609283523908626098235490238423902343"), 0},
-		&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:6929356290463485374960346430698374523437683470934634534953453453"), 0},
-		&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"), 0},
-		&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:8159352387436803946235346346368745389534789534897538734598734987"), 1},
-		&mockUploadDescriptor{currentUploads, layer.DiffID("sha256:4637863963478346897346987346987346789346789364879364897364987346"), 0},
+		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"},
+		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:1515325234325236634634608943609283523908626098235490238423902343"},
+		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:6929356290463485374960346430698374523437683470934634534953453453"},
+		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"},
+		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:8159352387436803946235346346368745389534789534897538734598734987", simulateRetries: 1},
+		&mockUploadDescriptor{currentUploads: currentUploads, diffID: "sha256:4637863963478346897346987346987346789346789364879364897364987346"},
 	}
 }
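Reviewer note — a hypothetical usage sketch, not part of the patch. It illustrates the exported surface that remains after the change: NewLayerDownloadManager, DownloadOption, WithMaxDownloadAttempts, DownloadDescriptor, DigestRegisterer, and layer.DiffID all come from the diff above; the names newDownloadManager and loggingDescriptor are invented for illustration, and standard moby import paths are assumed.

package example

import (
	"log"

	"github.com/docker/docker/distribution/xfer"
	"github.com/docker/docker/layer"
)

// newDownloadManager (hypothetical) shows why naming the option type pays off:
// callers can build a []xfer.DownloadOption slice conditionally instead of
// passing anonymous func(*LayerDownloadManager) values.
func newDownloadManager(store layer.Store, retries int) *xfer.LayerDownloadManager {
	var opts []xfer.DownloadOption
	if retries > 0 {
		opts = append(opts, xfer.WithMaxDownloadAttempts(retries))
	}
	return xfer.NewLayerDownloadManager(store, 3, opts...)
}

// loggingDescriptor (hypothetical) wraps any DownloadDescriptor and also
// implements Registered, so it satisfies DigestRegisterer. The download
// manager's type assertion descriptor.(DigestRegisterer) then succeeds, and
// Registered is called with each layer's DiffID after registration.
type loggingDescriptor struct {
	xfer.DownloadDescriptor // embedded descriptor obtained elsewhere
}

func (d loggingDescriptor) Registered(diffID layer.DiffID) {
	log.Printf("layer registered: %s", diffID)
}

Note that Registered still returns nothing, so an implementation that fails internally has no way to surface the error — the TODO added on DigestRegisterer in this patch flags exactly that for a follow-up.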