1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

distribution/xfer: un-export transferManager.setConcurrency() and .transfer()

They're only used within the package itself, so no need to have them public.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2022-01-22 12:40:35 +01:00
parent d746a836fc
commit 874b11495b
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
4 changed files with 15 additions and 15 deletions

View file

@ -30,7 +30,7 @@ type LayerDownloadManager struct {
// SetConcurrency sets the max concurrent downloads for each pull // SetConcurrency sets the max concurrent downloads for each pull
func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) { func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
ldm.tm.SetConcurrency(concurrency) ldm.tm.setConcurrency(concurrency)
} }
// NewLayerDownloadManager returns a new LayerDownloadManager. // NewLayerDownloadManager returns a new LayerDownloadManager.
@ -152,7 +152,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
if existingDownload, ok := downloadsByKey[key]; ok { if existingDownload, ok := downloadsByKey[key]; ok {
xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload) xferFunc := ldm.makeDownloadFuncFromDownload(descriptor, existingDownload, topDownload)
defer topDownload.Transfer.Release(watcher) defer topDownload.Transfer.Release(watcher)
topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput) topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
topDownload = topDownloadUncasted.(*downloadTransfer) topDownload = topDownloadUncasted.(*downloadTransfer)
continue continue
} }
@ -167,7 +167,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
} else { } else {
xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil) xferFunc = ldm.makeDownloadFunc(descriptor, rootFS.ChainID(), nil)
} }
topDownloadUncasted, watcher = ldm.tm.Transfer(transferKey, xferFunc, progressOutput) topDownloadUncasted, watcher = ldm.tm.transfer(transferKey, xferFunc, progressOutput)
topDownload = topDownloadUncasted.(*downloadTransfer) topDownload = topDownloadUncasted.(*downloadTransfer)
downloadsByKey[key] = topDownload downloadsByKey[key] = topDownload
} }

View file

@ -291,17 +291,17 @@ func newTransferManager(concurrencyLimit int) *transferManager {
} }
} }
// SetConcurrency sets the concurrencyLimit // setConcurrency sets the concurrencyLimit
func (tm *transferManager) SetConcurrency(concurrency int) { func (tm *transferManager) setConcurrency(concurrency int) {
tm.mu.Lock() tm.mu.Lock()
tm.concurrencyLimit = concurrency tm.concurrencyLimit = concurrency
tm.mu.Unlock() tm.mu.Unlock()
} }
// Transfer checks if a transfer matching the given key is in progress. If not, // transfer checks if a Transfer matching the given key is in progress. If not,
// it starts one by calling xferFunc. The caller supplies a channel which // it starts one by calling xferFunc. The caller supplies a channel which
// receives progress output from the transfer. // receives progress output from the transfer.
func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) { func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) {
tm.mu.Lock() tm.mu.Lock()
defer tm.mu.Unlock() defer tm.mu.Unlock()

View file

@ -50,7 +50,7 @@ func TestTransfer(t *testing.T) {
xfers := make([]Transfer, len(ids)) xfers := make([]Transfer, len(ids))
watchers := make([]*Watcher, len(ids)) watchers := make([]*Watcher, len(ids))
for i, id := range ids { for i, id := range ids {
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
} }
for i, xfer := range xfers { for i, xfer := range xfers {
@ -108,7 +108,7 @@ func TestConcurrencyLimit(t *testing.T) {
xfers := make([]Transfer, len(ids)) xfers := make([]Transfer, len(ids))
watchers := make([]*Watcher, len(ids)) watchers := make([]*Watcher, len(ids))
for i, id := range ids { for i, id := range ids {
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
} }
for i, xfer := range xfers { for i, xfer := range xfers {
@ -169,7 +169,7 @@ func TestInactiveJobs(t *testing.T) {
xfers := make([]Transfer, len(ids)) xfers := make([]Transfer, len(ids))
watchers := make([]*Watcher, len(ids)) watchers := make([]*Watcher, len(ids))
for i, id := range ids { for i, id := range ids {
xfers[i], watchers[i] = tm.Transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
} }
close(testDone) close(testDone)
@ -237,7 +237,7 @@ func TestWatchRelease(t *testing.T) {
watchers[0].progressChan = make(chan progress.Progress) watchers[0].progressChan = make(chan progress.Progress)
watchers[0].progressDone = make(chan struct{}) watchers[0].progressDone = make(chan struct{})
watchers[0].receivedFirstProgress = make(chan struct{}) watchers[0].receivedFirstProgress = make(chan struct{})
xfer, watchers[0].watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan)) xfer, watchers[0].watcher = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(watchers[0].progressChan))
go progressConsumer(watchers[0]) go progressConsumer(watchers[0])
// Give it multiple watchers // Give it multiple watchers
@ -295,7 +295,7 @@ func TestWatchFinishedTransfer(t *testing.T) {
// Start a transfer // Start a transfer
watchers := make([]*Watcher, 3) watchers := make([]*Watcher, 3)
var xfer Transfer var xfer Transfer
xfer, watchers[0] = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress))) xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
// Give it a watcher immediately // Give it a watcher immediately
watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress))) watchers[1] = xfer.Watch(progress.ChanOutput(make(chan progress.Progress)))
@ -371,7 +371,7 @@ func TestDuplicateTransfer(t *testing.T) {
t.progressChan = make(chan progress.Progress) t.progressChan = make(chan progress.Progress)
t.progressDone = make(chan struct{}) t.progressDone = make(chan struct{})
t.receivedFirstProgress = make(chan struct{}) t.receivedFirstProgress = make(chan struct{})
t.xfer, t.watcher = tm.Transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan)) t.xfer, t.watcher = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(t.progressChan))
go progressConsumer(*t) go progressConsumer(*t)
} }

View file

@ -22,7 +22,7 @@ type LayerUploadManager struct {
// SetConcurrency sets the max concurrent uploads for each push // SetConcurrency sets the max concurrent uploads for each push
func (lum *LayerUploadManager) SetConcurrency(concurrency int) { func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
lum.tm.SetConcurrency(concurrency) lum.tm.setConcurrency(concurrency)
} }
// NewLayerUploadManager returns a new LayerUploadManager. // NewLayerUploadManager returns a new LayerUploadManager.
@ -79,7 +79,7 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
} }
xferFunc := lum.makeUploadFunc(descriptor) xferFunc := lum.makeUploadFunc(descriptor)
upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput) upload, watcher := lum.tm.transfer(descriptor.Key(), xferFunc, progressOutput)
defer upload.Release(watcher) defer upload.Release(watcher)
uploads = append(uploads, upload.(*uploadTransfer)) uploads = append(uploads, upload.(*uploadTransfer))
dedupDescriptors[key] = upload.(*uploadTransfer) dedupDescriptors[key] = upload.(*uploadTransfer)