1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Fix concurrent uploads that share layers

Concurrent uploads which share layers worked correctly as of #18353,
but unfortunately #18785 caused a regression. That PR removed the logic
that shares digests between different push sessions, overlooking the
case where one session was waiting for another session to upload a
layer.

This commit adds back the ability to propagate this digest information,
using the distribution.Descriptor type because this is what is received
from stats and uploads, and also what is ultimately needed for building
the manifest.

Surprisingly, there was no test covering this case. This commit adds
one. It fails without the fix.

See recent comments on #9132.

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
(cherry picked from commit 5c99eebe81)
This commit is contained in:
Aaron Lehmann 2016-03-01 10:56:05 -08:00 committed by Tibor Vass
parent 4b2e7f0562
commit f05d42ee03
4 changed files with 106 additions and 35 deletions

View file

@ -42,7 +42,7 @@ type v2Pusher struct {
config *ImagePushConfig config *ImagePushConfig
repo distribution.Repository repo distribution.Repository
// pushState is state built by the Download functions. // pushState is state built by the Upload functions.
pushState pushState pushState pushState
} }
@ -216,6 +216,7 @@ type v2PushDescriptor struct {
repoInfo reference.Named repoInfo reference.Named
repo distribution.Repository repo distribution.Repository
pushState *pushState pushState *pushState
remoteDescriptor distribution.Descriptor
} }
func (pd *v2PushDescriptor) Key() string { func (pd *v2PushDescriptor) Key() string {
@ -230,16 +231,16 @@ func (pd *v2PushDescriptor) DiffID() layer.DiffID {
return pd.layer.DiffID() return pd.layer.DiffID()
} }
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error { func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
diffID := pd.DiffID() diffID := pd.DiffID()
pd.pushState.Lock() pd.pushState.Lock()
if _, ok := pd.pushState.remoteLayers[diffID]; ok { if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok {
// it is already known that the push is not needed and // it is already known that the push is not needed and
// therefore doing a stat is unnecessary // therefore doing a stat is unnecessary
pd.pushState.Unlock() pd.pushState.Unlock()
progress.Update(progressOutput, pd.ID(), "Layer already exists") progress.Update(progressOutput, pd.ID(), "Layer already exists")
return nil return descriptor, nil
} }
pd.pushState.Unlock() pd.pushState.Unlock()
@ -249,14 +250,14 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState) descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState)
if err != nil { if err != nil {
progress.Update(progressOutput, pd.ID(), "Image push failed") progress.Update(progressOutput, pd.ID(), "Image push failed")
return retryOnError(err) return distribution.Descriptor{}, retryOnError(err)
} }
if exists { if exists {
progress.Update(progressOutput, pd.ID(), "Layer already exists") progress.Update(progressOutput, pd.ID(), "Layer already exists")
pd.pushState.Lock() pd.pushState.Lock()
pd.pushState.remoteLayers[diffID] = descriptor pd.pushState.remoteLayers[diffID] = descriptor
pd.pushState.Unlock() pd.pushState.Unlock()
return nil return descriptor, nil
} }
} }
@ -286,7 +287,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
if mountFrom.SourceRepository != "" { if mountFrom.SourceRepository != "" {
namedRef, err := reference.WithName(mountFrom.SourceRepository) namedRef, err := reference.WithName(mountFrom.SourceRepository)
if err != nil { if err != nil {
return err return distribution.Descriptor{}, err
} }
// TODO (brianbland): We need to construct a reference where the Name is // TODO (brianbland): We need to construct a reference where the Name is
@ -294,12 +295,12 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// richer reference package // richer reference package
remoteRef, err := distreference.WithName(namedRef.RemoteName()) remoteRef, err := distreference.WithName(namedRef.RemoteName())
if err != nil { if err != nil {
return err return distribution.Descriptor{}, err
} }
canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest) canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
if err != nil { if err != nil {
return err return distribution.Descriptor{}, err
} }
createOpts = append(createOpts, client.WithMountFrom(canonicalRef)) createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
@ -320,10 +321,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// Cache mapping from this layer's DiffID to the blobsum // Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil { if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
return xfer.DoNotRetry{Err: err} return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
} }
return nil return err.Descriptor, nil
} }
if mountFrom.SourceRepository != "" { if mountFrom.SourceRepository != "" {
// unable to mount layer from this repository, so this source mapping is no longer valid // unable to mount layer from this repository, so this source mapping is no longer valid
@ -332,13 +333,13 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
} }
if err != nil { if err != nil {
return retryOnError(err) return distribution.Descriptor{}, retryOnError(err)
} }
defer layerUpload.Close() defer layerUpload.Close()
arch, err := pd.layer.TarStream() arch, err := pd.layer.TarStream()
if err != nil { if err != nil {
return xfer.DoNotRetry{Err: err} return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
} }
// don't care if this fails; best effort // don't care if this fails; best effort
@ -357,12 +358,12 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
nn, err := layerUpload.ReadFrom(tee) nn, err := layerUpload.ReadFrom(tee)
compressedReader.Close() compressedReader.Close()
if err != nil { if err != nil {
return retryOnError(err) return distribution.Descriptor{}, retryOnError(err)
} }
pushDigest := digester.Digest() pushDigest := digester.Digest()
if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil { if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil {
return retryOnError(err) return distribution.Descriptor{}, retryOnError(err)
} }
logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn) logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn)
@ -370,7 +371,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// Cache mapping from this layer's DiffID to the blobsum // Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil { if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
return xfer.DoNotRetry{Err: err} return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
} }
pd.pushState.Lock() pd.pushState.Lock()
@ -379,23 +380,24 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// speaks the v2 protocol. // speaks the v2 protocol.
pd.pushState.confirmedV2 = true pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = distribution.Descriptor{ descriptor := distribution.Descriptor{
Digest: pushDigest, Digest: pushDigest,
MediaType: schema2.MediaTypeLayer, MediaType: schema2.MediaTypeLayer,
Size: nn, Size: nn,
} }
pd.pushState.remoteLayers[diffID] = descriptor
pd.pushState.Unlock() pd.pushState.Unlock()
return nil return descriptor, nil
}
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
pd.remoteDescriptor = descriptor
} }
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
// Not necessary to lock pushStatus because this is always return pd.remoteDescriptor
// called after all the mutation in pushStatus.
// By the time this function is called, every layer will have
// an entry in remoteLayers.
return pd.pushState.remoteLayers[pd.DiffID()]
} }
// layerAlreadyExists checks if the registry already knows about any of the // layerAlreadyExists checks if the registry already knows about any of the

View file

@ -5,6 +5,7 @@ import (
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/docker/layer" "github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/progress"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -28,7 +29,7 @@ func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager {
type uploadTransfer struct { type uploadTransfer struct {
Transfer Transfer
diffID layer.DiffID remoteDescriptor distribution.Descriptor
err error err error
} }
@ -41,7 +42,12 @@ type UploadDescriptor interface {
// DiffID should return the DiffID for this layer. // DiffID should return the DiffID for this layer.
DiffID() layer.DiffID DiffID() layer.DiffID
// Upload is called to perform the Upload. // Upload is called to perform the Upload.
Upload(ctx context.Context, progressOutput progress.Output) error Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error)
// SetRemoteDescriptor provides the distribution.Descriptor that was
// returned by Upload. This descriptor is not to be confused with
// the UploadDescriptor interface, which is used for internally
// identifying layers that are being uploaded.
SetRemoteDescriptor(descriptor distribution.Descriptor)
} }
// Upload is a blocking function which ensures the listed layers are present on // Upload is a blocking function which ensures the listed layers are present on
@ -50,7 +56,7 @@ type UploadDescriptor interface {
func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error { func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error {
var ( var (
uploads []*uploadTransfer uploads []*uploadTransfer
dedupDescriptors = make(map[string]struct{}) dedupDescriptors = make(map[string]*uploadTransfer)
) )
for _, descriptor := range layers { for _, descriptor := range layers {
@ -60,12 +66,12 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
if _, present := dedupDescriptors[key]; present { if _, present := dedupDescriptors[key]; present {
continue continue
} }
dedupDescriptors[key] = struct{}{}
xferFunc := lum.makeUploadFunc(descriptor) xferFunc := lum.makeUploadFunc(descriptor)
upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput) upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput)
defer upload.Release(watcher) defer upload.Release(watcher)
uploads = append(uploads, upload.(*uploadTransfer)) uploads = append(uploads, upload.(*uploadTransfer))
dedupDescriptors[key] = upload.(*uploadTransfer)
} }
for _, upload := range uploads { for _, upload := range uploads {
@ -78,6 +84,9 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri
} }
} }
} }
for _, l := range layers {
l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor)
}
return nil return nil
} }
@ -86,7 +95,6 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer {
u := &uploadTransfer{ u := &uploadTransfer{
Transfer: NewTransfer(), Transfer: NewTransfer(),
diffID: descriptor.DiffID(),
} }
go func() { go func() {
@ -105,8 +113,9 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun
retries := 0 retries := 0
for { for {
err := descriptor.Upload(u.Transfer.Context(), progressOutput) remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput)
if err == nil { if err == nil {
u.remoteDescriptor = remoteDescriptor
break break
} }

View file

@ -6,6 +6,7 @@ import (
"testing" "testing"
"time" "time"
"github.com/docker/distribution"
"github.com/docker/distribution/digest" "github.com/docker/distribution/digest"
"github.com/docker/docker/layer" "github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress" "github.com/docker/docker/pkg/progress"
@ -35,13 +36,17 @@ func (u *mockUploadDescriptor) DiffID() layer.DiffID {
return u.diffID return u.diffID
} }
// SetRemoteDescriptor is a no-op in the mock: the descriptor passed in is
// discarded. The method is present only so that mockUploadDescriptor
// provides SetRemoteDescriptor like other upload descriptors.
func (u *mockUploadDescriptor) SetRemoteDescriptor(remoteDescriptor distribution.Descriptor) {
}
// Upload is called to perform the upload. // Upload is called to perform the upload.
func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error { func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
if u.currentUploads != nil { if u.currentUploads != nil {
defer atomic.AddInt32(u.currentUploads, -1) defer atomic.AddInt32(u.currentUploads, -1)
if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency { if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency {
return errors.New("concurrency limit exceeded") return distribution.Descriptor{}, errors.New("concurrency limit exceeded")
} }
} }
@ -49,7 +54,7 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
for i := int64(0); i <= 10; i++ { for i := int64(0); i <= 10; i++ {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return distribution.Descriptor{}, ctx.Err()
case <-time.After(10 * time.Millisecond): case <-time.After(10 * time.Millisecond):
progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10}) progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10})
} }
@ -57,10 +62,10 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre
if u.simulateRetries != 0 { if u.simulateRetries != 0 {
u.simulateRetries-- u.simulateRetries--
return errors.New("simulating retry") return distribution.Descriptor{}, errors.New("simulating retry")
} }
return nil return distribution.Descriptor{}, nil
} }
func uploadDescriptors(currentUploads *int32) []UploadDescriptor { func uploadDescriptors(currentUploads *int32) []UploadDescriptor {

View file

@ -148,6 +148,61 @@ func (s *DockerSchema1RegistrySuite) TestPushEmptyLayer(c *check.C) {
testPushEmptyLayer(c) testPushEmptyLayer(c)
} }
// testConcurrentPush pushes multiple tags to the same repo
// concurrently.
//
// It builds three tags of one repository from the same Dockerfile template
// (the tags differ only in the final CMD instruction), pushes all of them in
// parallel, removes the local images, and then pulls, inspects and runs each
// tag to confirm that every push produced a usable image.
func testConcurrentPush(c *check.C) {
	repoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)

	tags := []string{"push1", "push2", "push3"}
	repos := make([]string, 0, len(tags))
	for _, tag := range tags {
		repo := fmt.Sprintf("%v:%v", repoName, tag)
		_, err := buildImage(repo, fmt.Sprintf(`
FROM busybox
ENTRYPOINT ["/bin/echo"]
ENV FOO foo
ENV BAR bar
CMD echo %s
`, repo), true)
		c.Assert(err, checker.IsNil)
		repos = append(repos, repo)
	}

	// Push tags, in parallel.
	//
	// The channel is buffered with one slot per push so that every goroutine
	// can deliver its result and exit even if an assertion below aborts the
	// test before all results have been received. With an unbuffered channel
	// the remaining senders would block forever.
	results := make(chan error, len(repos))
	for _, repo := range repos {
		go func(repo string) {
			_, _, err := runCommandWithOutput(exec.Command(dockerBinary, "push", repo))
			results <- err
		}(repo)
	}

	// Collect exactly one result per push; any failure fails the test.
	for range repos {
		err := <-results
		c.Assert(err, checker.IsNil, check.Commentf("concurrent push failed with error: %v", err))
	}

	// Clear local images store.
	args := append([]string{"rmi"}, repos...)
	dockerCmd(c, args...)

	// Re-pull and run individual tags, to make sure pushes succeeded
	for _, repo := range repos {
		dockerCmd(c, "pull", repo)
		dockerCmd(c, "inspect", repo)
		out, _ := dockerCmd(c, "run", "--rm", repo)
		c.Assert(strings.TrimSpace(out), checker.Equals, "/bin/sh -c echo "+repo)
	}
}
// TestConcurrentPush runs the shared testConcurrentPush scenario as part of
// DockerRegistrySuite.
func (s *DockerRegistrySuite) TestConcurrentPush(c *check.C) {
testConcurrentPush(c)
}
// TestConcurrentPush runs the shared testConcurrentPush scenario as part of
// DockerSchema1RegistrySuite.
func (s *DockerSchema1RegistrySuite) TestConcurrentPush(c *check.C) {
testConcurrentPush(c)
}
func (s *DockerRegistrySuite) TestCrossRepositoryLayerPush(c *check.C) { func (s *DockerRegistrySuite) TestCrossRepositoryLayerPush(c *check.C) {
sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL) sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
// tag the image to upload it to the private registry // tag the image to upload it to the private registry