From 1e75ab0ab9ac2517eba1ac4966b6ad6ade4b71e5 Mon Sep 17 00:00:00 2001 From: Sebastiaan van Stijn Date: Sun, 27 Feb 2022 21:33:35 +0100 Subject: [PATCH] distribution: remove Pusher interface, NewPusher(), and redundant V1 checks It's only used internally, so we can refer to the implementation itself. Given that RegistryService.LookupPushEndpoints now only returns V2 endpoints, we no longer need to check if an endpoint is possibly V1. Also rename some types that had "v2" in their name, now that we only support v2. Signed-off-by: Sebastiaan van Stijn --- distribution/push.go | 63 +++++++-------------- distribution/push_v2.go | 107 ++++++++++++++++++----------------- distribution/push_v2_test.go | 20 +++---- 3 files changed, 84 insertions(+), 106 deletions(-) diff --git a/distribution/push.go b/distribution/push.go index f64d0f822c..026c29f726 100644 --- a/distribution/push.go +++ b/distribution/push.go @@ -14,58 +14,40 @@ import ( "github.com/sirupsen/logrus" ) -// Pusher is an interface that abstracts pushing for different API versions. -type Pusher interface { - // Push tries to push the image configured at the creation of Pusher. - // Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint. - // - // TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic. - Push(ctx context.Context) error -} - const compressionBufSize = 32768 -// NewPusher creates a new Pusher interface that will push to either a v1 or v2 -// registry. The endpoint argument contains a Version field that determines -// whether a v1 or v2 pusher will be created. The other parameters are passed -// through to the underlying pusher implementation for use during the actual -// push operation. -func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) { - switch endpoint.Version { - case registry.APIVersion2: - return &v2Pusher{ - v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore), - ref: ref, - endpoint: endpoint, - repoInfo: repoInfo, - config: imagePushConfig, - }, nil - case registry.APIVersion1: - return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL) +// newPusher creates a new pusher for pushing to a v2 registry. +// The parameters are passed through to the underlying pusher implementation for +// use during the actual push operation. +func newPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePushConfig) *pusher { + return &pusher{ + metadataService: metadata.NewV2MetadataService(config.MetadataStore), + ref: ref, + endpoint: endpoint, + repoInfo: repoInfo, + config: config, } - return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL) } -// Push initiates a push operation on ref. -// ref is the specific variant of the image to be pushed. -// If no tag is provided, all tags will be pushed. -func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error { +// Push initiates a push operation on ref. ref is the specific variant of the +// image to push. If no tag is provided, all tags are pushed. +func Push(ctx context.Context, ref reference.Named, config *ImagePushConfig) error { // FIXME: Allow to interrupt current push when new push of same image is done. // Resolve the Repository name from fqn to RepositoryInfo - repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref) + repoInfo, err := config.RegistryService.ResolveRepository(ref) if err != nil { return err } - endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name)) + endpoints, err := config.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name)) if err != nil { return err } - progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name()) + progress.Messagef(config.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name()) - associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name) + associations := config.ReferenceStore.ReferencesByName(repoInfo.Name) if len(associations) == 0 { return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name)) } @@ -87,14 +69,9 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo } } - logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version) + logrus.Debugf("Trying to push %s to %s", repoInfo.Name.Name(), endpoint.URL) - pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig) - if err != nil { - lastErr = err - continue - } - if err := pusher.Push(ctx); err != nil { + if err := newPusher(ref, endpoint, repoInfo, config).push(ctx); err != nil { // Was this push cancelled? If so, don't try to fall // back. select { @@ -115,7 +92,7 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo return err } - imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push") + config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push") return nil } diff --git a/distribution/push_v2.go b/distribution/push_v2.go index 4aa4f60175..372705dc9c 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -34,13 +34,13 @@ const ( middleLayerMaximumSize = 10 * (1 << 20) // 10MB ) -type v2Pusher struct { - v2MetadataService metadata.V2MetadataService - ref reference.Named - endpoint registry.APIEndpoint - repoInfo *registry.RepositoryInfo - config *ImagePushConfig - repo distribution.Repository +type pusher struct { + metadataService metadata.V2MetadataService + ref reference.Named + endpoint registry.APIEndpoint + repoInfo *registry.RepositoryInfo + config *ImagePushConfig + repo distribution.Repository // pushState is state built by the Upload functions. pushState pushState @@ -56,7 +56,8 @@ type pushState struct { hasAuthInfo bool } -func (p *v2Pusher) Push(ctx context.Context) (err error) { +// TODO(tiborvass): have push() take a reference to repository + tag, so that the pusher itself is repository-agnostic. +func (p *pusher) push(ctx context.Context) (err error) { p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor) p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull") @@ -66,7 +67,7 @@ func (p *v2Pusher) Push(ctx context.Context) (err error) { return err } - if err = p.pushV2Repository(ctx); err != nil { + if err = p.pushRepository(ctx); err != nil { if continueOnError(err, p.endpoint.Mirror) { return fallbackError{ err: err, @@ -77,14 +78,14 @@ func (p *v2Pusher) Push(ctx context.Context) (err error) { return err } -func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { +func (p *pusher) pushRepository(ctx context.Context) (err error) { if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged { imageID, err := p.config.ReferenceStore.Get(p.ref) if err != nil { return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref)) } - return p.pushV2Tag(ctx, namedTagged, imageID) + return p.pushTag(ctx, namedTagged, imageID) } if !reference.IsNameOnly(p.ref) { @@ -96,7 +97,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) { if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged { pushed++ - if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil { + if err := p.pushTag(ctx, namedTagged, association.ID); err != nil { return err } } @@ -109,7 +110,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { return nil } -func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error { +func (p *pusher) pushTag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error { logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref)) imgConfig, err := p.config.ImageStore.Get(ctx, id) @@ -135,14 +136,14 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id var descriptors []xfer.UploadDescriptor - descriptorTemplate := v2PushDescriptor{ - v2MetadataService: p.v2MetadataService, - hmacKey: hmacKey, - repoInfo: p.repoInfo.Name, - ref: p.ref, - endpoint: p.endpoint, - repo: p.repo, - pushState: &p.pushState, + descriptorTemplate := pushDescriptor{ + metadataService: p.metadataService, + hmacKey: hmacKey, + repoInfo: p.repoInfo.Name, + ref: p.ref, + endpoint: p.endpoint, + repo: p.repo, + pushState: &p.pushState, } // Loop bounds condition is to avoid pushing the base layer on Windows. @@ -243,7 +244,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild // descriptors is in reverse order; iterate backwards to get references // appended in the right order. for i := len(descriptors) - 1; i >= 0; i-- { - if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil { + if err := builder.AppendReference(descriptors[i].(*pushDescriptor)); err != nil { return nil, err } } @@ -251,33 +252,33 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild return builder.Build(ctx) } -type v2PushDescriptor struct { - layer PushLayer - v2MetadataService metadata.V2MetadataService - hmacKey []byte - repoInfo reference.Named - ref reference.Named - endpoint registry.APIEndpoint - repo distribution.Repository - pushState *pushState - remoteDescriptor distribution.Descriptor +type pushDescriptor struct { + layer PushLayer + metadataService metadata.V2MetadataService + hmacKey []byte + repoInfo reference.Named + ref reference.Named + endpoint registry.APIEndpoint + repo distribution.Repository + pushState *pushState + remoteDescriptor distribution.Descriptor // a set of digests whose presence has been checked in a target repository checkedDigests map[digest.Digest]struct{} } -func (pd *v2PushDescriptor) Key() string { +func (pd *pushDescriptor) Key() string { return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String() } -func (pd *v2PushDescriptor) ID() string { +func (pd *pushDescriptor) ID() string { return stringid.TruncateID(pd.layer.DiffID().String()) } -func (pd *v2PushDescriptor) DiffID() layer.DiffID { +func (pd *pushDescriptor) DiffID() layer.DiffID { return pd.layer.DiffID() } -func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) { +func (pd *pushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) { // Skip foreign layers unless this registry allows nondistributable artifacts. if !pd.endpoint.AllowNondistributableArtifacts { if fs, ok := pd.layer.(distribution.Describable); ok { @@ -303,10 +304,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer) // Do we have any metadata associated with this layer's DiffID? - v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID) + metaData, err := pd.metadataService.GetMetadata(diffID) if err == nil { // check for blob existence in the target repository - descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, v2Metadata) + descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, metaData) if exists || err != nil { return descriptor, err } @@ -319,7 +320,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. var layerUpload distribution.BlobWriter // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload - candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata) + candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, metaData) isUnauthorizedError := false for _, mc := range candidates { mountCandidate := mc @@ -329,8 +330,8 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. if len(mountCandidate.SourceRepository) > 0 { namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository) if err != nil { - logrus.Errorf("failed to parse source repository reference %v: %v", reference.FamiliarString(namedRef), err) - pd.v2MetadataService.Remove(mountCandidate) + logrus.WithError(err).Errorf("failed to parse source repository reference %v", reference.FamiliarString(namedRef)) + _ = pd.metadataService.Remove(mountCandidate) continue } @@ -338,13 +339,13 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. // with only path to set mount from with remoteRef, err := reference.WithName(reference.Path(namedRef)) if err != nil { - logrus.Errorf("failed to make remote reference out of %q: %v", reference.Path(namedRef), err) + logrus.WithError(err).Errorf("failed to make remote reference out of %q", reference.Path(namedRef)) continue } canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest) if err != nil { - logrus.Errorf("failed to make canonical reference: %v", err) + logrus.WithError(err).Error("failed to make canonical reference") continue } @@ -366,7 +367,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. pd.pushState.Unlock() // Cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ Digest: err.Descriptor.Digest, SourceRepository: pd.repoInfo.Name(), }); err != nil { @@ -400,7 +401,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. cause = fmt.Sprintf("an error: %v", err.Error()) } logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause) - pd.v2MetadataService.Remove(mountCandidate) + _ = pd.metadataService.Remove(mountCandidate) } if lu != nil { @@ -412,7 +413,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. if maxExistenceChecks-len(pd.checkedDigests) > 0 { // do additional layer existence checks with other known digests if any - descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata) + descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), metaData) if exists || err != nil { return descriptor, err } @@ -430,15 +431,15 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. return pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload) } -func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { +func (pd *pushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { pd.remoteDescriptor = descriptor } -func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { +func (pd *pushDescriptor) Descriptor() distribution.Descriptor { return pd.remoteDescriptor } -func (pd *v2PushDescriptor) uploadUsingSession( +func (pd *pushDescriptor) uploadUsingSession( ctx context.Context, progressOutput progress.Output, diffID layer.DiffID, @@ -485,7 +486,7 @@ func (pd *v2PushDescriptor) uploadUsingSession( progress.Update(progressOutput, pd.ID(), "Pushed") // Cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ Digest: pushDigest, SourceRepository: pd.repoInfo.Name(), }); err != nil { @@ -509,7 +510,7 @@ func (pd *v2PushDescriptor) uploadUsingSession( // slice. If it finds one that the registry knows about, it returns the known digest and "true". If // "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository // (not just the target one). -func (pd *v2PushDescriptor) layerAlreadyExists( +func (pd *pushDescriptor) layerAlreadyExists( ctx context.Context, progressOutput progress.Output, diffID layer.DiffID, @@ -558,7 +559,7 @@ attempts: case nil: if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) { // cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ Digest: desc.Digest, SourceRepository: pd.repoInfo.Name(), }); err != nil { @@ -571,7 +572,7 @@ attempts: case distribution.ErrBlobUnknown: if meta.SourceRepository == pd.repoInfo.Name() { // remove the mapping to the target repository - pd.v2MetadataService.Remove(*meta) + pd.metadataService.Remove(*meta) } default: logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name()) diff --git a/distribution/push_v2_test.go b/distribution/push_v2_test.go index ea7ce00ec8..def4233786 100644 --- a/distribution/push_v2_test.go +++ b/distribution/push_v2_test.go @@ -395,16 +395,16 @@ func TestLayerAlreadyExists(t *testing.T) { } ctx := context.Background() ms := &mockV2MetadataService{} - pd := &v2PushDescriptor{ + pd := &pushDescriptor{ hmacKey: []byte(tc.hmacKey), repoInfo: repoInfo, layer: &storeLayer{ Layer: layer.EmptyLayer, }, - repo: repo, - v2MetadataService: ms, - pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)}, - checkedDigests: make(map[digest.Digest]struct{}), + repo: repo, + metadataService: ms, + pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)}, + checkedDigests: make(map[digest.Digest]struct{}), } desc, exists, err := pd.layerAlreadyExists(ctx, &progressSink{t}, layer.EmptyLayer.DiffID(), tc.checkOtherRepositories, tc.maxExistenceChecks, tc.metadata) @@ -522,7 +522,7 @@ func TestWhenEmptyAuthConfig(t *testing.T) { } imagePushConfig.ReferenceStore = &mockReferenceStore{} repoInfo, _ := reference.ParseNormalizedNamed("xujihui1985/test.img") - pusher := &v2Pusher{ + pusher := &pusher{ config: imagePushConfig, repoInfo: ®istry.RepositoryInfo{ Name: repoInfo, @@ -536,7 +536,7 @@ func TestWhenEmptyAuthConfig(t *testing.T) { TrimHostname: true, }, } - pusher.Push(context.Background()) + pusher.push(context.Background()) if pusher.pushState.hasAuthInfo != authInfo.expected { t.Errorf("hasAuthInfo does not match expected: %t != %t", authInfo.expected, pusher.pushState.hasAuthInfo) } @@ -598,14 +598,14 @@ func TestPushRegistryWhenAuthInfoEmpty(t *testing.T) { requests: []string{}, }, } - pd := &v2PushDescriptor{ + pd := &pushDescriptor{ hmacKey: []byte("abcd"), repoInfo: repoInfo, layer: &storeLayer{ Layer: layer.EmptyLayer, }, - repo: repo, - v2MetadataService: ms, + repo: repo, + metadataService: ms, pushState: &pushState{ remoteLayers: make(map[layer.DiffID]distribution.Descriptor), hasAuthInfo: false,