From bb37c67a90ff5d4804e4a5cee59f89a93d3257c1 Mon Sep 17 00:00:00 2001 From: Derek McGowan Date: Fri, 16 Dec 2016 11:19:05 -0800 Subject: [PATCH] Abstract distribution interfaces from image specific types Move configurations into a single file. Abstract download manager in pull config. Add supports for schema2 only and schema2 type checking. Add interface for providing push layers. Abstract image store to generically handle configurations. Signed-off-by: Derek McGowan (cherry picked from commit 3c7676a057a4c0103895f793e407dc6736df139a) --- cli/command/image/pull.go | 2 +- daemon/image_pull.go | 21 +-- daemon/image_push.go | 26 ++-- distribution/config.go | 233 +++++++++++++++++++++++++++++ distribution/pull.go | 33 +--- distribution/pull_v1.go | 8 +- distribution/pull_v2.go | 160 +++++++++++--------- distribution/push.go | 39 +---- distribution/push_v1.go | 32 ++-- distribution/push_v2.go | 73 ++++----- distribution/push_v2_test.go | 8 +- distribution/registry.go | 29 ++++ distribution/registry_unit_test.go | 9 +- 13 files changed, 459 insertions(+), 214 deletions(-) create mode 100644 distribution/config.go diff --git a/cli/command/image/pull.go b/cli/command/image/pull.go index 13de492f92..24933fe846 100644 --- a/cli/command/image/pull.go +++ b/cli/command/image/pull.go @@ -74,7 +74,7 @@ func runPull(dockerCli *command.DockerCli, opts pullOptions) error { err = imagePullPrivileged(ctx, dockerCli, authConfig, distributionRef.String(), requestPrivilege, opts.all) } if err != nil { - if strings.Contains(err.Error(), "target is a plugin") { + if strings.Contains(err.Error(), "target is plugin") { return errors.New(err.Error() + " - Use `docker plugin install`") } return err diff --git a/daemon/image_pull.go b/daemon/image_pull.go index 4c866a61ff..7e52cc243a 100644 --- a/daemon/image_pull.go +++ b/daemon/image_pull.go @@ -89,15 +89,18 @@ func (daemon *Daemon) pullImageWithReference(ctx context.Context, ref reference. }() imagePullConfig := &distribution.ImagePullConfig{ - MetaHeaders: metaHeaders, - AuthConfig: authConfig, - ProgressOutput: progress.ChanOutput(progressChan), - RegistryService: daemon.RegistryService, - ImageEventLogger: daemon.LogImageEvent, - MetadataStore: daemon.distributionMetadataStore, - ImageStore: daemon.imageStore, - ReferenceStore: daemon.referenceStore, - DownloadManager: daemon.downloadManager, + Config: distribution.Config{ + MetaHeaders: metaHeaders, + AuthConfig: authConfig, + ProgressOutput: progress.ChanOutput(progressChan), + RegistryService: daemon.RegistryService, + ImageEventLogger: daemon.LogImageEvent, + MetadataStore: daemon.distributionMetadataStore, + ImageStore: distribution.NewImageConfigStoreFromStore(daemon.imageStore), + ReferenceStore: daemon.referenceStore, + }, + DownloadManager: daemon.downloadManager, + Schema2Types: distribution.ImageTypes, } err := distribution.Pull(ctx, ref, imagePullConfig) diff --git a/daemon/image_push.go b/daemon/image_push.go index 42d27e02ad..679dbc0dde 100644 --- a/daemon/image_push.go +++ b/daemon/image_push.go @@ -3,6 +3,7 @@ package daemon import ( "io" + "github.com/docker/distribution/manifest/schema2" "github.com/docker/docker/api/types" "github.com/docker/docker/distribution" "github.com/docker/docker/pkg/progress" @@ -38,17 +39,20 @@ func (daemon *Daemon) PushImage(ctx context.Context, image, tag string, metaHead }() imagePushConfig := &distribution.ImagePushConfig{ - MetaHeaders: metaHeaders, - AuthConfig: authConfig, - ProgressOutput: progress.ChanOutput(progressChan), - RegistryService: daemon.RegistryService, - ImageEventLogger: daemon.LogImageEvent, - MetadataStore: daemon.distributionMetadataStore, - LayerStore: daemon.layerStore, - ImageStore: daemon.imageStore, - ReferenceStore: daemon.referenceStore, - TrustKey: daemon.trustKey, - UploadManager: daemon.uploadManager, + Config: distribution.Config{ + MetaHeaders: metaHeaders, + AuthConfig: authConfig, + ProgressOutput: progress.ChanOutput(progressChan), + RegistryService: daemon.RegistryService, + ImageEventLogger: daemon.LogImageEvent, + MetadataStore: daemon.distributionMetadataStore, + ImageStore: distribution.NewImageConfigStoreFromStore(daemon.imageStore), + ReferenceStore: daemon.referenceStore, + }, + ConfigMediaType: schema2.MediaTypeImageConfig, + LayerStore: distribution.NewLayerProviderFromStore(daemon.layerStore), + TrustKey: daemon.trustKey, + UploadManager: daemon.uploadManager, } err = distribution.Push(ctx, ref, imagePushConfig) diff --git a/distribution/config.go b/distribution/config.go new file mode 100644 index 0000000000..78cf0530ca --- /dev/null +++ b/distribution/config.go @@ -0,0 +1,233 @@ +package distribution + +import ( + "encoding/json" + "fmt" + "io" + "runtime" + + "github.com/docker/distribution" + "github.com/docker/distribution/digest" + "github.com/docker/distribution/manifest/schema2" + "github.com/docker/docker/api/types" + "github.com/docker/docker/distribution/metadata" + "github.com/docker/docker/distribution/xfer" + "github.com/docker/docker/image" + "github.com/docker/docker/layer" + "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/reference" + "github.com/docker/docker/registry" + "github.com/docker/libtrust" + "golang.org/x/net/context" +) + +// Config stores configuration for communicating +// with a registry. +type Config struct { + // MetaHeaders stores HTTP headers with metadata about the image + MetaHeaders map[string][]string + // AuthConfig holds authentication credentials for authenticating with + // the registry. + AuthConfig *types.AuthConfig + // ProgressOutput is the interface for showing the status of the pull + // operation. + ProgressOutput progress.Output + // RegistryService is the registry service to use for TLS configuration + // and endpoint lookup. + RegistryService registry.Service + // ImageEventLogger notifies events for a given image + ImageEventLogger func(id, name, action string) + // MetadataStore is the storage backend for distribution-specific + // metadata. + MetadataStore metadata.Store + // ImageStore manages images. + ImageStore ImageConfigStore + // ReferenceStore manages tags. This value is optional, when excluded + // content will not be tagged. + ReferenceStore reference.Store + // RequireSchema2 ensures that only schema2 manifests are used. + RequireSchema2 bool +} + +// ImagePullConfig stores pull configuration. +type ImagePullConfig struct { + Config + + // DownloadManager manages concurrent pulls. + DownloadManager RootFSDownloadManager + // Schema2Types is the valid schema2 configuration types allowed + // by the pull operation. + Schema2Types []string +} + +// ImagePushConfig stores push configuration. +type ImagePushConfig struct { + Config + + // ConfigMediaType is the configuration media type for + // schema2 manifests. + ConfigMediaType string + // LayerStore manages layers. + LayerStore PushLayerProvider + // TrustKey is the private key for legacy signatures. This is typically + // an ephemeral key, since these signatures are no longer verified. + TrustKey libtrust.PrivateKey + // UploadManager dispatches uploads. + UploadManager *xfer.LayerUploadManager +} + +// ImageConfigStore handles storing and getting image configurations +// by digest. Allows getting an image configurations rootfs from the +// configuration. +type ImageConfigStore interface { + Put([]byte) (digest.Digest, error) + Get(digest.Digest) ([]byte, error) + RootFSFromConfig([]byte) (*image.RootFS, error) +} + +// PushLayerProvider provides layers to be pushed by ChainID. +type PushLayerProvider interface { + Get(layer.ChainID) (PushLayer, error) +} + +// PushLayer is a pushable layer with metadata about the layer +// and access to the content of the layer. +type PushLayer interface { + ChainID() layer.ChainID + DiffID() layer.DiffID + Parent() PushLayer + Open() (io.ReadCloser, error) + Size() (int64, error) + MediaType() string + Release() +} + +// RootFSDownloadManager handles downloading of the rootfs +type RootFSDownloadManager interface { + // Download downloads the layers into the given initial rootfs and + // returns the final rootfs. + // Given progress output to track download progress + // Returns function to release download resources + Download(ctx context.Context, initialRootFS image.RootFS, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) +} + +type imageConfigStore struct { + image.Store +} + +// NewImageConfigStoreFromStore returns an ImageConfigStore backed +// by an image.Store for container images. +func NewImageConfigStoreFromStore(is image.Store) ImageConfigStore { + return &imageConfigStore{ + Store: is, + } +} + +func (s *imageConfigStore) Put(c []byte) (digest.Digest, error) { + id, err := s.Store.Create(c) + return digest.Digest(id), err +} + +func (s *imageConfigStore) Get(d digest.Digest) ([]byte, error) { + img, err := s.Store.Get(image.IDFromDigest(d)) + if err != nil { + return nil, err + } + return img.RawJSON(), nil +} + +func (s *imageConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) { + var unmarshalledConfig image.Image + if err := json.Unmarshal(c, &unmarshalledConfig); err != nil { + return nil, err + } + + // fail immediately on windows + if runtime.GOOS == "windows" && unmarshalledConfig.OS == "linux" { + return nil, fmt.Errorf("image operating system %q cannot be used on this platform", unmarshalledConfig.OS) + } + + return unmarshalledConfig.RootFS, nil +} + +type storeLayerProvider struct { + ls layer.Store +} + +// NewLayerProviderFromStore returns a layer provider backed by +// an instance of LayerStore. Only getting layers as gzipped +// tars is supported. +func NewLayerProviderFromStore(ls layer.Store) PushLayerProvider { + return &storeLayerProvider{ + ls: ls, + } +} + +func (p *storeLayerProvider) Get(lid layer.ChainID) (PushLayer, error) { + if lid == "" { + return &storeLayer{ + Layer: layer.EmptyLayer, + }, nil + } + l, err := p.ls.Get(lid) + if err != nil { + return nil, err + } + + sl := storeLayer{ + Layer: l, + ls: p.ls, + } + if d, ok := l.(distribution.Describable); ok { + return &describableStoreLayer{ + storeLayer: sl, + describable: d, + }, nil + } + + return &sl, nil +} + +type storeLayer struct { + layer.Layer + ls layer.Store +} + +func (l *storeLayer) Parent() PushLayer { + p := l.Layer.Parent() + if p == nil { + return nil + } + return &storeLayer{ + Layer: p, + ls: l.ls, + } +} + +func (l *storeLayer) Open() (io.ReadCloser, error) { + return l.Layer.TarStream() +} + +func (l *storeLayer) Size() (int64, error) { + return l.Layer.DiffSize() +} + +func (l *storeLayer) MediaType() string { + // layer store always returns uncompressed tars + return schema2.MediaTypeUncompressedLayer +} + +func (l *storeLayer) Release() { + if l.ls != nil { + layer.ReleaseAndLog(l.ls, l.Layer) + } +} + +type describableStoreLayer struct { + storeLayer + describable distribution.Describable +} + +func (l *describableStoreLayer) Descriptor() distribution.Descriptor { + return l.describable.Descriptor() +} diff --git a/distribution/pull.go b/distribution/pull.go index b631788b49..a0acfe5b6b 100644 --- a/distribution/pull.go +++ b/distribution/pull.go @@ -6,42 +6,13 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/distribution/digest" "github.com/docker/docker/api" - "github.com/docker/docker/api/types" "github.com/docker/docker/distribution/metadata" - "github.com/docker/docker/distribution/xfer" - "github.com/docker/docker/image" "github.com/docker/docker/pkg/progress" "github.com/docker/docker/reference" "github.com/docker/docker/registry" "golang.org/x/net/context" ) -// ImagePullConfig stores pull configuration. -type ImagePullConfig struct { - // MetaHeaders stores HTTP headers with metadata about the image - MetaHeaders map[string][]string - // AuthConfig holds authentication credentials for authenticating with - // the registry. - AuthConfig *types.AuthConfig - // ProgressOutput is the interface for showing the status of the pull - // operation. - ProgressOutput progress.Output - // RegistryService is the registry service to use for TLS configuration - // and endpoint lookup. - RegistryService registry.Service - // ImageEventLogger notifies events for a given image - ImageEventLogger func(id, name, action string) - // MetadataStore is the storage backend for distribution-specific - // metadata. - MetadataStore metadata.Store - // ImageStore manages images. - ImageStore image.Store - // ReferenceStore manages tags. - ReferenceStore reference.Store - // DownloadManager manages concurrent pulls. - DownloadManager *xfer.LayerDownloadManager -} - // Puller is an interface that abstracts pulling for different API versions. type Puller interface { // Pull tries to pull the image referenced by `tag` @@ -117,6 +88,10 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo confirmedTLSRegistries = make(map[string]struct{}) ) for _, endpoint := range endpoints { + if imagePullConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 { + continue + } + if confirmedV2 && endpoint.Version == registry.APIVersion1 { logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL) continue diff --git a/distribution/pull_v1.go b/distribution/pull_v1.go index f02e8fd75e..f44ed4f371 100644 --- a/distribution/pull_v1.go +++ b/distribution/pull_v1.go @@ -243,13 +243,15 @@ func (p *v1Puller) pullImage(ctx context.Context, v1ID, endpoint string, localNa return err } - imageID, err := p.config.ImageStore.Create(config) + imageID, err := p.config.ImageStore.Put(config) if err != nil { return err } - if err := p.config.ReferenceStore.AddTag(localNameRef, imageID.Digest(), true); err != nil { - return err + if p.config.ReferenceStore != nil { + if err := p.config.ReferenceStore.AddTag(localNameRef, imageID, true); err != nil { + return err + } } return nil diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go index 806ca85382..88807edc7d 100644 --- a/distribution/pull_v2.go +++ b/distribution/pull_v2.go @@ -33,9 +33,8 @@ import ( ) var ( - errRootFSMismatch = errors.New("layers from manifest don't match image configuration") - errMediaTypePlugin = errors.New("target is a plugin") - errRootFSInvalid = errors.New("invalid rootfs in image configuration") + errRootFSMismatch = errors.New("layers from manifest don't match image configuration") + errRootFSInvalid = errors.New("invalid rootfs in image configuration") ) // ImageConfigPullError is an error pulling the image config blob @@ -355,8 +354,19 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat } if m, ok := manifest.(*schema2.DeserializedManifest); ok { - if m.Manifest.Config.MediaType == schema2.MediaTypePluginConfig { - return false, errMediaTypePlugin + var allowedMediatype bool + for _, t := range p.config.Schema2Types { + if m.Manifest.Config.MediaType == t { + allowedMediatype = true + break + } + } + if !allowedMediatype { + configClass := mediaTypeClasses[m.Manifest.Config.MediaType] + if configClass == "" { + configClass = "unknown" + } + return false, fmt.Errorf("target is %s", configClass) } } @@ -374,6 +384,9 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat switch v := manifest.(type) { case *schema1.SignedManifest: + if p.config.RequireSchema2 { + return false, fmt.Errorf("invalid manifest: not schema2") + } id, manifestDigest, err = p.pullSchema1(ctx, ref, v) if err != nil { return false, err @@ -394,25 +407,27 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat progress.Message(p.config.ProgressOutput, "", "Digest: "+manifestDigest.String()) - oldTagID, err := p.config.ReferenceStore.Get(ref) - if err == nil { - if oldTagID == id { - return false, addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id) + if p.config.ReferenceStore != nil { + oldTagID, err := p.config.ReferenceStore.Get(ref) + if err == nil { + if oldTagID == id { + return false, addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id) + } + } else if err != reference.ErrDoesNotExist { + return false, err } - } else if err != reference.ErrDoesNotExist { - return false, err - } - if canonical, ok := ref.(reference.Canonical); ok { - if err = p.config.ReferenceStore.AddDigest(canonical, id, true); err != nil { - return false, err - } - } else { - if err = addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil { - return false, err - } - if err = p.config.ReferenceStore.AddTag(ref, id, true); err != nil { - return false, err + if canonical, ok := ref.(reference.Canonical); ok { + if err = p.config.ReferenceStore.AddDigest(canonical, id, true); err != nil { + return false, err + } + } else { + if err = addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil { + return false, err + } + if err = p.config.ReferenceStore.AddTag(ref, id, true); err != nil { + return false, err + } } } return true, nil @@ -481,14 +496,14 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Named, unverif return "", "", err } - imageID, err := p.config.ImageStore.Create(config) + imageID, err := p.config.ImageStore.Put(config) if err != nil { return "", "", err } manifestDigest = digest.FromBytes(unverifiedManifest.Canonical) - return imageID.Digest(), manifestDigest, nil + return imageID, manifestDigest, nil } func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest) (id digest.Digest, manifestDigest digest.Digest, err error) { @@ -498,7 +513,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s } target := mfst.Target() - if _, err := p.config.ImageStore.Get(image.IDFromDigest(target.Digest)); err == nil { + if _, err := p.config.ImageStore.Get(target.Digest); err == nil { // If the image already exists locally, no need to pull // anything. return target.Digest, manifestDigest, nil @@ -537,9 +552,9 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s }() var ( - configJSON []byte // raw serialized image config - unmarshalledConfig image.Image // deserialized image config - downloadRootFS image.RootFS // rootFS to use for registering layers. + configJSON []byte // raw serialized image config + downloadedRootFS *image.RootFS // rootFS from registered layers + configRootFS *image.RootFS // rootFS from configuration ) // https://github.com/docker/docker/issues/24766 - Err on the side of caution, @@ -551,84 +566,87 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s // check to block Windows images being pulled on Linux is implemented, it // may be necessary to perform the same type of serialisation. if runtime.GOOS == "windows" { - configJSON, unmarshalledConfig, err = receiveConfig(configChan, errChan) + configJSON, configRootFS, err = receiveConfig(p.config.ImageStore, configChan, errChan) if err != nil { return "", "", err } - if unmarshalledConfig.RootFS == nil { + if configRootFS == nil { return "", "", errRootFSInvalid } - - if unmarshalledConfig.OS == "linux" { - return "", "", fmt.Errorf("image operating system %q cannot be used on this platform", unmarshalledConfig.OS) - } } - downloadRootFS = *image.NewRootFS() - - rootFS, release, err := p.config.DownloadManager.Download(ctx, downloadRootFS, descriptors, p.config.ProgressOutput) - if err != nil { - if configJSON != nil { - // Already received the config - return "", "", err - } - select { - case err = <-errChan: - return "", "", err - default: - cancel() - select { - case <-configChan: - case <-errChan: + if p.config.DownloadManager != nil { + downloadRootFS := *image.NewRootFS() + rootFS, release, err := p.config.DownloadManager.Download(ctx, downloadRootFS, descriptors, p.config.ProgressOutput) + if err != nil { + if configJSON != nil { + // Already received the config + return "", "", err + } + select { + case err = <-errChan: + return "", "", err + default: + cancel() + select { + case <-configChan: + case <-errChan: + } + return "", "", err } - return "", "", err } + if release != nil { + defer release() + } + + downloadedRootFS = &rootFS } - defer release() if configJSON == nil { - configJSON, unmarshalledConfig, err = receiveConfig(configChan, errChan) + configJSON, configRootFS, err = receiveConfig(p.config.ImageStore, configChan, errChan) if err != nil { return "", "", err } - if unmarshalledConfig.RootFS == nil { + if configRootFS == nil { return "", "", errRootFSInvalid } } - // The DiffIDs returned in rootFS MUST match those in the config. - // Otherwise the image config could be referencing layers that aren't - // included in the manifest. - if len(rootFS.DiffIDs) != len(unmarshalledConfig.RootFS.DiffIDs) { - return "", "", errRootFSMismatch - } - - for i := range rootFS.DiffIDs { - if rootFS.DiffIDs[i] != unmarshalledConfig.RootFS.DiffIDs[i] { + if downloadedRootFS != nil { + // The DiffIDs returned in rootFS MUST match those in the config. + // Otherwise the image config could be referencing layers that aren't + // included in the manifest. + if len(downloadedRootFS.DiffIDs) != len(configRootFS.DiffIDs) { return "", "", errRootFSMismatch } + + for i := range downloadedRootFS.DiffIDs { + if downloadedRootFS.DiffIDs[i] != configRootFS.DiffIDs[i] { + return "", "", errRootFSMismatch + } + } } - imageID, err := p.config.ImageStore.Create(configJSON) + imageID, err := p.config.ImageStore.Put(configJSON) if err != nil { return "", "", err } - return imageID.Digest(), manifestDigest, nil + return imageID, manifestDigest, nil } -func receiveConfig(configChan <-chan []byte, errChan <-chan error) ([]byte, image.Image, error) { +func receiveConfig(s ImageConfigStore, configChan <-chan []byte, errChan <-chan error) ([]byte, *image.RootFS, error) { select { case configJSON := <-configChan: - var unmarshalledConfig image.Image - if err := json.Unmarshal(configJSON, &unmarshalledConfig); err != nil { - return nil, image.Image{}, err + rootfs, err := s.RootFSFromConfig(configJSON) + if err != nil { + return nil, nil, err } - return configJSON, unmarshalledConfig, nil + return configJSON, rootfs, nil case err := <-errChan: - return nil, image.Image{}, err + return nil, nil, err // Don't need a case for ctx.Done in the select because cancellation // will trigger an error in p.pullSchema2ImageConfig. } diff --git a/distribution/push.go b/distribution/push.go index e696a4e109..d35bdb103e 100644 --- a/distribution/push.go +++ b/distribution/push.go @@ -7,49 +7,13 @@ import ( "io" "github.com/Sirupsen/logrus" - "github.com/docker/docker/api/types" "github.com/docker/docker/distribution/metadata" - "github.com/docker/docker/distribution/xfer" - "github.com/docker/docker/image" - "github.com/docker/docker/layer" "github.com/docker/docker/pkg/progress" "github.com/docker/docker/reference" "github.com/docker/docker/registry" - "github.com/docker/libtrust" "golang.org/x/net/context" ) -// ImagePushConfig stores push configuration. -type ImagePushConfig struct { - // MetaHeaders store HTTP headers with metadata about the image - MetaHeaders map[string][]string - // AuthConfig holds authentication credentials for authenticating with - // the registry. - AuthConfig *types.AuthConfig - // ProgressOutput is the interface for showing the status of the push - // operation. - ProgressOutput progress.Output - // RegistryService is the registry service to use for TLS configuration - // and endpoint lookup. - RegistryService registry.Service - // ImageEventLogger notifies events for a given image - ImageEventLogger func(id, name, action string) - // MetadataStore is the storage backend for distribution-specific - // metadata. - MetadataStore metadata.Store - // LayerStore manages layers. - LayerStore layer.Store - // ImageStore manages images. - ImageStore image.Store - // ReferenceStore manages tags. - ReferenceStore reference.Store - // TrustKey is the private key for legacy signatures. This is typically - // an ephemeral key, since these signatures are no longer verified. - TrustKey libtrust.PrivateKey - // UploadManager dispatches uploads. - UploadManager *xfer.LayerUploadManager -} - // Pusher is an interface that abstracts pushing for different API versions. type Pusher interface { // Push tries to push the image configured at the creation of Pusher. @@ -127,6 +91,9 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo ) for _, endpoint := range endpoints { + if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 { + continue + } if confirmedV2 && endpoint.Version == registry.APIVersion1 { logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL) continue diff --git a/distribution/push_v1.go b/distribution/push_v1.go index b18958cdbd..257ac181ec 100644 --- a/distribution/push_v1.go +++ b/distribution/push_v1.go @@ -137,7 +137,7 @@ func newV1DependencyImage(l layer.Layer, parent *v1DependencyImage) (*v1Dependen } // Retrieve the all the images to be uploaded in the correct order -func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []layer.Layer, err error) { +func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []PushLayer, err error) { tagsByImage = make(map[image.ID][]string) // Ignore digest references @@ -202,25 +202,31 @@ func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID return } -func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]layer.Layer) (imageListForThisTag []v1Image, err error) { - img, err := p.config.ImageStore.Get(imgID) +func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]PushLayer) (imageListForThisTag []v1Image, err error) { + ics, ok := p.config.ImageStore.(*imageConfigStore) + if !ok { + return nil, fmt.Errorf("only image store images supported for v1 push") + } + img, err := ics.Store.Get(imgID) if err != nil { return nil, err } topLayerID := img.RootFS.ChainID() - var l layer.Layer - if topLayerID == "" { - l = layer.EmptyLayer - } else { - l, err = p.config.LayerStore.Get(topLayerID) - *referencedLayers = append(*referencedLayers, l) - if err != nil { - return nil, fmt.Errorf("failed to get top layer from image: %v", err) - } + pl, err := p.config.LayerStore.Get(topLayerID) + *referencedLayers = append(*referencedLayers, pl) + if err != nil { + return nil, fmt.Errorf("failed to get top layer from image: %v", err) } + // V1 push is deprecated, only support existing layerstore layers + lsl, ok := pl.(*storeLayer) + if !ok { + return nil, fmt.Errorf("only layer store layers supported for v1 push") + } + l := lsl.Layer + dependencyImages, parent, err := generateDependencyImages(l.Parent(), dependenciesSeen) if err != nil { return nil, err @@ -371,7 +377,7 @@ func (p *v1Pusher) pushRepository(ctx context.Context) error { imgList, tags, referencedLayers, err := p.getImageList() defer func() { for _, l := range referencedLayers { - p.config.LayerStore.Release(l) + l.Release() } }() if err != nil { diff --git a/distribution/push_v2.go b/distribution/push_v2.go index d72882b0d9..1f8c822fec 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -20,7 +20,6 @@ import ( "github.com/docker/distribution/registry/client" "github.com/docker/docker/distribution/metadata" "github.com/docker/docker/distribution/xfer" - "github.com/docker/docker/image" "github.com/docker/docker/layer" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/progress" @@ -123,24 +122,22 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error { logrus.Debugf("Pushing repository: %s", ref.String()) - img, err := p.config.ImageStore.Get(image.IDFromDigest(id)) + imgConfig, err := p.config.ImageStore.Get(id) if err != nil { return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err) } - var l layer.Layer - - topLayerID := img.RootFS.ChainID() - if topLayerID == "" { - l = layer.EmptyLayer - } else { - l, err = p.config.LayerStore.Get(topLayerID) - if err != nil { - return fmt.Errorf("failed to get top layer from image: %v", err) - } - defer layer.ReleaseAndLog(p.config.LayerStore, l) + rootfs, err := p.config.ImageStore.RootFSFromConfig(imgConfig) + if err != nil { + return fmt.Errorf("unable to get rootfs for image %s: %s", ref.String(), err) } + l, err := p.config.LayerStore.Get(rootfs.ChainID()) + if err != nil { + return fmt.Errorf("failed to get top layer from image: %v", err) + } + defer l.Release() + hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig) if err != nil { return fmt.Errorf("failed to compute hmac key of auth config: %v", err) @@ -158,7 +155,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id } // Loop bounds condition is to avoid pushing the base layer on Windows. - for i := 0; i < len(img.RootFS.DiffIDs); i++ { + for i := 0; i < len(rootfs.DiffIDs); i++ { descriptor := descriptorTemplate descriptor.layer = l descriptor.checkedDigests = make(map[digest.Digest]struct{}) @@ -172,7 +169,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id } // Try schema2 first - builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON()) + builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig) manifest, err := manifestFromBuilder(ctx, builder, descriptors) if err != nil { return err @@ -185,7 +182,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())} if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil { - if runtime.GOOS == "windows" { + if runtime.GOOS == "windows" || p.config.TrustKey == nil || p.config.RequireSchema2 { logrus.Warnf("failed to upload schema2 manifest: %v", err) return err } @@ -196,7 +193,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id if err != nil { return err } - builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON()) + builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, imgConfig) manifest, err = manifestFromBuilder(ctx, builder, descriptors) if err != nil { return err @@ -246,7 +243,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild } type v2PushDescriptor struct { - layer layer.Layer + layer PushLayer v2MetadataService metadata.V2MetadataService hmacKey []byte repoInfo reference.Named @@ -425,26 +422,32 @@ func (pd *v2PushDescriptor) uploadUsingSession( diffID layer.DiffID, layerUpload distribution.BlobWriter, ) (distribution.Descriptor, error) { - arch, err := pd.layer.TarStream() - if err != nil { - return distribution.Descriptor{}, xfer.DoNotRetry{Err: err} + var reader io.ReadCloser + + contentReader, err := pd.layer.Open() + size, _ := pd.layer.Size() + + reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, size, pd.ID(), "Pushing") + + switch m := pd.layer.MediaType(); m { + case schema2.MediaTypeUncompressedLayer: + compressedReader, compressionDone := compress(reader) + defer func(closer io.Closer) { + closer.Close() + <-compressionDone + }(reader) + reader = compressedReader + case schema2.MediaTypeLayer: + default: + reader.Close() + return distribution.Descriptor{}, fmt.Errorf("unsupported layer media type %s", m) } - // don't care if this fails; best effort - size, _ := pd.layer.DiffSize() - - reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing") - compressedReader, compressionDone := compress(reader) - defer func() { - reader.Close() - <-compressionDone - }() - digester := digest.Canonical.New() - tee := io.TeeReader(compressedReader, digester.Hash()) + tee := io.TeeReader(reader, digester.Hash()) nn, err := layerUpload.ReadFrom(tee) - compressedReader.Close() + reader.Close() if err != nil { return distribution.Descriptor{}, retryOnError(err) } @@ -568,8 +571,8 @@ attempts: // repository and whether the check shall be done also with digests mapped to different repositories. The // decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost // of upload does not outweigh a latency. -func getMaxMountAndExistenceCheckAttempts(layer layer.Layer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) { - size, err := layer.DiffSize() +func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) { + size, err := layer.Size() switch { // big blob case size > middleLayerMaximumSize: diff --git a/distribution/push_v2_test.go b/distribution/push_v2_test.go index c56f50bdae..6a5216b1d0 100644 --- a/distribution/push_v2_test.go +++ b/distribution/push_v2_test.go @@ -387,9 +387,11 @@ func TestLayerAlreadyExists(t *testing.T) { ctx := context.Background() ms := &mockV2MetadataService{} pd := &v2PushDescriptor{ - hmacKey: []byte(tc.hmacKey), - repoInfo: repoInfo, - layer: layer.EmptyLayer, + hmacKey: []byte(tc.hmacKey), + repoInfo: repoInfo, + layer: &storeLayer{ + Layer: layer.EmptyLayer, + }, repo: repo, v2MetadataService: ms, pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)}, diff --git a/distribution/registry.go b/distribution/registry.go index 4c3513046d..3b2fdf0379 100644 --- a/distribution/registry.go +++ b/distribution/registry.go @@ -7,6 +7,7 @@ import ( "time" "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema2" distreference "github.com/docker/distribution/reference" "github.com/docker/distribution/registry/client" "github.com/docker/distribution/registry/client/auth" @@ -18,6 +19,34 @@ import ( "golang.org/x/net/context" ) +// ImageTypes represents the schema2 config types for images +var ImageTypes = []string{ + schema2.MediaTypeImageConfig, + // Handle unexpected values from https://github.com/docker/distribution/issues/1621 + "application/octet-stream", + // Treat defaulted values as images, newer types cannot be implied + "", +} + +// PluginTypes represents the schema2 config types for plugins +var PluginTypes = []string{ + schema2.MediaTypePluginConfig, +} + +var mediaTypeClasses map[string]string + +func init() { + // initialize media type classes with all know types for + // plugin + mediaTypeClasses = map[string]string{} + for _, t := range ImageTypes { + mediaTypeClasses[t] = "image" + } + for _, t := range PluginTypes { + mediaTypeClasses[t] = "plugin" + } +} + // NewV2Repository returns a repository (v2 only). It creates an HTTP transport // providing timeout settings and authentication support, and also verifies the // remote API version. diff --git a/distribution/registry_unit_test.go b/distribution/registry_unit_test.go index 23edc095e1..406de34915 100644 --- a/distribution/registry_unit_test.go +++ b/distribution/registry_unit_test.go @@ -70,10 +70,13 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) { Official: false, } imagePullConfig := &ImagePullConfig{ - MetaHeaders: http.Header{}, - AuthConfig: &types.AuthConfig{ - RegistryToken: secretRegistryToken, + Config: Config{ + MetaHeaders: http.Header{}, + AuthConfig: &types.AuthConfig{ + RegistryToken: secretRegistryToken, + }, }, + Schema2Types: ImageTypes, } puller, err := newPuller(endpoint, repoInfo, imagePullConfig) if err != nil {