mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #38990 from tiborvass/http-grpc
Add undocumented /grpc endpoint and register BuildKit's controller
This commit is contained in:
commit
7a337ec3c4
10 changed files with 131 additions and 20 deletions
|
@ -14,6 +14,7 @@ import (
|
||||||
"github.com/docker/docker/pkg/stringid"
|
"github.com/docker/docker/pkg/stringid"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
"google.golang.org/grpc"
|
||||||
)
|
)
|
||||||
|
|
||||||
// ImageComponent provides an interface for working with images
|
// ImageComponent provides an interface for working with images
|
||||||
|
@ -40,6 +41,13 @@ func NewBackend(components ImageComponent, builder Builder, fsCache *fscache.FSC
|
||||||
return &Backend{imageComponent: components, builder: builder, fsCache: fsCache, buildkit: buildkit}, nil
|
return &Backend{imageComponent: components, builder: builder, fsCache: fsCache, buildkit: buildkit}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterGRPC registers buildkit controller to the grpc server.
|
||||||
|
func (b *Backend) RegisterGRPC(s *grpc.Server) {
|
||||||
|
if b.buildkit != nil {
|
||||||
|
b.buildkit.RegisterGRPC(s)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Build builds an image from a Source
|
// Build builds an image from a Source
|
||||||
func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string, error) {
|
func (b *Backend) Build(ctx context.Context, config backend.BuildConfig) (string, error) {
|
||||||
options := config.Options
|
options := config.Options
|
||||||
|
|
8
api/server/router/grpc/backend.go
Normal file
8
api/server/router/grpc/backend.go
Normal file
|
@ -0,0 +1,8 @@
|
||||||
|
package grpc // import "github.com/docker/docker/api/server/router/grpc"
|
||||||
|
|
||||||
|
import "google.golang.org/grpc"
|
||||||
|
|
||||||
|
// Backend abstracts a registerable GRPC service.
|
||||||
|
type Backend interface {
|
||||||
|
RegisterGRPC(*grpc.Server)
|
||||||
|
}
|
37
api/server/router/grpc/grpc.go
Normal file
37
api/server/router/grpc/grpc.go
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
package grpc // import "github.com/docker/docker/api/server/router/grpc"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"github.com/docker/docker/api/server/router"
|
||||||
|
"golang.org/x/net/http2"
|
||||||
|
"google.golang.org/grpc"
|
||||||
|
)
|
||||||
|
|
||||||
|
type grpcRouter struct {
|
||||||
|
routes []router.Route
|
||||||
|
grpcServer *grpc.Server
|
||||||
|
h2Server *http2.Server
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRouter initializes a new grpc http router
|
||||||
|
func NewRouter(backends ...Backend) router.Router {
|
||||||
|
r := &grpcRouter{
|
||||||
|
h2Server: &http2.Server{},
|
||||||
|
grpcServer: grpc.NewServer(),
|
||||||
|
}
|
||||||
|
for _, b := range backends {
|
||||||
|
b.RegisterGRPC(r.grpcServer)
|
||||||
|
}
|
||||||
|
r.initRoutes()
|
||||||
|
return r
|
||||||
|
}
|
||||||
|
|
||||||
|
// Routes returns the available routers to the session controller
|
||||||
|
func (r *grpcRouter) Routes() []router.Route {
|
||||||
|
return r.routes
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *grpcRouter) initRoutes() {
|
||||||
|
r.routes = []router.Route{
|
||||||
|
router.NewPostRoute("/grpc", r.serveGRPC),
|
||||||
|
}
|
||||||
|
}
|
45
api/server/router/grpc/grpc_routes.go
Normal file
45
api/server/router/grpc/grpc_routes.go
Normal file
|
@ -0,0 +1,45 @@
|
||||||
|
package grpc // import "github.com/docker/docker/api/server/router/grpc"
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/pkg/errors"
|
||||||
|
"golang.org/x/net/http2"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (gr *grpcRouter) serveGRPC(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
|
||||||
|
h, ok := w.(http.Hijacker)
|
||||||
|
if !ok {
|
||||||
|
return errors.New("handler does not support hijack")
|
||||||
|
}
|
||||||
|
proto := r.Header.Get("Upgrade")
|
||||||
|
if proto == "" {
|
||||||
|
return errors.New("no upgrade proto in request")
|
||||||
|
}
|
||||||
|
if proto != "h2c" {
|
||||||
|
return errors.Errorf("protocol %s not supported", proto)
|
||||||
|
}
|
||||||
|
|
||||||
|
conn, _, err := h.Hijack()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
resp := &http.Response{
|
||||||
|
StatusCode: http.StatusSwitchingProtocols,
|
||||||
|
ProtoMajor: 1,
|
||||||
|
ProtoMinor: 1,
|
||||||
|
Header: http.Header{},
|
||||||
|
}
|
||||||
|
resp.Header.Set("Connection", "Upgrade")
|
||||||
|
resp.Header.Set("Upgrade", proto)
|
||||||
|
|
||||||
|
// set raw mode
|
||||||
|
conn.Write([]byte{})
|
||||||
|
resp.Write(conn)
|
||||||
|
|
||||||
|
// https://godoc.org/golang.org/x/net/http2#Server.ServeConn
|
||||||
|
// TODO: is it a problem that conn has already been written to?
|
||||||
|
gr.h2Server.ServeConn(conn, &http2.ServeConnOpts{Handler: gr.grpcServer})
|
||||||
|
return nil
|
||||||
|
}
|
|
@ -31,6 +31,7 @@ import (
|
||||||
"github.com/moby/buildkit/util/tracing"
|
"github.com/moby/buildkit/util/tracing"
|
||||||
"github.com/pkg/errors"
|
"github.com/pkg/errors"
|
||||||
"golang.org/x/sync/errgroup"
|
"golang.org/x/sync/errgroup"
|
||||||
|
"google.golang.org/grpc"
|
||||||
grpcmetadata "google.golang.org/grpc/metadata"
|
grpcmetadata "google.golang.org/grpc/metadata"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -104,6 +105,11 @@ func New(opt Opt) (*Builder, error) {
|
||||||
return b, nil
|
return b, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// RegisterGRPC registers controller to the grpc server.
|
||||||
|
func (b *Builder) RegisterGRPC(s *grpc.Server) {
|
||||||
|
b.controller.Register(s)
|
||||||
|
}
|
||||||
|
|
||||||
// Cancel cancels a build using ID
|
// Cancel cancels a build using ID
|
||||||
func (b *Builder) Cancel(ctx context.Context, id string) error {
|
func (b *Builder) Cancel(ctx context.Context, id string) error {
|
||||||
b.mu.Lock()
|
b.mu.Lock()
|
||||||
|
|
|
@ -38,6 +38,17 @@ func (cli *Client) postHijacked(ctx context.Context, path string, query url.Valu
|
||||||
return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err
|
return types.HijackedResponse{Conn: conn, Reader: bufio.NewReader(conn)}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// DialHijack returns a hijacked connection with negotiated protocol proto.
|
||||||
|
func (cli *Client) DialHijack(ctx context.Context, url, proto string, meta map[string][]string) (net.Conn, error) {
|
||||||
|
req, err := http.NewRequest("POST", url, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req = cli.addHeaders(req, meta)
|
||||||
|
|
||||||
|
return cli.setupHijackConn(ctx, req, proto)
|
||||||
|
}
|
||||||
|
|
||||||
// fallbackDial is used when WithDialer() was not called.
|
// fallbackDial is used when WithDialer() was not called.
|
||||||
// See cli.Dialer().
|
// See cli.Dialer().
|
||||||
func fallbackDial(proto, addr string, tlsConfig *tls.Config) (net.Conn, error) {
|
func fallbackDial(proto, addr string, tlsConfig *tls.Config) (net.Conn, error) {
|
||||||
|
|
|
@ -38,7 +38,7 @@ type CommonAPIClient interface {
|
||||||
ServerVersion(ctx context.Context) (types.Version, error)
|
ServerVersion(ctx context.Context) (types.Version, error)
|
||||||
NegotiateAPIVersion(ctx context.Context)
|
NegotiateAPIVersion(ctx context.Context)
|
||||||
NegotiateAPIVersionPing(types.Ping)
|
NegotiateAPIVersionPing(types.Ping)
|
||||||
DialSession(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error)
|
DialHijack(ctx context.Context, url, proto string, meta map[string][]string) (net.Conn, error)
|
||||||
Dialer() func(context.Context) (net.Conn, error)
|
Dialer() func(context.Context) (net.Conn, error)
|
||||||
Close() error
|
Close() error
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,18 +0,0 @@
|
||||||
package client // import "github.com/docker/docker/client"
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"net"
|
|
||||||
"net/http"
|
|
||||||
)
|
|
||||||
|
|
||||||
// DialSession returns a connection that can be used communication with daemon
|
|
||||||
func (cli *Client) DialSession(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
|
|
||||||
req, err := http.NewRequest("POST", "/session", nil)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
req = cli.addHeaders(req, meta)
|
|
||||||
|
|
||||||
return cli.setupHijackConn(ctx, req, proto)
|
|
||||||
}
|
|
|
@ -21,6 +21,7 @@ import (
|
||||||
checkpointrouter "github.com/docker/docker/api/server/router/checkpoint"
|
checkpointrouter "github.com/docker/docker/api/server/router/checkpoint"
|
||||||
"github.com/docker/docker/api/server/router/container"
|
"github.com/docker/docker/api/server/router/container"
|
||||||
distributionrouter "github.com/docker/docker/api/server/router/distribution"
|
distributionrouter "github.com/docker/docker/api/server/router/distribution"
|
||||||
|
grpcrouter "github.com/docker/docker/api/server/router/grpc"
|
||||||
"github.com/docker/docker/api/server/router/image"
|
"github.com/docker/docker/api/server/router/image"
|
||||||
"github.com/docker/docker/api/server/router/network"
|
"github.com/docker/docker/api/server/router/network"
|
||||||
pluginrouter "github.com/docker/docker/api/server/router/plugin"
|
pluginrouter "github.com/docker/docker/api/server/router/plugin"
|
||||||
|
@ -481,6 +482,16 @@ func initRouter(opts routerOptions) {
|
||||||
distributionrouter.NewRouter(opts.daemon.ImageService()),
|
distributionrouter.NewRouter(opts.daemon.ImageService()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
grpcBackends := []grpcrouter.Backend{}
|
||||||
|
for _, b := range []interface{}{opts.daemon, opts.buildBackend} {
|
||||||
|
if b, ok := b.(grpcrouter.Backend); ok {
|
||||||
|
grpcBackends = append(grpcBackends, b)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if len(grpcBackends) > 0 {
|
||||||
|
routers = append(routers, grpcrouter.NewRouter(grpcBackends...))
|
||||||
|
}
|
||||||
|
|
||||||
if opts.daemon.NetworkControllerEnabled() {
|
if opts.daemon.NetworkControllerEnabled() {
|
||||||
routers = append(routers, network.NewRouter(opts.daemon, opts.cluster))
|
routers = append(routers, network.NewRouter(opts.daemon, opts.cluster))
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,6 +3,7 @@ package build
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"testing"
|
"testing"
|
||||||
|
@ -109,7 +110,9 @@ func testBuildWithSession(t *testing.T, client dclient.APIClient, daemonHost str
|
||||||
g, ctx := errgroup.WithContext(ctx)
|
g, ctx := errgroup.WithContext(ctx)
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
return sess.Run(ctx, client.DialSession)
|
return sess.Run(ctx, func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
|
||||||
|
return client.DialHijack(ctx, "/session", "h2c", meta)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
g.Go(func() error {
|
g.Go(func() error {
|
||||||
|
|
Loading…
Reference in a new issue