Abstract distribution interfaces from image specific types

Move configurations into a single file.
Abstract download manager in pull config.
Add supports for schema2 only and schema2 type checking.
Add interface for providing push layers.
Abstract image store to generically handle configurations.

Signed-off-by: Derek McGowan <derek@mcgstyle.net>
This commit is contained in:
Derek McGowan 2016-12-16 11:19:05 -08:00
parent 61ac7c4bf8
commit 3c7676a057
No known key found for this signature in database
GPG Key ID: F58C5D0A4405ACDB
13 changed files with 459 additions and 214 deletions

View File

@ -74,7 +74,7 @@ func runPull(dockerCli *command.DockerCli, opts pullOptions) error {
err = imagePullPrivileged(ctx, dockerCli, authConfig, distributionRef.String(), requestPrivilege, opts.all)
}
if err != nil {
if strings.Contains(err.Error(), "target is a plugin") {
if strings.Contains(err.Error(), "target is plugin") {
return errors.New(err.Error() + " - Use `docker plugin install`")
}
return err

View File

@ -89,15 +89,18 @@ func (daemon *Daemon) pullImageWithReference(ctx context.Context, ref reference.
}()
imagePullConfig := &distribution.ImagePullConfig{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
ProgressOutput: progress.ChanOutput(progressChan),
RegistryService: daemon.RegistryService,
ImageEventLogger: daemon.LogImageEvent,
MetadataStore: daemon.distributionMetadataStore,
ImageStore: daemon.imageStore,
ReferenceStore: daemon.referenceStore,
DownloadManager: daemon.downloadManager,
Config: distribution.Config{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
ProgressOutput: progress.ChanOutput(progressChan),
RegistryService: daemon.RegistryService,
ImageEventLogger: daemon.LogImageEvent,
MetadataStore: daemon.distributionMetadataStore,
ImageStore: distribution.NewImageConfigStoreFromStore(daemon.imageStore),
ReferenceStore: daemon.referenceStore,
},
DownloadManager: daemon.downloadManager,
Schema2Types: distribution.ImageTypes,
}
err := distribution.Pull(ctx, ref, imagePullConfig)

View File

@ -3,6 +3,7 @@ package daemon
import (
"io"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/docker/api/types"
"github.com/docker/docker/distribution"
"github.com/docker/docker/pkg/progress"
@ -38,17 +39,20 @@ func (daemon *Daemon) PushImage(ctx context.Context, image, tag string, metaHead
}()
imagePushConfig := &distribution.ImagePushConfig{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
ProgressOutput: progress.ChanOutput(progressChan),
RegistryService: daemon.RegistryService,
ImageEventLogger: daemon.LogImageEvent,
MetadataStore: daemon.distributionMetadataStore,
LayerStore: daemon.layerStore,
ImageStore: daemon.imageStore,
ReferenceStore: daemon.referenceStore,
TrustKey: daemon.trustKey,
UploadManager: daemon.uploadManager,
Config: distribution.Config{
MetaHeaders: metaHeaders,
AuthConfig: authConfig,
ProgressOutput: progress.ChanOutput(progressChan),
RegistryService: daemon.RegistryService,
ImageEventLogger: daemon.LogImageEvent,
MetadataStore: daemon.distributionMetadataStore,
ImageStore: distribution.NewImageConfigStoreFromStore(daemon.imageStore),
ReferenceStore: daemon.referenceStore,
},
ConfigMediaType: schema2.MediaTypeImageConfig,
LayerStore: distribution.NewLayerProviderFromStore(daemon.layerStore),
TrustKey: daemon.trustKey,
UploadManager: daemon.uploadManager,
}
err = distribution.Push(ctx, ref, imagePushConfig)

233
distribution/config.go Normal file
View File

@ -0,0 +1,233 @@
package distribution
import (
"encoding/json"
"fmt"
"io"
"runtime"
"github.com/docker/distribution"
"github.com/docker/distribution/digest"
"github.com/docker/distribution/manifest/schema2"
"github.com/docker/docker/api/types"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/docker/libtrust"
"golang.org/x/net/context"
)
// Config stores configuration for communicating
// with a registry.
type Config struct {
// MetaHeaders stores HTTP headers with metadata about the image
MetaHeaders map[string][]string
// AuthConfig holds authentication credentials for authenticating with
// the registry.
AuthConfig *types.AuthConfig
// ProgressOutput is the interface for showing the status of the pull
// operation.
ProgressOutput progress.Output
// RegistryService is the registry service to use for TLS configuration
// and endpoint lookup.
RegistryService registry.Service
// ImageEventLogger notifies events for a given image
ImageEventLogger func(id, name, action string)
// MetadataStore is the storage backend for distribution-specific
// metadata.
MetadataStore metadata.Store
// ImageStore manages images.
ImageStore ImageConfigStore
// ReferenceStore manages tags. This value is optional, when excluded
// content will not be tagged.
ReferenceStore reference.Store
// RequireSchema2 ensures that only schema2 manifests are used.
RequireSchema2 bool
}
// ImagePullConfig stores pull configuration.
type ImagePullConfig struct {
Config
// DownloadManager manages concurrent pulls.
DownloadManager RootFSDownloadManager
// Schema2Types is the valid schema2 configuration types allowed
// by the pull operation.
Schema2Types []string
}
// ImagePushConfig stores push configuration.
type ImagePushConfig struct {
Config
// ConfigMediaType is the configuration media type for
// schema2 manifests.
ConfigMediaType string
// LayerStore manages layers.
LayerStore PushLayerProvider
// TrustKey is the private key for legacy signatures. This is typically
// an ephemeral key, since these signatures are no longer verified.
TrustKey libtrust.PrivateKey
// UploadManager dispatches uploads.
UploadManager *xfer.LayerUploadManager
}
// ImageConfigStore handles storing and getting image configurations
// by digest. Allows getting an image configurations rootfs from the
// configuration.
type ImageConfigStore interface {
Put([]byte) (digest.Digest, error)
Get(digest.Digest) ([]byte, error)
RootFSFromConfig([]byte) (*image.RootFS, error)
}
// PushLayerProvider provides layers to be pushed by ChainID.
type PushLayerProvider interface {
Get(layer.ChainID) (PushLayer, error)
}
// PushLayer is a pushable layer with metadata about the layer
// and access to the content of the layer.
type PushLayer interface {
ChainID() layer.ChainID
DiffID() layer.DiffID
Parent() PushLayer
Open() (io.ReadCloser, error)
Size() (int64, error)
MediaType() string
Release()
}
// RootFSDownloadManager handles downloading of the rootfs
type RootFSDownloadManager interface {
// Download downloads the layers into the given initial rootfs and
// returns the final rootfs.
// Given progress output to track download progress
// Returns function to release download resources
Download(ctx context.Context, initialRootFS image.RootFS, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error)
}
type imageConfigStore struct {
image.Store
}
// NewImageConfigStoreFromStore returns an ImageConfigStore backed
// by an image.Store for container images.
func NewImageConfigStoreFromStore(is image.Store) ImageConfigStore {
return &imageConfigStore{
Store: is,
}
}
func (s *imageConfigStore) Put(c []byte) (digest.Digest, error) {
id, err := s.Store.Create(c)
return digest.Digest(id), err
}
func (s *imageConfigStore) Get(d digest.Digest) ([]byte, error) {
img, err := s.Store.Get(image.IDFromDigest(d))
if err != nil {
return nil, err
}
return img.RawJSON(), nil
}
func (s *imageConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) {
var unmarshalledConfig image.Image
if err := json.Unmarshal(c, &unmarshalledConfig); err != nil {
return nil, err
}
// fail immediately on windows
if runtime.GOOS == "windows" && unmarshalledConfig.OS == "linux" {
return nil, fmt.Errorf("image operating system %q cannot be used on this platform", unmarshalledConfig.OS)
}
return unmarshalledConfig.RootFS, nil
}
type storeLayerProvider struct {
ls layer.Store
}
// NewLayerProviderFromStore returns a layer provider backed by
// an instance of LayerStore. Only getting layers as gzipped
// tars is supported.
func NewLayerProviderFromStore(ls layer.Store) PushLayerProvider {
return &storeLayerProvider{
ls: ls,
}
}
func (p *storeLayerProvider) Get(lid layer.ChainID) (PushLayer, error) {
if lid == "" {
return &storeLayer{
Layer: layer.EmptyLayer,
}, nil
}
l, err := p.ls.Get(lid)
if err != nil {
return nil, err
}
sl := storeLayer{
Layer: l,
ls: p.ls,
}
if d, ok := l.(distribution.Describable); ok {
return &describableStoreLayer{
storeLayer: sl,
describable: d,
}, nil
}
return &sl, nil
}
type storeLayer struct {
layer.Layer
ls layer.Store
}
func (l *storeLayer) Parent() PushLayer {
p := l.Layer.Parent()
if p == nil {
return nil
}
return &storeLayer{
Layer: p,
ls: l.ls,
}
}
func (l *storeLayer) Open() (io.ReadCloser, error) {
return l.Layer.TarStream()
}
func (l *storeLayer) Size() (int64, error) {
return l.Layer.DiffSize()
}
func (l *storeLayer) MediaType() string {
// layer store always returns uncompressed tars
return schema2.MediaTypeUncompressedLayer
}
func (l *storeLayer) Release() {
if l.ls != nil {
layer.ReleaseAndLog(l.ls, l.Layer)
}
}
type describableStoreLayer struct {
storeLayer
describable distribution.Describable
}
func (l *describableStoreLayer) Descriptor() distribution.Descriptor {
return l.describable.Descriptor()
}

View File

@ -6,42 +6,13 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/docker/api"
"github.com/docker/docker/api/types"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"golang.org/x/net/context"
)
// ImagePullConfig stores pull configuration.
type ImagePullConfig struct {
// MetaHeaders stores HTTP headers with metadata about the image
MetaHeaders map[string][]string
// AuthConfig holds authentication credentials for authenticating with
// the registry.
AuthConfig *types.AuthConfig
// ProgressOutput is the interface for showing the status of the pull
// operation.
ProgressOutput progress.Output
// RegistryService is the registry service to use for TLS configuration
// and endpoint lookup.
RegistryService registry.Service
// ImageEventLogger notifies events for a given image
ImageEventLogger func(id, name, action string)
// MetadataStore is the storage backend for distribution-specific
// metadata.
MetadataStore metadata.Store
// ImageStore manages images.
ImageStore image.Store
// ReferenceStore manages tags.
ReferenceStore reference.Store
// DownloadManager manages concurrent pulls.
DownloadManager *xfer.LayerDownloadManager
}
// Puller is an interface that abstracts pulling for different API versions.
type Puller interface {
// Pull tries to pull the image referenced by `tag`
@ -117,6 +88,10 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo
confirmedTLSRegistries = make(map[string]struct{})
)
for _, endpoint := range endpoints {
if imagePullConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
continue
}
if confirmedV2 && endpoint.Version == registry.APIVersion1 {
logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
continue

View File

@ -243,13 +243,15 @@ func (p *v1Puller) pullImage(ctx context.Context, v1ID, endpoint string, localNa
return err
}
imageID, err := p.config.ImageStore.Create(config)
imageID, err := p.config.ImageStore.Put(config)
if err != nil {
return err
}
if err := p.config.ReferenceStore.AddTag(localNameRef, imageID.Digest(), true); err != nil {
return err
if p.config.ReferenceStore != nil {
if err := p.config.ReferenceStore.AddTag(localNameRef, imageID, true); err != nil {
return err
}
}
return nil

View File

@ -33,9 +33,8 @@ import (
)
var (
errRootFSMismatch = errors.New("layers from manifest don't match image configuration")
errMediaTypePlugin = errors.New("target is a plugin")
errRootFSInvalid = errors.New("invalid rootfs in image configuration")
errRootFSMismatch = errors.New("layers from manifest don't match image configuration")
errRootFSInvalid = errors.New("invalid rootfs in image configuration")
)
// ImageConfigPullError is an error pulling the image config blob
@ -355,8 +354,19 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat
}
if m, ok := manifest.(*schema2.DeserializedManifest); ok {
if m.Manifest.Config.MediaType == schema2.MediaTypePluginConfig {
return false, errMediaTypePlugin
var allowedMediatype bool
for _, t := range p.config.Schema2Types {
if m.Manifest.Config.MediaType == t {
allowedMediatype = true
break
}
}
if !allowedMediatype {
configClass := mediaTypeClasses[m.Manifest.Config.MediaType]
if configClass == "" {
configClass = "unknown"
}
return false, fmt.Errorf("target is %s", configClass)
}
}
@ -374,6 +384,9 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat
switch v := manifest.(type) {
case *schema1.SignedManifest:
if p.config.RequireSchema2 {
return false, fmt.Errorf("invalid manifest: not schema2")
}
id, manifestDigest, err = p.pullSchema1(ctx, ref, v)
if err != nil {
return false, err
@ -394,25 +407,27 @@ func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named) (tagUpdat
progress.Message(p.config.ProgressOutput, "", "Digest: "+manifestDigest.String())
oldTagID, err := p.config.ReferenceStore.Get(ref)
if err == nil {
if oldTagID == id {
return false, addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id)
if p.config.ReferenceStore != nil {
oldTagID, err := p.config.ReferenceStore.Get(ref)
if err == nil {
if oldTagID == id {
return false, addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id)
}
} else if err != reference.ErrDoesNotExist {
return false, err
}
} else if err != reference.ErrDoesNotExist {
return false, err
}
if canonical, ok := ref.(reference.Canonical); ok {
if err = p.config.ReferenceStore.AddDigest(canonical, id, true); err != nil {
return false, err
}
} else {
if err = addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
return false, err
}
if err = p.config.ReferenceStore.AddTag(ref, id, true); err != nil {
return false, err
if canonical, ok := ref.(reference.Canonical); ok {
if err = p.config.ReferenceStore.AddDigest(canonical, id, true); err != nil {
return false, err
}
} else {
if err = addDigestReference(p.config.ReferenceStore, ref, manifestDigest, id); err != nil {
return false, err
}
if err = p.config.ReferenceStore.AddTag(ref, id, true); err != nil {
return false, err
}
}
}
return true, nil
@ -481,14 +496,14 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Named, unverif
return "", "", err
}
imageID, err := p.config.ImageStore.Create(config)
imageID, err := p.config.ImageStore.Put(config)
if err != nil {
return "", "", err
}
manifestDigest = digest.FromBytes(unverifiedManifest.Canonical)
return imageID.Digest(), manifestDigest, nil
return imageID, manifestDigest, nil
}
func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *schema2.DeserializedManifest) (id digest.Digest, manifestDigest digest.Digest, err error) {
@ -498,7 +513,7 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
}
target := mfst.Target()
if _, err := p.config.ImageStore.Get(image.IDFromDigest(target.Digest)); err == nil {
if _, err := p.config.ImageStore.Get(target.Digest); err == nil {
// If the image already exists locally, no need to pull
// anything.
return target.Digest, manifestDigest, nil
@ -537,9 +552,9 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
}()
var (
configJSON []byte // raw serialized image config
unmarshalledConfig image.Image // deserialized image config
downloadRootFS image.RootFS // rootFS to use for registering layers.
configJSON []byte // raw serialized image config
downloadedRootFS *image.RootFS // rootFS from registered layers
configRootFS *image.RootFS // rootFS from configuration
)
// https://github.com/docker/docker/issues/24766 - Err on the side of caution,
@ -551,84 +566,87 @@ func (p *v2Puller) pullSchema2(ctx context.Context, ref reference.Named, mfst *s
// check to block Windows images being pulled on Linux is implemented, it
// may be necessary to perform the same type of serialisation.
if runtime.GOOS == "windows" {
configJSON, unmarshalledConfig, err = receiveConfig(configChan, errChan)
configJSON, configRootFS, err = receiveConfig(p.config.ImageStore, configChan, errChan)
if err != nil {
return "", "", err
}
if unmarshalledConfig.RootFS == nil {
if configRootFS == nil {
return "", "", errRootFSInvalid
}
if unmarshalledConfig.OS == "linux" {
return "", "", fmt.Errorf("image operating system %q cannot be used on this platform", unmarshalledConfig.OS)
}
}
downloadRootFS = *image.NewRootFS()
rootFS, release, err := p.config.DownloadManager.Download(ctx, downloadRootFS, descriptors, p.config.ProgressOutput)
if err != nil {
if configJSON != nil {
// Already received the config
return "", "", err
}
select {
case err = <-errChan:
return "", "", err
default:
cancel()
select {
case <-configChan:
case <-errChan:
if p.config.DownloadManager != nil {
downloadRootFS := *image.NewRootFS()
rootFS, release, err := p.config.DownloadManager.Download(ctx, downloadRootFS, descriptors, p.config.ProgressOutput)
if err != nil {
if configJSON != nil {
// Already received the config
return "", "", err
}
select {
case err = <-errChan:
return "", "", err
default:
cancel()
select {
case <-configChan:
case <-errChan:
}
return "", "", err
}
return "", "", err
}
if release != nil {
defer release()
}
downloadedRootFS = &rootFS
}
defer release()
if configJSON == nil {
configJSON, unmarshalledConfig, err = receiveConfig(configChan, errChan)
configJSON, configRootFS, err = receiveConfig(p.config.ImageStore, configChan, errChan)
if err != nil {
return "", "", err
}
if unmarshalledConfig.RootFS == nil {
if configRootFS == nil {
return "", "", errRootFSInvalid
}
}
// The DiffIDs returned in rootFS MUST match those in the config.
// Otherwise the image config could be referencing layers that aren't
// included in the manifest.
if len(rootFS.DiffIDs) != len(unmarshalledConfig.RootFS.DiffIDs) {
return "", "", errRootFSMismatch
}
for i := range rootFS.DiffIDs {
if rootFS.DiffIDs[i] != unmarshalledConfig.RootFS.DiffIDs[i] {
if downloadedRootFS != nil {
// The DiffIDs returned in rootFS MUST match those in the config.
// Otherwise the image config could be referencing layers that aren't
// included in the manifest.
if len(downloadedRootFS.DiffIDs) != len(configRootFS.DiffIDs) {
return "", "", errRootFSMismatch
}
for i := range downloadedRootFS.DiffIDs {
if downloadedRootFS.DiffIDs[i] != configRootFS.DiffIDs[i] {
return "", "", errRootFSMismatch
}
}
}
imageID, err := p.config.ImageStore.Create(configJSON)
imageID, err := p.config.ImageStore.Put(configJSON)
if err != nil {
return "", "", err
}
return imageID.Digest(), manifestDigest, nil
return imageID, manifestDigest, nil
}
func receiveConfig(configChan <-chan []byte, errChan <-chan error) ([]byte, image.Image, error) {
func receiveConfig(s ImageConfigStore, configChan <-chan []byte, errChan <-chan error) ([]byte, *image.RootFS, error) {
select {
case configJSON := <-configChan:
var unmarshalledConfig image.Image
if err := json.Unmarshal(configJSON, &unmarshalledConfig); err != nil {
return nil, image.Image{}, err
rootfs, err := s.RootFSFromConfig(configJSON)
if err != nil {
return nil, nil, err
}
return configJSON, unmarshalledConfig, nil
return configJSON, rootfs, nil
case err := <-errChan:
return nil, image.Image{}, err
return nil, nil, err
// Don't need a case for ctx.Done in the select because cancellation
// will trigger an error in p.pullSchema2ImageConfig.
}

View File

@ -7,49 +7,13 @@ import (
"io"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/progress"
"github.com/docker/docker/reference"
"github.com/docker/docker/registry"
"github.com/docker/libtrust"
"golang.org/x/net/context"
)
// ImagePushConfig stores push configuration.
type ImagePushConfig struct {
// MetaHeaders store HTTP headers with metadata about the image
MetaHeaders map[string][]string
// AuthConfig holds authentication credentials for authenticating with
// the registry.
AuthConfig *types.AuthConfig
// ProgressOutput is the interface for showing the status of the push
// operation.
ProgressOutput progress.Output
// RegistryService is the registry service to use for TLS configuration
// and endpoint lookup.
RegistryService registry.Service
// ImageEventLogger notifies events for a given image
ImageEventLogger func(id, name, action string)
// MetadataStore is the storage backend for distribution-specific
// metadata.
MetadataStore metadata.Store
// LayerStore manages layers.
LayerStore layer.Store
// ImageStore manages images.
ImageStore image.Store
// ReferenceStore manages tags.
ReferenceStore reference.Store
// TrustKey is the private key for legacy signatures. This is typically
// an ephemeral key, since these signatures are no longer verified.
TrustKey libtrust.PrivateKey
// UploadManager dispatches uploads.
UploadManager *xfer.LayerUploadManager
}
// Pusher is an interface that abstracts pushing for different API versions.
type Pusher interface {
// Push tries to push the image configured at the creation of Pusher.
@ -127,6 +91,9 @@ func Push(ctx context.Context, ref reference.Named, imagePushConfig *ImagePushCo
)
for _, endpoint := range endpoints {
if imagePushConfig.RequireSchema2 && endpoint.Version == registry.APIVersion1 {
continue
}
if confirmedV2 && endpoint.Version == registry.APIVersion1 {
logrus.Debugf("Skipping v1 endpoint %s because v2 registry was detected", endpoint.URL)
continue

View File

@ -137,7 +137,7 @@ func newV1DependencyImage(l layer.Layer, parent *v1DependencyImage) *v1Dependenc
}
// Retrieve the all the images to be uploaded in the correct order
func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []layer.Layer, err error) {
func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID][]string, referencedLayers []PushLayer, err error) {
tagsByImage = make(map[image.ID][]string)
// Ignore digest references
@ -202,25 +202,31 @@ func (p *v1Pusher) getImageList() (imageList []v1Image, tagsByImage map[image.ID
return
}
func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]layer.Layer) (imageListForThisTag []v1Image, err error) {
img, err := p.config.ImageStore.Get(imgID)
func (p *v1Pusher) imageListForTag(imgID image.ID, dependenciesSeen map[layer.ChainID]*v1DependencyImage, referencedLayers *[]PushLayer) (imageListForThisTag []v1Image, err error) {
ics, ok := p.config.ImageStore.(*imageConfigStore)
if !ok {
return nil, fmt.Errorf("only image store images supported for v1 push")
}
img, err := ics.Store.Get(imgID)
if err != nil {
return nil, err
}
topLayerID := img.RootFS.ChainID()
var l layer.Layer
if topLayerID == "" {
l = layer.EmptyLayer
} else {
l, err = p.config.LayerStore.Get(topLayerID)
*referencedLayers = append(*referencedLayers, l)
if err != nil {
return nil, fmt.Errorf("failed to get top layer from image: %v", err)
}
pl, err := p.config.LayerStore.Get(topLayerID)
*referencedLayers = append(*referencedLayers, pl)
if err != nil {
return nil, fmt.Errorf("failed to get top layer from image: %v", err)
}
// V1 push is deprecated, only support existing layerstore layers
lsl, ok := pl.(*storeLayer)
if !ok {
return nil, fmt.Errorf("only layer store layers supported for v1 push")
}
l := lsl.Layer
dependencyImages, parent := generateDependencyImages(l.Parent(), dependenciesSeen)
topImage, err := newV1TopImage(imgID, img, l, parent)
@ -365,7 +371,7 @@ func (p *v1Pusher) pushRepository(ctx context.Context) error {
imgList, tags, referencedLayers, err := p.getImageList()
defer func() {
for _, l := range referencedLayers {
p.config.LayerStore.Release(l)
l.Release()
}
}()
if err != nil {

View File

@ -20,7 +20,6 @@ import (
"github.com/docker/distribution/registry/client"
"github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/progress"
@ -123,24 +122,22 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) {
func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error {
logrus.Debugf("Pushing repository: %s", ref.String())
img, err := p.config.ImageStore.Get(image.IDFromDigest(id))
imgConfig, err := p.config.ImageStore.Get(id)
if err != nil {
return fmt.Errorf("could not find image from tag %s: %v", ref.String(), err)
}
var l layer.Layer
topLayerID := img.RootFS.ChainID()
if topLayerID == "" {
l = layer.EmptyLayer
} else {
l, err = p.config.LayerStore.Get(topLayerID)
if err != nil {
return fmt.Errorf("failed to get top layer from image: %v", err)
}
defer layer.ReleaseAndLog(p.config.LayerStore, l)
rootfs, err := p.config.ImageStore.RootFSFromConfig(imgConfig)
if err != nil {
return fmt.Errorf("unable to get rootfs for image %s: %s", ref.String(), err)
}
l, err := p.config.LayerStore.Get(rootfs.ChainID())
if err != nil {
return fmt.Errorf("failed to get top layer from image: %v", err)
}
defer l.Release()
hmacKey, err := metadata.ComputeV2MetadataHMACKey(p.config.AuthConfig)
if err != nil {
return fmt.Errorf("failed to compute hmac key of auth config: %v", err)
@ -158,7 +155,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
}
// Loop bounds condition is to avoid pushing the base layer on Windows.
for i := 0; i < len(img.RootFS.DiffIDs); i++ {
for i := 0; i < len(rootfs.DiffIDs); i++ {
descriptor := descriptorTemplate
descriptor.layer = l
descriptor.checkedDigests = make(map[digest.Digest]struct{})
@ -172,7 +169,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
}
// Try schema2 first
builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), img.RawJSON())
builder := schema2.NewManifestBuilder(p.repo.Blobs(ctx), p.config.ConfigMediaType, imgConfig)
manifest, err := manifestFromBuilder(ctx, builder, descriptors)
if err != nil {
return err
@ -185,7 +182,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
putOptions := []distribution.ManifestServiceOption{distribution.WithTag(ref.Tag())}
if _, err = manSvc.Put(ctx, manifest, putOptions...); err != nil {
if runtime.GOOS == "windows" {
if runtime.GOOS == "windows" || p.config.TrustKey == nil || p.config.RequireSchema2 {
logrus.Warnf("failed to upload schema2 manifest: %v", err)
return err
}
@ -196,7 +193,7 @@ func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id
if err != nil {
return err
}
builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, img.RawJSON())
builder = schema1.NewConfigManifestBuilder(p.repo.Blobs(ctx), p.config.TrustKey, manifestRef, imgConfig)
manifest, err = manifestFromBuilder(ctx, builder, descriptors)
if err != nil {
return err
@ -246,7 +243,7 @@ func manifestFromBuilder(ctx context.Context, builder distribution.ManifestBuild
}
type v2PushDescriptor struct {
layer layer.Layer
layer PushLayer
v2MetadataService metadata.V2MetadataService
hmacKey []byte
repoInfo reference.Named
@ -425,26 +422,32 @@ func (pd *v2PushDescriptor) uploadUsingSession(
diffID layer.DiffID,
layerUpload distribution.BlobWriter,
) (distribution.Descriptor, error) {
arch, err := pd.layer.TarStream()
if err != nil {
return distribution.Descriptor{}, xfer.DoNotRetry{Err: err}
var reader io.ReadCloser
contentReader, err := pd.layer.Open()
size, _ := pd.layer.Size()
reader = progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, contentReader), progressOutput, size, pd.ID(), "Pushing")
switch m := pd.layer.MediaType(); m {
case schema2.MediaTypeUncompressedLayer:
compressedReader, compressionDone := compress(reader)
defer func(closer io.Closer) {
closer.Close()
<-compressionDone
}(reader)
reader = compressedReader
case schema2.MediaTypeLayer:
default:
reader.Close()
return distribution.Descriptor{}, fmt.Errorf("unsupported layer media type %s", m)
}
// don't care if this fails; best effort
size, _ := pd.layer.DiffSize()
reader := progress.NewProgressReader(ioutils.NewCancelReadCloser(ctx, arch), progressOutput, size, pd.ID(), "Pushing")
compressedReader, compressionDone := compress(reader)
defer func() {
reader.Close()
<-compressionDone
}()
digester := digest.Canonical.New()
tee := io.TeeReader(compressedReader, digester.Hash())
tee := io.TeeReader(reader, digester.Hash())
nn, err := layerUpload.ReadFrom(tee)
compressedReader.Close()
reader.Close()
if err != nil {
return distribution.Descriptor{}, retryOnError(err)
}
@ -568,8 +571,8 @@ attempts:
// repository and whether the check shall be done also with digests mapped to different repositories. The
// decision is based on layer size. The smaller the layer, the fewer attempts shall be made because the cost
// of upload does not outweigh a latency.
func getMaxMountAndExistenceCheckAttempts(layer layer.Layer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
size, err := layer.DiffSize()
func getMaxMountAndExistenceCheckAttempts(layer PushLayer) (maxMountAttempts, maxExistenceCheckAttempts int, checkOtherRepositories bool) {
size, err := layer.Size()
switch {
// big blob
case size > middleLayerMaximumSize:

View File

@ -387,9 +387,11 @@ func TestLayerAlreadyExists(t *testing.T) {
ctx := context.Background()
ms := &mockV2MetadataService{}
pd := &v2PushDescriptor{
hmacKey: []byte(tc.hmacKey),
repoInfo: repoInfo,
layer: layer.EmptyLayer,
hmacKey: []byte(tc.hmacKey),
repoInfo: repoInfo,
layer: &storeLayer{
Layer: layer.EmptyLayer,
},
repo: repo,
v2MetadataService: ms,
pushState: &pushState{remoteLayers: make(map[layer.DiffID]distribution.Descriptor)},

View File

@ -7,6 +7,7 @@ import (
"time"
"github.com/docker/distribution"
"github.com/docker/distribution/manifest/schema2"
distreference "github.com/docker/distribution/reference"
"github.com/docker/distribution/registry/client"
"github.com/docker/distribution/registry/client/auth"
@ -18,6 +19,34 @@ import (
"golang.org/x/net/context"
)
// ImageTypes represents the schema2 config types for images
var ImageTypes = []string{
schema2.MediaTypeImageConfig,
// Handle unexpected values from https://github.com/docker/distribution/issues/1621
"application/octet-stream",
// Treat defaulted values as images, newer types cannot be implied
"",
}
// PluginTypes represents the schema2 config types for plugins
var PluginTypes = []string{
schema2.MediaTypePluginConfig,
}
var mediaTypeClasses map[string]string
func init() {
// initialize media type classes with all know types for
// plugin
mediaTypeClasses = map[string]string{}
for _, t := range ImageTypes {
mediaTypeClasses[t] = "image"
}
for _, t := range PluginTypes {
mediaTypeClasses[t] = "plugin"
}
}
// NewV2Repository returns a repository (v2 only). It creates an HTTP transport
// providing timeout settings and authentication support, and also verifies the
// remote API version.

View File

@ -70,10 +70,13 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) {
Official: false,
}
imagePullConfig := &ImagePullConfig{
MetaHeaders: http.Header{},
AuthConfig: &types.AuthConfig{
RegistryToken: secretRegistryToken,
Config: Config{
MetaHeaders: http.Header{},
AuthConfig: &types.AuthConfig{
RegistryToken: secretRegistryToken,
},
},
Schema2Types: ImageTypes,
}
puller, err := newPuller(endpoint, repoInfo, imagePullConfig)
if err != nil {