1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/distribution/xfer/upload.go
Aaron Lehmann 5c99eebe81 Fix concurrent uploads that share layers
Concurrent uploads which share layers worked correctly as of #18353,
but unfortunately #18785 caused a regression. This PR removed the logic
that shares digests between different push sessions. This overlooked the
case where one session was waiting for another session to upload a
layer.

This commit adds back the ability to propagate this digest information,
using the distribution.Descriptor type because this is what is received
from stats and uploads, and also what is ultimately needed for building
the manifest.

Surprisingly, there was no test covering this case. This commit adds
one. It fails without the fix.

See recent comments on #9132.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
2016-03-01 11:14:44 -08:00

163 lines
4.1 KiB
Go

package xfer
import (
"errors"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
"golang.org/x/net/context"
)
// maxUploadAttempts is the number of failed attempts after which an upload
// stops retrying and reports the last error it received.
const maxUploadAttempts = 5
// LayerUploadManager provides task management and progress reporting for
// uploads. Scheduling of the individual uploads is delegated to the
// TransferManager it wraps.
type LayerUploadManager struct {
// tm is the transfer manager through which every upload is started; it
// is set up by NewLayerUploadManager.
tm TransferManager
}
// NewLayerUploadManager builds a LayerUploadManager backed by a fresh
// transfer manager. concurrencyLimit is handed unchanged to
// NewTransferManager.
func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
	manager := new(LayerUploadManager)
	manager.tm = NewTransferManager(concurrencyLimit)
	return manager
}
// uploadTransfer is the Transfer created for a single layer upload. It embeds
// the generic Transfer and additionally records how the upload ended. Within
// this file the result fields are read only after the transfer's Done channel
// has fired.
type uploadTransfer struct {
Transfer
// remoteDescriptor is the descriptor returned by a successful call to
// UploadDescriptor.Upload.
remoteDescriptor distribution.Descriptor
// err is the error that ended the upload; it stays nil on success.
err error
}
// An UploadDescriptor references a layer that may need to be uploaded.
type UploadDescriptor interface {
// Key returns the key used to deduplicate uploads. Within one call to
// LayerUploadManager.Upload, descriptors that share a key result in a
// single transfer.
Key() string
// ID returns the ID for display purposes. It is the identifier attached
// to the progress updates emitted for this layer.
ID() string
// DiffID should return the DiffID for this layer.
DiffID() layer.DiffID
// Upload is called to perform the Upload. It is given the transfer's
// context and an output for progress reporting, and returns the
// descriptor of the uploaded content. A failed call is retried unless
// the returned error implements DoNotRetry, the transfer's context has
// been cancelled, or the attempt limit has been reached.
Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
// SetRemoteDescriptor provides the distribution.Descriptor that was
// returned by Upload. This descriptor is not to be confused with
// the UploadDescriptor interface, which is used for internally
// identifying layers that are being uploaded.
// LayerUploadManager.Upload calls it for every layer it was given,
// including layers skipped as duplicates, once all uploads succeeded.
SetRemoteDescriptor(descriptor distribution.Descriptor)
}
// Upload blocks until every layer in layers is present on the remote
// registry, until ctx is done, or until one of the uploads fails. Layers whose
// Key method returns the same string share a single upload. Once all uploads
// have finished without error, every layer (duplicates included) receives the
// resulting remote descriptor through SetRemoteDescriptor.
//
// It returns ctx.Err() if ctx is done while waiting, the error of the first
// failed upload in scheduling order, or nil on success.
func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
	byKey := make(map[string]*uploadTransfer)
	var pending []*uploadTransfer

	// Schedule one transfer per distinct key.
	for _, desc := range layers {
		progress.Update(progressOutput, desc.ID(), "Preparing")

		key := desc.Key()
		if _, seen := byKey[key]; seen {
			// A transfer for this key was already scheduled above.
			continue
		}

		doUpload := lum.makeUploadFunc(desc)
		transfer, watcher := lum.tm.Transfer(desc.Key(), doUpload, progressOutput)
		// Deferred, so the watcher is released only when Upload returns.
		defer transfer.Release(watcher)

		xfer := transfer.(*uploadTransfer)
		pending = append(pending, xfer)
		byKey[key] = xfer
	}

	// Wait for the transfers in the order they were scheduled.
	for _, xfer := range pending {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-xfer.Transfer.Done():
		}
		if xfer.err != nil {
			return xfer.err
		}
	}

	// Hand the remote descriptor to every layer, deduplicated ones too.
	for i := range layers {
		l := layers[i]
		l.SetRemoteDescriptor(byKey[l.Key()].remoteDescriptor)
	}
	return nil
}
// makeUploadFunc returns the DoFunc that uploads the layer referenced by
// descriptor. The returned function creates an uploadTransfer and starts a
// goroutine which waits for the start signal, then calls descriptor.Upload
// until it succeeds or retrying stops. The outcome is stored in the
// transfer's remoteDescriptor or err field, and progressChan is closed when
// the goroutine exits.
//
// Retrying stops when the transfer's context is done, when the error
// implements DoNotRetry, or after maxUploadAttempts failed attempts. Between
// attempts the goroutine counts down a delay of five seconds per failed
// attempt so far, reporting the remaining time as progress.
func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFunc {
	return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
		xfer := &uploadTransfer{Transfer: NewTransfer()}

		go func() {
			defer close(progressChan)

			out := progress.ChanOutput(progressChan)

			// Only report "Waiting" when the start signal is not
			// immediately available.
			select {
			case <-start:
			default:
				progress.Update(out, descriptor.ID(), "Waiting")
				<-start
			}

			attempts := 0
			for {
				remote, err := descriptor.Upload(xfer.Transfer.Context(), out)
				if err == nil {
					xfer.remoteDescriptor = remote
					return
				}

				// A failure seen after the context was cancelled
				// is final; retrying would be pointless.
				select {
				case <-xfer.Transfer.Context().Done():
					xfer.err = err
					return
				default:
				}

				attempts++
				_, noRetry := err.(DoNotRetry)
				if noRetry || attempts == maxUploadAttempts {
					logrus.Errorf("Upload failed: %v", err)
					xfer.err = err
					return
				}

				logrus.Errorf("Upload failed, retrying: %v", err)

				// Count down the retry delay one second at a time so
				// the progress message stays current. remaining is
				// at least 5 here because attempts is at least 1.
				remaining := attempts * 5
				ticker := time.NewTicker(time.Second)
				for remaining > 0 {
					progress.Updatef(out, descriptor.ID(), "Retrying in %d seconds", remaining)
					select {
					case <-ticker.C:
						remaining--
					case <-xfer.Transfer.Context().Done():
						ticker.Stop()
						xfer.err = errors.New("upload cancelled during retry delay")
						return
					}
				}
				ticker.Stop()
			}
		}()

		return xfer
	}
}