1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/plugin/blobstore.go
Tonis Tiigi 38d914cc96 Implement content addressability for plugins
Move plugins to shared distribution stack with images.

Create immutable plugin config that matches schema2 requirements.

Ensure data being pushed is same as pulled/created.

Store distribution artifacts in a blobstore.

Run init layer setup for every plugin start.

Fix breakouts from unsafe file accesses.

Add support for `docker plugin install --alias`

Uses normalized references for default names to avoid collisions when using default hosts/tags.

Some refactoring of the plugin manager to support the change, like removing the singleton manager and adding manager config struct.

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Signed-off-by: Derek McGowan <derek@mcgstyle.net>
(cherry picked from commit 3d86b0c79b)
2016-12-27 13:31:14 -08:00

181 lines
4.6 KiB
Go

package plugin
import (
"io"
"io/ioutil"
"os"
"path/filepath"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/digest"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/pkg/progress"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// blobstore is the storage abstraction used by the download manager in this
// file: it can start a new blob insertion, open a stored blob by digest, and
// report a stored blob's size.
type blobstore interface {
// New starts a new insertion; the blob data is written to the returned
// WriteCommitCloser.
New() (WriteCommitCloser, error)
// Get opens the blob identified by dgst for reading.
Get(dgst digest.Digest) (io.ReadCloser, error)
// Size returns the size of the blob identified by dgst.
Size(dgst digest.Digest) (int64, error)
}
// basicBlobStore is a blobstore backed by a directory on the local filesystem.
// Committed blobs live at <path>/<algorithm>/<hex digest>; in-progress
// insertions are created under <path>/tmp.
type basicBlobStore struct {
// path is the root directory of the store.
path string
}
// newBasicBlobStore returns a blob store rooted at p. Before returning, it
// creates the "tmp" subdirectory used for in-progress insertions (together
// with any missing parents) with mode 0700.
func newBasicBlobStore(p string) (*basicBlobStore, error) {
	err := os.MkdirAll(filepath.Join(p, "tmp"), 0700)
	if err != nil {
		return nil, errors.Wrapf(err, "failed to mkdir %v", p)
	}

	store := &basicBlobStore{path: p}
	return store, nil
}
// New creates a temporary file in the store's "tmp" directory and wraps it in
// an insertion. The caller writes the blob data to the result and then either
// commits it or closes it to discard it.
func (b *basicBlobStore) New() (WriteCommitCloser, error) {
	tmpDir := filepath.Join(b.path, "tmp")
	tmpFile, err := ioutil.TempFile(tmpDir, ".insertion")
	if err == nil {
		return newInsertion(tmpFile), nil
	}
	return nil, errors.Wrap(err, "failed to create temp file")
}
// Get opens the blob stored at <path>/<algorithm>/<hex> for reading. The
// error from os.Open is returned unchanged when the blob cannot be opened.
func (b *basicBlobStore) Get(dgst digest.Digest) (io.ReadCloser, error) {
	blobPath := filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())
	return os.Open(blobPath)
}
// Size stats the blob stored at <path>/<algorithm>/<hex> and returns its size
// in bytes. The error from os.Stat is returned unchanged on failure.
func (b *basicBlobStore) Size(dgst digest.Digest) (int64, error) {
	blobPath := filepath.Join(b.path, string(dgst.Algorithm()), dgst.Hex())

	info, err := os.Stat(blobPath)
	if err != nil {
		return 0, err
	}
	return info.Size(), nil
}
// gc deletes every stored blob whose digest is not a key of whitelist. Only
// the canonical digest algorithm's directory is scanned. Failures are not
// reported to the caller: an unreadable directory is skipped, and the result
// of each removal is only logged at debug level.
func (b *basicBlobStore) gc(whitelist map[digest.Digest]struct{}) {
	algorithms := []string{string(digest.Canonical)}

	for _, alg := range algorithms {
		entries, err := ioutil.ReadDir(filepath.Join(b.path, alg))
		if err != nil {
			// Directory missing or unreadable: nothing to collect here.
			continue
		}

		for _, entry := range entries {
			name := entry.Name()
			if _, keep := whitelist[digest.Digest(alg+":"+name)]; keep {
				continue
			}

			blobPath := filepath.Join(b.path, alg, name)
			rmErr := os.RemoveAll(blobPath)
			logrus.Debugf("cleaned up blob %v: %v", blobPath, rmErr)
		}
	}
}
// WriteCommitCloser defines an object that can be committed to the blobstore.
// Data is supplied through Write; Commit finalizes the blob and returns its
// digest.
type WriteCommitCloser interface {
io.WriteCloser
// Commit finalizes the written data and returns the digest it was stored
// under.
Commit() (digest.Digest, error)
}
// insertion is the WriteCommitCloser returned by basicBlobStore.New. Writes go
// through the embedded Writer, which newInsertion sets up to feed both the
// temporary file and the digester.
type insertion struct {
// Writer receives the blob data.
io.Writer
// f is the temporary file holding the blob until Commit or Close.
f *os.File
// digester computes the digest of the written data.
digester digest.Digester
// closed is set by Commit once f has been closed, which turns a later
// Close into a no-op.
closed bool
}
// newInsertion wraps tempFile in an insertion whose Writer duplicates every
// write into both the file and a canonical-algorithm digester, so the digest
// is available as soon as writing finishes.
func newInsertion(tempFile *os.File) *insertion {
	dg := digest.Canonical.New()

	ins := &insertion{
		f:        tempFile,
		digester: dg,
	}
	ins.Writer = io.MultiWriter(tempFile, dg.Hash())
	return ins
}
// Commit finalizes the insertion. It flushes the temporary file to stable
// storage, closes it, and renames it to <store>/<algorithm>/<hex digest>,
// where the digest is computed from everything written so far.
//
// It returns the digest of the stored blob. On any failure (sync, close,
// mkdir or rename) it returns an empty digest and the error, and the
// temporary file is removed.
func (i *insertion) Commit() (digest.Digest, error) {
	p := i.f.Name()
	// The temp file lives in <store>/tmp/, so the store root is two levels up.
	d := filepath.Join(p, "../../")

	// Clean up the temp file on every exit path. After a successful rename
	// the path no longer exists and this is a no-op.
	defer os.RemoveAll(p)

	// A blob must not be published under its digest unless its contents
	// actually reached the disk, so a sync failure aborts the commit.
	if err := i.f.Sync(); err != nil {
		// Best-effort close so the descriptor is not leaked; the sync
		// failure is the error worth reporting.
		i.f.Close()
		i.closed = true
		return "", errors.Wrapf(err, "failed to sync %v", p)
	}
	if err := i.f.Close(); err != nil {
		return "", err
	}
	i.closed = true

	dgst := i.digester.Digest()
	algDir := filepath.Join(d, string(dgst.Algorithm()))
	if err := os.MkdirAll(algDir, 0700); err != nil {
		return "", errors.Wrapf(err, "failed to mkdir %v", d)
	}
	if err := os.Rename(p, filepath.Join(algDir, dgst.Hex())); err != nil {
		return "", errors.Wrapf(err, "failed to rename %v", p)
	}
	return dgst, nil
}
// Close abandons an insertion that has not been committed: the temporary file
// is closed and then removed. After Commit has closed the file, Close does
// nothing and returns nil.
func (i *insertion) Close() error {
	if !i.closed {
		// Removal runs after the file handle has been closed.
		defer os.RemoveAll(i.f.Name())
		return i.f.Close()
	}
	return nil
}
// downloadManager downloads plugin layers, storing the raw blobs in a
// blobstore while unpacking their contents into a directory, and stores the
// config blob passed to Put.
type downloadManager struct {
// blobStore receives the downloaded layer blobs and the config blob.
blobStore blobstore
// tmpDir is the directory that downloaded layers are applied to.
tmpDir string
// blobs holds the digests of the committed layer blobs, in download order.
blobs []digest.Digest
// configDigest is the digest recorded by Put.
configDigest digest.Digest
}
// Download fetches each descriptor in layers in order. For every layer the
// raw (still compressed) stream is stored in the blob store while its
// decompressed contents are applied to dm.tmpDir. The DiffID of each layer is
// appended to initialRootFS and the blob digest to dm.blobs.
//
// It returns the updated RootFS, a nil release function, and the first error
// encountered. On error the returned RootFS contains the DiffIDs appended up
// to that point.
func (dm *downloadManager) Download(ctx context.Context, initialRootFS image.RootFS, layers []xfer.DownloadDescriptor, progressOutput progress.Output) (image.RootFS, func(), error) {
	for _, l := range layers {
		// Each layer is handled in its own call so that its temp file and
		// streams are released before the next layer starts, rather than
		// piling up until the whole download finishes.
		if err := dm.downloadLayer(ctx, &initialRootFS, l, progressOutput); err != nil {
			return initialRootFS, nil, err
		}
	}
	return initialRootFS, nil, nil
}

// downloadLayer downloads a single layer, stores its raw blob, applies its
// contents to dm.tmpDir, and records its DiffID in rootFS and its blob digest
// in dm.blobs.
func (dm *downloadManager) downloadLayer(ctx context.Context, rootFS *image.RootFS, l xfer.DownloadDescriptor, progressOutput progress.Output) error {
	b, err := dm.blobStore.New()
	if err != nil {
		return err
	}
	// Discards the temp blob unless Commit below succeeded.
	defer b.Close()

	rc, _, err := l.Download(ctx, progressOutput)
	if err != nil {
		return errors.Wrap(err, "failed to download")
	}
	defer rc.Close()

	// Everything the decompressor reads from the download is also written to
	// the blob, so the stored blob is the layer as it was transferred.
	inflatedLayerData, err := archive.DecompressStream(io.TeeReader(rc, b))
	if err != nil {
		return err
	}
	defer inflatedLayerData.Close()

	// The DiffID is the digest of the decompressed stream.
	digester := digest.Canonical.New()
	if _, err := archive.ApplyLayer(dm.tmpDir, io.TeeReader(inflatedLayerData, digester.Hash())); err != nil {
		return err
	}
	rootFS.Append(layer.DiffID(digester.Digest()))

	d, err := b.Commit()
	if err != nil {
		return err
	}
	dm.blobs = append(dm.blobs, d)
	return nil
}
// Put stores dt as a blob and, on success, records its digest in
// dm.configDigest.
//
// It returns the digest of the stored blob. On failure it returns an empty
// digest and the error, and dm.configDigest keeps its previous value.
func (dm *downloadManager) Put(dt []byte) (digest.Digest, error) {
	b, err := dm.blobStore.New()
	if err != nil {
		return "", err
	}
	// Discards the temp blob unless Commit below succeeded.
	defer b.Close()

	n, err := b.Write(dt)
	if err != nil {
		return "", err
	}
	if n != len(dt) {
		return "", io.ErrShortWrite
	}

	d, err := b.Commit()
	if err != nil {
		// Do not clobber a previously recorded digest with a failed commit.
		return "", err
	}
	dm.configDigest = d
	return d, nil
}
// Get never returns data: it ignores d and always reports
// digest.ErrDigestNotFound.
func (dm *downloadManager) Get(d digest.Digest) ([]byte, error) {
return nil, digest.ErrDigestNotFound
}
// RootFSFromConfig delegates to configToRootFS, which is defined outside this
// block, and returns its result unchanged.
// NOTE(review): presumably c is the serialized plugin config and the result is
// the RootFS it describes; confirm against configToRootFS.
func (dm *downloadManager) RootFSFromConfig(c []byte) (*image.RootFS, error) {
return configToRootFS(c)
}