diff --git a/distribution/pull.go b/distribution/pull.go index 5e8312d839..baba845264 100644 --- a/distribution/pull.go +++ b/distribution/pull.go @@ -15,38 +15,24 @@ import ( "github.com/sirupsen/logrus" ) -// Puller is an interface that abstracts pulling for different API versions. -type Puller interface { - // Pull tries to pull the image referenced by `tag` - // Pull returns an error if any, as well as a boolean that determines whether to retry Pull on the next configured endpoint. - // - Pull(ctx context.Context, ref reference.Named) error -} - -// newPuller returns a Puller interface that will pull from a v2 registry. -func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, local ContentStore) (Puller, error) { - switch endpoint.Version { - case registry.APIVersion2: - return &v2Puller{ - V2MetadataService: metadata.NewV2MetadataService(imagePullConfig.MetadataStore), - endpoint: endpoint, - config: imagePullConfig, - repoInfo: repoInfo, - manifestStore: &manifestStore{ - local: local, - }, - }, nil - case registry.APIVersion1: - return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL) +// newPuller returns a puller to pull from a v2 registry. +func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, config *ImagePullConfig, local ContentStore) *puller { + return &puller{ + metadataService: metadata.NewV2MetadataService(config.MetadataStore), + endpoint: endpoint, + config: config, + repoInfo: repoInfo, + manifestStore: &manifestStore{ + local: local, + }, } - return nil, fmt.Errorf("unknown version %d for registry %s", endpoint.Version, endpoint.URL) } // Pull initiates a pull operation. image is the repository name to pull, and // tag may be either empty, or indicate a specific tag to pull. -func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig, local ContentStore) error { +func Pull(ctx context.Context, ref reference.Named, config *ImagePullConfig, local ContentStore) error { // Resolve the Repository name from fqn to RepositoryInfo - repoInfo, err := imagePullConfig.RegistryService.ResolveRepository(ref) + repoInfo, err := config.RegistryService.ResolveRepository(ref) if err != nil { return err } @@ -56,7 +42,7 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo return err } - endpoints, err := imagePullConfig.RegistryService.LookupPullEndpoints(reference.Domain(repoInfo.Name)) + endpoints, err := config.RegistryService.LookupPullEndpoints(reference.Domain(repoInfo.Name)) if err != nil { return err } @@ -77,15 +63,9 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo } } - logrus.Debugf("Trying to pull %s from %s %s", reference.FamiliarName(repoInfo.Name), endpoint.URL, endpoint.Version) + logrus.Debugf("Trying to pull %s from %s", reference.FamiliarName(repoInfo.Name), endpoint.URL) - puller, err := newPuller(endpoint, repoInfo, imagePullConfig, local) - if err != nil { - lastErr = err - continue - } - - if err := puller.Pull(ctx, ref); err != nil { + if err := newPuller(endpoint, repoInfo, config, local).pull(ctx, ref); err != nil { // Was this pull cancelled? If so, don't try to fall // back. fallback := false @@ -109,7 +89,7 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo return translatePullError(err, ref) } - imagePullConfig.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "pull") + config.ImageEventLogger(reference.FamiliarString(ref), reference.FamiliarName(repoInfo.Name), "pull") return nil } diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go index ab5ac9b34e..090ba10f25 100644 --- a/distribution/pull_v2.go +++ b/distribution/pull_v2.go @@ -52,16 +52,16 @@ func (e imageConfigPullError) Error() string { return "error pulling image configuration: " + e.Err.Error() } -type v2Puller struct { - V2MetadataService metadata.V2MetadataService - endpoint registry.APIEndpoint - config *ImagePullConfig - repoInfo *registry.RepositoryInfo - repo distribution.Repository - manifestStore *manifestStore +type puller struct { + metadataService metadata.V2MetadataService + endpoint registry.APIEndpoint + config *ImagePullConfig + repoInfo *registry.RepositoryInfo + repo distribution.Repository + manifestStore *manifestStore } -func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) { +func (p *puller) pull(ctx context.Context, ref reference.Named) (err error) { // TODO(tiborvass): was ReceiveTimeout p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull") if err != nil { @@ -74,7 +74,7 @@ func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) { return err } - if err = p.pullV2Repository(ctx, ref); err != nil { + if err = p.pullRepository(ctx, ref); err != nil { if _, ok := err.(fallbackError); ok { return err } @@ -88,10 +88,10 @@ func (p *v2Puller) Pull(ctx context.Context, ref reference.Named) (err error) { return err } -func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (err error) { +func (p *puller) pullRepository(ctx context.Context, ref reference.Named) (err error) { var layersDownloaded bool if !reference.IsNameOnly(ref) { - layersDownloaded, err = p.pullV2Tag(ctx, ref, p.config.Platform) + layersDownloaded, err = p.pullTag(ctx, ref, p.config.Platform) if err != nil { return err } @@ -106,7 +106,7 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e if err != nil { return err } - pulledNew, err := p.pullV2Tag(ctx, tagRef, p.config.Platform) + pulledNew, err := p.pullTag(ctx, tagRef, p.config.Platform) if err != nil { // Since this is the pull-all-tags case, don't // allow an error pulling a particular tag to @@ -127,33 +127,33 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e return nil } -type v2LayerDescriptor struct { - digest digest.Digest - diffID layer.DiffID - repoInfo *registry.RepositoryInfo - repo distribution.Repository - V2MetadataService metadata.V2MetadataService - tmpFile *os.File - verifier digest.Verifier - src distribution.Descriptor +type layerDescriptor struct { + digest digest.Digest + diffID layer.DiffID + repoInfo *registry.RepositoryInfo + repo distribution.Repository + metadataService metadata.V2MetadataService + tmpFile *os.File + verifier digest.Verifier + src distribution.Descriptor } -func (ld *v2LayerDescriptor) Key() string { +func (ld *layerDescriptor) Key() string { return "v2:" + ld.digest.String() } -func (ld *v2LayerDescriptor) ID() string { +func (ld *layerDescriptor) ID() string { return stringid.TruncateID(ld.digest.String()) } -func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) { +func (ld *layerDescriptor) DiffID() (layer.DiffID, error) { if ld.diffID != "" { return ld.diffID, nil } - return ld.V2MetadataService.GetDiffID(ld.digest) + return ld.metadataService.GetDiffID(ld.digest) } -func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) { +func (ld *layerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) { logrus.Debugf("pulling blob %q", ld.digest) var ( @@ -291,7 +291,7 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre }), size, nil } -func (ld *v2LayerDescriptor) Close() { +func (ld *layerDescriptor) Close() { if ld.tmpFile != nil { ld.tmpFile.Close() if err := os.RemoveAll(ld.tmpFile.Name()); err != nil { @@ -300,7 +300,7 @@ func (ld *v2LayerDescriptor) Close() { } } -func (ld *v2LayerDescriptor) truncateDownloadFile() error { +func (ld *layerDescriptor) truncateDownloadFile() error { // Need a new hash context since we will be redoing the download ld.verifier = nil @@ -317,13 +317,12 @@ func (ld *v2LayerDescriptor) truncateDownloadFile() error { return nil } -func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) { +func (ld *layerDescriptor) Registered(diffID layer.DiffID) { // Cache mapping from this layer's DiffID to the blobsum - ld.V2MetadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()}) + _ = ld.metadataService.Add(diffID, metadata.V2Metadata{Digest: ld.digest, SourceRepository: ld.repoInfo.Name.Name()}) } -func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) { - +func (p *puller) pullTag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) { var ( tagOrDigest string // Used for logging/progress only dgst digest.Digest @@ -477,7 +476,7 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform // validateMediaType validates if the given mediaType is accepted by the puller's // configuration. -func (p *v2Puller) validateMediaType(mediaType string) error { +func (p *puller) validateMediaType(mediaType string) error { var allowedMediaTypes []string if len(p.config.Schema2Types) > 0 { allowedMediaTypes = p.config.Schema2Types @@ -497,7 +496,7 @@ func (p *v2Puller) validateMediaType(mediaType string) error { return invalidManifestClassError{mediaType, configClass} } -func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unverifiedManifest *schema1.SignedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) { +func (p *puller) pullSchema1(ctx context.Context, ref reference.Reference, unverifiedManifest *schema1.SignedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) { if platform != nil { // Early bath if the requested OS doesn't match that of the configuration. // This avoids doing the download, only to potentially fail later. @@ -550,11 +549,11 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv continue } - layerDescriptor := &v2LayerDescriptor{ - digest: blobSum, - repoInfo: p.repoInfo, - repo: p.repo, - V2MetadataService: p.V2MetadataService, + layerDescriptor := &layerDescriptor{ + digest: blobSum, + repoInfo: p.repoInfo, + repo: p.repo, + metadataService: p.metadataService, } descriptors = append(descriptors, layerDescriptor) @@ -581,7 +580,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv return imageID, manifestDigest, nil } -func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) { +func (p *puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) { if _, err := p.config.ImageStore.Get(ctx, target.Digest); err == nil { // If the image already exists locally, no need to pull // anything. @@ -596,12 +595,12 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De if err := d.Digest.Validate(); err != nil { return "", errors.Wrapf(err, "could not validate layer digest %q", d.Digest) } - layerDescriptor := &v2LayerDescriptor{ - digest: d.Digest, - repo: p.repo, - repoInfo: p.repoInfo, - V2MetadataService: p.V2MetadataService, - src: d, + layerDescriptor := &layerDescriptor{ + digest: d.Digest, + repo: p.repo, + repoInfo: p.repoInfo, + metadataService: p.metadataService, + src: d, } descriptors = append(descriptors, layerDescriptor) @@ -674,7 +673,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De // Populate diff ids in descriptors to avoid downloading foreign layers // which have been side loaded for i := range descriptors { - descriptors[i].(*v2LayerDescriptor).diffID = configRootFS.DiffIDs[i] + descriptors[i].(*layerDescriptor).diffID = configRootFS.DiffIDs[i] } } @@ -756,7 +755,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De return imageID, nil } -func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) { +func (p *puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) { manifestDigest, err = schema2ManifestDigest(ref, mfst) if err != nil { return "", "", err @@ -765,7 +764,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s return id, manifestDigest, err } -func (p *v2Puller) pullOCI(ctx context.Context, ref reference.Named, mfst *ocischema.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) { +func (p *puller) pullOCI(ctx context.Context, ref reference.Named, mfst *ocischema.DeserializedManifest, platform *specs.Platform) (id digest.Digest, manifestDigest digest.Digest, err error) { manifestDigest, err = schema2ManifestDigest(ref, mfst) if err != nil { return "", "", err @@ -795,7 +794,7 @@ func receiveConfig(s ImageConfigStore, configChan <-chan []byte, errChan <-chan // pullManifestList handles "manifest lists" which point to various // platform-specific manifests. -func (p *v2Puller) pullManifestList(ctx context.Context, ref reference.Named, mfstList *manifestlist.DeserializedManifestList, pp *specs.Platform) (id digest.Digest, manifestListDigest digest.Digest, err error) { +func (p *puller) pullManifestList(ctx context.Context, ref reference.Named, mfstList *manifestlist.DeserializedManifestList, pp *specs.Platform) (id digest.Digest, manifestListDigest digest.Digest, err error) { manifestListDigest, err = schema2ManifestDigest(ref, mfstList) if err != nil { return "", "", err @@ -874,7 +873,7 @@ const ( defaultMaxSchemaPullAttempts = 5 ) -func (p *v2Puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) { +func (p *puller) pullSchema2Config(ctx context.Context, dgst digest.Digest) (configJSON []byte, err error) { blobs := p.repo.Blobs(ctx) err = retry(ctx, defaultMaxSchemaPullAttempts, defaultSchemaPullBackoff, func(ctx context.Context) (err error) { configJSON, err = blobs.Get(ctx, dgst) diff --git a/distribution/pull_v2_test.go b/distribution/pull_v2_test.go index 4918802b26..a080c9eadf 100644 --- a/distribution/pull_v2_test.go +++ b/distribution/pull_v2_test.go @@ -322,7 +322,7 @@ func TestPullSchema2Config(t *testing.T) { } } -func testNewPuller(t *testing.T, rawurl string) *v2Puller { +func testNewPuller(t *testing.T, rawurl string) *puller { t.Helper() uri, err := url.Parse(rawurl) @@ -358,12 +358,7 @@ func testNewPuller(t *testing.T, rawurl string) *v2Puller { }, } - puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil) - if err != nil { - t.Fatal(err) - } - p := puller.(*v2Puller) - + p := newPuller(endpoint, repoInfo, imagePullConfig, nil) p.repo, err = NewV2Repository(context.Background(), p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull") if err != nil { t.Fatal(err) diff --git a/distribution/pull_v2_unix.go b/distribution/pull_v2_unix.go index 2c10bb5ba1..ca4814a472 100644 --- a/distribution/pull_v2_unix.go +++ b/distribution/pull_v2_unix.go @@ -14,7 +14,7 @@ import ( "github.com/sirupsen/logrus" ) -func (ld *v2LayerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) { +func (ld *layerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) { blobs := ld.repo.Blobs(ctx) return blobs.Open(ctx, ld.digest) } diff --git a/distribution/pull_v2_windows.go b/distribution/pull_v2_windows.go index a6db4b9b9e..acd67feb59 100644 --- a/distribution/pull_v2_windows.go +++ b/distribution/pull_v2_windows.go @@ -22,16 +22,16 @@ import ( "github.com/sirupsen/logrus" ) -var _ distribution.Describable = &v2LayerDescriptor{} +var _ distribution.Describable = &layerDescriptor{} -func (ld *v2LayerDescriptor) Descriptor() distribution.Descriptor { +func (ld *layerDescriptor) Descriptor() distribution.Descriptor { if ld.src.MediaType == schema2.MediaTypeForeignLayer && len(ld.src.URLs) > 0 { return ld.src } return distribution.Descriptor{} } -func (ld *v2LayerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) { +func (ld *layerDescriptor) open(ctx context.Context) (distribution.ReadSeekCloser, error) { blobs := ld.repo.Blobs(ctx) rsc, err := blobs.Open(ctx, ld.digest) diff --git a/distribution/registry_unit_test.go b/distribution/registry_unit_test.go index 136fd516cf..df9e12a99a 100644 --- a/distribution/registry_unit_test.go +++ b/distribution/registry_unit_test.go @@ -68,11 +68,7 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) { }, }, } - puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil) - if err != nil { - t.Fatal(err) - } - p := puller.(*v2Puller) + p := newPuller(endpoint, repoInfo, imagePullConfig, nil) ctx := context.Background() p.repo, err = NewV2Repository(ctx, p.repoInfo, p.endpoint, p.config.MetaHeaders, p.config.AuthConfig, "pull") if err != nil { @@ -82,7 +78,7 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) { logrus.Debug("About to pull") // We expect it to fail, since we haven't mock'd the full registry exchange in our handler above tag, _ := reference.WithTag(n, "tag_goes_here") - _ = p.pullV2Repository(ctx, tag) + _ = p.pullRepository(ctx, tag) } func TestTokenPassThru(t *testing.T) {