From 41445a4745451b99d1c361c6c87447da790cb01b Mon Sep 17 00:00:00 2001 From: Tibor Vass Date: Fri, 28 Jul 2017 16:04:34 -0700 Subject: [PATCH] Remove client/session package, depend on buildkit's session package gofmt -w -r '"github.com/docker/docker/client/session" -> "github.com/moby/buildkit/session"' gofmt -w -r '"github.com/docker/docker/client/session/filesync" -> "github.com/moby/buildkit/session/filesync"' Signed-off-by: Tibor Vass --- builder/dockerfile/builder.go | 2 +- builder/dockerfile/clientsession.go | 4 +- builder/fscache/fscache.go | 2 +- builder/fscache/fscache_test.go | 2 +- client/session/filesync/diffcopy.go | 31 -- client/session/filesync/filesync.go | 183 -------- client/session/filesync/filesync.pb.go | 575 ----------------------- client/session/filesync/filesync.proto | 15 - client/session/filesync/filesync_test.go | 71 --- client/session/filesync/generate.go | 3 - client/session/filesync/tarstream.go | 83 ---- client/session/grpc.go | 62 --- client/session/manager.go | 202 -------- client/session/session.go | 117 ----- client/session/testutil/testutil.go | 70 --- cmd/dockerd/daemon.go | 2 +- integration-cli/docker_api_build_test.go | 4 +- 17 files changed, 8 insertions(+), 1420 deletions(-) delete mode 100644 client/session/filesync/diffcopy.go delete mode 100644 client/session/filesync/filesync.go delete mode 100644 client/session/filesync/filesync.pb.go delete mode 100644 client/session/filesync/filesync.proto delete mode 100644 client/session/filesync/filesync_test.go delete mode 100644 client/session/filesync/generate.go delete mode 100644 client/session/filesync/tarstream.go delete mode 100644 client/session/grpc.go delete mode 100644 client/session/manager.go delete mode 100644 client/session/session.go delete mode 100644 client/session/testutil/testutil.go diff --git a/builder/dockerfile/builder.go b/builder/dockerfile/builder.go index fb1786225a..c47840777e 100644 --- a/builder/dockerfile/builder.go +++ b/builder/dockerfile/builder.go @@ -18,13 +18,13 @@ import ( "github.com/docker/docker/builder/dockerfile/parser" "github.com/docker/docker/builder/fscache" "github.com/docker/docker/builder/remotecontext" - "github.com/docker/docker/client/session" "github.com/docker/docker/pkg/archive" "github.com/docker/docker/pkg/chrootarchive" "github.com/docker/docker/pkg/idtools" "github.com/docker/docker/pkg/streamformatter" "github.com/docker/docker/pkg/stringid" "github.com/docker/docker/pkg/system" + "github.com/moby/buildkit/session" "github.com/pkg/errors" "golang.org/x/net/context" "golang.org/x/sync/syncmap" diff --git a/builder/dockerfile/clientsession.go b/builder/dockerfile/clientsession.go index a7709ce517..9a5411685a 100644 --- a/builder/dockerfile/clientsession.go +++ b/builder/dockerfile/clientsession.go @@ -5,8 +5,8 @@ import ( "github.com/docker/docker/builder/fscache" "github.com/docker/docker/builder/remotecontext" - "github.com/docker/docker/client/session" - "github.com/docker/docker/client/session/filesync" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/session/filesync" "github.com/pkg/errors" "golang.org/x/net/context" ) diff --git a/builder/fscache/fscache.go b/builder/fscache/fscache.go index 63331091a6..eba65b8eb7 100644 --- a/builder/fscache/fscache.go +++ b/builder/fscache/fscache.go @@ -12,9 +12,9 @@ import ( "github.com/boltdb/bolt" "github.com/docker/docker/builder" "github.com/docker/docker/builder/remotecontext" - "github.com/docker/docker/client/session/filesync" "github.com/docker/docker/pkg/directory" "github.com/docker/docker/pkg/stringid" + "github.com/moby/buildkit/session/filesync" "github.com/pkg/errors" "github.com/tonistiigi/fsutil" "golang.org/x/net/context" diff --git a/builder/fscache/fscache_test.go b/builder/fscache/fscache_test.go index 2532a218c8..3f6a1b02af 100644 --- a/builder/fscache/fscache_test.go +++ b/builder/fscache/fscache_test.go @@ -7,7 +7,7 @@ import ( "testing" "time" - "github.com/docker/docker/client/session/filesync" + "github.com/moby/buildkit/session/filesync" "github.com/stretchr/testify/assert" "golang.org/x/net/context" ) diff --git a/client/session/filesync/diffcopy.go b/client/session/filesync/diffcopy.go deleted file mode 100644 index 533847acdf..0000000000 --- a/client/session/filesync/diffcopy.go +++ /dev/null @@ -1,31 +0,0 @@ -package filesync - -import ( - "time" - - "google.golang.org/grpc" - - "github.com/Sirupsen/logrus" - "github.com/tonistiigi/fsutil" -) - -func sendDiffCopy(stream grpc.Stream, dir string, includes, excludes []string, progress progressCb) error { - return fsutil.Send(stream.Context(), stream, dir, &fsutil.WalkOpt{ - ExcludePatterns: excludes, - IncludePaths: includes, // TODO: rename IncludePatterns - }, progress) -} - -func recvDiffCopy(ds grpc.Stream, dest string, cu CacheUpdater) error { - st := time.Now() - defer func() { - logrus.Debugf("diffcopy took: %v", time.Since(st)) - }() - var cf fsutil.ChangeFunc - if cu != nil { - cu.MarkSupported(true) - cf = cu.HandleChange - } - - return fsutil.Receive(ds.Context(), ds, dest, cf) -} diff --git a/client/session/filesync/filesync.go b/client/session/filesync/filesync.go deleted file mode 100644 index 9a2ffc8578..0000000000 --- a/client/session/filesync/filesync.go +++ /dev/null @@ -1,183 +0,0 @@ -package filesync - -import ( - "os" - "strings" - - "github.com/docker/docker/client/session" - "github.com/pkg/errors" - "github.com/tonistiigi/fsutil" - "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/metadata" -) - -const ( - keyOverrideExcludes = "override-excludes" - keyIncludePatterns = "include-patterns" -) - -type fsSyncProvider struct { - root string - excludes []string - p progressCb - doneCh chan error -} - -// NewFSSyncProvider creates a new provider for sending files from client -func NewFSSyncProvider(root string, excludes []string) session.Attachable { - p := &fsSyncProvider{ - root: root, - excludes: excludes, - } - return p -} - -func (sp *fsSyncProvider) Register(server *grpc.Server) { - RegisterFileSyncServer(server, sp) -} - -func (sp *fsSyncProvider) DiffCopy(stream FileSync_DiffCopyServer) error { - return sp.handle("diffcopy", stream) -} -func (sp *fsSyncProvider) TarStream(stream FileSync_TarStreamServer) error { - return sp.handle("tarstream", stream) -} - -func (sp *fsSyncProvider) handle(method string, stream grpc.ServerStream) error { - var pr *protocol - for _, p := range supportedProtocols { - if method == p.name && isProtoSupported(p.name) { - pr = &p - break - } - } - if pr == nil { - return errors.New("failed to negotiate protocol") - } - - opts, _ := metadata.FromContext(stream.Context()) // if no metadata continue with empty object - - var excludes []string - if len(opts[keyOverrideExcludes]) == 0 || opts[keyOverrideExcludes][0] != "true" { - excludes = sp.excludes - } - includes := opts[keyIncludePatterns] - - var progress progressCb - if sp.p != nil { - progress = sp.p - sp.p = nil - } - - var doneCh chan error - if sp.doneCh != nil { - doneCh = sp.doneCh - sp.doneCh = nil - } - err := pr.sendFn(stream, sp.root, includes, excludes, progress) - if doneCh != nil { - if err != nil { - doneCh <- err - } - close(doneCh) - } - return err -} - -func (sp *fsSyncProvider) SetNextProgressCallback(f func(int, bool), doneCh chan error) { - sp.p = f - sp.doneCh = doneCh -} - -type progressCb func(int, bool) - -type protocol struct { - name string - sendFn func(stream grpc.Stream, srcDir string, includes, excludes []string, progress progressCb) error - recvFn func(stream grpc.Stream, destDir string, cu CacheUpdater) error -} - -func isProtoSupported(p string) bool { - // TODO: this should be removed after testing if stability is confirmed - if override := os.Getenv("BUILD_STREAM_PROTOCOL"); override != "" { - return strings.EqualFold(p, override) - } - return true -} - -var supportedProtocols = []protocol{ - { - name: "diffcopy", - sendFn: sendDiffCopy, - recvFn: recvDiffCopy, - }, - { - name: "tarstream", - sendFn: sendTarStream, - recvFn: recvTarStream, - }, -} - -// FSSendRequestOpt defines options for FSSend request -type FSSendRequestOpt struct { - IncludePatterns []string - OverrideExcludes bool - DestDir string - CacheUpdater CacheUpdater -} - -// CacheUpdater is an object capable of sending notifications for the cache hash changes -type CacheUpdater interface { - MarkSupported(bool) - HandleChange(fsutil.ChangeKind, string, os.FileInfo, error) error -} - -// FSSync initializes a transfer of files -func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error { - var pr *protocol - for _, p := range supportedProtocols { - if isProtoSupported(p.name) && c.Supports(session.MethodURL(_FileSync_serviceDesc.ServiceName, p.name)) { - pr = &p - break - } - } - if pr == nil { - return errors.New("no fssync handlers") - } - - opts := make(map[string][]string) - if opt.OverrideExcludes { - opts[keyOverrideExcludes] = []string{"true"} - } - - if opt.IncludePatterns != nil { - opts[keyIncludePatterns] = opt.IncludePatterns - } - - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - client := NewFileSyncClient(c.Conn()) - - var stream grpc.ClientStream - - ctx = metadata.NewContext(ctx, opts) - - switch pr.name { - case "tarstream": - cc, err := client.TarStream(ctx) - if err != nil { - return err - } - stream = cc - case "diffcopy": - cc, err := client.DiffCopy(ctx) - if err != nil { - return err - } - stream = cc - } - - return pr.recvFn(stream, opt.DestDir, opt.CacheUpdater) -} diff --git a/client/session/filesync/filesync.pb.go b/client/session/filesync/filesync.pb.go deleted file mode 100644 index c6ed666383..0000000000 --- a/client/session/filesync/filesync.pb.go +++ /dev/null @@ -1,575 +0,0 @@ -// Code generated by protoc-gen-gogo. -// source: filesync.proto -// DO NOT EDIT! - -/* -Package filesync is a generated protocol buffer package. - -It is generated from these files: - filesync.proto - -It has these top-level messages: - BytesMessage -*/ -package filesync - -import proto "github.com/gogo/protobuf/proto" -import fmt "fmt" -import math "math" - -import bytes "bytes" - -import strings "strings" -import reflect "reflect" - -import ( - context "golang.org/x/net/context" - grpc "google.golang.org/grpc" -) - -import io "io" - -// Reference imports to suppress errors if they are not otherwise used. -var _ = proto.Marshal -var _ = fmt.Errorf -var _ = math.Inf - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the proto package it is being compiled against. -// A compilation error at this line likely means your copy of the -// proto package needs to be updated. -const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package - -// BytesMessage contains a chunk of byte data -type BytesMessage struct { - Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` -} - -func (m *BytesMessage) Reset() { *m = BytesMessage{} } -func (*BytesMessage) ProtoMessage() {} -func (*BytesMessage) Descriptor() ([]byte, []int) { return fileDescriptorFilesync, []int{0} } - -func (m *BytesMessage) GetData() []byte { - if m != nil { - return m.Data - } - return nil -} - -func init() { - proto.RegisterType((*BytesMessage)(nil), "moby.filesync.v1.BytesMessage") -} -func (this *BytesMessage) Equal(that interface{}) bool { - if that == nil { - if this == nil { - return true - } - return false - } - - that1, ok := that.(*BytesMessage) - if !ok { - that2, ok := that.(BytesMessage) - if ok { - that1 = &that2 - } else { - return false - } - } - if that1 == nil { - if this == nil { - return true - } - return false - } else if this == nil { - return false - } - if !bytes.Equal(this.Data, that1.Data) { - return false - } - return true -} -func (this *BytesMessage) GoString() string { - if this == nil { - return "nil" - } - s := make([]string, 0, 5) - s = append(s, "&filesync.BytesMessage{") - s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n") - s = append(s, "}") - return strings.Join(s, "") -} -func valueToGoStringFilesync(v interface{}, typ string) string { - rv := reflect.ValueOf(v) - if rv.IsNil() { - return "nil" - } - pv := reflect.Indirect(rv).Interface() - return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) -} - -// Reference imports to suppress errors if they are not otherwise used. -var _ context.Context -var _ grpc.ClientConn - -// This is a compile-time assertion to ensure that this generated file -// is compatible with the grpc package it is being compiled against. -const _ = grpc.SupportPackageIsVersion4 - -// Client API for FileSync service - -type FileSyncClient interface { - DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSync_DiffCopyClient, error) - TarStream(ctx context.Context, opts ...grpc.CallOption) (FileSync_TarStreamClient, error) -} - -type fileSyncClient struct { - cc *grpc.ClientConn -} - -func NewFileSyncClient(cc *grpc.ClientConn) FileSyncClient { - return &fileSyncClient{cc} -} - -func (c *fileSyncClient) DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSync_DiffCopyClient, error) { - stream, err := grpc.NewClientStream(ctx, &_FileSync_serviceDesc.Streams[0], c.cc, "/moby.filesync.v1.FileSync/DiffCopy", opts...) - if err != nil { - return nil, err - } - x := &fileSyncDiffCopyClient{stream} - return x, nil -} - -type FileSync_DiffCopyClient interface { - Send(*BytesMessage) error - Recv() (*BytesMessage, error) - grpc.ClientStream -} - -type fileSyncDiffCopyClient struct { - grpc.ClientStream -} - -func (x *fileSyncDiffCopyClient) Send(m *BytesMessage) error { - return x.ClientStream.SendMsg(m) -} - -func (x *fileSyncDiffCopyClient) Recv() (*BytesMessage, error) { - m := new(BytesMessage) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func (c *fileSyncClient) TarStream(ctx context.Context, opts ...grpc.CallOption) (FileSync_TarStreamClient, error) { - stream, err := grpc.NewClientStream(ctx, &_FileSync_serviceDesc.Streams[1], c.cc, "/moby.filesync.v1.FileSync/TarStream", opts...) - if err != nil { - return nil, err - } - x := &fileSyncTarStreamClient{stream} - return x, nil -} - -type FileSync_TarStreamClient interface { - Send(*BytesMessage) error - Recv() (*BytesMessage, error) - grpc.ClientStream -} - -type fileSyncTarStreamClient struct { - grpc.ClientStream -} - -func (x *fileSyncTarStreamClient) Send(m *BytesMessage) error { - return x.ClientStream.SendMsg(m) -} - -func (x *fileSyncTarStreamClient) Recv() (*BytesMessage, error) { - m := new(BytesMessage) - if err := x.ClientStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -// Server API for FileSync service - -type FileSyncServer interface { - DiffCopy(FileSync_DiffCopyServer) error - TarStream(FileSync_TarStreamServer) error -} - -func RegisterFileSyncServer(s *grpc.Server, srv FileSyncServer) { - s.RegisterService(&_FileSync_serviceDesc, srv) -} - -func _FileSync_DiffCopy_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(FileSyncServer).DiffCopy(&fileSyncDiffCopyServer{stream}) -} - -type FileSync_DiffCopyServer interface { - Send(*BytesMessage) error - Recv() (*BytesMessage, error) - grpc.ServerStream -} - -type fileSyncDiffCopyServer struct { - grpc.ServerStream -} - -func (x *fileSyncDiffCopyServer) Send(m *BytesMessage) error { - return x.ServerStream.SendMsg(m) -} - -func (x *fileSyncDiffCopyServer) Recv() (*BytesMessage, error) { - m := new(BytesMessage) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -func _FileSync_TarStream_Handler(srv interface{}, stream grpc.ServerStream) error { - return srv.(FileSyncServer).TarStream(&fileSyncTarStreamServer{stream}) -} - -type FileSync_TarStreamServer interface { - Send(*BytesMessage) error - Recv() (*BytesMessage, error) - grpc.ServerStream -} - -type fileSyncTarStreamServer struct { - grpc.ServerStream -} - -func (x *fileSyncTarStreamServer) Send(m *BytesMessage) error { - return x.ServerStream.SendMsg(m) -} - -func (x *fileSyncTarStreamServer) Recv() (*BytesMessage, error) { - m := new(BytesMessage) - if err := x.ServerStream.RecvMsg(m); err != nil { - return nil, err - } - return m, nil -} - -var _FileSync_serviceDesc = grpc.ServiceDesc{ - ServiceName: "moby.filesync.v1.FileSync", - HandlerType: (*FileSyncServer)(nil), - Methods: []grpc.MethodDesc{}, - Streams: []grpc.StreamDesc{ - { - StreamName: "DiffCopy", - Handler: _FileSync_DiffCopy_Handler, - ServerStreams: true, - ClientStreams: true, - }, - { - StreamName: "TarStream", - Handler: _FileSync_TarStream_Handler, - ServerStreams: true, - ClientStreams: true, - }, - }, - Metadata: "filesync.proto", -} - -func (m *BytesMessage) Marshal() (dAtA []byte, err error) { - size := m.Size() - dAtA = make([]byte, size) - n, err := m.MarshalTo(dAtA) - if err != nil { - return nil, err - } - return dAtA[:n], nil -} - -func (m *BytesMessage) MarshalTo(dAtA []byte) (int, error) { - var i int - _ = i - var l int - _ = l - if len(m.Data) > 0 { - dAtA[i] = 0xa - i++ - i = encodeVarintFilesync(dAtA, i, uint64(len(m.Data))) - i += copy(dAtA[i:], m.Data) - } - return i, nil -} - -func encodeFixed64Filesync(dAtA []byte, offset int, v uint64) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - dAtA[offset+4] = uint8(v >> 32) - dAtA[offset+5] = uint8(v >> 40) - dAtA[offset+6] = uint8(v >> 48) - dAtA[offset+7] = uint8(v >> 56) - return offset + 8 -} -func encodeFixed32Filesync(dAtA []byte, offset int, v uint32) int { - dAtA[offset] = uint8(v) - dAtA[offset+1] = uint8(v >> 8) - dAtA[offset+2] = uint8(v >> 16) - dAtA[offset+3] = uint8(v >> 24) - return offset + 4 -} -func encodeVarintFilesync(dAtA []byte, offset int, v uint64) int { - for v >= 1<<7 { - dAtA[offset] = uint8(v&0x7f | 0x80) - v >>= 7 - offset++ - } - dAtA[offset] = uint8(v) - return offset + 1 -} -func (m *BytesMessage) Size() (n int) { - var l int - _ = l - l = len(m.Data) - if l > 0 { - n += 1 + l + sovFilesync(uint64(l)) - } - return n -} - -func sovFilesync(x uint64) (n int) { - for { - n++ - x >>= 7 - if x == 0 { - break - } - } - return n -} -func sozFilesync(x uint64) (n int) { - return sovFilesync(uint64((x << 1) ^ uint64((int64(x) >> 63)))) -} -func (this *BytesMessage) String() string { - if this == nil { - return "nil" - } - s := strings.Join([]string{`&BytesMessage{`, - `Data:` + fmt.Sprintf("%v", this.Data) + `,`, - `}`, - }, "") - return s -} -func valueToStringFilesync(v interface{}) string { - rv := reflect.ValueOf(v) - if rv.IsNil() { - return "nil" - } - pv := reflect.Indirect(rv).Interface() - return fmt.Sprintf("*%v", pv) -} -func (m *BytesMessage) Unmarshal(dAtA []byte) error { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - preIndex := iNdEx - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowFilesync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - fieldNum := int32(wire >> 3) - wireType := int(wire & 0x7) - if wireType == 4 { - return fmt.Errorf("proto: BytesMessage: wiretype end group for non-group") - } - if fieldNum <= 0 { - return fmt.Errorf("proto: BytesMessage: illegal tag %d (wire type %d)", fieldNum, wire) - } - switch fieldNum { - case 1: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) - } - var byteLen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowFilesync - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - byteLen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if byteLen < 0 { - return ErrInvalidLengthFilesync - } - postIndex := iNdEx + byteLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) - if m.Data == nil { - m.Data = []byte{} - } - iNdEx = postIndex - default: - iNdEx = preIndex - skippy, err := skipFilesync(dAtA[iNdEx:]) - if err != nil { - return err - } - if skippy < 0 { - return ErrInvalidLengthFilesync - } - if (iNdEx + skippy) > l { - return io.ErrUnexpectedEOF - } - iNdEx += skippy - } - } - - if iNdEx > l { - return io.ErrUnexpectedEOF - } - return nil -} -func skipFilesync(dAtA []byte) (n int, err error) { - l := len(dAtA) - iNdEx := 0 - for iNdEx < l { - var wire uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowFilesync - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - wire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - wireType := int(wire & 0x7) - switch wireType { - case 0: - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowFilesync - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - iNdEx++ - if dAtA[iNdEx-1] < 0x80 { - break - } - } - return iNdEx, nil - case 1: - iNdEx += 8 - return iNdEx, nil - case 2: - var length int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowFilesync - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - length |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - iNdEx += length - if length < 0 { - return 0, ErrInvalidLengthFilesync - } - return iNdEx, nil - case 3: - for { - var innerWire uint64 - var start int = iNdEx - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return 0, ErrIntOverflowFilesync - } - if iNdEx >= l { - return 0, io.ErrUnexpectedEOF - } - b := dAtA[iNdEx] - iNdEx++ - innerWire |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - innerWireType := int(innerWire & 0x7) - if innerWireType == 4 { - break - } - next, err := skipFilesync(dAtA[start:]) - if err != nil { - return 0, err - } - iNdEx = start + next - } - return iNdEx, nil - case 4: - return iNdEx, nil - case 5: - iNdEx += 4 - return iNdEx, nil - default: - return 0, fmt.Errorf("proto: illegal wireType %d", wireType) - } - } - panic("unreachable") -} - -var ( - ErrInvalidLengthFilesync = fmt.Errorf("proto: negative length found during unmarshaling") - ErrIntOverflowFilesync = fmt.Errorf("proto: integer overflow") -) - -func init() { proto.RegisterFile("filesync.proto", fileDescriptorFilesync) } - -var fileDescriptorFilesync = []byte{ - // 198 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0xcb, 0xcc, 0x49, - 0x2d, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0xc8, 0xcd, 0x4f, 0xaa, - 0xd4, 0x83, 0x0b, 0x96, 0x19, 0x2a, 0x29, 0x71, 0xf1, 0x38, 0x55, 0x96, 0xa4, 0x16, 0xfb, 0xa6, - 0x16, 0x17, 0x27, 0xa6, 0xa7, 0x0a, 0x09, 0x71, 0xb1, 0xa4, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2a, - 0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x46, 0xab, 0x19, 0xb9, 0x38, 0xdc, 0x32, 0x73, 0x52, 0x83, - 0x2b, 0xf3, 0x92, 0x85, 0xfc, 0xb8, 0x38, 0x5c, 0x32, 0xd3, 0xd2, 0x9c, 0xf3, 0x0b, 0x2a, 0x85, - 0xe4, 0xf4, 0xd0, 0xcd, 0xd3, 0x43, 0x36, 0x4c, 0x8a, 0x80, 0xbc, 0x06, 0xa3, 0x01, 0xa3, 0x90, - 0x3f, 0x17, 0x67, 0x48, 0x62, 0x51, 0x70, 0x49, 0x51, 0x6a, 0x62, 0x2e, 0x35, 0x0c, 0x74, 0x32, - 0xbb, 0xf0, 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9, - 0x31, 0xae, 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e, - 0xc9, 0x31, 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x51, - 0x1c, 0x30, 0xb3, 0x92, 0xd8, 0xc0, 0x41, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x0c, - 0x8d, 0xc5, 0x34, 0x01, 0x00, 0x00, -} diff --git a/client/session/filesync/filesync.proto b/client/session/filesync/filesync.proto deleted file mode 100644 index 2fd5b3ec8d..0000000000 --- a/client/session/filesync/filesync.proto +++ /dev/null @@ -1,15 +0,0 @@ -syntax = "proto3"; - -package moby.filesync.v1; - -option go_package = "filesync"; - -service FileSync{ - rpc DiffCopy(stream BytesMessage) returns (stream BytesMessage); - rpc TarStream(stream BytesMessage) returns (stream BytesMessage); -} - -// BytesMessage contains a chunk of byte data -message BytesMessage{ - bytes data = 1; -} \ No newline at end of file diff --git a/client/session/filesync/filesync_test.go b/client/session/filesync/filesync_test.go deleted file mode 100644 index b48c08b826..0000000000 --- a/client/session/filesync/filesync_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package filesync - -import ( - "context" - "io/ioutil" - "path/filepath" - "testing" - - "github.com/docker/docker/client/session" - "github.com/docker/docker/client/session/testutil" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "golang.org/x/sync/errgroup" -) - -func TestFileSyncIncludePatterns(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "fsynctest") - require.NoError(t, err) - - destDir, err := ioutil.TempDir("", "fsynctest") - require.NoError(t, err) - - err = ioutil.WriteFile(filepath.Join(tmpDir, "foo"), []byte("content1"), 0600) - require.NoError(t, err) - - err = ioutil.WriteFile(filepath.Join(tmpDir, "bar"), []byte("content2"), 0600) - require.NoError(t, err) - - s, err := session.NewSession("foo", "bar") - require.NoError(t, err) - - m, err := session.NewManager() - require.NoError(t, err) - - fs := NewFSSyncProvider(tmpDir, nil) - s.Allow(fs) - - dialer := session.Dialer(testutil.TestStream(testutil.Handler(m.HandleConn))) - - g, ctx := errgroup.WithContext(context.Background()) - - g.Go(func() error { - return s.Run(ctx, dialer) - }) - - g.Go(func() (reterr error) { - c, err := m.Get(ctx, s.UUID()) - if err != nil { - return err - } - if err := FSSync(ctx, c, FSSendRequestOpt{ - DestDir: destDir, - IncludePatterns: []string{"ba*"}, - }); err != nil { - return err - } - - _, err = ioutil.ReadFile(filepath.Join(destDir, "foo")) - assert.Error(t, err) - - dt, err := ioutil.ReadFile(filepath.Join(destDir, "bar")) - if err != nil { - return err - } - assert.Equal(t, "content2", string(dt)) - return s.Close() - }) - - err = g.Wait() - require.NoError(t, err) -} diff --git a/client/session/filesync/generate.go b/client/session/filesync/generate.go deleted file mode 100644 index 261e876272..0000000000 --- a/client/session/filesync/generate.go +++ /dev/null @@ -1,3 +0,0 @@ -package filesync - -//go:generate protoc --gogoslick_out=plugins=grpc:. filesync.proto diff --git a/client/session/filesync/tarstream.go b/client/session/filesync/tarstream.go deleted file mode 100644 index da139ebf5d..0000000000 --- a/client/session/filesync/tarstream.go +++ /dev/null @@ -1,83 +0,0 @@ -package filesync - -import ( - "io" - - "github.com/Sirupsen/logrus" - "github.com/docker/docker/pkg/archive" - "github.com/docker/docker/pkg/chrootarchive" - "github.com/pkg/errors" - "google.golang.org/grpc" -) - -func sendTarStream(stream grpc.Stream, dir string, includes, excludes []string, progress progressCb) error { - a, err := archive.TarWithOptions(dir, &archive.TarOptions{ - ExcludePatterns: excludes, - }) - if err != nil { - return err - } - - size := 0 - buf := make([]byte, 1<<15) - t := new(BytesMessage) - for { - n, err := a.Read(buf) - if err != nil { - if err == io.EOF { - break - } - return err - } - t.Data = buf[:n] - - if err := stream.SendMsg(t); err != nil { - return err - } - size += n - if progress != nil { - progress(size, false) - } - } - if progress != nil { - progress(size, true) - } - return nil -} - -func recvTarStream(ds grpc.Stream, dest string, cs CacheUpdater) error { - - pr, pw := io.Pipe() - - go func() { - var ( - err error - t = new(BytesMessage) - ) - for { - if err = ds.RecvMsg(t); err != nil { - if err == io.EOF { - err = nil - } - break - } - _, err = pw.Write(t.Data) - if err != nil { - break - } - } - if err = pw.CloseWithError(err); err != nil { - logrus.Errorf("failed to close tar transfer pipe") - } - }() - - decompressedStream, err := archive.DecompressStream(pr) - if err != nil { - return errors.Wrap(err, "failed to decompress stream") - } - - if err := chrootarchive.Untar(decompressedStream, dest, nil); err != nil { - return errors.Wrap(err, "failed to untar context") - } - return nil -} diff --git a/client/session/grpc.go b/client/session/grpc.go deleted file mode 100644 index 0f20b15047..0000000000 --- a/client/session/grpc.go +++ /dev/null @@ -1,62 +0,0 @@ -package session - -import ( - "net" - "time" - - "github.com/Sirupsen/logrus" - "github.com/pkg/errors" - "golang.org/x/net/context" - "golang.org/x/net/http2" - "google.golang.org/grpc" - "google.golang.org/grpc/health/grpc_health_v1" -) - -func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) { - go func() { - <-ctx.Done() - conn.Close() - }() - logrus.Debugf("serving grpc connection") - (&http2.Server{}).ServeConn(conn, &http2.ServeConnOpts{Handler: grpcServer}) -} - -func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.ClientConn, error) { - dialOpt := grpc.WithDialer(func(addr string, d time.Duration) (net.Conn, error) { - return conn, nil - }) - - cc, err := grpc.DialContext(ctx, "", dialOpt, grpc.WithInsecure()) - if err != nil { - return nil, nil, errors.Wrap(err, "failed to create grpc client") - } - - ctx, cancel := context.WithCancel(ctx) - go monitorHealth(ctx, cc, cancel) - - return ctx, cc, nil -} - -func monitorHealth(ctx context.Context, cc *grpc.ClientConn, cancelConn func()) { - defer cancelConn() - defer cc.Close() - - ticker := time.NewTicker(500 * time.Millisecond) - defer ticker.Stop() - healthClient := grpc_health_v1.NewHealthClient(cc) - - for { - select { - case <-ctx.Done(): - return - case <-ticker.C: - <-ticker.C - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - _, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{}) - cancel() - if err != nil { - return - } - } - } -} diff --git a/client/session/manager.go b/client/session/manager.go deleted file mode 100644 index 9523e6f317..0000000000 --- a/client/session/manager.go +++ /dev/null @@ -1,202 +0,0 @@ -package session - -import ( - "net" - "net/http" - "strings" - "sync" - - "github.com/pkg/errors" - "golang.org/x/net/context" - "google.golang.org/grpc" -) - -// Caller can invoke requests on the session -type Caller interface { - Context() context.Context - Supports(method string) bool - Conn() *grpc.ClientConn - Name() string - SharedKey() string -} - -type client struct { - Session - cc *grpc.ClientConn - supported map[string]struct{} -} - -// Manager is a controller for accessing currently active sessions -type Manager struct { - sessions map[string]*client - mu sync.Mutex - updateCondition *sync.Cond -} - -// NewManager returns a new Manager -func NewManager() (*Manager, error) { - sm := &Manager{ - sessions: make(map[string]*client), - } - sm.updateCondition = sync.NewCond(&sm.mu) - return sm, nil -} - -// HandleHTTPRequest handles an incoming HTTP request -func (sm *Manager) HandleHTTPRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error { - hijacker, ok := w.(http.Hijacker) - if !ok { - return errors.New("handler does not support hijack") - } - - uuid := r.Header.Get(headerSessionUUID) - - proto := r.Header.Get("Upgrade") - - sm.mu.Lock() - if _, ok := sm.sessions[uuid]; ok { - sm.mu.Unlock() - return errors.Errorf("session %s already exists", uuid) - } - - if proto == "" { - sm.mu.Unlock() - return errors.New("no upgrade proto in request") - } - - if proto != "h2c" { - sm.mu.Unlock() - return errors.Errorf("protocol %s not supported", proto) - } - - conn, _, err := hijacker.Hijack() - if err != nil { - sm.mu.Unlock() - return errors.Wrap(err, "failed to hijack connection") - } - - resp := &http.Response{ - StatusCode: http.StatusSwitchingProtocols, - ProtoMajor: 1, - ProtoMinor: 1, - Header: http.Header{}, - } - resp.Header.Set("Connection", "Upgrade") - resp.Header.Set("Upgrade", proto) - - // set raw mode - conn.Write([]byte{}) - resp.Write(conn) - - return sm.handleConn(ctx, conn, r.Header) -} - -// HandleConn handles an incoming raw connection -func (sm *Manager) HandleConn(ctx context.Context, conn net.Conn, opts map[string][]string) error { - sm.mu.Lock() - return sm.handleConn(ctx, conn, opts) -} - -// caller needs to take lock, this function will release it -func (sm *Manager) handleConn(ctx context.Context, conn net.Conn, opts map[string][]string) error { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - h := http.Header(opts) - uuid := h.Get(headerSessionUUID) - name := h.Get(headerSessionName) - sharedKey := h.Get(headerSessionSharedKey) - - ctx, cc, err := grpcClientConn(ctx, conn) - if err != nil { - sm.mu.Unlock() - return err - } - - c := &client{ - Session: Session{ - uuid: uuid, - name: name, - sharedKey: sharedKey, - ctx: ctx, - cancelCtx: cancel, - done: make(chan struct{}), - }, - cc: cc, - supported: make(map[string]struct{}), - } - - for _, m := range opts[headerSessionMethod] { - c.supported[strings.ToLower(m)] = struct{}{} - } - sm.sessions[uuid] = c - sm.updateCondition.Broadcast() - sm.mu.Unlock() - - defer func() { - sm.mu.Lock() - delete(sm.sessions, uuid) - sm.mu.Unlock() - }() - - <-c.ctx.Done() - conn.Close() - close(c.done) - - return nil -} - -// Get returns a session by UUID -func (sm *Manager) Get(ctx context.Context, uuid string) (Caller, error) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - go func() { - select { - case <-ctx.Done(): - sm.updateCondition.Broadcast() - } - }() - - var c *client - - sm.mu.Lock() - for { - select { - case <-ctx.Done(): - sm.mu.Unlock() - return nil, errors.Wrapf(ctx.Err(), "no active session for %s", uuid) - default: - } - var ok bool - c, ok = sm.sessions[uuid] - if !ok || c.closed() { - sm.updateCondition.Wait() - continue - } - sm.mu.Unlock() - break - } - - return c, nil -} - -func (c *client) Context() context.Context { - return c.context() -} - -func (c *client) Name() string { - return c.name -} - -func (c *client) SharedKey() string { - return c.sharedKey -} - -func (c *client) Supports(url string) bool { - _, ok := c.supported[strings.ToLower(url)] - return ok -} -func (c *client) Conn() *grpc.ClientConn { - return c.cc -} diff --git a/client/session/session.go b/client/session/session.go deleted file mode 100644 index 147486a75b..0000000000 --- a/client/session/session.go +++ /dev/null @@ -1,117 +0,0 @@ -package session - -import ( - "net" - - "github.com/docker/docker/pkg/stringid" - "github.com/pkg/errors" - "golang.org/x/net/context" - "google.golang.org/grpc" - "google.golang.org/grpc/health" - "google.golang.org/grpc/health/grpc_health_v1" -) - -const ( - headerSessionUUID = "X-Docker-Expose-Session-Uuid" - headerSessionName = "X-Docker-Expose-Session-Name" - headerSessionSharedKey = "X-Docker-Expose-Session-Sharedkey" - headerSessionMethod = "X-Docker-Expose-Session-Grpc-Method" -) - -// Dialer returns a connection that can be used by the session -type Dialer func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) - -// Attachable defines a feature that can be expsed on a session -type Attachable interface { - Register(*grpc.Server) -} - -// Session is a long running connection between client and a daemon -type Session struct { - uuid string - name string - sharedKey string - ctx context.Context - cancelCtx func() - done chan struct{} - grpcServer *grpc.Server -} - -// NewSession returns a new long running session -func NewSession(name, sharedKey string) (*Session, error) { - uuid := stringid.GenerateRandomID() - s := &Session{ - uuid: uuid, - name: name, - sharedKey: sharedKey, - grpcServer: grpc.NewServer(), - } - - grpc_health_v1.RegisterHealthServer(s.grpcServer, health.NewServer()) - - return s, nil -} - -// Allow enable a given service to be reachable through the grpc session -func (s *Session) Allow(a Attachable) { - a.Register(s.grpcServer) -} - -// UUID returns unique identifier for the session -func (s *Session) UUID() string { - return s.uuid -} - -// Run activates the session -func (s *Session) Run(ctx context.Context, dialer Dialer) error { - ctx, cancel := context.WithCancel(ctx) - s.cancelCtx = cancel - s.done = make(chan struct{}) - - defer cancel() - defer close(s.done) - - meta := make(map[string][]string) - meta[headerSessionUUID] = []string{s.uuid} - meta[headerSessionName] = []string{s.name} - meta[headerSessionSharedKey] = []string{s.sharedKey} - - for name, svc := range s.grpcServer.GetServiceInfo() { - for _, method := range svc.Methods { - meta[headerSessionMethod] = append(meta[headerSessionMethod], MethodURL(name, method.Name)) - } - } - conn, err := dialer(ctx, "h2c", meta) - if err != nil { - return errors.Wrap(err, "failed to dial gRPC") - } - serve(ctx, s.grpcServer, conn) - return nil -} - -// Close closes the session -func (s *Session) Close() error { - if s.cancelCtx != nil && s.done != nil { - s.cancelCtx() - <-s.done - } - return nil -} - -func (s *Session) context() context.Context { - return s.ctx -} - -func (s *Session) closed() bool { - select { - case <-s.context().Done(): - return true - default: - return false - } -} - -// MethodURL returns a gRPC method URL for service and method name -func MethodURL(s, m string) string { - return "/" + s + "/" + m -} diff --git a/client/session/testutil/testutil.go b/client/session/testutil/testutil.go deleted file mode 100644 index 2e145d9006..0000000000 --- a/client/session/testutil/testutil.go +++ /dev/null @@ -1,70 +0,0 @@ -package testutil - -import ( - "io" - "net" - "time" - - "github.com/Sirupsen/logrus" - "golang.org/x/net/context" -) - -// Handler is function called to handle incoming connection -type Handler func(ctx context.Context, conn net.Conn, meta map[string][]string) error - -// Dialer is a function for dialing an outgoing connection -type Dialer func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) - -// TestStream creates an in memory session dialer for a handler function -func TestStream(handler Handler) Dialer { - s1, s2 := sockPair() - return func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) { - go func() { - err := handler(context.TODO(), s1, meta) - if err != nil { - logrus.Error(err) - } - s1.Close() - }() - return s2, nil - } -} - -func sockPair() (*sock, *sock) { - pr1, pw1 := io.Pipe() - pr2, pw2 := io.Pipe() - return &sock{pw1, pr2, pw1}, &sock{pw2, pr1, pw2} -} - -type sock struct { - io.Writer - io.Reader - io.Closer -} - -func (s *sock) LocalAddr() net.Addr { - return dummyAddr{} -} -func (s *sock) RemoteAddr() net.Addr { - return dummyAddr{} -} -func (s *sock) SetDeadline(t time.Time) error { - return nil -} -func (s *sock) SetReadDeadline(t time.Time) error { - return nil -} -func (s *sock) SetWriteDeadline(t time.Time) error { - return nil -} - -type dummyAddr struct { -} - -func (d dummyAddr) Network() string { - return "tcp" -} - -func (d dummyAddr) String() string { - return "localhost" -} diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index 215f9c2d59..faa927dd6d 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -30,7 +30,6 @@ import ( "github.com/docker/docker/builder/dockerfile" "github.com/docker/docker/builder/fscache" "github.com/docker/docker/cli/debug" - "github.com/docker/docker/client/session" "github.com/docker/docker/daemon" "github.com/docker/docker/daemon/cluster" "github.com/docker/docker/daemon/config" @@ -50,6 +49,7 @@ import ( "github.com/docker/docker/runconfig" "github.com/docker/go-connections/tlsconfig" swarmapi "github.com/docker/swarmkit/api" + "github.com/moby/buildkit/session" "github.com/pkg/errors" "github.com/spf13/pflag" ) diff --git a/integration-cli/docker_api_build_test.go b/integration-cli/docker_api_build_test.go index c1ab7661e0..fb62917823 100644 --- a/integration-cli/docker_api_build_test.go +++ b/integration-cli/docker_api_build_test.go @@ -12,8 +12,6 @@ import ( "strings" "github.com/docker/docker/api/types" - "github.com/docker/docker/client/session" - "github.com/docker/docker/client/session/filesync" "github.com/docker/docker/integration-cli/checker" "github.com/docker/docker/integration-cli/cli/build/fakecontext" "github.com/docker/docker/integration-cli/cli/build/fakegit" @@ -21,6 +19,8 @@ import ( "github.com/docker/docker/integration-cli/request" "github.com/docker/docker/pkg/testutil" "github.com/go-check/check" + "github.com/moby/buildkit/session" + "github.com/moby/buildkit/session/filesync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/net/context"