diff --git a/distribution/push.go b/distribution/push.go index f64d0f822c..026c29f726 100644 --- a/distribution/push.go +++ b/distribution/push.go @@ -14,58 +14,40 @@ import ( "github.com/sirupsen/logrus" ) -// Pusher is an interface that abstracts pushing for different API versions. -type Pusher interface { - // Push tries to push the image configured at the creation of Pusher. - // Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint. - // - // TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic. - Push(ctx context.Context) error -} - const compressionBufSize = 32768 -// NewPusher creates a new Pusher interface that will push to either a v1 or v2 -// registry. The endpoint argument contains a Version field that determines -// whether a v1 or v2 pusher will be created. The other parameters are passed -// through to the underlying pusher implementation for use during the actual -// push operation. -func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) { - switch endpoint.Version { - case registry.APIVersion2: - return &v2Pusher{ - v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore), - ref: ref, - endpoint: endpoint, - repoInfo: repoInfo, - config: imagePushConfig, - }, nil - case registry.APIVersion1: - return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL) +// newPusher creates a new pusher for pushing to a v2 registry. +// The parameters are passed through to the underlying pusher implementation for +// use during the actual push operation. +func newPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePushConfig) *pusher { + return &pusher{ + metadataService: metadata.NewV2MetadataService(config.MetadataStore), + ref: ref, + endpoint: endpoint, + repoInfo: repoInfo, + config: config, } - return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL) } -// Push initiates a push operation on ref. -// ref is the specific variant of the image to be pushed. -// If no tag is provided, all tags will be pushed. -func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error { +// Push initiates a push operation on ref. ref is the specific variant of the +// image to push. If no tag is provided, all tags are pushed. +func Push(ctx context.Context, ref reference.Named, config *ImagePushConfig) error { // FIXME: Allow to interrupt current push when new push of same image is done. // Resolve the Repository name from fqn to RepositoryInfo - repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref) + repoInfo, err := config.RegistryService.ResolveRepository(ref) if err != nil { return err } - endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name)) + endpoints, err := config.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name)) if err != nil { return err } - progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name()) + progress.Messagef(config.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name()) - associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name) + associations := config.ReferenceStore.ReferencesByName(repoInfo.Name) if len(associations) == 0 { return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name)) } @@ -87,14 +69,9 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo } } - logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version) + logrus.Debugf("Trying to push %s to %s", repoInfo.Name.Name(), endpoint.URL) - pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig) - if err != nil { - lastErr = err - continue - } - if err := pusher.Push(ctx); err != nil { + if err := newPusher(ref, endpoint, repoInfo, config).push(ctx); err != nil { // Was this push cancelled? If so, don't try to fall // back. select { @@ -115,7 +92,7 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo return err } - imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push") + config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push") return nil } diff --git a/distribution/push_v2.go b/distribution/push_v2.go index 4aa4f60175..372705dc9c 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -34,13 +34,13 @@ const ( middleLayerMaximumSize = 10 * (1 << 20) // 10MB ) -type v2Pusher struct { - v2MetadataService metadata.V2MetadataService - ref reference.Named - endpoint registry.APIEndpoint - repoInfo *registry.RepositoryInfo - config *ImagePushConfig - repo distribution.Repository +type pusher struct { + metadataService metadata.V2MetadataService + ref reference.Named + endpoint registry.APIEndpoint + repoInfo *registry.RepositoryInfo + config *ImagePushConfig + repo distribution.Repository // pushState is state built by the Upload functions. pushState pushState @@ -56,7 +56,8 @@ type pushState struct { hasAuthInfo bool } -func (p *v2Pusher) Push(ctx context.Context) (err error) { +// TODO(tiborvass): have push() take a reference to repository + tag, so that the pusher itself is repository-agnostic. +func (p *pusher) push(ctx context.Context) (err error) { p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor) p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull") @@ -66,7 +67,7 @@ func (p *v2Pusher) Push(ctx context.Context) (err error) { return err } - if err = p.pushV2Repository(ctx); err != nil { + if err = p.pushRepository(ctx); err != nil { if continueOnError(err, p.endpoint.Mirror) { return fallbackError{ err: err, @@ -77,14 +78,14 @@ func (p *v2Pusher) Push(ctx context.Context) (err error) { return err } -func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { +func (p *pusher) pushRepository(ctx context.Context) (err error) { if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged { imageID, err := p.config.ReferenceStore.Get(p.ref) if err != nil { return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref)) } - return p.pushV2Tag(ctx, namedTagged, imageID) + return p.pushTag(ctx, namedTagged, imageID) } if !reference.IsNameOnly(p.ref) { @@ -96,7 +97,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) { if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged { pushed++ - if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil { + if err := p.pushTag(ctx, namedTagged, association.ID); err != nil { return err } } @@ -109,7 +110,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { return nil } -func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error { +func (p *pusher) pushTag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error { logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref)) imgConfig, err := p.config.ImageStore.Get(ctx, id) @@ -135,14 +136,14 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id var descriptors []xfer.UploadDescriptor - descriptorTemplate := v2PushDescriptor{ - v2MetadataService: p.v2MetadataService, - hmacKey: hmacKey, - repoInfo: p.repoInfo.Name, - ref: p.ref, - endpoint: p.endpoint, - repo: p.repo, - pushState: &p.pushState, + descriptorTemplate := pushDescriptor{ + metadataService: p.metadataService, + hmacKey: hmacKey, + repoInfo: p.repoInfo.Name, + ref: p.ref, + endpoint: p.endpoint, + repo: p.repo, + pushState: &p.pushState, } // Loop bounds condition is to avoid pushing the base layer on Windows. @@ -243,7 +244,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild // descriptors is in reverse order; iterate backwards to get references // appended in the right order. for i := len(descriptors) - 1; i >= 0; i-- { - if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil { + if err := builder.AppendReference(descriptors[i].(*pushDescriptor)); err != nil { return nil, err } } @@ -251,33 +252,33 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild return builder.Build(ctx) } -type v2PushDescriptor struct { - layer PushLayer - v2MetadataService metadata.V2MetadataService - hmacKey []byte - repoInfo reference.Named - ref reference.Named - endpoint registry.APIEndpoint - repo distribution.Repository - pushState *pushState - remoteDescriptor distribution.Descriptor +type pushDescriptor struct { + layer PushLayer + metadataService metadata.V2MetadataService + hmacKey []byte + repoInfo reference.Named + ref reference.Named + endpoint registry.APIEndpoint + repo distribution.Repository + pushState *pushState + remoteDescriptor distribution.Descriptor // a set of digests whose presence has been checked in a target repository checkedDigests map[digest.Digest]struct{} } -func (pd *v2PushDescriptor) Key() string { +func (pd *pushDescriptor) Key() string { return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String() } -func (pd *v2PushDescriptor) ID() string { +func (pd *pushDescriptor) ID() string { return stringid.TruncateID(pd.layer.DiffID().String()) } -func (pd *v2PushDescriptor) DiffID() layer.DiffID { +func (pd *pushDescriptor) DiffID() layer.DiffID { return pd.layer.DiffID() } -func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) { +func (pd *pushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) { // Skip foreign layers unless this registry allows nondistributable artifacts. if !pd.endpoint.AllowNondistributableArtifacts { if fs, ok := pd.layer.(distribution.Describable); ok { @@ -303,10 +304,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer) // Do we have any metadata associated with this layer's DiffID? - v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID) + metaData, err := pd.metadataService.GetMetadata(diffID) if err == nil { // check for blob existence in the target repository - descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, v2Metadata) + descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, metaData) if exists || err != nil { return descriptor, err } @@ -319,7 +320,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. var layerUpload distribution.BlobWriter // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload - candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata) + candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, metaData) isUnauthorizedError := false for _, mc := range candidates { mountCandidate := mc @@ -329,8 +330,8 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. if len(mountCandidate.SourceRepository) > 0 { namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository) if err != nil { - logrus.Errorf("failed to parse source repository reference %v: %v", reference.FamiliarString(namedRef), err) - pd.v2MetadataService.Remove(mountCandidate) + logrus.WithError(err).Errorf("failed to parse source repository reference %v", reference.FamiliarString(namedRef)) + _ = pd.metadataService.Remove(mountCandidate) continue } @@ -338,13 +339,13 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. // with only path to set mount from with remoteRef, err := reference.WithName(reference.Path(namedRef)) if err != nil { - logrus.Errorf("failed to make remote reference out of %q: %v", reference.Path(namedRef), err) + logrus.WithError(err).Errorf("failed to make remote reference out of %q", reference.Path(namedRef)) continue } canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest) if err != nil { - logrus.Errorf("failed to make canonical reference: %v", err) + logrus.WithError(err).Error("failed to make canonical reference") continue } @@ -366,7 +367,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. pd.pushState.Unlock() // Cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ Digest: err.Descriptor.Digest, SourceRepository: pd.repoInfo.Name(), }); err != nil { @@ -400,7 +401,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. cause = fmt.Sprintf("an error: %v", err.Error()) } logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause) - pd.v2MetadataService.Remove(mountCandidate) + _ = pd.metadataService.Remove(mountCandidate) } if lu != nil { @@ -412,7 +413,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. if maxExistenceChecks-len(pd.checkedDigests) > 0 { // do additional layer existence checks with other known digests if any - descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata) + descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), metaData) if exists || err != nil { return descriptor, err } @@ -430,15 +431,15 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. return pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload) } -func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { +func (pd *pushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { pd.remoteDescriptor = descriptor } -func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { +func (pd *pushDescriptor) Descriptor() distribution.Descriptor { return pd.remoteDescriptor } -func (pd *v2PushDescriptor) uploadUsingSession( +func (pd *pushDescriptor) uploadUsingSession( ctx context.Context, progressOutput progress.Output, diffID layer.DiffID, @@ -485,7 +486,7 @@ func (pd *v2PushDescriptor) uploadUsingSession( progress.Update(progressOutput, pd.ID(), "Pushed") // Cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ Digest: pushDigest, SourceRepository: pd.repoInfo.Name(), }); err != nil { @@ -509,7 +510,7 @@ func (pd *v2PushDescriptor) uploadUsingSession( // slice. If it finds one that the registry knows about, it returns the known digest and "true". If // "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository // (not just the target one). -func (pd *v2PushDescriptor) layerAlreadyExists( +func (pd *pushDescriptor) layerAlreadyExists( ctx context.Context, progressOutput progress.Output, diffID layer.DiffID, @@ -558,7 +559,7 @@ attempts: case nil: if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) { // cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ Digest: desc.Digest, SourceRepository: pd.repoInfo.Name(), }); err != nil { @@ -571,7 +572,7 @@ attempts: case distribution.ErrBlobUnknown: if meta.SourceRepository == pd.repoInfo.Name() { // remove the mapping to the target repository - pd.v2MetadataService.Remove(*meta) + pd.metadataService.Remove(*meta) } default: logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name()) diff --git a/distribution/push_v2_test.go b/distribution/push_v2_test.go index ea7ce00ec8..def4233786 100644 --- a/distribution/push_v2_test.go +++ b/distribution/push_v2_test.go @@ -395,16 +395,16 @@ func TestLayerAlreadyExists(t *testing.T) { } ctx := context.Background() ms := &mockV2MetadataService{} - pd := &v2PushDescriptor{ + pd := &pushDescriptor{ hmacKey: []byte(tc.hmacKey), repoInfo: repoInfo, layer: &storeLayer{ Layer: layer.EmptyLayer, }, - repo: repo, - v2MetadataService: ms, - pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)}, - checkedDigests: make(map[digest.Digest]struct{}), + repo: repo, + metadataService: ms, + pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)}, + checkedDigests: make(map[digest.Digest]struct{}), } desc, exists, err := pd.layerAlreadyExists(ctx, &progressSink{t}, layer.EmptyLayer.DiffID(), tc.checkOtherRepositories, tc.maxExistenceChecks, tc.metadata) @@ -522,7 +522,7 @@ func TestWhenEmptyAuthConfig(t *testing.T) { } imagePushConfig.ReferenceStore = &mockReferenceStore{} repoInfo, _ := reference.ParseNormalizedNamed("xujihui1985/test.img") - pusher := &v2Pusher{ + pusher := &pusher{ config: imagePushConfig, repoInfo: ®istry.RepositoryInfo{ Name: repoInfo, @@ -536,7 +536,7 @@ func TestWhenEmptyAuthConfig(t *testing.T) { TrimHostname: true, }, } - pusher.Push(context.Background()) + pusher.push(context.Background()) if pusher.pushState.hasAuthInfo != authInfo.expected { t.Errorf("hasAuthInfo does not match expected: %t != %t", authInfo.expected, pusher.pushState.hasAuthInfo) } @@ -598,14 +598,14 @@ func TestPushRegistryWhenAuthInfoEmpty(t *testing.T) { requests: []string{}, }, } - pd := &v2PushDescriptor{ + pd := &pushDescriptor{ hmacKey: []byte("abcd"), repoInfo: repoInfo, layer: &storeLayer{ Layer: layer.EmptyLayer, }, - repo: repo, - v2MetadataService: ms, + repo: repo, + metadataService: ms, pushState: &pushState{ remoteLayers: make(map[layer.DiffID]distribution.Descriptor), hasAuthInfo: false,