mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
builder: protect early progress writes
Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
parent
9cc49b4ab9
commit
f3ef8c93d6
2 changed files with 108 additions and 9 deletions
|
@ -1,6 +1,7 @@
|
|||
package build // import "github.com/docker/docker/api/server/router/build"
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/base64"
|
||||
|
@ -192,8 +193,19 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
|
|||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
|
||||
output := ioutils.NewWriteFlusher(w)
|
||||
var output writeCloseFlusher = ioutils.NewWriteFlusher(w)
|
||||
defer output.Close()
|
||||
|
||||
body := r.Body
|
||||
if body != nil {
|
||||
// there is a possibility that output is written before request body
|
||||
// has been fully read so we need to protect against it.
|
||||
// this can be removed when
|
||||
// https://github.com/golang/go/issues/15527
|
||||
// https://github.com/golang/go/issues/22209
|
||||
// has been fixed
|
||||
body, output = wrapOutputBufferedUntilRequestRead(body, output)
|
||||
}
|
||||
errf := func(err error) error {
|
||||
if httputils.BoolValue(r, "q") && notVerboseBuffer.Len() > 0 {
|
||||
output.Write(notVerboseBuffer.Bytes())
|
||||
|
@ -235,7 +247,7 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
|
|||
wantAux := versions.GreaterThanOrEqualTo(version, "1.30")
|
||||
|
||||
imgID, err := br.backend.Build(ctx, backend.BuildConfig{
|
||||
Source: r.Body,
|
||||
Source: body,
|
||||
Options: buildOptions,
|
||||
ProgressWriter: buildProgressWriter(out, wantAux, createProgressReader),
|
||||
})
|
||||
|
@ -294,3 +306,92 @@ func buildProgressWriter(out io.Writer, wantAux bool, createProgressReader func(
|
|||
ProgressReaderFunc: createProgressReader,
|
||||
}
|
||||
}
|
||||
|
||||
type writeCloseFlusher interface {
|
||||
Flush()
|
||||
Flushed() bool
|
||||
io.WriteCloser
|
||||
}
|
||||
|
||||
func wrapOutputBufferedUntilRequestRead(rc io.ReadCloser, out writeCloseFlusher) (io.ReadCloser, writeCloseFlusher) {
|
||||
w := &wcf{
|
||||
buf: bytes.NewBuffer(nil),
|
||||
writeCloseFlusher: out,
|
||||
}
|
||||
r := bufio.NewReader(rc)
|
||||
_, err := r.Peek(1)
|
||||
if err != nil {
|
||||
return rc, out
|
||||
}
|
||||
rc = &rcNotifier{
|
||||
Reader: r,
|
||||
Closer: rc,
|
||||
notify: w.notify,
|
||||
}
|
||||
return rc, w
|
||||
}
|
||||
|
||||
type rcNotifier struct {
|
||||
io.Reader
|
||||
io.Closer
|
||||
notify func()
|
||||
}
|
||||
|
||||
func (r *rcNotifier) Read(b []byte) (int, error) {
|
||||
n, err := r.Reader.Read(b)
|
||||
if err != nil {
|
||||
r.notify()
|
||||
}
|
||||
return n, err
|
||||
}
|
||||
|
||||
type wcf struct {
|
||||
writeCloseFlusher
|
||||
mu sync.Mutex
|
||||
ready bool
|
||||
buf *bytes.Buffer
|
||||
flushed bool
|
||||
}
|
||||
|
||||
func (w *wcf) Flush() {
|
||||
w.mu.Lock()
|
||||
w.flushed = true
|
||||
if !w.ready {
|
||||
w.mu.Unlock()
|
||||
return
|
||||
}
|
||||
w.mu.Unlock()
|
||||
w.writeCloseFlusher.Flush()
|
||||
}
|
||||
|
||||
func (w *wcf) Flushed() bool {
|
||||
w.mu.Lock()
|
||||
b := w.flushed
|
||||
w.mu.Unlock()
|
||||
return b
|
||||
}
|
||||
|
||||
func (w *wcf) Write(b []byte) (int, error) {
|
||||
w.mu.Lock()
|
||||
if !w.ready {
|
||||
n, err := w.buf.Write(b)
|
||||
w.mu.Unlock()
|
||||
return n, err
|
||||
}
|
||||
w.mu.Unlock()
|
||||
return w.writeCloseFlusher.Write(b)
|
||||
}
|
||||
|
||||
func (w *wcf) notify() {
|
||||
w.mu.Lock()
|
||||
if !w.ready {
|
||||
if w.buf.Len() > 0 {
|
||||
io.Copy(w.writeCloseFlusher, w.buf)
|
||||
}
|
||||
if w.flushed {
|
||||
w.writeCloseFlusher.Flush()
|
||||
}
|
||||
w.ready = true
|
||||
}
|
||||
w.mu.Unlock()
|
||||
}
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
package buildkit
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
@ -28,12 +27,9 @@ func newReqBodyHandler(rt http.RoundTripper) *reqBodyHandler {
|
|||
}
|
||||
|
||||
func (h *reqBodyHandler) newRequest(rc io.ReadCloser) (string, func()) {
|
||||
// handle expect-continue vs chunked output
|
||||
r := bufio.NewReader(rc)
|
||||
r.Peek(1)
|
||||
id := identity.NewID()
|
||||
h.mu.Lock()
|
||||
h.requests[id] = &readCloser{Reader: r, Closer: rc}
|
||||
h.requests[id] = rc
|
||||
h.mu.Unlock()
|
||||
return "http://" + urlPrefix + id, func() {
|
||||
h.mu.Lock()
|
||||
|
@ -58,12 +54,14 @@ func (h *reqBodyHandler) RoundTrip(req *http.Request) (*http.Response, error) {
|
|||
return nil, errors.Errorf("context not found")
|
||||
}
|
||||
|
||||
return &http.Response{
|
||||
resp := &http.Response{
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
Body: rc,
|
||||
ContentLength: -1,
|
||||
}, nil
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
return h.rt.RoundTrip(req)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue