Adds cross-repository blob pushing behavior

Tracks source repository information for each blob in the blobsum
service, which is then used to attempt to mount blobs from another
repository when pushing instead of having to re-push blobs to the same
registry.

Signed-off-by: Brian Bland <brian.bland@docker.com>
This commit is contained in:
Brian Bland 2016-01-05 14:17:42 -08:00
parent 9c9a1d1b4b
commit 7289c7218e
14 changed files with 335 additions and 42 deletions

View File

@ -152,7 +152,7 @@ RUN set -x \
# both. This allows integration-cli tests to cover push/pull with both schema1
# and schema2 manifests.
ENV REGISTRY_COMMIT_SCHEMA1 ec87e9b6971d831f0eff752ddb54fb64693e51cd
ENV REGISTRY_COMMIT a7ae88da459b98b481a245e5b1750134724ac67d
ENV REGISTRY_COMMIT 93d9070c8bb28414de9ec96fd38c89614acd8435
RUN set -x \
&& export GOPATH="$(mktemp -d)" \
&& git clone https://github.com/docker/distribution.git "$GOPATH/src/github.com/docker/distribution" \

View File

@ -13,8 +13,14 @@ type BlobSumService struct {
store Store
}
// BlobSum contains the digest and source repository information for a layer.
type BlobSum struct {
Digest digest.Digest
SourceRepository string
}
// maxBlobSums is the number of blobsums to keep per layer DiffID.
const maxBlobSums = 5
const maxBlobSums = 50
// NewBlobSumService creates a new blobsum mapping service.
func NewBlobSumService(store Store) *BlobSumService {
@ -35,18 +41,18 @@ func (blobserv *BlobSumService) diffIDKey(diffID layer.DiffID) string {
return string(digest.Digest(diffID).Algorithm()) + "/" + digest.Digest(diffID).Hex()
}
func (blobserv *BlobSumService) blobSumKey(blobsum digest.Digest) string {
return string(blobsum.Algorithm()) + "/" + blobsum.Hex()
func (blobserv *BlobSumService) blobSumKey(blobsum BlobSum) string {
return string(blobsum.Digest.Algorithm()) + "/" + blobsum.Digest.Hex()
}
// GetBlobSums finds the blobsums associated with a layer DiffID.
func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]digest.Digest, error) {
func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]BlobSum, error) {
jsonBytes, err := blobserv.store.Get(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID))
if err != nil {
return nil, err
}
var blobsums []digest.Digest
var blobsums []BlobSum
if err := json.Unmarshal(jsonBytes, &blobsums); err != nil {
return nil, err
}
@ -55,7 +61,7 @@ func (blobserv *BlobSumService) GetBlobSums(diffID layer.DiffID) ([]digest.Diges
}
// GetDiffID finds a layer DiffID from a blobsum hash.
func (blobserv *BlobSumService) GetDiffID(blobsum digest.Digest) (layer.DiffID, error) {
func (blobserv *BlobSumService) GetDiffID(blobsum BlobSum) (layer.DiffID, error) {
diffIDBytes, err := blobserv.store.Get(blobserv.blobSumNamespace(), blobserv.blobSumKey(blobsum))
if err != nil {
return layer.DiffID(""), err
@ -66,12 +72,12 @@ func (blobserv *BlobSumService) GetDiffID(blobsum digest.Digest) (layer.DiffID,
// Add associates a blobsum with a layer DiffID. If too many blobsums are
// present, the oldest one is dropped.
func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum digest.Digest) error {
func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum BlobSum) error {
oldBlobSums, err := blobserv.GetBlobSums(diffID)
if err != nil {
oldBlobSums = nil
}
newBlobSums := make([]digest.Digest, 0, len(oldBlobSums)+1)
newBlobSums := make([]BlobSum, 0, len(oldBlobSums)+1)
// Copy all other blobsums to new slice
for _, oldSum := range oldBlobSums {
@ -98,3 +104,34 @@ func (blobserv *BlobSumService) Add(diffID layer.DiffID, blobsum digest.Digest)
return blobserv.store.Set(blobserv.blobSumNamespace(), blobserv.blobSumKey(blobsum), []byte(diffID))
}
// Remove unassociates a blobsum from a layer DiffID.
func (blobserv *BlobSumService) Remove(blobsum BlobSum) error {
diffID, err := blobserv.GetDiffID(blobsum)
if err != nil {
return err
}
oldBlobSums, err := blobserv.GetBlobSums(diffID)
if err != nil {
oldBlobSums = nil
}
newBlobSums := make([]BlobSum, 0, len(oldBlobSums))
// Copy all other blobsums to new slice
for _, oldSum := range oldBlobSums {
if oldSum != blobsum {
newBlobSums = append(newBlobSums, oldSum)
}
}
if len(newBlobSums) == 0 {
return blobserv.store.Delete(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID))
}
jsonBytes, err := json.Marshal(newBlobSums)
if err != nil {
return err
}
return blobserv.store.Set(blobserv.diffIDNamespace(), blobserv.diffIDKey(diffID), jsonBytes)
}

View File

@ -1,7 +1,9 @@
package metadata
import (
"encoding/hex"
"io/ioutil"
"math/rand"
"os"
"reflect"
"testing"
@ -23,33 +25,32 @@ func TestBlobSumService(t *testing.T) {
}
blobSumService := NewBlobSumService(metadataStore)
tooManyBlobSums := make([]BlobSum, 100)
for i := range tooManyBlobSums {
randDigest := randomDigest()
tooManyBlobSums[i] = BlobSum{Digest: randDigest}
}
testVectors := []struct {
diffID layer.DiffID
blobsums []digest.Digest
blobsums []BlobSum
}{
{
diffID: layer.DiffID("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4"),
blobsums: []digest.Digest{
digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
blobsums: []BlobSum{
{Digest: digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937")},
},
},
{
diffID: layer.DiffID("sha256:86e0e091d0da6bde2456dbb48306f3956bbeb2eae1b5b9a43045843f69fe4aaa"),
blobsums: []digest.Digest{
digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e"),
blobsums: []BlobSum{
{Digest: digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937")},
{Digest: digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e")},
},
},
{
diffID: layer.DiffID("sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"),
blobsums: []digest.Digest{
digest.Digest("sha256:f0cd5ca10b07f35512fc2f1cbf9a6cefbdb5cba70ac6b0c9e5988f4497f71937"),
digest.Digest("sha256:9e3447ca24cb96d86ebd5960cb34d1299b07e0a0e03801d90b9969a2c187dd6e"),
digest.Digest("sha256:cbbf2f9a99b47fc460d422812b6a5adff7dfee951d8fa2e4a98caa0382cfbdbf"),
digest.Digest("sha256:8902a7ca89aabbb868835260912159026637634090dd8899eee969523252236e"),
digest.Digest("sha256:c84364306344ccc48532c52ff5209236273525231dddaaab53262322352883aa"),
digest.Digest("sha256:aa7583bbc87532a8352bbb72520a821b3623523523a8352523a52352aaa888fe"),
},
diffID: layer.DiffID("sha256:03f4658f8b782e12230c1783426bd3bacce651ce582a4ffb6fbbfa2079428ecb"),
blobsums: tooManyBlobSums,
},
}
@ -70,8 +71,8 @@ func TestBlobSumService(t *testing.T) {
t.Fatalf("error calling Get: %v", err)
}
expectedBlobsums := len(vec.blobsums)
if expectedBlobsums > 5 {
expectedBlobsums = 5
if expectedBlobsums > 50 {
expectedBlobsums = 50
}
if !reflect.DeepEqual(blobsums, vec.blobsums[len(vec.blobsums)-expectedBlobsums:len(vec.blobsums)]) {
t.Fatal("Get returned incorrect layer ID")
@ -85,7 +86,7 @@ func TestBlobSumService(t *testing.T) {
}
// Test GetDiffID on a nonexistent entry
_, err = blobSumService.GetDiffID(digest.Digest("sha256:82379823067823853223359023576437723560923756b03560378f4497753917"))
_, err = blobSumService.GetDiffID(BlobSum{Digest: digest.Digest("sha256:82379823067823853223359023576437723560923756b03560378f4497753917")})
if err == nil {
t.Fatal("expected error looking up nonexistent entry")
}
@ -103,3 +104,12 @@ func TestBlobSumService(t *testing.T) {
t.Fatal("GetDiffID returned incorrect diffID")
}
}
func randomDigest() digest.Digest {
b := [32]byte{}
for i := 0; i < len(b); i++ {
b[i] = byte(rand.Intn(256))
}
d := hex.EncodeToString(b[:])
return digest.Digest("sha256:" + d)
}

View File

@ -15,6 +15,8 @@ type Store interface {
Get(namespace string, key string) ([]byte, error)
// Set writes data indexed by namespace and key.
Set(namespace, key string, value []byte) error
// Delete removes data indexed by namespace and key.
Delete(namespace, key string) error
}
// FSMetadataStore uses the filesystem to associate metadata with layer and
@ -63,3 +65,13 @@ func (store *FSMetadataStore) Set(namespace, key string, value []byte) error {
}
return os.Rename(tempFilePath, path)
}
// Delete removes data indexed by namespace and key. The data file named after
// the key, stored in the namespace's directory is deleted.
func (store *FSMetadataStore) Delete(namespace, key string) error {
store.Lock()
defer store.Unlock()
path := store.path(namespace, key)
return os.Remove(path)
}

View File

@ -111,6 +111,7 @@ func (p *v2Puller) pullV2Repository(ctx context.Context, ref reference.Named) (e
type v2LayerDescriptor struct {
digest digest.Digest
repoInfo *registry.RepositoryInfo
repo distribution.Repository
blobSumService *metadata.BlobSumService
}
@ -124,7 +125,7 @@ func (ld *v2LayerDescriptor) ID() string {
}
func (ld *v2LayerDescriptor) DiffID() (layer.DiffID, error) {
return ld.blobSumService.GetDiffID(ld.digest)
return ld.blobSumService.GetDiffID(metadata.BlobSum{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()})
}
func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progress.Output) (io.ReadCloser, int64, error) {
@ -196,7 +197,7 @@ func (ld *v2LayerDescriptor) Download(ctx context.Context, progressOutput progre
func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) {
// Cache mapping from this layer's DiffID to the blobsum
ld.blobSumService.Add(diffID, ld.digest)
ld.blobSumService.Add(diffID, metadata.BlobSum{Digest: ld.digest, SourceRepository: ld.repoInfo.FullName()})
}
func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdated bool, err error) {
@ -334,6 +335,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Named, unverif
layerDescriptor := &v2LayerDescriptor{
digest: blobSum,
repoInfo: p.repoInfo,
repo: p.repo,
blobSumService: p.blobSumService,
}
@ -400,6 +402,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
layerDescriptor := &v2LayerDescriptor{
digest: d.Digest,
repo: p.repo,
repoInfo: p.repoInfo,
blobSumService: p.blobSumService,
}

View File

@ -131,6 +131,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, ima
descriptorTemplate := v2PushDescriptor{
blobSumService: p.blobSumService,
repoInfo: p.repoInfo,
repo: p.repo,
pushState: &p.pushState,
}
@ -211,6 +212,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
type v2PushDescriptor struct {
layer layer.Layer
blobSumService *metadata.BlobSumService
repoInfo reference.Named
repo distribution.Repository
pushState *pushState
}
@ -243,7 +245,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// Do we have any blobsums associated with this layer's DiffID?
possibleBlobsums, err := pd.blobSumService.GetBlobSums(diffID)
if err == nil {
descriptor, exists, err := blobSumAlreadyExists(ctx, possibleBlobsums, pd.repo, pd.pushState)
descriptor, exists, err := blobSumAlreadyExists(ctx, possibleBlobsums, pd.repoInfo, pd.repo, pd.pushState)
if err != nil {
progress.Update(progressOutput, pd.ID(), "Image push failed")
return retryOnError(err)
@ -263,6 +265,37 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
// then push the blob.
bs := pd.repo.Blobs(ctx)
// Attempt to find another repository in the same registry to mount the layer from to avoid an unnecessary upload
for _, blobsum := range possibleBlobsums {
sourceRepo, err := reference.ParseNamed(blobsum.SourceRepository)
if err != nil {
continue
}
if pd.repoInfo.Hostname() == sourceRepo.Hostname() {
logrus.Debugf("attempting to mount layer %s (%s) from %s", diffID, blobsum.Digest, sourceRepo.FullName())
desc, err := bs.Mount(ctx, sourceRepo.RemoteName(), blobsum.Digest)
if err == nil {
progress.Updatef(progressOutput, pd.ID(), "Mounted from %s", sourceRepo.RemoteName())
pd.pushState.Lock()
pd.pushState.confirmedV2 = true
pd.pushState.remoteLayers[diffID] = desc
pd.pushState.Unlock()
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.blobSumService.Add(diffID, metadata.BlobSum{Digest: blobsum.Digest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
return xfer.DoNotRetry{Err: err}
}
return nil
}
// Unable to mount layer from this repository, so this source mapping is no longer valid
logrus.Debugf("unassociating layer %s (%s) with %s", diffID, blobsum.Digest, sourceRepo.FullName())
pd.blobSumService.Remove(blobsum)
}
}
// Send the layer
layerUpload, err := bs.Create(ctx)
if err != nil {
@ -300,7 +333,7 @@ func (pd *v2PushDescriptor) Upload(ctx context.Context, progressOutput progress.
progress.Update(progressOutput, pd.ID(), "Pushed")
// Cache mapping from this layer's DiffID to the blobsum
if err := pd.blobSumService.Add(diffID, pushDigest); err != nil {
if err := pd.blobSumService.Add(diffID, metadata.BlobSum{Digest: pushDigest, SourceRepository: pd.repoInfo.FullName()}); err != nil {
return xfer.DoNotRetry{Err: err}
}
@ -332,9 +365,13 @@ func (pd *v2PushDescriptor) Descriptor() distribution.Descriptor {
// blobSumAlreadyExists checks if the registry already know about any of the
// blobsums passed in the "blobsums" slice. If it finds one that the registry
// knows about, it returns the known digest and "true".
func blobSumAlreadyExists(ctx context.Context, blobsums []digest.Digest, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
for _, dgst := range blobsums {
descriptor, err := repo.Blobs(ctx).Stat(ctx, dgst)
func blobSumAlreadyExists(ctx context.Context, blobsums []metadata.BlobSum, repoInfo reference.Named, repo distribution.Repository, pushState *pushState) (distribution.Descriptor, bool, error) {
for _, blobSum := range blobsums {
// Only check blobsums that are known to this repository or have an unknown source
if blobSum.SourceRepository != "" && blobSum.SourceRepository != repoInfo.FullName() {
continue
}
descriptor, err := repo.Blobs(ctx).Stat(ctx, blobSum.Digest)
switch err {
case nil:
descriptor.MediaType = schema2.MediaTypeLayer

View File

@ -44,7 +44,7 @@ clone git github.com/boltdb/bolt v1.1.0
clone git github.com/miekg/dns d27455715200c7d3e321a1e5cadb27c9ee0b0f02
# get graph and distribution packages
clone git github.com/docker/distribution a7ae88da459b98b481a245e5b1750134724ac67d
clone git github.com/docker/distribution 93d9070c8bb28414de9ec96fd38c89614acd8435
clone git github.com/vbatts/tar-split v0.9.11
# get desired notary commit, might also need to be updated in Dockerfile

View File

@ -147,6 +147,54 @@ func (s *DockerSchema1RegistrySuite) TestPushEmptyLayer(c *check.C) {
testPushEmptyLayer(c)
}
func (s *DockerRegistrySuite) TestCrossRepositoryLayerPush(c *check.C) {
sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
// tag the image to upload it to the private registry
dockerCmd(c, "tag", "busybox", sourceRepoName)
// push the image to the registry
out1, _, err := dockerCmdWithError("push", sourceRepoName)
c.Assert(err, check.IsNil, check.Commentf("pushing the image to the private registry has failed: %s", out1))
// ensure that none of the layers were mounted from another repository during push
c.Assert(strings.Contains(out1, "Mounted from"), check.Equals, false)
destRepoName := fmt.Sprintf("%v/dockercli/crossrepopush", privateRegistryURL)
// retag the image to upload the same layers to another repo in the same registry
dockerCmd(c, "tag", "busybox", destRepoName)
// push the image to the registry
out2, _, err := dockerCmdWithError("push", destRepoName)
c.Assert(err, check.IsNil, check.Commentf("pushing the image to the private registry has failed: %s", out2))
// ensure that layers were mounted from the first repo during push
c.Assert(strings.Contains(out2, "Mounted from dockercli/busybox"), check.Equals, true)
// ensure that we can pull the cross-repo-pushed repository
dockerCmd(c, "rmi", destRepoName)
dockerCmd(c, "pull", destRepoName)
}
func (s *DockerSchema1RegistrySuite) TestCrossRepositoryLayerPushNotSupported(c *check.C) {
sourceRepoName := fmt.Sprintf("%v/dockercli/busybox", privateRegistryURL)
// tag the image to upload it to the private registry
dockerCmd(c, "tag", "busybox", sourceRepoName)
// push the image to the registry
out1, _, err := dockerCmdWithError("push", sourceRepoName)
c.Assert(err, check.IsNil, check.Commentf("pushing the image to the private registry has failed: %s", out1))
// ensure that none of the layers were mounted from another repository during push
c.Assert(strings.Contains(out1, "Mounted from"), check.Equals, false)
destRepoName := fmt.Sprintf("%v/dockercli/crossrepopush", privateRegistryURL)
// retag the image to upload the same layers to another repo in the same registry
dockerCmd(c, "tag", "busybox", destRepoName)
// push the image to the registry
out2, _, err := dockerCmdWithError("push", destRepoName)
c.Assert(err, check.IsNil, check.Commentf("pushing the image to the private registry has failed: %s", out2))
// schema1 registry should not support cross-repo layer mounts, so ensure that this does not happen
c.Assert(strings.Contains(out2, "Mounted from dockercli/busybox"), check.Equals, false)
// ensure that we can pull the second pushed repository
dockerCmd(c, "rmi", destRepoName)
dockerCmd(c, "pull", destRepoName)
}
func (s *DockerTrustSuite) TestTrustedPush(c *check.C) {
repoName := fmt.Sprintf("%v/dockercli/trusted:latest", privateRegistryURL)
// tag the image and upload it to the private registry

View File

@ -477,7 +477,7 @@ func migrateImage(id, root string, ls graphIDRegistrar, is image.Store, ms metad
dgst, err := digest.ParseDigest(string(checksum))
if err == nil {
blobSumService := metadata.NewBlobSumService(ms)
blobSumService.Add(layer.DiffID(), dgst)
blobSumService.Add(layer.DiffID(), metadata.BlobSum{Digest: dgst})
}
}
_, err = ls.Release(layer)

View File

@ -216,9 +216,9 @@ func TestMigrateImages(t *testing.T) {
t.Fatal(err)
}
expectedBlobsums := []digest.Digest{
"sha256:55dc925c23d1ed82551fd018c27ac3ee731377b6bad3963a2a4e76e753d70e57",
"sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4",
expectedBlobsums := []metadata.BlobSum{
{Digest: digest.Digest("sha256:55dc925c23d1ed82551fd018c27ac3ee731377b6bad3963a2a4e76e753d70e57")},
{Digest: digest.Digest("sha256:a3ed95caeb02ffe68cdd9fd84406680ae93d633cb16422d00e8a7c22955b46d4")},
}
if !reflect.DeepEqual(expectedBlobsums, blobsums) {

View File

@ -155,6 +155,10 @@ type BlobIngester interface {
// Resume attempts to resume a write to a blob, identified by an id.
Resume(ctx context.Context, id string) (BlobWriter, error)
// Mount adds a blob to this service from another source repository,
// identified by a digest.
Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (Descriptor, error)
}
// BlobWriter provides a handle for inserting data into a blob store.

View File

@ -1041,6 +1041,70 @@ var routeDescriptors = []RouteDescriptor{
deniedResponseDescriptor,
},
},
{
Name: "Mount Blob",
Description: "Mount a blob identified by the `mount` parameter from another repository.",
Headers: []ParameterDescriptor{
hostHeader,
authHeader,
contentLengthZeroHeader,
},
PathParameters: []ParameterDescriptor{
nameParameterDescriptor,
},
QueryParameters: []ParameterDescriptor{
{
Name: "mount",
Type: "query",
Format: "<digest>",
Regexp: digest.DigestRegexp,
Description: `Digest of blob to mount from the source repository.`,
},
{
Name: "from",
Type: "query",
Format: "<repository name>",
Regexp: reference.NameRegexp,
Description: `Name of the source repository.`,
},
},
Successes: []ResponseDescriptor{
{
Description: "The blob has been mounted in the repository and is available at the provided location.",
StatusCode: http.StatusCreated,
Headers: []ParameterDescriptor{
{
Name: "Location",
Type: "url",
Format: "<blob location>",
},
contentLengthZeroHeader,
dockerUploadUUIDHeader,
},
},
},
Failures: []ResponseDescriptor{
{
Name: "Invalid Name or Digest",
StatusCode: http.StatusBadRequest,
ErrorCodes: []errcode.ErrorCode{
ErrorCodeDigestInvalid,
ErrorCodeNameInvalid,
},
},
{
Name: "Not allowed",
Description: "Blob mount is not allowed because the registry is configured as a pull-through cache or for some other reason",
StatusCode: http.StatusMethodNotAllowed,
ErrorCodes: []errcode.ErrorCode{
errcode.ErrorCodeUnsupported,
},
},
unauthorizedResponseDescriptor,
repositoryNotFoundResponseDescriptor,
deniedResponseDescriptor,
},
},
},
},
},

View File

@ -108,6 +108,8 @@ type tokenHandler struct {
tokenLock sync.Mutex
tokenCache string
tokenExpiration time.Time
additionalScopes map[string]struct{}
}
// tokenScope represents the scope at which a token will be requested.
@ -145,6 +147,7 @@ func newTokenHandler(transport http.RoundTripper, creds CredentialStore, c clock
Scope: scope,
Actions: actions,
},
additionalScopes: map[string]struct{}{},
}
}
@ -160,7 +163,15 @@ func (th *tokenHandler) Scheme() string {
}
func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]string) error {
if err := th.refreshToken(params); err != nil {
var additionalScopes []string
if fromParam := req.URL.Query().Get("from"); fromParam != "" {
additionalScopes = append(additionalScopes, tokenScope{
Resource: "repository",
Scope: fromParam,
Actions: []string{"pull"},
}.String())
}
if err := th.refreshToken(params, additionalScopes...); err != nil {
return err
}
@ -169,11 +180,18 @@ func (th *tokenHandler) AuthorizeRequest(req *http.Request, params map[string]st
return nil
}
func (th *tokenHandler) refreshToken(params map[string]string) error {
func (th *tokenHandler) refreshToken(params map[string]string, additionalScopes ...string) error {
th.tokenLock.Lock()
defer th.tokenLock.Unlock()
var addedScopes bool
for _, scope := range additionalScopes {
if _, ok := th.additionalScopes[scope]; !ok {
th.additionalScopes[scope] = struct{}{}
addedScopes = true
}
}
now := th.clock.Now()
if now.After(th.tokenExpiration) {
if now.After(th.tokenExpiration) || addedScopes {
tr, err := th.fetchToken(params)
if err != nil {
return err
@ -223,6 +241,10 @@ func (th *tokenHandler) fetchToken(params map[string]string) (token *tokenRespon
reqParams.Add("scope", scopeField)
}
for scope := range th.additionalScopes {
reqParams.Add("scope", scope)
}
if th.creds != nil {
username, password := th.creds.Basic(realmURL)
if username != "" && password != "" {

View File

@ -10,6 +10,7 @@ import (
"net/http"
"net/url"
"strconv"
"sync"
"time"
"github.com/docker/distribution"
@ -499,6 +500,9 @@ type blobs struct {
statter distribution.BlobDescriptorService
distribution.BlobDeleter
cacheLock sync.Mutex
cachedBlobUpload distribution.BlobWriter
}
func sanitizeLocation(location, base string) (string, error) {
@ -573,7 +577,20 @@ func (bs *blobs) Put(ctx context.Context, mediaType string, p []byte) (distribut
}
func (bs *blobs) Create(ctx context.Context) (distribution.BlobWriter, error) {
bs.cacheLock.Lock()
if bs.cachedBlobUpload != nil {
upload := bs.cachedBlobUpload
bs.cachedBlobUpload = nil
bs.cacheLock.Unlock()
return upload, nil
}
bs.cacheLock.Unlock()
u, err := bs.ub.BuildBlobUploadURL(bs.name)
if err != nil {
return nil, err
}
resp, err := bs.client.Post(u, "", nil)
if err != nil {
@ -604,6 +621,45 @@ func (bs *blobs) Resume(ctx context.Context, id string) (distribution.BlobWriter
panic("not implemented")
}
func (bs *blobs) Mount(ctx context.Context, sourceRepo string, dgst digest.Digest) (distribution.Descriptor, error) {
u, err := bs.ub.BuildBlobUploadURL(bs.name, url.Values{"from": {sourceRepo}, "mount": {dgst.String()}})
if err != nil {
return distribution.Descriptor{}, err
}
resp, err := bs.client.Post(u, "", nil)
if err != nil {
return distribution.Descriptor{}, err
}
defer resp.Body.Close()
switch resp.StatusCode {
case http.StatusCreated:
return bs.Stat(ctx, dgst)
case http.StatusAccepted:
// Triggered a blob upload (legacy behavior), so cache the creation info
uuid := resp.Header.Get("Docker-Upload-UUID")
location, err := sanitizeLocation(resp.Header.Get("Location"), u)
if err != nil {
return distribution.Descriptor{}, err
}
bs.cacheLock.Lock()
bs.cachedBlobUpload = &httpBlobUpload{
statter: bs.statter,
client: bs.client,
uuid: uuid,
startedAt: time.Now(),
location: location,
}
bs.cacheLock.Unlock()
return distribution.Descriptor{}, HandleErrorResponse(resp)
default:
return distribution.Descriptor{}, HandleErrorResponse(resp)
}
}
func (bs *blobs) Delete(ctx context.Context, dgst digest.Digest) error {
return bs.statter.Clear(ctx, dgst)
}