Compare V2Metadata with associated auth config

to avoid unnecessary blob re-uploads.

Cross repository mount from particular repo will most probably fail if
the user pushing to the registry is not the same as the one who pulled
or pushed to the source repo.

This PR attempts first to cross-repo mount from the source repositories
associated with the pusher's auth config. Then it falls back to other
repositories sorted from the most similar to the target repo to the
least.

It also prevents metadata deletion in cases where cross-repo mount fails
and the auth config hashes differ.

Signed-off-by: Michal Minář <miminar@redhat.com>
This commit is contained in:
Michal Minář 2016-09-18 10:55:28 +02:00
parent 027e7be348
commit 0928f3f2e3
No known key found for this signature in database
GPG Key ID: ED8FF0CA0500B770
3 changed files with 415 additions and 67 deletions

View File

@ -1,9 +1,13 @@
package metadata
import (
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"github.com/docker/distribution/digest"
"github.com/docker/docker/api/types"
"github.com/docker/docker/layer"
)
@ -17,6 +21,69 @@ type V2MetadataService struct {
type V2Metadata struct {
Digest digest.Digest
SourceRepository string
// HMAC hashes above attributes with recent authconfig digest used as a key in order to determine matching
// metadata entries accompanied by the same credentials without actually exposing them.
HMAC string
}
// CheckV2MetadataHMAC return true if the given "meta" is tagged with a hmac hashed by the given "key".
func CheckV2MetadataHMAC(meta *V2Metadata, key []byte) bool {
if len(meta.HMAC) == 0 || len(key) == 0 {
return len(meta.HMAC) == 0 && len(key) == 0
}
mac := hmac.New(sha256.New, key)
mac.Write([]byte(meta.Digest))
mac.Write([]byte(meta.SourceRepository))
expectedMac := mac.Sum(nil)
storedMac, err := hex.DecodeString(meta.HMAC)
if err != nil {
return false
}
return hmac.Equal(storedMac, expectedMac)
}
// ComputeV2MetadataHMAC returns a hmac for the given "meta" hash by the given key.
func ComputeV2MetadataHMAC(key []byte, meta *V2Metadata) string {
if len(key) == 0 || meta == nil {
return ""
}
mac := hmac.New(sha256.New, key)
mac.Write([]byte(meta.Digest))
mac.Write([]byte(meta.SourceRepository))
return hex.EncodeToString(mac.Sum(nil))
}
// ComputeV2MetadataHMACKey returns a key for the given "authConfig" that can be used to hash v2 metadata
// entries.
func ComputeV2MetadataHMACKey(authConfig *types.AuthConfig) ([]byte, error) {
if authConfig == nil {
return nil, nil
}
key := authConfigKeyInput{
Username: authConfig.Username,
Password: authConfig.Password,
Auth: authConfig.Auth,
IdentityToken: authConfig.IdentityToken,
RegistryToken: authConfig.RegistryToken,
}
buf, err := json.Marshal(&key)
if err != nil {
return nil, err
}
return []byte(digest.FromBytes([]byte(buf))), nil
}
// authConfigKeyInput is a reduced AuthConfig structure holding just relevant credential data eligible for
// hmac key creation.
type authConfigKeyInput struct {
Username string `json:"username,omitempty"`
Password string `json:"password,omitempty"`
Auth string `json:"auth,omitempty"`
IdentityToken string `json:"identitytoken,omitempty"`
RegistryToken string `json:"registrytoken,omitempty"`
}
// maxMetadata is the number of metadata entries to keep per layer DiffID.
@ -105,6 +172,13 @@ func (serv *V2MetadataService) Add(diffID layer.DiffID, metadata V2Metadata) err
return serv.store.Set(serv.digestNamespace(), serv.digestKey(metadata.Digest), []byte(diffID))
}
// TagAndAdd amends the given "meta" for hmac hashed by the given "hmacKey" and associates it with a layer
// DiffID. If too many metadata entries are present, the oldest one is dropped.
func (serv *V2MetadataService) TagAndAdd(diffID layer.DiffID, hmacKey []byte, meta V2Metadata) error {
meta.HMAC = ComputeV2MetadataHMAC(hmacKey, &meta)
return serv.Add(diffID, meta)
}
// Remove unassociates a metadata entry from a layer DiffID.
func (serv *V2MetadataService) Remove(metadata V2Metadata) error {
diffID, err := serv.GetDiffID(metadata.Digest)

View File

@ -5,8 +5,12 @@ import (
"fmt"
"io"
"runtime"
"sort"
"strings"
"sync"
"golang.org/x/net/context"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
@ -23,9 +27,10 @@ import (
"github.com/docker/docker/pkg/stringid"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"golang.org/x/net/context"
)
const maxRepositoryMountAttempts = 3
// PushResult contains the tag, manifest digest, and manifest size from the
// push. It's used to signal this information to the trust code in the client
// so it can sign the manifest if necessary.
@ -133,10 +138,16 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, ima
defer layer.ReleaseAndLog(p.config.LayerStore, l)
}
hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
if err != nil {
return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
}
var descriptors []xfer.UploadDescriptor
descriptorTemplate := v2PushDescriptor{
v2MetadataService: p.v2MetadataService,
hmacKey: hmacKey,
repoInfo: p.repoInfo,
ref: p.ref,
repo: p.repo,
@ -233,6 +244,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
type v2PushDescriptor struct {
layer layer.Layer
v2MetadataService *metadata.V2MetadataService
hmacKey []byte
repoInfo reference.Named
ref reference.Named
repo distribution.Repository
@ -296,47 +308,44 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
bs := pd.repo.Blobs(ctx)
var layerUpload distribution.BlobWriter
mountAttemptsRemaining := 3
// Attempt to find another repository in the same registry to mount the layer
// from to avoid an unnecessary upload.
// Note: metadata is stored from oldest to newest, so we iterate through this
// slice in reverse to maximize our chances of the blob still existing in the
// remote repository.
for i := len(v2Metadata) - 1; i >= 0 && mountAttemptsRemaining > 0; i-- {
mountFrom := v2Metadata[i]
// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
candidates := getRepositoryMountCandidates(pd.repoInfo, pd.hmacKey, maxRepositoryMountAttempts, v2Metadata)
for _, mountCandidate := range candidates {
logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountCandidate.Digest, mountCandidate.SourceRepository)
createOpts := []distribution.BlobCreateOption{}
sourceRepo, err := reference.ParseNamed(mountFrom.SourceRepository)
if err != nil {
continue
}
if pd.repoInfo.Hostname() != sourceRepo.Hostname() {
// don't mount blobs from another registry
continue
if len(mountCandidate.SourceRepository) > 0 {
namedRef, err := reference.WithName(mountCandidate.SourceRepository)
if err != nil {
logrus.Errorf("failed to parse source repository reference %v: %v", namedRef.String(), err)
pd.v2MetadataService.Remove(mountCandidate)
continue
}
// TODO (brianbland): We need to construct a reference where the Name is
// only the full remote name, so clean this up when distribution has a
// richer reference package
remoteRef, err := distreference.WithName(namedRef.RemoteName())
if err != nil {
logrus.Errorf("failed to make remote reference out of %q: %v", namedRef.RemoteName(), namedRef.RemoteName())
continue
}
canonicalRef, err := distreference.WithDigest(remoteRef, mountCandidate.Digest)
if err != nil {
logrus.Errorf("failed to make canonical reference: %v", err)
continue
}
createOpts = append(createOpts, client.WithMountFrom(canonicalRef))
}
namedRef, err := reference.WithName(mountFrom.SourceRepository)
if err != nil {
continue
}
// TODO (brianbland): We need to construct a reference where the Name is
// only the full remote name, so clean this up when distribution has a
// richer reference package
remoteRef, err := distreference.WithName(namedRef.RemoteName())
if err != nil {
continue
}
canonicalRef, err := distreference.WithDigest(remoteRef, mountFrom.Digest)
if err != nil {
continue
}
logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, mountFrom.Digest, sourceRepo.FullName())
layerUpload, err = bs.Create(ctx, client.WithMountFrom(canonicalRef))
// send the layer
lu, err := bs.Create(ctx, createOpts...)
switch err := err.(type) {
case nil:
// noop
case distribution.ErrBlobMounted:
progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", err.From.Name())
@ -348,18 +357,31 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
pd.pushState.Unlock()
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: mountFrom.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: err.Descriptor.Digest,
SourceRepository: pd.repoInfo.FullName(),
}); err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}
return err.Descriptor, nil
case nil:
// blob upload session created successfully, so begin the upload
mountAttemptsRemaining = 0
default:
// unable to mount layer from this repository, so this source mapping is no longer valid
logrus.Debugf("unassociating layer %s (%s) with %s", diffID, mountFrom.Digest, mountFrom.SourceRepository)
pd.v2MetadataService.Remove(mountFrom)
mountAttemptsRemaining--
logrus.Infof("failed to mount layer %s (%s) from %s: %v", diffID, mountCandidate.Digest, mountCandidate.SourceRepository, err)
}
if len(mountCandidate.SourceRepository) > 0 &&
(metadata.CheckV2MetadataHMAC(&mountCandidate, pd.hmacKey) ||
len(mountCandidate.HMAC) == 0) {
cause := "blob mount failure"
if err != nil {
cause = fmt.Sprintf("an error: %v", err.Error())
}
logrus.Debugf("removing association between layer %s and %s due to %s", mountCandidate.Digest, mountCandidate.SourceRepository, cause)
pd.v2MetadataService.Remove(mountCandidate)
}
layerUpload = lu
if layerUpload != nil {
break
}
}
@ -371,6 +393,35 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
}
defer layerUpload.Close()
// upload the blob
desc, err := pd.uploadUsingSession(ctx, progressOutput, diffID, layerUpload)
if err != nil {
return desc, err
}
pd.pushState.Lock()
// If Commit succeeded, that's an indication that the remote registry speaks the v2 protocol.
pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = desc
pd.pushState.Unlock()
return desc, nil
}
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
pd.remoteDescriptor = descriptor
}
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
return pd.remoteDescriptor
}
func (pd *v2PushDescriptor) uploadUsingSession(
ctx context.Context,
progressOutput progress.Output,
diffID layer.DiffID,
layerUpload distribution.BlobWriter,
) (distribution.Descriptor, error) {
arch, err := pd.layer.TarStream()
if err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
@ -404,34 +455,18 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
progress.Update(progressOutput, pd.ID(), "Pushed")
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.v2MetadataService.Add(diffID, metadata.V2Metadata{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
if err := pd.v2MetadataService.TagAndAdd(diffID, pd.hmacKey, metadata.V2Metadata{
Digest: pushDigest,
SourceRepository: pd.repoInfo.FullName(),
}); err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
}
pd.pushState.Lock()
// If Commit succeeded, that's an indication that the remote registry
// speaks the v2 protocol.
pd.pushState.confirmedV2 = true
descriptor := distribution.Descriptor{
return distribution.Descriptor{
Digest: pushDigest,
MediaType: schema2.MediaTypeLayer,
Size: nn,
}
pd.pushState.remoteLayers[diffID] = descriptor
pd.pushState.Unlock()
return descriptor, nil
}
func (pd *v2PushDescriptor) SetRemoteDescriptor(descriptor distribution.Descriptor) {
pd.remoteDescriptor = descriptor
}
func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
return pd.remoteDescriptor
}, nil
}
// layerAlreadyExists checks if the registry already know about any of the
@ -456,3 +491,95 @@ func layerAlreadyExists(ctx context.Context, metadata []metadata.V2Metadata, rep
}
return distribution.Descriptor{}, false, nil
}
// getRepositoryMountCandidates returns an array of v2 metadata items belonging to the given registry. The
// array is sorted from youngest to oldest. If requireReigstryMatch is true, the resulting array will contain
// only metadata entries having registry part of SourceRepository matching the part of repoInfo.
func getRepositoryMountCandidates(
repoInfo reference.Named,
hmacKey []byte,
max int,
v2Metadata []metadata.V2Metadata,
) []metadata.V2Metadata {
candidates := []metadata.V2Metadata{}
for _, meta := range v2Metadata {
sourceRepo, err := reference.ParseNamed(meta.SourceRepository)
if err != nil || repoInfo.Hostname() != sourceRepo.Hostname() {
continue
}
// target repository is not a viable candidate
if meta.SourceRepository == repoInfo.FullName() {
continue
}
candidates = append(candidates, meta)
}
sortV2MetadataByLikenessAndAge(repoInfo, hmacKey, candidates)
if max >= 0 && len(candidates) > max {
// select the youngest metadata
candidates = candidates[:max]
}
return candidates
}
// byLikeness is a sorting container for v2 metadata candidates for cross repository mount. The
// candidate "a" is preferred over "b":
//
// 1. if it was hashed using the same AuthConfig as the one used to authenticate to target repository and the
// "b" was not
// 2. if a number of its repository path components exactly matching path components of target repository is higher
type byLikeness struct {
arr []metadata.V2Metadata
hmacKey []byte
pathComponents []string
}
func (bla byLikeness) Less(i, j int) bool {
aMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[i], bla.hmacKey)
bMacMatch := metadata.CheckV2MetadataHMAC(&bla.arr[j], bla.hmacKey)
if aMacMatch != bMacMatch {
return aMacMatch
}
aMatch := numOfMatchingPathComponents(bla.arr[i].SourceRepository, bla.pathComponents)
bMatch := numOfMatchingPathComponents(bla.arr[j].SourceRepository, bla.pathComponents)
return aMatch > bMatch
}
func (bla byLikeness) Swap(i, j int) {
bla.arr[i], bla.arr[j] = bla.arr[j], bla.arr[i]
}
func (bla byLikeness) Len() int { return len(bla.arr) }
func sortV2MetadataByLikenessAndAge(repoInfo reference.Named, hmacKey []byte, marr []metadata.V2Metadata) {
// reverse the metadata array to shift the newest entries to the beginning
for i := 0; i < len(marr)/2; i++ {
marr[i], marr[len(marr)-i-1] = marr[len(marr)-i-1], marr[i]
}
// keep equal entries ordered from the youngest to the oldest
sort.Stable(byLikeness{
arr: marr,
hmacKey: hmacKey,
pathComponents: getPathComponents(repoInfo.FullName()),
})
}
// numOfMatchingPathComponents returns a number of path components in "pth" that exactly match "matchComponents".
func numOfMatchingPathComponents(pth string, matchComponents []string) int {
pthComponents := getPathComponents(pth)
i := 0
for ; i < len(pthComponents) && i < len(matchComponents); i++ {
if matchComponents[i] != pthComponents[i] {
return i
}
}
return i
}
func getPathComponents(path string) []string {
// make sure to add docker.io/ prefix to the path
named, err := reference.ParseNamed(path)
if err == nil {
path = named.FullName()
}
return strings.Split(path, "/")
}

View File

@ -0,0 +1,147 @@
package distribution
import (
"reflect"
"testing"
"github.com/docker/distribution/digest"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/reference"
)
func TestGetRepositoryMountCandidates(t *testing.T) {
for _, tc := range []struct {
name string
hmacKey string
targetRepo string
maxCandidates int
metadata []metadata.V2Metadata
candidates []metadata.V2Metadata
}{
{
name: "empty metadata",
targetRepo: "busybox",
maxCandidates: -1,
metadata: []metadata.V2Metadata{},
candidates: []metadata.V2Metadata{},
},
{
name: "one item not matching",
targetRepo: "busybox",
maxCandidates: -1,
metadata: []metadata.V2Metadata{taggedMetadata("key", "dgst", "127.0.0.1/repo")},
candidates: []metadata.V2Metadata{},
},
{
name: "one item matching",
targetRepo: "busybox",
maxCandidates: -1,
metadata: []metadata.V2Metadata{taggedMetadata("hash", "1", "hello-world")},
candidates: []metadata.V2Metadata{taggedMetadata("hash", "1", "hello-world")},
},
{
name: "allow missing SourceRepository",
targetRepo: "busybox",
maxCandidates: -1,
metadata: []metadata.V2Metadata{
{Digest: digest.Digest("1")},
{Digest: digest.Digest("3")},
{Digest: digest.Digest("2")},
},
candidates: []metadata.V2Metadata{},
},
{
name: "handle docker.io",
targetRepo: "user/app",
maxCandidates: -1,
metadata: []metadata.V2Metadata{
{Digest: digest.Digest("1"), SourceRepository: "docker.io/user/foo"},
{Digest: digest.Digest("3"), SourceRepository: "user/bar"},
{Digest: digest.Digest("2"), SourceRepository: "app"},
},
candidates: []metadata.V2Metadata{
{Digest: digest.Digest("3"), SourceRepository: "user/bar"},
{Digest: digest.Digest("1"), SourceRepository: "docker.io/user/foo"},
{Digest: digest.Digest("2"), SourceRepository: "app"},
},
},
{
name: "sort more items",
hmacKey: "abcd",
targetRepo: "127.0.0.1/foo/bar",
maxCandidates: -1,
metadata: []metadata.V2Metadata{
taggedMetadata("hash", "1", "hello-world"),
taggedMetadata("efgh", "2", "127.0.0.1/hello-world"),
taggedMetadata("abcd", "3", "busybox"),
taggedMetadata("hash", "4", "busybox"),
taggedMetadata("hash", "5", "127.0.0.1/foo"),
taggedMetadata("hash", "6", "127.0.0.1/bar"),
taggedMetadata("efgh", "7", "127.0.0.1/foo/bar"),
taggedMetadata("abcd", "8", "127.0.0.1/xyz"),
taggedMetadata("hash", "9", "127.0.0.1/foo/app"),
},
candidates: []metadata.V2Metadata{
// first by matching hash
taggedMetadata("abcd", "8", "127.0.0.1/xyz"),
// then by longest matching prefix
taggedMetadata("hash", "9", "127.0.0.1/foo/app"),
taggedMetadata("hash", "5", "127.0.0.1/foo"),
// sort the rest of the matching items in reversed order
taggedMetadata("hash", "6", "127.0.0.1/bar"),
taggedMetadata("efgh", "2", "127.0.0.1/hello-world"),
},
},
{
name: "limit max candidates",
hmacKey: "abcd",
targetRepo: "user/app",
maxCandidates: 3,
metadata: []metadata.V2Metadata{
taggedMetadata("abcd", "1", "user/app1"),
taggedMetadata("abcd", "2", "user/app/base"),
taggedMetadata("hash", "3", "user/app"),
taggedMetadata("abcd", "4", "127.0.0.1/user/app"),
taggedMetadata("hash", "5", "user/foo"),
taggedMetadata("hash", "6", "app/bar"),
},
candidates: []metadata.V2Metadata{
// first by matching hash
taggedMetadata("abcd", "2", "user/app/base"),
taggedMetadata("abcd", "1", "user/app1"),
// then by longest matching prefix
taggedMetadata("hash", "3", "user/app"),
},
},
} {
repoInfo, err := reference.ParseNamed(tc.targetRepo)
if err != nil {
t.Fatalf("[%s] failed to parse reference name: %v", tc.name, err)
}
candidates := getRepositoryMountCandidates(repoInfo, []byte(tc.hmacKey), tc.maxCandidates, tc.metadata)
if len(candidates) != len(tc.candidates) {
t.Errorf("[%s] got unexpected number of candidates: %d != %d", tc.name, len(candidates), len(tc.candidates))
}
for i := 0; i < len(candidates) && i < len(tc.candidates); i++ {
if !reflect.DeepEqual(candidates[i], tc.candidates[i]) {
t.Errorf("[%s] candidate %d does not match expected: %#+v != %#+v", tc.name, i, candidates[i], tc.candidates[i])
}
}
for i := len(candidates); i < len(tc.candidates); i++ {
t.Errorf("[%s] missing expected candidate at position %d (%#+v)", tc.name, i, tc.candidates[i])
}
for i := len(tc.candidates); i < len(candidates); i++ {
t.Errorf("[%s] got unexpected candidate at position %d (%#+v)", tc.name, i, candidates[i])
}
}
}
func taggedMetadata(key string, dgst string, sourceRepo string) metadata.V2Metadata {
meta := metadata.V2Metadata{
Digest: digest.Digest(dgst),
SourceRepository: sourceRepo,
}
meta.HMAC = metadata.ComputeV2MetadataHMAC([]byte(key), &meta)
return meta
}