1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

vendor: update buildkit to 4d1f260e8

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi 2020-07-19 22:18:46 -07:00
parent cadd72d37d
commit 8774804ca7
65 changed files with 991 additions and 905 deletions

View file

@ -56,15 +56,16 @@ type SourceOpt struct {
LayerStore layer.Store
}
type imageSource struct {
// Source is the source implementation for accessing container images
type Source struct {
SourceOpt
g flightcontrol.Group
resolverCache *resolverCache
}
// NewSource creates a new image source
func NewSource(opt SourceOpt) (source.Source, error) {
is := &imageSource{
func NewSource(opt SourceOpt) (*Source, error) {
is := &Source{
SourceOpt: opt,
resolverCache: newResolverCache(),
}
@ -72,20 +73,22 @@ func NewSource(opt SourceOpt) (source.Source, error) {
return is, nil
}
func (is *imageSource) ID() string {
// ID returns image scheme identifier
func (is *Source) ID() string {
return source.DockerImageScheme
}
func (is *imageSource) getResolver(ctx context.Context, hosts docker.RegistryHosts, ref string, sm *session.Manager) remotes.Resolver {
if res := is.resolverCache.Get(ctx, ref); res != nil {
func (is *Source) getResolver(hosts docker.RegistryHosts, ref string, sm *session.Manager, g session.Group) remotes.Resolver {
if res := is.resolverCache.Get(ref, g); res != nil {
return res
}
r := resolver.New(ctx, hosts, sm)
r = is.resolverCache.Add(ctx, ref, r)
auth := resolver.NewSessionAuthenticator(sm, g)
r := resolver.New(hosts, auth)
r = is.resolverCache.Add(ref, auth, r, g)
return r
}
func (is *imageSource) resolveLocal(refStr string) (*image.Image, error) {
func (is *Source) resolveLocal(refStr string) (*image.Image, error) {
ref, err := distreference.ParseNormalizedNamed(refStr)
if err != nil {
return nil, err
@ -101,13 +104,13 @@ func (is *imageSource) resolveLocal(refStr string) (*image.Image, error) {
return img, nil
}
func (is *imageSource) resolveRemote(ctx context.Context, ref string, platform *ocispec.Platform, sm *session.Manager) (digest.Digest, []byte, error) {
func (is *Source) resolveRemote(ctx context.Context, ref string, platform *ocispec.Platform, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) {
type t struct {
dgst digest.Digest
dt []byte
}
res, err := is.g.Do(ctx, ref, func(ctx context.Context) (interface{}, error) {
dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(ctx, is.RegistryHosts, ref, sm), is.ContentStore, nil, platform)
dgst, dt, err := imageutil.Config(ctx, ref, is.getResolver(is.RegistryHosts, ref, sm, g), is.ContentStore, nil, platform)
if err != nil {
return nil, err
}
@ -121,14 +124,15 @@ func (is *imageSource) resolveRemote(ctx context.Context, ref string, platform *
return typed.dgst, typed.dt, nil
}
func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) {
// ResolveImageConfig returns image config for an image
func (is *Source) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) {
resolveMode, err := source.ParseImageResolveMode(opt.ResolveMode)
if err != nil {
return "", nil, err
}
switch resolveMode {
case source.ResolveModeForcePull:
dgst, dt, err := is.resolveRemote(ctx, ref, opt.Platform, sm)
dgst, dt, err := is.resolveRemote(ctx, ref, opt.Platform, sm, g)
// TODO: pull should fallback to local in case of failure to allow offline behavior
// the fallback doesn't work currently
return dgst, dt, err
@ -157,13 +161,14 @@ func (is *imageSource) ResolveImageConfig(ctx context.Context, ref string, opt l
}
}
// fallback to remote
return is.resolveRemote(ctx, ref, opt.Platform, sm)
return is.resolveRemote(ctx, ref, opt.Platform, sm, g)
}
// should never happen
return "", nil, fmt.Errorf("builder cannot resolve image %s: invalid mode %q", ref, opt.ResolveMode)
}
func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
// Resolve returns access to pulling for an identifier
func (is *Source) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
imageIdentifier, ok := id.(*source.ImageIdentifier)
if !ok {
return nil, errors.Errorf("invalid image identifier %v", id)
@ -175,9 +180,9 @@ func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *se
}
p := &puller{
src: imageIdentifier,
is: is,
resolver: is.getResolver(ctx, is.RegistryHosts, imageIdentifier.Reference.String(), sm),
src: imageIdentifier,
is: is,
//resolver: is.getResolver(is.RegistryHosts, imageIdentifier.Reference.String(), sm, g),
platform: platform,
sm: sm,
}
@ -185,19 +190,29 @@ func (is *imageSource) Resolve(ctx context.Context, id source.Identifier, sm *se
}
type puller struct {
is *imageSource
is *Source
resolveOnce sync.Once
resolveLocalOnce sync.Once
src *source.ImageIdentifier
desc ocispec.Descriptor
ref string
resolveErr error
resolver remotes.Resolver
resolverInstance remotes.Resolver
resolverOnce sync.Once
config []byte
platform ocispec.Platform
sm *session.Manager
}
func (p *puller) resolver(g session.Group) remotes.Resolver {
p.resolverOnce.Do(func() {
if p.resolverInstance == nil {
p.resolverInstance = p.is.getResolver(p.is.RegistryHosts, p.src.Reference.String(), p.sm, g)
}
})
return p.resolverInstance
}
func (p *puller) mainManifestKey(dgst digest.Digest, platform ocispec.Platform) (digest.Digest, error) {
dt, err := json.Marshal(struct {
Digest digest.Digest
@ -255,7 +270,7 @@ func (p *puller) resolveLocal() {
})
}
func (p *puller) resolve(ctx context.Context) error {
func (p *puller) resolve(ctx context.Context, g session.Group) error {
p.resolveOnce.Do(func() {
resolveProgressDone := oneOffProgress(ctx, "resolve "+p.src.Reference.String())
@ -267,7 +282,7 @@ func (p *puller) resolve(ctx context.Context) error {
}
if p.desc.Digest == "" && p.config == nil {
origRef, desc, err := p.resolver.Resolve(ctx, ref.String())
origRef, desc, err := p.resolver(g).Resolve(ctx, ref.String())
if err != nil {
p.resolveErr = err
_ = resolveProgressDone(err)
@ -290,7 +305,7 @@ func (p *puller) resolve(ctx context.Context) error {
_ = resolveProgressDone(err)
return
}
_, dt, err := p.is.ResolveImageConfig(ctx, ref.String(), llb.ResolveImageConfigOpt{Platform: &p.platform, ResolveMode: resolveModeToString(p.src.ResolveMode)}, p.sm)
_, dt, err := p.is.ResolveImageConfig(ctx, ref.String(), llb.ResolveImageConfigOpt{Platform: &p.platform, ResolveMode: resolveModeToString(p.src.ResolveMode)}, p.sm, g)
if err != nil {
p.resolveErr = err
_ = resolveProgressDone(err)
@ -304,7 +319,7 @@ func (p *puller) resolve(ctx context.Context) error {
return p.resolveErr
}
func (p *puller) CacheKey(ctx context.Context, index int) (string, bool, error) {
func (p *puller) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
p.resolveLocal()
if p.desc.Digest != "" && index == 0 {
@ -323,7 +338,7 @@ func (p *puller) CacheKey(ctx context.Context, index int) (string, bool, error)
return k, true, nil
}
if err := p.resolve(ctx); err != nil {
if err := p.resolve(ctx, g); err != nil {
return "", false, err
}
@ -364,9 +379,9 @@ func (p *puller) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cac
}, parent, opts...)
}
func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
func (p *puller) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
p.resolveLocal()
if err := p.resolve(ctx); err != nil {
if err := p.resolve(ctx, g); err != nil {
return nil, err
}
@ -404,7 +419,7 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
<-progressDone
}()
fetcher, err := p.resolver.Fetcher(ctx, p.ref)
fetcher, err := p.resolver(g).Fetcher(ctx, p.ref)
if err != nil {
stopProgress()
return nil, err
@ -414,7 +429,7 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
// workaround for GCR bug that requires a request to manifest endpoint for authentication to work.
// if current resolver has not used manifests do a dummy request.
// in most cases resolver should be cached and extra request is not needed.
ensureManifestRequested(ctx, p.resolver, p.ref)
ensureManifestRequested(ctx, p.resolver(g), p.ref)
var (
schema1Converter *schema1.Converter
@ -578,7 +593,7 @@ func (p *puller) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
// Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
type layerDescriptor struct {
is *imageSource
is *Source
fetcher remotes.Fetcher
desc ocispec.Descriptor
diffID layer.DiffID
@ -839,6 +854,7 @@ type cachedResolver struct {
counter int64 // needs to be 64bit aligned for 32bit systems
timeout time.Time
remotes.Resolver
auth *resolver.SessionAuthenticator
}
func (cr *cachedResolver) Resolve(ctx context.Context, ref string) (name string, desc ocispec.Descriptor, err error) {
@ -846,19 +862,21 @@ func (cr *cachedResolver) Resolve(ctx context.Context, ref string) (name string,
return cr.Resolver.Resolve(ctx, ref)
}
func (r *resolverCache) Add(ctx context.Context, ref string, resolver remotes.Resolver) remotes.Resolver {
func (r *resolverCache) Add(ref string, auth *resolver.SessionAuthenticator, resolver remotes.Resolver, g session.Group) *cachedResolver {
r.mu.Lock()
defer r.mu.Unlock()
ref = r.repo(ref) + "-" + session.FromContext(ctx)
ref = r.repo(ref)
cr, ok := r.m[ref]
cr.timeout = time.Now().Add(time.Minute)
if ok {
cr.auth.AddSession(g)
return &cr
}
cr.Resolver = resolver
cr.auth = auth
r.m[ref] = cr
return &cr
}
@ -871,17 +889,18 @@ func (r *resolverCache) repo(refStr string) string {
return ref.Name()
}
func (r *resolverCache) Get(ctx context.Context, ref string) remotes.Resolver {
func (r *resolverCache) Get(ref string, g session.Group) *cachedResolver {
r.mu.Lock()
defer r.mu.Unlock()
ref = r.repo(ref) + "-" + session.FromContext(ctx)
ref = r.repo(ref)
cr, ok := r.m[ref]
if !ok {
return nil
if ok {
cr.auth.AddSession(g)
return &cr
}
return &cr
return nil
}
func (r *resolverCache) clean(now time.Time) {

View file

@ -27,11 +27,11 @@ func ResolveCacheImporterFunc(sm *session.Manager, resolverFunc docker.RegistryH
upstream := registryremotecache.ResolveCacheImporterFunc(sm, cs, resolverFunc)
return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
return func(ctx context.Context, group session.Group, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
if dt, err := tryImportLocal(rs, is, attrs["ref"]); err == nil {
return newLocalImporter(dt), specs.Descriptor{}, nil
}
return upstream(ctx, attrs)
return upstream(ctx, group, attrs)
}
}

View file

@ -3,7 +3,6 @@ package buildkit
import (
"context"
"errors"
"io"
"github.com/docker/docker/daemon/config"
"github.com/docker/docker/pkg/idtools"
@ -20,7 +19,11 @@ func newExecutor(_, _ string, _ libnetwork.NetworkController, _ *oci.DNSConfig,
type winExecutor struct {
}
func (e *winExecutor) Exec(ctx context.Context, meta executor.Meta, rootfs cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
func (w *winExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
return errors.New("buildkit executor not implemented for windows")
}
func (w *winExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
return errors.New("buildkit executor not implemented for windows")
}

View file

@ -77,8 +77,7 @@ func (e *imageExporterInstance) Name() string {
return "exporting to image"
}
func (e *imageExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
func (e *imageExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
if len(inp.Refs) > 1 {
return nil, fmt.Errorf("exporting multiple references to image store is currently unsupported")
}

View file

@ -14,6 +14,7 @@ import (
"github.com/containerd/containerd/images"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/rootfs"
"github.com/docker/docker/builder/builder-next/adapters/containerimage"
"github.com/docker/docker/distribution"
distmetadata "github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
@ -66,7 +67,7 @@ type Opt struct {
Snapshotter snapshot.Snapshotter
ContentStore content.Store
CacheManager cache.Manager
ImageSource source.Source
ImageSource *containerimage.Source
DownloadManager distribution.RootFSDownloadManager
V2MetadataService distmetadata.V2MetadataService
Transport nethttp.RoundTripper
@ -175,7 +176,7 @@ func (w *Worker) LoadRef(id string, hidden bool) (cache.ImmutableRef, error) {
if hidden {
opts = append(opts, cache.NoUpdateLastUsed)
}
return w.CacheManager.Get(context.TODO(), id, opts...)
return w.CacheManager().Get(context.TODO(), id, opts...)
}
// ResolveOp converts a LLB vertex into a LLB operation
@ -185,9 +186,9 @@ func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *se
case *pb.Op_Source:
return ops.NewSourceOp(v, op, baseOp.Platform, w.SourceManager, sm, w)
case *pb.Op_Exec:
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheManager, sm, w.MetadataStore, w.Executor, w)
return ops.NewExecOp(v, op, baseOp.Platform, w.CacheManager(), sm, w.MetadataStore, w.Executor(), w)
case *pb.Op_File:
return ops.NewFileOp(v, op, w.CacheManager, w.MetadataStore, w)
return ops.NewFileOp(v, op, w.CacheManager(), w.MetadataStore, w)
case *pb.Op_Build:
return ops.NewBuildOp(v, op, s, w)
}
@ -196,33 +197,18 @@ func (w *Worker) ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *se
}
// ResolveImageConfig returns image config for an image
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error) {
// ImageSource is typically source/containerimage
resolveImageConfig, ok := w.ImageSource.(resolveImageConfig)
if !ok {
return "", nil, errors.Errorf("worker %q does not implement ResolveImageConfig", w.ID())
}
return resolveImageConfig.ResolveImageConfig(ctx, ref, opt, sm)
}
// Exec executes a process directly on a worker
func (w *Worker) Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
active, err := w.CacheManager.New(ctx, rootFS)
if err != nil {
return err
}
defer active.Release(context.TODO())
return w.Executor.Exec(ctx, meta, active, nil, stdin, stdout, stderr)
func (w *Worker) ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error) {
return w.ImageSource.ResolveImageConfig(ctx, ref, opt, sm, g)
}
// DiskUsage returns disk usage report
func (w *Worker) DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error) {
return w.CacheManager.DiskUsage(ctx, opt)
return w.CacheManager().DiskUsage(ctx, opt)
}
// Prune deletes reclaimable build cache
func (w *Worker) Prune(ctx context.Context, ch chan client.UsageInfo, info ...client.PruneInfo) error {
return w.CacheManager.Prune(ctx, ch, info...)
return w.CacheManager().Prune(ctx, ch, info...)
}
// Exporter returns exporter by name
@ -292,7 +278,7 @@ func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
for _, si := range sis {
for _, k := range si.Indexes() {
if k == id || strings.HasPrefix(k, id+":") {
if siCached := w.CacheManager.Metadata(si.ID()); siCached != nil {
if siCached := w.CacheManager().Metadata(si.ID()); siCached != nil {
si = siCached
}
if err := cache.CachePolicyDefault(si); err != nil {
@ -305,7 +291,7 @@ func (w *Worker) PruneCacheMounts(ctx context.Context, ids []string) error {
return err
}
// if ref is unused try to clean it up right away by releasing it
if mref, err := w.CacheManager.GetMutable(ctx, si.ID()); err == nil {
if mref, err := w.CacheManager().GetMutable(ctx, si.ID()); err == nil {
go mref.Release(context.TODO())
}
break
@ -328,7 +314,7 @@ func (w *Worker) getRef(ctx context.Context, diffIDs []layer.DiffID, opts ...cac
}
defer parent.Release(context.TODO())
}
return w.CacheManager.GetByBlob(context.TODO(), ocispec.Descriptor{
return w.CacheManager().GetByBlob(context.TODO(), ocispec.Descriptor{
Annotations: map[string]string{
"containerd.io/uncompressed": diffIDs[len(diffIDs)-1].String(),
},
@ -396,6 +382,16 @@ func (w *Worker) FromRemote(ctx context.Context, remote *solver.Remote) (cache.I
return nil, errors.Errorf("unreachable")
}
// Executor returns executor.Executor for running processes
func (w *Worker) Executor() executor.Executor {
return w.Opt.Executor
}
// CacheManager returns cache.Manager for accessing local storage
func (w *Worker) CacheManager() cache.Manager {
return w.Opt.CacheManager
}
type discardProgress struct{}
func (*discardProgress) WriteProgress(_ pkgprogress.Progress) error {
@ -489,10 +485,6 @@ func oneOffProgress(ctx context.Context, id string) func(err error) error {
}
}
type resolveImageConfig interface {
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error)
}
type emptyProvider struct {
}

View file

@ -27,8 +27,8 @@ github.com/imdario/mergo 1afb36080aec31e0d1528973ebe6
golang.org/x/sync cd5d95a43a6e21273425c7ae415d3df9ea832eeb
# buildkit
github.com/moby/buildkit df35e9818d1f9066e616e03f4b8d727c97562e5b
github.com/tonistiigi/fsutil c2c7d7b0e1441705cd802e5699c0a10b1dfe39fd
github.com/moby/buildkit 4d1f260e8490ec438ab66e08bb105577aca0ce06
github.com/tonistiigi/fsutil ae3a8d753069d0f76fbee396457e8b6cfd7cb8c3
github.com/grpc-ecosystem/grpc-opentracing 8e809c8a86450a29b90dcc9efbf062d0fe6d9746
github.com/opentracing/opentracing-go 1361b9cd60be79c4c3a7fa9841b3c132e40066a7
github.com/google/shlex e7afc7fbc51079733e9468cdfd1efcd7d196cd1d

View file

@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/contentutil"
"github.com/moby/buildkit/util/progress"
@ -19,7 +20,7 @@ import (
"github.com/pkg/errors"
)
type ResolveCacheExporterFunc func(ctx context.Context, attrs map[string]string) (Exporter, error)
type ResolveCacheExporterFunc func(ctx context.Context, g session.Group, attrs map[string]string) (Exporter, error)
func oneOffProgress(ctx context.Context, id string) func(err error) error {
pw, _, _ := progress.FromContext(ctx)

View file

@ -10,6 +10,7 @@ import (
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/images"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/util/imageutil"
"github.com/moby/buildkit/worker"
@ -21,7 +22,7 @@ import (
)
// ResolveCacheImporterFunc returns importer and descriptor.
type ResolveCacheImporterFunc func(ctx context.Context, attrs map[string]string) (Importer, ocispec.Descriptor, error)
type ResolveCacheImporterFunc func(ctx context.Context, g session.Group, attrs map[string]string) (Importer, ocispec.Descriptor, error)
type Importer interface {
Resolve(ctx context.Context, desc ocispec.Descriptor, id string, w worker.Worker) (solver.CacheManager, error)

View file

@ -6,13 +6,14 @@ import (
"github.com/moby/buildkit/cache/remotecache"
v1 "github.com/moby/buildkit/cache/remotecache/v1"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
digest "github.com/opencontainers/go-digest"
"github.com/sirupsen/logrus"
)
func ResolveCacheExporterFunc() remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, _ map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, _ session.Group, _ map[string]string) (remotecache.Exporter, error) {
return NewExporter(), nil
}
}
@ -72,7 +73,7 @@ func (ce *exporter) ExportForLayers(layers []digest.Digest) ([]byte, error) {
return nil, nil
}
cache := map[digest.Digest]int{}
cache := map[int]int{}
// reorder layers based on the order in the image
for i, r := range cfg.Records {
@ -93,14 +94,14 @@ func (ce *exporter) ExportForLayers(layers []digest.Digest) ([]byte, error) {
return dt, nil
}
func getSortedLayerIndex(idx int, layers []v1.CacheLayer, cache map[digest.Digest]int) int {
func getSortedLayerIndex(idx int, layers []v1.CacheLayer, cache map[int]int) int {
if idx == -1 {
return -1
}
l := layers[idx]
if i, ok := cache[l.Blob]; ok {
if i, ok := cache[idx]; ok {
return i
}
cache[l.Blob] = getSortedLayerIndex(l.ParentIndex, layers, cache) + 1
return cache[l.Blob]
cache[idx] = getSortedLayerIndex(l.ParentIndex, layers, cache) + 1
return cache[idx]
}

View file

@ -22,13 +22,13 @@ const (
// ResolveCacheExporterFunc for "local" cache exporter.
func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Exporter, error) {
store := attrs[attrDest]
if store == "" {
return nil, errors.New("local cache exporter requires dest")
}
csID := contentStoreIDPrefix + store
cs, err := getContentStore(ctx, sm, csID)
cs, err := getContentStore(ctx, sm, g, csID)
if err != nil {
return nil, err
}
@ -38,7 +38,7 @@ func ResolveCacheExporterFunc(sm *session.Manager) remotecache.ResolveCacheExpor
// ResolveCacheImporterFunc for "local" cache importer.
func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
dgstStr := attrs[attrDigest]
if dgstStr == "" {
return nil, specs.Descriptor{}, errors.New("local cache importer requires explicit digest")
@ -49,7 +49,7 @@ func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImpor
return nil, specs.Descriptor{}, errors.New("local cache importer requires src")
}
csID := contentStoreIDPrefix + store
cs, err := getContentStore(ctx, sm, csID)
cs, err := getContentStore(ctx, sm, g, csID)
if err != nil {
return nil, specs.Descriptor{}, err
}
@ -67,8 +67,9 @@ func ResolveCacheImporterFunc(sm *session.Manager) remotecache.ResolveCacheImpor
}
}
func getContentStore(ctx context.Context, sm *session.Manager, storeID string) (content.Store, error) {
sessionID := session.FromContext(ctx)
func getContentStore(ctx context.Context, sm *session.Manager, g session.Group, storeID string) (content.Store, error) {
// TODO: to ensure correct session is detected, new api for finding if storeID is supported is needed
sessionID := g.SessionIterator().NextSession()
if sessionID == "" {
return nil, errors.New("local cache exporter/importer requires session")
}

View file

@ -32,12 +32,12 @@ const (
)
func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) remotecache.ResolveCacheExporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Exporter, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Exporter, error) {
ref, err := canonicalizeRef(attrs[attrRef])
if err != nil {
return nil, err
}
remote := resolver.New(ctx, hosts, sm)
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
pusher, err := remote.Pusher(ctx, ref)
if err != nil {
return nil, err
@ -47,12 +47,12 @@ func ResolveCacheExporterFunc(sm *session.Manager, hosts docker.RegistryHosts) r
}
func ResolveCacheImporterFunc(sm *session.Manager, cs content.Store, hosts docker.RegistryHosts) remotecache.ResolveCacheImporterFunc {
return func(ctx context.Context, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
return func(ctx context.Context, g session.Group, attrs map[string]string) (remotecache.Importer, specs.Descriptor, error) {
ref, err := canonicalizeRef(attrs[attrRef])
if err != nil {
return nil, specs.Descriptor{}, err
}
remote := resolver.New(ctx, hosts, sm)
remote := resolver.New(hosts, resolver.NewSessionAuthenticator(sm, g))
xref, desc, err := remote.Resolve(ctx, ref)
if err != nil {
return nil, specs.Descriptor{}, err

View file

@ -220,6 +220,7 @@ func (cs *cacheResultStorage) LoadWithParents(ctx context.Context, res solver.Ca
m := map[string]solver.Result{}
visited := make(map[*item]struct{})
if err := v.walkAllResults(func(i *item) error {
if i.result == nil {
return nil
@ -236,7 +237,7 @@ func (cs *cacheResultStorage) LoadWithParents(ctx context.Context, res solver.Ca
m[id] = worker.NewWorkerRefResult(ref, cs.w)
}
return nil
}); err != nil {
}, visited); err != nil {
for _, v := range m {
v.Release(context.TODO())
}

View file

@ -128,13 +128,17 @@ func (c *item) LinkFrom(rec solver.CacheExporterRecord, index int, selector stri
c.links[index][link{src: src, selector: selector}] = struct{}{}
}
func (c *item) walkAllResults(fn func(i *item) error) error {
func (c *item) walkAllResults(fn func(i *item) error, visited map[*item]struct{}) error {
if _, ok := visited[c]; ok {
return nil
}
visited[c] = struct{}{}
if err := fn(c); err != nil {
return err
}
for _, links := range c.links {
for l := range links {
if err := l.src.walkAllResults(fn); err != nil {
if err := l.src.walkAllResults(fn, visited); err != nil {
return err
}
}

View file

@ -215,7 +215,10 @@ func Git(remote, ref string, opts ...GitOption) State {
id += "#" + ref
}
gi := &GitInfo{}
gi := &GitInfo{
AuthHeaderSecret: "GIT_AUTH_HEADER",
AuthTokenSecret: "GIT_AUTH_TOKEN",
}
for _, o := range opts {
o.SetGitOption(gi)
}
@ -228,6 +231,14 @@ func Git(remote, ref string, opts ...GitOption) State {
attrs[pb.AttrFullRemoteURL] = url
addCap(&gi.Constraints, pb.CapSourceGitFullURL)
}
if gi.AuthTokenSecret != "" {
attrs[pb.AttrAuthTokenSecret] = gi.AuthTokenSecret
addCap(&gi.Constraints, pb.CapSourceGitHttpAuth)
}
if gi.AuthHeaderSecret != "" {
attrs[pb.AttrAuthHeaderSecret] = gi.AuthHeaderSecret
addCap(&gi.Constraints, pb.CapSourceGitHttpAuth)
}
addCap(&gi.Constraints, pb.CapSourceGit)
@ -246,7 +257,9 @@ func (fn gitOptionFunc) SetGitOption(gi *GitInfo) {
type GitInfo struct {
constraintsWrapper
KeepGitDir bool
KeepGitDir bool
AuthTokenSecret string
AuthHeaderSecret string
}
func KeepGitDir() GitOption {
@ -255,6 +268,18 @@ func KeepGitDir() GitOption {
})
}
func AuthTokenSecret(v string) GitOption {
return gitOptionFunc(func(gi *GitInfo) {
gi.AuthTokenSecret = v
})
}
func AuthHeaderSecret(v string) GitOption {
return gitOptionFunc(func(gi *GitInfo) {
gi.AuthHeaderSecret = v
})
}
func Scratch() State {
return NewState(nil)
}

View file

@ -220,7 +220,6 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
if err := translateLegacySolveRequest(req); err != nil {
return nil, err
}
ctx = session.NewContext(ctx, req.Session)
defer func() {
time.AfterFunc(time.Second, c.throttledGC)
@ -260,7 +259,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
if !ok {
return nil, errors.Errorf("unknown cache exporter: %q", e.Type)
}
cacheExporter, err = cacheExporterFunc(ctx, e.Attrs)
cacheExporter, err = cacheExporterFunc(ctx, session.NewGroup(req.Session), e.Attrs)
if err != nil {
return nil, err
}
@ -273,7 +272,7 @@ func (c *Controller) Solve(ctx context.Context, req *controlapi.SolveRequest) (*
})
}
resp, err := c.solver.Solve(ctx, req.Ref, frontend.SolveRequest{
resp, err := c.solver.Solve(ctx, req.Ref, req.Session, frontend.SolveRequest{
Frontend: req.Frontend,
Definition: req.Definition,
FrontendOpt: req.FrontendAttrs,

View file

@ -28,9 +28,20 @@ type Mount struct {
Readonly bool
}
type ProcessInfo struct {
Meta Meta
Stdin io.ReadCloser
Stdout, Stderr io.WriteCloser
}
type Executor interface {
// TODO: add stdout/err
Exec(ctx context.Context, meta Meta, rootfs cache.Mountable, mounts []Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
// Run will start a container for the given process with rootfs, mounts.
// `id` is an optional name for the container so it can be referenced later via Exec.
// `started` is an optional channel that will be closed when the container setup completes and has started running.
Run(ctx context.Context, id string, rootfs cache.Mountable, mounts []Mount, process ProcessInfo, started chan<- struct{}) error
// Exec will start a process in container matching `id`. An error will be returned
// if the container failed to start (via Run) or has exited before Exec is called.
Exec(ctx context.Context, id string, process ProcessInfo) error
}
type HostIP struct {

View file

@ -8,6 +8,7 @@ import (
"os/exec"
"path/filepath"
"strings"
"sync"
"syscall"
"time"
@ -24,6 +25,7 @@ import (
"github.com/moby/buildkit/util/network"
rootlessspecconv "github.com/moby/buildkit/util/rootless/specconv"
"github.com/moby/buildkit/util/stack"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
@ -59,6 +61,8 @@ type runcExecutor struct {
noPivot bool
dns *oci.DNSConfig
oomScoreAdj *int
running map[string]chan error
mu sync.Mutex
}
func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Executor, error) {
@ -119,11 +123,32 @@ func New(opt Opt, networkProviders map[pb.NetMode]network.Provider) (executor.Ex
noPivot: opt.NoPivot,
dns: opt.DNS,
oomScoreAdj: opt.OOMScoreAdj,
running: make(map[string]chan error),
}
return w, nil
}
func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.Mountable, mounts []executor.Mount, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error {
func (w *runcExecutor) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
meta := process.Meta
startedOnce := sync.Once{}
done := make(chan error, 1)
w.mu.Lock()
w.running[id] = done
w.mu.Unlock()
defer func() {
w.mu.Lock()
delete(w.running, id)
w.mu.Unlock()
done <- err
close(done)
if started != nil {
startedOnce.Do(func() {
close(started)
})
}
}()
provider, ok := w.networkProviders[meta.NetMode]
if !ok {
return errors.Errorf("unknown network mode %s", meta.NetMode)
@ -164,7 +189,9 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
defer release()
}
id := identity.NewID()
if id == "" {
id = identity.NewID()
}
bundle := filepath.Join(w.root, id)
if err := os.Mkdir(bundle, 0711); err != nil {
@ -245,6 +272,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
}
}
spec.Process.Terminal = meta.Tty
spec.Process.OOMScoreAdj = w.oomScoreAdj
if w.rootless {
if err := rootlessspecconv.ToRootless(spec); err != nil {
@ -260,7 +288,7 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
runCtx, cancelRun := context.WithCancel(context.Background())
defer cancelRun()
done := make(chan struct{})
ended := make(chan struct{})
go func() {
for {
select {
@ -279,21 +307,27 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
timeout()
select {
case <-time.After(50 * time.Millisecond):
case <-done:
case <-ended:
return
}
case <-done:
case <-ended:
return
}
}
}()
logrus.Debugf("> creating %s %v", id, meta.Args)
// this is a cheat, we have not actually started, but as close as we can get with runc for now
if started != nil {
startedOnce.Do(func() {
close(started)
})
}
status, err := w.runc.Run(runCtx, id, bundle, &runc.CreateOpts{
IO: &forwardIO{stdin: stdin, stdout: stdout, stderr: stderr},
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
NoPivot: w.noPivot,
})
close(done)
close(ended)
if status != 0 || err != nil {
if err == nil {
@ -310,6 +344,74 @@ func (w *runcExecutor) Exec(ctx context.Context, meta executor.Meta, root cache.
return nil
}
func (w *runcExecutor) Exec(ctx context.Context, id string, process executor.ProcessInfo) error {
// first verify the container is running, if we get an error assume the container
// is in the process of being created and check again every 100ms or until
// context is canceled.
var state *runc.Container
for {
w.mu.Lock()
done, ok := w.running[id]
w.mu.Unlock()
if !ok {
return errors.Errorf("container %s not found", id)
}
state, _ = w.runc.State(ctx, id)
if state != nil && state.Status == "running" {
break
}
select {
case <-ctx.Done():
return ctx.Err()
case err, ok := <-done:
if !ok || err == nil {
return errors.Errorf("container %s has stopped", id)
}
return errors.Wrapf(err, "container %s has exited with error", id)
case <-time.After(100 * time.Millisecond):
}
}
// load default process spec (for Env, Cwd etc) from bundle
f, err := os.Open(filepath.Join(state.Bundle, "config.json"))
if err != nil {
return errors.WithStack(err)
}
defer f.Close()
spec := &specs.Spec{}
if err := json.NewDecoder(f).Decode(spec); err != nil {
return err
}
if process.Meta.User != "" {
uid, gid, sgids, err := oci.GetUser(ctx, state.Rootfs, process.Meta.User)
if err != nil {
return err
}
spec.Process.User = specs.User{
UID: uid,
GID: gid,
AdditionalGids: sgids,
}
}
spec.Process.Terminal = process.Meta.Tty
spec.Process.Args = process.Meta.Args
if process.Meta.Cwd != "" {
spec.Process.Cwd = process.Meta.Cwd
}
if len(process.Meta.Env) > 0 {
spec.Process.Env = process.Meta.Env
}
return w.runc.Exec(ctx, id, *spec.Process, &runc.ExecOpts{
IO: &forwardIO{stdin: process.Stdin, stdout: process.Stdout, stderr: process.Stderr},
})
}
type forwardIO struct {
stdin io.ReadCloser
stdout, stderr io.WriteCloser

View file

@ -12,7 +12,7 @@ type Exporter interface {
type ExporterInstance interface {
Name() string
Export(context.Context, Source) (map[string]string, error)
Export(ctx context.Context, src Source, sessionID string) (map[string]string, error)
}
type Source struct {

View file

@ -14,7 +14,6 @@ import (
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
"golang.org/x/sync/errgroup"
@ -36,33 +35,27 @@ func New(opt Opt) (exporter.Exporter, error) {
}
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
li := &localExporterInstance{localExporter: e, caller: caller}
return li, nil
return &localExporterInstance{localExporter: e}, nil
}
type localExporterInstance struct {
*localExporter
caller session.Caller
}
func (e *localExporterInstance) Name() string {
return "exporting to client"
}
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}
isMap := len(inp.Refs) > 0
export := func(ctx context.Context, k string, ref cache.ImmutableRef) func() error {
@ -125,7 +118,7 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source)
}
progress := newProgressHandler(ctx, lbl)
if err := filesync.CopyToCaller(ctx, fs, e.caller, progress); err != nil {
if err := filesync.CopyToCaller(ctx, fs, caller, progress); err != nil {
return err
}
return nil

View file

@ -14,7 +14,6 @@ import (
"github.com/moby/buildkit/session/filesync"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/util/progress"
"github.com/pkg/errors"
"github.com/tonistiigi/fsutil"
fstypes "github.com/tonistiigi/fsutil/types"
)
@ -34,33 +33,19 @@ func New(opt Opt) (exporter.Exporter, error) {
}
func (e *localExporter) Resolve(ctx context.Context, opt map[string]string) (exporter.ExporterInstance, error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
li := &localExporterInstance{localExporter: e, caller: caller}
li := &localExporterInstance{localExporter: e}
return li, nil
}
type localExporterInstance struct {
*localExporter
caller session.Caller
}
func (e *localExporterInstance) Name() string {
return "exporting to client"
}
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source) (map[string]string, error) {
func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source, sessionID string) (map[string]string, error) {
var defers []func()
defer func() {
@ -147,7 +132,15 @@ func (e *localExporterInstance) Export(ctx context.Context, inp exporter.Source)
fs = d.FS
}
w, err := filesync.CopyFileWriter(ctx, nil, e.caller)
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.opt.SessionManager.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}
w, err := filesync.CopyFileWriter(ctx, nil, caller)
if err != nil {
return nil, err
}

View file

@ -164,7 +164,7 @@ func Build(ctx context.Context, c client.Client) (*client.Result, error) {
},
})
if err != nil {
return nil, errors.Errorf("failed to read downloaded context")
return nil, errors.Wrapf(err, "failed to read downloaded context")
}
if isArchive(dt) {
if fileop {

View file

@ -2,9 +2,7 @@ package frontend
import (
"context"
"io"
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/executor"
@ -14,13 +12,13 @@ import (
)
type Frontend interface {
Solve(ctx context.Context, llb FrontendLLBBridge, opt map[string]string, inputs map[string]*pb.Definition) (*Result, error)
Solve(ctx context.Context, llb FrontendLLBBridge, opt map[string]string, inputs map[string]*pb.Definition, sid string) (*Result, error)
}
type FrontendLLBBridge interface {
Solve(ctx context.Context, req SolveRequest) (*Result, error)
executor.Executor
Solve(ctx context.Context, req SolveRequest, sid string) (*Result, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt) (digest.Digest, []byte, error)
Exec(ctx context.Context, meta executor.Meta, rootfs cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
}
type SolveRequest = gw.SolveRequest

View file

@ -11,7 +11,6 @@ import (
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/frontend/gateway/client"
gwpb "github.com/moby/buildkit/frontend/gateway/pb"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
opspb "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/apicaps"
@ -20,12 +19,12 @@ import (
fstypes "github.com/tonistiigi/fsutil/types"
)
func llbBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, workerInfos []clienttypes.WorkerInfo) (*bridgeClient, error) {
func llbBridgeToGatewayClient(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, workerInfos []clienttypes.WorkerInfo, sid string) (*bridgeClient, error) {
return &bridgeClient{
opts: opts,
inputs: inputs,
FrontendLLBBridge: llbBridge,
sid: session.FromContext(ctx),
sid: sid,
workerInfos: workerInfos,
final: map[*ref]struct{}{},
}, nil
@ -50,7 +49,7 @@ func (c *bridgeClient) Solve(ctx context.Context, req client.SolveRequest) (*cli
FrontendOpt: req.FrontendOpt,
FrontendInputs: req.FrontendInputs,
CacheImports: req.CacheImports,
})
}, c.sid)
if err != nil {
return nil, err
}

View file

@ -20,8 +20,8 @@ type GatewayForwarder struct {
f client.BuildFunc
}
func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*pb.Definition) (retRes *frontend.Result, retErr error) {
c, err := llbBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers.WorkerInfos())
func (gf *GatewayForwarder) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*pb.Definition, sid string) (retRes *frontend.Result, retErr error) {
c, err := llbBridgeToGatewayClient(ctx, llbBridge, opts, inputs, gf.workers.WorkerInfos(), sid)
if err != nil {
return nil, err
}

View file

@ -25,7 +25,6 @@ import (
"github.com/moby/buildkit/frontend"
pb "github.com/moby/buildkit/frontend/gateway/pb"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
opspb "github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/util/apicaps"
@ -68,26 +67,24 @@ func filterPrefix(opts map[string]string, pfx string) map[string]string {
return m
}
func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition) (*frontend.Result, error) {
func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.FrontendLLBBridge, opts map[string]string, inputs map[string]*opspb.Definition, sid string) (*frontend.Result, error) {
source, ok := opts[keySource]
if !ok {
return nil, errors.Errorf("no source specified for gateway")
}
sid := session.FromContext(ctx)
_, isDevel := opts[keyDevel]
var img specs.Image
var rootFS cache.ImmutableRef
var rootFS cache.MutableRef
var readonly bool // TODO: try to switch to read-only by default.
if isDevel {
devRes, err := llbBridge.Solve(session.NewContext(ctx, "gateway:"+sid),
devRes, err := llbBridge.Solve(ctx,
frontend.SolveRequest{
Frontend: source,
FrontendOpt: filterPrefix(opts, "gateway-"),
FrontendInputs: inputs,
})
}, "gateway:"+sid)
if err != nil {
return nil, err
}
@ -107,7 +104,12 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
if !ok {
return nil, errors.Errorf("invalid ref: %T", res.Sys())
}
rootFS = workerRef.ImmutableRef
rootFS, err = workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef)
if err != nil {
return nil, err
}
defer rootFS.Release(context.TODO())
config, ok := devRes.Metadata[exptypes.ExporterImageConfigKey]
if ok {
if err := json.Unmarshal(config, &img); err != nil {
@ -145,7 +147,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
res, err := llbBridge.Solve(ctx, frontend.SolveRequest{
Definition: def.ToPB(),
})
}, sid)
if err != nil {
return nil, err
}
@ -166,10 +168,14 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
if !ok {
return nil, errors.Errorf("invalid ref: %T", r.Sys())
}
rootFS = workerRef.ImmutableRef
rootFS, err = workerRef.Worker.CacheManager().New(ctx, workerRef.ImmutableRef)
if err != nil {
return nil, err
}
defer rootFS.Release(context.TODO())
}
lbf, ctx, err := newLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs)
lbf, ctx, err := newLLBBridgeForwarder(ctx, llbBridge, gf.workers, inputs, sid)
defer lbf.conn.Close()
if err != nil {
return nil, err
@ -218,7 +224,7 @@ func (gf *gatewayFrontend) Solve(ctx context.Context, llbBridge frontend.Fronten
}
}
err = llbBridge.Exec(ctx, meta, rootFS, lbf.Stdin, lbf.Stdout, os.Stderr)
err = llbBridge.Run(ctx, "", rootFS, nil, executor.ProcessInfo{Meta: meta, Stdin: lbf.Stdin, Stdout: lbf.Stdout, Stderr: os.Stderr}, nil)
if err != nil {
if errors.Is(err, context.Canceled) && lbf.isErrServerClosed {
@ -296,7 +302,7 @@ func (lbf *llbBridgeForwarder) Result() (*frontend.Result, error) {
return lbf.result, nil
}
func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition) *llbBridgeForwarder {
func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition, sid string) *llbBridgeForwarder {
lbf := &llbBridgeForwarder{
callCtx: ctx,
llbBridge: llbBridge,
@ -305,13 +311,14 @@ func NewBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridg
pipe: newPipe(),
workers: workers,
inputs: inputs,
sid: sid,
}
return lbf
}
func newLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition) (*llbBridgeForwarder, context.Context, error) {
func newLLBBridgeForwarder(ctx context.Context, llbBridge frontend.FrontendLLBBridge, workers frontend.WorkerInfos, inputs map[string]*opspb.Definition, sid string) (*llbBridgeForwarder, context.Context, error) {
ctx, cancel := context.WithCancel(ctx)
lbf := NewBridgeForwarder(ctx, llbBridge, workers, inputs)
lbf := NewBridgeForwarder(ctx, llbBridge, workers, inputs, sid)
server := grpc.NewServer(grpc.UnaryInterceptor(grpcerrors.UnaryServerInterceptor), grpc.StreamInterceptor(grpcerrors.StreamServerInterceptor))
grpc_health_v1.RegisterHealthServer(server, health.NewServer())
pb.RegisterLLBBridgeServer(server, lbf)
@ -403,6 +410,7 @@ type llbBridgeForwarder struct {
workers frontend.WorkerInfos
inputs map[string]*opspb.Definition
isErrServerClosed bool
sid string
*pipe
}
@ -465,7 +473,7 @@ func (lbf *llbBridgeForwarder) Solve(ctx context.Context, req *pb.SolveRequest)
FrontendOpt: req.FrontendOpt,
FrontendInputs: req.FrontendInputs,
CacheImports: cacheImports,
})
}, lbf.sid)
if err != nil {
return nil, err
}

View file

@ -32,7 +32,7 @@ type GrpcClient interface {
}
func New(ctx context.Context, opts map[string]string, session, product string, c pb.LLBBridgeClient, w []client.WorkerInfo) (GrpcClient, error) {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
ctx, cancel := context.WithTimeout(ctx, 15*time.Second)
defer cancel()
resp, err := c.Ping(ctx, &pb.PingRequest{})
if err != nil {

View file

@ -8,19 +8,28 @@ import (
"google.golang.org/grpc/codes"
)
func CredentialsFunc(ctx context.Context, c session.Caller) func(string) (string, string, error) {
func CredentialsFunc(sm *session.Manager, g session.Group) func(string) (string, string, error) {
return func(host string) (string, string, error) {
client := NewAuthClient(c.Conn())
var user, secret string
err := sm.Any(context.TODO(), g, func(ctx context.Context, _ string, c session.Caller) error {
client := NewAuthClient(c.Conn())
resp, err := client.Credentials(ctx, &CredentialsRequest{
Host: host,
resp, err := client.Credentials(ctx, &CredentialsRequest{
Host: host,
})
if err != nil {
if grpcerrors.Code(err) == codes.Unimplemented {
return nil
}
return err
}
user = resp.Username
secret = resp.Secret
return nil
})
if err != nil {
if grpcerrors.Code(err) == codes.Unimplemented {
return "", "", nil
}
return "", "", err
}
return resp.Username, resp.Secret, nil
return user, secret, nil
}
}

View file

@ -1,22 +0,0 @@
package session
import "context"
type contextKeyT string
var contextKey = contextKeyT("buildkit/session-id")
func NewContext(ctx context.Context, id string) context.Context {
if id != "" {
return context.WithValue(ctx, contextKey, id)
}
return ctx
}
func FromContext(ctx context.Context) string {
v := ctx.Value(contextKey)
if v == nil {
return ""
}
return v.(string)
}

88
vendor/github.com/moby/buildkit/session/group.go generated vendored Normal file
View file

@ -0,0 +1,88 @@
package session
import (
"context"
"time"
"github.com/pkg/errors"
)
type Group interface {
SessionIterator() Iterator
}
type Iterator interface {
NextSession() string
}
func NewGroup(ids ...string) Group {
return &group{ids: ids}
}
type group struct {
ids []string
}
func (g *group) SessionIterator() Iterator {
return &group{ids: g.ids}
}
func (g *group) NextSession() string {
if len(g.ids) == 0 {
return ""
}
v := g.ids[0]
g.ids = g.ids[1:]
return v
}
func AllSessionIDs(g Group) (out []string) {
if g == nil {
return nil
}
it := g.SessionIterator()
if it == nil {
return nil
}
for {
v := it.NextSession()
if v == "" {
return
}
out = append(out, v)
}
}
func (sm *Manager) Any(ctx context.Context, g Group, f func(context.Context, string, Caller) error) error {
if g == nil {
return nil
}
iter := g.SessionIterator()
if iter == nil {
return nil
}
var lastErr error
for {
id := iter.NextSession()
if id == "" {
if lastErr != nil {
return lastErr
}
return errors.Errorf("no active sessions")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
c, err := sm.Get(timeoutCtx, id)
if err != nil {
lastErr = err
continue
}
if err := f(ctx, id, c); err != nil {
lastErr = err
continue
}
return nil
}
}

View file

@ -1,12 +1,9 @@
package snapshot
import (
"io/ioutil"
"os"
"sync"
"github.com/containerd/containerd/mount"
"github.com/pkg/errors"
)
type Mounter interface {
@ -33,42 +30,3 @@ type localMounter struct {
target string
release func() error
}
func (lm *localMounter) Mount() (string, error) {
lm.mu.Lock()
defer lm.mu.Unlock()
if lm.mounts == nil {
mounts, release, err := lm.mountable.Mount()
if err != nil {
return "", err
}
lm.mounts = mounts
lm.release = release
}
if len(lm.mounts) == 1 && (lm.mounts[0].Type == "bind" || lm.mounts[0].Type == "rbind") {
ro := false
for _, opt := range lm.mounts[0].Options {
if opt == "ro" {
ro = true
break
}
}
if !ro {
return lm.mounts[0].Source, nil
}
}
dir, err := ioutil.TempDir("", "buildkit-mount")
if err != nil {
return "", errors.Wrap(err, "failed to create temp dir")
}
if err := mount.All(lm.mounts, dir); err != nil {
os.RemoveAll(dir)
return "", errors.Wrapf(err, "failed to mount %s: %+v", dir, lm.mounts)
}
lm.target = dir
return dir, nil
}

View file

@ -3,12 +3,53 @@
package snapshot
import (
"io/ioutil"
"os"
"syscall"
"github.com/containerd/containerd/mount"
"github.com/pkg/errors"
)
func (lm *localMounter) Mount() (string, error) {
lm.mu.Lock()
defer lm.mu.Unlock()
if lm.mounts == nil {
mounts, release, err := lm.mountable.Mount()
if err != nil {
return "", err
}
lm.mounts = mounts
lm.release = release
}
if len(lm.mounts) == 1 && (lm.mounts[0].Type == "bind" || lm.mounts[0].Type == "rbind") {
ro := false
for _, opt := range lm.mounts[0].Options {
if opt == "ro" {
ro = true
break
}
}
if !ro {
return lm.mounts[0].Source, nil
}
}
dir, err := ioutil.TempDir("", "buildkit-mount")
if err != nil {
return "", errors.Wrap(err, "failed to create temp dir")
}
if err := mount.All(lm.mounts, dir); err != nil {
os.RemoveAll(dir)
return "", errors.Wrapf(err, "failed to mount %s: %+v", dir, lm.mounts)
}
lm.target = dir
return dir, nil
}
func (lm *localMounter) Unmount() error {
lm.mu.Lock()
defer lm.mu.Unlock()

View file

@ -1,11 +1,41 @@
package snapshot
import (
"os"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/mount"
"github.com/pkg/errors"
)
func (lm *localMounter) Mount() (string, error) {
lm.mu.Lock()
defer lm.mu.Unlock()
if lm.mounts == nil {
mounts, release, err := lm.mountable.Mount()
if err != nil {
return "", err
}
lm.mounts = mounts
lm.release = release
}
// Windows can only mount a single mount at a given location.
// Parent layers are carried in Options, opaquely to localMounter.
if len(lm.mounts) != 1 {
return "", errors.Wrapf(errdefs.ErrNotImplemented, "request to mount %d layers, only 1 is supported", len(lm.mounts))
}
// Windows mounts always activate in-place, so the target of the mount must be the source directory.
// See https://github.com/containerd/containerd/pull/2366
dir := lm.mounts[0].Source
if err := lm.mounts[0].Mount(dir); err != nil {
return "", errors.Wrapf(err, "failed to mount in-place: %v", lm.mounts[0])
}
lm.target = dir
return lm.target, nil
}
func (lm *localMounter) Unmount() error {
lm.mu.Lock()
defer lm.mu.Unlock()
@ -14,7 +44,6 @@ func (lm *localMounter) Unmount() error {
if err := mount.Unmount(lm.target, 0); err != nil {
return err
}
os.RemoveAll(lm.target)
lm.target = ""
}

View file

@ -23,7 +23,7 @@ type ResolveOpFunc func(Vertex, Builder) (Op, error)
type Builder interface {
Build(ctx context.Context, e Edge) (CachedResult, error)
Context(ctx context.Context) context.Context
InContext(ctx context.Context, f func(ctx context.Context, g session.Group) error) error
EachValue(ctx context.Context, key string, fn func(interface{}) error) error
}
@ -67,32 +67,69 @@ type state struct {
solver *Solver
}
func (s *state) getSessionID() string {
// TODO: connect with sessionmanager to avoid getting dropped sessions
s.mu.Lock()
for j := range s.jobs {
if j.SessionID != "" {
s.mu.Unlock()
return j.SessionID
}
}
parents := map[digest.Digest]struct{}{}
for p := range s.parents {
parents[p] = struct{}{}
}
s.mu.Unlock()
func (s *state) SessionIterator() session.Iterator {
return s.sessionIterator()
}
for p := range parents {
s.solver.mu.Lock()
pst, ok := s.solver.actives[p]
s.solver.mu.Unlock()
if ok {
if sessionID := pst.getSessionID(); sessionID != "" {
return sessionID
func (s *state) sessionIterator() *sessionGroup {
return &sessionGroup{state: s, visited: map[string]struct{}{}}
}
type sessionGroup struct {
*state
visited map[string]struct{}
parents []session.Iterator
mode int
}
func (g *sessionGroup) NextSession() string {
if g.mode == 0 {
g.mu.Lock()
for j := range g.jobs {
if j.SessionID != "" {
if _, ok := g.visited[j.SessionID]; ok {
continue
}
g.visited[j.SessionID] = struct{}{}
g.mu.Unlock()
return j.SessionID
}
}
g.mu.Unlock()
g.mode = 1
}
if g.mode == 1 {
parents := map[digest.Digest]struct{}{}
g.mu.Lock()
for p := range g.state.parents {
parents[p] = struct{}{}
}
g.mu.Unlock()
for p := range parents {
g.solver.mu.Lock()
pst, ok := g.solver.actives[p]
g.solver.mu.Unlock()
if ok {
gg := pst.sessionIterator()
gg.visited = g.visited
g.parents = append(g.parents, gg)
}
}
g.mode = 2
}
for {
if len(g.parents) == 0 {
return ""
}
p := g.parents[0]
id := p.NextSession()
if id != "" {
return id
}
g.parents = g.parents[1:]
}
return ""
}
func (s *state) builder() *subBuilder {
@ -172,9 +209,8 @@ func (sb *subBuilder) Build(ctx context.Context, e Edge) (CachedResult, error) {
return res, nil
}
func (sb *subBuilder) Context(ctx context.Context) context.Context {
ctx = session.NewContext(ctx, sb.state.getSessionID())
return opentracing.ContextWithSpan(progress.WithProgress(ctx, sb.mpw), sb.mspan)
func (sb *subBuilder) InContext(ctx context.Context, f func(context.Context, session.Group) error) error {
return f(opentracing.ContextWithSpan(progress.WithProgress(ctx, sb.mpw), sb.mspan), sb.state)
}
func (sb *subBuilder) EachValue(ctx context.Context, key string, fn func(interface{}) error) error {
@ -480,9 +516,8 @@ func (j *Job) Discard() error {
return nil
}
func (j *Job) Context(ctx context.Context) context.Context {
ctx = session.NewContext(ctx, j.SessionID)
return progress.WithProgress(ctx, j.pw)
func (j *Job) InContext(ctx context.Context, f func(context.Context, session.Group) error) error {
return f(progress.WithProgress(ctx, j.pw), session.NewGroup(j.SessionID))
}
func (j *Job) SetValue(key string, v interface{}) {
@ -631,7 +666,6 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
return nil, s.cacheErr
}
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
ctx = session.NewContext(ctx, s.st.getSessionID())
if len(s.st.vtx.Inputs()) == 0 {
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, "cache request: "+s.st.vtx.Name())
@ -641,7 +675,7 @@ func (s *sharedOp) CacheMap(ctx context.Context, index int) (resp *cacheMapResp,
notifyCompleted(ctx, &s.st.clientVertex, retErr, false)
}()
}
res, done, err := op.CacheMap(ctx, len(s.cacheRes))
res, done, err := op.CacheMap(ctx, s.st, len(s.cacheRes))
complete := true
if err != nil {
select {
@ -687,7 +721,6 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
}
ctx = opentracing.ContextWithSpan(progress.WithProgress(ctx, s.st.mpw), s.st.mspan)
ctx = session.NewContext(ctx, s.st.getSessionID())
// no cache hit. start evaluating the node
span, ctx := tracing.StartSpan(ctx, s.st.vtx.Name())
@ -697,7 +730,7 @@ func (s *sharedOp) Exec(ctx context.Context, inputs []Result) (outputs []Result,
notifyCompleted(ctx, &s.st.clientVertex, retErr, false)
}()
res, err := op.Exec(ctx, inputs)
res, err := op.Exec(ctx, s.st, inputs)
complete := true
if err != nil {
select {

View file

@ -3,7 +3,6 @@ package llbsolver
import (
"context"
"fmt"
"io"
"strings"
"sync"
"time"
@ -60,12 +59,12 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp
func(cmID string, im gw.CacheOptionsEntry) {
cm = newLazyCacheManager(cmID, func() (solver.CacheManager, error) {
var cmNew solver.CacheManager
if err := inVertexContext(b.builder.Context(context.TODO()), "importing cache manifest from "+cmID, "", func(ctx context.Context) error {
if err := inBuilderContext(context.TODO(), b.builder, "importing cache manifest from "+cmID, "", func(ctx context.Context, g session.Group) error {
resolveCI, ok := b.resolveCacheImporterFuncs[im.Type]
if !ok {
return errors.Errorf("unknown cache importer: %s", im.Type)
}
ci, desc, err := resolveCI(ctx, im.Attrs)
ci, desc, err := resolveCI(ctx, g, im.Attrs)
if err != nil {
return err
}
@ -120,7 +119,7 @@ func (b *llbBridge) loadResult(ctx context.Context, def *pb.Definition, cacheImp
return res, err
}
func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (res *frontend.Result, err error) {
func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest, sid string) (res *frontend.Result, err error) {
if req.Definition != nil && req.Definition.Def != nil && req.Frontend != "" {
return nil, errors.New("cannot solve with both Definition and Frontend specified")
}
@ -132,7 +131,7 @@ func (b *llbBridge) Solve(ctx context.Context, req frontend.SolveRequest) (res *
if !ok {
return nil, errors.Errorf("invalid frontend: %s", req.Frontend)
}
res, err = f.Solve(ctx, b, req.FrontendOpt, req.FrontendInputs)
res, err = f.Solve(ctx, b, req.FrontendOpt, req.FrontendInputs, sid)
if err != nil {
return nil, errors.Wrapf(err, "failed to solve with frontend %s", req.Frontend)
}
@ -246,13 +245,24 @@ func (rp *resultProxy) Result(ctx context.Context) (res solver.CachedResult, err
return nil, err
}
func (s *llbBridge) Exec(ctx context.Context, meta executor.Meta, root cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) (err error) {
func (s *llbBridge) Run(ctx context.Context, id string, root cache.Mountable, mounts []executor.Mount, process executor.ProcessInfo, started chan<- struct{}) (err error) {
w, err := s.resolveWorker()
if err != nil {
return err
}
span, ctx := tracing.StartSpan(ctx, strings.Join(meta.Args, " "))
err = w.Exec(ctx, meta, root, stdin, stdout, stderr)
span, ctx := tracing.StartSpan(ctx, strings.Join(process.Meta.Args, " "))
err = w.Executor().Run(ctx, id, root, mounts, process, started)
tracing.FinishWithError(span, err)
return err
}
func (s *llbBridge) Exec(ctx context.Context, id string, process executor.ProcessInfo) (err error) {
w, err := s.resolveWorker()
if err != nil {
return err
}
span, ctx := tracing.StartSpan(ctx, strings.Join(process.Meta.Args, " "))
err = w.Executor().Exec(ctx, id, process)
tracing.FinishWithError(span, err)
return err
}
@ -271,8 +281,8 @@ func (s *llbBridge) ResolveImageConfig(ctx context.Context, ref string, opt llb.
} else {
id += platforms.Format(*platform)
}
err = inVertexContext(s.builder.Context(ctx), opt.LogName, id, func(ctx context.Context) error {
dgst, config, err = w.ResolveImageConfig(ctx, ref, opt, s.sm)
err = inBuilderContext(ctx, s.builder, opt.LogName, id, func(ctx context.Context, g session.Group) error {
dgst, config, err = w.ResolveImageConfig(ctx, ref, opt, s.sm, g)
return err
})
return dgst, config, err

View file

@ -8,6 +8,7 @@ import (
"github.com/containerd/continuity/fs"
"github.com/moby/buildkit/client/llb"
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
@ -36,7 +37,7 @@ func NewBuildOp(v solver.Vertex, op *pb.Op_Build, b frontend.FrontendLLBBridge,
}, nil
}
func (b *buildOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
func (b *buildOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
dt, err := json.Marshal(struct {
Type string
Exec *pb.BuildOp
@ -57,7 +58,7 @@ func (b *buildOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bo
}, true, nil
}
func (b *buildOp) Exec(ctx context.Context, inputs []solver.Result) (outputs []solver.Result, retErr error) {
func (b *buildOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) (outputs []solver.Result, retErr error) {
if b.op.Builder != pb.LLBBuilder {
return nil, errors.Errorf("only LLB builder is currently allowed")
}
@ -123,7 +124,7 @@ func (b *buildOp) Exec(ctx context.Context, inputs []solver.Result) (outputs []s
newRes, err := b.b.Solve(ctx, frontend.SolveRequest{
Definition: def.ToPB(),
})
}, g.SessionIterator().NextSession())
if err != nil {
return nil, err
}

View file

@ -17,6 +17,7 @@ import (
"github.com/containerd/containerd/mount"
"github.com/containerd/containerd/platforms"
"github.com/containerd/containerd/sys"
"github.com/docker/docker/pkg/idtools"
"github.com/docker/docker/pkg/locker"
"github.com/moby/buildkit/cache"
@ -37,7 +38,6 @@ import (
"github.com/moby/buildkit/worker"
digest "github.com/opencontainers/go-digest"
specs "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/opencontainers/runc/libcontainer/system"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
bolt "go.etcd.io/bbolt"
@ -94,7 +94,7 @@ func cloneExecOp(old *pb.ExecOp) pb.ExecOp {
return n
}
func (e *execOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
func (e *execOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
op := cloneExecOp(e.op)
for i := range op.Meta.ExtraHosts {
h := op.Meta.ExtraHosts[i]
@ -331,30 +331,26 @@ func (g *cacheRefGetter) getRefCacheDirNoCache(ctx context.Context, key string,
return mRef, nil
}
func (e *execOp) getSSHMountable(ctx context.Context, m *pb.Mount) (cache.Mountable, error) {
sessionID := session.FromContext(ctx)
if sessionID == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.sm.Get(timeoutCtx, sessionID)
func (e *execOp) getSSHMountable(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
var caller session.Caller
err := e.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error {
if err := sshforward.CheckSSHID(ctx, c, m.SSHOpt.ID); err != nil {
if m.SSHOpt.Optional {
return nil
}
if grpcerrors.Code(err) == codes.Unimplemented {
return errors.Errorf("no SSH key %q forwarded from the client", m.SSHOpt.ID)
}
return err
}
caller = c
return nil
})
if err != nil {
return nil, err
}
if err := sshforward.CheckSSHID(ctx, caller, m.SSHOpt.ID); err != nil {
if m.SSHOpt.Optional {
return nil, nil
}
if grpcerrors.Code(err) == codes.Unimplemented {
return nil, errors.Errorf("no SSH key %q forwarded from the client", m.SSHOpt.ID)
}
return nil, err
}
// because ssh socket remains active, to actually handle session disconnecting ssh error
// should restart the whole exec with new session
return &sshMount{mount: m, caller: caller, idmap: e.cm.IdentityMapping()}, nil
}
@ -385,6 +381,7 @@ func (sm *sshMountInstance) Mount() ([]mount.Mount, func() error, error) {
GID: gid,
})
if err != nil {
cancel()
return nil, nil, err
}
uid = identity.UID
@ -421,7 +418,7 @@ func (sm *sshMountInstance) IdentityMapping() *idtools.IdentityMapping {
return sm.idmap
}
func (e *execOp) getSecretMountable(ctx context.Context, m *pb.Mount) (cache.Mountable, error) {
func (e *execOp) getSecretMountable(ctx context.Context, m *pb.Mount, g session.Group) (cache.Mountable, error) {
if m.SecretOpt == nil {
return nil, errors.Errorf("invalid sercet mount options")
}
@ -431,28 +428,21 @@ func (e *execOp) getSecretMountable(ctx context.Context, m *pb.Mount) (cache.Mou
if id == "" {
return nil, errors.Errorf("secret ID missing from mount options")
}
sessionID := session.FromContext(ctx)
if sessionID == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := e.sm.Get(timeoutCtx, sessionID)
if err != nil {
return nil, err
}
dt, err := secrets.GetSecret(ctx, caller, id)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) && m.SecretOpt.Optional {
return nil, nil
var dt []byte
var err error
err = e.sm.Any(ctx, g, func(ctx context.Context, _ string, caller session.Caller) error {
dt, err = secrets.GetSecret(ctx, caller, id)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) && m.SecretOpt.Optional {
return nil
}
return err
}
return nil
})
if err != nil || dt == nil {
return nil, err
}
return &secretMount{mount: m, data: dt, idmap: e.cm.IdentityMapping()}, nil
}
@ -492,7 +482,7 @@ func (sm *secretMountInstance) Mount() ([]mount.Mount, func() error, error) {
Options: []string{"nodev", "nosuid", "noexec", fmt.Sprintf("uid=%d,gid=%d", os.Geteuid(), os.Getegid())},
}
if system.RunningInUserNS() {
if sys.RunningInUserNS() {
tmpMount.Options = nil
}
@ -562,7 +552,7 @@ func addDefaultEnvvar(env []string, k, v string) []string {
return append(env, k+"="+v)
}
func (e *execOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Result, error) {
func (e *execOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) ([]solver.Result, error) {
var mounts []executor.Mount
var root cache.Mountable
var readonlyRootFS bool
@ -651,7 +641,7 @@ func (e *execOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Res
mountable = newTmpfs(e.cm.IdentityMapping())
case pb.MountType_SECRET:
secretMount, err := e.getSecretMountable(ctx, m)
secretMount, err := e.getSecretMountable(ctx, m, g)
if err != nil {
return nil, err
}
@ -661,7 +651,7 @@ func (e *execOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Res
mountable = secretMount
case pb.MountType_SSH:
sshMount, err := e.getSSHMountable(ctx, m)
sshMount, err := e.getSSHMountable(ctx, m, g)
if err != nil {
return nil, err
}
@ -742,7 +732,7 @@ func (e *execOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Res
defer stdout.Close()
defer stderr.Close()
if err := e.exec.Exec(ctx, meta, root, mounts, nil, stdout, stderr); err != nil {
if err := e.exec.Run(ctx, "", root, mounts, executor.ProcessInfo{Meta: meta, Stdin: nil, Stdout: stdout, Stderr: stderr}, nil); err != nil {
return nil, errors.Wrapf(err, "executor failed running %v", meta.Args)
}

View file

@ -12,6 +12,7 @@ import (
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/cache/metadata"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver"
"github.com/moby/buildkit/solver/llbsolver"
"github.com/moby/buildkit/solver/llbsolver/file"
@ -47,7 +48,7 @@ func NewFileOp(v solver.Vertex, op *pb.Op_File, cm cache.Manager, md *metadata.S
}, nil
}
func (f *fileOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
func (f *fileOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
selectors := map[int]map[llbsolver.Selector]struct{}{}
invalidSelectors := map[int]struct{}{}
@ -141,7 +142,7 @@ func (f *fileOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, boo
return cm, true, nil
}
func (f *fileOp) Exec(ctx context.Context, inputs []solver.Result) ([]solver.Result, error) {
func (f *fileOp) Exec(ctx context.Context, g session.Group, inputs []solver.Result) ([]solver.Result, error) {
inpRefs := make([]fileoptypes.Ref, 0, len(inputs))
for _, inp := range inputs {
workerRef, ok := inp.Sys().(*worker.WorkerRef)

View file

@ -57,12 +57,12 @@ func (s *sourceOp) instance(ctx context.Context) (source.SourceInstance, error)
return s.src, nil
}
func (s *sourceOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, bool, error) {
func (s *sourceOp) CacheMap(ctx context.Context, g session.Group, index int) (*solver.CacheMap, bool, error) {
src, err := s.instance(ctx)
if err != nil {
return nil, false, err
}
k, done, err := src.CacheKey(ctx, index)
k, done, err := src.CacheKey(ctx, g, index)
if err != nil {
return nil, false, err
}
@ -79,12 +79,12 @@ func (s *sourceOp) CacheMap(ctx context.Context, index int) (*solver.CacheMap, b
}, done, nil
}
func (s *sourceOp) Exec(ctx context.Context, _ []solver.Result) (outputs []solver.Result, err error) {
func (s *sourceOp) Exec(ctx context.Context, g session.Group, _ []solver.Result) (outputs []solver.Result, err error) {
src, err := s.instance(ctx)
if err != nil {
return nil, err
}
ref, err := src.Snapshot(ctx)
ref, err := src.Snapshot(ctx, g)
if err != nil {
return nil, err
}

View file

@ -88,7 +88,7 @@ func (s *Solver) Bridge(b solver.Builder) frontend.FrontendLLBBridge {
}
}
func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest, exp ExporterRequest, ent []entitlements.Entitlement) (*client.SolveResponse, error) {
func (s *Solver) Solve(ctx context.Context, id string, sessionID string, req frontend.SolveRequest, exp ExporterRequest, ent []entitlements.Entitlement) (*client.SolveResponse, error) {
j, err := s.solver.NewJob(id)
if err != nil {
return nil, err
@ -102,11 +102,11 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest
}
j.SetValue(keyEntitlements, set)
j.SessionID = session.FromContext(ctx)
j.SessionID = sessionID
var res *frontend.Result
if s.gatewayForwarder != nil && req.Definition == nil && req.Frontend == "" {
fwd := gateway.NewBridgeForwarder(ctx, s.Bridge(j), s.workerController, req.FrontendInputs)
fwd := gateway.NewBridgeForwarder(ctx, s.Bridge(j), s.workerController, req.FrontendInputs, sessionID)
defer fwd.Discard()
if err := s.gatewayForwarder.RegisterBuild(ctx, id, fwd); err != nil {
return nil, err
@ -124,7 +124,7 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest
return nil, err
}
} else {
res, err = s.Bridge(j).Solve(ctx, req)
res, err = s.Bridge(j).Solve(ctx, req, sessionID)
if err != nil {
return nil, err
}
@ -204,8 +204,8 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest
inp.Refs = m
}
if err := inVertexContext(j.Context(ctx), e.Name(), "", func(ctx context.Context) error {
exporterResponse, err = e.Export(ctx, inp)
if err := inBuilderContext(ctx, j, e.Name(), "", func(ctx context.Context, _ session.Group) error {
exporterResponse, err = e.Export(ctx, inp, j.SessionID)
return err
}); err != nil {
return nil, err
@ -214,7 +214,7 @@ func (s *Solver) Solve(ctx context.Context, id string, req frontend.SolveRequest
var cacheExporterResponse map[string]string
if e := exp.CacheExporter; e != nil {
if err := inVertexContext(j.Context(ctx), "exporting cache", "", func(ctx context.Context) error {
if err := inBuilderContext(ctx, j, "exporting cache", "", func(ctx context.Context, _ session.Group) error {
prepareDone := oneOffProgress(ctx, "preparing build cache for export")
if err := res.EachRef(func(res solver.ResultProxy) error {
r, err := res.Result(ctx)
@ -335,7 +335,7 @@ func oneOffProgress(ctx context.Context, id string) func(err error) error {
}
}
func inVertexContext(ctx context.Context, name, id string, f func(ctx context.Context) error) error {
func inBuilderContext(ctx context.Context, b solver.Builder, name, id string, f func(ctx context.Context, g session.Group) error) error {
if id == "" {
id = name
}
@ -343,12 +343,14 @@ func inVertexContext(ctx context.Context, name, id string, f func(ctx context.Co
Digest: digest.FromBytes([]byte(id)),
Name: name,
}
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
notifyStarted(ctx, &v, false)
defer pw.Close()
err := f(ctx)
notifyCompleted(ctx, &v, err, false)
return err
return b.InContext(ctx, func(ctx context.Context, g session.Group) error {
pw, _, ctx := progress.FromContext(ctx, progress.WithMetadata("vertex", v.Digest))
notifyStarted(ctx, &v, false)
defer pw.Close()
err := f(ctx, g)
notifyCompleted(ctx, &v, err, false)
return err
})
}
func notifyStarted(ctx context.Context, v *client.Vertex, cached bool) {

View file

@ -2,6 +2,8 @@ package pb
const AttrKeepGitDir = "git.keepgitdir"
const AttrFullRemoteURL = "git.fullurl"
const AttrAuthHeaderSecret = "git.authheadersecret"
const AttrAuthTokenSecret = "git.authtokensecret"
const AttrLocalSessionID = "local.session"
const AttrLocalUniqueID = "local.unique"
const AttrIncludePatterns = "local.includepattern"

View file

@ -19,9 +19,10 @@ const (
CapSourceLocalExcludePatterns apicaps.CapID = "source.local.excludepatterns"
CapSourceLocalSharedKeyHint apicaps.CapID = "source.local.sharedkeyhint"
CapSourceGit apicaps.CapID = "source.git"
CapSourceGitKeepDir apicaps.CapID = "source.git.keepgitdir"
CapSourceGitFullURL apicaps.CapID = "source.git.fullurl"
CapSourceGit apicaps.CapID = "source.git"
CapSourceGitKeepDir apicaps.CapID = "source.git.keepgitdir"
CapSourceGitFullURL apicaps.CapID = "source.git.fullurl"
CapSourceGitHttpAuth apicaps.CapID = "source.git.httpauth"
CapSourceHTTP apicaps.CapID = "source.http"
CapSourceHTTPChecksum apicaps.CapID = "source.http.checksum"
@ -131,6 +132,12 @@ func init() {
Status: apicaps.CapStatusExperimental,
})
Caps.Init(apicaps.Cap{
ID: CapSourceGitHttpAuth,
Enabled: true,
Status: apicaps.CapStatusExperimental,
})
Caps.Init(apicaps.Cap{
ID: CapSourceHTTP,
Enabled: true,

View file

@ -12,7 +12,7 @@ import (
)
func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error {
vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}}
vs := &vertexStream{cache: map[digest.Digest]*client.Vertex{}, wasCached: make(map[digest.Digest]struct{})}
pr := j.pr.Reader(ctx)
defer func() {
if enc := vs.encore(); len(enc) > 0 {
@ -72,7 +72,8 @@ func (j *Job) Status(ctx context.Context, ch chan *client.SolveStatus) error {
}
type vertexStream struct {
cache map[digest.Digest]*client.Vertex
cache map[digest.Digest]*client.Vertex
wasCached map[digest.Digest]struct{}
}
func (vs *vertexStream) append(v client.Vertex) []*client.Vertex {
@ -91,17 +92,34 @@ func (vs *vertexStream) append(v client.Vertex) []*client.Vertex {
}
}
}
if v.Cached {
vs.markCached(v.Digest)
}
vcopy := v
return append(out, &vcopy)
}
func (vs *vertexStream) markCached(dgst digest.Digest) {
if v, ok := vs.cache[dgst]; ok {
if _, ok := vs.wasCached[dgst]; !ok {
for _, inp := range v.Inputs {
vs.markCached(inp)
}
}
vs.wasCached[dgst] = struct{}{}
}
}
func (vs *vertexStream) encore() []*client.Vertex {
var out []*client.Vertex
for _, v := range vs.cache {
if v.Started != nil && v.Completed == nil {
now := time.Now()
v.Completed = &now
v.Error = context.Canceled.Error()
if _, ok := vs.wasCached[v.Digest]; !ok && v.Error == "" {
v.Error = context.Canceled.Error()
}
out = append(out, v)
}
}

View file

@ -5,6 +5,7 @@ import (
"time"
"github.com/containerd/containerd/content"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/solver/pb"
digest "github.com/opencontainers/go-digest"
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
@ -138,10 +139,10 @@ type CacheLink struct {
type Op interface {
// CacheMap returns structure describing how the operation is cached.
// Currently only roots are allowed to return multiple cache maps per op.
CacheMap(context.Context, int) (*CacheMap, bool, error)
CacheMap(context.Context, session.Group, int) (*CacheMap, bool, error)
// Exec runs an operation given results from previous operations.
Exec(ctx context.Context, inputs []Result) (outputs []Result, err error)
Exec(ctx context.Context, g session.Group, inputs []Result) (outputs []Result, err error)
}
type ResultBasedCacheFunc func(context.Context, Result) (digest.Digest, error)

View file

@ -3,8 +3,10 @@ package git
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"io"
"net/url"
"os"
"os/exec"
"path/filepath"
@ -17,6 +19,7 @@ import (
"github.com/moby/buildkit/client"
"github.com/moby/buildkit/identity"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/secrets"
"github.com/moby/buildkit/snapshot"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/util/progress/logs"
@ -60,7 +63,7 @@ func (gs *gitSource) ID() string {
}
// needs to be called with repo lock
func (gs *gitSource) mountRemote(ctx context.Context, remote string) (target string, release func(), retErr error) {
func (gs *gitSource) mountRemote(ctx context.Context, remote string, auth []string) (target string, release func(), retErr error) {
remoteKey := "git-remote::" + remote
sis, err := gs.md.Search(remoteKey)
@ -119,11 +122,11 @@ func (gs *gitSource) mountRemote(ctx context.Context, remote string) (target str
}()
if initializeRepo {
if _, err := gitWithinDir(ctx, dir, "", "init", "--bare"); err != nil {
if _, err := gitWithinDir(ctx, dir, "", auth, "init", "--bare"); err != nil {
return "", nil, errors.Wrapf(err, "failed to init repo at %s", dir)
}
if _, err := gitWithinDir(ctx, dir, "", "remote", "add", "origin", remote); err != nil {
if _, err := gitWithinDir(ctx, dir, "", auth, "remote", "add", "origin", remote); err != nil {
return "", nil, errors.Wrapf(err, "failed add origin repo at %s", dir)
}
@ -151,6 +154,8 @@ type gitSourceHandler struct {
*gitSource
src source.GitIdentifier
cacheKey string
sm *session.Manager
auth []string
}
func (gs *gitSourceHandler) shaToCacheKey(sha string) string {
@ -161,7 +166,7 @@ func (gs *gitSourceHandler) shaToCacheKey(sha string) string {
return key
}
func (gs *gitSource) Resolve(ctx context.Context, id source.Identifier, _ *session.Manager) (source.SourceInstance, error) {
func (gs *gitSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
gitIdentifier, ok := id.(*source.GitIdentifier)
if !ok {
return nil, errors.Errorf("invalid git identifier %v", id)
@ -170,10 +175,63 @@ func (gs *gitSource) Resolve(ctx context.Context, id source.Identifier, _ *sessi
return &gitSourceHandler{
src: *gitIdentifier,
gitSource: gs,
sm: sm,
}, nil
}
func (gs *gitSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
type authSecret struct {
token bool
name string
}
func (gs *gitSourceHandler) authSecretNames() (sec []authSecret, _ error) {
u, err := url.Parse(gs.src.Remote)
if err != nil {
return nil, err
}
if gs.src.AuthHeaderSecret != "" {
sec = append(sec, authSecret{name: gs.src.AuthHeaderSecret + "." + u.Host})
}
if gs.src.AuthTokenSecret != "" {
sec = append(sec, authSecret{name: gs.src.AuthTokenSecret + "." + u.Host, token: true})
}
if gs.src.AuthHeaderSecret != "" {
sec = append(sec, authSecret{name: gs.src.AuthHeaderSecret})
}
if gs.src.AuthTokenSecret != "" {
sec = append(sec, authSecret{name: gs.src.AuthTokenSecret, token: true})
}
return sec, nil
}
func (gs *gitSourceHandler) getAuthToken(ctx context.Context, g session.Group) error {
if gs.auth != nil {
return nil
}
sec, err := gs.authSecretNames()
if err != nil {
return err
}
return gs.sm.Any(ctx, g, func(ctx context.Context, _ string, caller session.Caller) error {
for _, s := range sec {
dt, err := secrets.GetSecret(ctx, caller, s.name)
if err != nil {
if errors.Is(err, secrets.ErrNotFound) {
continue
}
return err
}
if s.token {
dt = []byte("basic " + base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("x-access-token:%s", dt))))
}
gs.auth = []string{"-c", "http.extraheader=Authorization: " + string(dt)}
break
}
return nil
})
}
func (gs *gitSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
remote := gs.src.Remote
ref := gs.src.Ref
if ref == "" {
@ -188,7 +246,9 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context, index int) (string, bo
return ref, true, nil
}
gitDir, unmountGitDir, err := gs.mountRemote(ctx, remote)
gs.getAuthToken(ctx, g)
gitDir, unmountGitDir, err := gs.mountRemote(ctx, remote, gs.auth)
if err != nil {
return "", false, err
}
@ -196,14 +256,14 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context, index int) (string, bo
// TODO: should we assume that remote tag is immutable? add a timer?
buf, err := gitWithinDir(ctx, gitDir, "", "ls-remote", "origin", ref)
buf, err := gitWithinDir(ctx, gitDir, "", gs.auth, "ls-remote", "origin", ref)
if err != nil {
return "", false, errors.Wrapf(err, "failed to fetch remote %s", remote)
}
out := buf.String()
idx := strings.Index(out, "\t")
if idx == -1 {
return "", false, errors.Errorf("failed to find commit SHA from output: %s", string(out))
return "", false, errors.Errorf("repository does not contain ref %s, output: %q", ref, string(out))
}
sha := string(out[:idx])
@ -215,7 +275,7 @@ func (gs *gitSourceHandler) CacheKey(ctx context.Context, index int) (string, bo
return sha, true, nil
}
func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRef, retErr error) {
func (gs *gitSourceHandler) Snapshot(ctx context.Context, g session.Group) (out cache.ImmutableRef, retErr error) {
ref := gs.src.Ref
if ref == "" {
ref = "master"
@ -224,12 +284,14 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
cacheKey := gs.cacheKey
if cacheKey == "" {
var err error
cacheKey, _, err = gs.CacheKey(ctx, 0)
cacheKey, _, err = gs.CacheKey(ctx, g, 0)
if err != nil {
return nil, err
}
}
gs.getAuthToken(ctx, g)
snapshotKey := "git-snapshot::" + cacheKey + ":" + gs.src.Subdir
gs.locker.Lock(snapshotKey)
defer gs.locker.Unlock(snapshotKey)
@ -244,7 +306,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
gs.locker.Lock(gs.src.Remote)
defer gs.locker.Unlock(gs.src.Remote)
gitDir, unmountGitDir, err := gs.mountRemote(ctx, gs.src.Remote)
gitDir, unmountGitDir, err := gs.mountRemote(ctx, gs.src.Remote, gs.auth)
if err != nil {
return nil, err
}
@ -253,7 +315,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
doFetch := true
if isCommitSHA(ref) {
// skip fetch if commit already exists
if _, err := gitWithinDir(ctx, gitDir, "", "cat-file", "-e", ref+"^{commit}"); err == nil {
if _, err := gitWithinDir(ctx, gitDir, "", nil, "cat-file", "-e", ref+"^{commit}"); err == nil {
doFetch = false
}
}
@ -277,7 +339,7 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
// in case the ref is a branch and it now points to a different commit sha
// TODO: is there a better way to do this?
}
if _, err := gitWithinDir(ctx, gitDir, "", args...); err != nil {
if _, err := gitWithinDir(ctx, gitDir, "", gs.auth, args...); err != nil {
return nil, errors.Wrapf(err, "failed to fetch remote %s", gs.src.Remote)
}
}
@ -313,41 +375,41 @@ func (gs *gitSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRe
if err := os.MkdirAll(checkoutDir, 0711); err != nil {
return nil, err
}
_, err = gitWithinDir(ctx, checkoutDirGit, "", "init")
_, err = gitWithinDir(ctx, checkoutDirGit, "", nil, "init")
if err != nil {
return nil, err
}
_, err = gitWithinDir(ctx, checkoutDirGit, "", "remote", "add", "origin", gitDir)
_, err = gitWithinDir(ctx, checkoutDirGit, "", nil, "remote", "add", "origin", gitDir)
if err != nil {
return nil, err
}
pullref := ref
if isCommitSHA(ref) {
pullref = "refs/buildkit/" + identity.NewID()
_, err = gitWithinDir(ctx, gitDir, "", "update-ref", pullref, ref)
_, err = gitWithinDir(ctx, gitDir, "", gs.auth, "update-ref", pullref, ref)
if err != nil {
return nil, err
}
} else {
pullref += ":" + pullref
}
_, err = gitWithinDir(ctx, checkoutDirGit, "", "fetch", "-u", "--depth=1", "origin", pullref)
_, err = gitWithinDir(ctx, checkoutDirGit, "", gs.auth, "fetch", "-u", "--depth=1", "origin", pullref)
if err != nil {
return nil, err
}
_, err = gitWithinDir(ctx, checkoutDirGit, checkoutDir, "checkout", "FETCH_HEAD")
_, err = gitWithinDir(ctx, checkoutDirGit, checkoutDir, nil, "checkout", "FETCH_HEAD")
if err != nil {
return nil, errors.Wrapf(err, "failed to checkout remote %s", gs.src.Remote)
}
gitDir = checkoutDirGit
} else {
_, err = gitWithinDir(ctx, gitDir, checkoutDir, "checkout", ref, "--", ".")
_, err = gitWithinDir(ctx, gitDir, checkoutDir, nil, "checkout", ref, "--", ".")
if err != nil {
return nil, errors.Wrapf(err, "failed to checkout remote %s", gs.src.Remote)
}
}
_, err = gitWithinDir(ctx, gitDir, checkoutDir, "submodule", "update", "--init", "--recursive", "--depth=1")
_, err = gitWithinDir(ctx, gitDir, checkoutDir, gs.auth, "submodule", "update", "--init", "--recursive", "--depth=1")
if err != nil {
return nil, errors.Wrapf(err, "failed to update submodules for %s", gs.src.Remote)
}
@ -396,8 +458,8 @@ func isCommitSHA(str string) bool {
return validHex.MatchString(str)
}
func gitWithinDir(ctx context.Context, gitDir, workDir string, args ...string) (*bytes.Buffer, error) {
a := []string{"--git-dir", gitDir}
func gitWithinDir(ctx context.Context, gitDir, workDir string, auth []string, args ...string) (*bytes.Buffer, error) {
a := append([]string{"--git-dir", gitDir}, auth...)
if workDir != "" {
a = append(a, "--work-tree", workDir)
}
@ -413,8 +475,14 @@ func git(ctx context.Context, dir string, args ...string) (*bytes.Buffer, error)
cmd.Dir = dir // some commands like submodule require this
buf := bytes.NewBuffer(nil)
errbuf := bytes.NewBuffer(nil)
cmd.Stdin = nil
cmd.Stdout = io.MultiWriter(stdout, buf)
cmd.Stderr = io.MultiWriter(stderr, errbuf)
cmd.Env = []string{
"PATH=" + os.Getenv("PATH"),
"GIT_TERMINAL_PROMPT=0",
// "GIT_TRACE=1",
}
// remote git commands spawn helper processes that inherit FDs and don't
// handle parent death signal so exec.CommandContext can't be used
err := runProcessGroup(ctx, cmd)

View file

@ -8,10 +8,12 @@ import (
)
type GitIdentifier struct {
Remote string
Ref string
Subdir string
KeepGitDir bool
Remote string
Ref string
Subdir string
KeepGitDir bool
AuthTokenSecret string
AuthHeaderSecret string
}
func NewGitIdentifier(remoteURL string) (*GitIdentifier, error) {

View file

@ -64,7 +64,7 @@ type httpSourceHandler struct {
src source.HttpIdentifier
refID string
cacheKey digest.Digest
client *http.Client
sm *session.Manager
}
func (hs *httpSource) Resolve(ctx context.Context, id source.Identifier, sm *session.Manager) (source.SourceInstance, error) {
@ -73,15 +73,17 @@ func (hs *httpSource) Resolve(ctx context.Context, id source.Identifier, sm *ses
return nil, errors.Errorf("invalid http identifier %v", id)
}
sessionID := session.FromContext(ctx)
return &httpSourceHandler{
src: *httpIdentifier,
httpSource: hs,
client: &http.Client{Transport: newTransport(hs.transport, sm, sessionID)},
sm: sm,
}, nil
}
func (hs *httpSourceHandler) client(g session.Group) *http.Client {
return &http.Client{Transport: newTransport(hs.transport, hs.sm, g)}
}
// urlHash is internal hash the etag is stored by that doesn't leak outside
// this package.
func (hs *httpSourceHandler) urlHash() (digest.Digest, error) {
@ -120,7 +122,7 @@ func (hs *httpSourceHandler) formatCacheKey(filename string, dgst digest.Digest,
return digest.FromBytes(dt)
}
func (hs *httpSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
func (hs *httpSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
if hs.src.Checksum != "" {
hs.cacheKey = hs.src.Checksum
return hs.formatCacheKey(getFileName(hs.src.URL, hs.src.Filename, nil), hs.src.Checksum, "").String(), true, nil
@ -172,12 +174,14 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, index int) (string, b
}
}
client := hs.client(g)
// Some servers seem to have trouble supporting If-None-Match properly even
// though they return ETag-s. So first, optionally try a HEAD request with
// manual ETag value comparison.
if len(m) > 0 {
req.Method = "HEAD"
resp, err := hs.client.Do(req)
resp, err := client.Do(req)
if err == nil {
if resp.StatusCode == http.StatusOK || resp.StatusCode == http.StatusNotModified {
respETag := resp.Header.Get("ETag")
@ -203,7 +207,7 @@ func (hs *httpSourceHandler) CacheKey(ctx context.Context, index int) (string, b
req.Method = "GET"
}
resp, err := hs.client.Do(req)
resp, err := client.Do(req)
if err != nil {
return "", false, err
}
@ -366,7 +370,7 @@ func (hs *httpSourceHandler) save(ctx context.Context, resp *http.Response) (ref
return ref, dgst, nil
}
func (hs *httpSourceHandler) Snapshot(ctx context.Context) (cache.ImmutableRef, error) {
func (hs *httpSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
if hs.refID != "" {
ref, err := hs.cache.Get(ctx, hs.refID)
if err == nil {
@ -380,7 +384,9 @@ func (hs *httpSourceHandler) Snapshot(ctx context.Context) (cache.ImmutableRef,
}
req = req.WithContext(ctx)
resp, err := hs.client.Do(req)
client := hs.client(g)
resp, err := client.Do(req)
if err != nil {
return nil, err
}

View file

@ -4,21 +4,20 @@ import (
"context"
"io"
"net/http"
"time"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/session/upload"
"github.com/pkg/errors"
)
func newTransport(rt http.RoundTripper, sm *session.Manager, id string) http.RoundTripper {
return &sessionHandler{rt: rt, sm: sm, id: id}
func newTransport(rt http.RoundTripper, sm *session.Manager, g session.Group) http.RoundTripper {
return &sessionHandler{rt: rt, sm: sm, g: g}
}
type sessionHandler struct {
sm *session.Manager
rt http.RoundTripper
id string
g session.Group
}
func (h *sessionHandler) RoundTrip(req *http.Request) (*http.Response, error) {
@ -30,31 +29,30 @@ func (h *sessionHandler) RoundTrip(req *http.Request) (*http.Response, error) {
return nil, errors.Errorf("invalid request")
}
timeoutCtx, cancel := context.WithTimeout(context.TODO(), 5*time.Second)
defer cancel()
var resp *http.Response
err := h.sm.Any(context.TODO(), h.g, func(ctx context.Context, _ string, caller session.Caller) error {
up, err := upload.New(context.TODO(), caller, req.URL)
if err != nil {
return err
}
caller, err := h.sm.Get(timeoutCtx, h.id)
pr, pw := io.Pipe()
go func() {
_, err := up.WriteTo(pw)
pw.CloseWithError(err)
}()
resp = &http.Response{
Status: "200 OK",
StatusCode: 200,
Body: pr,
ContentLength: -1,
}
return nil
})
if err != nil {
return nil, err
}
up, err := upload.New(context.TODO(), caller, req.URL)
if err != nil {
return nil, err
}
pr, pw := io.Pipe()
go func() {
_, err := up.WriteTo(pw)
pw.CloseWithError(err)
}()
resp := &http.Response{
Status: "200 OK",
StatusCode: 200,
Body: pr,
ContentLength: -1,
}
return resp, nil
}

View file

@ -103,6 +103,10 @@ func FromLLB(op *pb.Op_Source, platform *pb.Platform) (Identifier, error) {
}
case pb.AttrFullRemoteURL:
id.Remote = v
case pb.AttrAuthHeaderSecret:
id.AuthHeaderSecret = v
case pb.AttrAuthTokenSecret:
id.AuthTokenSecret = v
}
}
}

View file

@ -70,11 +70,11 @@ type localSourceHandler struct {
*localSource
}
func (ls *localSourceHandler) CacheKey(ctx context.Context, index int) (string, bool, error) {
func (ls *localSourceHandler) CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error) {
sessionID := ls.src.SessionID
if sessionID == "" {
id := session.FromContext(ctx)
id := g.SessionIterator().NextSession()
if id == "" {
return "", false, errors.New("could not access local files without session")
}
@ -92,21 +92,23 @@ func (ls *localSourceHandler) CacheKey(ctx context.Context, index int) (string,
return "session:" + ls.src.Name + ":" + digest.FromBytes(dt).String(), true, nil
}
func (ls *localSourceHandler) Snapshot(ctx context.Context) (out cache.ImmutableRef, retErr error) {
id := session.FromContext(ctx)
if id == "" {
return nil, errors.New("could not access local files without session")
}
timeoutCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
caller, err := ls.sm.Get(timeoutCtx, id)
func (ls *localSourceHandler) Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error) {
var ref cache.ImmutableRef
err := ls.sm.Any(ctx, g, func(ctx context.Context, _ string, c session.Caller) error {
r, err := ls.snapshot(ctx, c)
if err != nil {
return err
}
ref = r
return nil
})
if err != nil {
return nil, err
}
return ref, nil
}
func (ls *localSourceHandler) snapshot(ctx context.Context, caller session.Caller) (out cache.ImmutableRef, retErr error) {
sharedKey := keySharedKey + ":" + ls.src.Name + ":" + ls.src.SharedKeyHint + ":" + caller.SharedKey() // TODO: replace caller.SharedKey() with source based hint from client(absolute-path+nodeid)
var mutable cache.MutableRef

View file

@ -15,8 +15,8 @@ type Source interface {
}
type SourceInstance interface {
CacheKey(ctx context.Context, index int) (string, bool, error)
Snapshot(ctx context.Context) (cache.ImmutableRef, error)
CacheKey(ctx context.Context, g session.Group, index int) (string, bool, error)
Snapshot(ctx context.Context, g session.Group) (cache.ImmutableRef, error)
}
type Manager struct {

View file

@ -2,21 +2,34 @@ package contentutil
import (
"context"
"runtime"
"sync"
"time"
"github.com/containerd/containerd/content"
"github.com/containerd/containerd/errdefs"
"github.com/containerd/containerd/remotes"
digest "github.com/opencontainers/go-digest"
"github.com/pkg/errors"
)
func FromPusher(p remotes.Pusher) content.Ingester {
var mu sync.Mutex
c := sync.NewCond(&mu)
return &pushingIngester{
p: p,
mu: &mu,
c: c,
p: p,
active: map[digest.Digest]struct{}{},
}
}
type pushingIngester struct {
p remotes.Pusher
mu *sync.Mutex
c *sync.Cond
active map[digest.Digest]struct{}
}
// Writer implements content.Ingester. desc.MediaType must be set for manifest blobs.
@ -30,20 +43,55 @@ func (i *pushingIngester) Writer(ctx context.Context, opts ...content.WriterOpt)
if wOpts.Ref == "" {
return nil, errors.Wrap(errdefs.ErrInvalidArgument, "ref must not be empty")
}
st := time.Now()
i.mu.Lock()
for {
if time.Since(st) > time.Hour {
i.mu.Unlock()
return nil, errors.Wrapf(errdefs.ErrUnavailable, "ref %v locked", wOpts.Desc.Digest)
}
if _, ok := i.active[wOpts.Desc.Digest]; ok {
i.c.Wait()
} else {
break
}
}
i.active[wOpts.Desc.Digest] = struct{}{}
i.mu.Unlock()
var once sync.Once
release := func() {
once.Do(func() {
i.mu.Lock()
delete(i.active, wOpts.Desc.Digest)
i.c.Broadcast()
i.mu.Unlock()
})
}
// pusher requires desc.MediaType to determine the PUT URL, especially for manifest blobs.
contentWriter, err := i.p.Push(ctx, wOpts.Desc)
if err != nil {
release()
return nil, err
}
runtime.SetFinalizer(contentWriter, func(_ content.Writer) {
release()
})
return &writer{
Writer: contentWriter,
contentWriterRef: wOpts.Ref,
release: release,
}, nil
}
type writer struct {
content.Writer // returned from pusher.Push
contentWriterRef string // ref passed for Writer()
release func()
}
func (w *writer) Status() (content.Status, error) {
@ -56,3 +104,19 @@ func (w *writer) Status() (content.Status, error) {
}
return st, nil
}
func (w *writer) Commit(ctx context.Context, size int64, expected digest.Digest, opts ...content.Opt) error {
err := w.Writer.Commit(ctx, size, expected, opts...)
if w.release != nil {
w.release()
}
return err
}
func (w *writer) Close() error {
err := w.Writer.Close()
if w.release != nil {
w.release()
}
return err
}

View file

@ -7,7 +7,7 @@ import (
"github.com/containerd/containerd/containers"
"github.com/containerd/containerd/oci"
"github.com/opencontainers/runc/libcontainer/system"
"github.com/containerd/containerd/sys"
specs "github.com/opencontainers/runtime-spec/specs-go"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
@ -81,7 +81,7 @@ func WithInsecureSpec() oci.SpecOpts {
},
}
if !system.RunningInUserNS() {
if !sys.RunningInUserNS() {
// Devices automatically mounted on insecure mode
s.Linux.Devices = append(s.Linux.Devices, []specs.LinuxDevice{
// Writes to this come out as printk's, reads export the buffered printk records. (dmesg)

View file

@ -1,7 +1,6 @@
package resolver
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
@ -11,6 +10,7 @@ import (
"path/filepath"
"runtime"
"strings"
"sync"
"time"
"github.com/containerd/containerd/remotes"
@ -149,16 +149,68 @@ func NewRegistryConfig(m map[string]config.RegistryConfig) docker.RegistryHosts
)
}
func New(ctx context.Context, hosts docker.RegistryHosts, sm *session.Manager) remotes.Resolver {
type SessionAuthenticator struct {
sm *session.Manager
groups []session.Group
mu sync.RWMutex
cache map[string]credentials
cacheMu sync.RWMutex
}
type credentials struct {
user string
secret string
created time.Time
}
func NewSessionAuthenticator(sm *session.Manager, g session.Group) *SessionAuthenticator {
return &SessionAuthenticator{sm: sm, groups: []session.Group{g}, cache: map[string]credentials{}}
}
func (a *SessionAuthenticator) credentials(h string) (string, string, error) {
const credentialsTimeout = time.Minute
a.cacheMu.RLock()
c, ok := a.cache[h]
if ok && time.Since(c.created) < credentialsTimeout {
a.cacheMu.RUnlock()
return c.user, c.secret, nil
}
a.cacheMu.RUnlock()
a.mu.RLock()
defer a.mu.RUnlock()
var err error
for i := len(a.groups) - 1; i >= 0; i-- {
var user, secret string
user, secret, err = auth.CredentialsFunc(a.sm, a.groups[i])(h)
if err != nil {
continue
}
a.cacheMu.Lock()
a.cache[h] = credentials{user: user, secret: secret, created: time.Now()}
a.cacheMu.Unlock()
return user, secret, nil
}
return "", "", err
}
func (a *SessionAuthenticator) AddSession(g session.Group) {
a.mu.Lock()
a.groups = append(a.groups, g)
a.mu.Unlock()
}
func New(hosts docker.RegistryHosts, auth *SessionAuthenticator) remotes.Resolver {
return docker.NewResolver(docker.ResolverOptions{
Hosts: hostsWithCredentials(ctx, hosts, sm),
Hosts: hostsWithCredentials(hosts, auth),
})
}
func hostsWithCredentials(ctx context.Context, hosts docker.RegistryHosts, sm *session.Manager) docker.RegistryHosts {
id := session.FromContext(ctx)
if id == "" {
return hosts
func hostsWithCredentials(hosts docker.RegistryHosts, auth *SessionAuthenticator) docker.RegistryHosts {
if hosts == nil {
return nil
}
return func(domain string) ([]docker.RegistryHost, error) {
res, err := hosts(domain)
@ -169,17 +221,9 @@ func hostsWithCredentials(ctx context.Context, hosts docker.RegistryHosts, sm *s
return nil, nil
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
caller, err := sm.Get(timeoutCtx, id)
if err != nil {
return nil, err
}
a := docker.NewDockerAuthorizer(
docker.WithAuthClient(res[0].Client),
docker.WithAuthCreds(auth.CredentialsFunc(context.TODO(), caller)),
docker.WithAuthCreds(auth.credentials),
)
for i := range res {
res[i].Authorizer = a

View file

@ -2,7 +2,6 @@ package worker
import (
"context"
"io"
"github.com/containerd/containerd/content"
"github.com/moby/buildkit/cache"
@ -27,9 +26,7 @@ type Worker interface {
LoadRef(id string, hidden bool) (cache.ImmutableRef, error)
// ResolveOp resolves Vertex.Sys() to Op implementation.
ResolveOp(v solver.Vertex, s frontend.FrontendLLBBridge, sm *session.Manager) (solver.Op, error)
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager) (digest.Digest, []byte, error)
// Exec is similar to executor.Exec but without []mount.Mount
Exec(ctx context.Context, meta executor.Meta, rootFS cache.ImmutableRef, stdin io.ReadCloser, stdout, stderr io.WriteCloser) error
ResolveImageConfig(ctx context.Context, ref string, opt llb.ResolveImageConfigOpt, sm *session.Manager, g session.Group) (digest.Digest, []byte, error)
DiskUsage(ctx context.Context, opt client.DiskUsageInfo) ([]*client.UsageInfo, error)
Exporter(name string, sm *session.Manager) (exporter.Exporter, error)
Prune(ctx context.Context, ch chan client.UsageInfo, opt ...client.PruneInfo) error
@ -37,6 +34,8 @@ type Worker interface {
FromRemote(ctx context.Context, remote *solver.Remote) (cache.ImmutableRef, error)
PruneCacheMounts(ctx context.Context, ids []string) error
ContentStore() content.Store
Executor() executor.Executor
CacheManager() cache.Manager
}
// Pre-defined label keys

View file

@ -1,155 +0,0 @@
// +build linux
package system
import (
"os"
"os/exec"
"syscall" // only for exec
"unsafe"
"github.com/opencontainers/runc/libcontainer/user"
"golang.org/x/sys/unix"
)
// If arg2 is nonzero, set the "child subreaper" attribute of the
// calling process; if arg2 is zero, unset the attribute. When a
// process is marked as a child subreaper, all of the children
// that it creates, and their descendants, will be marked as
// having a subreaper. In effect, a subreaper fulfills the role
// of init(1) for its descendant processes. Upon termination of
// a process that is orphaned (i.e., its immediate parent has
// already terminated) and marked as having a subreaper, the
// nearest still living ancestor subreaper will receive a SIGCHLD
// signal and be able to wait(2) on the process to discover its
// termination status.
const PR_SET_CHILD_SUBREAPER = 36
type ParentDeathSignal int
func (p ParentDeathSignal) Restore() error {
if p == 0 {
return nil
}
current, err := GetParentDeathSignal()
if err != nil {
return err
}
if p == current {
return nil
}
return p.Set()
}
func (p ParentDeathSignal) Set() error {
return SetParentDeathSignal(uintptr(p))
}
func Execv(cmd string, args []string, env []string) error {
name, err := exec.LookPath(cmd)
if err != nil {
return err
}
return syscall.Exec(name, args, env)
}
func Prlimit(pid, resource int, limit unix.Rlimit) error {
_, _, err := unix.RawSyscall6(unix.SYS_PRLIMIT64, uintptr(pid), uintptr(resource), uintptr(unsafe.Pointer(&limit)), uintptr(unsafe.Pointer(&limit)), 0, 0)
if err != 0 {
return err
}
return nil
}
func SetParentDeathSignal(sig uintptr) error {
if err := unix.Prctl(unix.PR_SET_PDEATHSIG, sig, 0, 0, 0); err != nil {
return err
}
return nil
}
func GetParentDeathSignal() (ParentDeathSignal, error) {
var sig int
if err := unix.Prctl(unix.PR_GET_PDEATHSIG, uintptr(unsafe.Pointer(&sig)), 0, 0, 0); err != nil {
return -1, err
}
return ParentDeathSignal(sig), nil
}
func SetKeepCaps() error {
if err := unix.Prctl(unix.PR_SET_KEEPCAPS, 1, 0, 0, 0); err != nil {
return err
}
return nil
}
func ClearKeepCaps() error {
if err := unix.Prctl(unix.PR_SET_KEEPCAPS, 0, 0, 0, 0); err != nil {
return err
}
return nil
}
func Setctty() error {
if err := unix.IoctlSetInt(0, unix.TIOCSCTTY, 0); err != nil {
return err
}
return nil
}
// RunningInUserNS detects whether we are currently running in a user namespace.
// Originally copied from github.com/lxc/lxd/shared/util.go
func RunningInUserNS() bool {
uidmap, err := user.CurrentProcessUIDMap()
if err != nil {
// This kernel-provided file only exists if user namespaces are supported
return false
}
return UIDMapInUserNS(uidmap)
}
func UIDMapInUserNS(uidmap []user.IDMap) bool {
/*
* We assume we are in the initial user namespace if we have a full
* range - 4294967295 uids starting at uid 0.
*/
if len(uidmap) == 1 && uidmap[0].ID == 0 && uidmap[0].ParentID == 0 && uidmap[0].Count == 4294967295 {
return false
}
return true
}
// GetParentNSeuid returns the euid within the parent user namespace
func GetParentNSeuid() int64 {
euid := int64(os.Geteuid())
uidmap, err := user.CurrentProcessUIDMap()
if err != nil {
// This kernel-provided file only exists if user namespaces are supported
return euid
}
for _, um := range uidmap {
if um.ID <= euid && euid <= um.ID+um.Count-1 {
return um.ParentID + euid - um.ID
}
}
return euid
}
// SetSubreaper sets the value i as the subreaper setting for the calling process
func SetSubreaper(i int) error {
return unix.Prctl(PR_SET_CHILD_SUBREAPER, uintptr(i), 0, 0, 0)
}
// GetSubreaper returns the subreaper setting for the calling process
func GetSubreaper() (int, error) {
var i uintptr
if err := unix.Prctl(unix.PR_GET_CHILD_SUBREAPER, uintptr(unsafe.Pointer(&i)), 0, 0, 0); err != nil {
return -1, err
}
return int(i), nil
}

View file

@ -1,113 +0,0 @@
package system
import (
"fmt"
"io/ioutil"
"path/filepath"
"strconv"
"strings"
)
// State is the status of a process.
type State rune
const ( // Only values for Linux 3.14 and later are listed here
Dead State = 'X'
DiskSleep State = 'D'
Running State = 'R'
Sleeping State = 'S'
Stopped State = 'T'
TracingStop State = 't'
Zombie State = 'Z'
)
// String forms of the state from proc(5)'s documentation for
// /proc/[pid]/status' "State" field.
func (s State) String() string {
switch s {
case Dead:
return "dead"
case DiskSleep:
return "disk sleep"
case Running:
return "running"
case Sleeping:
return "sleeping"
case Stopped:
return "stopped"
case TracingStop:
return "tracing stop"
case Zombie:
return "zombie"
default:
return fmt.Sprintf("unknown (%c)", s)
}
}
// Stat_t represents the information from /proc/[pid]/stat, as
// described in proc(5) with names based on the /proc/[pid]/status
// fields.
type Stat_t struct {
// PID is the process ID.
PID uint
// Name is the command run by the process.
Name string
// State is the state of the process.
State State
// StartTime is the number of clock ticks after system boot (since
// Linux 2.6).
StartTime uint64
}
// Stat returns a Stat_t instance for the specified process.
func Stat(pid int) (stat Stat_t, err error) {
bytes, err := ioutil.ReadFile(filepath.Join("/proc", strconv.Itoa(pid), "stat"))
if err != nil {
return stat, err
}
return parseStat(string(bytes))
}
// GetProcessStartTime is deprecated. Use Stat(pid) and
// Stat_t.StartTime instead.
func GetProcessStartTime(pid int) (string, error) {
stat, err := Stat(pid)
if err != nil {
return "", err
}
return fmt.Sprintf("%d", stat.StartTime), nil
}
func parseStat(data string) (stat Stat_t, err error) {
// From proc(5), field 2 could contain space and is inside `(` and `)`.
// The following is an example:
// 89653 (gunicorn: maste) S 89630 89653 89653 0 -1 4194560 29689 28896 0 3 146 32 76 19 20 0 1 0 2971844 52965376 3920 18446744073709551615 1 1 0 0 0 0 0 16781312 137447943 0 0 0 17 1 0 0 0 0 0 0 0 0 0 0 0 0 0
i := strings.LastIndex(data, ")")
if i <= 2 || i >= len(data)-1 {
return stat, fmt.Errorf("invalid stat data: %q", data)
}
parts := strings.SplitN(data[:i], "(", 2)
if len(parts) != 2 {
return stat, fmt.Errorf("invalid stat data: %q", data)
}
stat.Name = parts[1]
_, err = fmt.Sscanf(parts[0], "%d", &stat.PID)
if err != nil {
return stat, err
}
// parts indexes should be offset by 3 from the field number given
// proc(5), because parts is zero-indexed and we've removed fields
// one (PID) and two (Name) in the paren-split.
parts = strings.Split(data[i+2:], " ")
var state int
fmt.Sscanf(parts[3-3], "%c", &state)
stat.State = State(state)
fmt.Sscanf(parts[22-3], "%d", &stat.StartTime)
return stat, nil
}

View file

@ -1,26 +0,0 @@
// +build linux
// +build 386 arm
package system
import (
"golang.org/x/sys/unix"
)
// Setuid sets the uid of the calling thread to the specified uid.
func Setuid(uid int) (err error) {
_, _, e1 := unix.RawSyscall(unix.SYS_SETUID32, uintptr(uid), 0, 0)
if e1 != 0 {
err = e1
}
return
}
// Setgid sets the gid of the calling thread to the specified gid.
func Setgid(gid int) (err error) {
_, _, e1 := unix.RawSyscall(unix.SYS_SETGID32, uintptr(gid), 0, 0)
if e1 != 0 {
err = e1
}
return
}

View file

@ -1,26 +0,0 @@
// +build linux
// +build arm64 amd64 mips mipsle mips64 mips64le ppc ppc64 ppc64le riscv64 s390x
package system
import (
"golang.org/x/sys/unix"
)
// Setuid sets the uid of the calling thread to the specified uid.
func Setuid(uid int) (err error) {
_, _, e1 := unix.RawSyscall(unix.SYS_SETUID, uintptr(uid), 0, 0)
if e1 != 0 {
err = e1
}
return
}
// Setgid sets the gid of the calling thread to the specified gid.
func Setgid(gid int) (err error) {
_, _, e1 := unix.RawSyscall(unix.SYS_SETGID, uintptr(gid), 0, 0)
if e1 != 0 {
err = e1
}
return
}

View file

@ -1,12 +0,0 @@
// +build cgo,linux
package system
/*
#include <unistd.h>
*/
import "C"
func GetClockTicks() int {
return int(C.sysconf(C._SC_CLK_TCK))
}

View file

@ -1,15 +0,0 @@
// +build !cgo windows
package system
func GetClockTicks() int {
// TODO figure out a better alternative for platforms where we're missing cgo
//
// TODO Windows. This could be implemented using Win32 QueryPerformanceFrequency().
// https://msdn.microsoft.com/en-us/library/windows/desktop/ms644905(v=vs.85).aspx
//
// An example of its usage can be found here.
// https://msdn.microsoft.com/en-us/library/windows/desktop/dn553408(v=vs.85).aspx
return 100
}

View file

@ -1,27 +0,0 @@
// +build !linux
package system
import (
"os"
"github.com/opencontainers/runc/libcontainer/user"
)
// RunningInUserNS is a stub for non-Linux systems
// Always returns false
func RunningInUserNS() bool {
return false
}
// UIDMapInUserNS is a stub for non-Linux systems
// Always returns false
func UIDMapInUserNS(uidmap []user.IDMap) bool {
return false
}
// GetParentNSeuid returns the euid within the parent user namespace
// Always returns os.Geteuid on non-linux
func GetParentNSeuid() int {
return os.Geteuid()
}

View file

@ -1,35 +0,0 @@
package system
import "golang.org/x/sys/unix"
// Returns a []byte slice if the xattr is set and nil otherwise
// Requires path and its attribute as arguments
func Lgetxattr(path string, attr string) ([]byte, error) {
var sz int
// Start with a 128 length byte array
dest := make([]byte, 128)
sz, errno := unix.Lgetxattr(path, attr, dest)
switch {
case errno == unix.ENODATA:
return nil, errno
case errno == unix.ENOTSUP:
return nil, errno
case errno == unix.ERANGE:
// 128 byte array might just not be good enough,
// A dummy buffer is used to get the real size
// of the xattrs on disk
sz, errno = unix.Lgetxattr(path, attr, []byte{})
if errno != nil {
return nil, errno
}
dest = make([]byte, sz)
sz, errno = unix.Lgetxattr(path, attr, dest)
if errno != nil {
return nil, errno
}
case errno != nil:
return nil, errno
}
return dest[:sz], nil
}

View file

@ -1,28 +1,25 @@
module github.com/tonistiigi/fsutil
go 1.13
require (
github.com/Microsoft/go-winio v0.4.11 // indirect
github.com/containerd/continuity v0.0.0-20181001140422-bd77b46c8352
github.com/Microsoft/hcsshim v0.8.9 // indirect
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/docker v0.0.0-20180531152204-71cd53e4a197
github.com/docker/go-units v0.3.1 // indirect
github.com/docker/docker v0.0.0-20200511152416-a93e9eb0e95c
github.com/gogo/protobuf v1.3.1
github.com/google/go-cmp v0.2.0 // indirect
github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect
github.com/moby/sys/mount v0.1.0 // indirect
github.com/moby/sys/mountinfo v0.1.3 // indirect
github.com/onsi/ginkgo v1.7.0 // indirect
github.com/onsi/gomega v1.4.3 // indirect
github.com/opencontainers/go-digest v1.0.0-rc1
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/opencontainers/runc v1.0.0-rc6 // indirect
github.com/pkg/errors v0.8.1
github.com/sirupsen/logrus v1.0.3 // indirect
github.com/stretchr/testify v1.3.0
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 // indirect
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e
github.com/opencontainers/runc v1.0.0-rc10 // indirect
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.5.1
golang.org/x/sync v0.0.0-20190423024810-112230192c58
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae
gopkg.in/airbrake/gobrake.v2 v2.0.9 // indirect
gopkg.in/gemnasium/logrus-airbrake-hook.v2 v2.1.2 // indirect
gotest.tools v2.1.0+incompatible // indirect
)
go 1.13

View file

@ -14,7 +14,7 @@ import (
func loadXattr(origpath string, stat *types.Stat) error {
xattrs, err := sysx.LListxattr(origpath)
if err != nil {
if errors.Cause(err) == syscall.ENOTSUP {
if errors.Is(err, syscall.ENOTSUP) {
return nil
}
return errors.Wrapf(err, "failed to xattr %s", origpath)

View file

@ -219,12 +219,5 @@ func trimUntilIndex(str, sep string, count int) string {
}
func isNotExist(err error) bool {
err = errors.Cause(err)
if os.IsNotExist(err) {
return true
}
if pe, ok := err.(*os.PathError); ok {
err = pe.Err
}
return err == syscall.ENOTDIR
return errors.Is(err, os.ErrNotExist) || errors.Is(err, syscall.ENOTDIR)
}