diff --git a/distribution/xfer/download.go b/distribution/xfer/download.go index eb457830ff..83b41ecb4d 100644 --- a/distribution/xfer/download.go +++ b/distribution/xfer/download.go @@ -30,7 +30,7 @@ type LayerDownloadManager struct { // SetConcurrency sets the max concurrent downloads for each pull func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) { - ldm.tm.SetConcurrency(concurrency) + ldm.tm.setConcurrency(concurrency) } // NewLayerDownloadManager returns a new LayerDownloadManager. @@ -152,7 +152,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima if existingDownload, ok := downloadsByKey[key]; ok { xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload) defer topDownload.Transfer.Release(watcher) - topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput) + topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput) topDownload = topDownloadUncasted.(*downloadTransfer) continue } @@ -167,7 +167,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima } else { xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil) } - topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput) + topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput) topDownload = topDownloadUncasted.(*downloadTransfer) downloadsByKey[key] = topDownload } diff --git a/distribution/xfer/transfer.go b/distribution/xfer/transfer.go index 8f06f22e89..91c5ce2b6a 100644 --- a/distribution/xfer/transfer.go +++ b/distribution/xfer/transfer.go @@ -291,17 +291,17 @@ func newTransferManager(concurrencyLimit int) *transferManager { } } -// SetConcurrency sets the concurrencyLimit -func (tm *transferManager) SetConcurrency(concurrency int) { +// setConcurrency sets the concurrencyLimit +func (tm *transferManager) setConcurrency(concurrency int) { tm.mu.Lock() tm.concurrencyLimit = concurrency tm.mu.Unlock() } -// Transfer checks if a transfer matching the given key is in progress. If not, +// transfer checks if a Transfer matching the given key is in progress. If not, // it starts one by calling xferFunc. The caller supplies a channel which // receives progress output from the transfer. -func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) { +func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) { tm.mu.Lock() defer tm.mu.Unlock() diff --git a/distribution/xfer/transfer_test.go b/distribution/xfer/transfer_test.go index f63de0d78e..6364ae0aa5 100644 --- a/distribution/xfer/transfer_test.go +++ b/distribution/xfer/transfer_test.go @@ -50,7 +50,7 @@ func TestTransfer(t *testing.T) { xfers := make([]Transfer, len(ids)) watchers := make([]*Watcher, len(ids)) for i, id := range ids { - xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) + xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) } for i, xfer := range xfers { @@ -108,7 +108,7 @@ func TestConcurrencyLimit(t *testing.T) { xfers := make([]Transfer, len(ids)) watchers := make([]*Watcher, len(ids)) for i, id := range ids { - xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) + xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) } for i, xfer := range xfers { @@ -169,7 +169,7 @@ func TestInactiveJobs(t *testing.T) { xfers := make([]Transfer, len(ids)) watchers := make([]*Watcher, len(ids)) for i, id := range ids { - xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) + xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) } close(testDone) @@ -237,7 +237,7 @@ func TestWatchRelease(t *testing.T) { watchers[0].progressChan = make(chan progress.Progress) watchers[0].progressDone = make(chan struct{}) watchers[0].receivedFirstProgress = make(chan struct{}) - xfer, watchers[0].watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan)) + xfer, watchers[0].watcher = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan)) go progressConsumer(watchers[0]) // Give it multiple watchers @@ -295,7 +295,7 @@ func TestWatchFinishedTransfer(t *testing.T) { // Start a transfer watchers := make([]*Watcher, 3) var xfer Transfer - xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress))) + xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress))) // Give it a watcher immediately watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress))) @@ -371,7 +371,7 @@ func TestDuplicateTransfer(t *testing.T) { t.progressChan = make(chan progress.Progress) t.progressDone = make(chan struct{}) t.receivedFirstProgress = make(chan struct{}) - t.xfer, t.watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan)) + t.xfer, t.watcher = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan)) go progressConsumer(*t) } diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go index 98b6799e15..a369daff5c 100644 --- a/distribution/xfer/upload.go +++ b/distribution/xfer/upload.go @@ -22,7 +22,7 @@ type LayerUploadManager struct { // SetConcurrency sets the max concurrent uploads for each push func (lum *LayerUploadManager) SetConcurrency(concurrency int) { - lum.tm.SetConcurrency(concurrency) + lum.tm.setConcurrency(concurrency) } // NewLayerUploadManager returns a new LayerUploadManager. @@ -79,7 +79,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri } xferFunc := lum.makeUploadFunc(descriptor) - upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput) + upload, watcher := lum.tm.transfer(descriptor.Key(), xferFunc, progressOutput) defer upload.Release(watcher) uploads = append(uploads, upload.(*uploadTransfer)) dedupDescriptors[key] = upload.(*uploadTransfer)