diff --git a/vendor.conf b/vendor.conf index 9a86d52ea2..7d0440375c 100644 --- a/vendor.conf +++ b/vendor.conf @@ -74,7 +74,7 @@ github.com/opencontainers/go-digest v1.0.0-rc1 github.com/mistifyio/go-zfs 22c9b32c84eb0d0c6f4043b6e90fc94073de92fa github.com/pborman/uuid v1.0 -google.golang.org/grpc v1.12.0 +google.golang.org/grpc 7a6a684ca69eb4cae85ad0a484f2e531598c047b # v1.12.2 # The version of runc should match the version that is used by the containerd # version that is used. If you need to update runc, open a pull request in diff --git a/vendor/google.golang.org/grpc/rpc_util.go b/vendor/google.golang.org/grpc/rpc_util.go index 69ef3c0b59..5de1b031ec 100644 --- a/vendor/google.golang.org/grpc/rpc_util.go +++ b/vendor/google.golang.org/grpc/rpc_util.go @@ -722,6 +722,6 @@ const ( ) // Version is the current grpc version. -const Version = "1.12.0" +const Version = "1.12.2" const grpcUA = "grpc-go/" + Version diff --git a/vendor/google.golang.org/grpc/transport/http2_server.go b/vendor/google.golang.org/grpc/transport/http2_server.go index 8b93e222e9..ab35618905 100644 --- a/vendor/google.golang.org/grpc/transport/http2_server.go +++ b/vendor/google.golang.org/grpc/transport/http2_server.go @@ -682,28 +682,7 @@ func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) { }) } -// WriteHeader sends the header metedata md back to the client. -func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { - if s.headerOk || s.getState() == streamDone { - return ErrIllegalHeaderWrite - } - s.headerOk = true - if md.Len() > 0 { - if s.header.Len() > 0 { - s.header = metadata.Join(s.header, md) - } else { - s.header = md - } - } - md = s.header - // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields - // first and create a slice of that exact size. - headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. - headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) - headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) - if s.sendCompress != "" { - headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) - } +func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField { for k, vv := range md { if isReservedHeader(k) { // Clients don't tolerate reading restricted headers after some non restricted ones were sent. @@ -713,6 +692,37 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) } } + return headerFields +} + +// WriteHeader sends the header metedata md back to the client. +func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { + if s.updateHeaderSent() || s.getState() == streamDone { + return ErrIllegalHeaderWrite + } + s.hdrMu.Lock() + if md.Len() > 0 { + if s.header.Len() > 0 { + s.header = metadata.Join(s.header, md) + } else { + s.header = md + } + } + t.writeHeaderLocked(s) + s.hdrMu.Unlock() + return nil +} + +func (t *http2Server) writeHeaderLocked(s *Stream) { + // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields + // first and create a slice of that exact size. + headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else. + headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) + headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) + if s.sendCompress != "" { + headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress}) + } + headerFields = appendHeaderFieldsFromMD(headerFields, s.header) t.controlBuf.put(&headerFrame{ streamID: s.id, hf: headerFields, @@ -728,7 +738,6 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { outHeader := &stats.OutHeader{} t.stats.HandleRPC(s.Context(), outHeader) } - return nil } // WriteStatus sends stream status to the client and terminates the stream. @@ -736,21 +745,20 @@ func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error { // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early // OK is adopted. func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { - if !s.headerOk && s.header.Len() > 0 { - if err := t.WriteHeader(s, nil); err != nil { - return err - } - } else { - if s.getState() == streamDone { - return nil - } + if s.getState() == streamDone { + return nil } + s.hdrMu.Lock() // TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields // first and create a slice of that exact size. headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else. - if !s.headerOk { - headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) - headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) + if !s.updateHeaderSent() { // No headers have been sent. + if len(s.header) > 0 { // Send a separate header frame. + t.writeHeaderLocked(s) + } else { // Send a trailer only response. + headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"}) + headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: contentType(s.contentSubtype)}) + } } headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))}) headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())}) @@ -766,16 +774,8 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { } // Attach the trailer metadata. - for k, vv := range s.trailer { - // Clients don't tolerate reading restricted headers after some non restricted ones were sent. - if isReservedHeader(k) { - continue - } - for _, v := range vv { - headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)}) - } - } - trailer := &headerFrame{ + headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer) + trailingHeader := &headerFrame{ streamID: s.id, hf: headerFields, endStream: true, @@ -783,7 +783,8 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { atomic.StoreUint32(&t.resetPingStrikes, 1) }, } - t.closeStream(s, false, 0, trailer, true) + s.hdrMu.Unlock() + t.closeStream(s, false, 0, trailingHeader, true) if t.stats != nil { t.stats.HandleRPC(s.Context(), &stats.OutTrailer{}) } @@ -793,7 +794,7 @@ func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error { // Write converts the data into HTTP2 data frame and sends it out. Non-nil error // is returns if it fails (e.g., framing error, transport error). func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error { - if !s.headerOk { // Headers haven't been written yet. + if !s.isHeaderSent() { // Headers haven't been written yet. if err := t.WriteHeader(s, nil); err != nil { // TODO(mmukhi, dfawley): Make sure this is the right code to return. return streamErrorf(codes.Internal, "transport: %v", err) diff --git a/vendor/google.golang.org/grpc/transport/http_util.go b/vendor/google.golang.org/grpc/transport/http_util.go index a355866089..835c812694 100644 --- a/vendor/google.golang.org/grpc/transport/http_util.go +++ b/vendor/google.golang.org/grpc/transport/http_util.go @@ -531,10 +531,14 @@ func (w *bufWriter) Write(b []byte) (n int, err error) { if w.err != nil { return 0, w.err } - n = copy(w.buf[w.offset:], b) - w.offset += n - if w.offset >= w.batchSize { - err = w.Flush() + for len(b) > 0 { + nn := copy(w.buf[w.offset:], b) + b = b[nn:] + w.offset += nn + n += nn + if w.offset >= w.batchSize { + err = w.Flush() + } } return n, err } diff --git a/vendor/google.golang.org/grpc/transport/transport.go b/vendor/google.golang.org/grpc/transport/transport.go index 2f643a3d04..f51f878884 100644 --- a/vendor/google.golang.org/grpc/transport/transport.go +++ b/vendor/google.golang.org/grpc/transport/transport.go @@ -185,13 +185,20 @@ type Stream struct { headerChan chan struct{} // closed to indicate the end of header metadata. headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times. - header metadata.MD // the received header metadata. - trailer metadata.MD // the key-value map of trailer metadata. - headerOk bool // becomes true from the first header is about to send - state streamState + // hdrMu protects header and trailer metadata on the server-side. + hdrMu sync.Mutex + header metadata.MD // the received header metadata. + trailer metadata.MD // the key-value map of trailer metadata. - status *status.Status // the status error received from the server + // On the server-side, headerSent is atomically set to 1 when the headers are sent out. + headerSent uint32 + + state streamState + + // On client-side it is the status error received from the server. + // On server-side it is unused. + status *status.Status bytesReceived uint32 // indicates whether any bytes have been received on this stream unprocessed uint32 // set if the server sends a refused stream or GOAWAY including this stream @@ -201,6 +208,17 @@ type Stream struct { contentSubtype string } +// isHeaderSent is only valid on the server-side. +func (s *Stream) isHeaderSent() bool { + return atomic.LoadUint32(&s.headerSent) == 1 +} + +// updateHeaderSent updates headerSent and returns true +// if it was alreay set. It is valid only on server-side. +func (s *Stream) updateHeaderSent() bool { + return atomic.SwapUint32(&s.headerSent, 1) == 1 +} + func (s *Stream) swapState(st streamState) streamState { return streamState(atomic.SwapUint32((*uint32)(&s.state), uint32(st))) } @@ -313,10 +331,12 @@ func (s *Stream) SetHeader(md metadata.MD) error { if md.Len() == 0 { return nil } - if s.headerOk || atomic.LoadUint32((*uint32)(&s.state)) == uint32(streamDone) { + if s.isHeaderSent() || s.getState() == streamDone { return ErrIllegalHeaderWrite } + s.hdrMu.Lock() s.header = metadata.Join(s.header, md) + s.hdrMu.Unlock() return nil } @@ -335,7 +355,12 @@ func (s *Stream) SetTrailer(md metadata.MD) error { if md.Len() == 0 { return nil } + if s.getState() == streamDone { + return ErrIllegalHeaderWrite + } + s.hdrMu.Lock() s.trailer = metadata.Join(s.trailer, md) + s.hdrMu.Unlock() return nil }