1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

distribution/xfer: remove TransferManager interface, un-export newTransferManager

The `TransferManager` interface only had a single implementation, and neither
`LayerDownloadManager`, nor `LayerUploadManager` currently had an option to
provide a custom implementation, so we can un-export this.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2022-01-22 12:19:42 +01:00
parent cf31aa0fa0
commit d746a836fc
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
4 changed files with 15 additions and 24 deletions

View file

@ -23,7 +23,7 @@ const maxDownloadAttempts = 5
// layers. // layers.
type LayerDownloadManager struct { type LayerDownloadManager struct {
layerStore layer.Store layerStore layer.Store
tm TransferManager tm *transferManager
waitDuration time.Duration waitDuration time.Duration
maxDownloadAttempts int maxDownloadAttempts int
} }
@ -37,7 +37,7 @@ func (ldm *LayerDownloadManager) SetConcurrency(concurrency int) {
func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager { func NewLayerDownloadManager(layerStore layer.Store, concurrencyLimit int, options ...func(*LayerDownloadManager)) *LayerDownloadManager {
manager := LayerDownloadManager{ manager := LayerDownloadManager{
layerStore: layerStore, layerStore: layerStore,
tm: NewTransferManager(concurrencyLimit), tm: newTransferManager(concurrencyLimit),
waitDuration: time.Second, waitDuration: time.Second,
maxDownloadAttempts: maxDownloadAttempts, maxDownloadAttempts: maxDownloadAttempts,
} }

View file

@ -271,18 +271,9 @@ func (t *transfer) Close() {
// This prevents it from taking up a slot. // This prevents it from taking up a slot.
type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer
// TransferManager is used by LayerDownloadManager and LayerUploadManager to // transferManager is used by LayerDownloadManager and LayerUploadManager to
// schedule and deduplicate transfers. It is up to the TransferManager // schedule and deduplicate transfers. It is up to the transferManager
// implementation to make the scheduling and concurrency decisions. // to make the scheduling and concurrency decisions.
type TransferManager interface {
// Transfer checks if a transfer with the given key is in progress. If
// so, it returns progress and error output from that transfer.
// Otherwise, it will call xferFunc to initiate the transfer.
Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher)
// SetConcurrency set the concurrencyLimit so that it is adjustable daemon reload
SetConcurrency(concurrency int)
}
type transferManager struct { type transferManager struct {
mu sync.Mutex mu sync.Mutex
@ -292,8 +283,8 @@ type transferManager struct {
waitingTransfers []chan struct{} waitingTransfers []chan struct{}
} }
// NewTransferManager returns a new TransferManager. // newTransferManager returns a new transferManager.
func NewTransferManager(concurrencyLimit int) TransferManager { func newTransferManager(concurrencyLimit int) *transferManager {
return &transferManager{ return &transferManager{
concurrencyLimit: concurrencyLimit, concurrencyLimit: concurrencyLimit,
transfers: make(map[string]Transfer), transfers: make(map[string]Transfer),

View file

@ -29,7 +29,7 @@ func TestTransfer(t *testing.T) {
} }
} }
tm := NewTransferManager(5) tm := newTransferManager(5)
progressChan := make(chan progress.Progress) progressChan := make(chan progress.Progress)
progressDone := make(chan struct{}) progressDone := make(chan struct{})
receivedProgress := make(map[string]int64) receivedProgress := make(map[string]int64)
@ -91,7 +91,7 @@ func TestConcurrencyLimit(t *testing.T) {
} }
} }
tm := NewTransferManager(concurrencyLimit) tm := newTransferManager(concurrencyLimit)
progressChan := make(chan progress.Progress) progressChan := make(chan progress.Progress)
progressDone := make(chan struct{}) progressDone := make(chan struct{})
receivedProgress := make(map[string]int64) receivedProgress := make(map[string]int64)
@ -152,7 +152,7 @@ func TestInactiveJobs(t *testing.T) {
} }
} }
tm := NewTransferManager(concurrencyLimit) tm := newTransferManager(concurrencyLimit)
progressChan := make(chan progress.Progress) progressChan := make(chan progress.Progress)
progressDone := make(chan struct{}) progressDone := make(chan struct{})
receivedProgress := make(map[string]int64) receivedProgress := make(map[string]int64)
@ -211,7 +211,7 @@ func TestWatchRelease(t *testing.T) {
} }
} }
tm := NewTransferManager(5) tm := newTransferManager(5)
type watcherInfo struct { type watcherInfo struct {
watcher *Watcher watcher *Watcher
@ -290,7 +290,7 @@ func TestWatchFinishedTransfer(t *testing.T) {
} }
} }
tm := NewTransferManager(5) tm := newTransferManager(5)
// Start a transfer // Start a transfer
watchers := make([]*Watcher, 3) watchers := make([]*Watcher, 3)
@ -343,7 +343,7 @@ func TestDuplicateTransfer(t *testing.T) {
} }
} }
tm := NewTransferManager(5) tm := newTransferManager(5)
type transferInfo struct { type transferInfo struct {
xfer Transfer xfer Transfer

View file

@ -16,7 +16,7 @@ const maxUploadAttempts = 5
// LayerUploadManager provides task management and progress reporting for // LayerUploadManager provides task management and progress reporting for
// uploads. // uploads.
type LayerUploadManager struct { type LayerUploadManager struct {
tm TransferManager tm *transferManager
waitDuration time.Duration waitDuration time.Duration
} }
@ -28,7 +28,7 @@ func (lum *LayerUploadManager) SetConcurrency(concurrency int) {
// NewLayerUploadManager returns a new LayerUploadManager. // NewLayerUploadManager returns a new LayerUploadManager.
func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager { func NewLayerUploadManager(concurrencyLimit int, options ...func(*LayerUploadManager)) *LayerUploadManager {
manager := LayerUploadManager{ manager := LayerUploadManager{
tm: NewTransferManager(concurrencyLimit), tm: newTransferManager(concurrencyLimit),
waitDuration: time.Second, waitDuration: time.Second,
} }
for _, option := range options { for _, option := range options {