1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

distribution/xfer: un-export Watcher

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2022-01-22 14:32:18 +01:00
parent 96a750aab4
commit 849d8c2d02
No known key found for this signature in database
GPG key ID: 76698F39D527CE8C
3 changed files with 17 additions and 17 deletions

View file

@ -108,7 +108,7 @@ func (ldm *LayerDownloadManager) Download(ctx context.Context, initialRootFS ima
var ( var (
topLayer layer.Layer topLayer layer.Layer
topDownload *downloadTransfer topDownload *downloadTransfer
watcher *Watcher watcher *watcher
missingLayer bool missingLayer bool
transferKey = "" transferKey = ""
downloadsByKey = make(map[string]*downloadTransfer) downloadsByKey = make(map[string]*downloadTransfer)

View file

@ -19,8 +19,8 @@ func (e DoNotRetry) Error() string {
return e.Err.Error() return e.Err.Error()
} }
// Watcher is returned by Watch and can be passed to Release to stop watching. // watcher is returned by Watch and can be passed to Release to stop watching.
type Watcher struct { type watcher struct {
// signalChan is used to signal to the watcher goroutine that // signalChan is used to signal to the watcher goroutine that
// new progress information is available, or that the transfer // new progress information is available, or that the transfer
// has finished. // has finished.
@ -36,8 +36,8 @@ type Watcher struct {
// transfer represents an in-progress transfer. // transfer represents an in-progress transfer.
type transfer interface { type transfer interface {
Watch(progressOutput progress.Output) *Watcher Watch(progressOutput progress.Output) *watcher
Release(*Watcher) Release(*watcher)
Context() context.Context Context() context.Context
Close() Close()
Done() <-chan struct{} Done() <-chan struct{}
@ -53,7 +53,7 @@ type xfer struct {
// watchers keeps track of the goroutines monitoring progress output, // watchers keeps track of the goroutines monitoring progress output,
// indexed by the channels that release them. // indexed by the channels that release them.
watchers map[chan struct{}]*Watcher watchers map[chan struct{}]*watcher
// lastProgress is the most recently received progress event. // lastProgress is the most recently received progress event.
lastProgress progress.Progress lastProgress progress.Progress
@ -80,7 +80,7 @@ type xfer struct {
// newTransfer creates a new transfer. // newTransfer creates a new transfer.
func newTransfer() transfer { func newTransfer() transfer {
t := &xfer{ t := &xfer{
watchers: make(map[chan struct{}]*Watcher), watchers: make(map[chan struct{}]*watcher),
running: make(chan struct{}), running: make(chan struct{}),
released: make(chan struct{}), released: make(chan struct{}),
broadcastSyncChan: make(chan struct{}), broadcastSyncChan: make(chan struct{}),
@ -137,11 +137,11 @@ func (t *xfer) Broadcast(mainProgressChan <-chan progress.Progress) {
// Watch adds a watcher to the transfer. The supplied channel gets progress // Watch adds a watcher to the transfer. The supplied channel gets progress
// updates and is closed when the transfer finishes. // updates and is closed when the transfer finishes.
func (t *xfer) Watch(progressOutput progress.Output) *Watcher { func (t *xfer) Watch(progressOutput progress.Output) *watcher {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()
w := &Watcher{ w := &watcher{
releaseChan: make(chan struct{}), releaseChan: make(chan struct{}),
signalChan: make(chan struct{}), signalChan: make(chan struct{}),
running: make(chan struct{}), running: make(chan struct{}),
@ -205,7 +205,7 @@ func (t *xfer) Watch(progressOutput progress.Output) *Watcher {
// to be notified about the progress of the transfer. All calls to Watch must // to be notified about the progress of the transfer. All calls to Watch must
// be paired with later calls to Release so that the lifecycle of the transfer // be paired with later calls to Release so that the lifecycle of the transfer
// is properly managed. // is properly managed.
func (t *xfer) Release(watcher *Watcher) { func (t *xfer) Release(watcher *watcher) {
t.mu.Lock() t.mu.Lock()
delete(t.watchers, watcher.releaseChan) delete(t.watchers, watcher.releaseChan)
@ -301,7 +301,7 @@ func (tm *transferManager) setConcurrency(concurrency int) {
// transfer checks if a transfer matching the given key is in progress. If not, // transfer checks if a transfer matching the given key is in progress. If not,
// it starts one by calling xferFunc. The caller supplies a channel which // it starts one by calling xferFunc. The caller supplies a channel which
// receives progress output from the transfer. // receives progress output from the transfer.
func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (transfer, *Watcher) { func (tm *transferManager) transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (transfer, *watcher) {
tm.mu.Lock() tm.mu.Lock()
defer tm.mu.Unlock() defer tm.mu.Unlock()

View file

@ -48,7 +48,7 @@ func TestTransfer(t *testing.T) {
// Start a few transfers // Start a few transfers
ids := []string{"id1", "id2", "id3"} ids := []string{"id1", "id2", "id3"}
xfers := make([]transfer, len(ids)) xfers := make([]transfer, len(ids))
watchers := make([]*Watcher, len(ids)) watchers := make([]*watcher, len(ids))
for i, id := range ids { for i, id := range ids {
xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
} }
@ -106,7 +106,7 @@ func TestConcurrencyLimit(t *testing.T) {
// Start more transfers than the concurrency limit // Start more transfers than the concurrency limit
ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"} ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
xfers := make([]transfer, len(ids)) xfers := make([]transfer, len(ids))
watchers := make([]*Watcher, len(ids)) watchers := make([]*watcher, len(ids))
for i, id := range ids { for i, id := range ids {
xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
} }
@ -167,7 +167,7 @@ func TestInactiveJobs(t *testing.T) {
// Start more transfers than the concurrency limit // Start more transfers than the concurrency limit
ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"} ids := []string{"id1", "id2", "id3", "id4", "id5", "id6", "id7", "id8"}
xfers := make([]transfer, len(ids)) xfers := make([]transfer, len(ids))
watchers := make([]*Watcher, len(ids)) watchers := make([]*watcher, len(ids))
for i, id := range ids { for i, id := range ids {
xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan)) xfers[i], watchers[i] = tm.transfer(id, makeXferFunc(id), progress.ChanOutput(progressChan))
} }
@ -214,7 +214,7 @@ func TestWatchRelease(t *testing.T) {
tm := newTransferManager(5) tm := newTransferManager(5)
type watcherInfo struct { type watcherInfo struct {
watcher *Watcher watcher *watcher
progressChan chan progress.Progress progressChan chan progress.Progress
progressDone chan struct{} progressDone chan struct{}
receivedFirstProgress chan struct{} receivedFirstProgress chan struct{}
@ -293,7 +293,7 @@ func TestWatchFinishedTransfer(t *testing.T) {
tm := newTransferManager(5) tm := newTransferManager(5)
// Start a transfer // Start a transfer
watchers := make([]*Watcher, 3) watchers := make([]*watcher, 3)
var xfer transfer var xfer transfer
xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress))) xfer, watchers[0] = tm.transfer("id1", makeXferFunc("id1"), progress.ChanOutput(make(chan progress.Progress)))
@ -347,7 +347,7 @@ func TestDuplicateTransfer(t *testing.T) {
type transferInfo struct { type transferInfo struct {
xfer transfer xfer transfer
watcher *Watcher watcher *watcher
progressChan chan progress.Progress progressChan chan progress.Progress
progressDone chan struct{} progressDone chan struct{}
receivedFirstProgress chan struct{} receivedFirstProgress chan struct{}