mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
52 lines
1.2 KiB
Go
52 lines
1.2 KiB
Go
|
package distribution
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
|
||
|
"github.com/docker/docker/pkg/broadcaster"
|
||
|
)
|
||
|
|
||
|
// A Pool manages concurrent pulls. It deduplicates in-progress downloads.
//
// Each in-progress operation is tracked under a string key that maps to the
// broadcaster created for it. The methods in this file take the embedded
// mutex before reading or writing pullingPool.
type Pool struct {
	// Mutex is embedded, so Lock and Unlock are called directly on the Pool.
	sync.Mutex
	// pullingPool maps an operation key to the broadcaster that add stored
	// for it; removeWithError deletes the entry again.
	pullingPool map[string]*broadcaster.Buffered
}
|
||
|
|
||
|
// NewPool creates a new Pool.
|
||
|
func NewPool() *Pool {
|
||
|
return &Pool{
|
||
|
pullingPool: make(map[string]*broadcaster.Buffered),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// add checks if a pull is already running, and returns (broadcaster, true)
|
||
|
// if a running operation is found. Otherwise, it creates a new one and returns
|
||
|
// (broadcaster, false).
|
||
|
func (pool *Pool) add(key string) (*broadcaster.Buffered, bool) {
|
||
|
pool.Lock()
|
||
|
defer pool.Unlock()
|
||
|
|
||
|
if p, exists := pool.pullingPool[key]; exists {
|
||
|
return p, true
|
||
|
}
|
||
|
|
||
|
broadcaster := broadcaster.NewBuffered()
|
||
|
pool.pullingPool[key] = broadcaster
|
||
|
|
||
|
return broadcaster, false
|
||
|
}
|
||
|
|
||
|
func (pool *Pool) removeWithError(key string, broadcasterResult error) error {
|
||
|
pool.Lock()
|
||
|
defer pool.Unlock()
|
||
|
if broadcaster, exists := pool.pullingPool[key]; exists {
|
||
|
broadcaster.CloseWithError(broadcasterResult)
|
||
|
delete(pool.pullingPool, key)
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (pool *Pool) remove(key string) error {
|
||
|
return pool.removeWithError(key, nil)
|
||
|
}
|