diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go index 718d463b57..121d980467 100644 --- a/distribution/xfer/download.go +++ b/distribution/xfer/download.go @@ -56,7 +56,7 @@ func WithMaxDownloadAttempts(max int) func(*LayerDownloadManager) { } type downloadTransfer struct { - Transfer + transfer layerStore layer.Store layer layer.Layer @@ -148,10 +148,10 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima // Does this layer have the same data as a previous layer in // the stack? If so, avoid downloading it more than once. - var topDownloadUncasted Transfer + var topDownloadUncasted transfer if existingDownload, ok := downloadsByKey[key]; ok { xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload) - defer topDownload.Transfer.Release(watcher) + defer topDownload.transfer.Release(watcher) topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput) topDownload = topDownloadUncasted.(*downloadTransfer) continue @@ -163,7 +163,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima var xferFunc DoFunc if topDownload != nil { xferFunc = ldm.makeDownloadFunc(descriptor, "", topDownload) - defer topDownload.Transfer.Release(watcher) + defer topDownload.transfer.Release(watcher) } else { xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil) } @@ -192,7 +192,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima select { case <-ctx.Done(): - topDownload.Transfer.Release(watcher) + topDownload.transfer.Release(watcher) return rootFS, func() {}, ctx.Err() case <-topDownload.Done(): break @@ -200,7 +200,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima l, err := topDownload.result() if err != nil { - topDownload.Transfer.Release(watcher) + topDownload.transfer.Release(watcher) return rootFS, func() {}, err } @@ -208,13 +208,13 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima // base layer on Windows. for range layers { if l == nil { - topDownload.Transfer.Release(watcher) + topDownload.transfer.Release(watcher) return rootFS, func() {}, errors.New("internal error: too few parent layers") } rootFS.DiffIDs = append([]layer.DiffID{l.DiffID()}, rootFS.DiffIDs...) l = l.Parent() } - return rootFS, func() { topDownload.Transfer.Release(watcher) }, err + return rootFS, func() { topDownload.transfer.Release(watcher) }, err } // makeDownloadFunc returns a function that performs the layer download and @@ -223,9 +223,9 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima // on top of parentDownload's resulting layer. Otherwise, it registers the // layer on top of the ChainID given by parentLayer. func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer layer.ChainID, parentDownload *downloadTransfer) DoFunc { - return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer { d := &downloadTransfer{ - Transfer: newTransfer(), + transfer: newTransfer(), layerStore: ldm.layerStore, } @@ -267,7 +267,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, defer descriptor.Close() for { - downloadReader, size, err = descriptor.Download(d.Transfer.Context(), progressOutput) + downloadReader, size, err = descriptor.Download(d.transfer.Context(), progressOutput) if err == nil { break } @@ -275,7 +275,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, // If an error was returned because the context // was cancelled, we shouldn't retry. select { - case <-d.Transfer.Context().Done(): + case <-d.transfer.Context().Done(): d.err = err return default: @@ -302,7 +302,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, ticker.Stop() break selectLoop } - case <-d.Transfer.Context().Done(): + case <-d.transfer.Context().Done(): ticker.Stop() d.err = errors.New("download cancelled during retry delay") return @@ -315,7 +315,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, if parentDownload != nil { select { - case <-d.Transfer.Context().Done(): + case <-d.transfer.Context().Done(): d.err = errors.New("layer registration cancelled") downloadReader.Close() return @@ -331,7 +331,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, parentLayer = l.ChainID() } - reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.Transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting") + reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(d.transfer.Context(), downloadReader), progressOutput, size, descriptor.ID(), "Extracting") defer reader.Close() inflatedLayerData, err := archive.DecompressStream(reader) @@ -351,7 +351,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, } if err != nil { select { - case <-d.Transfer.Context().Done(): + case <-d.transfer.Context().Done(): d.err = errors.New("layer registration cancelled") default: d.err = fmt.Errorf("failed to register layer: %v", err) @@ -368,7 +368,7 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, // Doesn't actually need to be its own goroutine, but // done like this so we can defer close(c). go func() { - <-d.Transfer.Released() + <-d.transfer.Released() if d.layer != nil { layer.ReleaseAndLog(d.layerStore, d.layer) } @@ -387,9 +387,9 @@ func (ldm *LayerDownloadManager) makeDownloadFunc(descriptor DownloadDescriptor, // interfere with the progress reporting for sourceDownload, which has the same // Key. func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor DownloadDescriptor, sourceDownload *downloadTransfer, parentDownload *downloadTransfer) DoFunc { - return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer { d := &downloadTransfer{ - Transfer: newTransfer(), + transfer: newTransfer(), layerStore: ldm.layerStore, } @@ -403,7 +403,7 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa close(inactive) select { - case <-d.Transfer.Context().Done(): + case <-d.transfer.Context().Done(): d.err = errors.New("layer registration cancelled") return case <-parentDownload.Done(): @@ -420,7 +420,7 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa // parentDownload finished, but wait for it explicitly // to be sure. select { - case <-d.Transfer.Context().Done(): + case <-d.transfer.Context().Done(): d.err = errors.New("layer registration cancelled") return case <-sourceDownload.Done(): @@ -461,7 +461,7 @@ func (ldm *LayerDownloadManager) makeDownloadFuncFromDownload(descriptor Downloa // Doesn't actually need to be its own goroutine, but // done like this so we can defer close(c). go func() { - <-d.Transfer.Released() + <-d.transfer.Released() if d.layer != nil { layer.ReleaseAndLog(d.layerStore, d.layer) } diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go index cff403456b..68ee9c3ae1 100644 --- a/distribution/xfer/transfer.go +++ b/distribution/xfer/transfer.go @@ -34,8 +34,8 @@ type Watcher struct { running chan struct{} } -// Transfer represents an in-progress transfer. -type Transfer interface { +// transfer represents an in-progress transfer. +type transfer interface { Watch(progressOutput progress.Output) *Watcher Release(*Watcher) Context() context.Context @@ -45,7 +45,7 @@ type Transfer interface { Broadcast(mainProgressChan <-chan progress.Progress) } -type transfer struct { +type xfer struct { mu sync.Mutex ctx context.Context @@ -78,8 +78,8 @@ type transfer struct { } // newTransfer creates a new transfer. -func newTransfer() Transfer { - t := &transfer{ +func newTransfer() transfer { + t := &xfer{ watchers: make(map[chan struct{}]*Watcher), running: make(chan struct{}), released: make(chan struct{}), @@ -95,7 +95,7 @@ func newTransfer() Transfer { } // Broadcast copies the progress and error output to all viewers. -func (t *transfer) Broadcast(mainProgressChan <-chan progress.Progress) { +func (t *xfer) Broadcast(mainProgressChan <-chan progress.Progress) { for { var ( p progress.Progress @@ -137,7 +137,7 @@ func (t *transfer) Broadcast(mainProgressChan <-chan progress.Progress) { // Watch adds a watcher to the transfer. The supplied channel gets progress // updates and is closed when the transfer finishes. -func (t *transfer) Watch(progressOutput progress.Output) *Watcher { +func (t *xfer) Watch(progressOutput progress.Output) *Watcher { t.mu.Lock() defer t.mu.Unlock() @@ -205,7 +205,7 @@ func (t *transfer) Watch(progressOutput progress.Output) *Watcher { // to be notified about the progress of the transfer. All calls to Watch must // be paired with later calls to Release so that the lifecycle of the transfer // is properly managed. -func (t *transfer) Release(watcher *Watcher) { +func (t *xfer) Release(watcher *Watcher) { t.mu.Lock() delete(t.watchers, watcher.releaseChan) @@ -233,7 +233,7 @@ func (t *transfer) Release(watcher *Watcher) { // Done returns a channel which is closed if the transfer completes or is // cancelled. Note that having 0 watchers causes a transfer to be cancelled. -func (t *transfer) Done() <-chan struct{} { +func (t *xfer) Done() <-chan struct{} { // Note that this doesn't return t.ctx.Done() because that channel will // be closed the moment Cancel is called, and we need to return a // channel that blocks until a cancellation is actually acknowledged by @@ -243,18 +243,18 @@ func (t *transfer) Done() <-chan struct{} { // Released returns a channel which is closed once all watchers release the // transfer AND the transfer is no longer tracked by the transferManager. -func (t *transfer) Released() <-chan struct{} { +func (t *xfer) Released() <-chan struct{} { return t.released } // Context returns the context associated with the transfer. -func (t *transfer) Context() context.Context { +func (t *xfer) Context() context.Context { return t.ctx } // Close is called by the transferManager when the transfer is no longer // being tracked. -func (t *transfer) Close() { +func (t *xfer) Close() { t.mu.Lock() t.closed = true if len(t.watchers) == 0 { @@ -269,7 +269,7 @@ func (t *transfer) Close() { // signals to the transferManager that the job is no longer actively moving // data - for example, it may be waiting for a dependent transfer to finish. // This prevents it from taking up a slot. -type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer +type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer // transferManager is used by LayerDownloadManager and LayerUploadManager to // schedule and deduplicate transfers. It is up to the transferManager @@ -279,7 +279,7 @@ type transferManager struct { concurrencyLimit int activeTransfers int - transfers map[string]Transfer + transfers map[string]transfer waitingTransfers []chan struct{} } @@ -287,7 +287,7 @@ type transferManager struct { func newTransferManager(concurrencyLimit int) *transferManager { return &transferManager{ concurrencyLimit: concurrencyLimit, - transfers: make(map[string]Transfer), + transfers: make(map[string]transfer), } } @@ -298,10 +298,10 @@ func (tm *transferManager) setConcurrency(concurrency int) { tm.mu.Unlock() } -// transfer checks if a Transfer matching the given key is in progress. If not, +// transfer checks if a transfer matching the given key is in progress. If not, // it starts one by calling xferFunc. The caller supplies a channel which // receives progress output from the transfer. -func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) { +func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (transfer, *Watcher) { tm.mu.Lock() defer tm.mu.Unlock() @@ -310,7 +310,7 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput if !present { break } - // Transfer is already in progress. + // transfer is already in progress. watcher := xfer.Watch(progressOutput) select { @@ -322,7 +322,7 @@ func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput // The goroutine that removes this transfer from the // map is also waiting for xfer.Done(), so yield to it. // This could be avoided by adding a Closed method - // to Transfer to allow explicitly waiting for it to be + // to transfer to allow explicitly waiting for it to be // removed the map, but forcing a scheduling round in // this very rare case seems better than bloating the // interface definition. diff --git a/distribution/xfer/transfer_test.go b/distribution/xfer/transfer_test.go index a13d118a00..6fd3c6dd87 100644 --- a/distribution/xfer/transfer_test.go +++ b/distribution/xfer/transfer_test.go @@ -10,7 +10,7 @@ import ( func TestTransfer(t *testing.T) { makeXferFunc := func(id string) DoFunc { - return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer { select { case <-start: default: @@ -47,7 +47,7 @@ func TestTransfer(t *testing.T) { // Start a few transfers ids := []string{"id1", "id2", "id3"} - xfers := make([]Transfer, len(ids)) + xfers := make([]transfer, len(ids)) watchers := make([]*Watcher, len(ids)) for i, id := range ids { xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) @@ -72,7 +72,7 @@ func TestConcurrencyLimit(t *testing.T) { var runningJobs int32 makeXferFunc := func(id string) DoFunc { - return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer { xfer := newTransfer() go func() { <-start @@ -105,7 +105,7 @@ func TestConcurrencyLimit(t *testing.T) { // Start more transfers than the concurrency limit ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"} - xfers := make([]Transfer, len(ids)) + xfers := make([]transfer, len(ids)) watchers := make([]*Watcher, len(ids)) for i, id := range ids { xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) @@ -131,7 +131,7 @@ func TestInactiveJobs(t *testing.T) { testDone := make(chan struct{}) makeXferFunc := func(id string) DoFunc { - return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer { xfer := newTransfer() go func() { <-start @@ -166,7 +166,7 @@ func TestInactiveJobs(t *testing.T) { // Start more transfers than the concurrency limit ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"} - xfers := make([]Transfer, len(ids)) + xfers := make([]transfer, len(ids)) watchers := make([]*Watcher, len(ids)) for i, id := range ids { xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) @@ -191,7 +191,7 @@ func TestWatchRelease(t *testing.T) { ready := make(chan struct{}) makeXferFunc := func(id string) DoFunc { - return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, start <-chan struct{}, _ chan<- struct{}) transfer { xfer := newTransfer() go func() { defer func() { @@ -233,7 +233,7 @@ func TestWatchRelease(t *testing.T) { // Start a transfer watchers := make([]watcherInfo, 5) - var xfer Transfer + var xfer transfer watchers[0].progressChan = make(chan progress.Progress) watchers[0].progressDone = make(chan struct{}) watchers[0].receivedFirstProgress = make(chan struct{}) @@ -280,7 +280,7 @@ func TestWatchRelease(t *testing.T) { func TestWatchFinishedTransfer(t *testing.T) { makeXferFunc := func(id string) DoFunc { - return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) transfer { xfer := newTransfer() go func() { // Finish immediately @@ -294,7 +294,7 @@ func TestWatchFinishedTransfer(t *testing.T) { // Start a transfer watchers := make([]*Watcher, 3) - var xfer Transfer + var xfer transfer xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress))) // Give it a watcher immediately @@ -322,7 +322,7 @@ func TestDuplicateTransfer(t *testing.T) { var xferFuncCalls int32 makeXferFunc := func(id string) DoFunc { - return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, _ <-chan struct{}, _ chan<- struct{}) transfer { atomic.AddInt32(&xferFuncCalls, 1) xfer := newTransfer() go func() { @@ -346,7 +346,7 @@ func TestDuplicateTransfer(t *testing.T) { tm := newTransferManager(5) type transferInfo struct { - xfer Transfer + xfer transfer watcher *Watcher progressChan chan progress.Progress progressDone chan struct{} diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go index cc4e0daf9d..b97a2d8c9a 100644 --- a/distribution/xfer/upload.go +++ b/distribution/xfer/upload.go @@ -38,7 +38,7 @@ func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadMan } type uploadTransfer struct { - Transfer + transfer remoteDescriptor distribution.Descriptor err error @@ -89,7 +89,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri select { case <-ctx.Done(): return ctx.Err() - case <-upload.Transfer.Done(): + case <-upload.transfer.Done(): if upload.err != nil { return upload.err } @@ -103,9 +103,9 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri } func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc { - return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { + return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) transfer { u := &uploadTransfer{ - Transfer: newTransfer(), + transfer: newTransfer(), } go func() { @@ -124,7 +124,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun retries := 0 for { - remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput) + remoteDescriptor, err := descriptor.Upload(u.transfer.Context(), progressOutput) if err == nil { u.remoteDescriptor = remoteDescriptor break @@ -133,7 +133,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun // If an error was returned because the context // was cancelled, we shouldn't retry. select { - case <-u.Transfer.Context().Done(): + case <-u.transfer.Context().Done(): u.err = err return default: @@ -160,7 +160,7 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun ticker.Stop() break selectLoop } - case <-u.Transfer.Context().Done(): + case <-u.transfer.Context().Done(): ticker.Stop() u.err = errors.New("upload cancelled during retry delay") return