builder: correct output buffering order

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
This commit is contained in:
Tonis Tiigi 2018-05-25 17:30:42 -07:00
parent 577732f655
commit ab4cbe2c3f
1 changed files with 26 additions and 16 deletions

View File

@ -193,10 +193,8 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
var output writeCloseFlusher = ioutils.NewWriteFlusher(w)
defer output.Close()
body := r.Body body := r.Body
var ww io.Writer = w
if body != nil { if body != nil {
// there is a possibility that output is written before request body // there is a possibility that output is written before request body
// has been fully read so we need to protect against it. // has been fully read so we need to protect against it.
@ -204,18 +202,25 @@ func (br *buildRouter) postBuild(ctx context.Context, w http.ResponseWriter, r *
// https://github.com/golang/go/issues/15527 // https://github.com/golang/go/issues/15527
// https://github.com/golang/go/issues/22209 // https://github.com/golang/go/issues/22209
// has been fixed // has been fixed
body, output = wrapOutputBufferedUntilRequestRead(body, output) body, ww = wrapOutputBufferedUntilRequestRead(body, ww)
} }
output := ioutils.NewWriteFlusher(ww)
defer output.Close()
errf := func(err error) error { errf := func(err error) error {
if httputils.BoolValue(r, "q") && notVerboseBuffer.Len() > 0 { if httputils.BoolValue(r, "q") && notVerboseBuffer.Len() > 0 {
output.Write(notVerboseBuffer.Bytes()) output.Write(notVerboseBuffer.Bytes())
} }
logrus.Debugf("isflushed %v", output.Flushed())
// Do not write the error in the http output if it's still empty. // Do not write the error in the http output if it's still empty.
// This prevents from writing a 200(OK) when there is an internal error. // This prevents from writing a 200(OK) when there is an internal error.
if !output.Flushed() { if !output.Flushed() {
return err return err
} }
_, err = w.Write(streamformatter.FormatError(err)) _, err = output.Write(streamformatter.FormatError(err))
if err != nil { if err != nil {
logrus.Warnf("could not write error response: %v", err) logrus.Warnf("could not write error response: %v", err)
} }
@ -311,16 +316,20 @@ func buildProgressWriter(out io.Writer, wantAux bool, createProgressReader func(
} }
} }
type writeCloseFlusher interface { type flusher interface {
Flush() Flush()
Flushed() bool
io.WriteCloser
} }
func wrapOutputBufferedUntilRequestRead(rc io.ReadCloser, out writeCloseFlusher) (io.ReadCloser, writeCloseFlusher) { func wrapOutputBufferedUntilRequestRead(rc io.ReadCloser, out io.Writer) (io.ReadCloser, io.Writer) {
var fl flusher = &ioutils.NopFlusher{}
if f, ok := out.(flusher); ok {
fl = f
}
w := &wcf{ w := &wcf{
buf: bytes.NewBuffer(nil), buf: bytes.NewBuffer(nil),
writeCloseFlusher: out, Writer: out,
flusher: fl,
} }
r := bufio.NewReader(rc) r := bufio.NewReader(rc)
_, err := r.Peek(1) _, err := r.Peek(1)
@ -355,7 +364,8 @@ func (r *rcNotifier) Close() error {
} }
type wcf struct { type wcf struct {
writeCloseFlusher io.Writer
flusher
mu sync.Mutex mu sync.Mutex
ready bool ready bool
buf *bytes.Buffer buf *bytes.Buffer
@ -370,7 +380,7 @@ func (w *wcf) Flush() {
return return
} }
w.mu.Unlock() w.mu.Unlock()
w.writeCloseFlusher.Flush() w.flusher.Flush()
} }
func (w *wcf) Flushed() bool { func (w *wcf) Flushed() bool {
@ -388,17 +398,17 @@ func (w *wcf) Write(b []byte) (int, error) {
return n, err return n, err
} }
w.mu.Unlock() w.mu.Unlock()
return w.writeCloseFlusher.Write(b) return w.Writer.Write(b)
} }
func (w *wcf) notify() { func (w *wcf) notify() {
w.mu.Lock() w.mu.Lock()
if !w.ready { if !w.ready {
if w.buf.Len() > 0 { if w.buf.Len() > 0 {
io.Copy(w.writeCloseFlusher, w.buf) io.Copy(w.Writer, w.buf)
} }
if w.flushed { if w.flushed {
w.writeCloseFlusher.Flush() w.flusher.Flush()
} }
w.ready = true w.ready = true
} }