2018-04-17 18:50:17 -04:00
package worker
import (
"context"
2018-04-20 14:52:10 -04:00
"fmt"
2018-04-17 18:50:17 -04:00
"io"
2018-05-18 01:47:34 -04:00
nethttp "net/http"
2018-04-17 18:50:17 -04:00
"time"
"github.com/containerd/containerd/content"
2019-03-11 22:04:08 -04:00
"github.com/containerd/containerd/images"
2018-06-26 14:30:19 -04:00
"github.com/containerd/containerd/platforms"
2018-04-17 18:50:17 -04:00
"github.com/containerd/containerd/rootfs"
2020-07-20 01:18:46 -04:00
"github.com/docker/docker/builder/builder-next/adapters/containerimage"
2018-04-20 14:52:10 -04:00
distmetadata "github.com/docker/docker/distribution/metadata"
"github.com/docker/docker/distribution/xfer"
"github.com/docker/docker/image"
"github.com/docker/docker/layer"
pkgprogress "github.com/docker/docker/pkg/progress"
2018-04-17 18:50:17 -04:00
"github.com/moby/buildkit/cache"
"github.com/moby/buildkit/client"
2020-02-24 17:14:23 -05:00
"github.com/moby/buildkit/client/llb"
2018-04-17 18:50:17 -04:00
"github.com/moby/buildkit/executor"
"github.com/moby/buildkit/exporter"
2019-02-28 01:35:26 -05:00
localexporter "github.com/moby/buildkit/exporter/local"
2019-04-03 00:55:22 -04:00
tarexporter "github.com/moby/buildkit/exporter/tar"
2018-04-17 18:50:17 -04:00
"github.com/moby/buildkit/frontend"
"github.com/moby/buildkit/session"
"github.com/moby/buildkit/snapshot"
2018-05-14 14:05:49 -04:00
"github.com/moby/buildkit/solver"
2020-11-12 21:14:57 -05:00
"github.com/moby/buildkit/solver/llbsolver/mounts"
2018-05-14 14:05:49 -04:00
"github.com/moby/buildkit/solver/llbsolver/ops"
2018-04-17 18:50:17 -04:00
"github.com/moby/buildkit/solver/pb"
"github.com/moby/buildkit/source"
"github.com/moby/buildkit/source/git"
"github.com/moby/buildkit/source/http"
"github.com/moby/buildkit/source/local"
2020-11-12 21:14:57 -05:00
"github.com/moby/buildkit/util/archutil"
2020-11-13 19:06:56 -05:00
"github.com/moby/buildkit/util/compression"
2018-04-20 14:52:10 -04:00
"github.com/moby/buildkit/util/contentutil"
2018-04-17 18:50:17 -04:00
"github.com/moby/buildkit/util/progress"
2022-03-04 08:49:42 -05:00
"github.com/opencontainers/go-digest"
2018-04-17 18:50:17 -04:00
ocispec "github.com/opencontainers/image-spec/specs-go/v1"
"github.com/pkg/errors"
2018-06-13 12:33:19 -04:00
"github.com/sirupsen/logrus"
2021-06-05 15:56:40 -04:00
"golang.org/x/sync/semaphore"
2018-04-17 18:50:17 -04:00
)
2019-03-11 22:04:08 -04:00
// labelCreatedAt is the descriptor annotation key that FromRemote reads to
// obtain a layer's creation time; the value is decoded with
// time.Time.UnmarshalText.
const labelCreatedAt = "buildkit/createdat"
// LayerAccess provides access to a moby layer from a snapshot.
// Worker.GetRemote uses it to turn a cache ref ID into a chain of DiffIDs.
type LayerAccess interface {
// GetDiffIDs returns the DiffIDs for key. GetRemote calls it when
// createIfNeeded is false.
// NOTE(review): presumably fails when no layer exists yet for key - confirm
// against the implementation.
GetDiffIDs ( ctx context . Context , key string ) ( [ ] layer . DiffID , error )
// EnsureLayer returns the DiffIDs for key. GetRemote calls it, after
// finalizing the ref, when createIfNeeded is true.
// NOTE(review): presumably creates the layer when missing - confirm against
// the implementation.
EnsureLayer ( ctx context . Context , key string ) ( [ ] layer . DiffID , error )
}
2018-05-23 18:53:14 -04:00
// Opt defines a structure for creating a worker.
// It is embedded in Worker, so every field stays reachable as w.Opt.<Field>.
type Opt struct {
2018-04-20 14:52:10 -04:00
// ID is the value returned by Worker.ID.
ID string
// Labels is the map returned by Worker.Labels.
Labels map [ string ] string
2018-09-06 14:50:41 -04:00
// GCPolicy is the list returned by Worker.GCPolicy.
GCPolicy [ ] client . PruneInfo
2018-04-20 14:52:10 -04:00
// Executor is returned by Worker.Executor and handed to exec ops in ResolveOp.
Executor executor . Executor
// Snapshotter is not referenced by the code visible in this file.
Snapshotter snapshot . Snapshotter
// ContentStore is returned by Worker.ContentStore; FromRemote stages
// downloaded blobs in it and deletes them again when it returns.
ContentStore content . Store
// CacheManager is returned by Worker.CacheManager and is also passed as the
// cache accessor to the git, http and local sources in NewWorker.
CacheManager cache . Manager
2020-07-20 01:18:46 -04:00
// ImageSource is registered with the source manager in NewWorker and serves
// ResolveImageConfig.
ImageSource * containerimage . Source
2022-01-21 07:48:44 -05:00
// DownloadManager is used by FromRemote to download the remote's layers.
DownloadManager * xfer . LayerDownloadManager
2018-04-20 14:52:10 -04:00
// V2MetadataService receives the DiffID -> blob digest mapping recorded by
// layerDescriptor.Registered.
V2MetadataService distmetadata . V2MetadataService
2018-05-18 01:47:34 -04:00
// Transport is passed to the http source created in NewWorker.
Transport nethttp . RoundTripper
2019-02-28 01:35:26 -05:00
// Exporter is what Worker.Exporter returns for the name "moby".
Exporter exporter . Exporter
2019-03-11 22:04:08 -04:00
// Layers resolves cache ref IDs to DiffIDs for Worker.GetRemote.
Layers LayerAccess
2019-03-14 21:32:21 -04:00
// Platforms is the list returned by Worker.Platforms; that method may append
// to it when called with noCache set.
Platforms [ ] ocispec . Platform
2018-04-17 18:50:17 -04:00
}
// Worker is a local worker instance with dedicated snapshotter, cache, and so on.
// It is created by NewWorker, which copies the supplied Opt into the embedded
// field and builds the SourceManager.
// TODO: s/Worker/OpWorker/g ?
type Worker struct {
2018-05-23 18:53:14 -04:00
// Opt holds the configuration the worker was created with. Several field
// names (ID, Labels, Platforms, ...) collide with Worker methods, so methods
// read them through the explicit w.Opt selector.
Opt
2018-04-17 18:50:17 -04:00
// SourceManager holds the sources registered in NewWorker and is passed to
// source ops in ResolveOp.
SourceManager * source . Manager
}
// NewWorker instantiates a local worker
2018-05-23 18:53:14 -04:00
func NewWorker ( opt Opt ) ( * Worker , error ) {
2018-04-17 18:50:17 -04:00
sm , err := source . NewManager ( )
if err != nil {
return nil , err
}
cm := opt . CacheManager
sm . Register ( opt . ImageSource )
gs , err := git . NewSource ( git . Opt {
CacheAccessor : cm ,
} )
2018-06-13 12:33:19 -04:00
if err == nil {
sm . Register ( gs )
} else {
logrus . Warnf ( "Could not register builder git source: %s" , err )
2018-04-17 18:50:17 -04:00
}
hs , err := http . NewSource ( http . Opt {
CacheAccessor : cm ,
2018-05-18 01:47:34 -04:00
Transport : opt . Transport ,
2018-04-17 18:50:17 -04:00
} )
2018-06-13 12:33:19 -04:00
if err == nil {
sm . Register ( hs )
} else {
logrus . Warnf ( "Could not register builder http source: %s" , err )
2018-04-17 18:50:17 -04:00
}
ss , err := local . NewSource ( local . Opt {
2019-02-28 01:35:26 -05:00
CacheAccessor : cm ,
2018-04-17 18:50:17 -04:00
} )
2018-06-13 12:33:19 -04:00
if err == nil {
sm . Register ( ss )
} else {
logrus . Warnf ( "Could not register builder local source: %s" , err )
2018-04-17 18:50:17 -04:00
}
return & Worker {
2018-05-23 18:53:14 -04:00
Opt : opt ,
2018-04-17 18:50:17 -04:00
SourceManager : sm ,
} , nil
}
2018-05-23 18:53:14 -04:00
// ID returns worker ID
2018-04-17 18:50:17 -04:00
func ( w * Worker ) ID ( ) string {
2018-05-23 18:53:14 -04:00
return w . Opt . ID
2018-04-17 18:50:17 -04:00
}
2018-05-23 18:53:14 -04:00
// Labels returns map of all worker labels
2018-04-17 18:50:17 -04:00
func ( w * Worker ) Labels ( ) map [ string ] string {
2018-05-23 18:53:14 -04:00
return w . Opt . Labels
2018-04-17 18:50:17 -04:00
}
2018-06-26 14:30:19 -04:00
// Platforms returns one or more platforms supported by the image.
2020-02-28 02:52:14 -05:00
func ( w * Worker ) Platforms ( noCache bool ) [ ] ocispec . Platform {
if noCache {
pm := make ( map [ string ] struct { } , len ( w . Opt . Platforms ) )
for _ , p := range w . Opt . Platforms {
pm [ platforms . Format ( p ) ] = struct { } { }
}
2020-11-12 21:14:57 -05:00
for _ , p := range archutil . SupportedPlatforms ( noCache ) {
2022-03-18 11:01:18 -04:00
if _ , ok := pm [ platforms . Format ( p ) ] ; ! ok {
w . Opt . Platforms = append ( w . Opt . Platforms , p )
2020-02-28 02:52:14 -05:00
}
}
}
2019-03-14 21:32:21 -04:00
if len ( w . Opt . Platforms ) == 0 {
return [ ] ocispec . Platform { platforms . DefaultSpec ( ) }
}
return w . Opt . Platforms
2018-06-26 14:30:19 -04:00
}
2018-09-06 14:50:41 -04:00
// GCPolicy returns the automatic GC policy the worker was configured with
// (Opt.GCPolicy).
func (w *Worker) GCPolicy() []client.PruneInfo {
	policy := w.Opt.GCPolicy
	return policy
}
2019-10-02 20:16:29 -04:00
// ContentStore returns the content store the worker was configured with
// (Opt.ContentStore).
func (w *Worker) ContentStore() content.Store {
	store := w.Opt.ContentStore
	return store
}
2018-05-23 18:53:14 -04:00
// LoadRef loads a reference by ID
2020-11-12 21:14:57 -05:00
func ( w * Worker ) LoadRef ( ctx context . Context , id string , hidden bool ) ( cache . ImmutableRef , error ) {
2018-09-21 16:43:26 -04:00
var opts [ ] cache . RefOption
if hidden {
opts = append ( opts , cache . NoUpdateLastUsed )
}
2022-03-18 11:01:18 -04:00
return w . CacheManager ( ) . Get ( ctx , id , nil , opts ... )
2018-04-17 18:50:17 -04:00
}
2018-05-23 18:53:14 -04:00
// ResolveOp converts a LLB vertex into a LLB operation
2019-02-28 01:35:26 -05:00
func ( w * Worker ) ResolveOp ( v solver . Vertex , s frontend . FrontendLLBBridge , sm * session . Manager ) ( solver . Op , error ) {
2018-06-26 14:30:19 -04:00
if baseOp , ok := v . Sys ( ) . ( * pb . Op ) ; ok {
2021-06-05 15:56:40 -04:00
// TODO do we need to pass a value here? Where should it come from? https://github.com/moby/buildkit/commit/b3cf7c43cfefdfd7a945002c0e76b54e346ab6cf
var parallelism * semaphore . Weighted
2018-06-26 14:30:19 -04:00
switch op := baseOp . Op . ( type ) {
case * pb . Op_Source :
2021-06-05 15:56:40 -04:00
return ops . NewSourceOp ( v , op , baseOp . Platform , w . SourceManager , parallelism , sm , w )
2018-06-26 14:30:19 -04:00
case * pb . Op_Exec :
2022-03-18 11:01:18 -04:00
return ops . NewExecOp ( v , op , baseOp . Platform , w . CacheManager ( ) , parallelism , sm , w . Executor ( ) , w )
2019-03-20 21:45:16 -04:00
case * pb . Op_File :
2022-03-18 11:01:18 -04:00
return ops . NewFileOp ( v , op , w . CacheManager ( ) , parallelism , w )
2018-06-26 14:30:19 -04:00
case * pb . Op_Build :
return ops . NewBuildOp ( v , op , s , w )
}
2018-04-17 18:50:17 -04:00
}
2018-06-26 14:30:19 -04:00
return nil , errors . Errorf ( "could not resolve %v" , v )
2018-04-17 18:50:17 -04:00
}
2018-05-23 18:53:14 -04:00
// ResolveImageConfig returns image config for an image
2020-07-20 01:18:46 -04:00
func ( w * Worker ) ResolveImageConfig ( ctx context . Context , ref string , opt llb . ResolveImageConfigOpt , sm * session . Manager , g session . Group ) ( digest . Digest , [ ] byte , error ) {
return w . ImageSource . ResolveImageConfig ( ctx , ref , opt , sm , g )
2018-04-17 18:50:17 -04:00
}
2018-05-23 18:53:14 -04:00
// DiskUsage returns disk usage report
2018-04-17 18:50:17 -04:00
func ( w * Worker ) DiskUsage ( ctx context . Context , opt client . DiskUsageInfo ) ( [ ] * client . UsageInfo , error ) {
2020-07-20 01:18:46 -04:00
return w . CacheManager ( ) . DiskUsage ( ctx , opt )
2018-04-17 18:50:17 -04:00
}
2018-05-23 18:53:14 -04:00
// Prune deletes reclaimable build cache
2018-09-06 14:50:41 -04:00
func ( w * Worker ) Prune ( ctx context . Context , ch chan client . UsageInfo , info ... client . PruneInfo ) error {
2020-07-20 01:18:46 -04:00
return w . CacheManager ( ) . Prune ( ctx , ch , info ... )
2018-04-17 18:50:17 -04:00
}
2018-05-23 18:53:14 -04:00
// Exporter returns exporter by name
2019-02-28 01:35:26 -05:00
func ( w * Worker ) Exporter ( name string , sm * session . Manager ) ( exporter . Exporter , error ) {
switch name {
case "moby" :
return w . Opt . Exporter , nil
case client . ExporterLocal :
return localexporter . New ( localexporter . Opt {
SessionManager : sm ,
} )
2019-04-03 00:55:22 -04:00
case client . ExporterTar :
return tarexporter . New ( tarexporter . Opt {
SessionManager : sm ,
} )
2019-02-28 01:35:26 -05:00
default :
2018-04-17 18:50:17 -04:00
return nil , errors . Errorf ( "exporter %q could not be found" , name )
}
}
2018-05-23 18:53:14 -04:00
// GetRemote returns a remote snapshot reference for a local one
2020-11-13 19:06:56 -05:00
func ( w * Worker ) GetRemote ( ctx context . Context , ref cache . ImmutableRef , createIfNeeded bool , _ compression . Type , _ session . Group ) ( * solver . Remote , error ) {
2019-03-11 22:04:08 -04:00
var diffIDs [ ] layer . DiffID
var err error
if ! createIfNeeded {
diffIDs , err = w . Layers . GetDiffIDs ( ctx , ref . ID ( ) )
if err != nil {
return nil , err
}
} else {
2022-03-18 11:01:18 -04:00
if err := ref . Finalize ( ctx ) ; err != nil {
2019-03-11 22:04:08 -04:00
return nil , err
}
diffIDs , err = w . Layers . EnsureLayer ( ctx , ref . ID ( ) )
if err != nil {
return nil , err
}
}
descriptors := make ( [ ] ocispec . Descriptor , len ( diffIDs ) )
for i , dgst := range diffIDs {
descriptors [ i ] = ocispec . Descriptor {
MediaType : images . MediaTypeDockerSchema2Layer ,
Digest : digest . Digest ( dgst ) ,
Size : - 1 ,
}
}
return & solver . Remote {
Descriptors : descriptors ,
Provider : & emptyProvider { } ,
} , nil
2018-04-17 18:50:17 -04:00
}
2019-08-06 19:08:45 -04:00
// PruneCacheMounts removes the current cache snapshots for specified IDs
func ( w * Worker ) PruneCacheMounts ( ctx context . Context , ids [ ] string ) error {
2020-11-12 21:14:57 -05:00
mu := mounts . CacheMountsLocker ( )
2019-08-06 19:08:45 -04:00
mu . Lock ( )
defer mu . Unlock ( )
for _ , id := range ids {
2022-03-18 11:01:18 -04:00
mds , err := mounts . SearchCacheDir ( ctx , w . CacheManager ( ) , id )
2019-08-06 19:08:45 -04:00
if err != nil {
return err
}
2022-03-18 11:01:18 -04:00
for _ , md := range mds {
if err := md . SetCachePolicyDefault ( ) ; err != nil {
return err
}
if err := md . ClearCacheDirIndex ( ) ; err != nil {
return err
}
// if ref is unused try to clean it up right away by releasing it
if mref , err := w . CacheManager ( ) . GetMutable ( ctx , md . ID ( ) ) ; err == nil {
go mref . Release ( context . TODO ( ) )
2019-08-06 19:08:45 -04:00
}
}
}
2020-11-12 21:14:57 -05:00
mounts . ClearActiveCacheMounts ( )
2019-08-06 19:08:45 -04:00
return nil
}
2019-10-09 20:20:17 -04:00
func ( w * Worker ) getRef ( ctx context . Context , diffIDs [ ] layer . DiffID , opts ... cache . RefOption ) ( cache . ImmutableRef , error ) {
var parent cache . ImmutableRef
if len ( diffIDs ) > 1 {
var err error
parent , err = w . getRef ( ctx , diffIDs [ : len ( diffIDs ) - 1 ] , opts ... )
if err != nil {
return nil , err
}
defer parent . Release ( context . TODO ( ) )
}
2020-07-20 01:18:46 -04:00
return w . CacheManager ( ) . GetByBlob ( context . TODO ( ) , ocispec . Descriptor {
2019-10-09 20:20:17 -04:00
Annotations : map [ string ] string {
"containerd.io/uncompressed" : diffIDs [ len ( diffIDs ) - 1 ] . String ( ) ,
} ,
} , parent , opts ... )
}
2018-05-23 18:53:14 -04:00
// FromRemote converts a remote snapshot reference to a local one
2018-04-17 18:50:17 -04:00
func ( w * Worker ) FromRemote ( ctx context . Context , remote * solver . Remote ) ( cache . ImmutableRef , error ) {
2018-04-20 14:52:10 -04:00
rootfs , err := getLayers ( ctx , remote . Descriptors )
if err != nil {
return nil , err
}
layers := make ( [ ] xfer . DownloadDescriptor , 0 , len ( rootfs ) )
for _ , l := range rootfs {
// ongoing.add(desc)
layers = append ( layers , & layerDescriptor {
desc : l . Blob ,
diffID : layer . DiffID ( l . Diff . Digest ) ,
provider : remote . Provider ,
w : w ,
pctx : ctx ,
} )
}
defer func ( ) {
for _ , l := range rootfs {
2019-10-02 20:16:29 -04:00
w . ContentStore ( ) . Delete ( context . TODO ( ) , l . Blob . Digest )
2018-04-20 14:52:10 -04:00
}
} ( )
r := image . NewRootFS ( )
2022-01-21 07:34:37 -05:00
rootFS , release , err := w . DownloadManager . Download ( ctx , * r , layers , & discardProgress { } )
2018-04-20 14:52:10 -04:00
if err != nil {
return nil , err
}
defer release ( )
2019-03-11 22:04:08 -04:00
if len ( rootFS . DiffIDs ) != len ( layers ) {
return nil , errors . Errorf ( "invalid layer count mismatch %d vs %d" , len ( rootFS . DiffIDs ) , len ( layers ) )
}
for i := range rootFS . DiffIDs {
tm := time . Now ( )
if tmstr , ok := remote . Descriptors [ i ] . Annotations [ labelCreatedAt ] ; ok {
if err := ( & tm ) . UnmarshalText ( [ ] byte ( tmstr ) ) ; err != nil {
return nil , err
}
}
descr := fmt . Sprintf ( "imported %s" , remote . Descriptors [ i ] . Digest )
if v , ok := remote . Descriptors [ i ] . Annotations [ "buildkit/description" ] ; ok {
descr = v
}
2019-10-09 20:20:17 -04:00
ref , err := w . getRef ( ctx , rootFS . DiffIDs [ : i + 1 ] , cache . WithDescription ( descr ) , cache . WithCreationTime ( tm ) )
2019-03-11 22:04:08 -04:00
if err != nil {
return nil , err
}
if i == len ( remote . Descriptors ) - 1 {
return ref , nil
}
defer ref . Release ( context . TODO ( ) )
2018-04-20 14:52:10 -04:00
}
2019-03-11 22:04:08 -04:00
return nil , errors . Errorf ( "unreachable" )
2018-04-17 18:50:17 -04:00
}
2020-07-20 01:18:46 -04:00
// Executor returns the executor.Executor the worker was configured with
// (Opt.Executor), used for running processes.
func (w *Worker) Executor() executor.Executor {
	exec := w.Opt.Executor
	return exec
}
// CacheManager returns the cache.Manager the worker was configured with
// (Opt.CacheManager), used for accessing local storage.
func (w *Worker) CacheManager() cache.Manager {
	manager := w.Opt.CacheManager
	return manager
}
2018-04-20 14:52:10 -04:00
type discardProgress struct { }
2018-05-23 18:53:14 -04:00
func ( * discardProgress ) WriteProgress ( _ pkgprogress . Progress ) error {
2018-04-20 14:52:10 -04:00
return nil
}
// layerDescriptor describes a single layer that FromRemote hands to the
// download manager. Its methods (Key, ID, DiffID, Download, Close, Registered)
// are defined below.
// Leftover note: Fetch(ctx context.Context, desc ocispec.Descriptor) (io.ReadCloser, error)
type layerDescriptor struct {
// provider is the content provider the blob is copied from in Download.
provider content . Provider
// desc is the descriptor of the blob (media type, digest, size).
desc ocispec . Descriptor
// diffID is the layer's DiffID, as returned by DiffID.
diffID layer . DiffID
// ref ctdreference.Spec
// w is the owning worker; its content store and V2 metadata service are used.
w * Worker
// pctx is the context of the FromRemote call; Download reports progress on it
// rather than on the context it is called with.
pctx context . Context
}
// Key returns the blob digest prefixed with "v2:".
func (ld *layerDescriptor) Key() string {
	dgst := ld.desc.Digest.String()
	return "v2:" + dgst
}
// ID returns the blob digest in string form.
func (ld *layerDescriptor) ID() string {
	dgst := ld.desc.Digest
	return dgst.String()
}
// DiffID returns the DiffID stored on the descriptor; it never returns an
// error.
func (ld *layerDescriptor) DiffID() (layer.DiffID, error) {
	id := ld.diffID
	return id, nil
}
2018-05-14 16:48:30 -04:00
func ( ld * layerDescriptor ) Download ( ctx context . Context , progressOutput pkgprogress . Output ) ( io . ReadCloser , int64 , error ) {
2018-04-20 14:52:10 -04:00
done := oneOffProgress ( ld . pctx , fmt . Sprintf ( "pulling %s" , ld . desc . Digest ) )
2020-11-18 05:35:04 -05:00
// TODO should this write output to progressOutput? Or use something similar to loggerFromContext()? see https://github.com/moby/buildkit/commit/aa29e7729464f3c2a773e27795e584023c751cb8
discardLogs := func ( _ [ ] byte ) { }
2022-03-18 11:01:18 -04:00
if err := contentutil . Copy ( ctx , ld . w . ContentStore ( ) , ld . provider , ld . desc , "" , discardLogs ) ; err != nil {
2018-04-20 14:52:10 -04:00
return nil , 0 , done ( err )
}
2019-08-28 11:42:47 -04:00
_ = done ( nil )
2018-04-20 14:52:10 -04:00
2019-10-02 20:16:29 -04:00
ra , err := ld . w . ContentStore ( ) . ReaderAt ( ctx , ld . desc )
2018-04-20 14:52:10 -04:00
if err != nil {
return nil , 0 , err
}
2021-08-24 06:10:50 -04:00
return io . NopCloser ( content . NewReader ( ra ) ) , ld . desc . Size , nil
2018-04-20 14:52:10 -04:00
}
func ( ld * layerDescriptor ) Close ( ) {
2019-10-02 20:16:29 -04:00
// ld.is.ContentStore().Delete(context.TODO(), ld.desc.Digest)
2018-04-20 14:52:10 -04:00
}
// Registered records the mapping from the registered layer's DiffID to this
// descriptor's blob digest in the worker's V2 metadata service. Any result of
// the Add call is ignored.
func (ld *layerDescriptor) Registered(diffID layer.DiffID) {
	metadata := distmetadata.V2Metadata{Digest: ld.desc.Digest}
	ld.w.V2MetadataService.Add(diffID, metadata)
}
2018-04-17 18:50:17 -04:00
// getLayers converts descriptors into rootfs layers, one per descriptor and
// in the same order.
//
// Every descriptor must carry a non-empty "containerd.io/uncompressed"
// annotation holding a parseable digest; it becomes the layer's Diff digest
// (with the OCI image layer media type). The Blob descriptor copies only the
// media type, digest and size of the input. ctx is unused.
func getLayers(ctx context.Context, descs []ocispec.Descriptor) ([]rootfs.Layer, error) {
	result := make([]rootfs.Layer, 0, len(descs))
	for _, d := range descs {
		uncompressed := d.Annotations["containerd.io/uncompressed"]
		if uncompressed == "" {
			return nil, errors.Errorf("%s missing uncompressed digest", d.Digest)
		}

		diffID, err := digest.Parse(uncompressed)
		if err != nil {
			return nil, err
		}

		result = append(result, rootfs.Layer{
			Diff: ocispec.Descriptor{
				MediaType: ocispec.MediaTypeImageLayer,
				Digest:    diffID,
			},
			Blob: ocispec.Descriptor{
				MediaType: d.MediaType,
				Digest:    d.Digest,
				Size:      d.Size,
			},
		})
	}
	return result, nil
}
func oneOffProgress ( ctx context . Context , id string ) func ( err error ) error {
2022-03-18 11:01:18 -04:00
pw , _ , _ := progress . NewFromContext ( ctx )
2018-04-17 18:50:17 -04:00
now := time . Now ( )
st := progress . Status {
Started : & now ,
}
2019-08-28 11:42:47 -04:00
_ = pw . Write ( id , st )
2018-04-17 18:50:17 -04:00
return func ( err error ) error {
// TODO: set error on status
now := time . Now ( )
st . Completed = & now
2019-08-28 11:42:47 -04:00
_ = pw . Write ( id , st )
_ = pw . Close ( )
2018-04-17 18:50:17 -04:00
return err
}
}
2018-05-23 18:53:14 -04:00
2019-03-11 22:04:08 -04:00
// emptyProvider is a content provider that holds no content. GetRemote
// attaches it to the remotes it returns.
type emptyProvider struct{}

// ReaderAt always fails: the empty provider cannot serve any descriptor.
func (*emptyProvider) ReaderAt(_ context.Context, _ ocispec.Descriptor) (content.ReaderAt, error) {
	return nil, errors.Errorf("ReaderAt not implemented for empty provider")
}