From 0928f3f2e3eda75a295b651d27f9dd992fd951a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Sun, 18 Sep 2016 10:55:28 +0200 Subject: [PATCH 1/4] Compare V2Metadata with associated auth config MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit to avoid unnecessary blob re-uploads. Cross repository mount from particular repo will most probably fail if the user pushing to the registry is not the same as the one who pulled or pushed to the source repo. This PR attempts first to cross-repo mount from the source repositories associated with the pusher's auth config. Then it falls back to other repositories sorted from the most similar to the target repo to the least. It also prevents metadata deletion in cases where cross-repo mount fails and the auth config hashes differ. Signed-off-by: Michal Minář --- distribution/metadata/v2_metadata_service.go | 74 ++++++ distribution/push_v2.go | 261 ++++++++++++++----- distribution/push_v2_test.go | 147 +++++++++++ 3 files changed, 415 insertions(+), 67 deletions(-) create mode 100644 distribution/push_v2_test.go diff --git a/distribution/metadata/v2_metadata_service.go b/distribution/metadata/v2_metadata_service.go index 239cd1f45e..2c063f5ff8 100644 --- a/distribution/metadata/v2_metadata_service.go +++ b/distribution/metadata/v2_metadata_service.go @@ -1,9 +1,13 @@ package metadata import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" "encoding/json" "github.com/docker/distribution/digest" + "github.com/docker/docker/api/types" "github.com/docker/docker/layer" ) @@ -17,6 +21,69 @@ type V2MetadataService struct { type V2Metadata struct { Digest digest.Digest SourceRepository string + // HMAC hashes above attributes with recent authconfig digest used as a key in order to determine matching + // metadata entries accompanied by the same credentials without actually exposing them. + HMAC string +} + +// CheckV2MetadataHMAC return true if the given "meta" is tagged with a hmac hashed by the given "key". +func CheckV2MetadataHMAC(meta *V2Metadata, key []byte) bool { + if len(meta.HMAC) == 0 || len(key) == 0 { + return len(meta.HMAC) == 0 && len(key) == 0 + } + mac := hmac.New(sha256.New, key) + mac.Write([]byte(meta.Digest)) + mac.Write([]byte(meta.SourceRepository)) + expectedMac := mac.Sum(nil) + + storedMac, err := hex.DecodeString(meta.HMAC) + if err != nil { + return false + } + + return hmac.Equal(storedMac, expectedMac) +} + +// ComputeV2MetadataHMAC returns a hmac for the given "meta" hash by the given key. +func ComputeV2MetadataHMAC(key []byte, meta *V2Metadata) string { + if len(key) == 0 || meta == nil { + return "" + } + mac := hmac.New(sha256.New, key) + mac.Write([]byte(meta.Digest)) + mac.Write([]byte(meta.SourceRepository)) + return hex.EncodeToString(mac.Sum(nil)) +} + +// ComputeV2MetadataHMACKey returns a key for the given "authConfig" that can be used to hash v2 metadata +// entries. +func ComputeV2MetadataHMACKey(authConfig *types.AuthConfig) ([]byte, error) { + if authConfig == nil { + return nil, nil + } + key := authConfigKeyInput{ + Username: authConfig.Username, + Password: authConfig.Password, + Auth: authConfig.Auth, + IdentityToken: authConfig.IdentityToken, + RegistryToken: authConfig.RegistryToken, + } + buf, err := json.Marshal(&key) + if err != nil { + return nil, err + } + return []byte(digest.FromBytes([]byte(buf))), nil +} + +// authConfigKeyInput is a reduced AuthConfig structure holding just relevant credential data eligible for +// hmac key creation. +type authConfigKeyInput struct { + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + Auth string `json:"auth,omitempty"` + + IdentityToken string `json:"identitytoken,omitempty"` + RegistryToken string `json:"registrytoken,omitempty"` } // maxMetadata is the number of metadata entries to keep per layer DiffID. @@ -105,6 +172,13 @@ func (serv *V2MetadataService) Add(diffID layer.DiffID, metadata V2Metadata) err return serv.store.Set(serv.digestNamespace(), serv.digestKey(metadata.Digest), []byte(diffID)) } +// TagAndAdd amends the given "meta" for hmac hashed by the given "hmacKey" and associates it with a layer +// DiffID. If too many metadata entries are present, the oldest one is dropped. +func (serv *V2MetadataService) TagAndAdd(diffID layer.DiffID, hmacKey []byte, meta V2Metadata) error { + meta.HMAC = ComputeV2MetadataHMAC(hmacKey, &meta) + return serv.Add(diffID, meta) +} + // Remove unassociates a metadata entry from a layer DiffID. func (serv *V2MetadataService) Remove(metadata V2Metadata) error { diffID, err := serv.GetDiffID(metadata.Digest) diff --git a/distribution/push_v2.go b/distribution/push_v2.go index cd919e2f52..d2012704db 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -5,8 +5,12 @@ import ( "fmt" "io" "runtime" + "sort" + "strings" "sync" + "golang.org/x/net/context" + "github.com/Sirupsen/logrus" "github.com/docker/distribution" "github.com/docker/distribution/digest" @@ -23,9 +27,10 @@ import ( "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/reference" "github.com/docker/docker/registry" - "golang.org/x/net/context" ) +const maxRepositoryMountAttempts = 3 + // PushResult contains the tag, manifest digest, and manifest size from the // push. It's used to signal this information to the trust code in the client // so it can sign the manifest if necessary. @@ -133,10 +138,16 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, ima defer layer.ReleaseAndLog(p.config.LayerStore, l) } + hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig) + if err != nil { + return fmt.Errorf("failed to compute hmac key of auth config: %v", err) + } + var descriptors []xfer.UploadDescriptor descriptorTemplate := v2PushDescriptor{ v2MetadataService: p.v2MetadataService, + hmacKey: hmacKey, repoInfo: p.repoInfo, ref: p.ref, repo: p.repo, @@ -233,6 +244,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild type v2PushDescriptor struct { layer layer.Layer v2MetadataService *metadata.V2MetadataService + hmacKey []byte repoInfo reference.Named ref reference.Named repo distribution.Repository @@ -296,47 +308,44 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. bs := pd.repo.Blobs(ctx) var layerUpload distribution.BlobWriter - mountAttemptsRemaining := 3 - // Attempt to find another repository in the same registry to mount the layer - // from to avoid an unnecessary upload. - // Note: metadata is stored from oldest to newest, so we iterate through this - // slice in reverse to maximize our chances of the blob still existing in the - // remote repository. - for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- { - mountFrom := v2Metadata[i] + // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload + candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxRepositoryMountAttempts, v2Metadata) + for _, mountCandidate := range candidates { + logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository) + createOpts := []distribution.BlobCreateOption{} - sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository) - if err != nil { - continue - } - if pd.repoInfo.Hostname() != sourceRepo.Hostname() { - // don't mount blobs from another registry - continue + if len(mountCandidate.SourceRepository) > 0 { + namedRef, err := reference.WithName(mountCandidate.SourceRepository) + if err != nil { + logrus.Errorf("failed to parse source repository reference %v: %v", namedRef.String(), err) + pd.v2MetadataService.Remove(mountCandidate) + continue + } + + // TODO (brianbland): We need to construct a reference where the Name is + // only the full remote name, so clean this up when distribution has a + // richer reference package + remoteRef, err := distreference.WithName(namedRef.RemoteName()) + if err != nil { + logrus.Errorf("failed to make remote reference out of %q: %v", namedRef.RemoteName(), namedRef.RemoteName()) + continue + } + + canonicalRef, err := distreference.WithDigest(remoteRef, mountCandidate.Digest) + if err != nil { + logrus.Errorf("failed to make canonical reference: %v", err) + continue + } + + createOpts = append(createOpts, client.WithMountFrom(canonicalRef)) } - namedRef, err := reference.WithName(mountFrom.SourceRepository) - if err != nil { - continue - } - - // TODO (brianbland): We need to construct a reference where the Name is - // only the full remote name, so clean this up when distribution has a - // richer reference package - remoteRef, err := distreference.WithName(namedRef.RemoteName()) - if err != nil { - continue - } - - canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest) - if err != nil { - continue - } - - logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName()) - - layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef)) + // send the layer + lu, err := bs.Create(ctx, createOpts...) switch err := err.(type) { + case nil: + // noop case distribution.ErrBlobMounted: progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name()) @@ -348,18 +357,31 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. pd.pushState.Unlock() // Cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil { + if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + Digest: err.Descriptor.Digest, + SourceRepository: pd.repoInfo.FullName(), + }); err != nil { return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} } return err.Descriptor, nil - case nil: - // blob upload session created successfully, so begin the upload - mountAttemptsRemaining = 0 default: - // unable to mount layer from this repository, so this source mapping is no longer valid - logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository) - pd.v2MetadataService.Remove(mountFrom) - mountAttemptsRemaining-- + logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err) + } + + if len(mountCandidate.SourceRepository) > 0 && + (metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) || + len(mountCandidate.HMAC) == 0) { + cause := "blob mount failure" + if err != nil { + cause = fmt.Sprintf("an error: %v", err.Error()) + } + logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause) + pd.v2MetadataService.Remove(mountCandidate) + } + + layerUpload = lu + if layerUpload != nil { + break } } @@ -371,6 +393,35 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. } defer layerUpload.Close() + // upload the blob + desc, err := pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload) + if err != nil { + return desc, err + } + + pd.pushState.Lock() + // If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol. + pd.pushState.confirmedV2 = true + pd.pushState.remoteLayers[diffID] = desc + pd.pushState.Unlock() + + return desc, nil +} + +func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { + pd.remoteDescriptor = descriptor +} + +func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { + return pd.remoteDescriptor +} + +func (pd *v2PushDescriptor) uploadUsingSession( + ctx context.Context, + progressOutput progress.Output, + diffID layer.DiffID, + layerUpload distribution.BlobWriter, +) (distribution.Descriptor, error) { arch, err := pd.layer.TarStream() if err != nil { return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} @@ -404,34 +455,18 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. progress.Update(progressOutput, pd.ID(), "Pushed") // Cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil { + if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + Digest: pushDigest, + SourceRepository: pd.repoInfo.FullName(), + }); err != nil { return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} } - pd.pushState.Lock() - - // If Commit succeeded, that's an indication that the remote registry - // speaks the v2 protocol. - pd.pushState.confirmedV2 = true - - descriptor := distribution.Descriptor{ + return distribution.Descriptor{ Digest: pushDigest, MediaType: schema2.MediaTypeLayer, Size: nn, - } - pd.pushState.remoteLayers[diffID] = descriptor - - pd.pushState.Unlock() - - return descriptor, nil -} - -func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { - pd.remoteDescriptor = descriptor -} - -func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { - return pd.remoteDescriptor + }, nil } // layerAlreadyExists checks if the registry already know about any of the @@ -456,3 +491,95 @@ func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, rep } return distribution.Descriptor{}, false, nil } + +// getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The +// array is sorted from youngest to oldest. If requireReigstryMatch is true, the resulting array will contain +// only metadata entries having registry part of SourceRepository matching the part of repoInfo. +func getRepositoryMountCandidates( + repoInfo reference.Named, + hmacKey []byte, + max int, + v2Metadata []metadata.V2Metadata, +) []metadata.V2Metadata { + candidates := []metadata.V2Metadata{} + for _, meta := range v2Metadata { + sourceRepo, err := reference.ParseNamed(meta.SourceRepository) + if err != nil || repoInfo.Hostname() != sourceRepo.Hostname() { + continue + } + // target repository is not a viable candidate + if meta.SourceRepository == repoInfo.FullName() { + continue + } + candidates = append(candidates, meta) + } + + sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates) + if max >= 0 && len(candidates) > max { + // select the youngest metadata + candidates = candidates[:max] + } + + return candidates +} + +// byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The +// candidate "a" is preferred over "b": +// +// 1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the +// "b" was not +// 2. if a number of its repository path components exactly matching path components of target repository is higher +type byLikeness struct { + arr []metadata.V2Metadata + hmacKey []byte + pathComponents []string +} + +func (bla byLikeness) Less(i, j int) bool { + aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey) + bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey) + if aMacMatch != bMacMatch { + return aMacMatch + } + aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents) + bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents) + return aMatch > bMatch +} +func (bla byLikeness) Swap(i, j int) { + bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i] +} +func (bla byLikeness) Len() int { return len(bla.arr) } + +func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) { + // reverse the metadata array to shift the newest entries to the beginning + for i := 0; i < len(marr)/2; i++ { + marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i] + } + // keep equal entries ordered from the youngest to the oldest + sort.Stable(byLikeness{ + arr: marr, + hmacKey: hmacKey, + pathComponents: getPathComponents(repoInfo.FullName()), + }) +} + +// numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents". +func numOfMatchingPathComponents(pth string, matchComponents []string) int { + pthComponents := getPathComponents(pth) + i := 0 + for ; i < len(pthComponents) && i < len(matchComponents); i++ { + if matchComponents[i] != pthComponents[i] { + return i + } + } + return i +} + +func getPathComponents(path string) []string { + // make sure to add docker.io/ prefix to the path + named, err := reference.ParseNamed(path) + if err == nil { + path = named.FullName() + } + return strings.Split(path, "/") +} diff --git a/distribution/push_v2_test.go b/distribution/push_v2_test.go new file mode 100644 index 0000000000..e4654e32c5 --- /dev/null +++ b/distribution/push_v2_test.go @@ -0,0 +1,147 @@ +package distribution + +import ( + "reflect" + "testing" + + "github.com/docker/distribution/digest" + "github.com/docker/docker/distribution/metadata" + "github.com/docker/docker/reference" +) + +func TestGetRepositoryMountCandidates(t *testing.T) { + for _, tc := range []struct { + name string + hmacKey string + targetRepo string + maxCandidates int + metadata []metadata.V2Metadata + candidates []metadata.V2Metadata + }{ + { + name: "empty metadata", + targetRepo: "busybox", + maxCandidates: -1, + metadata: []metadata.V2Metadata{}, + candidates: []metadata.V2Metadata{}, + }, + { + name: "one item not matching", + targetRepo: "busybox", + maxCandidates: -1, + metadata: []metadata.V2Metadata{taggedMetadata("key", "dgst", "127.0.0.1/repo")}, + candidates: []metadata.V2Metadata{}, + }, + { + name: "one item matching", + targetRepo: "busybox", + maxCandidates: -1, + metadata: []metadata.V2Metadata{taggedMetadata("hash", "1", "hello-world")}, + candidates: []metadata.V2Metadata{taggedMetadata("hash", "1", "hello-world")}, + }, + { + name: "allow missing SourceRepository", + targetRepo: "busybox", + maxCandidates: -1, + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("1")}, + {Digest: digest.Digest("3")}, + {Digest: digest.Digest("2")}, + }, + candidates: []metadata.V2Metadata{}, + }, + { + name: "handle docker.io", + targetRepo: "user/app", + maxCandidates: -1, + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("1"), SourceRepository: "docker.io/user/foo"}, + {Digest: digest.Digest("3"), SourceRepository: "user/bar"}, + {Digest: digest.Digest("2"), SourceRepository: "app"}, + }, + candidates: []metadata.V2Metadata{ + {Digest: digest.Digest("3"), SourceRepository: "user/bar"}, + {Digest: digest.Digest("1"), SourceRepository: "docker.io/user/foo"}, + {Digest: digest.Digest("2"), SourceRepository: "app"}, + }, + }, + { + name: "sort more items", + hmacKey: "abcd", + targetRepo: "127.0.0.1/foo/bar", + maxCandidates: -1, + metadata: []metadata.V2Metadata{ + taggedMetadata("hash", "1", "hello-world"), + taggedMetadata("efgh", "2", "127.0.0.1/hello-world"), + taggedMetadata("abcd", "3", "busybox"), + taggedMetadata("hash", "4", "busybox"), + taggedMetadata("hash", "5", "127.0.0.1/foo"), + taggedMetadata("hash", "6", "127.0.0.1/bar"), + taggedMetadata("efgh", "7", "127.0.0.1/foo/bar"), + taggedMetadata("abcd", "8", "127.0.0.1/xyz"), + taggedMetadata("hash", "9", "127.0.0.1/foo/app"), + }, + candidates: []metadata.V2Metadata{ + // first by matching hash + taggedMetadata("abcd", "8", "127.0.0.1/xyz"), + // then by longest matching prefix + taggedMetadata("hash", "9", "127.0.0.1/foo/app"), + taggedMetadata("hash", "5", "127.0.0.1/foo"), + // sort the rest of the matching items in reversed order + taggedMetadata("hash", "6", "127.0.0.1/bar"), + taggedMetadata("efgh", "2", "127.0.0.1/hello-world"), + }, + }, + { + name: "limit max candidates", + hmacKey: "abcd", + targetRepo: "user/app", + maxCandidates: 3, + metadata: []metadata.V2Metadata{ + taggedMetadata("abcd", "1", "user/app1"), + taggedMetadata("abcd", "2", "user/app/base"), + taggedMetadata("hash", "3", "user/app"), + taggedMetadata("abcd", "4", "127.0.0.1/user/app"), + taggedMetadata("hash", "5", "user/foo"), + taggedMetadata("hash", "6", "app/bar"), + }, + candidates: []metadata.V2Metadata{ + // first by matching hash + taggedMetadata("abcd", "2", "user/app/base"), + taggedMetadata("abcd", "1", "user/app1"), + // then by longest matching prefix + taggedMetadata("hash", "3", "user/app"), + }, + }, + } { + repoInfo, err := reference.ParseNamed(tc.targetRepo) + if err != nil { + t.Fatalf("[%s] failed to parse reference name: %v", tc.name, err) + } + candidates := getRepositoryMountCandidates(repoInfo, []byte(tc.hmacKey), tc.maxCandidates, tc.metadata) + if len(candidates) != len(tc.candidates) { + t.Errorf("[%s] got unexpected number of candidates: %d != %d", tc.name, len(candidates), len(tc.candidates)) + } + for i := 0; i < len(candidates) && i < len(tc.candidates); i++ { + if !reflect.DeepEqual(candidates[i], tc.candidates[i]) { + t.Errorf("[%s] candidate %d does not match expected: %#+v != %#+v", tc.name, i, candidates[i], tc.candidates[i]) + } + } + for i := len(candidates); i < len(tc.candidates); i++ { + t.Errorf("[%s] missing expected candidate at position %d (%#+v)", tc.name, i, tc.candidates[i]) + } + for i := len(tc.candidates); i < len(candidates); i++ { + t.Errorf("[%s] got unexpected candidate at position %d (%#+v)", tc.name, i, candidates[i]) + } + } +} + +func taggedMetadata(key string, dgst string, sourceRepo string) metadata.V2Metadata { + meta := metadata.V2Metadata{ + Digest: digest.Digest(dgst), + SourceRepository: sourceRepo, + } + + meta.HMAC = metadata.ComputeV2MetadataHMAC([]byte(key), &meta) + return meta +} From c6dd51c32cfccc06e77b4f7cb2358f788753df72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Fri, 16 Sep 2016 14:05:51 +0200 Subject: [PATCH 2/4] Try to cross-repo mount until success MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Don't fallback back immediately to blob upload if the cross-repo mount fails and layer upload is initiated by registry. Instead cancel the upload and re-try cross-repo mount from different source repository before doing full re-upload. Signed-off-by: Michal Minář --- distribution/push_v2.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/distribution/push_v2.go b/distribution/push_v2.go index d2012704db..41d4e69e91 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -29,7 +29,7 @@ import ( "github.com/docker/docker/registry" ) -const maxRepositoryMountAttempts = 3 +const maxRepositoryMountAttempts = 4 // PushResult contains the tag, manifest digest, and manifest size from the // push. It's used to signal this information to the trust code in the client @@ -379,9 +379,10 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. pd.v2MetadataService.Remove(mountCandidate) } - layerUpload = lu - if layerUpload != nil { - break + if lu != nil { + // cancel previous upload + cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload) + layerUpload = lu } } @@ -583,3 +584,13 @@ func getPathComponents(path string) []string { } return strings.Split(path, "/") } + +func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) { + if layerUpload != nil { + logrus.Debugf("cancelling upload of blob %s", dgst) + err := layerUpload.Cancel(ctx) + if err != nil { + logrus.Warnf("failed to cancel upload: %v", err) + } + } +} From d3bd14a4fb7513688644479da09e701bdcd00dc7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Wed, 21 Sep 2016 17:01:09 +0200 Subject: [PATCH 3/4] Allow to mock V2MetadataService in unit tests MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Michal Minář --- distribution/metadata/v2_metadata_service.go | 35 +++++++++++++------- distribution/pull_v2.go | 4 +-- distribution/push_v2.go | 4 +-- 3 files changed, 27 insertions(+), 16 deletions(-) diff --git a/distribution/metadata/v2_metadata_service.go b/distribution/metadata/v2_metadata_service.go index 2c063f5ff8..b62cc291f1 100644 --- a/distribution/metadata/v2_metadata_service.go +++ b/distribution/metadata/v2_metadata_service.go @@ -13,10 +13,21 @@ import ( // V2MetadataService maps layer IDs to a set of known metadata for // the layer. -type V2MetadataService struct { +type V2MetadataService interface { + GetMetadata(diffID layer.DiffID) ([]V2Metadata, error) + GetDiffID(dgst digest.Digest) (layer.DiffID, error) + Add(diffID layer.DiffID, metadata V2Metadata) error + TagAndAdd(diffID layer.DiffID, hmacKey []byte, metadata V2Metadata) error + Remove(metadata V2Metadata) error +} + +// v2MetadataService implements V2MetadataService +type v2MetadataService struct { store Store } +var _ V2MetadataService = &v2MetadataService{} + // V2Metadata contains the digest and source repository information for a layer. type V2Metadata struct { Digest digest.Digest @@ -90,30 +101,30 @@ type authConfigKeyInput struct { const maxMetadata = 50 // NewV2MetadataService creates a new diff ID to v2 metadata mapping service. -func NewV2MetadataService(store Store) *V2MetadataService { - return &V2MetadataService{ +func NewV2MetadataService(store Store) V2MetadataService { + return &v2MetadataService{ store: store, } } -func (serv *V2MetadataService) diffIDNamespace() string { +func (serv *v2MetadataService) diffIDNamespace() string { return "v2metadata-by-diffid" } -func (serv *V2MetadataService) digestNamespace() string { +func (serv *v2MetadataService) digestNamespace() string { return "diffid-by-digest" } -func (serv *V2MetadataService) diffIDKey(diffID layer.DiffID) string { +func (serv *v2MetadataService) diffIDKey(diffID layer.DiffID) string { return string(digest.Digest(diffID).Algorithm()) + "/" + digest.Digest(diffID).Hex() } -func (serv *V2MetadataService) digestKey(dgst digest.Digest) string { +func (serv *v2MetadataService) digestKey(dgst digest.Digest) string { return string(dgst.Algorithm()) + "/" + dgst.Hex() } // GetMetadata finds the metadata associated with a layer DiffID. -func (serv *V2MetadataService) GetMetadata(diffID layer.DiffID) ([]V2Metadata, error) { +func (serv *v2MetadataService) GetMetadata(diffID layer.DiffID) ([]V2Metadata, error) { jsonBytes, err := serv.store.Get(serv.diffIDNamespace(), serv.diffIDKey(diffID)) if err != nil { return nil, err @@ -128,7 +139,7 @@ func (serv *V2MetadataService) GetMetadata(diffID layer.DiffID) ([]V2Metadata, e } // GetDiffID finds a layer DiffID from a digest. -func (serv *V2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, error) { +func (serv *v2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, error) { diffIDBytes, err := serv.store.Get(serv.digestNamespace(), serv.digestKey(dgst)) if err != nil { return layer.DiffID(""), err @@ -139,7 +150,7 @@ func (serv *V2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, erro // Add associates metadata with a layer DiffID. If too many metadata entries are // present, the oldest one is dropped. -func (serv *V2MetadataService) Add(diffID layer.DiffID, metadata V2Metadata) error { +func (serv *v2MetadataService) Add(diffID layer.DiffID, metadata V2Metadata) error { oldMetadata, err := serv.GetMetadata(diffID) if err != nil { oldMetadata = nil @@ -174,13 +185,13 @@ func (serv *V2MetadataService) Add(diffID layer.DiffID, metadata V2Metadata) err // TagAndAdd amends the given "meta" for hmac hashed by the given "hmacKey" and associates it with a layer // DiffID. If too many metadata entries are present, the oldest one is dropped. -func (serv *V2MetadataService) TagAndAdd(diffID layer.DiffID, hmacKey []byte, meta V2Metadata) error { +func (serv *v2MetadataService) TagAndAdd(diffID layer.DiffID, hmacKey []byte, meta V2Metadata) error { meta.HMAC = ComputeV2MetadataHMAC(hmacKey, &meta) return serv.Add(diffID, meta) } // Remove unassociates a metadata entry from a layer DiffID. -func (serv *V2MetadataService) Remove(metadata V2Metadata) error { +func (serv *v2MetadataService) Remove(metadata V2Metadata) error { diffID, err := serv.GetDiffID(metadata.Digest) if err != nil { return err diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go index 3cd8d33aae..22568f5569 100644 --- a/distribution/pull_v2.go +++ b/distribution/pull_v2.go @@ -50,7 +50,7 @@ func (e ImageConfigPullError) Error() string { } type v2Puller struct { - V2MetadataService *metadata.V2MetadataService + V2MetadataService metadata.V2MetadataService endpoint registry.APIEndpoint config *ImagePullConfig repoInfo *registry.RepositoryInfo @@ -134,7 +134,7 @@ type v2LayerDescriptor struct { digest digest.Digest repoInfo *registry.RepositoryInfo repo distribution.Repository - V2MetadataService *metadata.V2MetadataService + V2MetadataService metadata.V2MetadataService tmpFile *os.File verifier digest.Verifier src distribution.Descriptor diff --git a/distribution/push_v2.go b/distribution/push_v2.go index 41d4e69e91..d42c9ca4c2 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -41,7 +41,7 @@ type PushResult struct { } type v2Pusher struct { - v2MetadataService *metadata.V2MetadataService + v2MetadataService metadata.V2MetadataService ref reference.Named endpoint registry.APIEndpoint repoInfo *registry.RepositoryInfo @@ -243,7 +243,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild type v2PushDescriptor struct { layer layer.Layer - v2MetadataService *metadata.V2MetadataService + v2MetadataService metadata.V2MetadataService hmacKey []byte repoInfo reference.Named ref reference.Named From 81f7b1f1e50252e9fdd52111f0899d59f19ecb87 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Min=C3=A1=C5=99?= Date: Fri, 16 Sep 2016 13:58:36 +0200 Subject: [PATCH 4/4] Different number of retries for layers of different sizes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Classify blobs into three categories based on size. Use a very limited number of mount attempts and no existence check for small blobs. Use more attempts for bigger blobs. Also remember blob associations during layer existence check. Blob digests are now checked in the target repository from newest to latest. If the blob exists and the metadata entry does not, it will be created. If the blob is not found, the metadata entry will be removed. Signed-off-by: Michal Minář --- distribution/push_v2.go | 170 +++++++++++--- distribution/push_v2_test.go | 427 +++++++++++++++++++++++++++++++++++ 2 files changed, 561 insertions(+), 36 deletions(-) diff --git a/distribution/push_v2.go b/distribution/push_v2.go index d42c9ca4c2..fafb380d02 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -29,7 +29,10 @@ import ( "github.com/docker/docker/registry" ) -const maxRepositoryMountAttempts = 4 +const ( + smallLayerMaximumSize = 100 * (1 << 10) // 100KB + middleLayerMaximumSize = 10 * (1 << 20) // 10MB +) // PushResult contains the tag, manifest digest, and manifest size from the // push. It's used to signal this information to the trust code in the client @@ -158,6 +161,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, ima for i := 0; i < len(img.RootFS.DiffIDs); i++ { descriptor := descriptorTemplate descriptor.layer = l + descriptor.checkedDigests = make(map[digest.Digest]struct{}) descriptors = append(descriptors, &descriptor) l = l.Parent() @@ -250,6 +254,8 @@ type v2PushDescriptor struct { repo distribution.Repository pushState *pushState remoteDescriptor distribution.Descriptor + // a set of digests whose presence has been checked in a target repository + checkedDigests map[digest.Digest]struct{} } func (pd *v2PushDescriptor) Key() string { @@ -284,25 +290,18 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. } pd.pushState.Unlock() + maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer) + // Do we have any metadata associated with this layer's DiffID? v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID) if err == nil { - descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState) - if err != nil { - progress.Update(progressOutput, pd.ID(), "Image push failed") - return distribution.Descriptor{}, retryOnError(err) - } - if exists { - progress.Update(progressOutput, pd.ID(), "Layer already exists") - pd.pushState.Lock() - pd.pushState.remoteLayers[diffID] = descriptor - pd.pushState.Unlock() - return descriptor, nil + // check for blob existence in the target repository if we have a mapping with it + descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, false, 1, v2Metadata) + if exists || err != nil { + return descriptor, err } } - logrus.Debugf("Pushing layer: %s", diffID) - // if digest was empty or not saved, or if blob does not exist on the remote repository, // then push the blob. bs := pd.repo.Blobs(ctx) @@ -310,7 +309,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. var layerUpload distribution.BlobWriter // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload - candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxRepositoryMountAttempts, v2Metadata) + candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata) for _, mountCandidate := range candidates { logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository) createOpts := []distribution.BlobCreateOption{} @@ -386,6 +385,15 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. } } + if maxExistenceChecks-len(pd.checkedDigests) > 0 { + // do additional layer existence checks with other known digests if any + descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata) + if exists || err != nil { + return descriptor, err + } + } + + logrus.Debugf("Pushing layer: %s", diffID) if layerUpload == nil { layerUpload, err = bs.Create(ctx) if err != nil { @@ -400,12 +408,6 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. return desc, err } - pd.pushState.Lock() - // If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol. - pd.pushState.confirmedV2 = true - pd.pushState.remoteLayers[diffID] = desc - pd.pushState.Unlock() - return desc, nil } @@ -463,34 +465,130 @@ func (pd *v2PushDescriptor) uploadUsingSession( return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} } - return distribution.Descriptor{ + desc := distribution.Descriptor{ Digest: pushDigest, MediaType: schema2.MediaTypeLayer, Size: nn, - }, nil + } + + pd.pushState.Lock() + // If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol. + pd.pushState.confirmedV2 = true + pd.pushState.remoteLayers[diffID] = desc + pd.pushState.Unlock() + + return desc, nil } -// layerAlreadyExists checks if the registry already know about any of the -// metadata passed in the "metadata" slice. If it finds one that the registry -// knows about, it returns the known digest and "true". -func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) { - for _, meta := range metadata { - // Only check blobsums that are known to this repository or have an unknown source - if meta.SourceRepository != "" && meta.SourceRepository != repoInfo.FullName() { +// layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata" +// slice. If it finds one that the registry knows about, it returns the known digest and "true". If +// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository +// (not just the target one). +func (pd *v2PushDescriptor) layerAlreadyExists( + ctx context.Context, + progressOutput progress.Output, + diffID layer.DiffID, + checkOtherRepositories bool, + maxExistenceCheckAttempts int, + v2Metadata []metadata.V2Metadata, +) (desc distribution.Descriptor, exists bool, err error) { + // filter the metadata + candidates := []metadata.V2Metadata{} + for _, meta := range v2Metadata { + if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.FullName() { continue } - descriptor, err := repo.Blobs(ctx).Stat(ctx, meta.Digest) + candidates = append(candidates, meta) + } + // sort the candidates by similarity + sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates) + + digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata) + // an array of unique blob digests ordered from the best mount candidates to worst + layerDigests := []digest.Digest{} + for i := 0; i < len(candidates); i++ { + if len(layerDigests) >= maxExistenceCheckAttempts { + break + } + meta := &candidates[i] + if _, exists := digestToMetadata[meta.Digest]; exists { + // keep reference just to the first mapping (the best mount candidate) + continue + } + if _, exists := pd.checkedDigests[meta.Digest]; exists { + // existence of this digest has already been tested + continue + } + digestToMetadata[meta.Digest] = meta + layerDigests = append(layerDigests, meta.Digest) + } + + for _, dgst := range layerDigests { + meta := digestToMetadata[dgst] + logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.FullName()) + desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst) + pd.checkedDigests[meta.Digest] = struct{}{} switch err { case nil: - descriptor.MediaType = schema2.MediaTypeLayer - return descriptor, true, nil + if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.FullName() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) { + // cache mapping from this layer's DiffID to the blobsum + if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + Digest: desc.Digest, + SourceRepository: pd.repoInfo.FullName(), + }); err != nil { + return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err} + } + } + desc.MediaType = schema2.MediaTypeLayer + exists = true + break case distribution.ErrBlobUnknown: - // nop + if meta.SourceRepository == pd.repoInfo.FullName() { + // remove the mapping to the target repository + pd.v2MetadataService.Remove(*meta) + } default: - return distribution.Descriptor{}, false, err + progress.Update(progressOutput, pd.ID(), "Image push failed") + return desc, false, retryOnError(err) } } - return distribution.Descriptor{}, false, nil + + if exists { + progress.Update(progressOutput, pd.ID(), "Layer already exists") + pd.pushState.Lock() + pd.pushState.remoteLayers[diffID] = desc + pd.pushState.Unlock() + } + + return desc, exists, nil +} + +// getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from +// source repositories of target registry, maximum number of layer existence checks performed on the target +// repository and whether the check shall be done also with digests mapped to different repositories. The +// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost +// of upload does not outweigh a latency. +func getMaxMountAndExistenceCheckAttempts(layer layer.Layer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) { + size, err := layer.DiffSize() + switch { + // big blob + case size > middleLayerMaximumSize: + // 1st attempt to mount the blob few times + // 2nd few existence checks with digests associated to any repository + // then fallback to upload + return 4, 3, true + + // middle sized blobs; if we could not get the size, assume we deal with middle sized blob + case size > smallLayerMaximumSize, err != nil: + // 1st attempt to mount blobs of average size few times + // 2nd try at most 1 existence check if there's an existing mapping to the target repository + // then fallback to upload + return 3, 1, false + + // small blobs, do a minimum number of checks + default: + return 1, 1, false + } } // getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The diff --git a/distribution/push_v2_test.go b/distribution/push_v2_test.go index e4654e32c5..b76e1a3733 100644 --- a/distribution/push_v2_test.go +++ b/distribution/push_v2_test.go @@ -1,11 +1,18 @@ package distribution import ( + "net/http" "reflect" "testing" + "github.com/docker/distribution" + "github.com/docker/distribution/context" "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/schema2" + distreference "github.com/docker/distribution/reference" "github.com/docker/docker/distribution/metadata" + "github.com/docker/docker/layer" + "github.com/docker/docker/pkg/progress" "github.com/docker/docker/reference" ) @@ -136,6 +143,315 @@ func TestGetRepositoryMountCandidates(t *testing.T) { } } +func TestLayerAlreadyExists(t *testing.T) { + for _, tc := range []struct { + name string + metadata []metadata.V2Metadata + targetRepo string + hmacKey string + maxExistenceChecks int + checkOtherRepositories bool + remoteBlobs map[digest.Digest]distribution.Descriptor + remoteErrors map[digest.Digest]error + expectedDescriptor distribution.Descriptor + expectedExists bool + expectedError error + expectedRequests []string + expectedAdditions []metadata.V2Metadata + expectedRemovals []metadata.V2Metadata + }{ + { + name: "empty metadata", + targetRepo: "busybox", + maxExistenceChecks: 3, + checkOtherRepositories: true, + }, + { + name: "single not existent metadata", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}}, + maxExistenceChecks: 3, + expectedRequests: []string{"pear"}, + expectedRemovals: []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}}, + }, + { + name: "access denied", + targetRepo: "busybox", + maxExistenceChecks: 1, + metadata: []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}}, + remoteErrors: map[digest.Digest]error{digest.Digest("apple"): distribution.ErrAccessDenied}, + expectedError: distribution.ErrAccessDenied, + expectedRequests: []string{"apple"}, + }, + { + name: "not matching reposies", + targetRepo: "busybox", + maxExistenceChecks: 3, + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/hello-world"}, + {Digest: digest.Digest("orange"), SourceRepository: "docker.io/library/busybox/subapp"}, + {Digest: digest.Digest("pear"), SourceRepository: "docker.io/busybox"}, + {Digest: digest.Digest("plum"), SourceRepository: "busybox"}, + {Digest: digest.Digest("banana"), SourceRepository: "127.0.0.1/busybox"}, + }, + }, + { + name: "check other repositories", + targetRepo: "busybox", + maxExistenceChecks: 10, + checkOtherRepositories: true, + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/hello-world"}, + {Digest: digest.Digest("orange"), SourceRepository: "docker.io/library/busybox/subapp"}, + {Digest: digest.Digest("pear"), SourceRepository: "docker.io/busybox"}, + {Digest: digest.Digest("plum"), SourceRepository: "busybox"}, + {Digest: digest.Digest("banana"), SourceRepository: "127.0.0.1/busybox"}, + }, + expectedRequests: []string{"plum", "pear", "apple", "orange", "banana"}, + }, + { + name: "find existing blob", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}}, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple")}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple"}, + }, + { + name: "find existing blob with different hmac", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{{SourceRepository: "docker.io/library/busybox", Digest: digest.Digest("apple"), HMAC: "dummyhmac"}}, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple")}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple"}, + expectedAdditions: []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}}, + }, + { + name: "overwrite media types", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}}, + hmacKey: "key", + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple"), MediaType: "custom-media-type"}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple"}, + expectedAdditions: []metadata.V2Metadata{taggedMetadata("key", "apple", "docker.io/library/busybox")}, + }, + { + name: "find existing blob among many", + targetRepo: "127.0.0.1/myapp", + hmacKey: "key", + metadata: []metadata.V2Metadata{ + taggedMetadata("someotherkey", "pear", "127.0.0.1/myapp"), + taggedMetadata("key", "apple", "127.0.0.1/myapp"), + taggedMetadata("", "plum", "127.0.0.1/myapp"), + }, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("pear"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple", "plum", "pear"}, + expectedAdditions: []metadata.V2Metadata{taggedMetadata("key", "pear", "127.0.0.1/myapp")}, + expectedRemovals: []metadata.V2Metadata{ + taggedMetadata("key", "apple", "127.0.0.1/myapp"), + {Digest: digest.Digest("plum"), SourceRepository: "127.0.0.1/myapp"}, + }, + }, + { + name: "reach maximum existence checks", + targetRepo: "user/app", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("pear"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"}, + }, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}}, + expectedExists: false, + expectedRequests: []string{"banana", "plum", "apple"}, + expectedRemovals: []metadata.V2Metadata{ + {Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"}, + }, + }, + { + name: "zero allowed existence checks", + targetRepo: "user/app", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("pear"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"}, + }, + maxExistenceChecks: 0, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}}, + }, + { + name: "stat single digest just once", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{ + taggedMetadata("key1", "pear", "docker.io/library/busybox"), + taggedMetadata("key2", "apple", "docker.io/library/busybox"), + taggedMetadata("key3", "apple", "docker.io/library/busybox"), + }, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("pear"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple", "pear"}, + expectedAdditions: []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}}, + expectedRemovals: []metadata.V2Metadata{taggedMetadata("key3", "apple", "docker.io/library/busybox")}, + }, + { + name: "stop on first error", + targetRepo: "user/app", + hmacKey: "key", + metadata: []metadata.V2Metadata{ + taggedMetadata("key", "banana", "docker.io/user/app"), + taggedMetadata("key", "orange", "docker.io/user/app"), + taggedMetadata("key", "plum", "docker.io/user/app"), + }, + maxExistenceChecks: 3, + remoteErrors: map[digest.Digest]error{"orange": distribution.ErrAccessDenied}, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {}}, + expectedError: distribution.ErrAccessDenied, + expectedRequests: []string{"plum", "orange"}, + expectedRemovals: []metadata.V2Metadata{taggedMetadata("key", "plum", "docker.io/user/app")}, + }, + { + name: "remove outdated metadata", + targetRepo: "docker.io/user/app", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("plum"), SourceRepository: "docker.io/library/busybox"}, + {Digest: digest.Digest("orange"), SourceRepository: "docker.io/user/app"}, + }, + maxExistenceChecks: 3, + remoteErrors: map[digest.Digest]error{"orange": distribution.ErrBlobUnknown}, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("plum"): {}}, + expectedExists: false, + expectedRequests: []string{"orange"}, + expectedRemovals: []metadata.V2Metadata{{Digest: digest.Digest("orange"), SourceRepository: "docker.io/user/app"}}, + }, + { + name: "missing SourceRepository", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("1")}, + {Digest: digest.Digest("3")}, + {Digest: digest.Digest("2")}, + }, + maxExistenceChecks: 3, + expectedExists: false, + expectedRequests: []string{"2", "3", "1"}, + }, + + { + name: "with and without SourceRepository", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("1")}, + {Digest: digest.Digest("2"), SourceRepository: "docker.io/library/busybox"}, + {Digest: digest.Digest("3")}, + }, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("1"): {Digest: digest.Digest("1")}}, + maxExistenceChecks: 3, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("1"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"2", "3", "1"}, + expectedAdditions: []metadata.V2Metadata{{Digest: digest.Digest("1"), SourceRepository: "docker.io/library/busybox"}}, + expectedRemovals: []metadata.V2Metadata{ + {Digest: digest.Digest("2"), SourceRepository: "docker.io/library/busybox"}, + }, + }, + } { + repoInfo, err := reference.ParseNamed(tc.targetRepo) + if err != nil { + t.Fatalf("[%s] failed to parse reference name: %v", tc.name, err) + } + repo := &mockRepo{ + t: t, + errors: tc.remoteErrors, + blobs: tc.remoteBlobs, + requests: []string{}, + } + ctx := context.Background() + ms := &mockV2MetadataService{} + pd := &v2PushDescriptor{ + hmacKey: []byte(tc.hmacKey), + repoInfo: repoInfo, + layer: layer.EmptyLayer, + repo: repo, + v2MetadataService: ms, + pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)}, + checkedDigests: make(map[digest.Digest]struct{}), + } + + desc, exists, err := pd.layerAlreadyExists(ctx, &progressSink{t}, layer.EmptyLayer.DiffID(), tc.checkOtherRepositories, tc.maxExistenceChecks, tc.metadata) + + if !reflect.DeepEqual(desc, tc.expectedDescriptor) { + t.Errorf("[%s] got unexpected descriptor: %#+v != %#+v", tc.name, desc, tc.expectedDescriptor) + } + if exists != tc.expectedExists { + t.Errorf("[%s] got unexpected exists: %t != %t", tc.name, exists, tc.expectedExists) + } + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("[%s] got unexpected error: %#+v != %#+v", tc.name, err, tc.expectedError) + } + + if len(repo.requests) != len(tc.expectedRequests) { + t.Errorf("[%s] got unexpected number of requests: %d != %d", tc.name, len(repo.requests), len(tc.expectedRequests)) + } + for i := 0; i < len(repo.requests) && i < len(tc.expectedRequests); i++ { + if repo.requests[i] != tc.expectedRequests[i] { + t.Errorf("[%s] request %d does not match expected: %q != %q", tc.name, i, repo.requests[i], tc.expectedRequests[i]) + } + } + for i := len(repo.requests); i < len(tc.expectedRequests); i++ { + t.Errorf("[%s] missing expected request at position %d (%q)", tc.name, i, tc.expectedRequests[i]) + } + for i := len(tc.expectedRequests); i < len(repo.requests); i++ { + t.Errorf("[%s] got unexpected request at position %d (%q)", tc.name, i, repo.requests[i]) + } + + if len(ms.added) != len(tc.expectedAdditions) { + t.Errorf("[%s] got unexpected number of additions: %d != %d", tc.name, len(ms.added), len(tc.expectedAdditions)) + } + for i := 0; i < len(ms.added) && i < len(tc.expectedAdditions); i++ { + if ms.added[i] != tc.expectedAdditions[i] { + t.Errorf("[%s] added metadata at %d does not match expected: %q != %q", tc.name, i, ms.added[i], tc.expectedAdditions[i]) + } + } + for i := len(ms.added); i < len(tc.expectedAdditions); i++ { + t.Errorf("[%s] missing expected addition at position %d (%q)", tc.name, i, tc.expectedAdditions[i]) + } + for i := len(tc.expectedAdditions); i < len(ms.added); i++ { + t.Errorf("[%s] unexpected metadata addition at position %d (%q)", tc.name, i, ms.added[i]) + } + + if len(ms.removed) != len(tc.expectedRemovals) { + t.Errorf("[%s] got unexpected number of removals: %d != %d", tc.name, len(ms.removed), len(tc.expectedRemovals)) + } + for i := 0; i < len(ms.removed) && i < len(tc.expectedRemovals); i++ { + if ms.removed[i] != tc.expectedRemovals[i] { + t.Errorf("[%s] removed metadata at %d does not match expected: %q != %q", tc.name, i, ms.removed[i], tc.expectedRemovals[i]) + } + } + for i := len(ms.removed); i < len(tc.expectedRemovals); i++ { + t.Errorf("[%s] missing expected removal at position %d (%q)", tc.name, i, tc.expectedRemovals[i]) + } + for i := len(tc.expectedRemovals); i < len(ms.removed); i++ { + t.Errorf("[%s] removed unexpected metadata at position %d (%q)", tc.name, i, ms.removed[i]) + } + } +} + func taggedMetadata(key string, dgst string, sourceRepo string) metadata.V2Metadata { meta := metadata.V2Metadata{ Digest: digest.Digest(dgst), @@ -145,3 +461,114 @@ func taggedMetadata(key string, dgst string, sourceRepo string) metadata.V2Metad meta.HMAC = metadata.ComputeV2MetadataHMAC([]byte(key), &meta) return meta } + +type mockRepo struct { + t *testing.T + errors map[digest.Digest]error + blobs map[digest.Digest]distribution.Descriptor + requests []string +} + +var _ distribution.Repository = &mockRepo{} + +func (m *mockRepo) Named() distreference.Named { + m.t.Fatalf("Named() not implemented") + return nil +} +func (m *mockRepo) Manifests(ctc context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { + m.t.Fatalf("Manifests() not implemented") + return nil, nil +} +func (m *mockRepo) Tags(ctc context.Context) distribution.TagService { + m.t.Fatalf("Tags() not implemented") + return nil +} +func (m *mockRepo) Blobs(ctx context.Context) distribution.BlobStore { + return &mockBlobStore{ + repo: m, + } +} + +type mockBlobStore struct { + repo *mockRepo +} + +var _ distribution.BlobStore = &mockBlobStore{} + +func (m *mockBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + m.repo.requests = append(m.repo.requests, dgst.String()) + if err, exists := m.repo.errors[dgst]; exists { + return distribution.Descriptor{}, err + } + if desc, exists := m.repo.blobs[dgst]; exists { + return desc, nil + } + return distribution.Descriptor{}, distribution.ErrBlobUnknown +} +func (m *mockBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + m.repo.t.Fatal("Get() not implemented") + return nil, nil +} + +func (m *mockBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + m.repo.t.Fatal("Open() not implemented") + return nil, nil +} + +func (m *mockBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { + m.repo.t.Fatal("Put() not implemented") + return distribution.Descriptor{}, nil +} + +func (m *mockBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { + m.repo.t.Fatal("Create() not implemented") + return nil, nil +} +func (m *mockBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { + m.repo.t.Fatal("Resume() not implemented") + return nil, nil +} +func (m *mockBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { + m.repo.t.Fatal("Delete() not implemented") + return nil +} +func (m *mockBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { + m.repo.t.Fatalf("ServeBlob() not implemented") + return nil +} + +type mockV2MetadataService struct { + added []metadata.V2Metadata + removed []metadata.V2Metadata +} + +var _ metadata.V2MetadataService = &mockV2MetadataService{} + +func (*mockV2MetadataService) GetMetadata(diffID layer.DiffID) ([]metadata.V2Metadata, error) { + return nil, nil +} +func (*mockV2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, error) { + return "", nil +} +func (m *mockV2MetadataService) Add(diffID layer.DiffID, metadata metadata.V2Metadata) error { + m.added = append(m.added, metadata) + return nil +} +func (m *mockV2MetadataService) TagAndAdd(diffID layer.DiffID, hmacKey []byte, meta metadata.V2Metadata) error { + meta.HMAC = metadata.ComputeV2MetadataHMAC(hmacKey, &meta) + m.Add(diffID, meta) + return nil +} +func (m *mockV2MetadataService) Remove(metadata metadata.V2Metadata) error { + m.removed = append(m.removed, metadata) + return nil +} + +type progressSink struct { + t *testing.T +} + +func (s *progressSink) WriteProgress(p progress.Progress) error { + s.t.Logf("progress update: %#+v", p) + return nil +}