moby--moby/distribution/pool.go

52 lines
1.2 KiB
Go

package distribution
import (
"sync"
"github.com/docker/docker/pkg/broadcaster"
)
// Pool tracks the pulls that are currently in progress so that concurrent
// requests for the same key can share one download instead of starting
// another.
//
// The embedded mutex guards pullingPool; the methods of Pool acquire it
// before reading or modifying the map.
type Pool struct {
	sync.Mutex

	// pullingPool maps a pull key to the broadcaster of the operation
	// registered under that key.
	pullingPool map[string]*broadcaster.Buffered
}
// NewPool returns a Pool with an initialized, empty map of in-progress pulls.
func NewPool() *Pool {
	p := new(Pool)
	p.pullingPool = map[string]*broadcaster.Buffered{}
	return p
}
// add registers a pull under key, or joins the one already registered.
//
// If the pool already holds a broadcaster for key, that broadcaster is
// returned together with true. Otherwise a new buffered broadcaster is
// created, stored under key, and returned together with false.
//
// The pool lock is held for the whole lookup-and-insert, so two callers
// using the same key cannot both create a broadcaster.
func (pool *Pool) add(key string) (*broadcaster.Buffered, bool) {
	pool.Lock()
	defer pool.Unlock()

	existing, found := pool.pullingPool[key]
	if found {
		return existing, true
	}

	// Named "created" rather than "broadcaster" so the local does not shadow
	// the imported package.
	created := broadcaster.NewBuffered()
	pool.pullingPool[key] = created
	return created, false
}
// removeWithError closes the broadcaster registered under key, handing
// broadcasterResult to its CloseWithError method, and then deletes the key
// from the pool. If no broadcaster is registered under key, nothing happens.
//
// The pool lock is held for the duration of the call, including the call to
// CloseWithError. The returned error is always nil.
func (pool *Pool) removeWithError(key string, broadcasterResult error) error {
	pool.Lock()
	defer pool.Unlock()

	entry, found := pool.pullingPool[key]
	if !found {
		return nil
	}

	// Close first, then forget the entry, matching the original ordering.
	entry.CloseWithError(broadcasterResult)
	delete(pool.pullingPool, key)
	return nil
}
// remove closes the broadcaster registered under key with a nil result and
// deletes the key from the pool. It is shorthand for removeWithError(key, nil)
// and returns whatever that call returns.
func (pool *Pool) remove(key string) error {
	err := pool.removeWithError(key, nil)
	return err
}