package xfer import ( "runtime" "sync" "github.com/docker/docker/pkg/progress" "golang.org/x/net/context" ) // DoNotRetry is an error wrapper indicating that the error cannot be resolved // with a retry. type DoNotRetry struct { Err error } // Error returns the stringified representation of the encapsulated error. func (e DoNotRetry) Error() string { return e.Err.Error() } // Watcher is returned by Watch and can be passed to Release to stop watching. type Watcher struct { // signalChan is used to signal to the watcher goroutine that // new progress information is available, or that the transfer // has finished. signalChan chan struct{} // releaseChan signals to the watcher goroutine that the watcher // should be detached. releaseChan chan struct{} // running remains open as long as the watcher is watching the // transfer. It gets closed if the transfer finishes or the // watcher is detached. running chan struct{} } // Transfer represents an in-progress transfer. type Transfer interface { Watch(progressOutput progress.Output) *Watcher Release(*Watcher) Context() context.Context Close() Done() <-chan struct{} Released() <-chan struct{} Broadcast(masterProgressChan <-chan progress.Progress) } type transfer struct { mu sync.Mutex ctx context.Context cancel context.CancelFunc // watchers keeps track of the goroutines monitoring progress output, // indexed by the channels that release them. watchers map[chan struct{}]*Watcher // lastProgress is the most recently received progress event. lastProgress progress.Progress // hasLastProgress is true when lastProgress has been set. hasLastProgress bool // running remains open as long as the transfer is in progress. running chan struct{} // released stays open until all watchers release the transfer and // the transfer is no longer tracked by the transfer manager. released chan struct{} // broadcastDone is true if the master progress channel has closed. broadcastDone bool // closed is true if Close has been called closed bool // broadcastSyncChan allows watchers to "ping" the broadcasting // goroutine to wait for it for deplete its input channel. This ensures // a detaching watcher won't miss an event that was sent before it // started detaching. broadcastSyncChan chan struct{} } // NewTransfer creates a new transfer. func NewTransfer() Transfer { t := &transfer{ watchers: make(map[chan struct{}]*Watcher), running: make(chan struct{}), released: make(chan struct{}), broadcastSyncChan: make(chan struct{}), } // This uses context.Background instead of a caller-supplied context // so that a transfer won't be cancelled automatically if the client // which requested it is ^C'd (there could be other viewers). t.ctx, t.cancel = context.WithCancel(context.Background()) return t } // Broadcast copies the progress and error output to all viewers. func (t *transfer) Broadcast(masterProgressChan <-chan progress.Progress) { for { var ( p progress.Progress ok bool ) select { case p, ok = <-masterProgressChan: default: // We've depleted the channel, so now we can handle // reads on broadcastSyncChan to let detaching watchers // know we're caught up. select { case <-t.broadcastSyncChan: continue case p, ok = <-masterProgressChan: } } t.mu.Lock() if ok { t.lastProgress = p t.hasLastProgress = true for _, w := range t.watchers { select { case w.signalChan <- struct{}{}: default: } } } else { t.broadcastDone = true } t.mu.Unlock() if !ok { close(t.running) return } } } // Watch adds a watcher to the transfer. The supplied channel gets progress // updates and is closed when the transfer finishes. func (t *transfer) Watch(progressOutput progress.Output) *Watcher { t.mu.Lock() defer t.mu.Unlock() w := &Watcher{ releaseChan: make(chan struct{}), signalChan: make(chan struct{}), running: make(chan struct{}), } t.watchers[w.releaseChan] = w if t.broadcastDone { close(w.running) return w } go func() { defer func() { close(w.running) }() var ( done bool lastWritten progress.Progress hasLastWritten bool ) for { t.mu.Lock() hasLastProgress := t.hasLastProgress lastProgress := t.lastProgress t.mu.Unlock() // Make sure we don't write the last progress item // twice. if hasLastProgress && (!done || !hasLastWritten || lastProgress != lastWritten) { progressOutput.WriteProgress(lastProgress) lastWritten = lastProgress hasLastWritten = true } if done { return } select { case <-w.signalChan: case <-w.releaseChan: done = true // Since the watcher is going to detach, make // sure the broadcaster is caught up so we // don't miss anything. select { case t.broadcastSyncChan <- struct{}{}: case <-t.running: } case <-t.running: done = true } } }() return w } // Release is the inverse of Watch; indicating that the watcher no longer wants // to be notified about the progress of the transfer. All calls to Watch must // be paired with later calls to Release so that the lifecycle of the transfer // is properly managed. func (t *transfer) Release(watcher *Watcher) { t.mu.Lock() delete(t.watchers, watcher.releaseChan) if len(t.watchers) == 0 { if t.closed { // released may have been closed already if all // watchers were released, then another one was added // while waiting for a previous watcher goroutine to // finish. select { case <-t.released: default: close(t.released) } } else { t.cancel() } } t.mu.Unlock() close(watcher.releaseChan) // Block until the watcher goroutine completes <-watcher.running } // Done returns a channel which is closed if the transfer completes or is // cancelled. Note that having 0 watchers causes a transfer to be cancelled. func (t *transfer) Done() <-chan struct{} { // Note that this doesn't return t.ctx.Done() because that channel will // be closed the moment Cancel is called, and we need to return a // channel that blocks until a cancellation is actually acknowledged by // the transfer function. return t.running } // Released returns a channel which is closed once all watchers release the // transfer AND the transfer is no longer tracked by the transfer manager. func (t *transfer) Released() <-chan struct{} { return t.released } // Context returns the context associated with the transfer. func (t *transfer) Context() context.Context { return t.ctx } // Close is called by the transfer manager when the transfer is no longer // being tracked. func (t *transfer) Close() { t.mu.Lock() t.closed = true if len(t.watchers) == 0 { close(t.released) } t.mu.Unlock() } // DoFunc is a function called by the transfer manager to actually perform // a transfer. It should be non-blocking. It should wait until the start channel // is closed before transferring any data. If the function closes inactive, that // signals to the transfer manager that the job is no longer actively moving // data - for example, it may be waiting for a dependent transfer to finish. // This prevents it from taking up a slot. type DoFunc func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer // TransferManager is used by LayerDownloadManager and LayerUploadManager to // schedule and deduplicate transfers. It is up to the TransferManager // implementation to make the scheduling and concurrency decisions. type TransferManager interface { // Transfer checks if a transfer with the given key is in progress. If // so, it returns progress and error output from that transfer. // Otherwise, it will call xferFunc to initiate the transfer. Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) } type transferManager struct { mu sync.Mutex concurrencyLimit int activeTransfers int transfers map[string]Transfer waitingTransfers []chan struct{} } // NewTransferManager returns a new TransferManager. func NewTransferManager(concurrencyLimit int) TransferManager { return &transferManager{ concurrencyLimit: concurrencyLimit, transfers: make(map[string]Transfer), } } // Transfer checks if a transfer matching the given key is in progress. If not, // it starts one by calling xferFunc. The caller supplies a channel which // receives progress output from the transfer. func (tm *transferManager) Transfer(key string, xferFunc DoFunc, progressOutput progress.Output) (Transfer, *Watcher) { tm.mu.Lock() defer tm.mu.Unlock() for { xfer, present := tm.transfers[key] if !present { break } // Transfer is already in progress. watcher := xfer.Watch(progressOutput) select { case <-xfer.Context().Done(): // We don't want to watch a transfer that has been cancelled. // Wait for it to be removed from the map and try again. xfer.Release(watcher) tm.mu.Unlock() // The goroutine that removes this transfer from the // map is also waiting for xfer.Done(), so yield to it. // This could be avoided by adding a Closed method // to Transfer to allow explicitly waiting for it to be // removed the map, but forcing a scheduling round in // this very rare case seems better than bloating the // interface definition. runtime.Gosched() <-xfer.Done() tm.mu.Lock() default: return xfer, watcher } } start := make(chan struct{}) inactive := make(chan struct{}) if tm.activeTransfers < tm.concurrencyLimit { close(start) tm.activeTransfers++ } else { tm.waitingTransfers = append(tm.waitingTransfers, start) } masterProgressChan := make(chan progress.Progress) xfer := xferFunc(masterProgressChan, start, inactive) watcher := xfer.Watch(progressOutput) go xfer.Broadcast(masterProgressChan) tm.transfers[key] = xfer // When the transfer is finished, remove from the map. go func() { for { select { case <-inactive: tm.mu.Lock() tm.inactivate(start) tm.mu.Unlock() inactive = nil case <-xfer.Done(): tm.mu.Lock() if inactive != nil { tm.inactivate(start) } delete(tm.transfers, key) tm.mu.Unlock() xfer.Close() return } } }() return xfer, watcher } func (tm *transferManager) inactivate(start chan struct{}) { // If the transfer was started, remove it from the activeTransfers // count. select { case <-start: // Start next transfer if any are waiting if len(tm.waitingTransfers) != 0 { close(tm.waitingTransfers[0]) tm.waitingTransfers = tm.waitingTransfers[1:] } else { tm.activeTransfers-- } default: } }