diff --git a/daemon/logger/jsonfilelog/read.go b/daemon/logger/jsonfilelog/read.go index cc4649903a..635f74a4bc 100644 --- a/daemon/logger/jsonfilelog/read.go +++ b/daemon/logger/jsonfilelog/read.go @@ -61,9 +61,10 @@ func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, erro } type decoder struct { - rdr io.Reader - dec *json.Decoder - jl *jsonlog.JSONLog + rdr io.Reader + dec *json.Decoder + jl *jsonlog.JSONLog + maxRetry int } func (d *decoder) Reset(rdr io.Reader) { @@ -87,7 +88,11 @@ func (d *decoder) Decode() (msg *logger.Message, err error) { if d.jl == nil { d.jl = &jsonlog.JSONLog{} } - for retries := 0; retries < maxJSONDecodeRetry; retries++ { + if d.maxRetry == 0 { + // We aren't using maxJSONDecodeRetry directly so we can give a custom value for testing. + d.maxRetry = maxJSONDecodeRetry + } + for retries := 0; retries < d.maxRetry; retries++ { msg, err = decodeLogLine(d.dec, d.jl) if err == nil || err == io.EOF { break @@ -105,8 +110,7 @@ func (d *decoder) Decode() (msg *logger.Message, err error) { // If the json logger writes a partial json log entry to the disk // while at the same time the decoder tries to decode it, the race condition happens. if err == io.ErrUnexpectedEOF { - d.rdr = io.MultiReader(d.dec.Buffered(), d.rdr) - d.dec = json.NewDecoder(d.rdr) + d.dec = json.NewDecoder(io.MultiReader(d.dec.Buffered(), d.rdr)) continue } } diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index 19536a1ff9..1f0a295f63 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -2,6 +2,7 @@ package jsonfilelog // import "github.com/docker/docker/daemon/logger/jsonfilelo import ( "bytes" + "encoding/json" "io" "testing" "time" @@ -93,3 +94,61 @@ func TestEncodeDecode(t *testing.T) { _, err = dec.Decode() assert.Assert(t, err == io.EOF) } + +func TestUnexpectedEOF(t *testing.T) { + buf := bytes.NewBuffer(nil) + msg1 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello1")} + msg2 := &logger.Message{Timestamp: time.Now(), Line: []byte("hello2")} + + err := marshalMessage(msg1, json.RawMessage{}, buf) + assert.NilError(t, err) + err = marshalMessage(msg2, json.RawMessage{}, buf) + assert.NilError(t, err) + + r := &readerWithErr{ + err: io.EOF, + after: buf.Len() / 4, + r: buf, + } + dec := &decoder{rdr: r, maxRetry: 1} + + _, err = dec.Decode() + assert.Error(t, err, io.ErrUnexpectedEOF.Error()) + // again just to check + _, err = dec.Decode() + assert.Error(t, err, io.ErrUnexpectedEOF.Error()) + + // reset the error + // from here all reads should succeed until we get EOF on the underlying reader + r.err = nil + + msg, err := dec.Decode() + assert.NilError(t, err) + assert.Equal(t, string(msg1.Line)+"\n", string(msg.Line)) + + msg, err = dec.Decode() + assert.NilError(t, err) + assert.Equal(t, string(msg2.Line)+"\n", string(msg.Line)) + + _, err = dec.Decode() + assert.Error(t, err, io.EOF.Error()) +} + +type readerWithErr struct { + err error + after int + r io.Reader + read int +} + +func (r *readerWithErr) Read(p []byte) (int, error) { + if r.err != nil && r.read > r.after { + return 0, r.err + } + + n, err := r.r.Read(p[:1]) + if n > 0 { + r.read += n + } + return n, err +}