diff --git a/distribution/metadata/v2_metadata_service.go b/distribution/metadata/v2_metadata_service.go index 239cd1f45e..b62cc291f1 100644 --- a/distribution/metadata/v2_metadata_service.go +++ b/distribution/metadata/v2_metadata_service.go @@ -1,52 +1,130 @@ package metadata import ( + "crypto/hmac" + "crypto/sha256" + "encoding/hex" "encoding/json" "github.com/docker/distribution/digest" + "github.com/docker/docker/api/types" "github.com/docker/docker/layer" ) // V2MetadataService maps layer IDs to a set of known metadata for // the layer. -type V2MetadataService struct { +type V2MetadataService interface { + GetMetadata(diffID layer.DiffID) ([]V2Metadata, error) + GetDiffID(dgst digest.Digest) (layer.DiffID, error) + Add(diffID layer.DiffID, metadata V2Metadata) error + TagAndAdd(diffID layer.DiffID, hmacKey []byte, metadata V2Metadata) error + Remove(metadata V2Metadata) error +} + +// v2MetadataService implements V2MetadataService +type v2MetadataService struct { store Store } +var _ V2MetadataService = &v2MetadataService{} + // V2Metadata contains the digest and source repository information for a layer. type V2Metadata struct { Digest digest.Digest SourceRepository string + // HMAC hashes above attributes with recent authconfig digest used as a key in order to determine matching + // metadata entries accompanied by the same credentials without actually exposing them. + HMAC string +} + +// CheckV2MetadataHMAC return true if the given "meta" is tagged with a hmac hashed by the given "key". +func CheckV2MetadataHMAC(meta *V2Metadata, key []byte) bool { + if len(meta.HMAC) == 0 || len(key) == 0 { + return len(meta.HMAC) == 0 && len(key) == 0 + } + mac := hmac.New(sha256.New, key) + mac.Write([]byte(meta.Digest)) + mac.Write([]byte(meta.SourceRepository)) + expectedMac := mac.Sum(nil) + + storedMac, err := hex.DecodeString(meta.HMAC) + if err != nil { + return false + } + + return hmac.Equal(storedMac, expectedMac) +} + +// ComputeV2MetadataHMAC returns a hmac for the given "meta" hash by the given key. +func ComputeV2MetadataHMAC(key []byte, meta *V2Metadata) string { + if len(key) == 0 || meta == nil { + return "" + } + mac := hmac.New(sha256.New, key) + mac.Write([]byte(meta.Digest)) + mac.Write([]byte(meta.SourceRepository)) + return hex.EncodeToString(mac.Sum(nil)) +} + +// ComputeV2MetadataHMACKey returns a key for the given "authConfig" that can be used to hash v2 metadata +// entries. +func ComputeV2MetadataHMACKey(authConfig *types.AuthConfig) ([]byte, error) { + if authConfig == nil { + return nil, nil + } + key := authConfigKeyInput{ + Username: authConfig.Username, + Password: authConfig.Password, + Auth: authConfig.Auth, + IdentityToken: authConfig.IdentityToken, + RegistryToken: authConfig.RegistryToken, + } + buf, err := json.Marshal(&key) + if err != nil { + return nil, err + } + return []byte(digest.FromBytes([]byte(buf))), nil +} + +// authConfigKeyInput is a reduced AuthConfig structure holding just relevant credential data eligible for +// hmac key creation. +type authConfigKeyInput struct { + Username string `json:"username,omitempty"` + Password string `json:"password,omitempty"` + Auth string `json:"auth,omitempty"` + + IdentityToken string `json:"identitytoken,omitempty"` + RegistryToken string `json:"registrytoken,omitempty"` } // maxMetadata is the number of metadata entries to keep per layer DiffID. const maxMetadata = 50 // NewV2MetadataService creates a new diff ID to v2 metadata mapping service. -func NewV2MetadataService(store Store) *V2MetadataService { - return &V2MetadataService{ +func NewV2MetadataService(store Store) V2MetadataService { + return &v2MetadataService{ store: store, } } -func (serv *V2MetadataService) diffIDNamespace() string { +func (serv *v2MetadataService) diffIDNamespace() string { return "v2metadata-by-diffid" } -func (serv *V2MetadataService) digestNamespace() string { +func (serv *v2MetadataService) digestNamespace() string { return "diffid-by-digest" } -func (serv *V2MetadataService) diffIDKey(diffID layer.DiffID) string { +func (serv *v2MetadataService) diffIDKey(diffID layer.DiffID) string { return string(digest.Digest(diffID).Algorithm()) + "/" + digest.Digest(diffID).Hex() } -func (serv *V2MetadataService) digestKey(dgst digest.Digest) string { +func (serv *v2MetadataService) digestKey(dgst digest.Digest) string { return string(dgst.Algorithm()) + "/" + dgst.Hex() } // GetMetadata finds the metadata associated with a layer DiffID. -func (serv *V2MetadataService) GetMetadata(diffID layer.DiffID) ([]V2Metadata, error) { +func (serv *v2MetadataService) GetMetadata(diffID layer.DiffID) ([]V2Metadata, error) { jsonBytes, err := serv.store.Get(serv.diffIDNamespace(), serv.diffIDKey(diffID)) if err != nil { return nil, err @@ -61,7 +139,7 @@ func (serv *V2MetadataService) GetMetadata(diffID layer.DiffID) ([]V2Metadata, e } // GetDiffID finds a layer DiffID from a digest. -func (serv *V2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, error) { +func (serv *v2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, error) { diffIDBytes, err := serv.store.Get(serv.digestNamespace(), serv.digestKey(dgst)) if err != nil { return layer.DiffID(""), err @@ -72,7 +150,7 @@ func (serv *V2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, erro // Add associates metadata with a layer DiffID. If too many metadata entries are // present, the oldest one is dropped. -func (serv *V2MetadataService) Add(diffID layer.DiffID, metadata V2Metadata) error { +func (serv *v2MetadataService) Add(diffID layer.DiffID, metadata V2Metadata) error { oldMetadata, err := serv.GetMetadata(diffID) if err != nil { oldMetadata = nil @@ -105,8 +183,15 @@ func (serv *V2MetadataService) Add(diffID layer.DiffID, metadata V2Metadata) err return serv.store.Set(serv.digestNamespace(), serv.digestKey(metadata.Digest), []byte(diffID)) } +// TagAndAdd amends the given "meta" for hmac hashed by the given "hmacKey" and associates it with a layer +// DiffID. If too many metadata entries are present, the oldest one is dropped. +func (serv *v2MetadataService) TagAndAdd(diffID layer.DiffID, hmacKey []byte, meta V2Metadata) error { + meta.HMAC = ComputeV2MetadataHMAC(hmacKey, &meta) + return serv.Add(diffID, meta) +} + // Remove unassociates a metadata entry from a layer DiffID. -func (serv *V2MetadataService) Remove(metadata V2Metadata) error { +func (serv *v2MetadataService) Remove(metadata V2Metadata) error { diffID, err := serv.GetDiffID(metadata.Digest) if err != nil { return err diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go index c6239e4d7b..5c236c149a 100644 --- a/distribution/pull_v2.go +++ b/distribution/pull_v2.go @@ -50,7 +50,7 @@ func (e ImageConfigPullError) Error() string { } type v2Puller struct { - V2MetadataService *metadata.V2MetadataService + V2MetadataService metadata.V2MetadataService endpoint registry.APIEndpoint config *ImagePullConfig repoInfo *registry.RepositoryInfo @@ -134,7 +134,7 @@ type v2LayerDescriptor struct { digest digest.Digest repoInfo *registry.RepositoryInfo repo distribution.Repository - V2MetadataService *metadata.V2MetadataService + V2MetadataService metadata.V2MetadataService tmpFile *os.File verifier digest.Verifier src distribution.Descriptor diff --git a/distribution/push_v2.go b/distribution/push_v2.go index f1ca554dee..2362f1f752 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -5,8 +5,12 @@ import ( "fmt" "io" "runtime" + "sort" + "strings" "sync" + "golang.org/x/net/context" + "github.com/Sirupsen/logrus" "github.com/docker/distribution" "github.com/docker/distribution/digest" @@ -23,7 +27,11 @@ import ( "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/reference" "github.com/docker/docker/registry" - "golang.org/x/net/context" +) + +const ( + smallLayerMaximumSize = 100 * (1 << 10) // 100KB + middleLayerMaximumSize = 10 * (1 << 20) // 10MB ) // PushResult contains the tag, manifest digest, and manifest size from the @@ -36,7 +44,7 @@ type PushResult struct { } type v2Pusher struct { - v2MetadataService *metadata.V2MetadataService + v2MetadataService metadata.V2MetadataService ref reference.Named endpoint registry.APIEndpoint repoInfo *registry.RepositoryInfo @@ -133,10 +141,16 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id defer layer.ReleaseAndLog(p.config.LayerStore, l) } + hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig) + if err != nil { + return fmt.Errorf("failed to compute hmac key of auth config: %v", err) + } + var descriptors []xfer.UploadDescriptor descriptorTemplate := v2PushDescriptor{ v2MetadataService: p.v2MetadataService, + hmacKey: hmacKey, repoInfo: p.repoInfo, ref: p.ref, repo: p.repo, @@ -147,6 +161,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id for i := 0; i < len(img.RootFS.DiffIDs); i++ { descriptor := descriptorTemplate descriptor.layer = l + descriptor.checkedDigests = make(map[digest.Digest]struct{}) descriptors = append(descriptors, &descriptor) l = l.Parent() @@ -232,12 +247,15 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild type v2PushDescriptor struct { layer layer.Layer - v2MetadataService *metadata.V2MetadataService + v2MetadataService metadata.V2MetadataService + hmacKey []byte repoInfo reference.Named ref reference.Named repo distribution.Repository pushState *pushState remoteDescriptor distribution.Descriptor + // a set of digests whose presence has been checked in a target repository + checkedDigests map[digest.Digest]struct{} } func (pd *v2PushDescriptor) Key() string { @@ -272,71 +290,61 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. } pd.pushState.Unlock() + maxMountAttempts, maxExistenceChecks, checkOtherRepositories := getMaxMountAndExistenceCheckAttempts(pd.layer) + // Do we have any metadata associated with this layer's DiffID? v2Metadata, err := pd.v2MetadataService.GetMetadata(diffID) if err == nil { - descriptor, exists, err := layerAlreadyExists(ctx, v2Metadata, pd.repoInfo, pd.repo, pd.pushState) - if err != nil { - progress.Update(progressOutput, pd.ID(), "Image push failed") - return distribution.Descriptor{}, retryOnError(err) - } - if exists { - progress.Update(progressOutput, pd.ID(), "Layer already exists") - pd.pushState.Lock() - pd.pushState.remoteLayers[diffID] = descriptor - pd.pushState.Unlock() - return descriptor, nil + // check for blob existence in the target repository if we have a mapping with it + descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, false, 1, v2Metadata) + if exists || err != nil { + return descriptor, err } } - logrus.Debugf("Pushing layer: %s", diffID) - // if digest was empty or not saved, or if blob does not exist on the remote repository, // then push the blob. bs := pd.repo.Blobs(ctx) var layerUpload distribution.BlobWriter - mountAttemptsRemaining := 3 - // Attempt to find another repository in the same registry to mount the layer - // from to avoid an unnecessary upload. - // Note: metadata is stored from oldest to newest, so we iterate through this - // slice in reverse to maximize our chances of the blob still existing in the - // remote repository. - for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- { - mountFrom := v2Metadata[i] + // Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload + candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxMountAttempts, v2Metadata) + for _, mountCandidate := range candidates { + logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository) + createOpts := []distribution.BlobCreateOption{} - sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository) - if err != nil { - continue - } - if pd.repoInfo.Hostname() != sourceRepo.Hostname() { - // don't mount blobs from another registry - continue + if len(mountCandidate.SourceRepository) > 0 { + namedRef, err := reference.WithName(mountCandidate.SourceRepository) + if err != nil { + logrus.Errorf("failed to parse source repository reference %v: %v", namedRef.String(), err) + pd.v2MetadataService.Remove(mountCandidate) + continue + } + + // TODO (brianbland): We need to construct a reference where the Name is + // only the full remote name, so clean this up when distribution has a + // richer reference package + remoteRef, err := distreference.WithName(namedRef.RemoteName()) + if err != nil { + logrus.Errorf("failed to make remote reference out of %q: %v", namedRef.RemoteName(), namedRef.RemoteName()) + continue + } + + canonicalRef, err := distreference.WithDigest(remoteRef, mountCandidate.Digest) + if err != nil { + logrus.Errorf("failed to make canonical reference: %v", err) + continue + } + + createOpts = append(createOpts, client.WithMountFrom(canonicalRef)) } - namedRef, err := reference.WithName(mountFrom.SourceRepository) - if err != nil { - continue - } - - // TODO (brianbland): We need to construct a reference where the Name is - // only the full remote name, so clean this up when distribution has a - // richer reference package - remoteRef, err := distreference.WithName(namedRef.RemoteName()) - if err != nil { - continue - } - - canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest) - if err != nil { - continue - } - - logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName()) - - layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef)) + // send the layer + lu, err := bs.Create(ctx, createOpts...) switch err := err.(type) { + case nil: + // noop case distribution.ErrBlobMounted: progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name()) @@ -348,21 +356,44 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. pd.pushState.Unlock() // Cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil { + if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + Digest: err.Descriptor.Digest, + SourceRepository: pd.repoInfo.FullName(), + }); err != nil { return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} } return err.Descriptor, nil - case nil: - // blob upload session created successfully, so begin the upload - mountAttemptsRemaining = 0 default: - // unable to mount layer from this repository, so this source mapping is no longer valid - logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository) - pd.v2MetadataService.Remove(mountFrom) - mountAttemptsRemaining-- + logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err) + } + + if len(mountCandidate.SourceRepository) > 0 && + (metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) || + len(mountCandidate.HMAC) == 0) { + cause := "blob mount failure" + if err != nil { + cause = fmt.Sprintf("an error: %v", err.Error()) + } + logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause) + pd.v2MetadataService.Remove(mountCandidate) + } + + if lu != nil { + // cancel previous upload + cancelLayerUpload(ctx, mountCandidate.Digest, layerUpload) + layerUpload = lu } } + if maxExistenceChecks-len(pd.checkedDigests) > 0 { + // do additional layer existence checks with other known digests if any + descriptor, exists, err := pd.layerAlreadyExists(ctx, progressOutput, diffID, checkOtherRepositories, maxExistenceChecks-len(pd.checkedDigests), v2Metadata) + if exists || err != nil { + return descriptor, err + } + } + + logrus.Debugf("Pushing layer: %s", diffID) if layerUpload == nil { layerUpload, err = bs.Create(ctx) if err != nil { @@ -371,6 +402,29 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. } defer layerUpload.Close() + // upload the blob + desc, err := pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload) + if err != nil { + return desc, err + } + + return desc, nil +} + +func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { + pd.remoteDescriptor = descriptor +} + +func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { + return pd.remoteDescriptor +} + +func (pd *v2PushDescriptor) uploadUsingSession( + ctx context.Context, + progressOutput progress.Output, + diffID layer.DiffID, + layerUpload distribution.BlobWriter, +) (distribution.Descriptor, error) { arch, err := pd.layer.TarStream() if err != nil { return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} @@ -404,55 +458,237 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress. progress.Update(progressOutput, pd.ID(), "Pushed") // Cache mapping from this layer's DiffID to the blobsum - if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil { + if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + Digest: pushDigest, + SourceRepository: pd.repoInfo.FullName(), + }); err != nil { return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} } - pd.pushState.Lock() - - // If Commit succeeded, that's an indication that the remote registry - // speaks the v2 protocol. - pd.pushState.confirmedV2 = true - - descriptor := distribution.Descriptor{ + desc := distribution.Descriptor{ Digest: pushDigest, MediaType: schema2.MediaTypeLayer, Size: nn, } - pd.pushState.remoteLayers[diffID] = descriptor + pd.pushState.Lock() + // If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol. + pd.pushState.confirmedV2 = true + pd.pushState.remoteLayers[diffID] = desc pd.pushState.Unlock() - return descriptor, nil + return desc, nil } -func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) { - pd.remoteDescriptor = descriptor -} - -func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor { - return pd.remoteDescriptor -} - -// layerAlreadyExists checks if the registry already know about any of the -// metadata passed in the "metadata" slice. If it finds one that the registry -// knows about, it returns the known digest and "true". -func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) { - for _, meta := range metadata { - // Only check blobsums that are known to this repository or have an unknown source - if meta.SourceRepository != "" && meta.SourceRepository != repoInfo.FullName() { +// layerAlreadyExists checks if the registry already knows about any of the metadata passed in the "metadata" +// slice. If it finds one that the registry knows about, it returns the known digest and "true". If +// "checkOtherRepositories" is true, stat will be performed also with digests mapped to any other repository +// (not just the target one). +func (pd *v2PushDescriptor) layerAlreadyExists( + ctx context.Context, + progressOutput progress.Output, + diffID layer.DiffID, + checkOtherRepositories bool, + maxExistenceCheckAttempts int, + v2Metadata []metadata.V2Metadata, +) (desc distribution.Descriptor, exists bool, err error) { + // filter the metadata + candidates := []metadata.V2Metadata{} + for _, meta := range v2Metadata { + if len(meta.SourceRepository) > 0 && !checkOtherRepositories && meta.SourceRepository != pd.repoInfo.FullName() { continue } - descriptor, err := repo.Blobs(ctx).Stat(ctx, meta.Digest) + candidates = append(candidates, meta) + } + // sort the candidates by similarity + sortV2MetadataByLikenessAndAge(pd.repoInfo, pd.hmacKey, candidates) + + digestToMetadata := make(map[digest.Digest]*metadata.V2Metadata) + // an array of unique blob digests ordered from the best mount candidates to worst + layerDigests := []digest.Digest{} + for i := 0; i < len(candidates); i++ { + if len(layerDigests) >= maxExistenceCheckAttempts { + break + } + meta := &candidates[i] + if _, exists := digestToMetadata[meta.Digest]; exists { + // keep reference just to the first mapping (the best mount candidate) + continue + } + if _, exists := pd.checkedDigests[meta.Digest]; exists { + // existence of this digest has already been tested + continue + } + digestToMetadata[meta.Digest] = meta + layerDigests = append(layerDigests, meta.Digest) + } + + for _, dgst := range layerDigests { + meta := digestToMetadata[dgst] + logrus.Debugf("Checking for presence of layer %s (%s) in %s", diffID, dgst, pd.repoInfo.FullName()) + desc, err = pd.repo.Blobs(ctx).Stat(ctx, dgst) + pd.checkedDigests[meta.Digest] = struct{}{} switch err { case nil: - descriptor.MediaType = schema2.MediaTypeLayer - return descriptor, true, nil + if m, ok := digestToMetadata[desc.Digest]; !ok || m.SourceRepository != pd.repoInfo.FullName() || !metadata.CheckV2MetadataHMAC(m, pd.hmacKey) { + // cache mapping from this layer's DiffID to the blobsum + if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{ + Digest: desc.Digest, + SourceRepository: pd.repoInfo.FullName(), + }); err != nil { + return distribution.Descriptor{}, false, xfer.DoNotRetry{Err: err} + } + } + desc.MediaType = schema2.MediaTypeLayer + exists = true + break case distribution.ErrBlobUnknown: - // nop + if meta.SourceRepository == pd.repoInfo.FullName() { + // remove the mapping to the target repository + pd.v2MetadataService.Remove(*meta) + } default: - return distribution.Descriptor{}, false, err + progress.Update(progressOutput, pd.ID(), "Image push failed") + return desc, false, retryOnError(err) + } + } + + if exists { + progress.Update(progressOutput, pd.ID(), "Layer already exists") + pd.pushState.Lock() + pd.pushState.remoteLayers[diffID] = desc + pd.pushState.Unlock() + } + + return desc, exists, nil +} + +// getMaxMountAndExistenceCheckAttempts returns a maximum number of cross repository mount attempts from +// source repositories of target registry, maximum number of layer existence checks performed on the target +// repository and whether the check shall be done also with digests mapped to different repositories. The +// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost +// of upload does not outweigh a latency. +func getMaxMountAndExistenceCheckAttempts(layer layer.Layer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) { + size, err := layer.DiffSize() + switch { + // big blob + case size > middleLayerMaximumSize: + // 1st attempt to mount the blob few times + // 2nd few existence checks with digests associated to any repository + // then fallback to upload + return 4, 3, true + + // middle sized blobs; if we could not get the size, assume we deal with middle sized blob + case size > smallLayerMaximumSize, err != nil: + // 1st attempt to mount blobs of average size few times + // 2nd try at most 1 existence check if there's an existing mapping to the target repository + // then fallback to upload + return 3, 1, false + + // small blobs, do a minimum number of checks + default: + return 1, 1, false + } +} + +// getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The +// array is sorted from youngest to oldest. If requireReigstryMatch is true, the resulting array will contain +// only metadata entries having registry part of SourceRepository matching the part of repoInfo. +func getRepositoryMountCandidates( + repoInfo reference.Named, + hmacKey []byte, + max int, + v2Metadata []metadata.V2Metadata, +) []metadata.V2Metadata { + candidates := []metadata.V2Metadata{} + for _, meta := range v2Metadata { + sourceRepo, err := reference.ParseNamed(meta.SourceRepository) + if err != nil || repoInfo.Hostname() != sourceRepo.Hostname() { + continue + } + // target repository is not a viable candidate + if meta.SourceRepository == repoInfo.FullName() { + continue + } + candidates = append(candidates, meta) + } + + sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates) + if max >= 0 && len(candidates) > max { + // select the youngest metadata + candidates = candidates[:max] + } + + return candidates +} + +// byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The +// candidate "a" is preferred over "b": +// +// 1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the +// "b" was not +// 2. if a number of its repository path components exactly matching path components of target repository is higher +type byLikeness struct { + arr []metadata.V2Metadata + hmacKey []byte + pathComponents []string +} + +func (bla byLikeness) Less(i, j int) bool { + aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey) + bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey) + if aMacMatch != bMacMatch { + return aMacMatch + } + aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents) + bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents) + return aMatch > bMatch +} +func (bla byLikeness) Swap(i, j int) { + bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i] +} +func (bla byLikeness) Len() int { return len(bla.arr) } + +func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) { + // reverse the metadata array to shift the newest entries to the beginning + for i := 0; i < len(marr)/2; i++ { + marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i] + } + // keep equal entries ordered from the youngest to the oldest + sort.Stable(byLikeness{ + arr: marr, + hmacKey: hmacKey, + pathComponents: getPathComponents(repoInfo.FullName()), + }) +} + +// numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents". +func numOfMatchingPathComponents(pth string, matchComponents []string) int { + pthComponents := getPathComponents(pth) + i := 0 + for ; i < len(pthComponents) && i < len(matchComponents); i++ { + if matchComponents[i] != pthComponents[i] { + return i + } + } + return i +} + +func getPathComponents(path string) []string { + // make sure to add docker.io/ prefix to the path + named, err := reference.ParseNamed(path) + if err == nil { + path = named.FullName() + } + return strings.Split(path, "/") +} + +func cancelLayerUpload(ctx context.Context, dgst digest.Digest, layerUpload distribution.BlobWriter) { + if layerUpload != nil { + logrus.Debugf("cancelling upload of blob %s", dgst) + err := layerUpload.Cancel(ctx) + if err != nil { + logrus.Warnf("failed to cancel upload: %v", err) } } - return distribution.Descriptor{}, false, nil } diff --git a/distribution/push_v2_test.go b/distribution/push_v2_test.go new file mode 100644 index 0000000000..b76e1a3733 --- /dev/null +++ b/distribution/push_v2_test.go @@ -0,0 +1,574 @@ +package distribution + +import ( + "net/http" + "reflect" + "testing" + + "github.com/docker/distribution" + "github.com/docker/distribution/context" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/schema2" + distreference "github.com/docker/distribution/reference" + "github.com/docker/docker/distribution/metadata" + "github.com/docker/docker/layer" + "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/reference" +) + +func TestGetRepositoryMountCandidates(t *testing.T) { + for _, tc := range []struct { + name string + hmacKey string + targetRepo string + maxCandidates int + metadata []metadata.V2Metadata + candidates []metadata.V2Metadata + }{ + { + name: "empty metadata", + targetRepo: "busybox", + maxCandidates: -1, + metadata: []metadata.V2Metadata{}, + candidates: []metadata.V2Metadata{}, + }, + { + name: "one item not matching", + targetRepo: "busybox", + maxCandidates: -1, + metadata: []metadata.V2Metadata{taggedMetadata("key", "dgst", "127.0.0.1/repo")}, + candidates: []metadata.V2Metadata{}, + }, + { + name: "one item matching", + targetRepo: "busybox", + maxCandidates: -1, + metadata: []metadata.V2Metadata{taggedMetadata("hash", "1", "hello-world")}, + candidates: []metadata.V2Metadata{taggedMetadata("hash", "1", "hello-world")}, + }, + { + name: "allow missing SourceRepository", + targetRepo: "busybox", + maxCandidates: -1, + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("1")}, + {Digest: digest.Digest("3")}, + {Digest: digest.Digest("2")}, + }, + candidates: []metadata.V2Metadata{}, + }, + { + name: "handle docker.io", + targetRepo: "user/app", + maxCandidates: -1, + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("1"), SourceRepository: "docker.io/user/foo"}, + {Digest: digest.Digest("3"), SourceRepository: "user/bar"}, + {Digest: digest.Digest("2"), SourceRepository: "app"}, + }, + candidates: []metadata.V2Metadata{ + {Digest: digest.Digest("3"), SourceRepository: "user/bar"}, + {Digest: digest.Digest("1"), SourceRepository: "docker.io/user/foo"}, + {Digest: digest.Digest("2"), SourceRepository: "app"}, + }, + }, + { + name: "sort more items", + hmacKey: "abcd", + targetRepo: "127.0.0.1/foo/bar", + maxCandidates: -1, + metadata: []metadata.V2Metadata{ + taggedMetadata("hash", "1", "hello-world"), + taggedMetadata("efgh", "2", "127.0.0.1/hello-world"), + taggedMetadata("abcd", "3", "busybox"), + taggedMetadata("hash", "4", "busybox"), + taggedMetadata("hash", "5", "127.0.0.1/foo"), + taggedMetadata("hash", "6", "127.0.0.1/bar"), + taggedMetadata("efgh", "7", "127.0.0.1/foo/bar"), + taggedMetadata("abcd", "8", "127.0.0.1/xyz"), + taggedMetadata("hash", "9", "127.0.0.1/foo/app"), + }, + candidates: []metadata.V2Metadata{ + // first by matching hash + taggedMetadata("abcd", "8", "127.0.0.1/xyz"), + // then by longest matching prefix + taggedMetadata("hash", "9", "127.0.0.1/foo/app"), + taggedMetadata("hash", "5", "127.0.0.1/foo"), + // sort the rest of the matching items in reversed order + taggedMetadata("hash", "6", "127.0.0.1/bar"), + taggedMetadata("efgh", "2", "127.0.0.1/hello-world"), + }, + }, + { + name: "limit max candidates", + hmacKey: "abcd", + targetRepo: "user/app", + maxCandidates: 3, + metadata: []metadata.V2Metadata{ + taggedMetadata("abcd", "1", "user/app1"), + taggedMetadata("abcd", "2", "user/app/base"), + taggedMetadata("hash", "3", "user/app"), + taggedMetadata("abcd", "4", "127.0.0.1/user/app"), + taggedMetadata("hash", "5", "user/foo"), + taggedMetadata("hash", "6", "app/bar"), + }, + candidates: []metadata.V2Metadata{ + // first by matching hash + taggedMetadata("abcd", "2", "user/app/base"), + taggedMetadata("abcd", "1", "user/app1"), + // then by longest matching prefix + taggedMetadata("hash", "3", "user/app"), + }, + }, + } { + repoInfo, err := reference.ParseNamed(tc.targetRepo) + if err != nil { + t.Fatalf("[%s] failed to parse reference name: %v", tc.name, err) + } + candidates := getRepositoryMountCandidates(repoInfo, []byte(tc.hmacKey), tc.maxCandidates, tc.metadata) + if len(candidates) != len(tc.candidates) { + t.Errorf("[%s] got unexpected number of candidates: %d != %d", tc.name, len(candidates), len(tc.candidates)) + } + for i := 0; i < len(candidates) && i < len(tc.candidates); i++ { + if !reflect.DeepEqual(candidates[i], tc.candidates[i]) { + t.Errorf("[%s] candidate %d does not match expected: %#+v != %#+v", tc.name, i, candidates[i], tc.candidates[i]) + } + } + for i := len(candidates); i < len(tc.candidates); i++ { + t.Errorf("[%s] missing expected candidate at position %d (%#+v)", tc.name, i, tc.candidates[i]) + } + for i := len(tc.candidates); i < len(candidates); i++ { + t.Errorf("[%s] got unexpected candidate at position %d (%#+v)", tc.name, i, candidates[i]) + } + } +} + +func TestLayerAlreadyExists(t *testing.T) { + for _, tc := range []struct { + name string + metadata []metadata.V2Metadata + targetRepo string + hmacKey string + maxExistenceChecks int + checkOtherRepositories bool + remoteBlobs map[digest.Digest]distribution.Descriptor + remoteErrors map[digest.Digest]error + expectedDescriptor distribution.Descriptor + expectedExists bool + expectedError error + expectedRequests []string + expectedAdditions []metadata.V2Metadata + expectedRemovals []metadata.V2Metadata + }{ + { + name: "empty metadata", + targetRepo: "busybox", + maxExistenceChecks: 3, + checkOtherRepositories: true, + }, + { + name: "single not existent metadata", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}}, + maxExistenceChecks: 3, + expectedRequests: []string{"pear"}, + expectedRemovals: []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}}, + }, + { + name: "access denied", + targetRepo: "busybox", + maxExistenceChecks: 1, + metadata: []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}}, + remoteErrors: map[digest.Digest]error{digest.Digest("apple"): distribution.ErrAccessDenied}, + expectedError: distribution.ErrAccessDenied, + expectedRequests: []string{"apple"}, + }, + { + name: "not matching reposies", + targetRepo: "busybox", + maxExistenceChecks: 3, + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/hello-world"}, + {Digest: digest.Digest("orange"), SourceRepository: "docker.io/library/busybox/subapp"}, + {Digest: digest.Digest("pear"), SourceRepository: "docker.io/busybox"}, + {Digest: digest.Digest("plum"), SourceRepository: "busybox"}, + {Digest: digest.Digest("banana"), SourceRepository: "127.0.0.1/busybox"}, + }, + }, + { + name: "check other repositories", + targetRepo: "busybox", + maxExistenceChecks: 10, + checkOtherRepositories: true, + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/hello-world"}, + {Digest: digest.Digest("orange"), SourceRepository: "docker.io/library/busybox/subapp"}, + {Digest: digest.Digest("pear"), SourceRepository: "docker.io/busybox"}, + {Digest: digest.Digest("plum"), SourceRepository: "busybox"}, + {Digest: digest.Digest("banana"), SourceRepository: "127.0.0.1/busybox"}, + }, + expectedRequests: []string{"plum", "pear", "apple", "orange", "banana"}, + }, + { + name: "find existing blob", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}}, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple")}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple"}, + }, + { + name: "find existing blob with different hmac", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{{SourceRepository: "docker.io/library/busybox", Digest: digest.Digest("apple"), HMAC: "dummyhmac"}}, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple")}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple"}, + expectedAdditions: []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}}, + }, + { + name: "overwrite media types", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{{Digest: digest.Digest("apple"), SourceRepository: "docker.io/library/busybox"}}, + hmacKey: "key", + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {Digest: digest.Digest("apple"), MediaType: "custom-media-type"}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("apple"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple"}, + expectedAdditions: []metadata.V2Metadata{taggedMetadata("key", "apple", "docker.io/library/busybox")}, + }, + { + name: "find existing blob among many", + targetRepo: "127.0.0.1/myapp", + hmacKey: "key", + metadata: []metadata.V2Metadata{ + taggedMetadata("someotherkey", "pear", "127.0.0.1/myapp"), + taggedMetadata("key", "apple", "127.0.0.1/myapp"), + taggedMetadata("", "plum", "127.0.0.1/myapp"), + }, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("pear"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple", "plum", "pear"}, + expectedAdditions: []metadata.V2Metadata{taggedMetadata("key", "pear", "127.0.0.1/myapp")}, + expectedRemovals: []metadata.V2Metadata{ + taggedMetadata("key", "apple", "127.0.0.1/myapp"), + {Digest: digest.Digest("plum"), SourceRepository: "127.0.0.1/myapp"}, + }, + }, + { + name: "reach maximum existence checks", + targetRepo: "user/app", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("pear"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"}, + }, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}}, + expectedExists: false, + expectedRequests: []string{"banana", "plum", "apple"}, + expectedRemovals: []metadata.V2Metadata{ + {Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"}, + }, + }, + { + name: "zero allowed existence checks", + targetRepo: "user/app", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("pear"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("apple"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("plum"), SourceRepository: "docker.io/user/app"}, + {Digest: digest.Digest("banana"), SourceRepository: "docker.io/user/app"}, + }, + maxExistenceChecks: 0, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}}, + }, + { + name: "stat single digest just once", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{ + taggedMetadata("key1", "pear", "docker.io/library/busybox"), + taggedMetadata("key2", "apple", "docker.io/library/busybox"), + taggedMetadata("key3", "apple", "docker.io/library/busybox"), + }, + maxExistenceChecks: 3, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("pear"): {Digest: digest.Digest("pear")}}, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("pear"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"apple", "pear"}, + expectedAdditions: []metadata.V2Metadata{{Digest: digest.Digest("pear"), SourceRepository: "docker.io/library/busybox"}}, + expectedRemovals: []metadata.V2Metadata{taggedMetadata("key3", "apple", "docker.io/library/busybox")}, + }, + { + name: "stop on first error", + targetRepo: "user/app", + hmacKey: "key", + metadata: []metadata.V2Metadata{ + taggedMetadata("key", "banana", "docker.io/user/app"), + taggedMetadata("key", "orange", "docker.io/user/app"), + taggedMetadata("key", "plum", "docker.io/user/app"), + }, + maxExistenceChecks: 3, + remoteErrors: map[digest.Digest]error{"orange": distribution.ErrAccessDenied}, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("apple"): {}}, + expectedError: distribution.ErrAccessDenied, + expectedRequests: []string{"plum", "orange"}, + expectedRemovals: []metadata.V2Metadata{taggedMetadata("key", "plum", "docker.io/user/app")}, + }, + { + name: "remove outdated metadata", + targetRepo: "docker.io/user/app", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("plum"), SourceRepository: "docker.io/library/busybox"}, + {Digest: digest.Digest("orange"), SourceRepository: "docker.io/user/app"}, + }, + maxExistenceChecks: 3, + remoteErrors: map[digest.Digest]error{"orange": distribution.ErrBlobUnknown}, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("plum"): {}}, + expectedExists: false, + expectedRequests: []string{"orange"}, + expectedRemovals: []metadata.V2Metadata{{Digest: digest.Digest("orange"), SourceRepository: "docker.io/user/app"}}, + }, + { + name: "missing SourceRepository", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("1")}, + {Digest: digest.Digest("3")}, + {Digest: digest.Digest("2")}, + }, + maxExistenceChecks: 3, + expectedExists: false, + expectedRequests: []string{"2", "3", "1"}, + }, + + { + name: "with and without SourceRepository", + targetRepo: "busybox", + metadata: []metadata.V2Metadata{ + {Digest: digest.Digest("1")}, + {Digest: digest.Digest("2"), SourceRepository: "docker.io/library/busybox"}, + {Digest: digest.Digest("3")}, + }, + remoteBlobs: map[digest.Digest]distribution.Descriptor{digest.Digest("1"): {Digest: digest.Digest("1")}}, + maxExistenceChecks: 3, + expectedDescriptor: distribution.Descriptor{Digest: digest.Digest("1"), MediaType: schema2.MediaTypeLayer}, + expectedExists: true, + expectedRequests: []string{"2", "3", "1"}, + expectedAdditions: []metadata.V2Metadata{{Digest: digest.Digest("1"), SourceRepository: "docker.io/library/busybox"}}, + expectedRemovals: []metadata.V2Metadata{ + {Digest: digest.Digest("2"), SourceRepository: "docker.io/library/busybox"}, + }, + }, + } { + repoInfo, err := reference.ParseNamed(tc.targetRepo) + if err != nil { + t.Fatalf("[%s] failed to parse reference name: %v", tc.name, err) + } + repo := &mockRepo{ + t: t, + errors: tc.remoteErrors, + blobs: tc.remoteBlobs, + requests: []string{}, + } + ctx := context.Background() + ms := &mockV2MetadataService{} + pd := &v2PushDescriptor{ + hmacKey: []byte(tc.hmacKey), + repoInfo: repoInfo, + layer: layer.EmptyLayer, + repo: repo, + v2MetadataService: ms, + pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)}, + checkedDigests: make(map[digest.Digest]struct{}), + } + + desc, exists, err := pd.layerAlreadyExists(ctx, &progressSink{t}, layer.EmptyLayer.DiffID(), tc.checkOtherRepositories, tc.maxExistenceChecks, tc.metadata) + + if !reflect.DeepEqual(desc, tc.expectedDescriptor) { + t.Errorf("[%s] got unexpected descriptor: %#+v != %#+v", tc.name, desc, tc.expectedDescriptor) + } + if exists != tc.expectedExists { + t.Errorf("[%s] got unexpected exists: %t != %t", tc.name, exists, tc.expectedExists) + } + if !reflect.DeepEqual(err, tc.expectedError) { + t.Errorf("[%s] got unexpected error: %#+v != %#+v", tc.name, err, tc.expectedError) + } + + if len(repo.requests) != len(tc.expectedRequests) { + t.Errorf("[%s] got unexpected number of requests: %d != %d", tc.name, len(repo.requests), len(tc.expectedRequests)) + } + for i := 0; i < len(repo.requests) && i < len(tc.expectedRequests); i++ { + if repo.requests[i] != tc.expectedRequests[i] { + t.Errorf("[%s] request %d does not match expected: %q != %q", tc.name, i, repo.requests[i], tc.expectedRequests[i]) + } + } + for i := len(repo.requests); i < len(tc.expectedRequests); i++ { + t.Errorf("[%s] missing expected request at position %d (%q)", tc.name, i, tc.expectedRequests[i]) + } + for i := len(tc.expectedRequests); i < len(repo.requests); i++ { + t.Errorf("[%s] got unexpected request at position %d (%q)", tc.name, i, repo.requests[i]) + } + + if len(ms.added) != len(tc.expectedAdditions) { + t.Errorf("[%s] got unexpected number of additions: %d != %d", tc.name, len(ms.added), len(tc.expectedAdditions)) + } + for i := 0; i < len(ms.added) && i < len(tc.expectedAdditions); i++ { + if ms.added[i] != tc.expectedAdditions[i] { + t.Errorf("[%s] added metadata at %d does not match expected: %q != %q", tc.name, i, ms.added[i], tc.expectedAdditions[i]) + } + } + for i := len(ms.added); i < len(tc.expectedAdditions); i++ { + t.Errorf("[%s] missing expected addition at position %d (%q)", tc.name, i, tc.expectedAdditions[i]) + } + for i := len(tc.expectedAdditions); i < len(ms.added); i++ { + t.Errorf("[%s] unexpected metadata addition at position %d (%q)", tc.name, i, ms.added[i]) + } + + if len(ms.removed) != len(tc.expectedRemovals) { + t.Errorf("[%s] got unexpected number of removals: %d != %d", tc.name, len(ms.removed), len(tc.expectedRemovals)) + } + for i := 0; i < len(ms.removed) && i < len(tc.expectedRemovals); i++ { + if ms.removed[i] != tc.expectedRemovals[i] { + t.Errorf("[%s] removed metadata at %d does not match expected: %q != %q", tc.name, i, ms.removed[i], tc.expectedRemovals[i]) + } + } + for i := len(ms.removed); i < len(tc.expectedRemovals); i++ { + t.Errorf("[%s] missing expected removal at position %d (%q)", tc.name, i, tc.expectedRemovals[i]) + } + for i := len(tc.expectedRemovals); i < len(ms.removed); i++ { + t.Errorf("[%s] removed unexpected metadata at position %d (%q)", tc.name, i, ms.removed[i]) + } + } +} + +func taggedMetadata(key string, dgst string, sourceRepo string) metadata.V2Metadata { + meta := metadata.V2Metadata{ + Digest: digest.Digest(dgst), + SourceRepository: sourceRepo, + } + + meta.HMAC = metadata.ComputeV2MetadataHMAC([]byte(key), &meta) + return meta +} + +type mockRepo struct { + t *testing.T + errors map[digest.Digest]error + blobs map[digest.Digest]distribution.Descriptor + requests []string +} + +var _ distribution.Repository = &mockRepo{} + +func (m *mockRepo) Named() distreference.Named { + m.t.Fatalf("Named() not implemented") + return nil +} +func (m *mockRepo) Manifests(ctc context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error) { + m.t.Fatalf("Manifests() not implemented") + return nil, nil +} +func (m *mockRepo) Tags(ctc context.Context) distribution.TagService { + m.t.Fatalf("Tags() not implemented") + return nil +} +func (m *mockRepo) Blobs(ctx context.Context) distribution.BlobStore { + return &mockBlobStore{ + repo: m, + } +} + +type mockBlobStore struct { + repo *mockRepo +} + +var _ distribution.BlobStore = &mockBlobStore{} + +func (m *mockBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error) { + m.repo.requests = append(m.repo.requests, dgst.String()) + if err, exists := m.repo.errors[dgst]; exists { + return distribution.Descriptor{}, err + } + if desc, exists := m.repo.blobs[dgst]; exists { + return desc, nil + } + return distribution.Descriptor{}, distribution.ErrBlobUnknown +} +func (m *mockBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + m.repo.t.Fatal("Get() not implemented") + return nil, nil +} + +func (m *mockBlobStore) Open(ctx context.Context, dgst digest.Digest) (distribution.ReadSeekCloser, error) { + m.repo.t.Fatal("Open() not implemented") + return nil, nil +} + +func (m *mockBlobStore) Put(ctx context.Context, mediaType string, p []byte) (distribution.Descriptor, error) { + m.repo.t.Fatal("Put() not implemented") + return distribution.Descriptor{}, nil +} + +func (m *mockBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error) { + m.repo.t.Fatal("Create() not implemented") + return nil, nil +} +func (m *mockBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error) { + m.repo.t.Fatal("Resume() not implemented") + return nil, nil +} +func (m *mockBlobStore) Delete(ctx context.Context, dgst digest.Digest) error { + m.repo.t.Fatal("Delete() not implemented") + return nil +} +func (m *mockBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error { + m.repo.t.Fatalf("ServeBlob() not implemented") + return nil +} + +type mockV2MetadataService struct { + added []metadata.V2Metadata + removed []metadata.V2Metadata +} + +var _ metadata.V2MetadataService = &mockV2MetadataService{} + +func (*mockV2MetadataService) GetMetadata(diffID layer.DiffID) ([]metadata.V2Metadata, error) { + return nil, nil +} +func (*mockV2MetadataService) GetDiffID(dgst digest.Digest) (layer.DiffID, error) { + return "", nil +} +func (m *mockV2MetadataService) Add(diffID layer.DiffID, metadata metadata.V2Metadata) error { + m.added = append(m.added, metadata) + return nil +} +func (m *mockV2MetadataService) TagAndAdd(diffID layer.DiffID, hmacKey []byte, meta metadata.V2Metadata) error { + meta.HMAC = metadata.ComputeV2MetadataHMAC(hmacKey, &meta) + m.Add(diffID, meta) + return nil +} +func (m *mockV2MetadataService) Remove(metadata metadata.V2Metadata) error { + m.removed = append(m.removed, metadata) + return nil +} + +type progressSink struct { + t *testing.T +} + +func (s *progressSink) WriteProgress(p progress.Progress) error { + s.t.Logf("progress update: %#+v", p) + return nil +}