distribution: remove Pusher interface, NewPusher(), and redundant V1 checks

It's only used internally, so we can refer to the implementation itself. Given
that RegistryService.LookupPushEndpoints now only returns V2 endpoints, we
no longer need to check if an endpoint is possibly V1.

Also rename some types that had "v2" in their name, now that we only support v2.

Signed-off-by: Sebastiaan van Stijn <github@gone.nl>
This commit is contained in:
Sebastiaan van Stijn 2022-02-27 21:33:35 +01:00
parent 41999abcbe
commit 1e75ab0ab9
No known key found for this signature in database
GPG Key ID: 76698F39D527CE8C
3 changed files with 84 additions and 106 deletions

View File

@ -14,58 +14,40 @@ import (
"github.com/sirupsen/logrus"
)
// Pusher is an interface that abstracts pushing for different API versions.
type Pusher interface {
// Push tries to push the image configured at the creation of Pusher.
// Push returns an error if any, as well as a boolean that determines whether to retry Push on the next configured endpoint.
//
// TODO(tiborvass): have Push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
Push(ctx context.Context) error
}
const compressionBufSize = 32768
// NewPusher creates a new Pusher interface that will push to either a v1 or v2
// registry. The endpoint argument contains a Version field that determines
// whether a v1 or v2 pusher will be created. The other parameters are passed
// through to the underlying pusher implementation for use during the actual
// push operation.
func NewPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePushConfig *ImagePushConfig) (Pusher, error) {
switch endpoint.Version {
case registry.APIVersion2:
return &v2Pusher{
v2MetadataService: metadata.NewV2MetadataService(imagePushConfig.MetadataStore),
ref: ref,
endpoint: endpoint,
repoInfo: repoInfo,
config: imagePushConfig,
}, nil
case registry.APIVersion1:
return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL)
// newPusher creates a new pusher for pushing to a v2 registry.
// The parameters are passed through to the underlying pusher implementation for
// use during the actual push operation.
func newPusher(ref reference.Named, endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePushConfig) *pusher {
return &pusher{
metadataService: metadata.NewV2MetadataService(config.MetadataStore),
ref: ref,
endpoint: endpoint,
repoInfo: repoInfo,
config: config,
}
return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL)
}
// Push initiates a push operation on ref.
// ref is the specific variant of the image to be pushed.
// If no tag is provided, all tags will be pushed.
func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushConfig) error {
// Push initiates a push operation on ref. ref is the specific variant of the
// image to push. If no tag is provided, all tags are pushed.
func Push(ctx context.Context, ref reference.Named, config *ImagePushConfig) error {
// FIXME: Allow to interrupt current push when new push of same image is done.
// Resolve the Repository name from fqn to RepositoryInfo
repoInfo, err := imagePushConfig.RegistryService.ResolveRepository(ref)
repoInfo, err := config.RegistryService.ResolveRepository(ref)
if err != nil {
return err
}
endpoints, err := imagePushConfig.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
endpoints, err := config.RegistryService.LookupPushEndpoints(reference.Domain(repoInfo.Name))
if err != nil {
return err
}
progress.Messagef(imagePushConfig.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name())
progress.Messagef(config.ProgressOutput, "", "The push refers to repository [%s]", repoInfo.Name.Name())
associations := imagePushConfig.ReferenceStore.ReferencesByName(repoInfo.Name)
associations := config.ReferenceStore.ReferencesByName(repoInfo.Name)
if len(associations) == 0 {
return fmt.Errorf("An image does not exist locally with the tag: %s", reference.FamiliarName(repoInfo.Name))
}
@ -87,14 +69,9 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
}
}
logrus.Debugf("Trying to push %s to %s %s", repoInfo.Name.Name(), endpoint.URL, endpoint.Version)
logrus.Debugf("Trying to push %s to %s", repoInfo.Name.Name(), endpoint.URL)
pusher, err := NewPusher(ref, endpoint, repoInfo, imagePushConfig)
if err != nil {
lastErr = err
continue
}
if err := pusher.Push(ctx); err != nil {
if err := newPusher(ref, endpoint, repoInfo, config).push(ctx); err != nil {
// Was this push cancelled? If so, don't try to fall
// back.
select {
@ -115,7 +92,7 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
return err
}
imagePushConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push")
config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "push")
return nil
}

View File

@ -34,13 +34,13 @@ const (
middleLayerMaximumSize = 10 * (1 << 20) // 10MB
)
type v2Pusher struct {
v2MetadataService metadata.V2MetadataService
ref reference.Named
endpoint registry.APIEndpoint
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
repo distribution.Repository
type pusher struct {
metadataService metadata.V2MetadataService
ref reference.Named
endpoint registry.APIEndpoint
repoInfo *registry.RepositoryInfo
config *ImagePushConfig
repo distribution.Repository
// pushState is state built by the Upload functions.
pushState pushState
@ -56,7 +56,8 @@ type pushState struct {
hasAuthInfo bool
}
func (p *v2Pusher) Push(ctx context.Context) (err error) {
// TODO(tiborvass): have push() take a reference to repository + tag, so that the pusher itself is repository-agnostic.
func (p *pusher) push(ctx context.Context) (err error) {
p.pushState.remoteLayers = make(map[layer.DiffID]distribution.Descriptor)
p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "push", "pull")
@ -66,7 +67,7 @@ func (p *v2Pusher) Push(ctx context.Context) (err error) {
return err
}
if err = p.pushV2Repository(ctx); err != nil {
if err = p.pushRepository(ctx); err != nil {
if continueOnError(err, p.endpoint.Mirror) {
return fallbackError{
err: err,
@ -77,14 +78,14 @@ func (p *v2Pusher) Push(ctx context.Context) (err error) {
return err
}
func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
func (p *pusher) pushRepository(ctx context.Context) (err error) {
if namedTagged, isNamedTagged := p.ref.(reference.NamedTagged); isNamedTagged {
imageID, err := p.config.ReferenceStore.Get(p.ref)
if err != nil {
return fmt.Errorf("tag does not exist: %s", reference.FamiliarString(p.ref))
}
return p.pushV2Tag(ctx, namedTagged, imageID)
return p.pushTag(ctx, namedTagged, imageID)
}
if !reference.IsNameOnly(p.ref) {
@ -96,7 +97,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
for _, association := range p.config.ReferenceStore.ReferencesByName(p.ref) {
if namedTagged, isNamedTagged := association.Ref.(reference.NamedTagged); isNamedTagged {
pushed++
if err := p.pushV2Tag(ctx, namedTagged, association.ID); err != nil {
if err := p.pushTag(ctx, namedTagged, association.ID); err != nil {
return err
}
}
@ -109,7 +110,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
return nil
}
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
func (p *pusher) pushTag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref))
imgConfig, err := p.config.ImageStore.Get(ctx, id)
@ -135,14 +136,14 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
var descriptors []xfer.UploadDescriptor
descriptorTemplate := v2PushDescriptor{
v2MetadataService: p.v2MetadataService,
hmacKey: hmacKey,
repoInfo: p.repoInfo.Name,
ref: p.ref,
endpoint: p.endpoint,
repo: p.repo,
pushState: &p.pushState,
descriptorTemplate := pushDescriptor{
metadataService: p.metadataService,
hmacKey: hmacKey,
repoInfo: p.repoInfo.Name,
ref: p.ref,
endpoint: p.endpoint,
repo: p.repo,
pushState: &p.pushState,
}
// Loop bounds condition is to avoid pushing the base layer on Windows.
@ -243,7 +244,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
// descriptors is in reverse order; iterate backwards to get references
// appended in the right order.
for i := len(descriptors) - 1; i >= 0; i-- {
if err := builder.AppendReference(descriptors[i].(*v2PushDescriptor)); err != nil {
if err := builder.AppendReference(descriptors[i].(*pushDescriptor)); err != nil {
return nil, err
}
}
@ -251,33 +252,33 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
return builder.Build(ctx)
}
type v2PushDescriptor struct {
layer PushLayer
v2MetadataService metadata.V2MetadataService
hmacKey []byte
repoInfo reference.Named
ref reference.Named
endpoint registry.APIEndpoint
repo distribution.Repository
pushState *pushState
remoteDescriptor distribution.Descriptor
type pushDescriptor struct {
layer PushLayer
metadataService metadata.V2MetadataService
hmacKey []byte
repoInfo reference.Named
ref reference.Named
endpoint registry.APIEndpoint
repo distribution.Repository
pushState *pushState
remoteDescriptor distribution.Descriptor
// a set of digests whose presence has been checked in a target repository
checkedDigests map[digest.Digest]struct{}
}
func (pd *v2PushDescriptor) Key() string {
func (pd *pushDescriptor) Key() string {
return "v2push:" + pd.ref.Name() + " " + pd.layer.DiffID().String()
}
func (pd *v2PushDescriptor) ID() string {
func (pd *pushDescriptor) ID() string {
return stringid.TruncateID(pd.layer.DiffID().String())
}
func (pd *v2PushDescriptor) DiffID() layer.DiffID {
func (pd *pushDescriptor) DiffID() layer.DiffID {
return pd.layer.DiffID()
}
func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
func (pd *pushDescriptor) Upload(ctx context.Context, progressOutput progress.Output) (distribution.Descriptor, error) {
// Skip foreign layers unless this registry allows nondistributable artifacts.
if !pd.endpoint.AllowNondistributableArtifacts {
if fs, ok := pd.layer.(distribution.Describable); ok {
@ -303,10 +304,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer)
// Do we have any metadata associated with this layer's DiffID?
v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID)
metaData, err := pd.metadataService.GetMetadata(diffID)
if err == nil {
// check for blob existence in the target repository
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, v2Metadata)
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, true, 1, metaData)
if exists || err != nil {
return descriptor, err
}
@ -319,7 +320,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
var layerUpload distribution.BlobWriter
// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata)
candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, metaData)
isUnauthorizedError := false
for _, mc := range candidates {
mountCandidate := mc
@ -329,8 +330,8 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
if len(mountCandidate.SourceRepository) > 0 {
namedRef, err := reference.ParseNormalizedNamed(mountCandidate.SourceRepository)
if err != nil {
logrus.Errorf("failed to parse source repository reference %v: %v", reference.FamiliarString(namedRef), err)
pd.v2MetadataService.Remove(mountCandidate)
logrus.WithError(err).Errorf("failed to parse source repository reference %v", reference.FamiliarString(namedRef))
_ = pd.metadataService.Remove(mountCandidate)
continue
}
@ -338,13 +339,13 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// with only path to set mount from with
remoteRef, err := reference.WithName(reference.Path(namedRef))
if err != nil {
logrus.Errorf("failed to make remote reference out of %q: %v", reference.Path(namedRef), err)
logrus.WithError(err).Errorf("failed to make remote reference out of %q", reference.Path(namedRef))
continue
}
canonicalRef, err := reference.WithDigest(reference.TrimNamed(remoteRef), mountCandidate.Digest)
if err != nil {
logrus.Errorf("failed to make canonical reference: %v", err)
logrus.WithError(err).Error("failed to make canonical reference")
continue
}
@ -366,7 +367,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
pd.pushState.Unlock()
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: err.Descriptor.Digest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
@ -400,7 +401,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
cause = fmt.Sprintf("an error: %v", err.Error())
}
logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
pd.v2MetadataService.Remove(mountCandidate)
_ = pd.metadataService.Remove(mountCandidate)
}
if lu != nil {
@ -412,7 +413,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
if maxExistenceChecks-len(pd.checkedDigests) > 0 {
// do additional layer existence checks with other known digests if any
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata)
descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), metaData)
if exists || err != nil {
return descriptor, err
}
@ -430,15 +431,15 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
return pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
}
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
func (pd *pushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
pd.remoteDescriptor = descriptor
}
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
func (pd *pushDescriptor) Descriptor() distribution.Descriptor {
return pd.remoteDescriptor
}
func (pd *v2PushDescriptor) uploadUsingSession(
func (pd *pushDescriptor) uploadUsingSession(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
@ -485,7 +486,7 @@ func (pd *v2PushDescriptor) uploadUsingSession(
progress.Update(progressOutput, pd.ID(), "Pushed")
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: pushDigest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
@ -509,7 +510,7 @@ func (pd *v2PushDescriptor) uploadUsingSession(
// slice. If it finds one that the registry knows about, it returns the known digest and "true". If
// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository
// (not just the target one).
func (pd *v2PushDescriptor) layerAlreadyExists(
func (pd *pushDescriptor) layerAlreadyExists(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
@ -558,7 +559,7 @@ attempts:
case nil:
if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.Name() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) {
// cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
if err := pd.metadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: desc.Digest,
SourceRepository: pd.repoInfo.Name(),
}); err != nil {
@ -571,7 +572,7 @@ attempts:
case distribution.ErrBlobUnknown:
if meta.SourceRepository == pd.repoInfo.Name() {
// remove the mapping to the target repository
pd.v2MetadataService.Remove(*meta)
pd.metadataService.Remove(*meta)
}
default:
logrus.WithError(err).Debugf("Failed to check for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.Name())

View File

@ -395,16 +395,16 @@ func TestLayerAlreadyExists(t *testing.T) {
}
ctx := context.Background()
ms := &mockV2MetadataService{}
pd := &v2PushDescriptor{
pd := &pushDescriptor{
hmacKey: []byte(tc.hmacKey),
repoInfo: repoInfo,
layer: &storeLayer{
Layer: layer.EmptyLayer,
},
repo: repo,
v2MetadataService: ms,
pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)},
checkedDigests: make(map[digest.Digest]struct{}),
repo: repo,
metadataService: ms,
pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)},
checkedDigests: make(map[digest.Digest]struct{}),
}
desc, exists, err := pd.layerAlreadyExists(ctx, &progressSink{t}, layer.EmptyLayer.DiffID(), tc.checkOtherRepositories, tc.maxExistenceChecks, tc.metadata)
@ -522,7 +522,7 @@ func TestWhenEmptyAuthConfig(t *testing.T) {
}
imagePushConfig.ReferenceStore = &mockReferenceStore{}
repoInfo, _ := reference.ParseNormalizedNamed("xujihui1985/test.img")
pusher := &v2Pusher{
pusher := &pusher{
config: imagePushConfig,
repoInfo: &registry.RepositoryInfo{
Name: repoInfo,
@ -536,7 +536,7 @@ func TestWhenEmptyAuthConfig(t *testing.T) {
TrimHostname: true,
},
}
pusher.Push(context.Background())
pusher.push(context.Background())
if pusher.pushState.hasAuthInfo != authInfo.expected {
t.Errorf("hasAuthInfo does not match expected: %t != %t", authInfo.expected, pusher.pushState.hasAuthInfo)
}
@ -598,14 +598,14 @@ func TestPushRegistryWhenAuthInfoEmpty(t *testing.T) {
requests: []string{},
},
}
pd := &v2PushDescriptor{
pd := &pushDescriptor{
hmacKey: []byte("abcd"),
repoInfo: repoInfo,
layer: &storeLayer{
Layer: layer.EmptyLayer,
},
repo: repo,
v2MetadataService: ms,
repo: repo,
metadataService: ms,
pushState: &pushState{
remoteLayers: make(map[layer.DiffID]distribution.Descriptor),
hasAuthInfo: false,