diff --git a/distribution/push_v2.go b/distribution/push_v2.go index e5f9de81f6..a8d9acdbb2 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -42,7 +42,7 @@ type v2Pusher struct { config *ImagePushConfig repo distribution.Repository - // pushState is state built by the Download functions. + // pushState is state built by the Upload functions. pushState pushState } @@ -216,6 +216,7 @@ type v2PushDescriptor struct { repoInfo reference.Named repo distribution.Repository pushState *pushState + remoteDescriptor distribution.Descriptor } func (pd *v2PushDescriptor) Key() string { @@ -230,16 +231,16 @@ func (pd *v2PushDescriptor) DiffID() layer.DiffID { return pd.layer.DiffID() } -func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error { +func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) { diffID := pd.DiffID() pd.pushState.Lock() - if _, ok := pd.pushState.remoteLayers[diffID]; ok { + if descriptor, ok := pd.pushState.remoteLayers[diffID]; ok { // it is already known that the push is not needed and // therefore doing a stat is unnecessary pd.pushState.Unlock() progress.Update(progressOutput, pd.ID(), "Layer already exists") - return nil + return descriptor, nil } pd.pushState.Unlock() @@ -249,14 +250,14 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState) if err != nil { progress.Update(progressOutput, pd.ID(), "Image push failed") - return retryOnError(err) + return distribution.Descriptor{}, retryOnError(err) } if exists { progress.Update(progressOutput, pd.ID(), "Layer already exists") pd.pushState.Lock() pd.pushState.remoteLayers[diffID] = descriptor pd.pushState.Unlock() - return nil + return descriptor, nil } } @@ -286,7 +287,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. if mountFrom.SourceRepository != "" { namedRef, err := reference.WithName(mountFrom.SourceRepository) if err != nil { - return err + return distribution.Descriptor{}, err } // TODO (brianbland): We need to construct a reference where the Name is @@ -294,12 +295,12 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. // richer reference package remoteRef, err := distreference.WithName(namedRef.RemoteName()) if err != nil { - return err + return distribution.Descriptor{}, err } canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest) if err != nil { - return err + return distribution.Descriptor{}, err } createOpts = append(createOpts, client.WithMountFrom(canonicalRef)) @@ -320,10 +321,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. // Cache mapping from this layer's DiffID to the blobsum if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil { - return xfer.DoNotRetry{Err: err} + return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} } - return nil + return err.Descriptor, nil } if mountFrom.SourceRepository != "" { // unable to mount layer from this repository, so this source mapping is no longer valid @@ -332,13 +333,13 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. } if err != nil { - return retryOnError(err) + return distribution.Descriptor{}, retryOnError(err) } defer layerUpload.Close() arch, err := pd.layer.TarStream() if err != nil { - return xfer.DoNotRetry{Err: err} + return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} } // don't care if this fails; best effort @@ -357,12 +358,12 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. nn, err := layerUpload.ReadFrom(tee) compressedReader.Close() if err != nil { - return retryOnError(err) + return distribution.Descriptor{}, retryOnError(err) } pushDigest := digester.Digest() if _, err := layerUpload.Commit(ctx, distribution.Descriptor{Digest: pushDigest}); err != nil { - return retryOnError(err) + return distribution.Descriptor{}, retryOnError(err) } logrus.Debugf("uploaded layer %s (%s), %d bytes", diffID, pushDigest, nn) @@ -370,7 +371,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. // Cache mapping from this layer's DiffID to the blobsum if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil { - return xfer.DoNotRetry{Err: err} + return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} } pd.pushState.Lock() @@ -379,23 +380,24 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. // speaks the v2 protocol. pd.pushState.confirmedV2 = true - pd.pushState.remoteLayers[diffID] = distribution.Descriptor{ + descriptor := distribution.Descriptor{ Digest: pushDigest, MediaType: schema2.MediaTypeLayer, Size: nn, } + pd.pushState.remoteLayers[diffID] = descriptor pd.pushState.Unlock() - return nil + return descriptor, nil +} + +func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { + pd.remoteDescriptor = descriptor } func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { - // Not necessary to lock pushStatus because this is always - // called after all the mutation in pushStatus. - // By the time this function is called, every layer will have - // an entry in remoteLayers. - return pd.pushState.remoteLayers[pd.DiffID()] + return pd.remoteDescriptor } // layerAlreadyExists checks if the registry already know about any of the diff --git a/distribution/xfer/upload.go b/distribution/xfer/upload.go index 8da6a89e39..20fe045ac6 100644 --- a/distribution/xfer/upload.go +++ b/distribution/xfer/upload.go @@ -5,6 +5,7 @@ import ( "time" "github.com/Sirupsen/logrus" + "github.com/docker/distribution" "github.com/docker/docker/layer" "github.com/docker/docker/pkg/progress" "golang.org/x/net/context" @@ -28,8 +29,8 @@ func NewLayerUploadManager(concurrencyLimit int) *LayerUploadManager { type uploadTransfer struct { Transfer - diffID layer.DiffID - err error + remoteDescriptor distribution.Descriptor + err error } // An UploadDescriptor references a layer that may need to be uploaded. @@ -41,7 +42,12 @@ type UploadDescriptor interface { // DiffID should return the DiffID for this layer. DiffID() layer.DiffID // Upload is called to perform the Upload. - Upload(ctx context.Context, progressOutput progress.Output) error + Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) + // SetRemoteDescriptor provides the distribution.Descriptor that was + // returned by Upload. This descriptor is not to be confused with + // the UploadDescriptor interface, which is used for internally + // identifying layers that are being uploaded. + SetRemoteDescriptor(descriptor distribution.Descriptor) } // Upload is a blocking function which ensures the listed layers are present on @@ -50,7 +56,7 @@ type UploadDescriptor interface { func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescriptor, progressOutput progress.Output) error { var ( uploads []*uploadTransfer - dedupDescriptors = make(map[string]struct{}) + dedupDescriptors = make(map[string]*uploadTransfer) ) for _, descriptor := range layers { @@ -60,12 +66,12 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri if _, present := dedupDescriptors[key]; present { continue } - dedupDescriptors[key] = struct{}{} xferFunc := lum.makeUploadFunc(descriptor) upload, watcher := lum.tm.Transfer(descriptor.Key(), xferFunc, progressOutput) defer upload.Release(watcher) uploads = append(uploads, upload.(*uploadTransfer)) + dedupDescriptors[key] = upload.(*uploadTransfer) } for _, upload := range uploads { @@ -78,6 +84,9 @@ func (lum *LayerUploadManager) Upload(ctx context.Context, layers []UploadDescri } } } + for _, l := range layers { + l.SetRemoteDescriptor(dedupDescriptors[l.Key()].remoteDescriptor) + } return nil } @@ -86,7 +95,6 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun return func(progressChan chan<- progress.Progress, start <-chan struct{}, inactive chan<- struct{}) Transfer { u := &uploadTransfer{ Transfer: NewTransfer(), - diffID: descriptor.DiffID(), } go func() { @@ -105,8 +113,9 @@ func (lum *LayerUploadManager) makeUploadFunc(descriptor UploadDescriptor) DoFun retries := 0 for { - err := descriptor.Upload(u.Transfer.Context(), progressOutput) + remoteDescriptor, err := descriptor.Upload(u.Transfer.Context(), progressOutput) if err == nil { + u.remoteDescriptor = remoteDescriptor break } diff --git a/distribution/xfer/upload_test.go b/distribution/xfer/upload_test.go index d87dfcaa25..275d24268d 100644 --- a/distribution/xfer/upload_test.go +++ b/distribution/xfer/upload_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/docker/distribution" "github.com/docker/distribution/digest" "github.com/docker/docker/layer" "github.com/docker/docker/pkg/progress" @@ -35,13 +36,17 @@ func (u *mockUploadDescriptor) DiffID() layer.DiffID { return u.diffID } +// SetRemoteDescriptor is not used in the mock. +func (u *mockUploadDescriptor) SetRemoteDescriptor(remoteDescriptor distribution.Descriptor) { +} + // Upload is called to perform the upload. -func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) error { +func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) { if u.currentUploads != nil { defer atomic.AddInt32(u.currentUploads, -1) if atomic.AddInt32(u.currentUploads, 1) > maxUploadConcurrency { - return errors.New("concurrency limit exceeded") + return distribution.Descriptor{}, errors.New("concurrency limit exceeded") } } @@ -49,7 +54,7 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre for i := int64(0); i <= 10; i++ { select { case <-ctx.Done(): - return ctx.Err() + return distribution.Descriptor{}, ctx.Err() case <-time.After(10 * time.Millisecond): progressOutput.WriteProgress(progress.Progress{ID: u.ID(), Current: i, Total: 10}) } @@ -57,10 +62,10 @@ func (u *mockUploadDescriptor) Upload(ctx context.Context, progressOutput progre if u.simulateRetries != 0 { u.simulateRetries-- - return errors.New("simulating retry") + return distribution.Descriptor{}, errors.New("simulating retry") } - return nil + return distribution.Descriptor{}, nil } func uploadDescriptors(currentUploads *int32) []UploadDescriptor { diff --git a/integration-cli/docker_cli_push_test.go b/integration-cli/docker_cli_push_test.go index 0e6717644d..4cea6cd5e7 100644 --- a/integration-cli/docker_cli_push_test.go +++ b/integration-cli/docker_cli_push_test.go @@ -148,6 +148,61 @@ func (s *DockerSchema1RegistrySuite) TestPushEmptyLayer(c *check.C) { testPushEmptyLayer(c) } +// testConcurrentPush pushes multiple tags to the same repo +// concurrently. +func testConcurrentPush(c *check.C) { + repoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL) + + repos := []string{} + for _, tag := range []string{"push1", "push2", "push3"} { + repo := fmt.Sprintf("%v:%v", repoName, tag) + _, err := buildImage(repo, fmt.Sprintf(` + FROM busybox + ENTRYPOINT ["/bin/echo"] + ENV FOO foo + ENV BAR bar + CMD echo %s +`, repo), true) + c.Assert(err, checker.IsNil) + repos = append(repos, repo) + } + + // Push tags, in parallel + results := make(chan error) + + for _, repo := range repos { + go func(repo string) { + _, _, err := runCommandWithOutput(exec.Command(dockerBinary, "push", repo)) + results <- err + }(repo) + } + + for range repos { + err := <-results + c.Assert(err, checker.IsNil, check.Commentf("concurrent push failed with error: %v", err)) + } + + // Clear local images store. + args := append([]string{"rmi"}, repos...) + dockerCmd(c, args...) + + // Re-pull and run individual tags, to make sure pushes succeeded + for _, repo := range repos { + dockerCmd(c, "pull", repo) + dockerCmd(c, "inspect", repo) + out, _ := dockerCmd(c, "run", "--rm", repo) + c.Assert(strings.TrimSpace(out), checker.Equals, "/bin/sh -c echo "+repo) + } +} + +func (s *DockerRegistrySuite) TestConcurrentPush(c *check.C) { + testConcurrentPush(c) +} + +func (s *DockerSchema1RegistrySuite) TestConcurrentPush(c *check.C) { + testConcurrentPush(c) +} + func (s *DockerRegistrySuite) TestCrossRepositoryLayerPush(c *check.C) { sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL) // tag the image to upload it to the private registry