diff --git a/daemon/content.go b/daemon/content.go new file mode 100644 index 0000000000..01d1e2343d --- /dev/null +++ b/daemon/content.go @@ -0,0 +1,30 @@ +package daemon + +import ( + "os" + "path/filepath" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/metadata" + "github.com/pkg/errors" + "go.etcd.io/bbolt" +) + +func (d *Daemon) configureLocalContentStore() (content.Store, leases.Manager, error) { + if err := os.MkdirAll(filepath.Join(d.root, "content"), 0700); err != nil { + return nil, nil, errors.Wrap(err, "error creating dir for content store") + } + db, err := bbolt.Open(filepath.Join(d.root, "content", "metadata.db"), 0600, nil) + if err != nil { + return nil, nil, errors.Wrap(err, "error opening bolt db for content metadata store") + } + cs, err := local.NewStore(filepath.Join(d.root, "content", "data")) + if err != nil { + return nil, nil, errors.Wrap(err, "error setting up content store") + } + md := metadata.NewDB(db, cs, nil) + d.mdDB = db + return md.ContentStore(), metadata.NewLeaseManager(md), nil +} diff --git a/daemon/daemon.go b/daemon/daemon.go index f81f45a1ab..9e3a0fadd8 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -20,6 +20,7 @@ import ( "time" "github.com/docker/docker/pkg/fileutils" + "go.etcd.io/bbolt" "google.golang.org/grpc" "google.golang.org/grpc/backoff" @@ -129,6 +130,11 @@ type Daemon struct { attachmentStore network.AttachmentStore attachableNetworkLock *locker.Locker + + // This is used for Windows which doesn't currently support running on containerd + // It stores metadata for the content store (used for manifest caching) + // This needs to be closed on daemon exit + mdDB *bbolt.DB } // StoreHosts stores the addresses the daemon is listening on @@ -1066,10 +1072,7 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S d.linkIndex = newLinkIndex() - // TODO: 
imageStore, distributionMetadataStore, and ReferenceStore are only - // used above to run migration. They could be initialized in ImageService - // if migration is called from daemon/images. layerStore might move as well. - d.imageService = images.NewImageService(images.ImageServiceConfig{ + imgSvcConfig := images.ImageServiceConfig{ ContainerStore: d.containers, DistributionMetadataStore: distributionMetadataStore, EventsService: d.EventsService, @@ -1081,7 +1084,28 @@ func NewDaemon(ctx context.Context, config *config.Config, pluginStore *plugin.S ReferenceStore: rs, RegistryService: registryService, TrustKey: trustKey, - }) + ContentNamespace: config.ContainerdNamespace, + } + + // containerd is not currently supported with Windows. + // So sometimes d.containerdCli will be nil + // In that case we'll create a local content store... but otherwise we'll use containerd + if d.containerdCli != nil { + imgSvcConfig.Leases = d.containerdCli.LeasesService() + imgSvcConfig.ContentStore = d.containerdCli.ContentStore() + } else { + cs, lm, err := d.configureLocalContentStore() + if err != nil { + return nil, err + } + imgSvcConfig.ContentStore = cs + imgSvcConfig.Leases = lm + } + + // TODO: imageStore, distributionMetadataStore, and ReferenceStore are only + // used above to run migration. They could be initialized in ImageService + // if migration is called from daemon/images. layerStore might move as well. 
+ d.imageService = images.NewImageService(imgSvcConfig) go d.execCommandGC() @@ -1246,6 +1270,10 @@ func (daemon *Daemon) Shutdown() error { daemon.containerdCli.Close() } + if daemon.mdDB != nil { + daemon.mdDB.Close() + } + return daemon.cleanupMounts() } diff --git a/daemon/images/image_pull.go b/daemon/images/image_pull.go index 4eedfdddc1..ed9099b2ef 100644 --- a/daemon/images/image_pull.go +++ b/daemon/images/image_pull.go @@ -6,6 +6,8 @@ import ( "strings" "time" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/namespaces" dist "github.com/docker/distribution" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" @@ -16,6 +18,7 @@ import ( "github.com/docker/docker/registry" digest "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" ) // PullImage initiates a pull operation. image is the repository name to pull, and @@ -65,6 +68,25 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference close(writesDone) }() + ctx = namespaces.WithNamespace(ctx, i.contentNamespace) + // Take out a temporary lease for everything that gets persisted to the content store. + // Before the lease is cancelled, any content we want to keep should have its own lease applied. 
+ ctx, done, err := tempLease(ctx, i.leases) + if err != nil { + return err + } + defer done(ctx) + + cs := &contentStoreForPull{ + ContentStore: i.content, + leases: i.leases, + } + imageStore := &imageStoreForPull{ + ImageConfigStore: distribution.NewImageConfigStoreFromStore(i.imageStore), + ingested: cs, + leases: i.leases, + } + imagePullConfig := &distribution.ImagePullConfig{ Config: distribution.Config{ MetaHeaders: metaHeaders, @@ -73,7 +95,7 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference RegistryService: i.registryService, ImageEventLogger: i.LogImageEvent, MetadataStore: i.distributionMetadataStore, - ImageStore: distribution.NewImageConfigStoreFromStore(i.imageStore), + ImageStore: imageStore, ReferenceStore: i.referenceStore, }, DownloadManager: i.downloadManager, @@ -81,7 +103,7 @@ func (i *ImageService) pullImageWithReference(ctx context.Context, ref reference Platform: platform, } - err := distribution.Pull(ctx, ref, imagePullConfig) + err = distribution.Pull(ctx, ref, imagePullConfig, cs) close(progressChan) <-writesDone return err @@ -124,3 +146,29 @@ func (i *ImageService) GetRepository(ctx context.Context, ref reference.Named, a } return repository, confirmedV2, lastError } + +func tempLease(ctx context.Context, mgr leases.Manager) (context.Context, func(context.Context) error, error) { + nop := func(context.Context) error { return nil } + _, ok := leases.FromContext(ctx) + if ok { + return ctx, nop, nil + } + + // Use an expiration that ensures the lease is cleaned up at some point if there is a crash, SIGKILL, etc. + opts := []leases.Opt{ + leases.WithRandomID(), + leases.WithExpiration(24 * time.Hour), + leases.WithLabels(map[string]string{ + "moby.lease/temporary": time.Now().UTC().Format(time.RFC3339Nano), + }), + } + l, err := mgr.Create(ctx, opts...) 
+ if err != nil { + return ctx, nop, errors.Wrap(err, "error creating temporary lease") + } + + ctx = leases.WithLease(ctx, l.ID) + return ctx, func(ctx context.Context) error { + return mgr.Delete(ctx, l) + }, nil +} diff --git a/daemon/images/service.go b/daemon/images/service.go index 388aa2dd23..e0297c35e7 100644 --- a/daemon/images/service.go +++ b/daemon/images/service.go @@ -5,6 +5,8 @@ import ( "os" "runtime" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/leases" "github.com/docker/docker/container" daemonevents "github.com/docker/docker/daemon/events" "github.com/docker/docker/distribution" @@ -42,6 +44,9 @@ type ImageServiceConfig struct { ReferenceStore dockerreference.Store RegistryService registry.Service TrustKey libtrust.PrivateKey + ContentStore content.Store + Leases leases.Manager + ContentNamespace string } // NewImageService returns a new ImageService from a configuration @@ -54,12 +59,15 @@ func NewImageService(config ImageServiceConfig) *ImageService { distributionMetadataStore: config.DistributionMetadataStore, downloadManager: xfer.NewLayerDownloadManager(config.LayerStores, config.MaxConcurrentDownloads, xfer.WithMaxDownloadAttempts(config.MaxDownloadAttempts)), eventsService: config.EventsService, - imageStore: config.ImageStore, + imageStore: &imageStoreWithLease{Store: config.ImageStore, leases: config.Leases, ns: config.ContentNamespace}, layerStores: config.LayerStores, referenceStore: config.ReferenceStore, registryService: config.RegistryService, trustKey: config.TrustKey, uploadManager: xfer.NewLayerUploadManager(config.MaxConcurrentUploads), + leases: config.Leases, + content: config.ContentStore, + contentNamespace: config.ContentNamespace, } } @@ -76,6 +84,9 @@ type ImageService struct { registryService registry.Service trustKey libtrust.PrivateKey uploadManager *xfer.LayerUploadManager + leases leases.Manager + content content.Store + contentNamespace string } // DistributionServices provides 
daemon image storage services diff --git a/daemon/images/store.go b/daemon/images/store.go new file mode 100644 index 0000000000..2c8d481c50 --- /dev/null +++ b/daemon/images/store.go @@ -0,0 +1,155 @@ +package images + +import ( + "context" + "sync" + + "github.com/containerd/containerd/content" + c8derrdefs "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/namespaces" + "github.com/docker/docker/distribution" + "github.com/docker/docker/image" + "github.com/docker/docker/layer" + digest "github.com/opencontainers/go-digest" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +func imageKey(dgst digest.Digest) string { + return "moby-image-" + dgst.String() +} + +// imageStoreWithLease wraps the configured image store with one that deletes the lease +// registered for a given image ID, if one exists +// +// This is used by the main image service to wrap delete calls to the real image store. +type imageStoreWithLease struct { + image.Store + leases leases.Manager + + // Normally we'd pass namespace down through a context.Context, however... + // The interface for image store doesn't allow this, so we store it here. + ns string +} + +func (s *imageStoreWithLease) Delete(id image.ID) ([]layer.Metadata, error) { + ctx := namespaces.WithNamespace(context.TODO(), s.ns) + if err := s.leases.Delete(ctx, leases.Lease{ID: imageKey(digest.Digest(id))}); err != nil && !c8derrdefs.IsNotFound(err) { + return nil, errors.Wrap(err, "error deleting lease") + } + return s.Store.Delete(id) +} + +// imageStoreForPull is created for each pull. It wraps an underlying image store +// to handle registering leases for content fetched in a single image pull. 
+type imageStoreForPull struct { + distribution.ImageConfigStore + leases leases.Manager + ingested *contentStoreForPull +} + +func (s *imageStoreForPull) Put(ctx context.Context, config []byte) (digest.Digest, error) { + id, err := s.ImageConfigStore.Put(ctx, config) + if err != nil { + return "", err + } + return id, s.updateLease(ctx, id) +} + +func (s *imageStoreForPull) Get(ctx context.Context, dgst digest.Digest) ([]byte, error) { + id, err := s.ImageConfigStore.Get(ctx, dgst) + if err != nil { + return nil, err + } + return id, s.updateLease(ctx, dgst) +} + +func (s *imageStoreForPull) updateLease(ctx context.Context, dgst digest.Digest) error { + leaseID := imageKey(dgst) + lease, err := s.leases.Create(ctx, leases.WithID(leaseID)) + if err != nil { + if !c8derrdefs.IsAlreadyExists(err) { + return errors.Wrap(err, "error creating lease") + } + lease = leases.Lease{ID: leaseID} + } + + digested := s.ingested.getDigested() + resource := leases.Resource{ + Type: "content", + } + for _, dgst := range digested { + log.G(ctx).WithFields(logrus.Fields{ + "digest": dgst, + "lease": lease.ID, + }).Debug("Adding content digest to lease") + + resource.ID = dgst.String() + if err := s.leases.AddResource(ctx, lease, resource); err != nil { + return errors.Wrapf(err, "error adding content digest to lease: %s", dgst) + } + } + return nil +} + +// contentStoreForPull is used to wrap the configured content store to +// add lease management for a single `pull` +// It stores all committed digests so that `imageStoreForPull` can add +// the digested resources to the lease for an image. 
+type contentStoreForPull struct { + distribution.ContentStore + leases leases.Manager + + mu sync.Mutex + digested []digest.Digest +} + +func (c *contentStoreForPull) addDigested(dgst digest.Digest) { + c.mu.Lock() + c.digested = append(c.digested, dgst) + c.mu.Unlock() +} + +func (c *contentStoreForPull) getDigested() []digest.Digest { + c.mu.Lock() + digested := make([]digest.Digest, len(c.digested)) + copy(digested, c.digested) + c.mu.Unlock() + return digested +} + +func (c *contentStoreForPull) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { + w, err := c.ContentStore.Writer(ctx, opts...) + if err != nil { + if c8derrdefs.IsAlreadyExists(err) { + var cfg content.WriterOpts + for _, o := range opts { + if err := o(&cfg); err != nil { + return nil, err + } + + } + c.addDigested(cfg.Desc.Digest) + } + return nil, err + } + return &contentWriter{ + cs: c, + Writer: w, + }, nil +} + +type contentWriter struct { + cs *contentStoreForPull + content.Writer +} + +func (w *contentWriter) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error { + err := w.Writer.Commit(ctx, size, expected, opts...) 
+ if err == nil || c8derrdefs.IsAlreadyExists(err) { + w.cs.addDigested(expected) + } + return err +} diff --git a/daemon/images/store_test.go b/daemon/images/store_test.go new file mode 100644 index 0000000000..212e362305 --- /dev/null +++ b/daemon/images/store_test.go @@ -0,0 +1,124 @@ +package images + +import ( + "context" + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" + c8derrdefs "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/leases" + "github.com/containerd/containerd/metadata" + "github.com/containerd/containerd/namespaces" + "github.com/docker/docker/image" + digest "github.com/opencontainers/go-digest" + v1 "github.com/opencontainers/image-spec/specs-go/v1" + "go.etcd.io/bbolt" + "gotest.tools/v3/assert" + "gotest.tools/v3/assert/cmp" +) + +func setupTestStores(t *testing.T) (context.Context, content.Store, *imageStoreWithLease, func(t *testing.T)) { + dir, err := ioutil.TempDir("", t.Name()) + assert.NilError(t, err) + + backend, err := image.NewFSStoreBackend(filepath.Join(dir, "images")) + assert.NilError(t, err) + is, err := image.NewImageStore(backend, nil) + assert.NilError(t, err) + + db, err := bbolt.Open(filepath.Join(dir, "metadata.db"), 0600, nil) + assert.NilError(t, err) + + cs, err := local.NewStore(filepath.Join(dir, "content")) + assert.NilError(t, err) + mdb := metadata.NewDB(db, cs, nil) + + cleanup := func(t *testing.T) { + assert.Check(t, db.Close()) + assert.Check(t, os.RemoveAll(dir)) + } + ctx := namespaces.WithNamespace(context.Background(), t.Name()) + images := &imageStoreWithLease{Store: is, ns: t.Name(), leases: metadata.NewLeaseManager(mdb)} + + return ctx, cs, images, cleanup +} + +func TestImageDelete(t *testing.T) { + ctx, _, images, cleanup := setupTestStores(t) + defer cleanup(t) + + t.Run("no lease", func(t *testing.T) { + id, err := images.Create([]byte(`{"rootFS": {}}`)) + 
assert.NilError(t, err) + defer images.Delete(id) + + ls, err := images.leases.List(ctx) + assert.NilError(t, err) + assert.Equal(t, len(ls), 0, ls) + + _, err = images.Delete(id) + assert.NilError(t, err, "should not error when there is no lease") + }) + + t.Run("lease exists", func(t *testing.T) { + id, err := images.Create([]byte(`{"rootFS": {}}`)) + assert.NilError(t, err) + defer images.Delete(id) + + leaseID := imageKey(digest.Digest(id)) + _, err = images.leases.Create(ctx, leases.WithID(leaseID)) + assert.NilError(t, err) + defer images.leases.Delete(ctx, leases.Lease{ID: leaseID}) + + ls, err := images.leases.List(ctx) + assert.NilError(t, err) + assert.Check(t, cmp.Equal(len(ls), 1), ls) + + _, err = images.Delete(id) + assert.NilError(t, err) + + ls, err = images.leases.List(ctx) + assert.NilError(t, err) + assert.Check(t, cmp.Equal(len(ls), 0), ls) + }) +} + +func TestContentStoreForPull(t *testing.T) { + ctx, cs, is, cleanup := setupTestStores(t) + defer cleanup(t) + + csP := &contentStoreForPull{ + ContentStore: cs, + leases: is.leases, + } + + data := []byte(`{}`) + desc := v1.Descriptor{ + Digest: digest.Canonical.FromBytes(data), + Size: int64(len(data)), + } + + w, err := csP.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(desc)) + assert.NilError(t, err) + + _, err = w.Write(data) + assert.NilError(t, err) + defer w.Close() + + err = w.Commit(ctx, desc.Size, desc.Digest) + assert.NilError(t, err) + + assert.Equal(t, len(csP.digested), 1) + assert.Check(t, cmp.Equal(csP.digested[0], desc.Digest)) + + // Test already exists + csP.digested = nil + _, err = csP.Writer(ctx, content.WithRef(t.Name()), content.WithDescriptor(desc)) + assert.Check(t, c8derrdefs.IsAlreadyExists(err)) + assert.Equal(t, len(csP.digested), 1) + assert.Check(t, cmp.Equal(csP.digested[0], desc.Digest)) +} diff --git a/distribution/config.go b/distribution/config.go index f96df7b94d..2e3dd7bb60 100644 --- a/distribution/config.go +++ b/distribution/config.go @@ 
-84,8 +84,8 @@ type ImagePushConfig struct { // by digest. Allows getting an image configurations rootfs from the // configuration. type ImageConfigStore interface { - Put([]byte) (digest.Digest, error) - Get(digest.Digest) ([]byte, error) + Put(context.Context, []byte) (digest.Digest, error) + Get(context.Context, digest.Digest) ([]byte, error) RootFSFromConfig([]byte) (*image.RootFS, error) PlatformFromConfig([]byte) (*specs.Platform, error) } @@ -128,12 +128,12 @@ func NewImageConfigStoreFromStore(is image.Store) ImageConfigStore { } } -func (s *imageConfigStore) Put(c []byte) (digest.Digest, error) { +func (s *imageConfigStore) Put(_ context.Context, c []byte) (digest.Digest, error) { id, err := s.Store.Create(c) return digest.Digest(id), err } -func (s *imageConfigStore) Get(d digest.Digest) ([]byte, error) { +func (s *imageConfigStore) Get(_ context.Context, d digest.Digest) ([]byte, error) { img, err := s.Store.Get(image.IDFromDigest(d)) if err != nil { return nil, err diff --git a/distribution/manifest.go b/distribution/manifest.go new file mode 100644 index 0000000000..a97373bd61 --- /dev/null +++ b/distribution/manifest.go @@ -0,0 +1,195 @@ +package distribution + +import ( + "context" + "encoding/json" + "io" + "io/ioutil" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/log" + "github.com/containerd/containerd/remotes" + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/schema1" + digest "github.com/opencontainers/go-digest" + specs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" +) + +// This is used by manifestStore to pare down the requirements to implement a +// full distribution.ManifestService, since `Get` is all we use here. 
+type manifestGetter interface { + Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) +} + +type manifestStore struct { + local ContentStore + remote manifestGetter +} + +// ContentStore is the interface used to persist registry blobs +// +// Currently this is only used to persist manifests and manifest lists. +// It is exported because `distribution.Pull` takes one as an argument. +type ContentStore interface { + content.Ingester + content.Provider + Info(ctx context.Context, dgst digest.Digest) (content.Info, error) + Abort(ctx context.Context, ref string) error +} + +func (m *manifestStore) getLocal(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) { + ra, err := m.local.ReaderAt(ctx, desc) + if err != nil { + return nil, errors.Wrap(err, "error getting content store reader") + } + defer ra.Close() + + r := io.NewSectionReader(ra, 0, ra.Size()) + data, err := ioutil.ReadAll(r) + if err != nil { + return nil, errors.Wrap(err, "error reading manifest from content store") + } + + manifest, _, err := distribution.UnmarshalManifest(desc.MediaType, data) + if err != nil { + return nil, errors.Wrap(err, "error unmarshaling manifest from content store") + } + return manifest, nil +} + +func (m *manifestStore) getMediaType(ctx context.Context, desc specs.Descriptor) (string, error) { + ra, err := m.local.ReaderAt(ctx, desc) + if err != nil { + return "", errors.Wrap(err, "error getting reader to detect media type") + } + defer ra.Close() + + mt, err := detectManifestMediaType(ra) + if err != nil { + return "", errors.Wrap(err, "error detecting media type") + } + return mt, nil +} + +func (m *manifestStore) Get(ctx context.Context, desc specs.Descriptor) (distribution.Manifest, error) { + l := log.G(ctx) + + if desc.MediaType == "" { + // When pulling by digest we will not have the media type on the + // descriptor since we have not made a request to the registry yet + // + // 
We already have the digest, so we only lookup locally... by digest. + // + // Let's try to detect the media type so we can have a good ref key + // here. We may not even have the content locally, and this is fine, but + // if we do we should determine that. + mt, err := m.getMediaType(ctx, desc) + if err != nil && !errdefs.IsNotFound(err) { + l.WithError(err).Warn("Error looking up media type of content") + } + desc.MediaType = mt + } + + key := remotes.MakeRefKey(ctx, desc) + + // Here we open a writer to the requested content. This both gives us a + // reference to write to if indeed we need to persist it and increments the + // ref count on the content. + w, err := m.local.Writer(ctx, content.WithDescriptor(desc), content.WithRef(key)) + if err != nil { + if errdefs.IsAlreadyExists(err) { + var manifest distribution.Manifest + if manifest, err = m.getLocal(ctx, desc); err == nil { + return manifest, nil + } + } + // always fallback to the remote if there is an error with the local store + } + if w != nil { + defer w.Close() + } + + l.WithError(err).Debug("Fetching manifest from remote") + + manifest, err := m.remote.Get(ctx, desc.Digest) + if err != nil { + if err := m.local.Abort(ctx, key); err != nil { + l.WithError(err).Warn("Error while attempting to abort content ingest") + } + return nil, err + } + + if w != nil { + // if `w` is nil here, something happened with the content store, so don't bother trying to persist. 
+ if err := m.Put(ctx, manifest, desc, w); err != nil { + if err := m.local.Abort(ctx, key); err != nil { + l.WithError(err).Warn("error aborting content ingest") + } + l.WithError(err).Warn("Error persisting manifest") + } + } + return manifest, nil +} + +func (m *manifestStore) Put(ctx context.Context, manifest distribution.Manifest, desc specs.Descriptor, w content.Writer) error { + mt, payload, err := manifest.Payload() + if err != nil { + return err + } + desc.Size = int64(len(payload)) + desc.MediaType = mt + + if _, err = w.Write(payload); err != nil { + return errors.Wrap(err, "error writing manifest to content store") + } + + if err := w.Commit(ctx, desc.Size, desc.Digest); err != nil { + return errors.Wrap(err, "error committing manifest to content store") + } + return nil +} + +func detectManifestMediaType(ra content.ReaderAt) (string, error) { + dt := make([]byte, ra.Size()) + if _, err := ra.ReadAt(dt, 0); err != nil { + return "", err + } + + return detectManifestBlobMediaType(dt) +} + +// This is used when the manifest store does not know the media type of a sha it +// was told to get. This would currently only happen when pulling by digest. +// The media type is needed so the blob can be unmarshalled properly. +func detectManifestBlobMediaType(dt []byte) (string, error) { + var mfst struct { + MediaType string `json:"mediaType"` + Config json.RawMessage `json:"config"` // schema2 Manifest + FSLayers json.RawMessage `json:"fsLayers"` // schema1 Manifest + } + + if err := json.Unmarshal(dt, &mfst); err != nil { + return "", err + } + + // We may have a media type specified in the json, in which case that should be used. + // Docker types should generally have a media type set. + // OCI (golang) types do not have a `mediaType` defined, and it is optional in the spec. + // + // `distribution.UnmarshalManifest`, which is used to unmarshal this for real, checks these media type values. 
+ // If the specified media type does not match it will error, and in some cases (docker media types) it is required. + // So pretty much if we don't have a media type we can fall back to OCI. + // This does have a special fallback for schema1 manifests just because it is easy to detect. + switch { + case mfst.MediaType != "": + return mfst.MediaType, nil + case mfst.FSLayers != nil: + return schema1.MediaTypeManifest, nil + case mfst.Config != nil: + return specs.MediaTypeImageManifest, nil + default: + return specs.MediaTypeImageIndex, nil + } +} diff --git a/distribution/manifest_test.go b/distribution/manifest_test.go new file mode 100644 index 0000000000..0976a712ec --- /dev/null +++ b/distribution/manifest_test.go @@ -0,0 +1,351 @@ +package distribution + +import ( + "context" + "encoding/json" + "io/ioutil" + "os" + "strings" + "sync" + "testing" + + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" + "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/remotes" + "github.com/docker/distribution" + "github.com/docker/distribution/manifest/ocischema" + "github.com/docker/distribution/manifest/schema1" + "github.com/google/go-cmp/cmp/cmpopts" + digest "github.com/opencontainers/go-digest" + specs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "gotest.tools/v3/assert" + "gotest.tools/v3/assert/cmp" +) + +type mockManifestGetter struct { + manifests map[digest.Digest]distribution.Manifest + gets int +} + +func (m *mockManifestGetter) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error) { + m.gets++ + manifest, ok := m.manifests[dgst] + if !ok { + return nil, distribution.ErrManifestUnknown{Tag: dgst.String()} + } + return manifest, nil +} + +type memoryLabelStore struct { + mu sync.Mutex + labels map[digest.Digest]map[string]string +} + +// Get returns all the labels for the given digest +func 
(s *memoryLabelStore) Get(dgst digest.Digest) (map[string]string, error) { + s.mu.Lock() + labels := s.labels[dgst] + s.mu.Unlock() + return labels, nil +} + +// Set sets all the labels for a given digest +func (s *memoryLabelStore) Set(dgst digest.Digest, labels map[string]string) error { + s.mu.Lock() + if s.labels == nil { + s.labels = make(map[digest.Digest]map[string]string) + } + s.labels[dgst] = labels + s.mu.Unlock() + return nil +} + +// Update replaces the given labels for a digest, +// a key with an empty value removes a label. +func (s *memoryLabelStore) Update(dgst digest.Digest, update map[string]string) (map[string]string, error) { + s.mu.Lock() + defer s.mu.Unlock() + + labels, ok := s.labels[dgst] + if !ok { + labels = map[string]string{} + } + for k, v := range update { + labels[k] = v + } + + s.labels[dgst] = labels + + return labels, nil +} + +type testingContentStoreWrapper struct { + ContentStore + errorOnWriter error + errorOnCommit error +} + +func (s *testingContentStoreWrapper) Writer(ctx context.Context, opts ...content.WriterOpt) (content.Writer, error) { + if s.errorOnWriter != nil { + return nil, s.errorOnWriter + } + + w, err := s.ContentStore.Writer(ctx, opts...) + if err != nil { + return nil, err + } + + if s.errorOnCommit != nil { + w = &testingContentWriterWrapper{w, s.errorOnCommit} + } + return w, nil +} + +type testingContentWriterWrapper struct { + content.Writer + err error +} + +func (w *testingContentWriterWrapper) Commit(ctx context.Context, size int64, dgst digest.Digest, opts ...content.Opt) error { + if w.err != nil { + // The contract for `Commit` is to always close. + // Since this is returning early before hitting the real `Commit`, we should close it here. + w.Close() + return w.err + } + return w.Writer.Commit(ctx, size, dgst, opts...) 
+} + +func TestManifestStore(t *testing.T) { + ociManifest := &specs.Manifest{} + serialized, err := json.Marshal(ociManifest) + assert.NilError(t, err) + dgst := digest.Canonical.FromBytes(serialized) + + setupTest := func(t *testing.T) (specs.Descriptor, *mockManifestGetter, *manifestStore, content.Store, func(*testing.T)) { + root, err := ioutil.TempDir("", strings.Replace(t.Name(), "/", "_", -1)) + assert.NilError(t, err) + defer func() { + if t.Failed() { + os.RemoveAll(root) + } + }() + + cs, err := local.NewLabeledStore(root, &memoryLabelStore{}) + assert.NilError(t, err) + + mg := &mockManifestGetter{manifests: make(map[digest.Digest]distribution.Manifest)} + store := &manifestStore{local: cs, remote: mg} + desc := specs.Descriptor{Digest: dgst, MediaType: specs.MediaTypeImageManifest, Size: int64(len(serialized))} + + return desc, mg, store, cs, func(t *testing.T) { + assert.Check(t, os.RemoveAll(root)) + } + } + + ctx := context.Background() + + m, _, err := distribution.UnmarshalManifest(specs.MediaTypeImageManifest, serialized) + assert.NilError(t, err) + + writeManifest := func(t *testing.T, cs ContentStore, desc specs.Descriptor, opts ...content.Opt) { + ingestKey := remotes.MakeRefKey(ctx, desc) + w, err := cs.Writer(ctx, content.WithDescriptor(desc), content.WithRef(ingestKey)) + assert.NilError(t, err) + defer func() { + if err := w.Close(); err != nil { + t.Log(err) + } + if t.Failed() { + if err := cs.Abort(ctx, ingestKey); err != nil { + t.Log(err) + } + } + }() + + _, err = w.Write(serialized) + assert.NilError(t, err) + + err = w.Commit(ctx, desc.Size, desc.Digest, opts...) 
+ assert.NilError(t, err) + + } + + // All tests should end up with no active ingest + checkIngest := func(t *testing.T, cs content.Store, desc specs.Descriptor) { + ingestKey := remotes.MakeRefKey(ctx, desc) + _, err := cs.Status(ctx, ingestKey) + assert.Check(t, errdefs.IsNotFound(err), err) + } + + t.Run("no remote or local", func(t *testing.T) { + desc, _, store, cs, teardown := setupTest(t) + defer teardown(t) + + _, err = store.Get(ctx, desc) + checkIngest(t, cs, desc) + // This error is what our digest getter returns when it doesn't know about the manifest + assert.Error(t, err, distribution.ErrManifestUnknown{Tag: dgst.String()}.Error()) + }) + + t.Run("no local cache", func(t *testing.T) { + desc, mg, store, cs, teardown := setupTest(t) + defer teardown(t) + + mg.manifests[desc.Digest] = m + + m2, err := store.Get(ctx, desc) + checkIngest(t, cs, desc) + assert.NilError(t, err) + assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{}))) + assert.Check(t, cmp.Equal(mg.gets, 1)) + + i, err := cs.Info(ctx, desc.Digest) + assert.NilError(t, err) + assert.Check(t, cmp.Equal(i.Digest, desc.Digest)) + + // Now check again, this should not hit the remote + m2, err = store.Get(ctx, desc) + checkIngest(t, cs, desc) + assert.NilError(t, err) + assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{}))) + assert.Check(t, cmp.Equal(mg.gets, 1)) + }) + + t.Run("with local cache", func(t *testing.T) { + desc, mg, store, cs, teardown := setupTest(t) + defer teardown(t) + + // first add the manifest to the content store + writeManifest(t, cs, desc) + + // now do the get + m2, err := store.Get(ctx, desc) + checkIngest(t, cs, desc) + assert.NilError(t, err) + assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{}))) + assert.Check(t, cmp.Equal(mg.gets, 0)) + + i, err := cs.Info(ctx, desc.Digest) + assert.NilError(t, err) + assert.Check(t, cmp.Equal(i.Digest, 
desc.Digest)) + }) + + // This is for the case of pull by digest where we don't know the media type of the manifest until it's actually pulled. + t.Run("unknown media type", func(t *testing.T) { + t.Run("no cache", func(t *testing.T) { + desc, mg, store, cs, teardown := setupTest(t) + defer teardown(t) + + mg.manifests[desc.Digest] = m + desc.MediaType = "" + + m2, err := store.Get(ctx, desc) + checkIngest(t, cs, desc) + assert.NilError(t, err) + assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{}))) + assert.Check(t, cmp.Equal(mg.gets, 1)) + }) + + t.Run("with cache", func(t *testing.T) { + t.Run("cached manifest has media type", func(t *testing.T) { + desc, mg, store, cs, teardown := setupTest(t) + defer teardown(t) + + writeManifest(t, cs, desc) + desc.MediaType = "" + + m2, err := store.Get(ctx, desc) + checkIngest(t, cs, desc) + assert.NilError(t, err) + assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{}))) + assert.Check(t, cmp.Equal(mg.gets, 0)) + }) + + t.Run("cached manifest has no media type", func(t *testing.T) { + desc, mg, store, cs, teardown := setupTest(t) + defer teardown(t) + + desc.MediaType = "" + writeManifest(t, cs, desc) + + m2, err := store.Get(ctx, desc) + checkIngest(t, cs, desc) + assert.NilError(t, err) + assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{}))) + assert.Check(t, cmp.Equal(mg.gets, 0)) + }) + }) + }) + + // Test that if there is an error with the content store, for whatever + // reason, that doesn't stop us from getting the manifest. + // + // Also makes sure the ingests are aborted. 
+ t.Run("error persisting manifest", func(t *testing.T) { + t.Run("error on writer", func(t *testing.T) { + desc, mg, store, cs, teardown := setupTest(t) + defer teardown(t) + mg.manifests[desc.Digest] = m + + csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnWriter: errors.New("random error")} + store.local = csW + + m2, err := store.Get(ctx, desc) + checkIngest(t, cs, desc) + assert.NilError(t, err) + assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{}))) + assert.Check(t, cmp.Equal(mg.gets, 1)) + + _, err = cs.Info(ctx, desc.Digest) + // Nothing here since we couldn't persist + assert.Check(t, errdefs.IsNotFound(err), err) + }) + + t.Run("error on commit", func(t *testing.T) { + desc, mg, store, cs, teardown := setupTest(t) + defer teardown(t) + mg.manifests[desc.Digest] = m + + csW := &testingContentStoreWrapper{ContentStore: store.local, errorOnCommit: errors.New("random error")} + store.local = csW + + m2, err := store.Get(ctx, desc) + checkIngest(t, cs, desc) + assert.NilError(t, err) + assert.Check(t, cmp.DeepEqual(m, m2, cmpopts.IgnoreUnexported(ocischema.DeserializedManifest{}))) + assert.Check(t, cmp.Equal(mg.gets, 1)) + + _, err = cs.Info(ctx, desc.Digest) + // Nothing here since we couldn't persist + assert.Check(t, errdefs.IsNotFound(err), err) + }) + }) +} + +func TestDetectManifestBlobMediaType(t *testing.T) { + type testCase struct { + json []byte + expected string + } + cases := map[string]testCase{ + "mediaType is set": {[]byte(`{"mediaType": "bananas"}`), "bananas"}, + "oci manifest": {[]byte(`{"config": {}}`), specs.MediaTypeImageManifest}, + "schema1": {[]byte(`{"fsLayers": []}`), schema1.MediaTypeManifest}, + "oci index fallback": {[]byte(`{}`), specs.MediaTypeImageIndex}, + // Make sure we prefer mediaType + "mediaType and config set": {[]byte(`{"mediaType": "bananas", "config": {}}`), "bananas"}, + "mediaType and fsLayers set": {[]byte(`{"mediaType": "bananas", "fsLayers": []}`), 
"bananas"}, + } + + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + mt, err := detectManifestBlobMediaType(tc.json) + assert.NilError(t, err) + assert.Equal(t, mt, tc.expected) + }) + } + +} diff --git a/distribution/pull.go b/distribution/pull.go index ecf2f98168..c8ddd4c5cf 100644 --- a/distribution/pull.go +++ b/distribution/pull.go @@ -29,7 +29,7 @@ type Puller interface { // whether a v1 or v2 puller will be created. The other parameters are passed // through to the underlying puller implementation for use during the actual // pull operation. -func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig) (Puller, error) { +func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, imagePullConfig *ImagePullConfig, local ContentStore) (Puller, error) { switch endpoint.Version { case registry.APIVersion2: return &v2Puller{ @@ -37,6 +37,9 @@ func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, endpoint: endpoint, config: imagePullConfig, repoInfo: repoInfo, + manifestStore: &manifestStore{ + local: local, + }, }, nil case registry.APIVersion1: return nil, fmt.Errorf("protocol version %d no longer supported. Please contact admins of registry %s", endpoint.Version, endpoint.URL) @@ -46,7 +49,7 @@ func newPuller(endpoint registry.APIEndpoint, repoInfo *registry.RepositoryInfo, // Pull initiates a pull operation. image is the repository name to pull, and // tag may be either empty, or indicate a specific tag to pull. 
-func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig) error { +func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullConfig, local ContentStore) error { // Resolve the Repository name from fqn to RepositoryInfo repoInfo, err := imagePullConfig.RegistryService.ResolveRepository(ref) if err != nil { @@ -104,7 +107,7 @@ func Pull(ctx context.Context, ref reference.Named, imagePullConfig *ImagePullCo logrus.Debugf("Trying to pull %s from %s %s", reference.FamiliarName(repoInfo.Name), endpoint.URL, endpoint.Version) - puller, err := newPuller(endpoint, repoInfo, imagePullConfig) + puller, err := newPuller(endpoint, repoInfo, imagePullConfig, local) if err != nil { lastErr = err continue diff --git a/distribution/pull_v2.go b/distribution/pull_v2.go index 1643a36e04..b5db4ee006 100644 --- a/distribution/pull_v2.go +++ b/distribution/pull_v2.go @@ -11,6 +11,7 @@ import ( "runtime" "strings" + "github.com/containerd/containerd/log" "github.com/containerd/containerd/platforms" "github.com/docker/distribution" "github.com/docker/distribution/manifest/manifestlist" @@ -62,7 +63,8 @@ type v2Puller struct { repo distribution.Repository // confirmedV2 is set to true if we confirm we're talking to a v2 // registry. This is used to limit fallbacks to the v1 protocol. 
- confirmedV2 bool + confirmedV2 bool + manifestStore *manifestStore } func (p *v2Puller) Pull(ctx context.Context, ref reference.Named, platform *specs.Platform) (err error) { @@ -73,6 +75,11 @@ func (p *v2Puller) Pull(ctx context.Context, ref reference.Named, platform *spec return err } + p.manifestStore.remote, err = p.repo.Manifests(ctx) + if err != nil { + return err + } + if err = p.pullV2Repository(ctx, ref, platform); err != nil { if _, ok := err.(fallbackError); ok { return err @@ -330,37 +337,45 @@ func (ld *v2LayerDescriptor) Registered(diffID layer.DiffID) { } func (p *v2Puller) pullV2Tag(ctx context.Context, ref reference.Named, platform *specs.Platform) (tagUpdated bool, err error) { - manSvc, err := p.repo.Manifests(ctx) - if err != nil { - return false, err - } var ( - manifest distribution.Manifest tagOrDigest string // Used for logging/progress only + dgst digest.Digest + mt string + size int64 ) if digested, isDigested := ref.(reference.Canonical); isDigested { - manifest, err = manSvc.Get(ctx, digested.Digest()) - if err != nil { - return false, err - } - tagOrDigest = digested.Digest().String() + dgst = digested.Digest() + tagOrDigest = digested.String() } else if tagged, isTagged := ref.(reference.NamedTagged); isTagged { tagService := p.repo.Tags(ctx) desc, err := tagService.Get(ctx, tagged.Tag()) if err != nil { return false, allowV1Fallback(err) } - - manifest, err = manSvc.Get(ctx, desc.Digest) - if err != nil { - return false, err - } + dgst = desc.Digest tagOrDigest = tagged.Tag() + mt = desc.MediaType + size = desc.Size } else { return false, fmt.Errorf("internal error: reference has neither a tag nor a digest: %s", reference.FamiliarString(ref)) } + ctx = log.WithLogger(ctx, logrus.WithFields( + logrus.Fields{ + "digest": dgst, + "remote": ref, + })) + + manifest, err := p.manifestStore.Get(ctx, specs.Descriptor{ + MediaType: mt, + Digest: dgst, + Size: size, + }) + if err != nil { + return false, err + } + if manifest == nil { return 
false, fmt.Errorf("image manifest does not exist for tag or digest %q", tagOrDigest) } @@ -559,7 +574,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv return "", "", err } - imageID, err := p.config.ImageStore.Put(config) + imageID, err := p.config.ImageStore.Put(ctx, config) if err != nil { return "", "", err } @@ -570,7 +585,7 @@ func (p *v2Puller) pullSchema1(ctx context.Context, ref reference.Reference, unv } func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.Descriptor, layers []distribution.Descriptor, platform *specs.Platform) (id digest.Digest, err error) { - if _, err := p.config.ImageStore.Get(target.Digest); err == nil { + if _, err := p.config.ImageStore.Get(ctx, target.Digest); err == nil { // If the image already exists locally, no need to pull // anything. return target.Digest, nil @@ -727,7 +742,7 @@ func (p *v2Puller) pullSchema2Layers(ctx context.Context, target distribution.De } } - imageID, err := p.config.ImageStore.Put(configJSON) + imageID, err := p.config.ImageStore.Put(ctx, configJSON) if err != nil { return "", err } @@ -797,23 +812,22 @@ func (p *v2Puller) pullManifestList(ctx context.Context, ref reference.Named, mf if len(manifestMatches) > 1 { logrus.Debugf("found multiple matches in manifest list, choosing best match %s", manifestMatches[0].Digest.String()) } - manifestDigest := manifestMatches[0].Digest + match := manifestMatches[0] - if err := checkImageCompatibility(manifestMatches[0].Platform.OS, manifestMatches[0].Platform.OSVersion); err != nil { + if err := checkImageCompatibility(match.Platform.OS, match.Platform.OSVersion); err != nil { return "", "", err } - manSvc, err := p.repo.Manifests(ctx) + manifest, err := p.manifestStore.Get(ctx, specs.Descriptor{ + Digest: match.Digest, + Size: match.Size, + MediaType: match.MediaType, + }) if err != nil { return "", "", err } - manifest, err := manSvc.Get(ctx, manifestDigest) - if err != nil { - return "", "", err - } 
- - manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), manifestDigest) + manifestRef, err := reference.WithDigest(reference.TrimNamed(ref), match.Digest) if err != nil { return "", "", err } diff --git a/distribution/push_v2.go b/distribution/push_v2.go index b64230fa9c..764149c632 100644 --- a/distribution/push_v2.go +++ b/distribution/push_v2.go @@ -116,7 +116,7 @@ func (p *v2Pusher) pushV2Repository(ctx context.Context) (err error) { func (p *v2Pusher) pushV2Tag(ctx context.Context, ref reference.NamedTagged, id digest.Digest) error { logrus.Debugf("Pushing repository: %s", reference.FamiliarString(ref)) - imgConfig, err := p.config.ImageStore.Get(id) + imgConfig, err := p.config.ImageStore.Get(ctx, id) if err != nil { return fmt.Errorf("could not find image from tag %s: %v", reference.FamiliarString(ref), err) } diff --git a/distribution/registry_unit_test.go b/distribution/registry_unit_test.go index 3651c4688b..60fbeb9502 100644 --- a/distribution/registry_unit_test.go +++ b/distribution/registry_unit_test.go @@ -69,7 +69,7 @@ func testTokenPassThru(t *testing.T, ts *httptest.Server) { }, Schema2Types: ImageTypes, } - puller, err := newPuller(endpoint, repoInfo, imagePullConfig) + puller, err := newPuller(endpoint, repoInfo, imagePullConfig, nil) if err != nil { t.Fatal(err) }