From d7ba1f85ef8f23096f2fd92d0444820543ab9675 Mon Sep 17 00:00:00 2001 From: Brian Goff Date: Mon, 10 Feb 2020 16:31:04 -0800 Subject: [PATCH] Use containerd dist libs for plugin pull/pull This removes the use of the old distribution code in the plugin packages and replaces it with containerd libraries for plugin pushes and pulls. Additionally it uses a content store from containerd which seems like it's compatible with the old "basicBlobStore" in the plugin package. This is being used locally isntead of through the containerd client for now. Signed-off-by: Brian Goff --- integration/plugin/common/main_test.go | 4 + integration/plugin/common/plugin_test.go | 121 +++++ plugin/backend_linux.go | 580 +++++++++++------------ plugin/blobstore.go | 190 -------- plugin/fetch_linux.go | 288 +++++++++++ plugin/manager.go | 46 +- plugin/manager_linux.go | 17 +- plugin/progress.go | 74 +++ plugin/registry.go | 111 +++++ plugin/v2/plugin.go | 1 + testutil/fixtures/plugin/plugin.go | 17 +- testutil/registry/ops.go | 16 + testutil/registry/registry.go | 5 + 13 files changed, 928 insertions(+), 542 deletions(-) delete mode 100644 plugin/blobstore.go create mode 100644 plugin/fetch_linux.go create mode 100644 plugin/progress.go create mode 100644 plugin/registry.go diff --git a/integration/plugin/common/main_test.go b/integration/plugin/common/main_test.go index 5bcbac2a83..cd42c8f761 100644 --- a/integration/plugin/common/main_test.go +++ b/integration/plugin/common/main_test.go @@ -5,12 +5,16 @@ import ( "os" "testing" + "github.com/docker/docker/pkg/reexec" "github.com/docker/docker/testutil/environment" ) var testEnv *environment.Execution func TestMain(m *testing.M) { + if reexec.Init() { + return + } var err error testEnv, err = environment.New() if err != nil { diff --git a/integration/plugin/common/plugin_test.go b/integration/plugin/common/plugin_test.go index 35dc7fb980..b2c824e8c4 100644 --- a/integration/plugin/common/plugin_test.go +++ b/integration/plugin/common/plugin_test.go @@ -1,12 +1,25 @@ package common // import "github.com/docker/docker/integration/plugin/common" import ( + "context" + "encoding/base64" + "encoding/json" + "io" + "io/ioutil" + "net" "net/http" + "path" + "strings" "testing" + "github.com/docker/docker/api/types" + "github.com/docker/docker/testutil/daemon" + "github.com/docker/docker/testutil/fixtures/plugin" + "github.com/docker/docker/testutil/registry" "github.com/docker/docker/testutil/request" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" + "gotest.tools/v3/skip" ) func TestPluginInvalidJSON(t *testing.T) { @@ -36,3 +49,111 @@ func TestPluginInvalidJSON(t *testing.T) { }) } } + +func TestPluginInstall(t *testing.T) { + skip.If(t, testEnv.IsRemoteDaemon, "cannot run daemon when remote daemon") + skip.If(t, testEnv.OSType == "windows") + skip.If(t, testEnv.IsRootless, "rootless mode has different view of localhost") + + ctx := context.Background() + client := testEnv.APIClient() + + t.Run("no auth", func(t *testing.T) { + defer setupTest(t)() + + reg := registry.NewV2(t) + defer reg.Close() + + name := "test-" + strings.ToLower(t.Name()) + repo := path.Join(registry.DefaultURL, name+":latest") + assert.NilError(t, plugin.CreateInRegistry(ctx, repo, nil)) + + rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{Disabled: true, RemoteRef: repo}) + assert.NilError(t, err) + defer rdr.Close() + + _, err = io.Copy(ioutil.Discard, rdr) + assert.NilError(t, err) + + _, _, err = client.PluginInspectWithRaw(ctx, repo) + assert.NilError(t, err) + }) + + t.Run("with htpasswd", func(t *testing.T) { + defer setupTest(t)() + + reg := registry.NewV2(t, registry.Htpasswd) + defer reg.Close() + + name := "test-" + strings.ToLower(t.Name()) + repo := path.Join(registry.DefaultURL, name+":latest") + auth := &types.AuthConfig{ServerAddress: registry.DefaultURL, Username: "testuser", Password: "testpassword"} + assert.NilError(t, plugin.CreateInRegistry(ctx, repo, auth)) + + authEncoded, err := json.Marshal(auth) + assert.NilError(t, err) + + rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{ + RegistryAuth: base64.URLEncoding.EncodeToString(authEncoded), + Disabled: true, + RemoteRef: repo, + }) + assert.NilError(t, err) + defer rdr.Close() + + _, err = io.Copy(ioutil.Discard, rdr) + assert.NilError(t, err) + + _, _, err = client.PluginInspectWithRaw(ctx, repo) + assert.NilError(t, err) + }) + t.Run("with insecure", func(t *testing.T) { + skip.If(t, !testEnv.IsLocalDaemon()) + + addrs, err := net.InterfaceAddrs() + assert.NilError(t, err) + + var bindTo string + for _, addr := range addrs { + ip, ok := addr.(*net.IPNet) + if !ok { + continue + } + if ip.IP.IsLoopback() || ip.IP.To4() == nil { + continue + } + bindTo = ip.IP.String() + } + + if bindTo == "" { + t.Skip("No suitable interface to bind registry to") + } + + regURL := bindTo + ":5000" + + d := daemon.New(t) + defer d.Stop(t) + + d.Start(t, "--insecure-registry="+regURL) + defer d.Stop(t) + + reg := registry.NewV2(t, registry.URL(regURL)) + defer reg.Close() + + name := "test-" + strings.ToLower(t.Name()) + repo := path.Join(regURL, name+":latest") + assert.NilError(t, plugin.CreateInRegistry(ctx, repo, nil, plugin.WithInsecureRegistry(regURL))) + + client := d.NewClientT(t) + rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{Disabled: true, RemoteRef: repo}) + assert.NilError(t, err) + defer rdr.Close() + + _, err = io.Copy(ioutil.Discard, rdr) + assert.NilError(t, err) + + _, _, err = client.PluginInspectWithRaw(ctx, repo) + assert.NilError(t, err) + }) + // TODO: test insecure registry with https +} diff --git a/plugin/backend_linux.go b/plugin/backend_linux.go index f77d12650d..7bb3c4257d 100644 --- a/plugin/backend_linux.go +++ b/plugin/backend_linux.go @@ -2,6 +2,7 @@ package plugin // import "github.com/docker/docker/plugin" import ( "archive/tar" + "bytes" "compress/gzip" "context" "encoding/json" @@ -11,27 +12,27 @@ import ( "os" "path" "path/filepath" - "runtime" "strings" + "time" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" "github.com/docker/distribution/manifest/schema2" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" - "github.com/docker/docker/distribution" - progressutils "github.com/docker/docker/distribution/utils" - "github.com/docker/docker/distribution/xfer" "github.com/docker/docker/dockerversion" "github.com/docker/docker/errdefs" - "github.com/docker/docker/image" - "github.com/docker/docker/layer" "github.com/docker/docker/pkg/authorization" "github.com/docker/docker/pkg/chrootarchive" "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/system" v2 "github.com/docker/docker/plugin/v2" - refstore "github.com/docker/docker/reference" "github.com/moby/sys/mount" digest "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" @@ -98,64 +99,6 @@ func (pm *Manager) Inspect(refOrID string) (tp *types.Plugin, err error) { return &p.PluginObj, nil } -func (pm *Manager) pull(ctx context.Context, ref reference.Named, config *distribution.ImagePullConfig, outStream io.Writer) error { - if outStream != nil { - // Include a buffer so that slow client connections don't affect - // transfer performance. - progressChan := make(chan progress.Progress, 100) - - writesDone := make(chan struct{}) - - defer func() { - close(progressChan) - <-writesDone - }() - - var cancelFunc context.CancelFunc - ctx, cancelFunc = context.WithCancel(ctx) - - go func() { - progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan) - close(writesDone) - }() - - config.ProgressOutput = progress.ChanOutput(progressChan) - } else { - config.ProgressOutput = progress.DiscardOutput() - } - return distribution.Pull(ctx, ref, config) -} - -type tempConfigStore struct { - config []byte - configDigest digest.Digest -} - -func (s *tempConfigStore) Put(c []byte) (digest.Digest, error) { - dgst := digest.FromBytes(c) - - s.config = c - s.configDigest = dgst - - return dgst, nil -} - -func (s *tempConfigStore) Get(d digest.Digest) ([]byte, error) { - if d != s.configDigest { - return nil, errNotFound("digest not found") - } - return s.config, nil -} - -func (s *tempConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) { - return configToRootFS(c) -} - -func (s *tempConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) { - // TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS - return &specs.Platform{OS: runtime.GOOS}, nil -} - func computePrivileges(c types.PluginConfig) types.PluginPrivileges { var privileges types.PluginPrivileges if c.Network.Type != "null" && c.Network.Type != "bridge" && c.Network.Type != "" { @@ -217,37 +160,53 @@ func computePrivileges(c types.PluginConfig) types.PluginPrivileges { // Privileges pulls a plugin config and computes the privileges required to install it. func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) { - // create image store instance - cs := &tempConfigStore{} + var ( + config types.PluginConfig + configSeen bool + ) - // DownloadManager not defined because only pulling configuration. - pluginPullConfig := &distribution.ImagePullConfig{ - Config: distribution.Config{ - MetaHeaders: metaHeader, - AuthConfig: authConfig, - RegistryService: pm.config.RegistryService, - ImageEventLogger: func(string, string, string) {}, - ImageStore: cs, - }, - Schema2Types: distribution.PluginTypes, + h := func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case schema2.MediaTypeManifest, specs.MediaTypeImageManifest: + data, err := content.ReadBlob(ctx, pm.blobStore, desc) + if err != nil { + return nil, errors.Wrapf(err, "error reading image manifest from blob store for %s", ref) + } + + var m specs.Manifest + if err := json.Unmarshal(data, &m); err != nil { + return nil, errors.Wrapf(err, "error unmarshaling image manifest for %s", ref) + } + return []specs.Descriptor{m.Config}, nil + case schema2.MediaTypePluginConfig: + configSeen = true + data, err := content.ReadBlob(ctx, pm.blobStore, desc) + if err != nil { + return nil, errors.Wrapf(err, "error reading plugin config from blob store for %s", ref) + } + + if err := json.Unmarshal(data, &config); err != nil { + return nil, errors.Wrapf(err, "error unmarshaling plugin config for %s", ref) + } + } + + return nil, nil } - if err := pm.pull(ctx, ref, pluginPullConfig, nil); err != nil { - return nil, err + if err := pm.fetch(ctx, ref, authConfig, progress.DiscardOutput(), metaHeader, images.HandlerFunc(h)); err != nil { + return types.PluginPrivileges{}, nil } - if cs.config == nil { - return nil, errors.New("no configuration pulled") - } - var config types.PluginConfig - if err := json.Unmarshal(cs.config, &config); err != nil { - return nil, errdefs.System(err) + if !configSeen { + return types.PluginPrivileges{}, errors.Errorf("did not find plugin config for specified reference %s", ref) } return computePrivileges(config), nil } // Upgrade upgrades a plugin +// +// TODO: replace reference package usage with simpler url.Parse semantics func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) { p, err := pm.config.Store.GetV2Plugin(name) if err != nil { @@ -258,44 +217,35 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string return errors.Wrap(enabledError(p.Name()), "plugin must be disabled before upgrading") } - pm.muGC.RLock() - defer pm.muGC.RUnlock() - // revalidate because Pull is public if _, err := reference.ParseNormalizedNamed(name); err != nil { return errors.Wrapf(errdefs.InvalidParameter(err), "failed to parse %q", name) } + pm.muGC.RLock() + defer pm.muGC.RUnlock() + tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs") if err != nil { - return errors.Wrap(errdefs.System(err), "error preparing upgrade") - } - defer os.RemoveAll(tmpRootFSDir) - - dm := &downloadManager{ - tmpDir: tmpRootFSDir, - blobStore: pm.blobStore, + return errors.Wrap(err, "error creating tmp dir for plugin rootfs") } - pluginPullConfig := &distribution.ImagePullConfig{ - Config: distribution.Config{ - MetaHeaders: metaHeader, - AuthConfig: authConfig, - RegistryService: pm.config.RegistryService, - ImageEventLogger: pm.config.LogPluginEvent, - ImageStore: dm, - }, - DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead - Schema2Types: distribution.PluginTypes, - } + var md fetchMeta - err = pm.pull(ctx, ref, pluginPullConfig, outStream) - if err != nil { - go pm.GC() + ctx, cancel := context.WithCancel(ctx) + out, waitProgress := setupProgressOutput(outStream, cancel) + defer waitProgress() + + if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil { + return err + } + pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull") + + if err := validateFetchedMetadata(md); err != nil { return err } - if err := pm.upgradePlugin(p, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges); err != nil { + if err := pm.upgradePlugin(p, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges); err != nil { return err } p.PluginObj.PluginReference = ref.String() @@ -303,6 +253,8 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string } // Pull pulls a plugin, check if the correct privileges are provided and install the plugin. +// +// TODO: replace reference package usage with simpler url.Parse semantics func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...CreateOpt) (err error) { pm.muGC.RLock() defer pm.muGC.RUnlock() @@ -320,30 +272,22 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs") if err != nil { - return errors.Wrap(errdefs.System(err), "error preparing pull") + return errors.Wrap(errdefs.System(err), "error preparing upgrade") } defer os.RemoveAll(tmpRootFSDir) - dm := &downloadManager{ - tmpDir: tmpRootFSDir, - blobStore: pm.blobStore, - } + var md fetchMeta - pluginPullConfig := &distribution.ImagePullConfig{ - Config: distribution.Config{ - MetaHeaders: metaHeader, - AuthConfig: authConfig, - RegistryService: pm.config.RegistryService, - ImageEventLogger: pm.config.LogPluginEvent, - ImageStore: dm, - }, - DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead - Schema2Types: distribution.PluginTypes, - } + ctx, cancel := context.WithCancel(ctx) + out, waitProgress := setupProgressOutput(outStream, cancel) + defer waitProgress() - err = pm.pull(ctx, ref, pluginPullConfig, outStream) - if err != nil { - go pm.GC() + if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil { + return err + } + pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull") + + if err := validateFetchedMetadata(md); err != nil { return err } @@ -354,12 +298,14 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m optsList = append(optsList, opts...) optsList = append(optsList, refOpt) - p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges, optsList...) + // TODO: tmpRootFSDir is empty but should have layers in it + p, err := pm.createPlugin(name, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges, optsList...) if err != nil { return err } pm.publisher.Publish(EventCreate{Plugin: p.PluginObj}) + return nil } @@ -404,7 +350,7 @@ next: return out, nil } -// Push pushes a plugin to the store. +// Push pushes a plugin to the registry. func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header, authConfig *types.AuthConfig, outStream io.Writer) error { p, err := pm.config.Store.GetV2Plugin(name) if err != nil { @@ -416,201 +362,197 @@ func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header return errors.Wrapf(err, "plugin has invalid name %v for push", p.Name()) } - var po progress.Output - if outStream != nil { - // Include a buffer so that slow client connections don't affect - // transfer performance. - progressChan := make(chan progress.Progress, 100) + statusTracker := docker.NewInMemoryTracker() - writesDone := make(chan struct{}) - - defer func() { - close(progressChan) - <-writesDone - }() - - var cancelFunc context.CancelFunc - ctx, cancelFunc = context.WithCancel(ctx) - - go func() { - progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan) - close(writesDone) - }() - - po = progress.ChanOutput(progressChan) - } else { - po = progress.DiscardOutput() - } - - // TODO: replace these with manager - is := &pluginConfigStore{ - pm: pm, - plugin: p, - } - lss := make(map[string]distribution.PushLayerProvider) - lss[runtime.GOOS] = &pluginLayerProvider{ - pm: pm, - plugin: p, - } - rs := &pluginReference{ - name: ref, - pluginID: p.Config, - } - - uploadManager := xfer.NewLayerUploadManager(3) - - imagePushConfig := &distribution.ImagePushConfig{ - Config: distribution.Config{ - MetaHeaders: metaHeader, - AuthConfig: authConfig, - ProgressOutput: po, - RegistryService: pm.config.RegistryService, - ReferenceStore: rs, - ImageEventLogger: pm.config.LogPluginEvent, - ImageStore: is, - RequireSchema2: true, - }, - ConfigMediaType: schema2.MediaTypePluginConfig, - LayerStores: lss, - UploadManager: uploadManager, - } - - return distribution.Push(ctx, ref, imagePushConfig) -} - -type pluginReference struct { - name reference.Named - pluginID digest.Digest -} - -func (r *pluginReference) References(id digest.Digest) []reference.Named { - if r.pluginID != id { - return nil - } - return []reference.Named{r.name} -} - -func (r *pluginReference) ReferencesByName(ref reference.Named) []refstore.Association { - return []refstore.Association{ - { - Ref: r.name, - ID: r.pluginID, - }, - } -} - -func (r *pluginReference) Get(ref reference.Named) (digest.Digest, error) { - if r.name.String() != ref.String() { - return digest.Digest(""), refstore.ErrDoesNotExist - } - return r.pluginID, nil -} - -func (r *pluginReference) AddTag(ref reference.Named, id digest.Digest, force bool) error { - // Read only, ignore - return nil -} -func (r *pluginReference) AddDigest(ref reference.Canonical, id digest.Digest, force bool) error { - // Read only, ignore - return nil -} -func (r *pluginReference) Delete(ref reference.Named) (bool, error) { - // Read only, ignore - return false, nil -} - -type pluginConfigStore struct { - pm *Manager - plugin *v2.Plugin -} - -func (s *pluginConfigStore) Put([]byte) (digest.Digest, error) { - return digest.Digest(""), errors.New("cannot store config on push") -} - -func (s *pluginConfigStore) Get(d digest.Digest) ([]byte, error) { - if s.plugin.Config != d { - return nil, errors.New("plugin not found") - } - rwc, err := s.pm.blobStore.Get(d) + resolver, err := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, false) if err != nil { - return nil, err + return err } - defer rwc.Close() - return ioutil.ReadAll(rwc) -} -func (s *pluginConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) { - return configToRootFS(c) -} + pusher, err := resolver.Pusher(ctx, ref.String()) + if err != nil { -func (s *pluginConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) { - // TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS - return &specs.Platform{OS: runtime.GOOS}, nil -} + return errors.Wrap(err, "error creating plugin pusher") + } -type pluginLayerProvider struct { - pm *Manager - plugin *v2.Plugin -} + pj := newPushJobs(statusTracker) -func (p *pluginLayerProvider) Get(id layer.ChainID) (distribution.PushLayer, error) { - rootFS := rootFSFromPlugin(p.plugin.PluginObj.Config.Rootfs) - var i int - for i = 1; i <= len(rootFS.DiffIDs); i++ { - if layer.CreateChainID(rootFS.DiffIDs[:i]) == id { - break + ctx, cancel := context.WithCancel(ctx) + out, waitProgress := setupProgressOutput(outStream, cancel) + defer waitProgress() + + progressHandler := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + logrus.WithField("mediaType", desc.MediaType).WithField("digest", desc.Digest.String()).Debug("Preparing to push plugin layer") + id := stringid.TruncateID(desc.Digest.String()) + pj.add(remotes.MakeRefKey(ctx, desc), id) + progress.Update(out, id, "Preparing") + return nil, nil + }) + + desc, err := pm.getManifestDescriptor(ctx, p) + if err != nil { + return errors.Wrap(err, "error reading plugin manifest") + } + + progress.Messagef(out, "", "The push refers to repository [%s]", reference.FamiliarName(ref)) + + // TODO: If a layer already exists on the registry, the progress output just says "Preparing" + go func() { + timer := time.NewTimer(100 * time.Millisecond) + defer timer.Stop() + if !timer.Stop() { + <-timer.C + } + var statuses []contentStatus + for { + timer.Reset(100 * time.Millisecond) + select { + case <-ctx.Done(): + return + case <-timer.C: + statuses = pj.status() + } + + for _, s := range statuses { + out.WriteProgress(progress.Progress{ID: s.Ref, Current: s.Offset, Total: s.Total, Action: s.Status, LastUpdate: s.Offset == s.Total}) + } + } + }() + + // Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo. + ctx = docker.WithScope(ctx, scope(ref, true)) + if err := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler { + return images.Handlers(progressHandler, h) + }); err != nil { + // Try fallback to http. + // This is needed because the containerd pusher will only attempt the first registry config we pass, which would + // typically be https. + // If there are no http-only host configs found we'll error out anyway. + resolver, _ := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, true) + if resolver != nil { + pusher, _ := resolver.Pusher(ctx, ref.String()) + if pusher != nil { + logrus.WithField("ref", ref).Debug("Re-attmpting push with http-fallback") + err2 := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler { + return images.Handlers(progressHandler, h) + }) + if err2 == nil { + err = nil + } else { + logrus.WithError(err2).WithField("ref", ref).Debug("Error while attempting push with http-fallback") + } + } + } + if err != nil { + return errors.Wrap(err, "error pushing plugin") } } - if i > len(rootFS.DiffIDs) { - return nil, errors.New("layer not found") + + // For blobs that already exist in the registry we need to make sure to update the progress otherwise it will just say "pending" + // TODO: How to check if the layer already exists? Is it worth it? + for _, j := range pj.jobs { + progress.Update(out, pj.names[j], "Upload complete") } - return &pluginLayer{ - pm: p.pm, - diffIDs: rootFS.DiffIDs[:i], - blobs: p.plugin.Blobsums[:i], - }, nil + + // Signal the client for content trust verification + progress.Aux(out, types.PushResult{Tag: ref.(reference.Tagged).Tag(), Digest: desc.Digest.String(), Size: int(desc.Size)}) + + return nil } -type pluginLayer struct { - pm *Manager - diffIDs []layer.DiffID - blobs []digest.Digest +// manifest wraps an OCI manifest, because... +// Historically the registry does not support plugins unless the media type on the manifest is specifically schema2.MediaTypeManifest +// So the OCI manifest media type is not supported. +// Additionally, there is extra validation for the docker schema2 manifest than there is a mediatype set on the manifest itself +// even though this is set on the descriptor +// The OCI types do not have this field. +type manifest struct { + specs.Manifest + MediaType string `json:"mediaType,omitempty"` } -func (l *pluginLayer) ChainID() layer.ChainID { - return layer.CreateChainID(l.diffIDs) -} +func buildManifest(ctx context.Context, s content.Manager, config digest.Digest, layers []digest.Digest) (manifest, error) { + var m manifest + m.MediaType = images.MediaTypeDockerSchema2Manifest + m.SchemaVersion = 2 -func (l *pluginLayer) DiffID() layer.DiffID { - return l.diffIDs[len(l.diffIDs)-1] -} - -func (l *pluginLayer) Parent() distribution.PushLayer { - if len(l.diffIDs) == 1 { - return nil + configInfo, err := s.Info(ctx, config) + if err != nil { + return m, errors.Wrapf(err, "error reading plugin config content for digest %s", config) } - return &pluginLayer{ - pm: l.pm, - diffIDs: l.diffIDs[:len(l.diffIDs)-1], - blobs: l.blobs[:len(l.diffIDs)-1], + m.Config = specs.Descriptor{ + MediaType: mediaTypePluginConfig, + Size: configInfo.Size, + Digest: configInfo.Digest, } + + for _, l := range layers { + info, err := s.Info(ctx, l) + if err != nil { + return m, errors.Wrapf(err, "error fetching info for content digest %s", l) + } + m.Layers = append(m.Layers, specs.Descriptor{ + MediaType: specs.MediaTypeImageLayerGzip, // TODO: This is assuming everything is a gzip compressed layer, but that may not be true. + Digest: l, + Size: info.Size, + }) + } + return m, nil } -func (l *pluginLayer) Open() (io.ReadCloser, error) { - return l.pm.blobStore.Get(l.blobs[len(l.diffIDs)-1]) +// getManifestDescriptor gets the OCI descriptor for a manifest +// It will generate a manifest if one does not exist +func (pm *Manager) getManifestDescriptor(ctx context.Context, p *v2.Plugin) (specs.Descriptor, error) { + logger := logrus.WithField("plugin", p.Name()).WithField("digest", p.Manifest) + if p.Manifest != "" { + info, err := pm.blobStore.Info(ctx, p.Manifest) + if err == nil { + desc := specs.Descriptor{ + Size: info.Size, + Digest: info.Digest, + MediaType: images.MediaTypeDockerSchema2Manifest, + } + return desc, nil + } + logger.WithError(err).Debug("Could not find plugin manifest in content store") + } else { + logger.Info("Plugin does not have manifest digest") + } + logger.Info("Building a new plugin manifest") + + manifest, err := buildManifest(ctx, pm.blobStore, p.Config, p.Blobsums) + if err != nil { + return specs.Descriptor{}, err + } + + desc, err := writeManifest(ctx, pm.blobStore, &manifest) + if err != nil { + return desc, err + } + + if err := pm.save(p); err != nil { + logger.WithError(err).Error("Could not save plugin with manifest digest") + } + return desc, nil } -func (l *pluginLayer) Size() (int64, error) { - return l.pm.blobStore.Size(l.blobs[len(l.diffIDs)-1]) -} +func writeManifest(ctx context.Context, cs content.Store, m *manifest) (specs.Descriptor, error) { + platform := platforms.DefaultSpec() + desc := specs.Descriptor{ + MediaType: images.MediaTypeDockerSchema2Manifest, + Platform: &platform, + } + data, err := json.Marshal(m) + if err != nil { + return desc, errors.Wrap(err, "error encoding manifest") + } + desc.Digest = digest.FromBytes(data) + desc.Size = int64(len(data)) -func (l *pluginLayer) MediaType() string { - return schema2.MediaTypeLayer -} - -func (l *pluginLayer) Release() { - // Nothing needs to be release, no references held + if err := content.WriteBlob(ctx, cs, remotes.MakeRefKey(ctx, desc), bytes.NewReader(data), desc); err != nil { + return desc, errors.Wrap(err, "error writing plugin manifest") + } + return desc, nil } // Remove deletes plugin's root directory. @@ -700,14 +642,14 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, var configJSON []byte rootFS := splitConfigRootFSFromTar(tarCtx, &configJSON) - rootFSBlob, err := pm.blobStore.New() + rootFSBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name)) if err != nil { return err } defer rootFSBlob.Close() + gzw := gzip.NewWriter(rootFSBlob) - layerDigester := digest.Canonical.Digester() - rootFSReader := io.TeeReader(rootFS, io.MultiWriter(gzw, layerDigester.Hash())) + rootFSReader := io.TeeReader(rootFS, gzw) if err := chrootarchive.Untar(rootFSReader, tmpRootFSDir, nil); err != nil { return err @@ -736,8 +678,7 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, pm.mu.Lock() defer pm.mu.Unlock() - rootFSBlobsum, err := rootFSBlob.Commit() - if err != nil { + if err := rootFSBlob.Commit(ctx, 0, ""); err != nil { return err } defer func() { @@ -748,12 +689,12 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, config.Rootfs = &types.PluginConfigRootfs{ Type: "layers", - DiffIds: []string{layerDigester.Digest().String()}, + DiffIds: []string{rootFSBlob.Digest().String()}, } config.DockerVersion = dockerversion.Version - configBlob, err := pm.blobStore.New() + configBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name+"-config.json")) if err != nil { return err } @@ -761,12 +702,23 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, if err := json.NewEncoder(configBlob).Encode(config); err != nil { return errors.Wrap(err, "error encoding json config") } - configBlobsum, err := configBlob.Commit() - if err != nil { + if err := configBlob.Commit(ctx, 0, ""); err != nil { return err } - p, err := pm.createPlugin(name, configBlobsum, []digest.Digest{rootFSBlobsum}, tmpRootFSDir, nil) + configDigest := configBlob.Digest() + layers := []digest.Digest{rootFSBlob.Digest()} + + manifest, err := buildManifest(ctx, pm.blobStore, configDigest, layers) + if err != nil { + return err + } + desc, err := writeManifest(ctx, pm.blobStore, &manifest) + if err != nil { + return + } + + p, err := pm.createPlugin(name, configDigest, desc.Digest, layers, tmpRootFSDir, nil) if err != nil { return err } diff --git a/plugin/blobstore.go b/plugin/blobstore.go deleted file mode 100644 index 0babefbbf5..0000000000 --- a/plugin/blobstore.go +++ /dev/null @@ -1,190 +0,0 @@ -package plugin // import "github.com/docker/docker/plugin" - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "runtime" - - "github.com/docker/docker/distribution/xfer" - "github.com/docker/docker/image" - "github.com/docker/docker/layer" - "github.com/docker/docker/pkg/archive" - "github.com/docker/docker/pkg/chrootarchive" - "github.com/docker/docker/pkg/progress" - digest "github.com/opencontainers/go-digest" - specs "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type blobstore interface { - New() (WriteCommitCloser, error) - Get(dgst digest.Digest) (io.ReadCloser, error) - Size(dgst digest.Digest) (int64, error) -} - -type basicBlobStore struct { - path string -} - -func newBasicBlobStore(p string) (*basicBlobStore, error) { - tmpdir := filepath.Join(p, "tmp") - if err := os.MkdirAll(tmpdir, 0700); err != nil { - return nil, errors.Wrapf(err, "failed to mkdir %v", p) - } - return &basicBlobStore{path: p}, nil -} - -func (b *basicBlobStore) New() (WriteCommitCloser, error) { - f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion") - if err != nil { - return nil, errors.Wrap(err, "failed to create temp file") - } - return newInsertion(f), nil -} - -func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) { - return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())) -} - -func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) { - stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())) - if err != nil { - return 0, err - } - return stat.Size(), nil -} - -func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) { - for _, alg := range []string{string(digest.Canonical)} { - items, err := ioutil.ReadDir(filepath.Join(b.path, alg)) - if err != nil { - continue - } - for _, fi := range items { - if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists { - p := filepath.Join(b.path, alg, fi.Name()) - err := os.RemoveAll(p) - logrus.Debugf("cleaned up blob %v: %v", p, err) - } - } - } - -} - -// WriteCommitCloser defines object that can be committed to blobstore. -type WriteCommitCloser interface { - io.WriteCloser - Commit() (digest.Digest, error) -} - -type insertion struct { - io.Writer - f *os.File - digester digest.Digester - closed bool -} - -func newInsertion(tempFile *os.File) *insertion { - digester := digest.Canonical.Digester() - return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())} -} - -func (i *insertion) Commit() (digest.Digest, error) { - p := i.f.Name() - d := filepath.Join(filepath.Join(p, "../../")) - i.f.Sync() - defer os.RemoveAll(p) - if err := i.f.Close(); err != nil { - return "", err - } - i.closed = true - dgst := i.digester.Digest() - if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil { - return "", errors.Wrapf(err, "failed to mkdir %v", d) - } - if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil { - return "", errors.Wrapf(err, "failed to rename %v", p) - } - return dgst, nil -} - -func (i *insertion) Close() error { - if i.closed { - return nil - } - defer os.RemoveAll(i.f.Name()) - return i.f.Close() -} - -type downloadManager struct { - blobStore blobstore - tmpDir string - blobs []digest.Digest - configDigest digest.Digest -} - -func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, os string, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) { - for _, l := range layers { - b, err := dm.blobStore.New() - if err != nil { - return initialRootFS, nil, err - } - defer b.Close() - rc, _, err := l.Download(ctx, progressOutput) - if err != nil { - return initialRootFS, nil, errors.Wrap(err, "failed to download") - } - defer rc.Close() - r := io.TeeReader(rc, b) - inflatedLayerData, err := archive.DecompressStream(r) - if err != nil { - return initialRootFS, nil, err - } - defer inflatedLayerData.Close() - digester := digest.Canonical.Digester() - if _, err := chrootarchive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil { - return initialRootFS, nil, err - } - initialRootFS.Append(layer.DiffID(digester.Digest())) - d, err := b.Commit() - if err != nil { - return initialRootFS, nil, err - } - dm.blobs = append(dm.blobs, d) - } - return initialRootFS, nil, nil -} - -func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) { - b, err := dm.blobStore.New() - if err != nil { - return "", err - } - defer b.Close() - n, err := b.Write(dt) - if err != nil { - return "", err - } - if n != len(dt) { - return "", io.ErrShortWrite - } - d, err := b.Commit() - dm.configDigest = d - return d, err -} - -func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) { - return nil, fmt.Errorf("digest not found") -} -func (dm *downloadManager) RootFSFromConfig(c []byte) (*image.RootFS, error) { - return configToRootFS(c) -} -func (dm *downloadManager) PlatformFromConfig(c []byte) (*specs.Platform, error) { - // TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS - return &specs.Platform{OS: runtime.GOOS}, nil -} diff --git a/plugin/fetch_linux.go b/plugin/fetch_linux.go new file mode 100644 index 0000000000..19b9bbf18e --- /dev/null +++ b/plugin/fetch_linux.go @@ -0,0 +1,288 @@ +package plugin + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/containerd/containerd/content" + c8derrdefs "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/docker/distribution/reference" + "github.com/docker/docker/api/types" + progressutils "github.com/docker/docker/distribution/utils" + "github.com/docker/docker/pkg/chrootarchive" + "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/pkg/stringid" + digest "github.com/opencontainers/go-digest" + specs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const mediaTypePluginConfig = "application/vnd.docker.plugin.v1+json" + +// setupProgressOutput sets up the passed in writer to stream progress. +// +// The passed in cancel function is used by the progress writer to signal callers that there +// is an issue writing to the stream. +// +// The returned function is used to wait for the progress writer to be finished. +// Call it to make sure the progress writer is done before returning from your function as needed. +func setupProgressOutput(outStream io.Writer, cancel func()) (progress.Output, func()) { + var out progress.Output + f := func() {} + + if outStream != nil { + ch := make(chan progress.Progress, 100) + out = progress.ChanOutput(ch) + + ctx, retCancel := context.WithCancel(context.Background()) + go func() { + progressutils.WriteDistributionProgress(cancel, outStream, ch) + retCancel() + }() + + f = func() { + close(ch) + <-ctx.Done() + } + } else { + out = progress.DiscardOutput() + } + return out, f +} + +// fetch the content related to the passed in reference into the blob store and appends the provided images.Handlers +// There is no need to use remotes.FetchHandler since it already gets set +func (pm *Manager) fetch(ctx context.Context, ref reference.Named, auth *types.AuthConfig, out progress.Output, metaHeader http.Header, handlers ...images.Handler) (err error) { + // We need to make sure we have a domain on the reference + withDomain, err := reference.ParseNormalizedNamed(ref.String()) + if err != nil { + return errors.Wrap(err, "error parsing plugin image reference") + } + + // Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo. + ctx = docker.WithScope(ctx, scope(ref, false)) + + // Make sure the fetch handler knows how to set a ref key for the plugin media type. + // Without this the ref key is "unknown" and we see a nasty warning message in the logs + ctx = remotes.WithMediaTypeKeyPrefix(ctx, mediaTypePluginConfig, "docker-plugin") + + resolver, err := pm.newResolver(ctx, nil, auth, metaHeader, false) + if err != nil { + return err + } + resolved, desc, err := resolver.Resolve(ctx, withDomain.String()) + if err != nil { + // This is backwards compatible with older versions of the distribution registry. + // The containerd client will add it's own accept header as a comma separated list of supported manifests. + // This is perfectly fine, unless you are talking to an older registry which does not split the comma separated list, + // so it is never able to match a media type and it falls back to schema1 (yuck) and fails because our manifest the + // fallback does not support plugin configs... + logrus.WithError(err).WithField("ref", withDomain).Debug("Error while resolving reference, falling back to backwards compatible accept header format") + headers := http.Header{} + headers.Add("Accept", images.MediaTypeDockerSchema2Manifest) + headers.Add("Accept", images.MediaTypeDockerSchema2ManifestList) + headers.Add("Accept", specs.MediaTypeImageManifest) + headers.Add("Accept", specs.MediaTypeImageIndex) + resolver, _ = pm.newResolver(ctx, nil, auth, headers, false) + if resolver != nil { + resolved, desc, err = resolver.Resolve(ctx, withDomain.String()) + if err != nil { + logrus.WithError(err).WithField("ref", withDomain).Debug("Failed to resolve reference after falling back to backwards compatible accept header format") + } + } + if err != nil { + return errors.Wrap(err, "error resolving plugin reference") + } + } + + fetcher, err := resolver.Fetcher(ctx, resolved) + if err != nil { + return errors.Wrap(err, "error creating plugin image fetcher") + } + + fp := withFetchProgress(pm.blobStore, out, ref) + handlers = append([]images.Handler{fp, remotes.FetchHandler(pm.blobStore, fetcher)}, handlers...) + if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { + return err + } + return nil +} + +// applyLayer makes an images.HandlerFunc which applies a fetched image rootfs layer to a directory. +// +// TODO(@cpuguy83) This gets run sequentially after layer pull (makes sense), however +// if there are multiple layers to fetch we may end up extracting layers in the wrong +// order. +func applyLayer(cs content.Store, dir string, out progress.Output) images.HandlerFunc { + return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case + specs.MediaTypeImageLayer, + images.MediaTypeDockerSchema2Layer, + specs.MediaTypeImageLayerGzip, + images.MediaTypeDockerSchema2LayerGzip: + default: + return nil, nil + } + + ra, err := cs.ReaderAt(ctx, desc) + if err != nil { + return nil, errors.Wrapf(err, "error getting content from content store for digest %s", desc.Digest) + } + + id := stringid.TruncateID(desc.Digest.String()) + + rc := ioutils.NewReadCloserWrapper(content.NewReader(ra), ra.Close) + pr := progress.NewProgressReader(rc, out, desc.Size, id, "Extracting") + defer pr.Close() + + if _, err := chrootarchive.ApplyLayer(dir, pr); err != nil { + return nil, errors.Wrapf(err, "error applying layer for digest %s", desc.Digest) + } + progress.Update(out, id, "Complete") + return nil, nil + } +} + +func childrenHandler(cs content.Store) images.HandlerFunc { + ch := images.ChildrenHandler(cs) + return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case mediaTypePluginConfig: + return nil, nil + default: + return ch(ctx, desc) + } + } +} + +type fetchMeta struct { + blobs []digest.Digest + config digest.Digest + manifest digest.Digest +} + +func storeFetchMetadata(m *fetchMeta) images.HandlerFunc { + return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case + images.MediaTypeDockerSchema2LayerForeignGzip, + images.MediaTypeDockerSchema2Layer, + specs.MediaTypeImageLayer, + specs.MediaTypeImageLayerGzip: + m.blobs = append(m.blobs, desc.Digest) + case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest: + m.manifest = desc.Digest + case mediaTypePluginConfig: + m.config = desc.Digest + } + return nil, nil + } +} + +func validateFetchedMetadata(md fetchMeta) error { + if md.config == "" { + return errors.New("fetched plugin image but plugin config is missing") + } + if md.manifest == "" { + return errors.New("fetched plugin image but manifest is missing") + } + return nil +} + +// withFetchProgress is a fetch handler which registers a descriptor with a progress +func withFetchProgress(cs content.Store, out progress.Output, ref reference.Named) images.HandlerFunc { + return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest: + tn := reference.TagNameOnly(ref) + tagged := tn.(reference.Tagged) + progress.Messagef(out, tagged.Tag(), "Pulling from %s", reference.FamiliarName(ref)) + progress.Messagef(out, "", "Digest: %s", desc.Digest.String()) + return nil, nil + case + images.MediaTypeDockerSchema2LayerGzip, + images.MediaTypeDockerSchema2Layer, + specs.MediaTypeImageLayer, + specs.MediaTypeImageLayerGzip: + default: + return nil, nil + } + + id := stringid.TruncateID(desc.Digest.String()) + + if _, err := cs.Info(ctx, desc.Digest); err == nil { + out.WriteProgress(progress.Progress{ID: id, Action: "Already exists", LastUpdate: true}) + return nil, nil + } + + progress.Update(out, id, "Waiting") + + key := remotes.MakeRefKey(ctx, desc) + + go func() { + timer := time.NewTimer(100 * time.Millisecond) + if !timer.Stop() { + <-timer.C + } + defer timer.Stop() + + var pulling bool + var ctxErr error + + for { + timer.Reset(100 * time.Millisecond) + + select { + case <-ctx.Done(): + ctxErr = ctx.Err() + // make sure we can still fetch from the content store + // TODO: Might need to add some sort of timeout + ctx = context.Background() + case <-timer.C: + } + + s, err := cs.Status(ctx, key) + if err != nil { + if !c8derrdefs.IsNotFound(err) { + logrus.WithError(err).WithField("layerDigest", desc.Digest.String()).Error("Error looking up status of plugin layer pull") + progress.Update(out, id, err.Error()) + return + } + + if _, err := cs.Info(ctx, desc.Digest); err == nil { + progress.Update(out, id, "Download complete") + return + } + + if ctxErr != nil { + progress.Update(out, id, ctxErr.Error()) + return + } + + continue + } + + if !pulling { + progress.Update(out, id, "Pulling fs layer") + pulling = true + } + + if s.Offset == s.Total { + out.WriteProgress(progress.Progress{ID: id, Action: "Download complete", Current: s.Offset, LastUpdate: true}) + return + } + + out.WriteProgress(progress.Progress{ID: id, Action: "Downloading", Current: s.Offset, Total: s.Total}) + } + }() + return nil, nil + } +} diff --git a/plugin/manager.go b/plugin/manager.go index 38b18f0e74..f67b22c952 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -1,6 +1,7 @@ package plugin // import "github.com/docker/docker/plugin" import ( + "context" "encoding/json" "io" "io/ioutil" @@ -12,10 +13,10 @@ import ( "strings" "sync" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" - "github.com/docker/docker/image" - "github.com/docker/docker/layer" "github.com/docker/docker/pkg/authorization" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/pubsub" @@ -72,7 +73,7 @@ type Manager struct { mu sync.RWMutex // protects cMap muGC sync.RWMutex // protects blobstore deletions cMap map[*v2.Plugin]*controller - blobStore *basicBlobStore + blobStore content.Store publisher *pubsub.Publisher executor Executor } @@ -117,9 +118,9 @@ func NewManager(config ManagerConfig) (*Manager, error) { return nil, err } - manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs")) + manager.blobStore, err = local.NewStore(filepath.Join(manager.config.Root, "storage")) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error creating plugin blob store") } manager.cMap = make(map[*v2.Plugin]*controller) @@ -305,7 +306,15 @@ func (pm *Manager) GC() { } } - pm.blobStore.gc(whitelist) + ctx := context.TODO() + pm.blobStore.Walk(ctx, func(info content.Info) error { + _, ok := whitelist[info.Digest] + if ok { + return nil + } + + return pm.blobStore.Delete(ctx, info.Digest) + }) } type logHook struct{ id string } @@ -357,28 +366,3 @@ func isEqualPrivilege(a, b types.PluginPrivilege) bool { return reflect.DeepEqual(a.Value, b.Value) } - -func configToRootFS(c []byte) (*image.RootFS, error) { - var pluginConfig types.PluginConfig - if err := json.Unmarshal(c, &pluginConfig); err != nil { - return nil, err - } - // validation for empty rootfs is in distribution code - if pluginConfig.Rootfs == nil { - return nil, nil - } - - return rootFSFromPlugin(pluginConfig.Rootfs), nil -} - -func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS { - rootFS := image.RootFS{ - Type: pluginfs.Type, - DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)), - } - for i := range pluginfs.DiffIds { - rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i]) - } - - return &rootFS -} diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go index a3aca0d783..eb39c39cc0 100644 --- a/plugin/manager_linux.go +++ b/plugin/manager_linux.go @@ -1,12 +1,14 @@ package plugin // import "github.com/docker/docker/plugin" import ( + "context" "encoding/json" "net" "os" "path/filepath" "time" + "github.com/containerd/containerd/content" "github.com/docker/docker/api/types" "github.com/docker/docker/daemon/initlayer" "github.com/docker/docker/errdefs" @@ -17,6 +19,7 @@ import ( v2 "github.com/docker/docker/plugin/v2" "github.com/moby/sys/mount" digest "github.com/opencontainers/go-digest" + specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" @@ -213,7 +216,7 @@ func (pm *Manager) Shutdown() { } } -func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) { +func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest, manifestDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) { config, err := pm.setupNewPlugin(configDigest, blobsums, privileges) if err != nil { return err @@ -261,19 +264,22 @@ func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobs } p.PluginObj.Config = config + p.Manifest = manifestDigest err = pm.save(p) return errors.Wrap(err, "error saving upgraded plugin config") } func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest.Digest, privileges *types.PluginPrivileges) (types.PluginConfig, error) { - configRC, err := pm.blobStore.Get(configDigest) + configRA, err := pm.blobStore.ReaderAt(context.TODO(), specs.Descriptor{Digest: configDigest}) if err != nil { return types.PluginConfig{}, err } - defer configRC.Close() + defer configRA.Close() + + configR := content.NewReader(configRA) var config types.PluginConfig - dec := json.NewDecoder(configRC) + dec := json.NewDecoder(configR) if err := dec.Decode(&config); err != nil { return types.PluginConfig{}, errors.Wrapf(err, "failed to parse config") } @@ -292,7 +298,7 @@ func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest. } // createPlugin creates a new plugin. take lock before calling. -func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) { +func (pm *Manager) createPlugin(name string, configDigest, manifestDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) { if err := pm.config.Store.validateName(name); err != nil { // todo: this check is wrong. remove store return nil, errdefs.InvalidParameter(err) } @@ -310,6 +316,7 @@ func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsum }, Config: configDigest, Blobsums: blobsums, + Manifest: manifestDigest, } p.InitEmptySettings() for _, o := range opts { diff --git a/plugin/progress.go b/plugin/progress.go new file mode 100644 index 0000000000..ffe6c70aae --- /dev/null +++ b/plugin/progress.go @@ -0,0 +1,74 @@ +package plugin + +import ( + "sync" + "time" + + "github.com/containerd/containerd/remotes/docker" +) + +func newPushJobs(tracker docker.StatusTracker) *pushJobs { + return &pushJobs{ + names: make(map[string]string), + t: tracker, + } +} + +type pushJobs struct { + t docker.StatusTracker + + mu sync.Mutex + jobs []string + // maps job ref to a name + names map[string]string +} + +func (p *pushJobs) add(id, name string) { + p.mu.Lock() + defer p.mu.Unlock() + + if _, ok := p.names[id]; ok { + return + } + p.jobs = append(p.jobs, id) + p.names[id] = name +} + +func (p *pushJobs) status() []contentStatus { + statuses := make([]contentStatus, 0, len(p.jobs)) + + p.mu.Lock() + defer p.mu.Unlock() + + for _, j := range p.jobs { + var s contentStatus + s.Ref = p.names[j] + + status, err := p.t.GetStatus(j) + if err != nil { + s.Status = "Waiting" + } else { + s.Total = status.Total + s.Offset = status.Offset + s.StartedAt = status.StartedAt + s.UpdatedAt = status.UpdatedAt + if status.UploadUUID == "" { + s.Status = "Upload complete" + } else { + s.Status = "Uploading" + } + } + statuses = append(statuses, s) + } + + return statuses +} + +type contentStatus struct { + Status string + Total int64 + Offset int64 + StartedAt time.Time + UpdatedAt time.Time + Ref string +} diff --git a/plugin/registry.go b/plugin/registry.go new file mode 100644 index 0000000000..ad2a6b7138 --- /dev/null +++ b/plugin/registry.go @@ -0,0 +1,111 @@ +package plugin + +import ( + "context" + "crypto/tls" + "net" + "net/http" + "time" + + "github.com/sirupsen/logrus" + + "github.com/docker/docker/dockerversion" + + "github.com/pkg/errors" + + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/docker/distribution/reference" + "github.com/docker/docker/api/types" +) + +// scope builds the correct auth scope for the registry client to authorize against +// By default the client currently only does a "repository:" scope with out a classifier, e.g. "(plugin)" +// Without this, the client will not be able to authorize the request +func scope(ref reference.Named, push bool) string { + scope := "repository(plugin):" + reference.Path(reference.TrimNamed(ref)) + ":pull" + if push { + scope += ",push" + } + return scope +} + +func (pm *Manager) newResolver(ctx context.Context, tracker docker.StatusTracker, auth *types.AuthConfig, headers http.Header, httpFallback bool) (remotes.Resolver, error) { + if headers == nil { + headers = http.Header{} + } + headers.Add("User-Agent", dockerversion.DockerUserAgent(ctx)) + + return docker.NewResolver(docker.ResolverOptions{ + Tracker: tracker, + Headers: headers, + Hosts: pm.registryHostsFn(auth, httpFallback), + }), nil +} + +func registryHTTPClient(config *tls.Config) *http.Client { + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSClientConfig: config, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 30 * time.Second, + }, + } +} + +func (pm *Manager) registryHostsFn(auth *types.AuthConfig, httpFallback bool) docker.RegistryHosts { + return func(hostname string) ([]docker.RegistryHost, error) { + eps, err := pm.config.RegistryService.LookupPullEndpoints(hostname) + if err != nil { + return nil, errors.Wrapf(err, "error resolving repository for %s", hostname) + } + + hosts := make([]docker.RegistryHost, 0, len(eps)) + + for _, ep := range eps { + // forced http fallback is used only for push since the containerd pusher only ever uses the first host we + // pass to it. + // So it is the callers responsibility to retry with this flag set. + if httpFallback && ep.URL.Scheme != "http" { + logrus.WithField("registryHost", hostname).WithField("endpoint", ep).Debugf("Skipping non-http endpoint") + continue + } + + caps := docker.HostCapabilityPull | docker.HostCapabilityResolve + if !ep.Mirror { + caps = caps | docker.HostCapabilityPush + } + + host, err := docker.DefaultHost(ep.URL.Host) + if err != nil { + return nil, err + } + + client := registryHTTPClient(ep.TLSConfig) + hosts = append(hosts, docker.RegistryHost{ + Host: host, + Scheme: ep.URL.Scheme, + Client: client, + Path: "/v2", + Capabilities: caps, + Authorizer: docker.NewDockerAuthorizer( + docker.WithAuthClient(client), + docker.WithAuthCreds(func(_ string) (string, string, error) { + if auth.IdentityToken != "" { + return "", auth.IdentityToken, nil + } + return auth.Username, auth.Password, nil + }), + ), + }) + } + logrus.WithField("registryHost", hostname).WithField("hosts", hosts).Debug("Resolved registry hosts") + + return hosts, nil + } +} diff --git a/plugin/v2/plugin.go b/plugin/v2/plugin.go index b46ff60b31..3e6e063f4a 100644 --- a/plugin/v2/plugin.go +++ b/plugin/v2/plugin.go @@ -25,6 +25,7 @@ type Plugin struct { Config digest.Digest Blobsums []digest.Digest + Manifest digest.Digest modifyRuntimeSpec func(*specs.Spec) diff --git a/testutil/fixtures/plugin/plugin.go b/testutil/fixtures/plugin/plugin.go index 8517b37afb..3ec3bc39ea 100644 --- a/testutil/fixtures/plugin/plugin.go +++ b/testutil/fixtures/plugin/plugin.go @@ -26,7 +26,15 @@ type CreateOpt func(*Config) // create the plugin with. type Config struct { *types.PluginConfig - binPath string + binPath string + RegistryConfig registry.ServiceOptions +} + +// WithInsecureRegistry specifies that the given registry can skip host-key checking as well as fall back to plain http +func WithInsecureRegistry(url string) CreateOpt { + return func(cfg *Config) { + cfg.RegistryConfig.InsecureRegistries = append(cfg.RegistryConfig.InsecureRegistries, url) + } } // WithBinary is a CreateOpt to set an custom binary to create the plugin with. @@ -82,6 +90,11 @@ func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, return errors.Wrap(err, "error creating plugin root") } + var cfg Config + for _, o := range opts { + o(&cfg) + } + tar, err := makePluginBundle(inPath, opts...) if err != nil { return err @@ -92,7 +105,7 @@ func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, return nil, nil } - regService, err := registry.NewService(registry.ServiceOptions{}) + regService, err := registry.NewService(cfg.RegistryConfig) if err != nil { return err } diff --git a/testutil/registry/ops.go b/testutil/registry/ops.go index c004f37424..7357d5f509 100644 --- a/testutil/registry/ops.go +++ b/testutil/registry/ops.go @@ -1,5 +1,7 @@ package registry +import "io" + // Schema1 sets the registry to serve v1 api func Schema1(c *Config) { c.schema1 = true @@ -24,3 +26,17 @@ func URL(registryURL string) func(*Config) { c.registryURL = registryURL } } + +// WithStdout sets the stdout of the registry command to the passed in writer. +func WithStdout(w io.Writer) func(c *Config) { + return func(c *Config) { + c.stdout = w + } +} + +// WithStderr sets the stdout of the registry command to the passed in writer. +func WithStderr(w io.Writer) func(c *Config) { + return func(c *Config) { + c.stderr = w + } +} diff --git a/testutil/registry/registry.go b/testutil/registry/registry.go index c272d88ce6..d8213a3bec 100644 --- a/testutil/registry/registry.go +++ b/testutil/registry/registry.go @@ -2,6 +2,7 @@ package registry // import "github.com/docker/docker/testutil/registry" import ( "fmt" + "io" "io/ioutil" "net/http" "os" @@ -40,6 +41,8 @@ type Config struct { auth string tokenURL string registryURL string + stdout io.Writer + stderr io.Writer } // NewV2 creates a v2 registry server @@ -109,6 +112,8 @@ http: binary = V2binarySchema1 } cmd := exec.Command(binary, confPath) + cmd.Stdout = c.stdout + cmd.Stderr = c.stderr if err := cmd.Start(); err != nil { // FIXME(vdemeester) use a defer/clean func os.RemoveAll(tmp)