package buildkit import ( "context" "encoding/json" "io" "strings" "sync" "time" "github.com/containerd/containerd/content" "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/backend" "github.com/docker/docker/builder" "github.com/docker/docker/daemon/images" "github.com/docker/docker/pkg/jsonmessage" controlapi "github.com/moby/buildkit/api/services/control" "github.com/moby/buildkit/control" "github.com/moby/buildkit/identity" "github.com/moby/buildkit/session" "github.com/moby/buildkit/util/tracing" "github.com/pkg/errors" "golang.org/x/sync/errgroup" grpcmetadata "google.golang.org/grpc/metadata" ) type Opt struct { SessionManager *session.Manager Root string Dist images.DistributionServices } type Builder struct { controller *control.Controller reqBodyHandler *reqBodyHandler mu sync.Mutex jobs map[string]*buildJob } func New(opt Opt) (*Builder, error) { reqHandler := newReqBodyHandler(tracing.DefaultTransport) c, err := newController(reqHandler, opt) if err != nil { return nil, err } b := &Builder{ controller: c, reqBodyHandler: reqHandler, jobs: map[string]*buildJob{}, } return b, nil } func (b *Builder) Cancel(ctx context.Context, id string) error { b.mu.Lock() if j, ok := b.jobs[id]; ok && j.cancel != nil { j.cancel() } b.mu.Unlock() return nil } func (b *Builder) DiskUsage(ctx context.Context) ([]*types.BuildCache, error) { duResp, err := b.controller.DiskUsage(ctx, &controlapi.DiskUsageRequest{}) if err != nil { return nil, err } var items []*types.BuildCache for _, r := range duResp.Record { items = append(items, &types.BuildCache{ ID: r.ID, Mutable: r.Mutable, InUse: r.InUse, Size: r.Size_, CreatedAt: r.CreatedAt, LastUsedAt: r.LastUsedAt, UsageCount: int(r.UsageCount), Parent: r.Parent, Description: r.Description, }) } return items, nil } func (b *Builder) Prune(ctx context.Context) (int64, error) { ch := make(chan *controlapi.UsageRecord) eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { defer close(ch) return b.controller.Prune(&controlapi.PruneRequest{}, &pruneProxy{ streamProxy: streamProxy{ctx: ctx}, ch: ch, }) }) var size int64 eg.Go(func() error { for r := range ch { size += r.Size_ } return nil }) if err := eg.Wait(); err != nil { return 0, err } return size, nil } func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) { var rc = opt.Source if buildID := opt.Options.BuildID; buildID != "" { b.mu.Lock() upload := false if strings.HasPrefix(buildID, "upload-request:") { upload = true buildID = strings.TrimPrefix(buildID, "upload-request:") } if _, ok := b.jobs[buildID]; !ok { b.jobs[buildID] = newBuildJob() } j := b.jobs[buildID] ctx, cancel := context.WithCancel(ctx) j.cancel = cancel b.mu.Unlock() if upload { ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() err := j.SetUpload(ctx2, rc) return nil, err } if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" { ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() var err error rc, err = j.WaitUpload(ctx2) if err != nil { return nil, err } opt.Options.RemoteContext = "" } defer func() { delete(b.jobs, buildID) }() } var out builder.Result id := identity.NewID() frontendAttrs := map[string]string{} if opt.Options.Target != "" { frontendAttrs["target"] = opt.Options.Target } if opt.Options.Dockerfile != "" && opt.Options.Dockerfile != "." { frontendAttrs["filename"] = opt.Options.Dockerfile } if opt.Options.RemoteContext != "" { if opt.Options.RemoteContext != "client-session" { frontendAttrs["context"] = opt.Options.RemoteContext } } else { url, cancel := b.reqBodyHandler.newRequest(rc) defer cancel() frontendAttrs["context"] = url } var cacheFrom []string for _, v := range opt.Options.CacheFrom { cacheFrom = append(cacheFrom, v) } frontendAttrs["cache-from"] = strings.Join(cacheFrom, ",") for k, v := range opt.Options.BuildArgs { if v == nil { continue } frontendAttrs["build-arg:"+k] = *v } for k, v := range opt.Options.Labels { frontendAttrs["label:"+k] = v } if opt.Options.NoCache { frontendAttrs["no-cache"] = "" } req := &controlapi.SolveRequest{ Ref: id, Exporter: "moby", Frontend: "dockerfile.v0", FrontendAttrs: frontendAttrs, Session: opt.Options.SessionID, } eg, ctx := errgroup.WithContext(ctx) eg.Go(func() error { resp, err := b.controller.Solve(ctx, req) if err != nil { return err } id, ok := resp.ExporterResponse["containerimage.digest"] if !ok { return errors.Errorf("missing image id") } out.ImageID = id return nil }) ch := make(chan *controlapi.StatusResponse) eg.Go(func() error { defer close(ch) return b.controller.Status(&controlapi.StatusRequest{ Ref: id, }, &statusProxy{streamProxy: streamProxy{ctx: ctx}, ch: ch}) }) eg.Go(func() error { for sr := range ch { dt, err := sr.Marshal() if err != nil { return err } auxJSONBytes, err := json.Marshal(dt) if err != nil { return err } auxJSON := new(json.RawMessage) *auxJSON = auxJSONBytes msgJSON, err := json.Marshal(&jsonmessage.JSONMessage{ID: "buildkit-trace", Aux: auxJSON}) if err != nil { return err } msgJSON = append(msgJSON, []byte("\r\n")...) n, err := opt.ProgressWriter.Output.Write(msgJSON) if err != nil { return err } if n != len(msgJSON) { return io.ErrShortWrite } } return nil }) if err := eg.Wait(); err != nil { return nil, err } return &out, nil } type streamProxy struct { ctx context.Context } func (sp *streamProxy) SetHeader(_ grpcmetadata.MD) error { return nil } func (sp *streamProxy) SendHeader(_ grpcmetadata.MD) error { return nil } func (sp *streamProxy) SetTrailer(_ grpcmetadata.MD) { } func (sp *streamProxy) Context() context.Context { return sp.ctx } func (sp *streamProxy) RecvMsg(m interface{}) error { return io.EOF } type statusProxy struct { streamProxy ch chan *controlapi.StatusResponse } func (sp *statusProxy) Send(resp *controlapi.StatusResponse) error { return sp.SendMsg(resp) } func (sp *statusProxy) SendMsg(m interface{}) error { if sr, ok := m.(*controlapi.StatusResponse); ok { sp.ch <- sr } return nil } type pruneProxy struct { streamProxy ch chan *controlapi.UsageRecord } func (sp *pruneProxy) Send(resp *controlapi.UsageRecord) error { return sp.SendMsg(resp) } func (sp *pruneProxy) SendMsg(m interface{}) error { if sr, ok := m.(*controlapi.UsageRecord); ok { sp.ch <- sr } return nil } type contentStoreNoLabels struct { content.Store } func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { return content.Info{}, nil } type wrapRC struct { io.ReadCloser once sync.Once err error waitCh chan struct{} } func (w *wrapRC) Read(b []byte) (int, error) { n, err := w.ReadCloser.Read(b) if err != nil { e := err if e == io.EOF { e = nil } w.close(e) } return n, err } func (w *wrapRC) Close() error { err := w.ReadCloser.Close() w.close(err) return err } func (w *wrapRC) close(err error) { w.once.Do(func() { w.err = err close(w.waitCh) }) } func (w *wrapRC) wait() error { <-w.waitCh return w.err } type buildJob struct { cancel func() waitCh chan func(io.ReadCloser) error } func newBuildJob() *buildJob { return &buildJob{waitCh: make(chan func(io.ReadCloser) error)} } func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) { done := make(chan struct{}) var upload io.ReadCloser fn := func(rc io.ReadCloser) error { w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})} upload = w close(done) return w.wait() } select { case <-ctx.Done(): return nil, ctx.Err() case j.waitCh <- fn: <-done return upload, nil } } func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error { select { case <-ctx.Done(): return ctx.Err() case fn := <-j.waitCh: return fn(rc) } }