diff --git a/integration/plugin/common/main_test.go b/integration/plugin/common/main_test.go index 5bcbac2a83..cd42c8f761 100644 --- a/integration/plugin/common/main_test.go +++ b/integration/plugin/common/main_test.go @@ -5,12 +5,16 @@ import ( "os" "testing" + "github.com/docker/docker/pkg/reexec" "github.com/docker/docker/testutil/environment" ) var testEnv *environment.Execution func TestMain(m *testing.M) { + if reexec.Init() { + return + } var err error testEnv, err = environment.New() if err != nil { diff --git a/integration/plugin/common/plugin_test.go b/integration/plugin/common/plugin_test.go index 35dc7fb980..b2c824e8c4 100644 --- a/integration/plugin/common/plugin_test.go +++ b/integration/plugin/common/plugin_test.go @@ -1,12 +1,25 @@ package common // import "github.com/docker/docker/integration/plugin/common" import ( + "context" + "encoding/base64" + "encoding/json" + "io" + "io/ioutil" + "net" "net/http" + "path" + "strings" "testing" + "github.com/docker/docker/api/types" + "github.com/docker/docker/testutil/daemon" + "github.com/docker/docker/testutil/fixtures/plugin" + "github.com/docker/docker/testutil/registry" "github.com/docker/docker/testutil/request" "gotest.tools/v3/assert" is "gotest.tools/v3/assert/cmp" + "gotest.tools/v3/skip" ) func TestPluginInvalidJSON(t *testing.T) { @@ -36,3 +49,111 @@ func TestPluginInvalidJSON(t *testing.T) { }) } } + +func TestPluginInstall(t *testing.T) { + skip.If(t, testEnv.IsRemoteDaemon, "cannot run daemon when remote daemon") + skip.If(t, testEnv.OSType == "windows") + skip.If(t, testEnv.IsRootless, "rootless mode has different view of localhost") + + ctx := context.Background() + client := testEnv.APIClient() + + t.Run("no auth", func(t *testing.T) { + defer setupTest(t)() + + reg := registry.NewV2(t) + defer reg.Close() + + name := "test-" + strings.ToLower(t.Name()) + repo := path.Join(registry.DefaultURL, name+":latest") + assert.NilError(t, plugin.CreateInRegistry(ctx, repo, nil)) + + rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{Disabled: true, RemoteRef: repo}) + assert.NilError(t, err) + defer rdr.Close() + + _, err = io.Copy(ioutil.Discard, rdr) + assert.NilError(t, err) + + _, _, err = client.PluginInspectWithRaw(ctx, repo) + assert.NilError(t, err) + }) + + t.Run("with htpasswd", func(t *testing.T) { + defer setupTest(t)() + + reg := registry.NewV2(t, registry.Htpasswd) + defer reg.Close() + + name := "test-" + strings.ToLower(t.Name()) + repo := path.Join(registry.DefaultURL, name+":latest") + auth := &types.AuthConfig{ServerAddress: registry.DefaultURL, Username: "testuser", Password: "testpassword"} + assert.NilError(t, plugin.CreateInRegistry(ctx, repo, auth)) + + authEncoded, err := json.Marshal(auth) + assert.NilError(t, err) + + rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{ + RegistryAuth: base64.URLEncoding.EncodeToString(authEncoded), + Disabled: true, + RemoteRef: repo, + }) + assert.NilError(t, err) + defer rdr.Close() + + _, err = io.Copy(ioutil.Discard, rdr) + assert.NilError(t, err) + + _, _, err = client.PluginInspectWithRaw(ctx, repo) + assert.NilError(t, err) + }) + t.Run("with insecure", func(t *testing.T) { + skip.If(t, !testEnv.IsLocalDaemon()) + + addrs, err := net.InterfaceAddrs() + assert.NilError(t, err) + + var bindTo string + for _, addr := range addrs { + ip, ok := addr.(*net.IPNet) + if !ok { + continue + } + if ip.IP.IsLoopback() || ip.IP.To4() == nil { + continue + } + bindTo = ip.IP.String() + } + + if bindTo == "" { + t.Skip("No suitable interface to bind registry to") + } + + regURL := bindTo + ":5000" + + d := daemon.New(t) + defer d.Stop(t) + + d.Start(t, "--insecure-registry="+regURL) + defer d.Stop(t) + + reg := registry.NewV2(t, registry.URL(regURL)) + defer reg.Close() + + name := "test-" + strings.ToLower(t.Name()) + repo := path.Join(regURL, name+":latest") + assert.NilError(t, plugin.CreateInRegistry(ctx, repo, nil, plugin.WithInsecureRegistry(regURL))) + + client := d.NewClientT(t) + rdr, err := client.PluginInstall(ctx, repo, types.PluginInstallOptions{Disabled: true, RemoteRef: repo}) + assert.NilError(t, err) + defer rdr.Close() + + _, err = io.Copy(ioutil.Discard, rdr) + assert.NilError(t, err) + + _, _, err = client.PluginInspectWithRaw(ctx, repo) + assert.NilError(t, err) + }) + // TODO: test insecure registry with https +} diff --git a/plugin/backend_linux.go b/plugin/backend_linux.go index f77d12650d..7bb3c4257d 100644 --- a/plugin/backend_linux.go +++ b/plugin/backend_linux.go @@ -2,6 +2,7 @@ package plugin // import "github.com/docker/docker/plugin" import ( "archive/tar" + "bytes" "compress/gzip" "context" "encoding/json" @@ -11,27 +12,27 @@ import ( "os" "path" "path/filepath" - "runtime" "strings" + "time" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/platforms" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" "github.com/docker/distribution/manifest/schema2" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" - "github.com/docker/docker/distribution" - progressutils "github.com/docker/docker/distribution/utils" - "github.com/docker/docker/distribution/xfer" "github.com/docker/docker/dockerversion" "github.com/docker/docker/errdefs" - "github.com/docker/docker/image" - "github.com/docker/docker/layer" "github.com/docker/docker/pkg/authorization" "github.com/docker/docker/pkg/chrootarchive" "github.com/docker/docker/pkg/pools" "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/system" v2 "github.com/docker/docker/plugin/v2" - refstore "github.com/docker/docker/reference" "github.com/moby/sys/mount" digest "github.com/opencontainers/go-digest" specs "github.com/opencontainers/image-spec/specs-go/v1" @@ -98,64 +99,6 @@ func (pm *Manager) Inspect(refOrID string) (tp *types.Plugin, err error) { return &p.PluginObj, nil } -func (pm *Manager) pull(ctx context.Context, ref reference.Named, config *distribution.ImagePullConfig, outStream io.Writer) error { - if outStream != nil { - // Include a buffer so that slow client connections don't affect - // transfer performance. - progressChan := make(chan progress.Progress, 100) - - writesDone := make(chan struct{}) - - defer func() { - close(progressChan) - <-writesDone - }() - - var cancelFunc context.CancelFunc - ctx, cancelFunc = context.WithCancel(ctx) - - go func() { - progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan) - close(writesDone) - }() - - config.ProgressOutput = progress.ChanOutput(progressChan) - } else { - config.ProgressOutput = progress.DiscardOutput() - } - return distribution.Pull(ctx, ref, config) -} - -type tempConfigStore struct { - config []byte - configDigest digest.Digest -} - -func (s *tempConfigStore) Put(c []byte) (digest.Digest, error) { - dgst := digest.FromBytes(c) - - s.config = c - s.configDigest = dgst - - return dgst, nil -} - -func (s *tempConfigStore) Get(d digest.Digest) ([]byte, error) { - if d != s.configDigest { - return nil, errNotFound("digest not found") - } - return s.config, nil -} - -func (s *tempConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) { - return configToRootFS(c) -} - -func (s *tempConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) { - // TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS - return &specs.Platform{OS: runtime.GOOS}, nil -} - func computePrivileges(c types.PluginConfig) types.PluginPrivileges { var privileges types.PluginPrivileges if c.Network.Type != "null" && c.Network.Type != "bridge" && c.Network.Type != "" { @@ -217,37 +160,53 @@ func computePrivileges(c types.PluginConfig) types.PluginPrivileges { // Privileges pulls a plugin config and computes the privileges required to install it. func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHeader http.Header, authConfig *types.AuthConfig) (types.PluginPrivileges, error) { - // create image store instance - cs := &tempConfigStore{} + var ( + config types.PluginConfig + configSeen bool + ) - // DownloadManager not defined because only pulling configuration. - pluginPullConfig := &distribution.ImagePullConfig{ - Config: distribution.Config{ - MetaHeaders: metaHeader, - AuthConfig: authConfig, - RegistryService: pm.config.RegistryService, - ImageEventLogger: func(string, string, string) {}, - ImageStore: cs, - }, - Schema2Types: distribution.PluginTypes, + h := func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case schema2.MediaTypeManifest, specs.MediaTypeImageManifest: + data, err := content.ReadBlob(ctx, pm.blobStore, desc) + if err != nil { + return nil, errors.Wrapf(err, "error reading image manifest from blob store for %s", ref) + } + + var m specs.Manifest + if err := json.Unmarshal(data, &m); err != nil { + return nil, errors.Wrapf(err, "error unmarshaling image manifest for %s", ref) + } + return []specs.Descriptor{m.Config}, nil + case schema2.MediaTypePluginConfig: + configSeen = true + data, err := content.ReadBlob(ctx, pm.blobStore, desc) + if err != nil { + return nil, errors.Wrapf(err, "error reading plugin config from blob store for %s", ref) + } + + if err := json.Unmarshal(data, &config); err != nil { + return nil, errors.Wrapf(err, "error unmarshaling plugin config for %s", ref) + } + } + + return nil, nil } - if err := pm.pull(ctx, ref, pluginPullConfig, nil); err != nil { - return nil, err + if err := pm.fetch(ctx, ref, authConfig, progress.DiscardOutput(), metaHeader, images.HandlerFunc(h)); err != nil { + return types.PluginPrivileges{}, nil } - if cs.config == nil { - return nil, errors.New("no configuration pulled") - } - var config types.PluginConfig - if err := json.Unmarshal(cs.config, &config); err != nil { - return nil, errdefs.System(err) + if !configSeen { + return types.PluginPrivileges{}, errors.Errorf("did not find plugin config for specified reference %s", ref) } return computePrivileges(config), nil } // Upgrade upgrades a plugin +// +// TODO: replace reference package usage with simpler url.Parse semantics func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) { p, err := pm.config.Store.GetV2Plugin(name) if err != nil { @@ -258,44 +217,35 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string return errors.Wrap(enabledError(p.Name()), "plugin must be disabled before upgrading") } - pm.muGC.RLock() - defer pm.muGC.RUnlock() - // revalidate because Pull is public if _, err := reference.ParseNormalizedNamed(name); err != nil { return errors.Wrapf(errdefs.InvalidParameter(err), "failed to parse %q", name) } + pm.muGC.RLock() + defer pm.muGC.RUnlock() + tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs") if err != nil { - return errors.Wrap(errdefs.System(err), "error preparing upgrade") - } - defer os.RemoveAll(tmpRootFSDir) - - dm := &downloadManager{ - tmpDir: tmpRootFSDir, - blobStore: pm.blobStore, + return errors.Wrap(err, "error creating tmp dir for plugin rootfs") } - pluginPullConfig := &distribution.ImagePullConfig{ - Config: distribution.Config{ - MetaHeaders: metaHeader, - AuthConfig: authConfig, - RegistryService: pm.config.RegistryService, - ImageEventLogger: pm.config.LogPluginEvent, - ImageStore: dm, - }, - DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead - Schema2Types: distribution.PluginTypes, - } + var md fetchMeta - err = pm.pull(ctx, ref, pluginPullConfig, outStream) - if err != nil { - go pm.GC() + ctx, cancel := context.WithCancel(ctx) + out, waitProgress := setupProgressOutput(outStream, cancel) + defer waitProgress() + + if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil { + return err + } + pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull") + + if err := validateFetchedMetadata(md); err != nil { return err } - if err := pm.upgradePlugin(p, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges); err != nil { + if err := pm.upgradePlugin(p, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges); err != nil { return err } p.PluginObj.PluginReference = ref.String() @@ -303,6 +253,8 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string } // Pull pulls a plugin, check if the correct privileges are provided and install the plugin. +// +// TODO: replace reference package usage with simpler url.Parse semantics func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...CreateOpt) (err error) { pm.muGC.RLock() defer pm.muGC.RUnlock() @@ -320,30 +272,22 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m tmpRootFSDir, err := ioutil.TempDir(pm.tmpDir(), ".rootfs") if err != nil { - return errors.Wrap(errdefs.System(err), "error preparing pull") + return errors.Wrap(errdefs.System(err), "error preparing upgrade") } defer os.RemoveAll(tmpRootFSDir) - dm := &downloadManager{ - tmpDir: tmpRootFSDir, - blobStore: pm.blobStore, - } + var md fetchMeta - pluginPullConfig := &distribution.ImagePullConfig{ - Config: distribution.Config{ - MetaHeaders: metaHeader, - AuthConfig: authConfig, - RegistryService: pm.config.RegistryService, - ImageEventLogger: pm.config.LogPluginEvent, - ImageStore: dm, - }, - DownloadManager: dm, // todo: reevaluate if possible to substitute distribution/xfer dependencies instead - Schema2Types: distribution.PluginTypes, - } + ctx, cancel := context.WithCancel(ctx) + out, waitProgress := setupProgressOutput(outStream, cancel) + defer waitProgress() - err = pm.pull(ctx, ref, pluginPullConfig, outStream) - if err != nil { - go pm.GC() + if err := pm.fetch(ctx, ref, authConfig, out, metaHeader, storeFetchMetadata(&md), childrenHandler(pm.blobStore), applyLayer(pm.blobStore, tmpRootFSDir, out)); err != nil { + return err + } + pm.config.LogPluginEvent(reference.FamiliarString(ref), name, "pull") + + if err := validateFetchedMetadata(md); err != nil { return err } @@ -354,12 +298,14 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m optsList = append(optsList, opts...) optsList = append(optsList, refOpt) - p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges, optsList...) + // TODO: tmpRootFSDir is empty but should have layers in it + p, err := pm.createPlugin(name, md.config, md.manifest, md.blobs, tmpRootFSDir, &privileges, optsList...) if err != nil { return err } pm.publisher.Publish(EventCreate{Plugin: p.PluginObj}) + return nil } @@ -404,7 +350,7 @@ next: return out, nil } -// Push pushes a plugin to the store. +// Push pushes a plugin to the registry. func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header, authConfig *types.AuthConfig, outStream io.Writer) error { p, err := pm.config.Store.GetV2Plugin(name) if err != nil { @@ -416,201 +362,197 @@ func (pm *Manager) Push(ctx context.Context, name string, metaHeader http.Header return errors.Wrapf(err, "plugin has invalid name %v for push", p.Name()) } - var po progress.Output - if outStream != nil { - // Include a buffer so that slow client connections don't affect - // transfer performance. - progressChan := make(chan progress.Progress, 100) + statusTracker := docker.NewInMemoryTracker() - writesDone := make(chan struct{}) - - defer func() { - close(progressChan) - <-writesDone - }() - - var cancelFunc context.CancelFunc - ctx, cancelFunc = context.WithCancel(ctx) - - go func() { - progressutils.WriteDistributionProgress(cancelFunc, outStream, progressChan) - close(writesDone) - }() - - po = progress.ChanOutput(progressChan) - } else { - po = progress.DiscardOutput() - } - - // TODO: replace these with manager - is := &pluginConfigStore{ - pm: pm, - plugin: p, - } - lss := make(map[string]distribution.PushLayerProvider) - lss[runtime.GOOS] = &pluginLayerProvider{ - pm: pm, - plugin: p, - } - rs := &pluginReference{ - name: ref, - pluginID: p.Config, - } - - uploadManager := xfer.NewLayerUploadManager(3) - - imagePushConfig := &distribution.ImagePushConfig{ - Config: distribution.Config{ - MetaHeaders: metaHeader, - AuthConfig: authConfig, - ProgressOutput: po, - RegistryService: pm.config.RegistryService, - ReferenceStore: rs, - ImageEventLogger: pm.config.LogPluginEvent, - ImageStore: is, - RequireSchema2: true, - }, - ConfigMediaType: schema2.MediaTypePluginConfig, - LayerStores: lss, - UploadManager: uploadManager, - } - - return distribution.Push(ctx, ref, imagePushConfig) -} - -type pluginReference struct { - name reference.Named - pluginID digest.Digest -} - -func (r *pluginReference) References(id digest.Digest) []reference.Named { - if r.pluginID != id { - return nil - } - return []reference.Named{r.name} -} - -func (r *pluginReference) ReferencesByName(ref reference.Named) []refstore.Association { - return []refstore.Association{ - { - Ref: r.name, - ID: r.pluginID, - }, - } -} - -func (r *pluginReference) Get(ref reference.Named) (digest.Digest, error) { - if r.name.String() != ref.String() { - return digest.Digest(""), refstore.ErrDoesNotExist - } - return r.pluginID, nil -} - -func (r *pluginReference) AddTag(ref reference.Named, id digest.Digest, force bool) error { - // Read only, ignore - return nil -} -func (r *pluginReference) AddDigest(ref reference.Canonical, id digest.Digest, force bool) error { - // Read only, ignore - return nil -} -func (r *pluginReference) Delete(ref reference.Named) (bool, error) { - // Read only, ignore - return false, nil -} - -type pluginConfigStore struct { - pm *Manager - plugin *v2.Plugin -} - -func (s *pluginConfigStore) Put([]byte) (digest.Digest, error) { - return digest.Digest(""), errors.New("cannot store config on push") -} - -func (s *pluginConfigStore) Get(d digest.Digest) ([]byte, error) { - if s.plugin.Config != d { - return nil, errors.New("plugin not found") - } - rwc, err := s.pm.blobStore.Get(d) + resolver, err := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, false) if err != nil { - return nil, err + return err } - defer rwc.Close() - return ioutil.ReadAll(rwc) -} -func (s *pluginConfigStore) RootFSFromConfig(c []byte) (*image.RootFS, error) { - return configToRootFS(c) -} + pusher, err := resolver.Pusher(ctx, ref.String()) + if err != nil { -func (s *pluginConfigStore) PlatformFromConfig(c []byte) (*specs.Platform, error) { - // TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS - return &specs.Platform{OS: runtime.GOOS}, nil -} + return errors.Wrap(err, "error creating plugin pusher") + } -type pluginLayerProvider struct { - pm *Manager - plugin *v2.Plugin -} + pj := newPushJobs(statusTracker) -func (p *pluginLayerProvider) Get(id layer.ChainID) (distribution.PushLayer, error) { - rootFS := rootFSFromPlugin(p.plugin.PluginObj.Config.Rootfs) - var i int - for i = 1; i <= len(rootFS.DiffIDs); i++ { - if layer.CreateChainID(rootFS.DiffIDs[:i]) == id { - break + ctx, cancel := context.WithCancel(ctx) + out, waitProgress := setupProgressOutput(outStream, cancel) + defer waitProgress() + + progressHandler := images.HandlerFunc(func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + logrus.WithField("mediaType", desc.MediaType).WithField("digest", desc.Digest.String()).Debug("Preparing to push plugin layer") + id := stringid.TruncateID(desc.Digest.String()) + pj.add(remotes.MakeRefKey(ctx, desc), id) + progress.Update(out, id, "Preparing") + return nil, nil + }) + + desc, err := pm.getManifestDescriptor(ctx, p) + if err != nil { + return errors.Wrap(err, "error reading plugin manifest") + } + + progress.Messagef(out, "", "The push refers to repository [%s]", reference.FamiliarName(ref)) + + // TODO: If a layer already exists on the registry, the progress output just says "Preparing" + go func() { + timer := time.NewTimer(100 * time.Millisecond) + defer timer.Stop() + if !timer.Stop() { + <-timer.C + } + var statuses []contentStatus + for { + timer.Reset(100 * time.Millisecond) + select { + case <-ctx.Done(): + return + case <-timer.C: + statuses = pj.status() + } + + for _, s := range statuses { + out.WriteProgress(progress.Progress{ID: s.Ref, Current: s.Offset, Total: s.Total, Action: s.Status, LastUpdate: s.Offset == s.Total}) + } + } + }() + + // Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo. + ctx = docker.WithScope(ctx, scope(ref, true)) + if err := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler { + return images.Handlers(progressHandler, h) + }); err != nil { + // Try fallback to http. + // This is needed because the containerd pusher will only attempt the first registry config we pass, which would + // typically be https. + // If there are no http-only host configs found we'll error out anyway. + resolver, _ := pm.newResolver(ctx, statusTracker, authConfig, metaHeader, true) + if resolver != nil { + pusher, _ := resolver.Pusher(ctx, ref.String()) + if pusher != nil { + logrus.WithField("ref", ref).Debug("Re-attmpting push with http-fallback") + err2 := remotes.PushContent(ctx, pusher, desc, pm.blobStore, nil, func(h images.Handler) images.Handler { + return images.Handlers(progressHandler, h) + }) + if err2 == nil { + err = nil + } else { + logrus.WithError(err2).WithField("ref", ref).Debug("Error while attempting push with http-fallback") + } + } + } + if err != nil { + return errors.Wrap(err, "error pushing plugin") } } - if i > len(rootFS.DiffIDs) { - return nil, errors.New("layer not found") + + // For blobs that already exist in the registry we need to make sure to update the progress otherwise it will just say "pending" + // TODO: How to check if the layer already exists? Is it worth it? + for _, j := range pj.jobs { + progress.Update(out, pj.names[j], "Upload complete") } - return &pluginLayer{ - pm: p.pm, - diffIDs: rootFS.DiffIDs[:i], - blobs: p.plugin.Blobsums[:i], - }, nil + + // Signal the client for content trust verification + progress.Aux(out, types.PushResult{Tag: ref.(reference.Tagged).Tag(), Digest: desc.Digest.String(), Size: int(desc.Size)}) + + return nil } -type pluginLayer struct { - pm *Manager - diffIDs []layer.DiffID - blobs []digest.Digest +// manifest wraps an OCI manifest, because... +// Historically the registry does not support plugins unless the media type on the manifest is specifically schema2.MediaTypeManifest +// So the OCI manifest media type is not supported. +// Additionally, there is extra validation for the docker schema2 manifest than there is a mediatype set on the manifest itself +// even though this is set on the descriptor +// The OCI types do not have this field. +type manifest struct { + specs.Manifest + MediaType string `json:"mediaType,omitempty"` } -func (l *pluginLayer) ChainID() layer.ChainID { - return layer.CreateChainID(l.diffIDs) -} +func buildManifest(ctx context.Context, s content.Manager, config digest.Digest, layers []digest.Digest) (manifest, error) { + var m manifest + m.MediaType = images.MediaTypeDockerSchema2Manifest + m.SchemaVersion = 2 -func (l *pluginLayer) DiffID() layer.DiffID { - return l.diffIDs[len(l.diffIDs)-1] -} - -func (l *pluginLayer) Parent() distribution.PushLayer { - if len(l.diffIDs) == 1 { - return nil + configInfo, err := s.Info(ctx, config) + if err != nil { + return m, errors.Wrapf(err, "error reading plugin config content for digest %s", config) } - return &pluginLayer{ - pm: l.pm, - diffIDs: l.diffIDs[:len(l.diffIDs)-1], - blobs: l.blobs[:len(l.diffIDs)-1], + m.Config = specs.Descriptor{ + MediaType: mediaTypePluginConfig, + Size: configInfo.Size, + Digest: configInfo.Digest, } + + for _, l := range layers { + info, err := s.Info(ctx, l) + if err != nil { + return m, errors.Wrapf(err, "error fetching info for content digest %s", l) + } + m.Layers = append(m.Layers, specs.Descriptor{ + MediaType: specs.MediaTypeImageLayerGzip, // TODO: This is assuming everything is a gzip compressed layer, but that may not be true. + Digest: l, + Size: info.Size, + }) + } + return m, nil } -func (l *pluginLayer) Open() (io.ReadCloser, error) { - return l.pm.blobStore.Get(l.blobs[len(l.diffIDs)-1]) +// getManifestDescriptor gets the OCI descriptor for a manifest +// It will generate a manifest if one does not exist +func (pm *Manager) getManifestDescriptor(ctx context.Context, p *v2.Plugin) (specs.Descriptor, error) { + logger := logrus.WithField("plugin", p.Name()).WithField("digest", p.Manifest) + if p.Manifest != "" { + info, err := pm.blobStore.Info(ctx, p.Manifest) + if err == nil { + desc := specs.Descriptor{ + Size: info.Size, + Digest: info.Digest, + MediaType: images.MediaTypeDockerSchema2Manifest, + } + return desc, nil + } + logger.WithError(err).Debug("Could not find plugin manifest in content store") + } else { + logger.Info("Plugin does not have manifest digest") + } + logger.Info("Building a new plugin manifest") + + manifest, err := buildManifest(ctx, pm.blobStore, p.Config, p.Blobsums) + if err != nil { + return specs.Descriptor{}, err + } + + desc, err := writeManifest(ctx, pm.blobStore, &manifest) + if err != nil { + return desc, err + } + + if err := pm.save(p); err != nil { + logger.WithError(err).Error("Could not save plugin with manifest digest") + } + return desc, nil } -func (l *pluginLayer) Size() (int64, error) { - return l.pm.blobStore.Size(l.blobs[len(l.diffIDs)-1]) -} +func writeManifest(ctx context.Context, cs content.Store, m *manifest) (specs.Descriptor, error) { + platform := platforms.DefaultSpec() + desc := specs.Descriptor{ + MediaType: images.MediaTypeDockerSchema2Manifest, + Platform: &platform, + } + data, err := json.Marshal(m) + if err != nil { + return desc, errors.Wrap(err, "error encoding manifest") + } + desc.Digest = digest.FromBytes(data) + desc.Size = int64(len(data)) -func (l *pluginLayer) MediaType() string { - return schema2.MediaTypeLayer -} - -func (l *pluginLayer) Release() { - // Nothing needs to be release, no references held + if err := content.WriteBlob(ctx, cs, remotes.MakeRefKey(ctx, desc), bytes.NewReader(data), desc); err != nil { + return desc, errors.Wrap(err, "error writing plugin manifest") + } + return desc, nil } // Remove deletes plugin's root directory. @@ -700,14 +642,14 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, var configJSON []byte rootFS := splitConfigRootFSFromTar(tarCtx, &configJSON) - rootFSBlob, err := pm.blobStore.New() + rootFSBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name)) if err != nil { return err } defer rootFSBlob.Close() + gzw := gzip.NewWriter(rootFSBlob) - layerDigester := digest.Canonical.Digester() - rootFSReader := io.TeeReader(rootFS, io.MultiWriter(gzw, layerDigester.Hash())) + rootFSReader := io.TeeReader(rootFS, gzw) if err := chrootarchive.Untar(rootFSReader, tmpRootFSDir, nil); err != nil { return err @@ -736,8 +678,7 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, pm.mu.Lock() defer pm.mu.Unlock() - rootFSBlobsum, err := rootFSBlob.Commit() - if err != nil { + if err := rootFSBlob.Commit(ctx, 0, ""); err != nil { return err } defer func() { @@ -748,12 +689,12 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, config.Rootfs = &types.PluginConfigRootfs{ Type: "layers", - DiffIds: []string{layerDigester.Digest().String()}, + DiffIds: []string{rootFSBlob.Digest().String()}, } config.DockerVersion = dockerversion.Version - configBlob, err := pm.blobStore.New() + configBlob, err := pm.blobStore.Writer(ctx, content.WithRef(name+"-config.json")) if err != nil { return err } @@ -761,12 +702,23 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, if err := json.NewEncoder(configBlob).Encode(config); err != nil { return errors.Wrap(err, "error encoding json config") } - configBlobsum, err := configBlob.Commit() - if err != nil { + if err := configBlob.Commit(ctx, 0, ""); err != nil { return err } - p, err := pm.createPlugin(name, configBlobsum, []digest.Digest{rootFSBlobsum}, tmpRootFSDir, nil) + configDigest := configBlob.Digest() + layers := []digest.Digest{rootFSBlob.Digest()} + + manifest, err := buildManifest(ctx, pm.blobStore, configDigest, layers) + if err != nil { + return err + } + desc, err := writeManifest(ctx, pm.blobStore, &manifest) + if err != nil { + return + } + + p, err := pm.createPlugin(name, configDigest, desc.Digest, layers, tmpRootFSDir, nil) if err != nil { return err } diff --git a/plugin/blobstore.go b/plugin/blobstore.go deleted file mode 100644 index 0babefbbf5..0000000000 --- a/plugin/blobstore.go +++ /dev/null @@ -1,190 +0,0 @@ -package plugin // import "github.com/docker/docker/plugin" - -import ( - "context" - "fmt" - "io" - "io/ioutil" - "os" - "path/filepath" - "runtime" - - "github.com/docker/docker/distribution/xfer" - "github.com/docker/docker/image" - "github.com/docker/docker/layer" - "github.com/docker/docker/pkg/archive" - "github.com/docker/docker/pkg/chrootarchive" - "github.com/docker/docker/pkg/progress" - digest "github.com/opencontainers/go-digest" - specs "github.com/opencontainers/image-spec/specs-go/v1" - "github.com/pkg/errors" - "github.com/sirupsen/logrus" -) - -type blobstore interface { - New() (WriteCommitCloser, error) - Get(dgst digest.Digest) (io.ReadCloser, error) - Size(dgst digest.Digest) (int64, error) -} - -type basicBlobStore struct { - path string -} - -func newBasicBlobStore(p string) (*basicBlobStore, error) { - tmpdir := filepath.Join(p, "tmp") - if err := os.MkdirAll(tmpdir, 0700); err != nil { - return nil, errors.Wrapf(err, "failed to mkdir %v", p) - } - return &basicBlobStore{path: p}, nil -} - -func (b *basicBlobStore) New() (WriteCommitCloser, error) { - f, err := ioutil.TempFile(filepath.Join(b.path, "tmp"), ".insertion") - if err != nil { - return nil, errors.Wrap(err, "failed to create temp file") - } - return newInsertion(f), nil -} - -func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) { - return os.Open(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())) -} - -func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) { - stat, err := os.Stat(filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())) - if err != nil { - return 0, err - } - return stat.Size(), nil -} - -func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) { - for _, alg := range []string{string(digest.Canonical)} { - items, err := ioutil.ReadDir(filepath.Join(b.path, alg)) - if err != nil { - continue - } - for _, fi := range items { - if _, exists := whitelist[digest.Digest(alg+":"+fi.Name())]; !exists { - p := filepath.Join(b.path, alg, fi.Name()) - err := os.RemoveAll(p) - logrus.Debugf("cleaned up blob %v: %v", p, err) - } - } - } - -} - -// WriteCommitCloser defines object that can be committed to blobstore. -type WriteCommitCloser interface { - io.WriteCloser - Commit() (digest.Digest, error) -} - -type insertion struct { - io.Writer - f *os.File - digester digest.Digester - closed bool -} - -func newInsertion(tempFile *os.File) *insertion { - digester := digest.Canonical.Digester() - return &insertion{f: tempFile, digester: digester, Writer: io.MultiWriter(tempFile, digester.Hash())} -} - -func (i *insertion) Commit() (digest.Digest, error) { - p := i.f.Name() - d := filepath.Join(filepath.Join(p, "../../")) - i.f.Sync() - defer os.RemoveAll(p) - if err := i.f.Close(); err != nil { - return "", err - } - i.closed = true - dgst := i.digester.Digest() - if err := os.MkdirAll(filepath.Join(d, string(dgst.Algorithm())), 0700); err != nil { - return "", errors.Wrapf(err, "failed to mkdir %v", d) - } - if err := os.Rename(p, filepath.Join(d, string(dgst.Algorithm()), dgst.Hex())); err != nil { - return "", errors.Wrapf(err, "failed to rename %v", p) - } - return dgst, nil -} - -func (i *insertion) Close() error { - if i.closed { - return nil - } - defer os.RemoveAll(i.f.Name()) - return i.f.Close() -} - -type downloadManager struct { - blobStore blobstore - tmpDir string - blobs []digest.Digest - configDigest digest.Digest -} - -func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, os string, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) { - for _, l := range layers { - b, err := dm.blobStore.New() - if err != nil { - return initialRootFS, nil, err - } - defer b.Close() - rc, _, err := l.Download(ctx, progressOutput) - if err != nil { - return initialRootFS, nil, errors.Wrap(err, "failed to download") - } - defer rc.Close() - r := io.TeeReader(rc, b) - inflatedLayerData, err := archive.DecompressStream(r) - if err != nil { - return initialRootFS, nil, err - } - defer inflatedLayerData.Close() - digester := digest.Canonical.Digester() - if _, err := chrootarchive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil { - return initialRootFS, nil, err - } - initialRootFS.Append(layer.DiffID(digester.Digest())) - d, err := b.Commit() - if err != nil { - return initialRootFS, nil, err - } - dm.blobs = append(dm.blobs, d) - } - return initialRootFS, nil, nil -} - -func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) { - b, err := dm.blobStore.New() - if err != nil { - return "", err - } - defer b.Close() - n, err := b.Write(dt) - if err != nil { - return "", err - } - if n != len(dt) { - return "", io.ErrShortWrite - } - d, err := b.Commit() - dm.configDigest = d - return d, err -} - -func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) { - return nil, fmt.Errorf("digest not found") -} -func (dm *downloadManager) RootFSFromConfig(c []byte) (*image.RootFS, error) { - return configToRootFS(c) -} -func (dm *downloadManager) PlatformFromConfig(c []byte) (*specs.Platform, error) { - // TODO: LCOW/Plugins. This will need revisiting. For now use the runtime OS - return &specs.Platform{OS: runtime.GOOS}, nil -} diff --git a/plugin/fetch_linux.go b/plugin/fetch_linux.go new file mode 100644 index 0000000000..19b9bbf18e --- /dev/null +++ b/plugin/fetch_linux.go @@ -0,0 +1,288 @@ +package plugin + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/containerd/containerd/content" + c8derrdefs "github.com/containerd/containerd/errdefs" + "github.com/containerd/containerd/images" + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/docker/distribution/reference" + "github.com/docker/docker/api/types" + progressutils "github.com/docker/docker/distribution/utils" + "github.com/docker/docker/pkg/chrootarchive" + "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/progress" + "github.com/docker/docker/pkg/stringid" + digest "github.com/opencontainers/go-digest" + specs "github.com/opencontainers/image-spec/specs-go/v1" + "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const mediaTypePluginConfig = "application/vnd.docker.plugin.v1+json" + +// setupProgressOutput sets up the passed in writer to stream progress. +// +// The passed in cancel function is used by the progress writer to signal callers that there +// is an issue writing to the stream. +// +// The returned function is used to wait for the progress writer to be finished. +// Call it to make sure the progress writer is done before returning from your function as needed. +func setupProgressOutput(outStream io.Writer, cancel func()) (progress.Output, func()) { + var out progress.Output + f := func() {} + + if outStream != nil { + ch := make(chan progress.Progress, 100) + out = progress.ChanOutput(ch) + + ctx, retCancel := context.WithCancel(context.Background()) + go func() { + progressutils.WriteDistributionProgress(cancel, outStream, ch) + retCancel() + }() + + f = func() { + close(ch) + <-ctx.Done() + } + } else { + out = progress.DiscardOutput() + } + return out, f +} + +// fetch the content related to the passed in reference into the blob store and appends the provided images.Handlers +// There is no need to use remotes.FetchHandler since it already gets set +func (pm *Manager) fetch(ctx context.Context, ref reference.Named, auth *types.AuthConfig, out progress.Output, metaHeader http.Header, handlers ...images.Handler) (err error) { + // We need to make sure we have a domain on the reference + withDomain, err := reference.ParseNormalizedNamed(ref.String()) + if err != nil { + return errors.Wrap(err, "error parsing plugin image reference") + } + + // Make sure we can authenticate the request since the auth scope for plugin repos is different than a normal repo. + ctx = docker.WithScope(ctx, scope(ref, false)) + + // Make sure the fetch handler knows how to set a ref key for the plugin media type. + // Without this the ref key is "unknown" and we see a nasty warning message in the logs + ctx = remotes.WithMediaTypeKeyPrefix(ctx, mediaTypePluginConfig, "docker-plugin") + + resolver, err := pm.newResolver(ctx, nil, auth, metaHeader, false) + if err != nil { + return err + } + resolved, desc, err := resolver.Resolve(ctx, withDomain.String()) + if err != nil { + // This is backwards compatible with older versions of the distribution registry. + // The containerd client will add it's own accept header as a comma separated list of supported manifests. + // This is perfectly fine, unless you are talking to an older registry which does not split the comma separated list, + // so it is never able to match a media type and it falls back to schema1 (yuck) and fails because our manifest the + // fallback does not support plugin configs... + logrus.WithError(err).WithField("ref", withDomain).Debug("Error while resolving reference, falling back to backwards compatible accept header format") + headers := http.Header{} + headers.Add("Accept", images.MediaTypeDockerSchema2Manifest) + headers.Add("Accept", images.MediaTypeDockerSchema2ManifestList) + headers.Add("Accept", specs.MediaTypeImageManifest) + headers.Add("Accept", specs.MediaTypeImageIndex) + resolver, _ = pm.newResolver(ctx, nil, auth, headers, false) + if resolver != nil { + resolved, desc, err = resolver.Resolve(ctx, withDomain.String()) + if err != nil { + logrus.WithError(err).WithField("ref", withDomain).Debug("Failed to resolve reference after falling back to backwards compatible accept header format") + } + } + if err != nil { + return errors.Wrap(err, "error resolving plugin reference") + } + } + + fetcher, err := resolver.Fetcher(ctx, resolved) + if err != nil { + return errors.Wrap(err, "error creating plugin image fetcher") + } + + fp := withFetchProgress(pm.blobStore, out, ref) + handlers = append([]images.Handler{fp, remotes.FetchHandler(pm.blobStore, fetcher)}, handlers...) + if err := images.Dispatch(ctx, images.Handlers(handlers...), nil, desc); err != nil { + return err + } + return nil +} + +// applyLayer makes an images.HandlerFunc which applies a fetched image rootfs layer to a directory. +// +// TODO(@cpuguy83) This gets run sequentially after layer pull (makes sense), however +// if there are multiple layers to fetch we may end up extracting layers in the wrong +// order. +func applyLayer(cs content.Store, dir string, out progress.Output) images.HandlerFunc { + return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case + specs.MediaTypeImageLayer, + images.MediaTypeDockerSchema2Layer, + specs.MediaTypeImageLayerGzip, + images.MediaTypeDockerSchema2LayerGzip: + default: + return nil, nil + } + + ra, err := cs.ReaderAt(ctx, desc) + if err != nil { + return nil, errors.Wrapf(err, "error getting content from content store for digest %s", desc.Digest) + } + + id := stringid.TruncateID(desc.Digest.String()) + + rc := ioutils.NewReadCloserWrapper(content.NewReader(ra), ra.Close) + pr := progress.NewProgressReader(rc, out, desc.Size, id, "Extracting") + defer pr.Close() + + if _, err := chrootarchive.ApplyLayer(dir, pr); err != nil { + return nil, errors.Wrapf(err, "error applying layer for digest %s", desc.Digest) + } + progress.Update(out, id, "Complete") + return nil, nil + } +} + +func childrenHandler(cs content.Store) images.HandlerFunc { + ch := images.ChildrenHandler(cs) + return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case mediaTypePluginConfig: + return nil, nil + default: + return ch(ctx, desc) + } + } +} + +type fetchMeta struct { + blobs []digest.Digest + config digest.Digest + manifest digest.Digest +} + +func storeFetchMetadata(m *fetchMeta) images.HandlerFunc { + return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case + images.MediaTypeDockerSchema2LayerForeignGzip, + images.MediaTypeDockerSchema2Layer, + specs.MediaTypeImageLayer, + specs.MediaTypeImageLayerGzip: + m.blobs = append(m.blobs, desc.Digest) + case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest: + m.manifest = desc.Digest + case mediaTypePluginConfig: + m.config = desc.Digest + } + return nil, nil + } +} + +func validateFetchedMetadata(md fetchMeta) error { + if md.config == "" { + return errors.New("fetched plugin image but plugin config is missing") + } + if md.manifest == "" { + return errors.New("fetched plugin image but manifest is missing") + } + return nil +} + +// withFetchProgress is a fetch handler which registers a descriptor with a progress +func withFetchProgress(cs content.Store, out progress.Output, ref reference.Named) images.HandlerFunc { + return func(ctx context.Context, desc specs.Descriptor) ([]specs.Descriptor, error) { + switch desc.MediaType { + case specs.MediaTypeImageManifest, images.MediaTypeDockerSchema2Manifest: + tn := reference.TagNameOnly(ref) + tagged := tn.(reference.Tagged) + progress.Messagef(out, tagged.Tag(), "Pulling from %s", reference.FamiliarName(ref)) + progress.Messagef(out, "", "Digest: %s", desc.Digest.String()) + return nil, nil + case + images.MediaTypeDockerSchema2LayerGzip, + images.MediaTypeDockerSchema2Layer, + specs.MediaTypeImageLayer, + specs.MediaTypeImageLayerGzip: + default: + return nil, nil + } + + id := stringid.TruncateID(desc.Digest.String()) + + if _, err := cs.Info(ctx, desc.Digest); err == nil { + out.WriteProgress(progress.Progress{ID: id, Action: "Already exists", LastUpdate: true}) + return nil, nil + } + + progress.Update(out, id, "Waiting") + + key := remotes.MakeRefKey(ctx, desc) + + go func() { + timer := time.NewTimer(100 * time.Millisecond) + if !timer.Stop() { + <-timer.C + } + defer timer.Stop() + + var pulling bool + var ctxErr error + + for { + timer.Reset(100 * time.Millisecond) + + select { + case <-ctx.Done(): + ctxErr = ctx.Err() + // make sure we can still fetch from the content store + // TODO: Might need to add some sort of timeout + ctx = context.Background() + case <-timer.C: + } + + s, err := cs.Status(ctx, key) + if err != nil { + if !c8derrdefs.IsNotFound(err) { + logrus.WithError(err).WithField("layerDigest", desc.Digest.String()).Error("Error looking up status of plugin layer pull") + progress.Update(out, id, err.Error()) + return + } + + if _, err := cs.Info(ctx, desc.Digest); err == nil { + progress.Update(out, id, "Download complete") + return + } + + if ctxErr != nil { + progress.Update(out, id, ctxErr.Error()) + return + } + + continue + } + + if !pulling { + progress.Update(out, id, "Pulling fs layer") + pulling = true + } + + if s.Offset == s.Total { + out.WriteProgress(progress.Progress{ID: id, Action: "Download complete", Current: s.Offset, LastUpdate: true}) + return + } + + out.WriteProgress(progress.Progress{ID: id, Action: "Downloading", Current: s.Offset, Total: s.Total}) + } + }() + return nil, nil + } +} diff --git a/plugin/manager.go b/plugin/manager.go index 38b18f0e74..f67b22c952 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -1,6 +1,7 @@ package plugin // import "github.com/docker/docker/plugin" import ( + "context" "encoding/json" "io" "io/ioutil" @@ -12,10 +13,10 @@ import ( "strings" "sync" + "github.com/containerd/containerd/content" + "github.com/containerd/containerd/content/local" "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" - "github.com/docker/docker/image" - "github.com/docker/docker/layer" "github.com/docker/docker/pkg/authorization" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/pubsub" @@ -72,7 +73,7 @@ type Manager struct { mu sync.RWMutex // protects cMap muGC sync.RWMutex // protects blobstore deletions cMap map[*v2.Plugin]*controller - blobStore *basicBlobStore + blobStore content.Store publisher *pubsub.Publisher executor Executor } @@ -117,9 +118,9 @@ func NewManager(config ManagerConfig) (*Manager, error) { return nil, err } - manager.blobStore, err = newBasicBlobStore(filepath.Join(manager.config.Root, "storage/blobs")) + manager.blobStore, err = local.NewStore(filepath.Join(manager.config.Root, "storage")) if err != nil { - return nil, err + return nil, errors.Wrap(err, "error creating plugin blob store") } manager.cMap = make(map[*v2.Plugin]*controller) @@ -305,7 +306,15 @@ func (pm *Manager) GC() { } } - pm.blobStore.gc(whitelist) + ctx := context.TODO() + pm.blobStore.Walk(ctx, func(info content.Info) error { + _, ok := whitelist[info.Digest] + if ok { + return nil + } + + return pm.blobStore.Delete(ctx, info.Digest) + }) } type logHook struct{ id string } @@ -357,28 +366,3 @@ func isEqualPrivilege(a, b types.PluginPrivilege) bool { return reflect.DeepEqual(a.Value, b.Value) } - -func configToRootFS(c []byte) (*image.RootFS, error) { - var pluginConfig types.PluginConfig - if err := json.Unmarshal(c, &pluginConfig); err != nil { - return nil, err - } - // validation for empty rootfs is in distribution code - if pluginConfig.Rootfs == nil { - return nil, nil - } - - return rootFSFromPlugin(pluginConfig.Rootfs), nil -} - -func rootFSFromPlugin(pluginfs *types.PluginConfigRootfs) *image.RootFS { - rootFS := image.RootFS{ - Type: pluginfs.Type, - DiffIDs: make([]layer.DiffID, len(pluginfs.DiffIds)), - } - for i := range pluginfs.DiffIds { - rootFS.DiffIDs[i] = layer.DiffID(pluginfs.DiffIds[i]) - } - - return &rootFS -} diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go index a3aca0d783..eb39c39cc0 100644 --- a/plugin/manager_linux.go +++ b/plugin/manager_linux.go @@ -1,12 +1,14 @@ package plugin // import "github.com/docker/docker/plugin" import ( + "context" "encoding/json" "net" "os" "path/filepath" "time" + "github.com/containerd/containerd/content" "github.com/docker/docker/api/types" "github.com/docker/docker/daemon/initlayer" "github.com/docker/docker/errdefs" @@ -17,6 +19,7 @@ import ( v2 "github.com/docker/docker/plugin/v2" "github.com/moby/sys/mount" digest "github.com/opencontainers/go-digest" + specs "github.com/opencontainers/image-spec/specs-go/v1" "github.com/pkg/errors" "github.com/sirupsen/logrus" "golang.org/x/sys/unix" @@ -213,7 +216,7 @@ func (pm *Manager) Shutdown() { } } -func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) { +func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest, manifestDigest digest.Digest, blobsums []digest.Digest, tmpRootFSDir string, privileges *types.PluginPrivileges) (err error) { config, err := pm.setupNewPlugin(configDigest, blobsums, privileges) if err != nil { return err @@ -261,19 +264,22 @@ func (pm *Manager) upgradePlugin(p *v2.Plugin, configDigest digest.Digest, blobs } p.PluginObj.Config = config + p.Manifest = manifestDigest err = pm.save(p) return errors.Wrap(err, "error saving upgraded plugin config") } func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest.Digest, privileges *types.PluginPrivileges) (types.PluginConfig, error) { - configRC, err := pm.blobStore.Get(configDigest) + configRA, err := pm.blobStore.ReaderAt(context.TODO(), specs.Descriptor{Digest: configDigest}) if err != nil { return types.PluginConfig{}, err } - defer configRC.Close() + defer configRA.Close() + + configR := content.NewReader(configRA) var config types.PluginConfig - dec := json.NewDecoder(configRC) + dec := json.NewDecoder(configR) if err := dec.Decode(&config); err != nil { return types.PluginConfig{}, errors.Wrapf(err, "failed to parse config") } @@ -292,7 +298,7 @@ func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest. } // createPlugin creates a new plugin. take lock before calling. -func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) { +func (pm *Manager) createPlugin(name string, configDigest, manifestDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) { if err := pm.config.Store.validateName(name); err != nil { // todo: this check is wrong. remove store return nil, errdefs.InvalidParameter(err) } @@ -310,6 +316,7 @@ func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsum }, Config: configDigest, Blobsums: blobsums, + Manifest: manifestDigest, } p.InitEmptySettings() for _, o := range opts { diff --git a/plugin/progress.go b/plugin/progress.go new file mode 100644 index 0000000000..ffe6c70aae --- /dev/null +++ b/plugin/progress.go @@ -0,0 +1,74 @@ +package plugin + +import ( + "sync" + "time" + + "github.com/containerd/containerd/remotes/docker" +) + +func newPushJobs(tracker docker.StatusTracker) *pushJobs { + return &pushJobs{ + names: make(map[string]string), + t: tracker, + } +} + +type pushJobs struct { + t docker.StatusTracker + + mu sync.Mutex + jobs []string + // maps job ref to a name + names map[string]string +} + +func (p *pushJobs) add(id, name string) { + p.mu.Lock() + defer p.mu.Unlock() + + if _, ok := p.names[id]; ok { + return + } + p.jobs = append(p.jobs, id) + p.names[id] = name +} + +func (p *pushJobs) status() []contentStatus { + statuses := make([]contentStatus, 0, len(p.jobs)) + + p.mu.Lock() + defer p.mu.Unlock() + + for _, j := range p.jobs { + var s contentStatus + s.Ref = p.names[j] + + status, err := p.t.GetStatus(j) + if err != nil { + s.Status = "Waiting" + } else { + s.Total = status.Total + s.Offset = status.Offset + s.StartedAt = status.StartedAt + s.UpdatedAt = status.UpdatedAt + if status.UploadUUID == "" { + s.Status = "Upload complete" + } else { + s.Status = "Uploading" + } + } + statuses = append(statuses, s) + } + + return statuses +} + +type contentStatus struct { + Status string + Total int64 + Offset int64 + StartedAt time.Time + UpdatedAt time.Time + Ref string +} diff --git a/plugin/registry.go b/plugin/registry.go new file mode 100644 index 0000000000..ad2a6b7138 --- /dev/null +++ b/plugin/registry.go @@ -0,0 +1,111 @@ +package plugin + +import ( + "context" + "crypto/tls" + "net" + "net/http" + "time" + + "github.com/sirupsen/logrus" + + "github.com/docker/docker/dockerversion" + + "github.com/pkg/errors" + + "github.com/containerd/containerd/remotes" + "github.com/containerd/containerd/remotes/docker" + "github.com/docker/distribution/reference" + "github.com/docker/docker/api/types" +) + +// scope builds the correct auth scope for the registry client to authorize against +// By default the client currently only does a "repository:" scope with out a classifier, e.g. "(plugin)" +// Without this, the client will not be able to authorize the request +func scope(ref reference.Named, push bool) string { + scope := "repository(plugin):" + reference.Path(reference.TrimNamed(ref)) + ":pull" + if push { + scope += ",push" + } + return scope +} + +func (pm *Manager) newResolver(ctx context.Context, tracker docker.StatusTracker, auth *types.AuthConfig, headers http.Header, httpFallback bool) (remotes.Resolver, error) { + if headers == nil { + headers = http.Header{} + } + headers.Add("User-Agent", dockerversion.DockerUserAgent(ctx)) + + return docker.NewResolver(docker.ResolverOptions{ + Tracker: tracker, + Headers: headers, + Hosts: pm.registryHostsFn(auth, httpFallback), + }), nil +} + +func registryHTTPClient(config *tls.Config) *http.Client { + return &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + DialContext: (&net.Dialer{ + Timeout: 30 * time.Second, + KeepAlive: 30 * time.Second, + }).DialContext, + TLSClientConfig: config, + TLSHandshakeTimeout: 10 * time.Second, + IdleConnTimeout: 30 * time.Second, + }, + } +} + +func (pm *Manager) registryHostsFn(auth *types.AuthConfig, httpFallback bool) docker.RegistryHosts { + return func(hostname string) ([]docker.RegistryHost, error) { + eps, err := pm.config.RegistryService.LookupPullEndpoints(hostname) + if err != nil { + return nil, errors.Wrapf(err, "error resolving repository for %s", hostname) + } + + hosts := make([]docker.RegistryHost, 0, len(eps)) + + for _, ep := range eps { + // forced http fallback is used only for push since the containerd pusher only ever uses the first host we + // pass to it. + // So it is the callers responsibility to retry with this flag set. + if httpFallback && ep.URL.Scheme != "http" { + logrus.WithField("registryHost", hostname).WithField("endpoint", ep).Debugf("Skipping non-http endpoint") + continue + } + + caps := docker.HostCapabilityPull | docker.HostCapabilityResolve + if !ep.Mirror { + caps = caps | docker.HostCapabilityPush + } + + host, err := docker.DefaultHost(ep.URL.Host) + if err != nil { + return nil, err + } + + client := registryHTTPClient(ep.TLSConfig) + hosts = append(hosts, docker.RegistryHost{ + Host: host, + Scheme: ep.URL.Scheme, + Client: client, + Path: "/v2", + Capabilities: caps, + Authorizer: docker.NewDockerAuthorizer( + docker.WithAuthClient(client), + docker.WithAuthCreds(func(_ string) (string, string, error) { + if auth.IdentityToken != "" { + return "", auth.IdentityToken, nil + } + return auth.Username, auth.Password, nil + }), + ), + }) + } + logrus.WithField("registryHost", hostname).WithField("hosts", hosts).Debug("Resolved registry hosts") + + return hosts, nil + } +} diff --git a/plugin/v2/plugin.go b/plugin/v2/plugin.go index b46ff60b31..3e6e063f4a 100644 --- a/plugin/v2/plugin.go +++ b/plugin/v2/plugin.go @@ -25,6 +25,7 @@ type Plugin struct { Config digest.Digest Blobsums []digest.Digest + Manifest digest.Digest modifyRuntimeSpec func(*specs.Spec) diff --git a/testutil/fixtures/plugin/plugin.go b/testutil/fixtures/plugin/plugin.go index 8517b37afb..3ec3bc39ea 100644 --- a/testutil/fixtures/plugin/plugin.go +++ b/testutil/fixtures/plugin/plugin.go @@ -26,7 +26,15 @@ type CreateOpt func(*Config) // create the plugin with. type Config struct { *types.PluginConfig - binPath string + binPath string + RegistryConfig registry.ServiceOptions +} + +// WithInsecureRegistry specifies that the given registry can skip host-key checking as well as fall back to plain http +func WithInsecureRegistry(url string) CreateOpt { + return func(cfg *Config) { + cfg.RegistryConfig.InsecureRegistries = append(cfg.RegistryConfig.InsecureRegistries, url) + } } // WithBinary is a CreateOpt to set an custom binary to create the plugin with. @@ -82,6 +90,11 @@ func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, return errors.Wrap(err, "error creating plugin root") } + var cfg Config + for _, o := range opts { + o(&cfg) + } + tar, err := makePluginBundle(inPath, opts...) if err != nil { return err @@ -92,7 +105,7 @@ func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, return nil, nil } - regService, err := registry.NewService(registry.ServiceOptions{}) + regService, err := registry.NewService(cfg.RegistryConfig) if err != nil { return err } diff --git a/testutil/registry/ops.go b/testutil/registry/ops.go index c004f37424..7357d5f509 100644 --- a/testutil/registry/ops.go +++ b/testutil/registry/ops.go @@ -1,5 +1,7 @@ package registry +import "io" + // Schema1 sets the registry to serve v1 api func Schema1(c *Config) { c.schema1 = true @@ -24,3 +26,17 @@ func URL(registryURL string) func(*Config) { c.registryURL = registryURL } } + +// WithStdout sets the stdout of the registry command to the passed in writer. +func WithStdout(w io.Writer) func(c *Config) { + return func(c *Config) { + c.stdout = w + } +} + +// WithStderr sets the stdout of the registry command to the passed in writer. +func WithStderr(w io.Writer) func(c *Config) { + return func(c *Config) { + c.stderr = w + } +} diff --git a/testutil/registry/registry.go b/testutil/registry/registry.go index c272d88ce6..d8213a3bec 100644 --- a/testutil/registry/registry.go +++ b/testutil/registry/registry.go @@ -2,6 +2,7 @@ package registry // import "github.com/docker/docker/testutil/registry" import ( "fmt" + "io" "io/ioutil" "net/http" "os" @@ -40,6 +41,8 @@ type Config struct { auth string tokenURL string registryURL string + stdout io.Writer + stderr io.Writer } // NewV2 creates a v2 registry server @@ -109,6 +112,8 @@ http: binary = V2binarySchema1 } cmd := exec.Command(binary, confPath) + cmd.Stdout = c.stdout + cmd.Stderr = c.stderr if err := cmd.Start(); err != nil { // FIXME(vdemeester) use a defer/clean func os.RemoveAll(tmp)