1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #17634 from stevvooe/avoid-panic-on-flush

Avoid panic on write after close in http
This commit is contained in:
Alexander Morozov 2015-11-02 21:11:52 -08:00
commit 3c695d7ed7
4 changed files with 79 additions and 25 deletions

View file

@ -63,7 +63,9 @@ func (s *router) getContainersStats(ctx context.Context, w http.ResponseWriter,
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
out = w out = w
} else { } else {
out = ioutils.NewWriteFlusher(w) wf := ioutils.NewWriteFlusher(w)
out = wf
defer wf.Close()
} }
var closeNotifier <-chan bool var closeNotifier <-chan bool
@ -116,11 +118,16 @@ func (s *router) getContainersLogs(ctx context.Context, w http.ResponseWriter, r
return derr.ErrorCodeNoSuchContainer.WithArgs(containerName) return derr.ErrorCodeNoSuchContainer.WithArgs(containerName)
} }
outStream := ioutils.NewWriteFlusher(w)
// write an empty chunk of data (this is to ensure that the // write an empty chunk of data (this is to ensure that the
// HTTP Response is sent immediately, even if the container has // HTTP Response is sent immediately, even if the container has
// not yet produced any data) // not yet produced any data)
outStream.Write(nil) w.WriteHeader(http.StatusOK)
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
output := ioutils.NewWriteFlusher(w)
defer output.Close()
logsConfig := &daemon.ContainerLogsConfig{ logsConfig := &daemon.ContainerLogsConfig{
Follow: httputils.BoolValue(r, "follow"), Follow: httputils.BoolValue(r, "follow"),
@ -129,7 +136,7 @@ func (s *router) getContainersLogs(ctx context.Context, w http.ResponseWriter, r
Tail: r.Form.Get("tail"), Tail: r.Form.Get("tail"),
UseStdout: stdout, UseStdout: stdout,
UseStderr: stderr, UseStderr: stderr,
OutStream: outStream, OutStream: output,
Stop: closeNotifier, Stop: closeNotifier,
} }

View file

@ -105,6 +105,7 @@ func (s *router) postImagesCreate(ctx context.Context, w http.ResponseWriter, r
err error err error
output = ioutils.NewWriteFlusher(w) output = ioutils.NewWriteFlusher(w)
) )
defer output.Close()
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
@ -184,6 +185,7 @@ func (s *router) postImagesPush(ctx context.Context, w http.ResponseWriter, r *h
name := vars["name"] name := vars["name"]
output := ioutils.NewWriteFlusher(w) output := ioutils.NewWriteFlusher(w)
defer output.Close()
imagePushConfig := &graph.ImagePushConfig{ imagePushConfig := &graph.ImagePushConfig{
MetaHeaders: metaHeaders, MetaHeaders: metaHeaders,
AuthConfig: authConfig, AuthConfig: authConfig,
@ -211,6 +213,7 @@ func (s *router) getImagesGet(ctx context.Context, w http.ResponseWriter, r *htt
w.Header().Set("Content-Type", "application/x-tar") w.Header().Set("Content-Type", "application/x-tar")
output := ioutils.NewWriteFlusher(w) output := ioutils.NewWriteFlusher(w)
defer output.Close()
var names []string var names []string
if name, ok := vars["name"]; ok { if name, ok := vars["name"]; ok {
names = []string{name} names = []string{name}
@ -283,6 +286,7 @@ func (s *router) postBuild(ctx context.Context, w http.ResponseWriter, r *http.R
version := httputils.VersionFromContext(ctx) version := httputils.VersionFromContext(ctx)
output := ioutils.NewWriteFlusher(w) output := ioutils.NewWriteFlusher(w)
defer output.Close()
sf := streamformatter.NewJSONStreamFormatter() sf := streamformatter.NewJSONStreamFormatter()
errf := func(err error) error { errf := func(err error) error {
// Do not write the error in the http output if it's still empty. // Do not write the error in the http output if it's still empty.

View file

@ -52,16 +52,6 @@ func (s *router) getInfo(ctx context.Context, w http.ResponseWriter, r *http.Req
return httputils.WriteJSON(w, http.StatusOK, info) return httputils.WriteJSON(w, http.StatusOK, info)
} }
func buildOutputEncoder(w http.ResponseWriter) *json.Encoder {
w.Header().Set("Content-Type", "application/json")
outStream := ioutils.NewWriteFlusher(w)
// Write an empty chunk of data.
// This is to ensure that the HTTP status code is sent immediately,
// so that it will not block the receiver.
outStream.Write(nil)
return json.NewEncoder(outStream)
}
func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
if err := httputils.ParseForm(r); err != nil { if err := httputils.ParseForm(r); err != nil {
return err return err
@ -87,7 +77,19 @@ func (s *router) getEvents(ctx context.Context, w http.ResponseWriter, r *http.R
return err return err
} }
enc := buildOutputEncoder(w) w.Header().Set("Content-Type", "application/json")
// This is to ensure that the HTTP status code is sent immediately,
// so that it will not block the receiver.
w.WriteHeader(http.StatusOK)
if flusher, ok := w.(http.Flusher); ok {
flusher.Flush()
}
output := ioutils.NewWriteFlusher(w)
defer output.Close()
enc := json.NewEncoder(output)
current, l, cancel := s.daemon.SubscribeToEvents() current, l, cancel := s.daemon.SubscribeToEvents()
defer cancel() defer cancel()

View file

@ -1,32 +1,54 @@
package ioutils package ioutils
import ( import (
"errors"
"io" "io"
"net/http" "net/http"
"sync" "sync"
) )
// WriteFlusher wraps the Write and Flush operation. // WriteFlusher wraps the Write and Flush operation ensuring that every write
// is a flush. In addition, the Close method can be called to intercept
// Read/Write calls if the targets lifecycle has already ended.
type WriteFlusher struct { type WriteFlusher struct {
sync.Mutex mu sync.Mutex
w io.Writer w io.Writer
flusher http.Flusher flusher http.Flusher
flushed bool flushed bool
closed error
// TODO(stevvooe): Use channel for closed instead, remove mutex. Using a
// channel will allow one to properly order the operations.
} }
var errWriteFlusherClosed = errors.New("writeflusher: closed")
func (wf *WriteFlusher) Write(b []byte) (n int, err error) { func (wf *WriteFlusher) Write(b []byte) (n int, err error) {
wf.Lock() wf.mu.Lock()
defer wf.Unlock() defer wf.mu.Unlock()
if wf.closed != nil {
return 0, wf.closed
}
n, err = wf.w.Write(b) n, err = wf.w.Write(b)
wf.flushed = true wf.flush() // every write is a flush.
wf.flusher.Flush()
return n, err return n, err
} }
// Flush the stream immediately. // Flush the stream immediately.
func (wf *WriteFlusher) Flush() { func (wf *WriteFlusher) Flush() {
wf.Lock() wf.mu.Lock()
defer wf.Unlock() defer wf.mu.Unlock()
wf.flush()
}
// flush the stream immediately without taking a lock. Used internally.
func (wf *WriteFlusher) flush() {
if wf.closed != nil {
return
}
wf.flushed = true wf.flushed = true
wf.flusher.Flush() wf.flusher.Flush()
} }
@ -34,11 +56,30 @@ func (wf *WriteFlusher) Flush() {
// Flushed returns the state of flushed. // Flushed returns the state of flushed.
// If it's flushed, return true, or else it return false. // If it's flushed, return true, or else it return false.
func (wf *WriteFlusher) Flushed() bool { func (wf *WriteFlusher) Flushed() bool {
wf.Lock() // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to
defer wf.Unlock() // be used to detect whether or a response code has been issued or not.
// Another hook should be used instead.
wf.mu.Lock()
defer wf.mu.Unlock()
return wf.flushed return wf.flushed
} }
// Close closes the write flusher, disallowing any further writes to the
// target. After the flusher is closed, all calls to write or flush will
// result in an error.
func (wf *WriteFlusher) Close() error {
wf.mu.Lock()
defer wf.mu.Unlock()
if wf.closed != nil {
return wf.closed
}
wf.closed = errWriteFlusherClosed
return nil
}
// NewWriteFlusher returns a new WriteFlusher. // NewWriteFlusher returns a new WriteFlusher.
func NewWriteFlusher(w io.Writer) *WriteFlusher { func NewWriteFlusher(w io.Writer) *WriteFlusher {
var flusher http.Flusher var flusher http.Flusher