From 5c3d2d552b0430672d5f481ab2d37036f6e92166 Mon Sep 17 00:00:00 2001 From: Tonis Tiigi Date: Mon, 15 May 2017 14:54:27 -0700 Subject: [PATCH] Implement incremental file sync using client session Also exposes shared cache and garbage collection/prune for the source data. Signed-off-by: Tonis Tiigi --- api/server/backend/build/backend.go | 28 +- api/server/router/build/backend.go | 4 + api/server/router/build/build.go | 1 + api/server/router/build/build_routes.go | 8 + api/server/router/system/system.go | 5 +- api/server/router/system/system_routes.go | 6 + api/swagger.yaml | 21 + api/types/types.go | 15 +- builder/dockerfile/builder.go | 50 +- builder/dockerfile/clientsession.go | 78 +++ builder/dockerfile/evaluator_test.go | 2 +- builder/fscache/fscache.go | 602 ++++++++++++++++++++++ builder/fscache/fscache_test.go | 131 +++++ builder/fscache/naivedriver.go | 28 + builder/remotecontext/archive.go | 128 +++++ builder/remotecontext/detect.go | 11 +- builder/remotecontext/filehash.go | 13 +- builder/remotecontext/generate.go | 3 + builder/remotecontext/git.go | 2 +- builder/remotecontext/lazycontext.go | 2 +- builder/remotecontext/remote.go | 4 +- builder/remotecontext/remote_test.go | 23 +- builder/remotecontext/tarsum.go | 226 ++++---- builder/remotecontext/tarsum.pb.go | 525 +++++++++++++++++++ builder/remotecontext/tarsum.proto | 7 + builder/remotecontext/tarsum_test.go | 38 +- client/build_prune.go | 30 ++ client/interface.go | 1 + client/session/filesync/diffcopy.go | 30 ++ client/session/filesync/filesync.go | 173 +++++++ client/session/filesync/filesync.pb.go | 575 +++++++++++++++++++++ client/session/filesync/filesync.proto | 15 + client/session/filesync/generate.go | 3 + client/session/filesync/tarstream.go | 83 +++ cmd/dockerd/daemon.go | 29 +- hack/validate/gofmt | 3 +- hack/validate/lint | 2 +- integration-cli/docker_api_build_test.go | 105 ++++ integration-cli/docker_cli_daemon_test.go | 2 +- integration-cli/request/request.go | 3 +- pkg/archive/archive.go | 32 +- pkg/archive/archive_unix.go | 24 +- 42 files changed, 2879 insertions(+), 192 deletions(-) create mode 100644 builder/dockerfile/clientsession.go create mode 100644 builder/fscache/fscache.go create mode 100644 builder/fscache/fscache_test.go create mode 100644 builder/fscache/naivedriver.go create mode 100644 builder/remotecontext/archive.go create mode 100644 builder/remotecontext/generate.go create mode 100644 builder/remotecontext/tarsum.pb.go create mode 100644 builder/remotecontext/tarsum.proto create mode 100644 client/build_prune.go create mode 100644 client/session/filesync/diffcopy.go create mode 100644 client/session/filesync/filesync.go create mode 100644 client/session/filesync/filesync.pb.go create mode 100644 client/session/filesync/filesync.proto create mode 100644 client/session/filesync/generate.go create mode 100644 client/session/filesync/tarstream.go diff --git a/api/server/backend/build/backend.go b/api/server/backend/build/backend.go index 5dd9fb5981..fe98c087b8 100644 --- a/api/server/backend/build/backend.go +++ b/api/server/backend/build/backend.go @@ -4,11 +4,11 @@ import ( "fmt" "github.com/docker/distribution/reference" + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/builder" - "github.com/docker/docker/builder/dockerfile" + "github.com/docker/docker/builder/fscache" "github.com/docker/docker/image" - "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/stringid" "github.com/pkg/errors" "golang.org/x/net/context" @@ -20,16 +20,21 @@ type ImageComponent interface { TagImageWithReference(image.ID, string, reference.Named) error } +// Builder defines interface for running a build +type Builder interface { + Build(context.Context, backend.BuildConfig) (*builder.Result, error) +} + // Backend provides build functionality to the API router type Backend struct { - manager *dockerfile.BuildManager + builder Builder + fsCache *fscache.FSCache imageComponent ImageComponent } // NewBackend creates a new build backend from components -func NewBackend(components ImageComponent, builderBackend builder.Backend, sg dockerfile.SessionGetter, idMappings *idtools.IDMappings) (*Backend, error) { - manager := dockerfile.NewBuildManager(builderBackend, sg, idMappings) - return &Backend{imageComponent: components, manager: manager}, nil +func NewBackend(components ImageComponent, builder Builder, fsCache *fscache.FSCache) (*Backend, error) { + return &Backend{imageComponent: components, builder: builder, fsCache: fsCache}, nil } // Build builds an image from a Source @@ -40,7 +45,7 @@ func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string return "", err } - build, err := b.manager.Build(ctx, config) + build, err := b.builder.Build(ctx, config) if err != nil { return "", err } @@ -58,6 +63,15 @@ func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string return imageID, err } +// PruneCache removes all cached build sources +func (b *Backend) PruneCache(ctx context.Context) (*types.BuildCachePruneReport, error) { + size, err := b.fsCache.Prune() + if err != nil { + return nil, errors.Wrap(err, "failed to prune build cache") + } + return &types.BuildCachePruneReport{SpaceReclaimed: size}, nil +} + func squashBuild(build *builder.Result, imageComponent ImageComponent) (string, error) { var fromID string if build.FromImage != nil { diff --git a/api/server/router/build/backend.go b/api/server/router/build/backend.go index 835b11abba..defffd3ef0 100644 --- a/api/server/router/build/backend.go +++ b/api/server/router/build/backend.go @@ -1,6 +1,7 @@ package build import ( + "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" "golang.org/x/net/context" ) @@ -10,6 +11,9 @@ type Backend interface { // Build a Docker image returning the id of the image // TODO: make this return a reference instead of string Build(context.Context, backend.BuildConfig) (string, error) + + // Prune build cache + PruneCache(context.Context) (*types.BuildCachePruneReport, error) } type experimentalProvider interface { diff --git a/api/server/router/build/build.go b/api/server/router/build/build.go index dcf0c53953..78f5ae2f2f 100644 --- a/api/server/router/build/build.go +++ b/api/server/router/build/build.go @@ -24,5 +24,6 @@ func (r *buildRouter) Routes() []router.Route { func (r *buildRouter) initRoutes() { r.routes = []router.Route{ router.NewPostRoute("/build", r.postBuild, router.WithCancel), + router.NewPostRoute("/build/prune", r.postPrune, router.WithCancel), } } diff --git a/api/server/router/build/build_routes.go b/api/server/router/build/build_routes.go index 1ebc30f2b4..baa1da303f 100644 --- a/api/server/router/build/build_routes.go +++ b/api/server/router/build/build_routes.go @@ -132,6 +132,14 @@ func newImageBuildOptions(ctx context.Context, r *http.Request) (*types.ImageBui return options, nil } +func (br *buildRouter) postPrune(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + report, err := br.backend.PruneCache(ctx) + if err != nil { + return err + } + return httputils.WriteJSON(w, http.StatusOK, report) +} + func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { var ( notVerboseBuffer = bytes.NewBuffer(nil) diff --git a/api/server/router/system/system.go b/api/server/router/system/system.go index f8a851cfe3..a64631e8aa 100644 --- a/api/server/router/system/system.go +++ b/api/server/router/system/system.go @@ -2,6 +2,7 @@ package system import ( "github.com/docker/docker/api/server/router" + "github.com/docker/docker/builder/fscache" "github.com/docker/docker/daemon/cluster" ) @@ -11,13 +12,15 @@ type systemRouter struct { backend Backend cluster *cluster.Cluster routes []router.Route + builder *fscache.FSCache } // NewRouter initializes a new system router -func NewRouter(b Backend, c *cluster.Cluster) router.Router { +func NewRouter(b Backend, c *cluster.Cluster, fscache *fscache.FSCache) router.Router { r := &systemRouter{ backend: b, cluster: c, + builder: fscache, } r.routes = []router.Route{ diff --git a/api/server/router/system/system_routes.go b/api/server/router/system/system_routes.go index 7d3ba9a6ec..30fb000e1d 100644 --- a/api/server/router/system/system_routes.go +++ b/api/server/router/system/system_routes.go @@ -17,6 +17,7 @@ import ( timetypes "github.com/docker/docker/api/types/time" "github.com/docker/docker/api/types/versions" "github.com/docker/docker/pkg/ioutils" + pkgerrors "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -75,6 +76,11 @@ func (s *systemRouter) getDiskUsage(ctx context.Context, w http.ResponseWriter, if err != nil { return err } + builderSize, err := s.builder.DiskUsage() + if err != nil { + return pkgerrors.Wrap(err, "error getting build cache usage") + } + du.BuilderSize = builderSize return httputils.WriteJSON(w, http.StatusOK, du) } diff --git a/api/swagger.yaml b/api/swagger.yaml index a2b60217e8..f8493d5e9f 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -4745,6 +4745,27 @@ paths: schema: $ref: "#/definitions/ErrorResponse" tags: ["Image"] + /build/prune: + post: + summary: "Delete builder cache" + produces: + - "application/json" + operationId: "BuildPrune" + responses: + 200: + description: "No error" + schema: + type: "object" + properties: + SpaceReclaimed: + description: "Disk space reclaimed in bytes" + type: "integer" + format: "int64" + 500: + description: "Server error" + schema: + $ref: "#/definitions/ErrorResponse" + tags: ["Image"] /images/create: post: summary: "Create an image" diff --git a/api/types/types.go b/api/types/types.go index 8f309b2100..2473497c89 100644 --- a/api/types/types.go +++ b/api/types/types.go @@ -489,10 +489,11 @@ type Runtime struct { // DiskUsage contains response of Engine API: // GET "/system/df" type DiskUsage struct { - LayersSize int64 - Images []*ImageSummary - Containers []*Container - Volumes []*Volume + LayersSize int64 + Images []*ImageSummary + Containers []*Container + Volumes []*Volume + BuilderSize int64 } // ContainersPruneReport contains the response for Engine API: @@ -516,6 +517,12 @@ type ImagesPruneReport struct { SpaceReclaimed uint64 } +// BuildCachePruneReport contains the response for Engine API: +// POST "/build/prune" +type BuildCachePruneReport struct { + SpaceReclaimed uint64 +} + // NetworksPruneReport contains the response for Engine API: // POST "/networks/prune" type NetworksPruneReport struct { diff --git a/builder/dockerfile/builder.go b/builder/dockerfile/builder.go index 5d0b855a8a..80cfaf0bc2 100644 --- a/builder/dockerfile/builder.go +++ b/builder/dockerfile/builder.go @@ -7,6 +7,7 @@ import ( "io/ioutil" "runtime" "strings" + "time" "github.com/Sirupsen/logrus" "github.com/docker/docker/api/types" @@ -15,6 +16,7 @@ import ( "github.com/docker/docker/builder" "github.com/docker/docker/builder/dockerfile/command" "github.com/docker/docker/builder/dockerfile/parser" + "github.com/docker/docker/builder/fscache" "github.com/docker/docker/builder/remotecontext" "github.com/docker/docker/client/session" "github.com/docker/docker/pkg/archive" @@ -52,16 +54,22 @@ type BuildManager struct { backend builder.Backend pathCache pathCache // TODO: make this persistent sg SessionGetter + fsCache *fscache.FSCache } // NewBuildManager creates a BuildManager -func NewBuildManager(b builder.Backend, sg SessionGetter, idMappings *idtools.IDMappings) *BuildManager { - return &BuildManager{ +func NewBuildManager(b builder.Backend, sg SessionGetter, fsCache *fscache.FSCache, idMappings *idtools.IDMappings) (*BuildManager, error) { + bm := &BuildManager{ backend: b, pathCache: &syncmap.Map{}, sg: sg, archiver: chrootarchive.NewArchiver(idMappings), + fsCache: fsCache, } + if err := fsCache.RegisterTransport(remotecontext.ClientSessionRemote, NewClientSessionTransport()); err != nil { + return nil, err + } + return bm, nil } // Build starts a new build from a BuildConfig @@ -75,13 +83,13 @@ func (bm *BuildManager) Build(ctx context.Context, config backend.BuildConfig) ( if err != nil { return nil, err } - if source != nil { - defer func() { + defer func() { + if source != nil { if err := source.Close(); err != nil { logrus.Debugf("[BUILDER] failed to remove temporary context: %v", err) } - }() - } + } + }() // TODO @jhowardmsft LCOW support - this will require rework to allow both linux and Windows simultaneously. // This is an interim solution to hardcode to linux if LCOW is turned on. @@ -95,8 +103,10 @@ func (bm *BuildManager) Build(ctx context.Context, config backend.BuildConfig) ( ctx, cancel := context.WithCancel(ctx) defer cancel() - if err := bm.initializeClientSession(ctx, cancel, config.Options); err != nil { + if src, err := bm.initializeClientSession(ctx, cancel, config.Options); err != nil { return nil, err + } else if src != nil { + source = src } builderOptions := builderOptions{ @@ -111,20 +121,38 @@ func (bm *BuildManager) Build(ctx context.Context, config backend.BuildConfig) ( return newBuilder(ctx, builderOptions).build(source, dockerfile) } -func (bm *BuildManager) initializeClientSession(ctx context.Context, cancel func(), options *types.ImageBuildOptions) error { +func (bm *BuildManager) initializeClientSession(ctx context.Context, cancel func(), options *types.ImageBuildOptions) (builder.Source, error) { if options.SessionID == "" || bm.sg == nil { - return nil + return nil, nil } logrus.Debug("client is session enabled") + + ctx, cancelCtx := context.WithTimeout(ctx, sessionConnectTimeout) + defer cancelCtx() + c, err := bm.sg.Get(ctx, options.SessionID) if err != nil { - return err + return nil, err } go func() { <-c.Context().Done() cancel() }() - return nil + if options.RemoteContext == remotecontext.ClientSessionRemote { + st := time.Now() + csi, err := NewClientSessionSourceIdentifier(ctx, bm.sg, + options.SessionID, []string{"/"}) + if err != nil { + return nil, err + } + src, err := bm.fsCache.SyncFrom(ctx, csi) + if err != nil { + return nil, err + } + logrus.Debugf("sync-time: %v", time.Since(st)) + return src, nil + } + return nil, nil } // builderOptions are the dependencies required by the builder diff --git a/builder/dockerfile/clientsession.go b/builder/dockerfile/clientsession.go new file mode 100644 index 0000000000..647e4537c3 --- /dev/null +++ b/builder/dockerfile/clientsession.go @@ -0,0 +1,78 @@ +package dockerfile + +import ( + "time" + + "github.com/docker/docker/builder/fscache" + "github.com/docker/docker/builder/remotecontext" + "github.com/docker/docker/client/session" + "github.com/docker/docker/client/session/filesync" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +const sessionConnectTimeout = 5 * time.Second + +// ClientSessionTransport is a transport for copying files from docker client +// to the daemon. +type ClientSessionTransport struct{} + +// NewClientSessionTransport returns new ClientSessionTransport instance +func NewClientSessionTransport() *ClientSessionTransport { + return &ClientSessionTransport{} +} + +// Copy data from a remote to a destination directory. +func (cst *ClientSessionTransport) Copy(ctx context.Context, id fscache.RemoteIdentifier, dest string, cu filesync.CacheUpdater) error { + csi, ok := id.(*ClientSessionSourceIdentifier) + if !ok { + return errors.New("invalid identifier for client session") + } + + return filesync.FSSync(ctx, csi.caller, filesync.FSSendRequestOpt{ + SrcPaths: csi.srcPaths, + DestDir: dest, + CacheUpdater: cu, + }) +} + +// ClientSessionSourceIdentifier is an identifier that can be used for requesting +// files from remote client +type ClientSessionSourceIdentifier struct { + srcPaths []string + caller session.Caller + sharedKey string + uuid string +} + +// NewClientSessionSourceIdentifier returns new ClientSessionSourceIdentifier instance +func NewClientSessionSourceIdentifier(ctx context.Context, sg SessionGetter, uuid string, sources []string) (*ClientSessionSourceIdentifier, error) { + csi := &ClientSessionSourceIdentifier{ + uuid: uuid, + srcPaths: sources, + } + caller, err := sg.Get(ctx, uuid) + if err != nil { + return nil, errors.Wrapf(err, "failed to get session for %s", uuid) + } + + csi.caller = caller + return csi, nil +} + +// Transport returns transport identifier for remote identifier +func (csi *ClientSessionSourceIdentifier) Transport() string { + return remotecontext.ClientSessionRemote +} + +// SharedKey returns shared key for remote identifier. Shared key is used +// for finding the base for a repeated transfer. +func (csi *ClientSessionSourceIdentifier) SharedKey() string { + return csi.caller.SharedKey() +} + +// Key returns unique key for remote identifier. Requests with same key return +// same data. +func (csi *ClientSessionSourceIdentifier) Key() string { + return csi.uuid +} diff --git a/builder/dockerfile/evaluator_test.go b/builder/dockerfile/evaluator_test.go index 544a93fc30..72d7ce10e3 100644 --- a/builder/dockerfile/evaluator_test.go +++ b/builder/dockerfile/evaluator_test.go @@ -158,7 +158,7 @@ func executeTestCase(t *testing.T, testCase dispatchTestCase) { } }() - context, err := remotecontext.MakeTarSumContext(tarStream) + context, err := remotecontext.FromArchive(tarStream) if err != nil { t.Fatalf("Error when creating tar context: %s", err) diff --git a/builder/fscache/fscache.go b/builder/fscache/fscache.go new file mode 100644 index 0000000000..802db96de0 --- /dev/null +++ b/builder/fscache/fscache.go @@ -0,0 +1,602 @@ +package fscache + +import ( + "encoding/json" + "os" + "path/filepath" + "sort" + "sync" + "time" + + "github.com/Sirupsen/logrus" + "github.com/boltdb/bolt" + "github.com/docker/docker/builder" + "github.com/docker/docker/builder/remotecontext" + "github.com/docker/docker/client/session/filesync" + "github.com/docker/docker/pkg/directory" + "github.com/docker/docker/pkg/stringid" + "github.com/pkg/errors" + "github.com/tonistiigi/fsutil" + "golang.org/x/net/context" + "golang.org/x/sync/singleflight" +) + +const dbFile = "fscache.db" +const cacheKey = "cache" +const metaKey = "meta" + +// Backend is a backing implementation for FSCache +type Backend interface { + Get(id string) (string, error) + Remove(id string) error +} + +// FSCache allows syncing remote resources to cached snapshots +type FSCache struct { + opt Opt + transports map[string]Transport + mu sync.Mutex + g singleflight.Group + store *fsCacheStore +} + +// Opt defines options for initializing FSCache +type Opt struct { + Backend Backend + Root string // for storing local metadata + GCPolicy GCPolicy +} + +// GCPolicy defines policy for garbage collection +type GCPolicy struct { + MaxSize uint64 + MaxKeepDuration time.Duration +} + +// NewFSCache returns new FSCache object +func NewFSCache(opt Opt) (*FSCache, error) { + store, err := newFSCacheStore(opt) + if err != nil { + return nil, err + } + return &FSCache{ + store: store, + opt: opt, + transports: make(map[string]Transport), + }, nil +} + +// Transport defines a method for syncing remote data to FSCache +type Transport interface { + Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error +} + +// RemoteIdentifier identifies a transfer request +type RemoteIdentifier interface { + Key() string + SharedKey() string + Transport() string +} + +// RegisterTransport registers a new transport method +func (fsc *FSCache) RegisterTransport(id string, transport Transport) error { + fsc.mu.Lock() + defer fsc.mu.Unlock() + if _, ok := fsc.transports[id]; ok { + return errors.Errorf("transport %v already exists", id) + } + fsc.transports[id] = transport + return nil +} + +// SyncFrom returns a source based on a remote identifier +func (fsc *FSCache) SyncFrom(ctx context.Context, id RemoteIdentifier) (builder.Source, error) { // cacheOpt + trasportID := id.Transport() + fsc.mu.Lock() + transport, ok := fsc.transports[id.Transport()] + if !ok { + fsc.mu.Unlock() + return nil, errors.Errorf("invalid transport %s", trasportID) + } + + logrus.Debugf("SyncFrom %s %s", id.Key(), id.SharedKey()) + fsc.mu.Unlock() + sourceRef, err, _ := fsc.g.Do(id.Key(), func() (interface{}, error) { + var sourceRef *cachedSourceRef + sourceRef, err := fsc.store.Get(id.Key()) + if err == nil { + return sourceRef, nil + } + + // check for unused shared cache + sharedKey := id.SharedKey() + if sharedKey != "" { + r, err := fsc.store.Rebase(sharedKey, id.Key()) + if err == nil { + sourceRef = r + } + } + + if sourceRef == nil { + var err error + sourceRef, err = fsc.store.New(id.Key(), sharedKey) + if err != nil { + return nil, errors.Wrap(err, "failed to create remote context") + } + } + + if err := syncFrom(ctx, sourceRef, transport, id); err != nil { + sourceRef.Release() + return nil, err + } + if err := sourceRef.resetSize(-1); err != nil { + return nil, err + } + return sourceRef, nil + }) + if err != nil { + return nil, err + } + ref := sourceRef.(*cachedSourceRef) + if ref.src == nil { // failsafe + return nil, errors.Errorf("invalid empty pull") + } + wc := &wrappedContext{Source: ref.src, closer: func() error { + ref.Release() + return nil + }} + return wc, nil +} + +// DiskUsage reports how much data is allocated by the cache +func (fsc *FSCache) DiskUsage() (int64, error) { + return fsc.store.DiskUsage() +} + +// Prune allows manually cleaning up the cache +func (fsc *FSCache) Prune() (uint64, error) { + return fsc.store.Prune() +} + +// Close stops the gc and closes the persistent db +func (fsc *FSCache) Close() error { + return fsc.store.Close() +} + +func syncFrom(ctx context.Context, cs *cachedSourceRef, transport Transport, id RemoteIdentifier) (retErr error) { + src := cs.src + if src == nil { + src = remotecontext.NewCachableSource(cs.Dir()) + } + + if !cs.cached { + if err := cs.storage.db.View(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(id.Key())) + dt := b.Get([]byte(cacheKey)) + if dt != nil { + if err := src.UnmarshalBinary(dt); err != nil { + return err + } + } else { + return errors.Wrap(src.Scan(), "failed to scan cache records") + } + return nil + }); err != nil { + return err + } + } + + dc := &detectChanges{f: src.HandleChange} + + // todo: probably send a bucket to `Copy` and let it return source + // but need to make sure that tx is safe + if err := transport.Copy(ctx, id, cs.Dir(), dc); err != nil { + return errors.Wrapf(err, "failed to copy to %s", cs.Dir()) + } + + if !dc.supported { + if err := src.Scan(); err != nil { + return errors.Wrap(err, "failed to scan cache records after transfer") + } + } + cs.cached = true + cs.src = src + return cs.storage.db.Update(func(tx *bolt.Tx) error { + dt, err := src.MarshalBinary() + if err != nil { + return err + } + b := tx.Bucket([]byte(id.Key())) + return b.Put([]byte(cacheKey), dt) + }) +} + +type fsCacheStore struct { + root string + mu sync.Mutex + sources map[string]*cachedSource + db *bolt.DB + fs Backend + gcTimer *time.Timer + gcPolicy GCPolicy +} + +// CachePolicy defines policy for keeping a resource in cache +type CachePolicy struct { + Priority int + LastUsed time.Time +} + +func defaultCachePolicy() CachePolicy { + return CachePolicy{Priority: 10, LastUsed: time.Now()} +} + +func newFSCacheStore(opt Opt) (*fsCacheStore, error) { + if err := os.MkdirAll(opt.Root, 0700); err != nil { + return nil, err + } + p := filepath.Join(opt.Root, dbFile) + db, err := bolt.Open(p, 0600, nil) + if err != nil { + return nil, errors.Wrap(err, "failed to open database file %s") + } + s := &fsCacheStore{db: db, sources: make(map[string]*cachedSource), fs: opt.Backend, gcPolicy: opt.GCPolicy} + db.View(func(tx *bolt.Tx) error { + return tx.ForEach(func(name []byte, b *bolt.Bucket) error { + dt := b.Get([]byte(metaKey)) + if dt == nil { + return nil + } + var sm sourceMeta + if err := json.Unmarshal(dt, &sm); err != nil { + return err + } + dir, err := s.fs.Get(sm.BackendID) + if err != nil { + return err // TODO: handle gracefully + } + source := &cachedSource{ + refs: make(map[*cachedSourceRef]struct{}), + id: string(name), + dir: dir, + sourceMeta: sm, + storage: s, + } + s.sources[string(name)] = source + return nil + }) + }) + + s.gcTimer = s.startPeriodicGC(5 * time.Minute) + return s, nil +} + +func (s *fsCacheStore) startPeriodicGC(interval time.Duration) *time.Timer { + var t *time.Timer + t = time.AfterFunc(interval, func() { + if err := s.GC(); err != nil { + logrus.Errorf("build gc error: %v", err) + } + t.Reset(interval) + }) + return t +} + +func (s *fsCacheStore) Close() error { + s.gcTimer.Stop() + return s.db.Close() +} + +func (s *fsCacheStore) New(id, sharedKey string) (*cachedSourceRef, error) { + s.mu.Lock() + defer s.mu.Unlock() + var ret *cachedSource + if err := s.db.Update(func(tx *bolt.Tx) error { + b, err := tx.CreateBucket([]byte(id)) + if err != nil { + return err + } + backendID := stringid.GenerateRandomID() + dir, err := s.fs.Get(backendID) + if err != nil { + return err + } + source := &cachedSource{ + refs: make(map[*cachedSourceRef]struct{}), + id: id, + dir: dir, + sourceMeta: sourceMeta{ + BackendID: backendID, + SharedKey: sharedKey, + CachePolicy: defaultCachePolicy(), + }, + storage: s, + } + dt, err := json.Marshal(source.sourceMeta) + if err != nil { + return err + } + if err := b.Put([]byte(metaKey), dt); err != nil { + return err + } + s.sources[id] = source + ret = source + return nil + }); err != nil { + return nil, err + } + return ret.getRef(), nil +} + +func (s *fsCacheStore) Rebase(sharedKey, newid string) (*cachedSourceRef, error) { + s.mu.Lock() + defer s.mu.Unlock() + var ret *cachedSource + for id, snap := range s.sources { + if snap.SharedKey == sharedKey && len(snap.refs) == 0 { + if err := s.db.Update(func(tx *bolt.Tx) error { + if err := tx.DeleteBucket([]byte(id)); err != nil { + return err + } + b, err := tx.CreateBucket([]byte(newid)) + if err != nil { + return err + } + snap.id = newid + snap.CachePolicy = defaultCachePolicy() + dt, err := json.Marshal(snap.sourceMeta) + if err != nil { + return err + } + if err := b.Put([]byte(metaKey), dt); err != nil { + return err + } + delete(s.sources, id) + s.sources[newid] = snap + return nil + }); err != nil { + return nil, err + } + ret = snap + break + } + } + if ret == nil { + return nil, errors.Errorf("no candidate for rebase") + } + return ret.getRef(), nil +} + +func (s *fsCacheStore) Get(id string) (*cachedSourceRef, error) { + s.mu.Lock() + defer s.mu.Unlock() + src, ok := s.sources[id] + if !ok { + return nil, errors.Errorf("not found") + } + return src.getRef(), nil +} + +// DiskUsage reports how much data is allocated by the cache +func (s *fsCacheStore) DiskUsage() (int64, error) { + s.mu.Lock() + defer s.mu.Unlock() + var size int64 + + for _, snap := range s.sources { + if len(snap.refs) == 0 { + ss, err := snap.getSize() + if err != nil { + return 0, err + } + size += ss + } + } + return size, nil +} + +// Prune allows manually cleaning up the cache +func (s *fsCacheStore) Prune() (uint64, error) { + s.mu.Lock() + defer s.mu.Unlock() + var size uint64 + + for id, snap := range s.sources { + if len(snap.refs) == 0 { + ss, err := snap.getSize() + if err != nil { + return size, err + } + if err := s.delete(id); err != nil { + return size, errors.Wrapf(err, "failed to delete %s", id) + } + size += uint64(ss) + } + } + return size, nil +} + +// GC runs a garbage collector on FSCache +func (s *fsCacheStore) GC() error { + s.mu.Lock() + defer s.mu.Unlock() + var size uint64 + + cutoff := time.Now().Add(-s.gcPolicy.MaxKeepDuration) + var blacklist []*cachedSource + + for id, snap := range s.sources { + if len(snap.refs) == 0 { + if cutoff.After(snap.CachePolicy.LastUsed) { + if err := s.delete(id); err != nil { + return errors.Wrapf(err, "failed to delete %s", id) + } + } else { + ss, err := snap.getSize() + if err != nil { + return err + } + size += uint64(ss) + blacklist = append(blacklist, snap) + } + } + } + + sort.Sort(sortableCacheSources(blacklist)) + for _, snap := range blacklist { + if size <= s.gcPolicy.MaxSize { + break + } + ss, err := snap.getSize() + if err != nil { + return err + } + if err := s.delete(snap.id); err != nil { + return errors.Wrapf(err, "failed to delete %s", snap.id) + } + size -= uint64(ss) + } + return nil +} + +// keep mu while calling this +func (s *fsCacheStore) delete(id string) error { + src, ok := s.sources[id] + if !ok { + return nil + } + if len(src.refs) > 0 { + return errors.Errorf("can't delete %s because it has active references", id) + } + delete(s.sources, id) + if err := s.db.Update(func(tx *bolt.Tx) error { + return tx.DeleteBucket([]byte(id)) + }); err != nil { + return err + } + if err := s.fs.Remove(src.BackendID); err != nil { + return err + } + return nil +} + +type sourceMeta struct { + SharedKey string + BackendID string + CachePolicy CachePolicy + Size int64 +} + +type cachedSource struct { + sourceMeta + refs map[*cachedSourceRef]struct{} + id string + dir string + src *remotecontext.CachableSource + storage *fsCacheStore + cached bool // keep track if cache is up to date +} + +type cachedSourceRef struct { + *cachedSource +} + +func (cs *cachedSource) Dir() string { + return cs.dir +} + +// hold storage lock before calling +func (cs *cachedSource) getRef() *cachedSourceRef { + ref := &cachedSourceRef{cachedSource: cs} + cs.refs[ref] = struct{}{} + return ref +} + +// hold storage lock before calling +func (cs *cachedSource) getSize() (int64, error) { + if cs.sourceMeta.Size < 0 { + ss, err := directory.Size(cs.dir) + if err != nil { + return 0, err + } + if err := cs.resetSize(ss); err != nil { + return 0, err + } + return ss, nil + } + return cs.sourceMeta.Size, nil +} + +func (cs *cachedSource) resetSize(val int64) error { + cs.sourceMeta.Size = val + return cs.saveMeta() +} +func (cs *cachedSource) saveMeta() error { + return cs.storage.db.Update(func(tx *bolt.Tx) error { + b := tx.Bucket([]byte(cs.id)) + dt, err := json.Marshal(cs.sourceMeta) + if err != nil { + return err + } + return b.Put([]byte(metaKey), dt) + }) +} + +func (csr *cachedSourceRef) Release() error { + csr.cachedSource.storage.mu.Lock() + defer csr.cachedSource.storage.mu.Unlock() + delete(csr.cachedSource.refs, csr) + if len(csr.cachedSource.refs) == 0 { + go csr.cachedSource.storage.GC() + } + return nil +} + +type detectChanges struct { + f fsutil.ChangeFunc + supported bool +} + +func (dc *detectChanges) HandleChange(kind fsutil.ChangeKind, path string, fi os.FileInfo, err error) error { + if dc == nil { + return nil + } + return dc.f(kind, path, fi, err) +} + +func (dc *detectChanges) MarkSupported(v bool) { + if dc == nil { + return + } + dc.supported = v +} + +type wrappedContext struct { + builder.Source + closer func() error +} + +func (wc *wrappedContext) Close() error { + if err := wc.Source.Close(); err != nil { + return err + } + return wc.closer() +} + +type sortableCacheSources []*cachedSource + +// Len is the number of elements in the collection. +func (s sortableCacheSources) Len() int { + return len(s) +} + +// Less reports whether the element with +// index i should sort before the element with index j. +func (s sortableCacheSources) Less(i, j int) bool { + return s[i].CachePolicy.LastUsed.Before(s[j].CachePolicy.LastUsed) +} + +// Swap swaps the elements with indexes i and j. +func (s sortableCacheSources) Swap(i, j int) { + s[i], s[j] = s[j], s[i] +} diff --git a/builder/fscache/fscache_test.go b/builder/fscache/fscache_test.go new file mode 100644 index 0000000000..c7c0531f27 --- /dev/null +++ b/builder/fscache/fscache_test.go @@ -0,0 +1,131 @@ +package fscache + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + "github.com/docker/docker/client/session/filesync" + "github.com/stretchr/testify/assert" + "golang.org/x/net/context" +) + +func TestFSCache(t *testing.T) { + tmpDir, err := ioutil.TempDir("", "fscache") + assert.Nil(t, err) + defer os.RemoveAll(tmpDir) + + backend := NewNaiveCacheBackend(filepath.Join(tmpDir, "backend")) + + opt := Opt{ + Root: tmpDir, + Backend: backend, + GCPolicy: GCPolicy{MaxSize: 15, MaxKeepDuration: time.Hour}, + } + + fscache, err := NewFSCache(opt) + assert.Nil(t, err) + + defer fscache.Close() + + err = fscache.RegisterTransport("test", &testTransport{}) + assert.Nil(t, err) + + src1, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo", "data", "bar"}) + assert.Nil(t, err) + + dt, err := ioutil.ReadFile(filepath.Join(src1.Root(), "foo")) + assert.Nil(t, err) + assert.Equal(t, string(dt), "data") + + // same id doesn't recalculate anything + src2, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo", "data2", "bar"}) + assert.Nil(t, err) + assert.Equal(t, src1.Root(), src2.Root()) + + dt, err = ioutil.ReadFile(filepath.Join(src1.Root(), "foo")) + assert.Nil(t, err) + assert.Equal(t, string(dt), "data") + assert.Nil(t, src2.Close()) + + src3, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo2", "data2", "bar"}) + assert.Nil(t, err) + assert.NotEqual(t, src1.Root(), src3.Root()) + + dt, err = ioutil.ReadFile(filepath.Join(src3.Root(), "foo2")) + assert.Nil(t, err) + assert.Equal(t, string(dt), "data2") + + s, err := fscache.DiskUsage() + assert.Nil(t, err) + assert.Equal(t, s, int64(0)) + + assert.Nil(t, src3.Close()) + + s, err = fscache.DiskUsage() + assert.Nil(t, err) + assert.Equal(t, s, int64(5)) + + // new upload with the same shared key shoutl overwrite + src4, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo3", "data3", "bar"}) + assert.Nil(t, err) + assert.NotEqual(t, src1.Root(), src3.Root()) + + dt, err = ioutil.ReadFile(filepath.Join(src3.Root(), "foo3")) + assert.Nil(t, err) + assert.Equal(t, string(dt), "data3") + assert.Equal(t, src4.Root(), src3.Root()) + assert.Nil(t, src4.Close()) + + s, err = fscache.DiskUsage() + assert.Nil(t, err) + assert.Equal(t, s, int64(10)) + + // this one goes over the GC limit + src5, err := fscache.SyncFrom(context.TODO(), &testIdentifier{"foo4", "datadata", "baz"}) + assert.Nil(t, err) + assert.Nil(t, src5.Close()) + + // GC happens async + time.Sleep(100 * time.Millisecond) + + // only last insertion after GC + s, err = fscache.DiskUsage() + assert.Nil(t, err) + assert.Equal(t, s, int64(8)) + + // prune deletes everything + released, err := fscache.Prune() + assert.Nil(t, err) + assert.Equal(t, released, uint64(8)) + + s, err = fscache.DiskUsage() + assert.Nil(t, err) + assert.Equal(t, s, int64(0)) +} + +type testTransport struct { +} + +func (t *testTransport) Copy(ctx context.Context, id RemoteIdentifier, dest string, cs filesync.CacheUpdater) error { + testid := id.(*testIdentifier) + return ioutil.WriteFile(filepath.Join(dest, testid.filename), []byte(testid.data), 0600) +} + +type testIdentifier struct { + filename string + data string + sharedKey string +} + +func (t *testIdentifier) Key() string { + return t.filename +} +func (t *testIdentifier) SharedKey() string { + return t.sharedKey +} +func (t *testIdentifier) Transport() string { + return "test" +} diff --git a/builder/fscache/naivedriver.go b/builder/fscache/naivedriver.go new file mode 100644 index 0000000000..f40ee570f4 --- /dev/null +++ b/builder/fscache/naivedriver.go @@ -0,0 +1,28 @@ +package fscache + +import ( + "os" + "path/filepath" + + "github.com/pkg/errors" +) + +// NewNaiveCacheBackend is a basic backend implementation for fscache +func NewNaiveCacheBackend(root string) Backend { + return &naiveCacheBackend{root: root} +} + +type naiveCacheBackend struct { + root string +} + +func (tcb *naiveCacheBackend) Get(id string) (string, error) { + d := filepath.Join(tcb.root, id) + if err := os.MkdirAll(d, 0700); err != nil { + return "", errors.Wrapf(err, "failed to create tmp dir for %s", d) + } + return d, nil +} +func (tcb *naiveCacheBackend) Remove(id string) error { + return errors.WithStack(os.RemoveAll(filepath.Join(tcb.root, id))) +} diff --git a/builder/remotecontext/archive.go b/builder/remotecontext/archive.go new file mode 100644 index 0000000000..f48cafecd4 --- /dev/null +++ b/builder/remotecontext/archive.go @@ -0,0 +1,128 @@ +package remotecontext + +import ( + "io" + "os" + "path/filepath" + + "github.com/docker/docker/builder" + "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/pkg/chrootarchive" + "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/pkg/symlink" + "github.com/docker/docker/pkg/tarsum" + "github.com/pkg/errors" +) + +type archiveContext struct { + root string + sums tarsum.FileInfoSums +} + +func (c *archiveContext) Close() error { + return os.RemoveAll(c.root) +} + +func convertPathError(err error, cleanpath string) error { + if err, ok := err.(*os.PathError); ok { + err.Path = cleanpath + return err + } + return err +} + +type modifiableContext interface { + builder.Source + // Remove deletes the entry specified by `path`. + // It is usual for directory entries to delete all its subentries. + Remove(path string) error +} + +// FromArchive returns a build source from a tar stream. +// +// It extracts the tar stream to a temporary folder that is deleted as soon as +// the Context is closed. +// As the extraction happens, a tarsum is calculated for every file, and the set of +// all those sums then becomes the source of truth for all operations on this Context. +// +// Closing tarStream has to be done by the caller. +func FromArchive(tarStream io.Reader) (builder.Source, error) { + root, err := ioutils.TempDir("", "docker-builder") + if err != nil { + return nil, err + } + + tsc := &archiveContext{root: root} + + // Make sure we clean-up upon error. In the happy case the caller + // is expected to manage the clean-up + defer func() { + if err != nil { + tsc.Close() + } + }() + + decompressedStream, err := archive.DecompressStream(tarStream) + if err != nil { + return nil, err + } + + sum, err := tarsum.NewTarSum(decompressedStream, true, tarsum.Version1) + if err != nil { + return nil, err + } + + err = chrootarchive.Untar(sum, root, nil) + if err != nil { + return nil, err + } + + tsc.sums = sum.GetSums() + + return tsc, nil +} + +func (c *archiveContext) Root() string { + return c.root +} + +func (c *archiveContext) Remove(path string) error { + _, fullpath, err := normalize(path, c.root) + if err != nil { + return err + } + return os.RemoveAll(fullpath) +} + +func (c *archiveContext) Hash(path string) (string, error) { + cleanpath, fullpath, err := normalize(path, c.root) + if err != nil { + return "", err + } + + rel, err := filepath.Rel(c.root, fullpath) + if err != nil { + return "", convertPathError(err, cleanpath) + } + + // Use the checksum of the followed path(not the possible symlink) because + // this is the file that is actually copied. + if tsInfo := c.sums.GetFile(filepath.ToSlash(rel)); tsInfo != nil { + return tsInfo.Sum(), nil + } + // We set sum to path by default for the case where GetFile returns nil. + // The usual case is if relative path is empty. + return path, nil // backwards compat TODO: see if really needed +} + +func normalize(path, root string) (cleanPath, fullPath string, err error) { + cleanPath = filepath.Clean(string(os.PathSeparator) + path)[1:] + fullPath, err = symlink.FollowSymlinkInScope(filepath.Join(root, path), root) + if err != nil { + return "", "", errors.Wrapf(err, "forbidden path outside the build context: %s (%s)", path, cleanPath) + } + if _, err := os.Lstat(fullPath); err != nil { + return "", "", errors.WithStack(convertPathError(err, path)) + } + return +} diff --git a/builder/remotecontext/detect.go b/builder/remotecontext/detect.go index 99385698cf..889f0fce34 100644 --- a/builder/remotecontext/detect.go +++ b/builder/remotecontext/detect.go @@ -19,6 +19,9 @@ import ( "github.com/pkg/errors" ) +// ClientSessionRemote is identifier for client-session context transport +const ClientSessionRemote = "client-session" + // Detect returns a context and dockerfile from remote location or local // archive. progressReader is only used if remoteURL is actually a URL // (not empty, and not a Git endpoint). @@ -29,6 +32,12 @@ func Detect(config backend.BuildConfig) (remote builder.Source, dockerfile *pars switch { case remoteURL == "": remote, dockerfile, err = newArchiveRemote(config.Source, dockerfilePath) + case remoteURL == ClientSessionRemote: + res, err := parser.Parse(config.Source) + if err != nil { + return nil, nil, err + } + return nil, res, nil case urlutil.IsGitURL(remoteURL): remote, dockerfile, err = newGitRemote(remoteURL, dockerfilePath) case urlutil.IsURL(remoteURL): @@ -41,7 +50,7 @@ func Detect(config backend.BuildConfig) (remote builder.Source, dockerfile *pars func newArchiveRemote(rc io.ReadCloser, dockerfilePath string) (builder.Source, *parser.Result, error) { defer rc.Close() - c, err := MakeTarSumContext(rc) + c, err := FromArchive(rc) if err != nil { return nil, nil, err } diff --git a/builder/remotecontext/filehash.go b/builder/remotecontext/filehash.go index a9b324272b..417230297b 100644 --- a/builder/remotecontext/filehash.go +++ b/builder/remotecontext/filehash.go @@ -12,10 +12,21 @@ import ( // NewFileHash returns new hash that is used for the builder cache keys func NewFileHash(path, name string, fi os.FileInfo) (hash.Hash, error) { - hdr, err := archive.FileInfoHeader(path, name, fi) + var link string + if fi.Mode()&os.ModeSymlink != 0 { + var err error + link, err = os.Readlink(path) + if err != nil { + return nil, err + } + } + hdr, err := archive.FileInfoHeader(name, fi, link) if err != nil { return nil, err } + if err := archive.ReadSecurityXattrToTarHeader(path, hdr); err != nil { + return nil, err + } tsh := &tarsumHash{hdr: hdr, Hash: sha256.New()} tsh.Reset() // initialize header return tsh, nil diff --git a/builder/remotecontext/generate.go b/builder/remotecontext/generate.go new file mode 100644 index 0000000000..0b52d49926 --- /dev/null +++ b/builder/remotecontext/generate.go @@ -0,0 +1,3 @@ +package remotecontext + +//go:generate protoc --gogoslick_out=. tarsum.proto diff --git a/builder/remotecontext/git.go b/builder/remotecontext/git.go index 7a55bfa9d4..158bb5ad4d 100644 --- a/builder/remotecontext/git.go +++ b/builder/remotecontext/git.go @@ -25,5 +25,5 @@ func MakeGitContext(gitURL string) (builder.Source, error) { c.Close() os.RemoveAll(root) }() - return MakeTarSumContext(c) + return FromArchive(c) } diff --git a/builder/remotecontext/lazycontext.go b/builder/remotecontext/lazycontext.go index 41351a3583..b29c413fac 100644 --- a/builder/remotecontext/lazycontext.go +++ b/builder/remotecontext/lazycontext.go @@ -43,7 +43,7 @@ func (c *lazySource) Hash(path string) (string, error) { fi, err := os.Lstat(fullPath) if err != nil { - return "", err + return "", errors.WithStack(err) } relPath, err := Rel(c.root, fullPath) diff --git a/builder/remotecontext/remote.go b/builder/remotecontext/remote.go index bf45fca064..4afd516be5 100644 --- a/builder/remotecontext/remote.go +++ b/builder/remotecontext/remote.go @@ -28,7 +28,7 @@ var mimeRe = regexp.MustCompile(acceptableRemoteMIME) // // If a match is found, then the body is sent to the contentType handler and a (potentially compressed) tar stream is expected // to be returned. If no match is found, it is assumed the body is a tar stream (compressed or not). -// In either case, an (assumed) tar stream is passed to MakeTarSumContext whose result is returned. +// In either case, an (assumed) tar stream is passed to FromArchive whose result is returned. func MakeRemoteContext(remoteURL string, contentTypeHandlers map[string]func(io.ReadCloser) (io.ReadCloser, error)) (builder.Source, error) { f, err := GetWithStatusError(remoteURL) if err != nil { @@ -63,7 +63,7 @@ func MakeRemoteContext(remoteURL string, contentTypeHandlers map[string]func(io. // Pass through - this is a pre-packaged context, presumably // with a Dockerfile with the right name inside it. - return MakeTarSumContext(contextReader) + return FromArchive(contextReader) } // GetWithStatusError does an http.Get() and returns an error if the diff --git a/builder/remotecontext/remote_test.go b/builder/remotecontext/remote_test.go index 37c2740674..c698726e8b 100644 --- a/builder/remotecontext/remote_test.go +++ b/builder/remotecontext/remote_test.go @@ -212,26 +212,13 @@ func TestMakeRemoteContext(t *testing.T) { t.Fatal("Remote context should not be nil") } - tarSumCtx, ok := remoteContext.(*tarSumContext) - - if !ok { - t.Fatal("Cast error, remote context should be casted to tarSumContext") + h, err := remoteContext.Hash(builder.DefaultDockerfileName) + if err != nil { + t.Fatalf("failed to compute hash %s", err) } - fileInfoSums := tarSumCtx.sums - - if fileInfoSums.Len() != 1 { - t.Fatalf("Size of file info sums should be 1, got: %d", fileInfoSums.Len()) - } - - fileInfo := fileInfoSums.GetFile(builder.DefaultDockerfileName) - - if fileInfo == nil { - t.Fatalf("There should be file named %s in fileInfoSums", builder.DefaultDockerfileName) - } - - if fileInfo.Pos() != 0 { - t.Fatalf("File %s should have position 0, got %d", builder.DefaultDockerfileName, fileInfo.Pos()) + if expected, actual := "7b6b6b66bee9e2102fbdc2228be6c980a2a23adf371962a37286a49f7de0f7cc", h; expected != actual { + t.Fatalf("There should be file named %s %s in fileInfoSums", expected, actual) } } diff --git a/builder/remotecontext/tarsum.go b/builder/remotecontext/tarsum.go index dabf590e59..3ae9d82427 100644 --- a/builder/remotecontext/tarsum.go +++ b/builder/remotecontext/tarsum.go @@ -1,128 +1,174 @@ package remotecontext import ( - "io" + "fmt" "os" "path/filepath" + "sync" - "github.com/docker/docker/builder" - "github.com/docker/docker/pkg/archive" - "github.com/docker/docker/pkg/chrootarchive" - "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/symlink" - "github.com/docker/docker/pkg/tarsum" + iradix "github.com/hashicorp/go-immutable-radix" "github.com/pkg/errors" + "github.com/tonistiigi/fsutil" ) -type tarSumContext struct { +type hashed interface { + Hash() string +} + +// CachableSource is a source that contains cache records for its contents +type CachableSource struct { + mu sync.Mutex root string - sums tarsum.FileInfoSums + tree *iradix.Tree + txn *iradix.Txn } -func (c *tarSumContext) Close() error { - return os.RemoveAll(c.root) +// NewCachableSource creates new CachableSource +func NewCachableSource(root string) *CachableSource { + ts := &CachableSource{ + tree: iradix.New(), + root: root, + } + return ts } -func convertPathError(err error, cleanpath string) error { - if err, ok := err.(*os.PathError); ok { - err.Path = cleanpath +// MarshalBinary marshals current cache information to a byte array +func (cs *CachableSource) MarshalBinary() ([]byte, error) { + b := TarsumBackup{Hashes: make(map[string]string)} + root := cs.getRoot() + root.Walk(func(k []byte, v interface{}) bool { + b.Hashes[string(k)] = v.(*fileInfo).sum + return false + }) + return b.Marshal() +} + +// UnmarshalBinary decodes cache information for presented byte array +func (cs *CachableSource) UnmarshalBinary(data []byte) error { + var b TarsumBackup + if err := b.Unmarshal(data); err != nil { return err } - return err -} - -type modifiableContext interface { - builder.Source - // Remove deletes the entry specified by `path`. - // It is usual for directory entries to delete all its subentries. - Remove(path string) error -} - -// MakeTarSumContext returns a build Context from a tar stream. -// -// It extracts the tar stream to a temporary folder that is deleted as soon as -// the Context is closed. -// As the extraction happens, a tarsum is calculated for every file, and the set of -// all those sums then becomes the source of truth for all operations on this Context. -// -// Closing tarStream has to be done by the caller. -func MakeTarSumContext(tarStream io.Reader) (builder.Source, error) { - root, err := ioutils.TempDir("", "docker-builder") - if err != nil { - return nil, err + txn := iradix.New().Txn() + for p, v := range b.Hashes { + txn.Insert([]byte(p), &fileInfo{sum: v}) } + cs.mu.Lock() + defer cs.mu.Unlock() + cs.tree = txn.Commit() + return nil +} - tsc := &tarSumContext{root: root} - - // Make sure we clean-up upon error. In the happy case the caller - // is expected to manage the clean-up - defer func() { +// Scan rescans the cache information from the file system +func (cs *CachableSource) Scan() error { + lc, err := NewLazySource(cs.root) + if err != nil { + return err + } + txn := iradix.New().Txn() + err = filepath.Walk(cs.root, func(path string, info os.FileInfo, err error) error { if err != nil { - tsc.Close() + return errors.Wrapf(err, "failed to walk %s", path) } - }() - - decompressedStream, err := archive.DecompressStream(tarStream) - if err != nil { - return nil, err - } - - sum, err := tarsum.NewTarSum(decompressedStream, true, tarsum.Version1) - if err != nil { - return nil, err - } - - err = chrootarchive.Untar(sum, root, nil) - if err != nil { - return nil, err - } - - tsc.sums = sum.GetSums() - - return tsc, nil -} - -func (c *tarSumContext) Root() string { - return c.root -} - -func (c *tarSumContext) Remove(path string) error { - _, fullpath, err := normalize(path, c.root) + rel, err := Rel(cs.root, path) + if err != nil { + return err + } + h, err := lc.Hash(rel) + if err != nil { + return err + } + txn.Insert([]byte(rel), &fileInfo{sum: h}) + return nil + }) if err != nil { return err } - return os.RemoveAll(fullpath) + cs.mu.Lock() + defer cs.mu.Unlock() + cs.tree = txn.Commit() + return nil } -func (c *tarSumContext) Hash(path string) (string, error) { - cleanpath, fullpath, err := normalize(path, c.root) - if err != nil { - return "", err +// HandleChange notifies the source about a modification operation +func (cs *CachableSource) HandleChange(kind fsutil.ChangeKind, p string, fi os.FileInfo, err error) (retErr error) { + cs.mu.Lock() + if cs.txn == nil { + cs.txn = cs.tree.Txn() + } + if kind == fsutil.ChangeKindDelete { + cs.txn.Delete([]byte(p)) + cs.mu.Unlock() + return } - rel, err := filepath.Rel(c.root, fullpath) - if err != nil { - return "", convertPathError(err, cleanpath) + h, ok := fi.(hashed) + if !ok { + cs.mu.Unlock() + return errors.Errorf("invalid fileinfo: %s", p) } - // Use the checksum of the followed path(not the possible symlink) because - // this is the file that is actually copied. - if tsInfo := c.sums.GetFile(filepath.ToSlash(rel)); tsInfo != nil { - return tsInfo.Sum(), nil + hfi := &fileInfo{ + sum: h.Hash(), } - // We set sum to path by default for the case where GetFile returns nil. - // The usual case is if relative path is empty. - return path, nil // backwards compat TODO: see if really needed + cs.txn.Insert([]byte(p), hfi) + cs.mu.Unlock() + return nil } -func normalize(path, root string) (cleanPath, fullPath string, err error) { - cleanPath = filepath.Clean(string(os.PathSeparator) + path)[1:] - fullPath, err = symlink.FollowSymlinkInScope(filepath.Join(root, path), root) - if err != nil { - return "", "", errors.Wrapf(err, "forbidden path outside the build context: %s (%s)", path, cleanPath) +func (cs *CachableSource) getRoot() *iradix.Node { + cs.mu.Lock() + if cs.txn != nil { + cs.tree = cs.txn.Commit() + cs.txn = nil } - if _, err := os.Lstat(fullPath); err != nil { + t := cs.tree + cs.mu.Unlock() + return t.Root() +} + +// Close closes the source +func (cs *CachableSource) Close() error { + return nil +} + +func (cs *CachableSource) normalize(path string) (cleanpath, fullpath string, err error) { + cleanpath = filepath.Clean(string(os.PathSeparator) + path)[1:] + fullpath, err = symlink.FollowSymlinkInScope(filepath.Join(cs.root, path), cs.root) + if err != nil { + return "", "", fmt.Errorf("Forbidden path outside the context: %s (%s)", path, fullpath) + } + _, err = os.Lstat(fullpath) + if err != nil { return "", "", convertPathError(err, path) } return } + +// Hash returns a hash for a single file in the source +func (cs *CachableSource) Hash(path string) (string, error) { + n := cs.getRoot() + sum := "" + // TODO: check this for symlinks + v, ok := n.Get([]byte(path)) + if !ok { + sum = path + } else { + sum = v.(*fileInfo).sum + } + return sum, nil +} + +// Root returns a root directory for the source +func (cs *CachableSource) Root() string { + return cs.root +} + +type fileInfo struct { + sum string +} + +func (fi *fileInfo) Hash() string { + return fi.sum +} diff --git a/builder/remotecontext/tarsum.pb.go b/builder/remotecontext/tarsum.pb.go new file mode 100644 index 0000000000..561a7f6367 --- /dev/null +++ b/builder/remotecontext/tarsum.pb.go @@ -0,0 +1,525 @@ +// Code generated by protoc-gen-gogo. +// source: tarsum.proto +// DO NOT EDIT! + +/* +Package remotecontext is a generated protocol buffer package. + +It is generated from these files: + tarsum.proto + +It has these top-level messages: + TarsumBackup +*/ +package remotecontext + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import strings "strings" +import reflect "reflect" +import github_com_gogo_protobuf_sortkeys "github.com/gogo/protobuf/sortkeys" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +type TarsumBackup struct { + Hashes map[string]string `protobuf:"bytes,1,rep,name=Hashes" json:"Hashes,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (m *TarsumBackup) Reset() { *m = TarsumBackup{} } +func (*TarsumBackup) ProtoMessage() {} +func (*TarsumBackup) Descriptor() ([]byte, []int) { return fileDescriptorTarsum, []int{0} } + +func (m *TarsumBackup) GetHashes() map[string]string { + if m != nil { + return m.Hashes + } + return nil +} + +func init() { + proto.RegisterType((*TarsumBackup)(nil), "remotecontext.TarsumBackup") +} +func (this *TarsumBackup) Equal(that interface{}) bool { + if that == nil { + if this == nil { + return true + } + return false + } + + that1, ok := that.(*TarsumBackup) + if !ok { + that2, ok := that.(TarsumBackup) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + if this == nil { + return true + } + return false + } else if this == nil { + return false + } + if len(this.Hashes) != len(that1.Hashes) { + return false + } + for i := range this.Hashes { + if this.Hashes[i] != that1.Hashes[i] { + return false + } + } + return true +} +func (this *TarsumBackup) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&remotecontext.TarsumBackup{") + keysForHashes := make([]string, 0, len(this.Hashes)) + for k, _ := range this.Hashes { + keysForHashes = append(keysForHashes, k) + } + github_com_gogo_protobuf_sortkeys.Strings(keysForHashes) + mapStringForHashes := "map[string]string{" + for _, k := range keysForHashes { + mapStringForHashes += fmt.Sprintf("%#v: %#v,", k, this.Hashes[k]) + } + mapStringForHashes += "}" + if this.Hashes != nil { + s = append(s, "Hashes: "+mapStringForHashes+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringTarsum(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *TarsumBackup) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *TarsumBackup) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Hashes) > 0 { + for k, _ := range m.Hashes { + dAtA[i] = 0xa + i++ + v := m.Hashes[k] + mapSize := 1 + len(k) + sovTarsum(uint64(len(k))) + 1 + len(v) + sovTarsum(uint64(len(v))) + i = encodeVarintTarsum(dAtA, i, uint64(mapSize)) + dAtA[i] = 0xa + i++ + i = encodeVarintTarsum(dAtA, i, uint64(len(k))) + i += copy(dAtA[i:], k) + dAtA[i] = 0x12 + i++ + i = encodeVarintTarsum(dAtA, i, uint64(len(v))) + i += copy(dAtA[i:], v) + } + } + return i, nil +} + +func encodeFixed64Tarsum(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Tarsum(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintTarsum(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *TarsumBackup) Size() (n int) { + var l int + _ = l + if len(m.Hashes) > 0 { + for k, v := range m.Hashes { + _ = k + _ = v + mapEntrySize := 1 + len(k) + sovTarsum(uint64(len(k))) + 1 + len(v) + sovTarsum(uint64(len(v))) + n += mapEntrySize + 1 + sovTarsum(uint64(mapEntrySize)) + } + } + return n +} + +func sovTarsum(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozTarsum(x uint64) (n int) { + return sovTarsum(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *TarsumBackup) String() string { + if this == nil { + return "nil" + } + keysForHashes := make([]string, 0, len(this.Hashes)) + for k, _ := range this.Hashes { + keysForHashes = append(keysForHashes, k) + } + github_com_gogo_protobuf_sortkeys.Strings(keysForHashes) + mapStringForHashes := "map[string]string{" + for _, k := range keysForHashes { + mapStringForHashes += fmt.Sprintf("%v: %v,", k, this.Hashes[k]) + } + mapStringForHashes += "}" + s := strings.Join([]string{`&TarsumBackup{`, + `Hashes:` + mapStringForHashes + `,`, + `}`, + }, "") + return s +} +func valueToStringTarsum(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *TarsumBackup) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTarsum + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: TarsumBackup: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: TarsumBackup: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Hashes", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTarsum + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthTarsum + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + var keykey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTarsum + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + keykey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + var stringLenmapkey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTarsum + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapkey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapkey := int(stringLenmapkey) + if intStringLenmapkey < 0 { + return ErrInvalidLengthTarsum + } + postStringIndexmapkey := iNdEx + intStringLenmapkey + if postStringIndexmapkey > l { + return io.ErrUnexpectedEOF + } + mapkey := string(dAtA[iNdEx:postStringIndexmapkey]) + iNdEx = postStringIndexmapkey + if m.Hashes == nil { + m.Hashes = make(map[string]string) + } + if iNdEx < postIndex { + var valuekey uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTarsum + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + valuekey |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + var stringLenmapvalue uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowTarsum + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLenmapvalue |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLenmapvalue := int(stringLenmapvalue) + if intStringLenmapvalue < 0 { + return ErrInvalidLengthTarsum + } + postStringIndexmapvalue := iNdEx + intStringLenmapvalue + if postStringIndexmapvalue > l { + return io.ErrUnexpectedEOF + } + mapvalue := string(dAtA[iNdEx:postStringIndexmapvalue]) + iNdEx = postStringIndexmapvalue + m.Hashes[mapkey] = mapvalue + } else { + var mapvalue string + m.Hashes[mapkey] = mapvalue + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipTarsum(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthTarsum + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipTarsum(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTarsum + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTarsum + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTarsum + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthTarsum + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowTarsum + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipTarsum(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthTarsum = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowTarsum = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("tarsum.proto", fileDescriptorTarsum) } + +var fileDescriptorTarsum = []byte{ + // 196 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0x49, 0x2c, 0x2a, + 0x2e, 0xcd, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x2d, 0x4a, 0xcd, 0xcd, 0x2f, 0x49, + 0x4d, 0xce, 0xcf, 0x2b, 0x49, 0xad, 0x28, 0x51, 0xea, 0x62, 0xe4, 0xe2, 0x09, 0x01, 0xcb, 0x3b, + 0x25, 0x26, 0x67, 0x97, 0x16, 0x08, 0xd9, 0x73, 0xb1, 0x79, 0x24, 0x16, 0x67, 0xa4, 0x16, 0x4b, + 0x30, 0x2a, 0x30, 0x6b, 0x70, 0x1b, 0xa9, 0xeb, 0xa1, 0x68, 0xd0, 0x43, 0x56, 0xac, 0x07, 0x51, + 0xe9, 0x9a, 0x57, 0x52, 0x54, 0x19, 0x04, 0xd5, 0x26, 0x65, 0xc9, 0xc5, 0x8d, 0x24, 0x2c, 0x24, + 0xc0, 0xc5, 0x9c, 0x9d, 0x5a, 0x29, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x62, 0x0a, 0x89, + 0x70, 0xb1, 0x96, 0x25, 0xe6, 0x94, 0xa6, 0x4a, 0x30, 0x81, 0xc5, 0x20, 0x1c, 0x2b, 0x26, 0x0b, + 0x46, 0x27, 0x9d, 0x0b, 0x0f, 0xe5, 0x18, 0x6e, 0x3c, 0x94, 0x63, 0xf8, 0xf0, 0x50, 0x8e, 0xb1, + 0xe1, 0x91, 0x1c, 0xe3, 0x8a, 0x47, 0x72, 0x8c, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, + 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8b, 0x47, 0x72, 0x0c, 0x1f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, + 0xc7, 0x90, 0xc4, 0x06, 0xf6, 0x90, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x89, 0x57, 0x7d, 0x3f, + 0xe0, 0x00, 0x00, 0x00, +} diff --git a/builder/remotecontext/tarsum.proto b/builder/remotecontext/tarsum.proto new file mode 100644 index 0000000000..cb94240ba8 --- /dev/null +++ b/builder/remotecontext/tarsum.proto @@ -0,0 +1,7 @@ +syntax = "proto3"; + +package remotecontext; // no namespace because only used internally + +message TarsumBackup { + map Hashes = 1; +} \ No newline at end of file diff --git a/builder/remotecontext/tarsum_test.go b/builder/remotecontext/tarsum_test.go index 35d38cef3c..8a9d69bb73 100644 --- a/builder/remotecontext/tarsum_test.go +++ b/builder/remotecontext/tarsum_test.go @@ -9,6 +9,7 @@ import ( "github.com/docker/docker/builder" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/reexec" + "github.com/pkg/errors" ) const ( @@ -22,24 +23,22 @@ func init() { func TestCloseRootDirectory(t *testing.T) { contextDir, err := ioutil.TempDir("", "builder-tarsum-test") - + defer os.RemoveAll(contextDir) if err != nil { t.Fatalf("Error with creating temporary directory: %s", err) } - tarsum := &tarSumContext{root: contextDir} - - err = tarsum.Close() + src := makeTestArchiveContext(t, contextDir) + err = src.Close() if err != nil { t.Fatalf("Error while executing Close: %s", err) } - _, err = os.Stat(contextDir) + _, err = os.Stat(src.Root()) if !os.IsNotExist(err) { t.Fatal("Directory should not exist at this point") - defer os.RemoveAll(contextDir) } } @@ -49,7 +48,7 @@ func TestHashFile(t *testing.T) { createTestTempFile(t, contextDir, filename, contents, 0755) - tarSum := makeTestTarsumContext(t, contextDir) + tarSum := makeTestArchiveContext(t, contextDir) sum, err := tarSum.Hash(filename) @@ -80,7 +79,7 @@ func TestHashSubdir(t *testing.T) { testFilename := createTestTempFile(t, contextSubdir, filename, contents, 0755) - tarSum := makeTestTarsumContext(t, contextDir) + tarSum := makeTestArchiveContext(t, contextDir) relativePath, err := filepath.Rel(contextDir, testFilename) @@ -109,11 +108,9 @@ func TestStatNotExisting(t *testing.T) { contextDir, cleanup := createTestTempDir(t, "", "builder-tarsum-test") defer cleanup() - tarSum := &tarSumContext{root: contextDir} - - _, err := tarSum.Hash("not-existing") - - if !os.IsNotExist(err) { + src := makeTestArchiveContext(t, contextDir) + _, err := src.Hash("not-existing") + if !os.IsNotExist(errors.Cause(err)) { t.Fatalf("This file should not exist: %s", err) } } @@ -130,30 +127,31 @@ func TestRemoveDirectory(t *testing.T) { t.Fatalf("Error when getting relative path: %s", err) } - tarSum := &tarSumContext{root: contextDir} + src := makeTestArchiveContext(t, contextDir) + + tarSum := src.(modifiableContext) err = tarSum.Remove(relativePath) - if err != nil { t.Fatalf("Error when executing Remove: %s", err) } - _, err = os.Stat(contextSubdir) + _, err = src.Hash(contextSubdir) - if !os.IsNotExist(err) { + if !os.IsNotExist(errors.Cause(err)) { t.Fatal("Directory should not exist at this point") } } -func makeTestTarsumContext(t *testing.T, dir string) builder.Source { +func makeTestArchiveContext(t *testing.T, dir string) builder.Source { tarStream, err := archive.Tar(dir, archive.Uncompressed) if err != nil { t.Fatalf("error: %s", err) } defer tarStream.Close() - tarSum, err := MakeTarSumContext(tarStream) + tarSum, err := FromArchive(tarStream) if err != nil { - t.Fatalf("Error when executing MakeTarSumContext: %s", err) + t.Fatalf("Error when executing FromArchive: %s", err) } return tarSum } diff --git a/client/build_prune.go b/client/build_prune.go new file mode 100644 index 0000000000..ccab115d33 --- /dev/null +++ b/client/build_prune.go @@ -0,0 +1,30 @@ +package client + +import ( + "encoding/json" + "fmt" + + "github.com/docker/docker/api/types" + "golang.org/x/net/context" +) + +// BuildCachePrune requests the daemon to delete unused cache data +func (cli *Client) BuildCachePrune(ctx context.Context) (*types.BuildCachePruneReport, error) { + if err := cli.NewVersionError("1.31", "build prune"); err != nil { + return nil, err + } + + report := types.BuildCachePruneReport{} + + serverResp, err := cli.post(ctx, "/build/prune", nil, nil, nil) + if err != nil { + return nil, err + } + defer ensureReaderClosed(serverResp) + + if err := json.NewDecoder(serverResp.body).Decode(&report); err != nil { + return nil, fmt.Errorf("Error retrieving disk usage: %v", err) + } + + return &report, nil +} diff --git a/client/interface.go b/client/interface.go index 9c838e3dff..acd4de1dbd 100644 --- a/client/interface.go +++ b/client/interface.go @@ -82,6 +82,7 @@ type DistributionAPIClient interface { // ImageAPIClient defines API client methods for the images type ImageAPIClient interface { ImageBuild(ctx context.Context, context io.Reader, options types.ImageBuildOptions) (types.ImageBuildResponse, error) + BuildCachePrune(ctx context.Context) (*types.BuildCachePruneReport, error) ImageCreate(ctx context.Context, parentReference string, options types.ImageCreateOptions) (io.ReadCloser, error) ImageHistory(ctx context.Context, image string) ([]image.HistoryResponseItem, error) ImageImport(ctx context.Context, source types.ImageImportSource, ref string, options types.ImageImportOptions) (io.ReadCloser, error) diff --git a/client/session/filesync/diffcopy.go b/client/session/filesync/diffcopy.go new file mode 100644 index 0000000000..b15e4ee4bf --- /dev/null +++ b/client/session/filesync/diffcopy.go @@ -0,0 +1,30 @@ +package filesync + +import ( + "time" + + "google.golang.org/grpc" + + "github.com/Sirupsen/logrus" + "github.com/tonistiigi/fsutil" +) + +func sendDiffCopy(stream grpc.Stream, dir string, excludes []string, progress progressCb) error { + return fsutil.Send(stream.Context(), stream, dir, &fsutil.WalkOpt{ + ExcludePatterns: excludes, + }, progress) +} + +func recvDiffCopy(ds grpc.Stream, dest string, cu CacheUpdater) error { + st := time.Now() + defer func() { + logrus.Debugf("diffcopy took: %v", time.Since(st)) + }() + var cf fsutil.ChangeFunc + if cu != nil { + cu.MarkSupported(true) + cf = cu.HandleChange + } + + return fsutil.Receive(ds.Context(), ds, dest, cf) +} diff --git a/client/session/filesync/filesync.go b/client/session/filesync/filesync.go new file mode 100644 index 0000000000..fa6dafb6b0 --- /dev/null +++ b/client/session/filesync/filesync.go @@ -0,0 +1,173 @@ +package filesync + +import ( + "os" + "strings" + + "github.com/docker/docker/client/session" + "github.com/pkg/errors" + "github.com/tonistiigi/fsutil" + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type fsSyncProvider struct { + root string + excludes []string + p progressCb + doneCh chan error +} + +// NewFSSyncProvider creates a new provider for sending files from client +func NewFSSyncProvider(root string, excludes []string) session.Attachable { + p := &fsSyncProvider{ + root: root, + excludes: excludes, + } + return p +} + +func (sp *fsSyncProvider) Register(server *grpc.Server) { + RegisterFileSyncServer(server, sp) +} + +func (sp *fsSyncProvider) DiffCopy(stream FileSync_DiffCopyServer) error { + return sp.handle("diffcopy", stream) +} +func (sp *fsSyncProvider) TarStream(stream FileSync_TarStreamServer) error { + return sp.handle("tarstream", stream) +} + +func (sp *fsSyncProvider) handle(method string, stream grpc.ServerStream) error { + var pr *protocol + for _, p := range supportedProtocols { + if method == p.name && isProtoSupported(p.name) { + pr = &p + break + } + } + if pr == nil { + return errors.New("failed to negotiate protocol") + } + + opts, _ := metadata.FromContext(stream.Context()) // if no metadata continue with empty object + + var excludes []string + if len(opts["Override-Excludes"]) == 0 || opts["Override-Excludes"][0] != "true" { + excludes = sp.excludes + } + + var progress progressCb + if sp.p != nil { + progress = sp.p + sp.p = nil + } + + var doneCh chan error + if sp.doneCh != nil { + doneCh = sp.doneCh + sp.doneCh = nil + } + err := pr.sendFn(stream, sp.root, excludes, progress) + if doneCh != nil { + if err != nil { + doneCh <- err + } + close(doneCh) + } + return err +} + +func (sp *fsSyncProvider) SetNextProgressCallback(f func(int, bool), doneCh chan error) { + sp.p = f + sp.doneCh = doneCh +} + +type progressCb func(int, bool) + +type protocol struct { + name string + sendFn func(stream grpc.Stream, srcDir string, excludes []string, progress progressCb) error + recvFn func(stream grpc.Stream, destDir string, cu CacheUpdater) error +} + +func isProtoSupported(p string) bool { + // TODO: this should be removed after testing if stability is confirmed + if override := os.Getenv("BUILD_STREAM_PROTOCOL"); override != "" { + return strings.EqualFold(p, override) + } + return true +} + +var supportedProtocols = []protocol{ + { + name: "diffcopy", + sendFn: sendDiffCopy, + recvFn: recvDiffCopy, + }, + { + name: "tarstream", + sendFn: sendTarStream, + recvFn: recvTarStream, + }, +} + +// FSSendRequestOpt defines options for FSSend request +type FSSendRequestOpt struct { + SrcPaths []string + OverrideExcludes bool + DestDir string + CacheUpdater CacheUpdater +} + +// CacheUpdater is an object capable of sending notifications for the cache hash changes +type CacheUpdater interface { + MarkSupported(bool) + HandleChange(fsutil.ChangeKind, string, os.FileInfo, error) error +} + +// FSSync initializes a transfer of files +func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error { + var pr *protocol + for _, p := range supportedProtocols { + if isProtoSupported(p.name) && c.Supports(session.MethodURL(_FileSync_serviceDesc.ServiceName, p.name)) { + pr = &p + break + } + } + if pr == nil { + return errors.New("no fssync handlers") + } + + opts := make(map[string][]string) + if opt.OverrideExcludes { + opts["Override-Excludes"] = []string{"true"} + } + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + client := NewFileSyncClient(c.Conn()) + + var stream grpc.ClientStream + + ctx = metadata.NewContext(ctx, opts) + + switch pr.name { + case "tarstream": + cc, err := client.TarStream(ctx) + if err != nil { + return err + } + stream = cc + case "diffcopy": + cc, err := client.DiffCopy(ctx) + if err != nil { + return err + } + stream = cc + } + + return pr.recvFn(stream, opt.DestDir, opt.CacheUpdater) +} diff --git a/client/session/filesync/filesync.pb.go b/client/session/filesync/filesync.pb.go new file mode 100644 index 0000000000..c6ed666383 --- /dev/null +++ b/client/session/filesync/filesync.pb.go @@ -0,0 +1,575 @@ +// Code generated by protoc-gen-gogo. +// source: filesync.proto +// DO NOT EDIT! + +/* +Package filesync is a generated protocol buffer package. + +It is generated from these files: + filesync.proto + +It has these top-level messages: + BytesMessage +*/ +package filesync + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import bytes "bytes" + +import strings "strings" +import reflect "reflect" + +import ( + context "golang.org/x/net/context" + grpc "google.golang.org/grpc" +) + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +// BytesMessage contains a chunk of byte data +type BytesMessage struct { + Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (m *BytesMessage) Reset() { *m = BytesMessage{} } +func (*BytesMessage) ProtoMessage() {} +func (*BytesMessage) Descriptor() ([]byte, []int) { return fileDescriptorFilesync, []int{0} } + +func (m *BytesMessage) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func init() { + proto.RegisterType((*BytesMessage)(nil), "moby.filesync.v1.BytesMessage") +} +func (this *BytesMessage) Equal(that interface{}) bool { + if that == nil { + if this == nil { + return true + } + return false + } + + that1, ok := that.(*BytesMessage) + if !ok { + that2, ok := that.(BytesMessage) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + if this == nil { + return true + } + return false + } else if this == nil { + return false + } + if !bytes.Equal(this.Data, that1.Data) { + return false + } + return true +} +func (this *BytesMessage) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&filesync.BytesMessage{") + s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringFilesync(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// Client API for FileSync service + +type FileSyncClient interface { + DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSync_DiffCopyClient, error) + TarStream(ctx context.Context, opts ...grpc.CallOption) (FileSync_TarStreamClient, error) +} + +type fileSyncClient struct { + cc *grpc.ClientConn +} + +func NewFileSyncClient(cc *grpc.ClientConn) FileSyncClient { + return &fileSyncClient{cc} +} + +func (c *fileSyncClient) DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSync_DiffCopyClient, error) { + stream, err := grpc.NewClientStream(ctx, &_FileSync_serviceDesc.Streams[0], c.cc, "/moby.filesync.v1.FileSync/DiffCopy", opts...) + if err != nil { + return nil, err + } + x := &fileSyncDiffCopyClient{stream} + return x, nil +} + +type FileSync_DiffCopyClient interface { + Send(*BytesMessage) error + Recv() (*BytesMessage, error) + grpc.ClientStream +} + +type fileSyncDiffCopyClient struct { + grpc.ClientStream +} + +func (x *fileSyncDiffCopyClient) Send(m *BytesMessage) error { + return x.ClientStream.SendMsg(m) +} + +func (x *fileSyncDiffCopyClient) Recv() (*BytesMessage, error) { + m := new(BytesMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *fileSyncClient) TarStream(ctx context.Context, opts ...grpc.CallOption) (FileSync_TarStreamClient, error) { + stream, err := grpc.NewClientStream(ctx, &_FileSync_serviceDesc.Streams[1], c.cc, "/moby.filesync.v1.FileSync/TarStream", opts...) + if err != nil { + return nil, err + } + x := &fileSyncTarStreamClient{stream} + return x, nil +} + +type FileSync_TarStreamClient interface { + Send(*BytesMessage) error + Recv() (*BytesMessage, error) + grpc.ClientStream +} + +type fileSyncTarStreamClient struct { + grpc.ClientStream +} + +func (x *fileSyncTarStreamClient) Send(m *BytesMessage) error { + return x.ClientStream.SendMsg(m) +} + +func (x *fileSyncTarStreamClient) Recv() (*BytesMessage, error) { + m := new(BytesMessage) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Server API for FileSync service + +type FileSyncServer interface { + DiffCopy(FileSync_DiffCopyServer) error + TarStream(FileSync_TarStreamServer) error +} + +func RegisterFileSyncServer(s *grpc.Server, srv FileSyncServer) { + s.RegisterService(&_FileSync_serviceDesc, srv) +} + +func _FileSync_DiffCopy_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(FileSyncServer).DiffCopy(&fileSyncDiffCopyServer{stream}) +} + +type FileSync_DiffCopyServer interface { + Send(*BytesMessage) error + Recv() (*BytesMessage, error) + grpc.ServerStream +} + +type fileSyncDiffCopyServer struct { + grpc.ServerStream +} + +func (x *fileSyncDiffCopyServer) Send(m *BytesMessage) error { + return x.ServerStream.SendMsg(m) +} + +func (x *fileSyncDiffCopyServer) Recv() (*BytesMessage, error) { + m := new(BytesMessage) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _FileSync_TarStream_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(FileSyncServer).TarStream(&fileSyncTarStreamServer{stream}) +} + +type FileSync_TarStreamServer interface { + Send(*BytesMessage) error + Recv() (*BytesMessage, error) + grpc.ServerStream +} + +type fileSyncTarStreamServer struct { + grpc.ServerStream +} + +func (x *fileSyncTarStreamServer) Send(m *BytesMessage) error { + return x.ServerStream.SendMsg(m) +} + +func (x *fileSyncTarStreamServer) Recv() (*BytesMessage, error) { + m := new(BytesMessage) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +var _FileSync_serviceDesc = grpc.ServiceDesc{ + ServiceName: "moby.filesync.v1.FileSync", + HandlerType: (*FileSyncServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "DiffCopy", + Handler: _FileSync_DiffCopy_Handler, + ServerStreams: true, + ClientStreams: true, + }, + { + StreamName: "TarStream", + Handler: _FileSync_TarStream_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "filesync.proto", +} + +func (m *BytesMessage) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *BytesMessage) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Data) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintFilesync(dAtA, i, uint64(len(m.Data))) + i += copy(dAtA[i:], m.Data) + } + return i, nil +} + +func encodeFixed64Filesync(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Filesync(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintFilesync(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *BytesMessage) Size() (n int) { + var l int + _ = l + l = len(m.Data) + if l > 0 { + n += 1 + l + sovFilesync(uint64(l)) + } + return n +} + +func sovFilesync(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozFilesync(x uint64) (n int) { + return sovFilesync(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *BytesMessage) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&BytesMessage{`, + `Data:` + fmt.Sprintf("%v", this.Data) + `,`, + `}`, + }, "") + return s +} +func valueToStringFilesync(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *BytesMessage) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: BytesMessage: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: BytesMessage: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowFilesync + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthFilesync + } + postIndex := iNdEx + byteLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipFilesync(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthFilesync + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipFilesync(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowFilesync + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowFilesync + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowFilesync + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthFilesync + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowFilesync + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipFilesync(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthFilesync = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowFilesync = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("filesync.proto", fileDescriptorFilesync) } + +var fileDescriptorFilesync = []byte{ + // 198 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0xcb, 0xcc, 0x49, + 0x2d, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0xc8, 0xcd, 0x4f, 0xaa, + 0xd4, 0x83, 0x0b, 0x96, 0x19, 0x2a, 0x29, 0x71, 0xf1, 0x38, 0x55, 0x96, 0xa4, 0x16, 0xfb, 0xa6, + 0x16, 0x17, 0x27, 0xa6, 0xa7, 0x0a, 0x09, 0x71, 0xb1, 0xa4, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2a, + 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x46, 0xab, 0x19, 0xb9, 0x38, 0xdc, 0x32, 0x73, 0x52, 0x83, + 0x2b, 0xf3, 0x92, 0x85, 0xfc, 0xb8, 0x38, 0x5c, 0x32, 0xd3, 0xd2, 0x9c, 0xf3, 0x0b, 0x2a, 0x85, + 0xe4, 0xf4, 0xd0, 0xcd, 0xd3, 0x43, 0x36, 0x4c, 0x8a, 0x80, 0xbc, 0x06, 0xa3, 0x01, 0xa3, 0x90, + 0x3f, 0x17, 0x67, 0x48, 0x62, 0x51, 0x70, 0x49, 0x51, 0x6a, 0x62, 0x2e, 0x35, 0x0c, 0x74, 0x32, + 0xbb, 0xf0, 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, + 0x31, 0xae, 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, + 0xc9, 0x31, 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x51, + 0x1c, 0x30, 0xb3, 0x92, 0xd8, 0xc0, 0x41, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x0c, + 0x8d, 0xc5, 0x34, 0x01, 0x00, 0x00, +} diff --git a/client/session/filesync/filesync.proto b/client/session/filesync/filesync.proto new file mode 100644 index 0000000000..2fd5b3ec8d --- /dev/null +++ b/client/session/filesync/filesync.proto @@ -0,0 +1,15 @@ +syntax = "proto3"; + +package moby.filesync.v1; + +option go_package = "filesync"; + +service FileSync{ + rpc DiffCopy(stream BytesMessage) returns (stream BytesMessage); + rpc TarStream(stream BytesMessage) returns (stream BytesMessage); +} + +// BytesMessage contains a chunk of byte data +message BytesMessage{ + bytes data = 1; +} \ No newline at end of file diff --git a/client/session/filesync/generate.go b/client/session/filesync/generate.go new file mode 100644 index 0000000000..261e876272 --- /dev/null +++ b/client/session/filesync/generate.go @@ -0,0 +1,3 @@ +package filesync + +//go:generate protoc --gogoslick_out=plugins=grpc:. filesync.proto diff --git a/client/session/filesync/tarstream.go b/client/session/filesync/tarstream.go new file mode 100644 index 0000000000..ee01e30a75 --- /dev/null +++ b/client/session/filesync/tarstream.go @@ -0,0 +1,83 @@ +package filesync + +import ( + "io" + + "github.com/Sirupsen/logrus" + "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/pkg/chrootarchive" + "github.com/pkg/errors" + "google.golang.org/grpc" +) + +func sendTarStream(stream grpc.Stream, dir string, excludes []string, progress progressCb) error { + a, err := archive.TarWithOptions(dir, &archive.TarOptions{ + ExcludePatterns: excludes, + }) + if err != nil { + return err + } + + size := 0 + buf := make([]byte, 1<<15) + t := new(BytesMessage) + for { + n, err := a.Read(buf) + if err != nil { + if err == io.EOF { + break + } + return err + } + t.Data = buf[:n] + + if err := stream.SendMsg(t); err != nil { + return err + } + size += n + if progress != nil { + progress(size, false) + } + } + if progress != nil { + progress(size, true) + } + return nil +} + +func recvTarStream(ds grpc.Stream, dest string, cs CacheUpdater) error { + + pr, pw := io.Pipe() + + go func() { + var ( + err error + t = new(BytesMessage) + ) + for { + if err = ds.RecvMsg(t); err != nil { + if err == io.EOF { + err = nil + } + break + } + _, err = pw.Write(t.Data) + if err != nil { + break + } + } + if err = pw.CloseWithError(err); err != nil { + logrus.Errorf("failed to close tar transfer pipe") + } + }() + + decompressedStream, err := archive.DecompressStream(pr) + if err != nil { + return errors.Wrap(err, "failed to decompress stream") + } + + if err := chrootarchive.Untar(decompressedStream, dest, nil); err != nil { + return errors.Wrap(err, "failed to untar context") + } + return nil +} diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index f650ec5923..3ac4b84269 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -27,6 +27,8 @@ import ( swarmrouter "github.com/docker/docker/api/server/router/swarm" systemrouter "github.com/docker/docker/api/server/router/system" "github.com/docker/docker/api/server/router/volume" + "github.com/docker/docker/builder/dockerfile" + "github.com/docker/docker/builder/fscache" "github.com/docker/docker/cli/debug" "github.com/docker/docker/client/session" "github.com/docker/docker/daemon" @@ -268,7 +270,26 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { logrus.Fatalf("Error starting cluster component: %v", err) } - bb, err := buildbackend.NewBackend(d, d, sm, d.IDMappings()) + builderStateDir := filepath.Join(cli.Config.Root, "builder") + + fsCache, err := fscache.NewFSCache(fscache.Opt{ + Backend: fscache.NewNaiveCacheBackend(builderStateDir), + Root: builderStateDir, + GCPolicy: fscache.GCPolicy{ // TODO: expose this in config + MaxSize: 1024 * 1024 * 512, // 512MB + MaxKeepDuration: 7 * 24 * time.Hour, // 1 week + }, + }) + if err != nil { + return errors.Wrap(err, "failed to create fscache") + } + + manager, err := dockerfile.NewBuildManager(d, sm, fsCache, d.IDMappings()) + if err != nil { + return err + } + + bb, err := buildbackend.NewBackend(d, manager, fsCache) if err != nil { return errors.Wrap(err, "failed to create buildmanager") } @@ -282,7 +303,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { cli.d = d - initRouter(api, d, c, sm, bb) + initRouter(api, d, c, sm, bb, fsCache) // process cluster change notifications watchCtx, cancel := context.WithCancel(context.Background()) @@ -455,7 +476,7 @@ func loadDaemonCliConfig(opts *daemonOptions) (*config.Config, error) { return conf, nil } -func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster, sm *session.Manager, bb *buildbackend.Backend) { +func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster, sm *session.Manager, bb *buildbackend.Backend, bc *fscache.FSCache) { decoder := runconfig.ContainerDecoder{} routers := []router.Router{ @@ -463,7 +484,7 @@ func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster, sm *s checkpointrouter.NewRouter(d, decoder), container.NewRouter(d, decoder), image.NewRouter(d, decoder), - systemrouter.NewRouter(d, c), + systemrouter.NewRouter(d, c, bc), volume.NewRouter(d), build.NewRouter(bb, d), sessionrouter.NewRouter(sm), diff --git a/hack/validate/gofmt b/hack/validate/gofmt index f70d574fbf..38027a9f77 100755 --- a/hack/validate/gofmt +++ b/hack/validate/gofmt @@ -5,7 +5,8 @@ source "${SCRIPTDIR}/.validate" IFS=$'\n' files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | - grep -v '^vendor/' || true) ) + grep -v '^vendor/' | + grep -v '\.pb\.go$' || true) ) unset IFS badFiles=() diff --git a/hack/validate/lint b/hack/validate/lint index 85f7f1abe6..341490a042 100755 --- a/hack/validate/lint +++ b/hack/validate/lint @@ -4,7 +4,7 @@ export SCRIPTDIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" source "${SCRIPTDIR}/.validate" IFS=$'\n' -files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '^api/types/plugins/logdriver/entry.pb.go' || true) ) +files=( $(validate_diff --diff-filter=ACMR --name-only -- '*.go' | grep -v '^vendor/' | grep -v '^api/types/container/' | grep -v '\.pb\.go$' || true) ) unset IFS errors=() diff --git a/integration-cli/docker_api_build_test.go b/integration-cli/docker_api_build_test.go index 7a7549a47e..a5e19f658b 100644 --- a/integration-cli/docker_api_build_test.go +++ b/integration-cli/docker_api_build_test.go @@ -12,6 +12,8 @@ import ( "strings" "github.com/docker/docker/api/types" + "github.com/docker/docker/client/session" + "github.com/docker/docker/client/session/filesync" "github.com/docker/docker/integration-cli/checker" "github.com/docker/docker/integration-cli/cli/build/fakecontext" "github.com/docker/docker/integration-cli/cli/build/fakegit" @@ -22,6 +24,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/context" + "golang.org/x/sync/errgroup" ) func (s *DockerSuite) TestBuildAPIDockerFileRemote(c *check.C) { @@ -363,6 +366,108 @@ func (s *DockerRegistrySuite) TestBuildCopyFromForcePull(c *check.C) { assert.Contains(c, string(out), "Successfully built") } +func (s *DockerSuite) TestBuildWithSession(c *check.C) { + testRequires(c, ExperimentalDaemon) + + dockerfile := ` + FROM busybox + COPY file / + RUN cat /file + ` + + fctx := fakecontext.New(c, "", + fakecontext.WithFile("file", "some content"), + ) + defer fctx.Close() + + out := testBuildWithSession(c, fctx.Dir, dockerfile) + assert.Contains(c, out, "some content") + + fctx.Add("second", "contentcontent") + + dockerfile += ` + COPY second / + RUN cat /second + ` + + out = testBuildWithSession(c, fctx.Dir, dockerfile) + assert.Equal(c, strings.Count(out, "Using cache"), 2) + assert.Contains(c, out, "contentcontent") + + client, err := request.NewClient() + require.NoError(c, err) + + du, err := client.DiskUsage(context.TODO()) + assert.Nil(c, err) + assert.True(c, du.BuilderSize > 10) + + out = testBuildWithSession(c, fctx.Dir, dockerfile) + assert.Equal(c, strings.Count(out, "Using cache"), 4) + + du2, err := client.DiskUsage(context.TODO()) + assert.Nil(c, err) + assert.Equal(c, du.BuilderSize, du2.BuilderSize) + + // rebuild with regular tar, confirm cache still applies + fctx.Add("Dockerfile", dockerfile) + res, body, err := request.Post( + "/build", + request.RawContent(fctx.AsTarReader(c)), + request.ContentType("application/x-tar")) + require.NoError(c, err) + assert.Equal(c, http.StatusOK, res.StatusCode) + + outBytes, err := testutil.ReadBody(body) + require.NoError(c, err) + assert.Contains(c, string(outBytes), "Successfully built") + assert.Equal(c, strings.Count(string(outBytes), "Using cache"), 4) + + _, err = client.BuildCachePrune(context.TODO()) + assert.Nil(c, err) + + du, err = client.DiskUsage(context.TODO()) + assert.Nil(c, err) + assert.Equal(c, du.BuilderSize, int64(0)) +} + +func testBuildWithSession(c *check.C, dir, dockerfile string) (outStr string) { + client, err := request.NewClient() + require.NoError(c, err) + + sess, err := session.NewSession("foo1", "foo") + assert.Nil(c, err) + + fsProvider := filesync.NewFSSyncProvider(dir, nil) + sess.Allow(fsProvider) + + g, ctx := errgroup.WithContext(context.Background()) + + g.Go(func() error { + return sess.Run(ctx, client.DialSession) + }) + + g.Go(func() error { + res, body, err := request.Post("/build?remote=client-session&session="+sess.UUID(), func(req *http.Request) error { + req.Body = ioutil.NopCloser(strings.NewReader(dockerfile)) + return nil + }) + if err != nil { + return err + } + assert.Equal(c, res.StatusCode, http.StatusOK) + out, err := testutil.ReadBody(body) + require.NoError(c, err) + assert.Contains(c, string(out), "Successfully built") + sess.Close() + outStr = string(out) + return nil + }) + + err = g.Wait() + assert.Nil(c, err) + return +} + type buildLine struct { Stream string Aux struct { diff --git a/integration-cli/docker_cli_daemon_test.go b/integration-cli/docker_cli_daemon_test.go index 4ec5ac230d..0361215965 100644 --- a/integration-cli/docker_cli_daemon_test.go +++ b/integration-cli/docker_cli_daemon_test.go @@ -1789,7 +1789,7 @@ func (s *DockerDaemonSuite) TestDaemonNoSpaceLeftOnDeviceError(c *check.C) { // create a 2MiB image and mount it as graph root // Why in a container? Because `mount` sometimes behaves weirdly and often fails outright on this test in debian:jessie (which is what the test suite runs under if run from the Makefile) - dockerCmd(c, "run", "--rm", "-v", testDir+":/test", "busybox", "sh", "-c", "dd of=/test/testfs.img bs=1M seek=2 count=0") + dockerCmd(c, "run", "--rm", "-v", testDir+":/test", "busybox", "sh", "-c", "dd of=/test/testfs.img bs=1M seek=3 count=0") icmd.RunCommand("mkfs.ext4", "-F", filepath.Join(testDir, "testfs.img")).Assert(c, icmd.Success) result := icmd.RunCommand("losetup", "-f", "--show", filepath.Join(testDir, "testfs.img")) diff --git a/integration-cli/request/request.go b/integration-cli/request/request.go index 1cb3ffec12..6f2bf650d4 100644 --- a/integration-cli/request/request.go +++ b/integration-cli/request/request.go @@ -17,6 +17,7 @@ import ( "strings" "time" + "github.com/docker/docker/api" dclient "github.com/docker/docker/client" "github.com/docker/docker/opts" "github.com/docker/docker/pkg/ioutils" @@ -170,7 +171,7 @@ func NewClient() (dclient.APIClient, error) { if err != nil { return nil, err } - return dclient.NewClient(host, "", httpClient, nil) + return dclient.NewClient(host, api.DefaultVersion, httpClient, nil) } // FIXME(vdemeester) httputil.ClientConn is deprecated, use http.Client instead (closer to actual client) diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go index 06e8e7e8bb..c6ad7c58a8 100644 --- a/pkg/archive/archive.go +++ b/pkg/archive/archive.go @@ -305,15 +305,7 @@ func (compression *Compression) Extension() string { // FileInfoHeader creates a populated Header from fi. // Compared to archive pkg this function fills in more information. -func FileInfoHeader(path, name string, fi os.FileInfo) (*tar.Header, error) { - var link string - if fi.Mode()&os.ModeSymlink != 0 { - var err error - link, err = os.Readlink(path) - if err != nil { - return nil, err - } - } +func FileInfoHeader(name string, fi os.FileInfo, link string) (*tar.Header, error) { hdr, err := tar.FileInfoHeader(fi, link) if err != nil { return nil, err @@ -327,12 +319,18 @@ func FileInfoHeader(path, name string, fi os.FileInfo) (*tar.Header, error) { if err := setHeaderForSpecialDevice(hdr, name, fi.Sys()); err != nil { return nil, err } + return hdr, nil +} + +// ReadSecurityXattrToTarHeader reads security.capability xattr from filesystem +// to a tar header +func ReadSecurityXattrToTarHeader(path string, hdr *tar.Header) error { capability, _ := system.Lgetxattr(path, "security.capability") if capability != nil { hdr.Xattrs = make(map[string]string) hdr.Xattrs["security.capability"] = string(capability) } - return hdr, nil + return nil } type tarWhiteoutConverter interface { @@ -386,10 +384,22 @@ func (ta *tarAppender) addTarFile(path, name string) error { return err } - hdr, err := FileInfoHeader(path, name, fi) + var link string + if fi.Mode()&os.ModeSymlink != 0 { + var err error + link, err = os.Readlink(path) + if err != nil { + return err + } + } + + hdr, err := FileInfoHeader(name, fi, link) if err != nil { return err } + if err := ReadSecurityXattrToTarHeader(path, hdr); err != nil { + return err + } // if it's not a directory and has more than 1 link, // it's hard linked, so set the type flag accordingly diff --git a/pkg/archive/archive_unix.go b/pkg/archive/archive_unix.go index 33ee88766f..a33f0fe779 100644 --- a/pkg/archive/archive_unix.go +++ b/pkg/archive/archive_unix.go @@ -45,16 +45,13 @@ func chmodTarEntry(perm os.FileMode) os.FileMode { func setHeaderForSpecialDevice(hdr *tar.Header, name string, stat interface{}) (err error) { s, ok := stat.(*syscall.Stat_t) - if !ok { - err = errors.New("cannot convert stat value to syscall.Stat_t") - return - } - - // Currently go does not fill in the major/minors - if s.Mode&syscall.S_IFBLK != 0 || - s.Mode&syscall.S_IFCHR != 0 { - hdr.Devmajor = int64(major(uint64(s.Rdev))) - hdr.Devminor = int64(minor(uint64(s.Rdev))) + if ok { + // Currently go does not fill in the major/minors + if s.Mode&syscall.S_IFBLK != 0 || + s.Mode&syscall.S_IFCHR != 0 { + hdr.Devmajor = int64(major(uint64(s.Rdev))) + hdr.Devminor = int64(minor(uint64(s.Rdev))) + } } return @@ -63,13 +60,10 @@ func setHeaderForSpecialDevice(hdr *tar.Header, name string, stat interface{}) ( func getInodeFromStat(stat interface{}) (inode uint64, err error) { s, ok := stat.(*syscall.Stat_t) - if !ok { - err = errors.New("cannot convert stat value to syscall.Stat_t") - return + if ok { + inode = uint64(s.Ino) } - inode = uint64(s.Ino) - return }