mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Remove client/session package, depend on buildkit's session package
gofmt -w -r '"github.com/docker/docker/client/session" -> "github.com/moby/buildkit/session"' gofmt -w -r '"github.com/docker/docker/client/session/filesync" -> "github.com/moby/buildkit/session/filesync"' Signed-off-by: Tibor Vass <tibor@docker.com>
This commit is contained in:
parent
b6d164e6c4
commit
41445a4745
17 changed files with 8 additions and 1420 deletions
|
@ -18,13 +18,13 @@ import (
|
|||
"github.com/docker/docker/builder/dockerfile/parser"
|
||||
"github.com/docker/docker/builder/fscache"
|
||||
"github.com/docker/docker/builder/remotecontext"
|
||||
"github.com/docker/docker/client/session"
|
||||
"github.com/docker/docker/pkg/archive"
|
||||
"github.com/docker/docker/pkg/chrootarchive"
|
||||
"github.com/docker/docker/pkg/idtools"
|
||||
"github.com/docker/docker/pkg/streamformatter"
|
||||
"github.com/docker/docker/pkg/stringid"
|
||||
"github.com/docker/docker/pkg/system"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/sync/syncmap"
|
||||
|
|
|
@ -5,8 +5,8 @@ import (
|
|||
|
||||
"github.com/docker/docker/builder/fscache"
|
||||
"github.com/docker/docker/builder/remotecontext"
|
||||
"github.com/docker/docker/client/session"
|
||||
"github.com/docker/docker/client/session/filesync"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/session/filesync"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
|
|
@ -12,9 +12,9 @@ import (
|
|||
"github.com/boltdb/bolt"
|
||||
"github.com/docker/docker/builder"
|
||||
"github.com/docker/docker/builder/remotecontext"
|
||||
"github.com/docker/docker/client/session/filesync"
|
||||
"github.com/docker/docker/pkg/directory"
|
||||
"github.com/docker/docker/pkg/stringid"
|
||||
"github.com/moby/buildkit/session/filesync"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tonistiigi/fsutil"
|
||||
"golang.org/x/net/context"
|
||||
|
|
|
@ -7,7 +7,7 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/docker/docker/client/session/filesync"
|
||||
"github.com/moby/buildkit/session/filesync"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
|
|
@ -1,31 +0,0 @@
|
|||
package filesync
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/tonistiigi/fsutil"
|
||||
)
|
||||
|
||||
func sendDiffCopy(stream grpc.Stream, dir string, includes, excludes []string, progress progressCb) error {
|
||||
return fsutil.Send(stream.Context(), stream, dir, &fsutil.WalkOpt{
|
||||
ExcludePatterns: excludes,
|
||||
IncludePaths: includes, // TODO: rename IncludePatterns
|
||||
}, progress)
|
||||
}
|
||||
|
||||
func recvDiffCopy(ds grpc.Stream, dest string, cu CacheUpdater) error {
|
||||
st := time.Now()
|
||||
defer func() {
|
||||
logrus.Debugf("diffcopy took: %v", time.Since(st))
|
||||
}()
|
||||
var cf fsutil.ChangeFunc
|
||||
if cu != nil {
|
||||
cu.MarkSupported(true)
|
||||
cf = cu.HandleChange
|
||||
}
|
||||
|
||||
return fsutil.Receive(ds.Context(), ds, dest, cf)
|
||||
}
|
|
@ -1,183 +0,0 @@
|
|||
package filesync
|
||||
|
||||
import (
|
||||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/docker/docker/client/session"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/tonistiigi/fsutil"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
const (
|
||||
keyOverrideExcludes = "override-excludes"
|
||||
keyIncludePatterns = "include-patterns"
|
||||
)
|
||||
|
||||
type fsSyncProvider struct {
|
||||
root string
|
||||
excludes []string
|
||||
p progressCb
|
||||
doneCh chan error
|
||||
}
|
||||
|
||||
// NewFSSyncProvider creates a new provider for sending files from client
|
||||
func NewFSSyncProvider(root string, excludes []string) session.Attachable {
|
||||
p := &fsSyncProvider{
|
||||
root: root,
|
||||
excludes: excludes,
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
func (sp *fsSyncProvider) Register(server *grpc.Server) {
|
||||
RegisterFileSyncServer(server, sp)
|
||||
}
|
||||
|
||||
func (sp *fsSyncProvider) DiffCopy(stream FileSync_DiffCopyServer) error {
|
||||
return sp.handle("diffcopy", stream)
|
||||
}
|
||||
func (sp *fsSyncProvider) TarStream(stream FileSync_TarStreamServer) error {
|
||||
return sp.handle("tarstream", stream)
|
||||
}
|
||||
|
||||
func (sp *fsSyncProvider) handle(method string, stream grpc.ServerStream) error {
|
||||
var pr *protocol
|
||||
for _, p := range supportedProtocols {
|
||||
if method == p.name && isProtoSupported(p.name) {
|
||||
pr = &p
|
||||
break
|
||||
}
|
||||
}
|
||||
if pr == nil {
|
||||
return errors.New("failed to negotiate protocol")
|
||||
}
|
||||
|
||||
opts, _ := metadata.FromContext(stream.Context()) // if no metadata continue with empty object
|
||||
|
||||
var excludes []string
|
||||
if len(opts[keyOverrideExcludes]) == 0 || opts[keyOverrideExcludes][0] != "true" {
|
||||
excludes = sp.excludes
|
||||
}
|
||||
includes := opts[keyIncludePatterns]
|
||||
|
||||
var progress progressCb
|
||||
if sp.p != nil {
|
||||
progress = sp.p
|
||||
sp.p = nil
|
||||
}
|
||||
|
||||
var doneCh chan error
|
||||
if sp.doneCh != nil {
|
||||
doneCh = sp.doneCh
|
||||
sp.doneCh = nil
|
||||
}
|
||||
err := pr.sendFn(stream, sp.root, includes, excludes, progress)
|
||||
if doneCh != nil {
|
||||
if err != nil {
|
||||
doneCh <- err
|
||||
}
|
||||
close(doneCh)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (sp *fsSyncProvider) SetNextProgressCallback(f func(int, bool), doneCh chan error) {
|
||||
sp.p = f
|
||||
sp.doneCh = doneCh
|
||||
}
|
||||
|
||||
type progressCb func(int, bool)
|
||||
|
||||
type protocol struct {
|
||||
name string
|
||||
sendFn func(stream grpc.Stream, srcDir string, includes, excludes []string, progress progressCb) error
|
||||
recvFn func(stream grpc.Stream, destDir string, cu CacheUpdater) error
|
||||
}
|
||||
|
||||
func isProtoSupported(p string) bool {
|
||||
// TODO: this should be removed after testing if stability is confirmed
|
||||
if override := os.Getenv("BUILD_STREAM_PROTOCOL"); override != "" {
|
||||
return strings.EqualFold(p, override)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
var supportedProtocols = []protocol{
|
||||
{
|
||||
name: "diffcopy",
|
||||
sendFn: sendDiffCopy,
|
||||
recvFn: recvDiffCopy,
|
||||
},
|
||||
{
|
||||
name: "tarstream",
|
||||
sendFn: sendTarStream,
|
||||
recvFn: recvTarStream,
|
||||
},
|
||||
}
|
||||
|
||||
// FSSendRequestOpt defines options for FSSend request
|
||||
type FSSendRequestOpt struct {
|
||||
IncludePatterns []string
|
||||
OverrideExcludes bool
|
||||
DestDir string
|
||||
CacheUpdater CacheUpdater
|
||||
}
|
||||
|
||||
// CacheUpdater is an object capable of sending notifications for the cache hash changes
|
||||
type CacheUpdater interface {
|
||||
MarkSupported(bool)
|
||||
HandleChange(fsutil.ChangeKind, string, os.FileInfo, error) error
|
||||
}
|
||||
|
||||
// FSSync initializes a transfer of files
|
||||
func FSSync(ctx context.Context, c session.Caller, opt FSSendRequestOpt) error {
|
||||
var pr *protocol
|
||||
for _, p := range supportedProtocols {
|
||||
if isProtoSupported(p.name) && c.Supports(session.MethodURL(_FileSync_serviceDesc.ServiceName, p.name)) {
|
||||
pr = &p
|
||||
break
|
||||
}
|
||||
}
|
||||
if pr == nil {
|
||||
return errors.New("no fssync handlers")
|
||||
}
|
||||
|
||||
opts := make(map[string][]string)
|
||||
if opt.OverrideExcludes {
|
||||
opts[keyOverrideExcludes] = []string{"true"}
|
||||
}
|
||||
|
||||
if opt.IncludePatterns != nil {
|
||||
opts[keyIncludePatterns] = opt.IncludePatterns
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
client := NewFileSyncClient(c.Conn())
|
||||
|
||||
var stream grpc.ClientStream
|
||||
|
||||
ctx = metadata.NewContext(ctx, opts)
|
||||
|
||||
switch pr.name {
|
||||
case "tarstream":
|
||||
cc, err := client.TarStream(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stream = cc
|
||||
case "diffcopy":
|
||||
cc, err := client.DiffCopy(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
stream = cc
|
||||
}
|
||||
|
||||
return pr.recvFn(stream, opt.DestDir, opt.CacheUpdater)
|
||||
}
|
|
@ -1,575 +0,0 @@
|
|||
// Code generated by protoc-gen-gogo.
|
||||
// source: filesync.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
/*
|
||||
Package filesync is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
filesync.proto
|
||||
|
||||
It has these top-level messages:
|
||||
BytesMessage
|
||||
*/
|
||||
package filesync
|
||||
|
||||
import proto "github.com/gogo/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
import bytes "bytes"
|
||||
|
||||
import strings "strings"
|
||||
import reflect "reflect"
|
||||
|
||||
import (
|
||||
context "golang.org/x/net/context"
|
||||
grpc "google.golang.org/grpc"
|
||||
)
|
||||
|
||||
import io "io"
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
// A compilation error at this line likely means your copy of the
|
||||
// proto package needs to be updated.
|
||||
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
|
||||
|
||||
// BytesMessage contains a chunk of byte data
|
||||
type BytesMessage struct {
|
||||
Data []byte `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"`
|
||||
}
|
||||
|
||||
func (m *BytesMessage) Reset() { *m = BytesMessage{} }
|
||||
func (*BytesMessage) ProtoMessage() {}
|
||||
func (*BytesMessage) Descriptor() ([]byte, []int) { return fileDescriptorFilesync, []int{0} }
|
||||
|
||||
func (m *BytesMessage) GetData() []byte {
|
||||
if m != nil {
|
||||
return m.Data
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*BytesMessage)(nil), "moby.filesync.v1.BytesMessage")
|
||||
}
|
||||
func (this *BytesMessage) Equal(that interface{}) bool {
|
||||
if that == nil {
|
||||
if this == nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
that1, ok := that.(*BytesMessage)
|
||||
if !ok {
|
||||
that2, ok := that.(BytesMessage)
|
||||
if ok {
|
||||
that1 = &that2
|
||||
} else {
|
||||
return false
|
||||
}
|
||||
}
|
||||
if that1 == nil {
|
||||
if this == nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
} else if this == nil {
|
||||
return false
|
||||
}
|
||||
if !bytes.Equal(this.Data, that1.Data) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
func (this *BytesMessage) GoString() string {
|
||||
if this == nil {
|
||||
return "nil"
|
||||
}
|
||||
s := make([]string, 0, 5)
|
||||
s = append(s, "&filesync.BytesMessage{")
|
||||
s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n")
|
||||
s = append(s, "}")
|
||||
return strings.Join(s, "")
|
||||
}
|
||||
func valueToGoStringFilesync(v interface{}, typ string) string {
|
||||
rv := reflect.ValueOf(v)
|
||||
if rv.IsNil() {
|
||||
return "nil"
|
||||
}
|
||||
pv := reflect.Indirect(rv).Interface()
|
||||
return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
|
||||
}
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ context.Context
|
||||
var _ grpc.ClientConn
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
const _ = grpc.SupportPackageIsVersion4
|
||||
|
||||
// Client API for FileSync service
|
||||
|
||||
type FileSyncClient interface {
|
||||
DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSync_DiffCopyClient, error)
|
||||
TarStream(ctx context.Context, opts ...grpc.CallOption) (FileSync_TarStreamClient, error)
|
||||
}
|
||||
|
||||
type fileSyncClient struct {
|
||||
cc *grpc.ClientConn
|
||||
}
|
||||
|
||||
func NewFileSyncClient(cc *grpc.ClientConn) FileSyncClient {
|
||||
return &fileSyncClient{cc}
|
||||
}
|
||||
|
||||
func (c *fileSyncClient) DiffCopy(ctx context.Context, opts ...grpc.CallOption) (FileSync_DiffCopyClient, error) {
|
||||
stream, err := grpc.NewClientStream(ctx, &_FileSync_serviceDesc.Streams[0], c.cc, "/moby.filesync.v1.FileSync/DiffCopy", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &fileSyncDiffCopyClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type FileSync_DiffCopyClient interface {
|
||||
Send(*BytesMessage) error
|
||||
Recv() (*BytesMessage, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type fileSyncDiffCopyClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *fileSyncDiffCopyClient) Send(m *BytesMessage) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *fileSyncDiffCopyClient) Recv() (*BytesMessage, error) {
|
||||
m := new(BytesMessage)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (c *fileSyncClient) TarStream(ctx context.Context, opts ...grpc.CallOption) (FileSync_TarStreamClient, error) {
|
||||
stream, err := grpc.NewClientStream(ctx, &_FileSync_serviceDesc.Streams[1], c.cc, "/moby.filesync.v1.FileSync/TarStream", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &fileSyncTarStreamClient{stream}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type FileSync_TarStreamClient interface {
|
||||
Send(*BytesMessage) error
|
||||
Recv() (*BytesMessage, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type fileSyncTarStreamClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *fileSyncTarStreamClient) Send(m *BytesMessage) error {
|
||||
return x.ClientStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *fileSyncTarStreamClient) Recv() (*BytesMessage, error) {
|
||||
m := new(BytesMessage)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// Server API for FileSync service
|
||||
|
||||
type FileSyncServer interface {
|
||||
DiffCopy(FileSync_DiffCopyServer) error
|
||||
TarStream(FileSync_TarStreamServer) error
|
||||
}
|
||||
|
||||
func RegisterFileSyncServer(s *grpc.Server, srv FileSyncServer) {
|
||||
s.RegisterService(&_FileSync_serviceDesc, srv)
|
||||
}
|
||||
|
||||
func _FileSync_DiffCopy_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(FileSyncServer).DiffCopy(&fileSyncDiffCopyServer{stream})
|
||||
}
|
||||
|
||||
type FileSync_DiffCopyServer interface {
|
||||
Send(*BytesMessage) error
|
||||
Recv() (*BytesMessage, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type fileSyncDiffCopyServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *fileSyncDiffCopyServer) Send(m *BytesMessage) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *fileSyncDiffCopyServer) Recv() (*BytesMessage, error) {
|
||||
m := new(BytesMessage)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func _FileSync_TarStream_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
return srv.(FileSyncServer).TarStream(&fileSyncTarStreamServer{stream})
|
||||
}
|
||||
|
||||
type FileSync_TarStreamServer interface {
|
||||
Send(*BytesMessage) error
|
||||
Recv() (*BytesMessage, error)
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type fileSyncTarStreamServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *fileSyncTarStreamServer) Send(m *BytesMessage) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func (x *fileSyncTarStreamServer) Recv() (*BytesMessage, error) {
|
||||
m := new(BytesMessage)
|
||||
if err := x.ServerStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
var _FileSync_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "moby.filesync.v1.FileSync",
|
||||
HandlerType: (*FileSyncServer)(nil),
|
||||
Methods: []grpc.MethodDesc{},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "DiffCopy",
|
||||
Handler: _FileSync_DiffCopy_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
{
|
||||
StreamName: "TarStream",
|
||||
Handler: _FileSync_TarStream_Handler,
|
||||
ServerStreams: true,
|
||||
ClientStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "filesync.proto",
|
||||
}
|
||||
|
||||
func (m *BytesMessage) Marshal() (dAtA []byte, err error) {
|
||||
size := m.Size()
|
||||
dAtA = make([]byte, size)
|
||||
n, err := m.MarshalTo(dAtA)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return dAtA[:n], nil
|
||||
}
|
||||
|
||||
func (m *BytesMessage) MarshalTo(dAtA []byte) (int, error) {
|
||||
var i int
|
||||
_ = i
|
||||
var l int
|
||||
_ = l
|
||||
if len(m.Data) > 0 {
|
||||
dAtA[i] = 0xa
|
||||
i++
|
||||
i = encodeVarintFilesync(dAtA, i, uint64(len(m.Data)))
|
||||
i += copy(dAtA[i:], m.Data)
|
||||
}
|
||||
return i, nil
|
||||
}
|
||||
|
||||
func encodeFixed64Filesync(dAtA []byte, offset int, v uint64) int {
|
||||
dAtA[offset] = uint8(v)
|
||||
dAtA[offset+1] = uint8(v >> 8)
|
||||
dAtA[offset+2] = uint8(v >> 16)
|
||||
dAtA[offset+3] = uint8(v >> 24)
|
||||
dAtA[offset+4] = uint8(v >> 32)
|
||||
dAtA[offset+5] = uint8(v >> 40)
|
||||
dAtA[offset+6] = uint8(v >> 48)
|
||||
dAtA[offset+7] = uint8(v >> 56)
|
||||
return offset + 8
|
||||
}
|
||||
func encodeFixed32Filesync(dAtA []byte, offset int, v uint32) int {
|
||||
dAtA[offset] = uint8(v)
|
||||
dAtA[offset+1] = uint8(v >> 8)
|
||||
dAtA[offset+2] = uint8(v >> 16)
|
||||
dAtA[offset+3] = uint8(v >> 24)
|
||||
return offset + 4
|
||||
}
|
||||
func encodeVarintFilesync(dAtA []byte, offset int, v uint64) int {
|
||||
for v >= 1<<7 {
|
||||
dAtA[offset] = uint8(v&0x7f | 0x80)
|
||||
v >>= 7
|
||||
offset++
|
||||
}
|
||||
dAtA[offset] = uint8(v)
|
||||
return offset + 1
|
||||
}
|
||||
func (m *BytesMessage) Size() (n int) {
|
||||
var l int
|
||||
_ = l
|
||||
l = len(m.Data)
|
||||
if l > 0 {
|
||||
n += 1 + l + sovFilesync(uint64(l))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
func sovFilesync(x uint64) (n int) {
|
||||
for {
|
||||
n++
|
||||
x >>= 7
|
||||
if x == 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
func sozFilesync(x uint64) (n int) {
|
||||
return sovFilesync(uint64((x << 1) ^ uint64((int64(x) >> 63))))
|
||||
}
|
||||
func (this *BytesMessage) String() string {
|
||||
if this == nil {
|
||||
return "nil"
|
||||
}
|
||||
s := strings.Join([]string{`&BytesMessage{`,
|
||||
`Data:` + fmt.Sprintf("%v", this.Data) + `,`,
|
||||
`}`,
|
||||
}, "")
|
||||
return s
|
||||
}
|
||||
func valueToStringFilesync(v interface{}) string {
|
||||
rv := reflect.ValueOf(v)
|
||||
if rv.IsNil() {
|
||||
return "nil"
|
||||
}
|
||||
pv := reflect.Indirect(rv).Interface()
|
||||
return fmt.Sprintf("*%v", pv)
|
||||
}
|
||||
func (m *BytesMessage) Unmarshal(dAtA []byte) error {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
preIndex := iNdEx
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowFilesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
fieldNum := int32(wire >> 3)
|
||||
wireType := int(wire & 0x7)
|
||||
if wireType == 4 {
|
||||
return fmt.Errorf("proto: BytesMessage: wiretype end group for non-group")
|
||||
}
|
||||
if fieldNum <= 0 {
|
||||
return fmt.Errorf("proto: BytesMessage: illegal tag %d (wire type %d)", fieldNum, wire)
|
||||
}
|
||||
switch fieldNum {
|
||||
case 1:
|
||||
if wireType != 2 {
|
||||
return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
|
||||
}
|
||||
var byteLen int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return ErrIntOverflowFilesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
byteLen |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if byteLen < 0 {
|
||||
return ErrInvalidLengthFilesync
|
||||
}
|
||||
postIndex := iNdEx + byteLen
|
||||
if postIndex > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...)
|
||||
if m.Data == nil {
|
||||
m.Data = []byte{}
|
||||
}
|
||||
iNdEx = postIndex
|
||||
default:
|
||||
iNdEx = preIndex
|
||||
skippy, err := skipFilesync(dAtA[iNdEx:])
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if skippy < 0 {
|
||||
return ErrInvalidLengthFilesync
|
||||
}
|
||||
if (iNdEx + skippy) > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx += skippy
|
||||
}
|
||||
}
|
||||
|
||||
if iNdEx > l {
|
||||
return io.ErrUnexpectedEOF
|
||||
}
|
||||
return nil
|
||||
}
|
||||
func skipFilesync(dAtA []byte) (n int, err error) {
|
||||
l := len(dAtA)
|
||||
iNdEx := 0
|
||||
for iNdEx < l {
|
||||
var wire uint64
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowFilesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
wire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
wireType := int(wire & 0x7)
|
||||
switch wireType {
|
||||
case 0:
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowFilesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
iNdEx++
|
||||
if dAtA[iNdEx-1] < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
return iNdEx, nil
|
||||
case 1:
|
||||
iNdEx += 8
|
||||
return iNdEx, nil
|
||||
case 2:
|
||||
var length int
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowFilesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
length |= (int(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
iNdEx += length
|
||||
if length < 0 {
|
||||
return 0, ErrInvalidLengthFilesync
|
||||
}
|
||||
return iNdEx, nil
|
||||
case 3:
|
||||
for {
|
||||
var innerWire uint64
|
||||
var start int = iNdEx
|
||||
for shift := uint(0); ; shift += 7 {
|
||||
if shift >= 64 {
|
||||
return 0, ErrIntOverflowFilesync
|
||||
}
|
||||
if iNdEx >= l {
|
||||
return 0, io.ErrUnexpectedEOF
|
||||
}
|
||||
b := dAtA[iNdEx]
|
||||
iNdEx++
|
||||
innerWire |= (uint64(b) & 0x7F) << shift
|
||||
if b < 0x80 {
|
||||
break
|
||||
}
|
||||
}
|
||||
innerWireType := int(innerWire & 0x7)
|
||||
if innerWireType == 4 {
|
||||
break
|
||||
}
|
||||
next, err := skipFilesync(dAtA[start:])
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
iNdEx = start + next
|
||||
}
|
||||
return iNdEx, nil
|
||||
case 4:
|
||||
return iNdEx, nil
|
||||
case 5:
|
||||
iNdEx += 4
|
||||
return iNdEx, nil
|
||||
default:
|
||||
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
|
||||
}
|
||||
}
|
||||
panic("unreachable")
|
||||
}
|
||||
|
||||
var (
|
||||
ErrInvalidLengthFilesync = fmt.Errorf("proto: negative length found during unmarshaling")
|
||||
ErrIntOverflowFilesync = fmt.Errorf("proto: integer overflow")
|
||||
)
|
||||
|
||||
func init() { proto.RegisterFile("filesync.proto", fileDescriptorFilesync) }
|
||||
|
||||
var fileDescriptorFilesync = []byte{
|
||||
// 198 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x4b, 0xcb, 0xcc, 0x49,
|
||||
0x2d, 0xae, 0xcc, 0x4b, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x12, 0xc8, 0xcd, 0x4f, 0xaa,
|
||||
0xd4, 0x83, 0x0b, 0x96, 0x19, 0x2a, 0x29, 0x71, 0xf1, 0x38, 0x55, 0x96, 0xa4, 0x16, 0xfb, 0xa6,
|
||||
0x16, 0x17, 0x27, 0xa6, 0xa7, 0x0a, 0x09, 0x71, 0xb1, 0xa4, 0x24, 0x96, 0x24, 0x4a, 0x30, 0x2a,
|
||||
0x30, 0x6a, 0xf0, 0x04, 0x81, 0xd9, 0x46, 0xab, 0x19, 0xb9, 0x38, 0xdc, 0x32, 0x73, 0x52, 0x83,
|
||||
0x2b, 0xf3, 0x92, 0x85, 0xfc, 0xb8, 0x38, 0x5c, 0x32, 0xd3, 0xd2, 0x9c, 0xf3, 0x0b, 0x2a, 0x85,
|
||||
0xe4, 0xf4, 0xd0, 0xcd, 0xd3, 0x43, 0x36, 0x4c, 0x8a, 0x80, 0xbc, 0x06, 0xa3, 0x01, 0xa3, 0x90,
|
||||
0x3f, 0x17, 0x67, 0x48, 0x62, 0x51, 0x70, 0x49, 0x51, 0x6a, 0x62, 0x2e, 0x35, 0x0c, 0x74, 0x32,
|
||||
0xbb, 0xf0, 0x50, 0x8e, 0xe1, 0xc6, 0x43, 0x39, 0x86, 0x0f, 0x0f, 0xe5, 0x18, 0x1b, 0x1e, 0xc9,
|
||||
0x31, 0xae, 0x78, 0x24, 0xc7, 0x78, 0xe2, 0x91, 0x1c, 0xe3, 0x85, 0x47, 0x72, 0x8c, 0x0f, 0x1e,
|
||||
0xc9, 0x31, 0xbe, 0x78, 0x24, 0xc7, 0xf0, 0xe1, 0x91, 0x1c, 0xe3, 0x84, 0xc7, 0x72, 0x0c, 0x51,
|
||||
0x1c, 0x30, 0xb3, 0x92, 0xd8, 0xc0, 0x41, 0x64, 0x0c, 0x08, 0x00, 0x00, 0xff, 0xff, 0x5f, 0x0c,
|
||||
0x8d, 0xc5, 0x34, 0x01, 0x00, 0x00,
|
||||
}
|
|
@ -1,15 +0,0 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package moby.filesync.v1;
|
||||
|
||||
option go_package = "filesync";
|
||||
|
||||
service FileSync{
|
||||
rpc DiffCopy(stream BytesMessage) returns (stream BytesMessage);
|
||||
rpc TarStream(stream BytesMessage) returns (stream BytesMessage);
|
||||
}
|
||||
|
||||
// BytesMessage contains a chunk of byte data
|
||||
message BytesMessage{
|
||||
bytes data = 1;
|
||||
}
|
|
@ -1,71 +0,0 @@
|
|||
package filesync
|
||||
|
||||
import (
|
||||
"context"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/docker/docker/client/session"
|
||||
"github.com/docker/docker/client/session/testutil"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
func TestFileSyncIncludePatterns(t *testing.T) {
|
||||
tmpDir, err := ioutil.TempDir("", "fsynctest")
|
||||
require.NoError(t, err)
|
||||
|
||||
destDir, err := ioutil.TempDir("", "fsynctest")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ioutil.WriteFile(filepath.Join(tmpDir, "foo"), []byte("content1"), 0600)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = ioutil.WriteFile(filepath.Join(tmpDir, "bar"), []byte("content2"), 0600)
|
||||
require.NoError(t, err)
|
||||
|
||||
s, err := session.NewSession("foo", "bar")
|
||||
require.NoError(t, err)
|
||||
|
||||
m, err := session.NewManager()
|
||||
require.NoError(t, err)
|
||||
|
||||
fs := NewFSSyncProvider(tmpDir, nil)
|
||||
s.Allow(fs)
|
||||
|
||||
dialer := session.Dialer(testutil.TestStream(testutil.Handler(m.HandleConn)))
|
||||
|
||||
g, ctx := errgroup.WithContext(context.Background())
|
||||
|
||||
g.Go(func() error {
|
||||
return s.Run(ctx, dialer)
|
||||
})
|
||||
|
||||
g.Go(func() (reterr error) {
|
||||
c, err := m.Get(ctx, s.UUID())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := FSSync(ctx, c, FSSendRequestOpt{
|
||||
DestDir: destDir,
|
||||
IncludePatterns: []string{"ba*"},
|
||||
}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
_, err = ioutil.ReadFile(filepath.Join(destDir, "foo"))
|
||||
assert.Error(t, err)
|
||||
|
||||
dt, err := ioutil.ReadFile(filepath.Join(destDir, "bar"))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
assert.Equal(t, "content2", string(dt))
|
||||
return s.Close()
|
||||
})
|
||||
|
||||
err = g.Wait()
|
||||
require.NoError(t, err)
|
||||
}
|
|
@ -1,3 +0,0 @@
|
|||
package filesync
|
||||
|
||||
//go:generate protoc --gogoslick_out=plugins=grpc:. filesync.proto
|
|
@ -1,83 +0,0 @@
|
|||
package filesync
|
||||
|
||||
import (
|
||||
"io"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/docker/docker/pkg/archive"
|
||||
"github.com/docker/docker/pkg/chrootarchive"
|
||||
"github.com/pkg/errors"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
func sendTarStream(stream grpc.Stream, dir string, includes, excludes []string, progress progressCb) error {
|
||||
a, err := archive.TarWithOptions(dir, &archive.TarOptions{
|
||||
ExcludePatterns: excludes,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
size := 0
|
||||
buf := make([]byte, 1<<15)
|
||||
t := new(BytesMessage)
|
||||
for {
|
||||
n, err := a.Read(buf)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return err
|
||||
}
|
||||
t.Data = buf[:n]
|
||||
|
||||
if err := stream.SendMsg(t); err != nil {
|
||||
return err
|
||||
}
|
||||
size += n
|
||||
if progress != nil {
|
||||
progress(size, false)
|
||||
}
|
||||
}
|
||||
if progress != nil {
|
||||
progress(size, true)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func recvTarStream(ds grpc.Stream, dest string, cs CacheUpdater) error {
|
||||
|
||||
pr, pw := io.Pipe()
|
||||
|
||||
go func() {
|
||||
var (
|
||||
err error
|
||||
t = new(BytesMessage)
|
||||
)
|
||||
for {
|
||||
if err = ds.RecvMsg(t); err != nil {
|
||||
if err == io.EOF {
|
||||
err = nil
|
||||
}
|
||||
break
|
||||
}
|
||||
_, err = pw.Write(t.Data)
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err = pw.CloseWithError(err); err != nil {
|
||||
logrus.Errorf("failed to close tar transfer pipe")
|
||||
}
|
||||
}()
|
||||
|
||||
decompressedStream, err := archive.DecompressStream(pr)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to decompress stream")
|
||||
}
|
||||
|
||||
if err := chrootarchive.Untar(decompressedStream, dest, nil); err != nil {
|
||||
return errors.Wrap(err, "failed to untar context")
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -1,62 +0,0 @@
|
|||
package session
|
||||
|
||||
import (
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/http2"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
func serve(ctx context.Context, grpcServer *grpc.Server, conn net.Conn) {
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
conn.Close()
|
||||
}()
|
||||
logrus.Debugf("serving grpc connection")
|
||||
(&http2.Server{}).ServeConn(conn, &http2.ServeConnOpts{Handler: grpcServer})
|
||||
}
|
||||
|
||||
func grpcClientConn(ctx context.Context, conn net.Conn) (context.Context, *grpc.ClientConn, error) {
|
||||
dialOpt := grpc.WithDialer(func(addr string, d time.Duration) (net.Conn, error) {
|
||||
return conn, nil
|
||||
})
|
||||
|
||||
cc, err := grpc.DialContext(ctx, "", dialOpt, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
return nil, nil, errors.Wrap(err, "failed to create grpc client")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
go monitorHealth(ctx, cc, cancel)
|
||||
|
||||
return ctx, cc, nil
|
||||
}
|
||||
|
||||
func monitorHealth(ctx context.Context, cc *grpc.ClientConn, cancelConn func()) {
|
||||
defer cancelConn()
|
||||
defer cc.Close()
|
||||
|
||||
ticker := time.NewTicker(500 * time.Millisecond)
|
||||
defer ticker.Stop()
|
||||
healthClient := grpc_health_v1.NewHealthClient(cc)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
<-ticker.C
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
_, err := healthClient.Check(ctx, &grpc_health_v1.HealthCheckRequest{})
|
||||
cancel()
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,202 +0,0 @@
|
|||
package session
|
||||
|
||||
import (
|
||||
"net"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Caller can invoke requests on the session
|
||||
type Caller interface {
|
||||
Context() context.Context
|
||||
Supports(method string) bool
|
||||
Conn() *grpc.ClientConn
|
||||
Name() string
|
||||
SharedKey() string
|
||||
}
|
||||
|
||||
type client struct {
|
||||
Session
|
||||
cc *grpc.ClientConn
|
||||
supported map[string]struct{}
|
||||
}
|
||||
|
||||
// Manager is a controller for accessing currently active sessions
|
||||
type Manager struct {
|
||||
sessions map[string]*client
|
||||
mu sync.Mutex
|
||||
updateCondition *sync.Cond
|
||||
}
|
||||
|
||||
// NewManager returns a new Manager
|
||||
func NewManager() (*Manager, error) {
|
||||
sm := &Manager{
|
||||
sessions: make(map[string]*client),
|
||||
}
|
||||
sm.updateCondition = sync.NewCond(&sm.mu)
|
||||
return sm, nil
|
||||
}
|
||||
|
||||
// HandleHTTPRequest handles an incoming HTTP request
|
||||
func (sm *Manager) HandleHTTPRequest(ctx context.Context, w http.ResponseWriter, r *http.Request) error {
|
||||
hijacker, ok := w.(http.Hijacker)
|
||||
if !ok {
|
||||
return errors.New("handler does not support hijack")
|
||||
}
|
||||
|
||||
uuid := r.Header.Get(headerSessionUUID)
|
||||
|
||||
proto := r.Header.Get("Upgrade")
|
||||
|
||||
sm.mu.Lock()
|
||||
if _, ok := sm.sessions[uuid]; ok {
|
||||
sm.mu.Unlock()
|
||||
return errors.Errorf("session %s already exists", uuid)
|
||||
}
|
||||
|
||||
if proto == "" {
|
||||
sm.mu.Unlock()
|
||||
return errors.New("no upgrade proto in request")
|
||||
}
|
||||
|
||||
if proto != "h2c" {
|
||||
sm.mu.Unlock()
|
||||
return errors.Errorf("protocol %s not supported", proto)
|
||||
}
|
||||
|
||||
conn, _, err := hijacker.Hijack()
|
||||
if err != nil {
|
||||
sm.mu.Unlock()
|
||||
return errors.Wrap(err, "failed to hijack connection")
|
||||
}
|
||||
|
||||
resp := &http.Response{
|
||||
StatusCode: http.StatusSwitchingProtocols,
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
Header: http.Header{},
|
||||
}
|
||||
resp.Header.Set("Connection", "Upgrade")
|
||||
resp.Header.Set("Upgrade", proto)
|
||||
|
||||
// set raw mode
|
||||
conn.Write([]byte{})
|
||||
resp.Write(conn)
|
||||
|
||||
return sm.handleConn(ctx, conn, r.Header)
|
||||
}
|
||||
|
||||
// HandleConn handles an incoming raw connection
|
||||
func (sm *Manager) HandleConn(ctx context.Context, conn net.Conn, opts map[string][]string) error {
|
||||
sm.mu.Lock()
|
||||
return sm.handleConn(ctx, conn, opts)
|
||||
}
|
||||
|
||||
// caller needs to take lock, this function will release it
|
||||
func (sm *Manager) handleConn(ctx context.Context, conn net.Conn, opts map[string][]string) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
h := http.Header(opts)
|
||||
uuid := h.Get(headerSessionUUID)
|
||||
name := h.Get(headerSessionName)
|
||||
sharedKey := h.Get(headerSessionSharedKey)
|
||||
|
||||
ctx, cc, err := grpcClientConn(ctx, conn)
|
||||
if err != nil {
|
||||
sm.mu.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
c := &client{
|
||||
Session: Session{
|
||||
uuid: uuid,
|
||||
name: name,
|
||||
sharedKey: sharedKey,
|
||||
ctx: ctx,
|
||||
cancelCtx: cancel,
|
||||
done: make(chan struct{}),
|
||||
},
|
||||
cc: cc,
|
||||
supported: make(map[string]struct{}),
|
||||
}
|
||||
|
||||
for _, m := range opts[headerSessionMethod] {
|
||||
c.supported[strings.ToLower(m)] = struct{}{}
|
||||
}
|
||||
sm.sessions[uuid] = c
|
||||
sm.updateCondition.Broadcast()
|
||||
sm.mu.Unlock()
|
||||
|
||||
defer func() {
|
||||
sm.mu.Lock()
|
||||
delete(sm.sessions, uuid)
|
||||
sm.mu.Unlock()
|
||||
}()
|
||||
|
||||
<-c.ctx.Done()
|
||||
conn.Close()
|
||||
close(c.done)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get returns a session by UUID
|
||||
func (sm *Manager) Get(ctx context.Context, uuid string) (Caller, error) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
sm.updateCondition.Broadcast()
|
||||
}
|
||||
}()
|
||||
|
||||
var c *client
|
||||
|
||||
sm.mu.Lock()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
sm.mu.Unlock()
|
||||
return nil, errors.Wrapf(ctx.Err(), "no active session for %s", uuid)
|
||||
default:
|
||||
}
|
||||
var ok bool
|
||||
c, ok = sm.sessions[uuid]
|
||||
if !ok || c.closed() {
|
||||
sm.updateCondition.Wait()
|
||||
continue
|
||||
}
|
||||
sm.mu.Unlock()
|
||||
break
|
||||
}
|
||||
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func (c *client) Context() context.Context {
|
||||
return c.context()
|
||||
}
|
||||
|
||||
func (c *client) Name() string {
|
||||
return c.name
|
||||
}
|
||||
|
||||
func (c *client) SharedKey() string {
|
||||
return c.sharedKey
|
||||
}
|
||||
|
||||
func (c *client) Supports(url string) bool {
|
||||
_, ok := c.supported[strings.ToLower(url)]
|
||||
return ok
|
||||
}
|
||||
func (c *client) Conn() *grpc.ClientConn {
|
||||
return c.cc
|
||||
}
|
|
@ -1,117 +0,0 @@
|
|||
package session
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/docker/docker/pkg/stringid"
|
||||
"github.com/pkg/errors"
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/health"
|
||||
"google.golang.org/grpc/health/grpc_health_v1"
|
||||
)
|
||||
|
||||
const (
|
||||
headerSessionUUID = "X-Docker-Expose-Session-Uuid"
|
||||
headerSessionName = "X-Docker-Expose-Session-Name"
|
||||
headerSessionSharedKey = "X-Docker-Expose-Session-Sharedkey"
|
||||
headerSessionMethod = "X-Docker-Expose-Session-Grpc-Method"
|
||||
)
|
||||
|
||||
// Dialer returns a connection that can be used by the session
|
||||
type Dialer func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error)
|
||||
|
||||
// Attachable defines a feature that can be expsed on a session
|
||||
type Attachable interface {
|
||||
Register(*grpc.Server)
|
||||
}
|
||||
|
||||
// Session is a long running connection between client and a daemon
|
||||
type Session struct {
|
||||
uuid string
|
||||
name string
|
||||
sharedKey string
|
||||
ctx context.Context
|
||||
cancelCtx func()
|
||||
done chan struct{}
|
||||
grpcServer *grpc.Server
|
||||
}
|
||||
|
||||
// NewSession returns a new long running session
|
||||
func NewSession(name, sharedKey string) (*Session, error) {
|
||||
uuid := stringid.GenerateRandomID()
|
||||
s := &Session{
|
||||
uuid: uuid,
|
||||
name: name,
|
||||
sharedKey: sharedKey,
|
||||
grpcServer: grpc.NewServer(),
|
||||
}
|
||||
|
||||
grpc_health_v1.RegisterHealthServer(s.grpcServer, health.NewServer())
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
||||
// Allow enable a given service to be reachable through the grpc session
|
||||
func (s *Session) Allow(a Attachable) {
|
||||
a.Register(s.grpcServer)
|
||||
}
|
||||
|
||||
// UUID returns unique identifier for the session
|
||||
func (s *Session) UUID() string {
|
||||
return s.uuid
|
||||
}
|
||||
|
||||
// Run activates the session
|
||||
func (s *Session) Run(ctx context.Context, dialer Dialer) error {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
s.cancelCtx = cancel
|
||||
s.done = make(chan struct{})
|
||||
|
||||
defer cancel()
|
||||
defer close(s.done)
|
||||
|
||||
meta := make(map[string][]string)
|
||||
meta[headerSessionUUID] = []string{s.uuid}
|
||||
meta[headerSessionName] = []string{s.name}
|
||||
meta[headerSessionSharedKey] = []string{s.sharedKey}
|
||||
|
||||
for name, svc := range s.grpcServer.GetServiceInfo() {
|
||||
for _, method := range svc.Methods {
|
||||
meta[headerSessionMethod] = append(meta[headerSessionMethod], MethodURL(name, method.Name))
|
||||
}
|
||||
}
|
||||
conn, err := dialer(ctx, "h2c", meta)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "failed to dial gRPC")
|
||||
}
|
||||
serve(ctx, s.grpcServer, conn)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Close closes the session
|
||||
func (s *Session) Close() error {
|
||||
if s.cancelCtx != nil && s.done != nil {
|
||||
s.cancelCtx()
|
||||
<-s.done
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
func (s *Session) closed() bool {
|
||||
select {
|
||||
case <-s.context().Done():
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// MethodURL returns a gRPC method URL for service and method name
|
||||
func MethodURL(s, m string) string {
|
||||
return "/" + s + "/" + m
|
||||
}
|
|
@ -1,70 +0,0 @@
|
|||
package testutil
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Handler is function called to handle incoming connection
|
||||
type Handler func(ctx context.Context, conn net.Conn, meta map[string][]string) error
|
||||
|
||||
// Dialer is a function for dialing an outgoing connection
|
||||
type Dialer func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error)
|
||||
|
||||
// TestStream creates an in memory session dialer for a handler function
|
||||
func TestStream(handler Handler) Dialer {
|
||||
s1, s2 := sockPair()
|
||||
return func(ctx context.Context, proto string, meta map[string][]string) (net.Conn, error) {
|
||||
go func() {
|
||||
err := handler(context.TODO(), s1, meta)
|
||||
if err != nil {
|
||||
logrus.Error(err)
|
||||
}
|
||||
s1.Close()
|
||||
}()
|
||||
return s2, nil
|
||||
}
|
||||
}
|
||||
|
||||
func sockPair() (*sock, *sock) {
|
||||
pr1, pw1 := io.Pipe()
|
||||
pr2, pw2 := io.Pipe()
|
||||
return &sock{pw1, pr2, pw1}, &sock{pw2, pr1, pw2}
|
||||
}
|
||||
|
||||
type sock struct {
|
||||
io.Writer
|
||||
io.Reader
|
||||
io.Closer
|
||||
}
|
||||
|
||||
func (s *sock) LocalAddr() net.Addr {
|
||||
return dummyAddr{}
|
||||
}
|
||||
func (s *sock) RemoteAddr() net.Addr {
|
||||
return dummyAddr{}
|
||||
}
|
||||
func (s *sock) SetDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
func (s *sock) SetReadDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
func (s *sock) SetWriteDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type dummyAddr struct {
|
||||
}
|
||||
|
||||
func (d dummyAddr) Network() string {
|
||||
return "tcp"
|
||||
}
|
||||
|
||||
func (d dummyAddr) String() string {
|
||||
return "localhost"
|
||||
}
|
|
@ -30,7 +30,6 @@ import (
|
|||
"github.com/docker/docker/builder/dockerfile"
|
||||
"github.com/docker/docker/builder/fscache"
|
||||
"github.com/docker/docker/cli/debug"
|
||||
"github.com/docker/docker/client/session"
|
||||
"github.com/docker/docker/daemon"
|
||||
"github.com/docker/docker/daemon/cluster"
|
||||
"github.com/docker/docker/daemon/config"
|
||||
|
@ -50,6 +49,7 @@ import (
|
|||
"github.com/docker/docker/runconfig"
|
||||
"github.com/docker/go-connections/tlsconfig"
|
||||
swarmapi "github.com/docker/swarmkit/api"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/spf13/pflag"
|
||||
)
|
||||
|
|
|
@ -12,8 +12,6 @@ import (
|
|||
"strings"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/client/session"
|
||||
"github.com/docker/docker/client/session/filesync"
|
||||
"github.com/docker/docker/integration-cli/checker"
|
||||
"github.com/docker/docker/integration-cli/cli/build/fakecontext"
|
||||
"github.com/docker/docker/integration-cli/cli/build/fakegit"
|
||||
|
@ -21,6 +19,8 @@ import (
|
|||
"github.com/docker/docker/integration-cli/request"
|
||||
"github.com/docker/docker/pkg/testutil"
|
||||
"github.com/go-check/check"
|
||||
"github.com/moby/buildkit/session"
|
||||
"github.com/moby/buildkit/session/filesync"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/net/context"
|
||||
|
|
Loading…
Add table
Reference in a new issue