diff --git a/api/server/backend/build/backend.go b/api/server/backend/build/backend.go index 3415cfbd8e..043c9e84b7 100644 --- a/api/server/backend/build/backend.go +++ b/api/server/backend/build/backend.go @@ -63,6 +63,10 @@ func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string } } + if build == nil { + return "", nil + } + var imageID = build.ImageID if options.Squash { if imageID, err = squashBuild(build, b.imageComponent); err != nil { diff --git a/builder/builder-next/builder.go b/builder/builder-next/builder.go index 141b441fdb..6ac5b226c2 100644 --- a/builder/builder-next/builder.go +++ b/builder/builder-next/builder.go @@ -6,6 +6,7 @@ import ( "io" "strings" "sync" + "time" "github.com/containerd/containerd/content" "github.com/docker/docker/api/types" @@ -34,7 +35,7 @@ type Builder struct { reqBodyHandler *reqBodyHandler mu sync.Mutex - jobs map[string]func() + jobs map[string]*buildJob } func New(opt Opt) (*Builder, error) { @@ -47,15 +48,15 @@ func New(opt Opt) (*Builder, error) { b := &Builder{ controller: c, reqBodyHandler: reqHandler, - jobs: map[string]func(){}, + jobs: map[string]*buildJob{}, } return b, nil } func (b *Builder) Cancel(ctx context.Context, id string) error { b.mu.Lock() - if cancel, ok := b.jobs[id]; ok { - cancel() + if j, ok := b.jobs[id]; ok && j.cancel != nil { + j.cancel() } b.mu.Unlock() return nil @@ -114,10 +115,43 @@ func (b *Builder) Prune(ctx context.Context) (int64, error) { } func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder.Result, error) { + var rc = opt.Source + if buildID := opt.Options.BuildID; buildID != "" { b.mu.Lock() - ctx, b.jobs[buildID] = context.WithCancel(ctx) + + upload := false + if strings.HasPrefix(buildID, "upload-request:") { + upload = true + buildID = strings.TrimPrefix(buildID, "upload-request:") + } + + if _, ok := b.jobs[buildID]; !ok { + b.jobs[buildID] = newBuildJob() + } + j := b.jobs[buildID] + ctx, cancel := context.WithCancel(ctx) + j.cancel = cancel b.mu.Unlock() + + if upload { + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + err := j.SetUpload(ctx2, rc) + return nil, err + } + + if remoteContext := opt.Options.RemoteContext; remoteContext == "upload-request" { + ctx2, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + var err error + rc, err = j.WaitUpload(ctx2) + if err != nil { + return nil, err + } + opt.Options.RemoteContext = "" + } + defer func() { delete(b.jobs, buildID) }() @@ -142,7 +176,7 @@ func (b *Builder) Build(ctx context.Context, opt backend.BuildConfig) (*builder. frontendAttrs["context"] = opt.Options.RemoteContext } } else { - url, cancel := b.reqBodyHandler.newRequest(opt.Source) + url, cancel := b.reqBodyHandler.newRequest(rc) defer cancel() frontendAttrs["context"] = url } @@ -295,3 +329,78 @@ type contentStoreNoLabels struct { func (c *contentStoreNoLabels) Update(ctx context.Context, info content.Info, fieldpaths ...string) (content.Info, error) { return content.Info{}, nil } + +type wrapRC struct { + io.ReadCloser + once sync.Once + err error + waitCh chan struct{} +} + +func (w *wrapRC) Read(b []byte) (int, error) { + n, err := w.ReadCloser.Read(b) + if err != nil { + e := err + if e == io.EOF { + e = nil + } + w.close(e) + } + return n, err +} + +func (w *wrapRC) Close() error { + err := w.ReadCloser.Close() + w.close(err) + return err +} + +func (w *wrapRC) close(err error) { + w.once.Do(func() { + w.err = err + close(w.waitCh) + }) +} + +func (w *wrapRC) wait() error { + <-w.waitCh + return w.err +} + +type buildJob struct { + cancel func() + waitCh chan func(io.ReadCloser) error +} + +func newBuildJob() *buildJob { + return &buildJob{waitCh: make(chan func(io.ReadCloser) error)} +} + +func (j *buildJob) WaitUpload(ctx context.Context) (io.ReadCloser, error) { + done := make(chan struct{}) + + var upload io.ReadCloser + fn := func(rc io.ReadCloser) error { + w := &wrapRC{ReadCloser: rc, waitCh: make(chan struct{})} + upload = w + close(done) + return w.wait() + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case j.waitCh <- fn: + <-done + return upload, nil + } +} + +func (j *buildJob) SetUpload(ctx context.Context, rc io.ReadCloser) error { + select { + case <-ctx.Done(): + return ctx.Err() + case fn := <-j.waitCh: + return fn(rc) + } +}