diff --git a/api/server/router/plugin/backend.go b/api/server/router/plugin/backend.go index a5f3c9790a..1b60501fc5 100644 --- a/api/server/router/plugin/backend.go +++ b/api/server/router/plugin/backend.go @@ -7,6 +7,7 @@ import ( "github.com/docker/distribution/reference" enginetypes "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/plugin" "golang.org/x/net/context" ) @@ -19,7 +20,7 @@ type Backend interface { Remove(name string, config *enginetypes.PluginRmConfig) error Set(name string, args []string) error Privileges(ctx context.Context, ref reference.Named, metaHeaders http.Header, authConfig *enginetypes.AuthConfig) (enginetypes.PluginPrivileges, error) - Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error + Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error Push(ctx context.Context, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, outStream io.Writer) error Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, options *enginetypes.PluginCreateOptions) error diff --git a/api/server/router/swarm/helpers.go b/api/server/router/swarm/helpers.go index ea692ea368..7d2944208f 100644 --- a/api/server/router/swarm/helpers.go +++ b/api/server/router/swarm/helpers.go @@ -44,7 +44,7 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r * // maybe should return some context with this error? return err } - tty = s.Spec.TaskTemplate.ContainerSpec.TTY || tty + tty = (s.Spec.TaskTemplate.ContainerSpec != nil && s.Spec.TaskTemplate.ContainerSpec.TTY) || tty } for _, task := range selector.Tasks { t, err := sr.backend.GetTask(task) diff --git a/api/swagger.yaml b/api/swagger.yaml index 06e6b0bd3f..5803266b0f 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -1975,11 +1975,39 @@ definitions: description: "User modifiable task configuration." type: "object" properties: + PluginSpec: + type: "object" + description: "Invalid when specified with `ContainerSpec`." + properties: + Name: + description: "The name or 'alias' to use for the plugin." + type: "string" + Remote: + description: "The plugin image reference to use." + type: "string" + Disabled: + description: "Disable the plugin once scheduled." + type: "boolean" + PluginPrivilege: + type: "array" + items: + description: "Describes a permission accepted by the user upon installing the plugin." + type: "object" + properties: + Name: + type: "string" + Description: + type: "string" + Value: + type: "array" + items: + type: "string" ContainerSpec: type: "object" + description: "Invalid when specified with `PluginSpec`." properties: Image: - description: "The image name to use for the container." + description: "The image name to use for the container" type: "string" Labels: description: "User-defined key/value data." diff --git a/api/types/swarm/runtime/gen.go b/api/types/swarm/runtime/gen.go new file mode 100644 index 0000000000..47ae234ef3 --- /dev/null +++ b/api/types/swarm/runtime/gen.go @@ -0,0 +1,3 @@ +//go:generate protoc -I . --gogofast_out=import_path=github.com/docker/docker/api/types/swarm/runtime:. plugin.proto + +package runtime diff --git a/api/types/swarm/runtime/plugin.pb.go b/api/types/swarm/runtime/plugin.pb.go new file mode 100644 index 0000000000..1fdc9b0436 --- /dev/null +++ b/api/types/swarm/runtime/plugin.pb.go @@ -0,0 +1,712 @@ +// Code generated by protoc-gen-gogo. +// source: plugin.proto +// DO NOT EDIT! + +/* + Package runtime is a generated protocol buffer package. + + It is generated from these files: + plugin.proto + + It has these top-level messages: + PluginSpec + PluginPrivilege +*/ +package runtime + +import proto "github.com/gogo/protobuf/proto" +import fmt "fmt" +import math "math" + +import io "io" + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package + +// PluginSpec defines the base payload which clients can specify for creating +// a service with the plugin runtime. +type PluginSpec struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Remote string `protobuf:"bytes,2,opt,name=remote,proto3" json:"remote,omitempty"` + Privileges []*PluginPrivilege `protobuf:"bytes,3,rep,name=privileges" json:"privileges,omitempty"` + Disabled bool `protobuf:"varint,4,opt,name=disabled,proto3" json:"disabled,omitempty"` +} + +func (m *PluginSpec) Reset() { *m = PluginSpec{} } +func (m *PluginSpec) String() string { return proto.CompactTextString(m) } +func (*PluginSpec) ProtoMessage() {} +func (*PluginSpec) Descriptor() ([]byte, []int) { return fileDescriptorPlugin, []int{0} } + +func (m *PluginSpec) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *PluginSpec) GetRemote() string { + if m != nil { + return m.Remote + } + return "" +} + +func (m *PluginSpec) GetPrivileges() []*PluginPrivilege { + if m != nil { + return m.Privileges + } + return nil +} + +func (m *PluginSpec) GetDisabled() bool { + if m != nil { + return m.Disabled + } + return false +} + +// PluginPrivilege describes a permission the user has to accept +// upon installing a plugin. +type PluginPrivilege struct { + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"` + Value []string `protobuf:"bytes,3,rep,name=value" json:"value,omitempty"` +} + +func (m *PluginPrivilege) Reset() { *m = PluginPrivilege{} } +func (m *PluginPrivilege) String() string { return proto.CompactTextString(m) } +func (*PluginPrivilege) ProtoMessage() {} +func (*PluginPrivilege) Descriptor() ([]byte, []int) { return fileDescriptorPlugin, []int{1} } + +func (m *PluginPrivilege) GetName() string { + if m != nil { + return m.Name + } + return "" +} + +func (m *PluginPrivilege) GetDescription() string { + if m != nil { + return m.Description + } + return "" +} + +func (m *PluginPrivilege) GetValue() []string { + if m != nil { + return m.Value + } + return nil +} + +func init() { + proto.RegisterType((*PluginSpec)(nil), "PluginSpec") + proto.RegisterType((*PluginPrivilege)(nil), "PluginPrivilege") +} +func (m *PluginSpec) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PluginSpec) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Name) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintPlugin(dAtA, i, uint64(len(m.Name))) + i += copy(dAtA[i:], m.Name) + } + if len(m.Remote) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintPlugin(dAtA, i, uint64(len(m.Remote))) + i += copy(dAtA[i:], m.Remote) + } + if len(m.Privileges) > 0 { + for _, msg := range m.Privileges { + dAtA[i] = 0x1a + i++ + i = encodeVarintPlugin(dAtA, i, uint64(msg.Size())) + n, err := msg.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n + } + } + if m.Disabled { + dAtA[i] = 0x20 + i++ + if m.Disabled { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } + return i, nil +} + +func (m *PluginPrivilege) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PluginPrivilege) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if len(m.Name) > 0 { + dAtA[i] = 0xa + i++ + i = encodeVarintPlugin(dAtA, i, uint64(len(m.Name))) + i += copy(dAtA[i:], m.Name) + } + if len(m.Description) > 0 { + dAtA[i] = 0x12 + i++ + i = encodeVarintPlugin(dAtA, i, uint64(len(m.Description))) + i += copy(dAtA[i:], m.Description) + } + if len(m.Value) > 0 { + for _, s := range m.Value { + dAtA[i] = 0x1a + i++ + l = len(s) + for l >= 1<<7 { + dAtA[i] = uint8(uint64(l)&0x7f | 0x80) + l >>= 7 + i++ + } + dAtA[i] = uint8(l) + i++ + i += copy(dAtA[i:], s) + } + } + return i, nil +} + +func encodeFixed64Plugin(dAtA []byte, offset int, v uint64) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + dAtA[offset+4] = uint8(v >> 32) + dAtA[offset+5] = uint8(v >> 40) + dAtA[offset+6] = uint8(v >> 48) + dAtA[offset+7] = uint8(v >> 56) + return offset + 8 +} +func encodeFixed32Plugin(dAtA []byte, offset int, v uint32) int { + dAtA[offset] = uint8(v) + dAtA[offset+1] = uint8(v >> 8) + dAtA[offset+2] = uint8(v >> 16) + dAtA[offset+3] = uint8(v >> 24) + return offset + 4 +} +func encodeVarintPlugin(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *PluginSpec) Size() (n int) { + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovPlugin(uint64(l)) + } + l = len(m.Remote) + if l > 0 { + n += 1 + l + sovPlugin(uint64(l)) + } + if len(m.Privileges) > 0 { + for _, e := range m.Privileges { + l = e.Size() + n += 1 + l + sovPlugin(uint64(l)) + } + } + if m.Disabled { + n += 2 + } + return n +} + +func (m *PluginPrivilege) Size() (n int) { + var l int + _ = l + l = len(m.Name) + if l > 0 { + n += 1 + l + sovPlugin(uint64(l)) + } + l = len(m.Description) + if l > 0 { + n += 1 + l + sovPlugin(uint64(l)) + } + if len(m.Value) > 0 { + for _, s := range m.Value { + l = len(s) + n += 1 + l + sovPlugin(uint64(l)) + } + } + return n +} + +func sovPlugin(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozPlugin(x uint64) (n int) { + return sovPlugin(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *PluginSpec) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PluginSpec: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PluginSpec: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthPlugin + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Remote", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthPlugin + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Remote = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Privileges", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthPlugin + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Privileges = append(m.Privileges, &PluginPrivilege{}) + if err := m.Privileges[len(m.Privileges)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 4: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Disabled", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + m.Disabled = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipPlugin(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthPlugin + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PluginPrivilege) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PluginPrivilege: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PluginPrivilege: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthPlugin + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Name = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Description", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthPlugin + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Description = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowPlugin + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthPlugin + } + postIndex := iNdEx + intStringLen + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Value = append(m.Value, string(dAtA[iNdEx:postIndex])) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipPlugin(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthPlugin + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipPlugin(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPlugin + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPlugin + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPlugin + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + iNdEx += length + if length < 0 { + return 0, ErrInvalidLengthPlugin + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowPlugin + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipPlugin(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthPlugin = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowPlugin = fmt.Errorf("proto: integer overflow") +) + +func init() { proto.RegisterFile("plugin.proto", fileDescriptorPlugin) } + +var fileDescriptorPlugin = []byte{ + // 196 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0xc8, 0x29, 0x4d, + 0xcf, 0xcc, 0xd3, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0x6a, 0x63, 0xe4, 0xe2, 0x0a, 0x00, 0x0b, + 0x04, 0x17, 0xa4, 0x26, 0x0b, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30, + 0x6a, 0x70, 0x06, 0x81, 0xd9, 0x42, 0x62, 0x5c, 0x6c, 0x45, 0xa9, 0xb9, 0xf9, 0x25, 0xa9, 0x12, + 0x4c, 0x60, 0x51, 0x28, 0x4f, 0xc8, 0x80, 0x8b, 0xab, 0xa0, 0x28, 0xb3, 0x2c, 0x33, 0x27, 0x35, + 0x3d, 0xb5, 0x58, 0x82, 0x59, 0x81, 0x59, 0x83, 0xdb, 0x48, 0x40, 0x0f, 0x62, 0x58, 0x00, 0x4c, + 0x22, 0x08, 0x49, 0x8d, 0x90, 0x14, 0x17, 0x47, 0x4a, 0x66, 0x71, 0x62, 0x52, 0x4e, 0x6a, 0x8a, + 0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10, 0x9c, 0xaf, 0x14, 0xcb, 0xc5, 0x8f, 0xa6, 0x15, 0xab, + 0x63, 0x14, 0xb8, 0xb8, 0x53, 0x52, 0x8b, 0x93, 0x8b, 0x32, 0x0b, 0x4a, 0x32, 0xf3, 0xf3, 0xa0, + 0x2e, 0x42, 0x16, 0x12, 0x12, 0xe1, 0x62, 0x2d, 0x4b, 0xcc, 0x29, 0x4d, 0x05, 0xbb, 0x88, 0x33, + 0x08, 0xc2, 0x71, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4, + 0x18, 0x93, 0xd8, 0xc0, 0x9e, 0x37, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x84, 0xad, 0x79, + 0x0c, 0x01, 0x00, 0x00, +} diff --git a/api/types/swarm/runtime/plugin.proto b/api/types/swarm/runtime/plugin.proto new file mode 100644 index 0000000000..06eb7ba650 --- /dev/null +++ b/api/types/swarm/runtime/plugin.proto @@ -0,0 +1,18 @@ +syntax = "proto3"; + +// PluginSpec defines the base payload which clients can specify for creating +// a service with the plugin runtime. +message PluginSpec { + string name = 1; + string remote = 2; + repeated PluginPrivilege privileges = 3; + bool disabled = 4; +} + +// PluginPrivilege describes a permission the user has to accept +// upon installing a plugin. +message PluginPrivilege { + string name = 1; + string description = 2; + repeated string value = 3; +} diff --git a/api/types/swarm/task.go b/api/types/swarm/task.go index a598a79d59..1712c06cf6 100644 --- a/api/types/swarm/task.go +++ b/api/types/swarm/task.go @@ -1,6 +1,10 @@ package swarm -import "time" +import ( + "time" + + "github.com/docker/docker/api/types/swarm/runtime" +) // TaskState represents the state of a task. type TaskState string @@ -51,7 +55,11 @@ type Task struct { // TaskSpec represents the spec of a task. type TaskSpec struct { - ContainerSpec ContainerSpec `json:",omitempty"` + // ContainerSpec and PluginSpec are mutually exclusive. + // PluginSpec will only be used when the `Runtime` field is set to `plugin` + ContainerSpec *ContainerSpec `json:",omitempty"` + PluginSpec *runtime.PluginSpec `json:",omitempty"` + Resources *ResourceRequirements `json:",omitempty"` RestartPolicy *RestartPolicy `json:",omitempty"` Placement *Placement `json:",omitempty"` diff --git a/client/service_create.go b/client/service_create.go index c2fc2776a8..a36839443c 100644 --- a/client/service_create.go +++ b/client/service_create.go @@ -6,9 +6,9 @@ import ( "github.com/docker/distribution/reference" "github.com/docker/docker/api/types" - registrytypes "github.com/docker/docker/api/types/registry" "github.com/docker/docker/api/types/swarm" "github.com/opencontainers/go-digest" + "github.com/pkg/errors" "golang.org/x/net/context" ) @@ -24,24 +24,51 @@ func (cli *Client) ServiceCreate(ctx context.Context, service swarm.ServiceSpec, headers["X-Registry-Auth"] = []string{options.EncodedRegistryAuth} } - // ensure that the image is tagged - if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" { - service.TaskTemplate.ContainerSpec.Image = taggedImg + // Make sure containerSpec is not nil when no runtime is set or the runtime is set to container + if service.TaskTemplate.ContainerSpec == nil && (service.TaskTemplate.Runtime == "" || service.TaskTemplate.Runtime == swarm.RuntimeContainer) { + service.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{} } - // Contact the registry to retrieve digest and platform information - if options.QueryRegistry { - distributionInspect, err := cli.DistributionInspect(ctx, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth) - distErr = err - if err == nil { - // now pin by digest if the image doesn't already contain a digest - if img := imageWithDigestString(service.TaskTemplate.ContainerSpec.Image, distributionInspect.Descriptor.Digest); img != "" { + if err := validateServiceSpec(service); err != nil { + return types.ServiceCreateResponse{}, err + } + + // ensure that the image is tagged + var imgPlatforms []swarm.Platform + if service.TaskTemplate.ContainerSpec != nil { + if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" { + service.TaskTemplate.ContainerSpec.Image = taggedImg + } + if options.QueryRegistry { + var img string + img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth) + if img != "" { service.TaskTemplate.ContainerSpec.Image = img } - // add platforms that are compatible with the service - service.TaskTemplate.Placement = setServicePlatforms(service.TaskTemplate.Placement, distributionInspect) } } + + // ensure that the image is tagged + if service.TaskTemplate.PluginSpec != nil { + if taggedImg := imageWithTagString(service.TaskTemplate.PluginSpec.Remote); taggedImg != "" { + service.TaskTemplate.PluginSpec.Remote = taggedImg + } + if options.QueryRegistry { + var img string + img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.PluginSpec.Remote, options.EncodedRegistryAuth) + if img != "" { + service.TaskTemplate.PluginSpec.Remote = img + } + } + } + + if service.TaskTemplate.Placement == nil && len(imgPlatforms) > 0 { + service.TaskTemplate.Placement = &swarm.Placement{} + } + if len(imgPlatforms) > 0 { + service.TaskTemplate.Placement.Platforms = imgPlatforms + } + var response types.ServiceCreateResponse resp, err := cli.post(ctx, "/services/create", nil, service, headers) if err != nil { @@ -58,6 +85,28 @@ func (cli *Client) ServiceCreate(ctx context.Context, service swarm.ServiceSpec, return response, err } +func imageDigestAndPlatforms(ctx context.Context, cli *Client, image, encodedAuth string) (string, []swarm.Platform, error) { + distributionInspect, err := cli.DistributionInspect(ctx, image, encodedAuth) + imageWithDigest := image + var platforms []swarm.Platform + if err != nil { + return "", nil, err + } + + imageWithDigest = imageWithDigestString(image, distributionInspect.Descriptor.Digest) + + if len(distributionInspect.Platforms) > 0 { + platforms = make([]swarm.Platform, 0, len(distributionInspect.Platforms)) + for _, p := range distributionInspect.Platforms { + platforms = append(platforms, swarm.Platform{ + Architecture: p.Architecture, + OS: p.OS, + }) + } + } + return imageWithDigest, platforms, err +} + // imageWithDigestString takes an image string and a digest, and updates // the image string if it didn't originally contain a digest. It returns // an empty string if there are no updates. @@ -86,27 +135,22 @@ func imageWithTagString(image string) string { return "" } -// setServicePlatforms sets Platforms in swarm.Placement to list all -// compatible platforms for the service, as found in distributionInspect -// and returns a pointer to the new or updated swarm.Placement struct. -func setServicePlatforms(placement *swarm.Placement, distributionInspect registrytypes.DistributionInspect) *swarm.Placement { - if placement == nil { - placement = &swarm.Placement{} - } - // reset any existing listed platforms - placement.Platforms = []swarm.Platform{} - for _, p := range distributionInspect.Platforms { - placement.Platforms = append(placement.Platforms, swarm.Platform{ - Architecture: p.Architecture, - OS: p.OS, - }) - } - return placement -} - // digestWarning constructs a formatted warning string using the // image name that could not be pinned by digest. The formatting // is hardcoded, but could me made smarter in the future func digestWarning(image string) string { return fmt.Sprintf("image %s could not be accessed on a registry to record\nits digest. Each node will access %s independently,\npossibly leading to different nodes running different\nversions of the image.\n", image, image) } + +func validateServiceSpec(s swarm.ServiceSpec) error { + if s.TaskTemplate.ContainerSpec != nil && s.TaskTemplate.PluginSpec != nil { + return errors.New("must not specify both a container spec and a plugin spec in the task template") + } + if s.TaskTemplate.PluginSpec != nil && s.TaskTemplate.Runtime != swarm.RuntimePlugin { + return errors.New("mismatched runtime with plugin spec") + } + if s.TaskTemplate.ContainerSpec != nil && (s.TaskTemplate.Runtime != "" && s.TaskTemplate.Runtime != swarm.RuntimeContainer) { + return errors.New("mismatched runtime with container spec") + } + return nil +} diff --git a/client/service_create_test.go b/client/service_create_test.go index eb9e1b59df..6915d636e0 100644 --- a/client/service_create_test.go +++ b/client/service_create_test.go @@ -112,7 +112,7 @@ func TestServiceCreateCompatiblePlatforms(t *testing.T) { }), } - spec := swarm.ServiceSpec{TaskTemplate: swarm.TaskSpec{ContainerSpec: swarm.ContainerSpec{Image: "foobar:1.0"}}} + spec := swarm.ServiceSpec{TaskTemplate: swarm.TaskSpec{ContainerSpec: &swarm.ContainerSpec{Image: "foobar:1.0"}}} r, err := client.ServiceCreate(context.Background(), spec, types.ServiceCreateOptions{QueryRegistry: true}) assert.NoError(t, err) @@ -189,7 +189,7 @@ func TestServiceCreateDigestPinning(t *testing.T) { for _, p := range pinByDigestTests { r, err := client.ServiceCreate(context.Background(), swarm.ServiceSpec{ TaskTemplate: swarm.TaskSpec{ - ContainerSpec: swarm.ContainerSpec{ + ContainerSpec: &swarm.ContainerSpec{ Image: p.img, }, }, diff --git a/client/service_update.go b/client/service_update.go index a72adc997c..8764f299a3 100644 --- a/client/service_update.go +++ b/client/service_update.go @@ -35,26 +35,46 @@ func (cli *Client) ServiceUpdate(ctx context.Context, serviceID string, version query.Set("version", strconv.FormatUint(version.Index, 10)) - // ensure that the image is tagged - if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" { - service.TaskTemplate.ContainerSpec.Image = taggedImg + if err := validateServiceSpec(service); err != nil { + return types.ServiceUpdateResponse{}, err } - // Contact the registry to retrieve digest and platform information - // This happens only when the image has changed - if options.QueryRegistry { - distributionInspect, err := cli.DistributionInspect(ctx, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth) - distErr = err - if err == nil { - // now pin by digest if the image doesn't already contain a digest - if img := imageWithDigestString(service.TaskTemplate.ContainerSpec.Image, distributionInspect.Descriptor.Digest); img != "" { + var imgPlatforms []swarm.Platform + // ensure that the image is tagged + if service.TaskTemplate.ContainerSpec != nil { + if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" { + service.TaskTemplate.ContainerSpec.Image = taggedImg + } + if options.QueryRegistry { + var img string + img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth) + if img != "" { service.TaskTemplate.ContainerSpec.Image = img } - // add platforms that are compatible with the service - service.TaskTemplate.Placement = setServicePlatforms(service.TaskTemplate.Placement, distributionInspect) } } + // ensure that the image is tagged + if service.TaskTemplate.PluginSpec != nil { + if taggedImg := imageWithTagString(service.TaskTemplate.PluginSpec.Remote); taggedImg != "" { + service.TaskTemplate.PluginSpec.Remote = taggedImg + } + if options.QueryRegistry { + var img string + img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.PluginSpec.Remote, options.EncodedRegistryAuth) + if img != "" { + service.TaskTemplate.PluginSpec.Remote = img + } + } + } + + if service.TaskTemplate.Placement == nil && len(imgPlatforms) > 0 { + service.TaskTemplate.Placement = &swarm.Placement{} + } + if len(imgPlatforms) > 0 { + service.TaskTemplate.Placement.Platforms = imgPlatforms + } + var response types.ServiceUpdateResponse resp, err := cli.post(ctx, "/services/"+serviceID+"/update", query, service, headers) if err != nil { diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index c9f7065c28..ad937ede7c 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -253,6 +253,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) { Root: cli.Config.Root, Name: name, Backend: d, + PluginBackend: d.PluginManager(), NetworkSubnetsProvider: d, DefaultAdvertiseAddr: cli.Config.SwarmDefaultAdvertiseAddr, RuntimeRoot: cli.getSwarmRunRoot(), diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index 6874dbf0ee..57fc4d2d6f 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -49,6 +49,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/docker/api/types/network" types "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/daemon/cluster/controllers/plugin" executorpkg "github.com/docker/docker/daemon/cluster/executor" "github.com/docker/docker/pkg/signal" lncluster "github.com/docker/libnetwork/cluster" @@ -97,6 +98,7 @@ type Config struct { Root string Name string Backend executorpkg.Backend + PluginBackend plugin.Backend NetworkSubnetsProvider NetworkSubnetsProvider // DefaultAdvertiseAddr is the default host/IP or network interface to use diff --git a/daemon/cluster/controllers/plugin/controller.go b/daemon/cluster/controllers/plugin/controller.go index de7eb2c00f..e72edcdd75 100644 --- a/daemon/cluster/controllers/plugin/controller.go +++ b/daemon/cluster/controllers/plugin/controller.go @@ -1,79 +1,261 @@ package plugin import ( + "io" + "io/ioutil" + "net/http" + "github.com/Sirupsen/logrus" + "github.com/docker/distribution/reference" + enginetypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/swarm/runtime" + "github.com/docker/docker/plugin" + "github.com/docker/docker/plugin/v2" "github.com/docker/swarmkit/api" + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" "golang.org/x/net/context" ) -// Controller is the controller for the plugin backend -type Controller struct{} +// Controller is the controller for the plugin backend. +// Plugins are managed as a singleton object with a desired state (different from containers). +// With the the plugin controller instead of having a strict create->start->stop->remove +// task lifecycle like containers, we manage the desired state of the plugin and let +// the plugin manager do what it already does and monitor the plugin. +// We'll also end up with many tasks all pointing to the same plugin ID. +// +// TODO(@cpuguy83): registry auth is intentionally not supported until we work out +// the right way to pass registry crednetials via secrets. +type Controller struct { + backend Backend + spec runtime.PluginSpec + logger *logrus.Entry + + pluginID string + serviceID string + taskID string + + // hook used to signal tests that `Wait()` is actually ready and waiting + signalWaitReady func() +} + +// Backend is the interface for interacting with the plugin manager +// Controller actions are passed to the configured backend to do the real work. +type Backend interface { + Disable(name string, config *enginetypes.PluginDisableConfig) error + Enable(name string, config *enginetypes.PluginEnableConfig) error + Remove(name string, config *enginetypes.PluginRmConfig) error + Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error + Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error + Get(name string) (*v2.Plugin, error) + SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func()) +} // NewController returns a new cluster plugin controller -func NewController() (*Controller, error) { - return &Controller{}, nil +func NewController(backend Backend, t *api.Task) (*Controller, error) { + spec, err := readSpec(t) + if err != nil { + return nil, err + } + return &Controller{ + backend: backend, + spec: spec, + serviceID: t.ServiceID, + logger: logrus.WithFields(logrus.Fields{ + "controller": "plugin", + "task": t.ID, + "plugin": spec.Name, + })}, nil +} + +func readSpec(t *api.Task) (runtime.PluginSpec, error) { + var cfg runtime.PluginSpec + + generic := t.Spec.GetGeneric() + if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil { + return cfg, errors.Wrap(err, "error reading plugin spec") + } + return cfg, nil } // Update is the update phase from swarmkit func (p *Controller) Update(ctx context.Context, t *api.Task) error { - logrus.WithFields(logrus.Fields{ - "controller": "plugin", - }).Debug("Update") + p.logger.Debug("Update") return nil } // Prepare is the prepare phase from swarmkit -func (p *Controller) Prepare(ctx context.Context) error { - logrus.WithFields(logrus.Fields{ - "controller": "plugin", - }).Debug("Prepare") +func (p *Controller) Prepare(ctx context.Context) (err error) { + p.logger.Debug("Prepare") + + remote, err := reference.ParseNormalizedNamed(p.spec.Remote) + if err != nil { + return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote) + } + + if p.spec.Name == "" { + p.spec.Name = remote.String() + } + + var authConfig enginetypes.AuthConfig + privs := convertPrivileges(p.spec.Privileges) + + pl, err := p.backend.Get(p.spec.Name) + + defer func() { + if pl != nil && err == nil { + pl.Acquire() + } + }() + + if err == nil && pl != nil { + if pl.SwarmServiceID != p.serviceID { + return errors.Errorf("plugin already exists: %s", p.spec.Name) + } + if pl.IsEnabled() { + if err := p.backend.Disable(pl.GetID(), &enginetypes.PluginDisableConfig{ForceDisable: true}); err != nil { + p.logger.WithError(err).Debug("could not disable plugin before running upgrade") + } + } + p.pluginID = pl.GetID() + return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard) + } + + if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard, plugin.WithSwarmService(p.serviceID)); err != nil { + return err + } + pl, err = p.backend.Get(p.spec.Name) + if err != nil { + return err + } + p.pluginID = pl.GetID() + return nil } // Start is the start phase from swarmkit func (p *Controller) Start(ctx context.Context) error { - logrus.WithFields(logrus.Fields{ - "controller": "plugin", - }).Debug("Start") + p.logger.Debug("Start") + + pl, err := p.backend.Get(p.pluginID) + if err != nil { + return err + } + + if p.spec.Disabled { + if pl.IsEnabled() { + return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false}) + } + return nil + } + if !pl.IsEnabled() { + return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30}) + } return nil } // Wait causes the task to wait until returned func (p *Controller) Wait(ctx context.Context) error { - logrus.WithFields(logrus.Fields{ - "controller": "plugin", - }).Debug("Wait") - return nil + p.logger.Debug("Wait") + + pl, err := p.backend.Get(p.pluginID) + if err != nil { + return err + } + + events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj}) + defer cancel() + + if p.signalWaitReady != nil { + p.signalWaitReady() + } + + if !p.spec.Disabled != pl.IsEnabled() { + return errors.New("mismatched plugin state") + } + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case e := <-events: + p.logger.Debugf("got event %#T", e) + + switch e.(type) { + case plugin.EventEnable: + if p.spec.Disabled { + return errors.New("plugin enabled") + } + case plugin.EventRemove: + return errors.New("plugin removed") + case plugin.EventDisable: + if !p.spec.Disabled { + return errors.New("plugin disabled") + } + } + } + } +} + +func isNotFound(err error) bool { + _, ok := errors.Cause(err).(plugin.ErrNotFound) + return ok } // Shutdown is the shutdown phase from swarmkit func (p *Controller) Shutdown(ctx context.Context) error { - logrus.WithFields(logrus.Fields{ - "controller": "plugin", - }).Debug("Shutdown") + p.logger.Debug("Shutdown") return nil } // Terminate is the terminate phase from swarmkit func (p *Controller) Terminate(ctx context.Context) error { - logrus.WithFields(logrus.Fields{ - "controller": "plugin", - }).Debug("Terminate") + p.logger.Debug("Terminate") return nil } // Remove is the remove phase from swarmkit func (p *Controller) Remove(ctx context.Context) error { - logrus.WithFields(logrus.Fields{ - "controller": "plugin", - }).Debug("Remove") - return nil + p.logger.Debug("Remove") + + pl, err := p.backend.Get(p.pluginID) + if err != nil { + if isNotFound(err) { + return nil + } + return err + } + + pl.Release() + if pl.GetRefCount() > 0 { + p.logger.Debug("skipping remove due to ref count") + return nil + } + + // This may error because we have exactly 1 plugin, but potentially multiple + // tasks which are calling remove. + err = p.backend.Remove(p.pluginID, &enginetypes.PluginRmConfig{ForceRemove: true}) + if isNotFound(err) { + return nil + } + return err } // Close is the close phase from swarmkit func (p *Controller) Close() error { - logrus.WithFields(logrus.Fields{ - "controller": "plugin", - }).Debug("Close") + p.logger.Debug("Close") return nil } + +func convertPrivileges(ls []*runtime.PluginPrivilege) enginetypes.PluginPrivileges { + var out enginetypes.PluginPrivileges + for _, p := range ls { + pp := enginetypes.PluginPrivilege{ + Name: p.Name, + Description: p.Description, + Value: p.Value, + } + out = append(out, pp) + } + return out +} diff --git a/daemon/cluster/controllers/plugin/controller_test.go b/daemon/cluster/controllers/plugin/controller_test.go new file mode 100644 index 0000000000..17b77cc89f --- /dev/null +++ b/daemon/cluster/controllers/plugin/controller_test.go @@ -0,0 +1,390 @@ +package plugin + +import ( + "errors" + "io" + "io/ioutil" + "net/http" + "strings" + "testing" + "time" + + "github.com/Sirupsen/logrus" + "github.com/docker/distribution/reference" + enginetypes "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/swarm/runtime" + "github.com/docker/docker/pkg/pubsub" + "github.com/docker/docker/plugin" + "github.com/docker/docker/plugin/v2" + "golang.org/x/net/context" +) + +const ( + pluginTestName = "test" + pluginTestRemote = "testremote" + pluginTestRemoteUpgrade = "testremote2" +) + +func TestPrepare(t *testing.T) { + b := newMockBackend() + c := newTestController(b, false) + ctx := context.Background() + + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + + if b.p == nil { + t.Fatal("pull not performed") + } + + c = newTestController(b, false) + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + if b.p == nil { + t.Fatal("unexpected nil") + } + if b.p.PluginObj.PluginReference != pluginTestRemoteUpgrade { + t.Fatal("upgrade not performed") + } + + c = newTestController(b, false) + c.serviceID = "1" + if err := c.Prepare(ctx); err == nil { + t.Fatal("expected error on prepare") + } +} + +func TestStart(t *testing.T) { + b := newMockBackend() + c := newTestController(b, false) + ctx := context.Background() + + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + + if err := c.Start(ctx); err != nil { + t.Fatal(err) + } + + if !b.p.IsEnabled() { + t.Fatal("expected plugin to be enabled") + } + + c = newTestController(b, true) + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + if err := c.Start(ctx); err != nil { + t.Fatal(err) + } + if b.p.IsEnabled() { + t.Fatal("expected plugin to be disabled") + } + + c = newTestController(b, false) + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + if err := c.Start(ctx); err != nil { + t.Fatal(err) + } + if !b.p.IsEnabled() { + t.Fatal("expected plugin to be enabled") + } +} + +func TestWaitCancel(t *testing.T) { + b := newMockBackend() + c := newTestController(b, true) + ctx := context.Background() + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + if err := c.Start(ctx); err != nil { + t.Fatal(err) + } + + ctxCancel, cancel := context.WithCancel(ctx) + chErr := make(chan error) + go func() { + chErr <- c.Wait(ctxCancel) + }() + cancel() + select { + case err := <-chErr: + if err != context.Canceled { + t.Fatal(err) + } + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for cancelation") + } +} + +func TestWaitDisabled(t *testing.T) { + b := newMockBackend() + c := newTestController(b, true) + ctx := context.Background() + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + if err := c.Start(ctx); err != nil { + t.Fatal(err) + } + + chErr := make(chan error) + go func() { + chErr <- c.Wait(ctx) + }() + + if err := b.Enable("test", nil); err != nil { + t.Fatal(err) + } + select { + case err := <-chErr: + if err == nil { + t.Fatal("expected error") + } + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for event") + } + + if err := c.Start(ctx); err != nil { + t.Fatal(err) + } + + ctxWaitReady, cancelCtxWaitReady := context.WithTimeout(ctx, 30*time.Second) + c.signalWaitReady = cancelCtxWaitReady + defer cancelCtxWaitReady() + + go func() { + chErr <- c.Wait(ctx) + }() + + chEvent, cancel := b.SubscribeEvents(1) + defer cancel() + + if err := b.Disable("test", nil); err != nil { + t.Fatal(err) + } + + select { + case <-chEvent: + <-ctxWaitReady.Done() + if err := ctxWaitReady.Err(); err == context.DeadlineExceeded { + t.Fatal(err) + } + select { + case <-chErr: + t.Fatal("wait returned unexpectedly") + default: + // all good + } + case <-chErr: + t.Fatal("wait returned unexpectedly") + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for event") + } + + if err := b.Remove("test", nil); err != nil { + t.Fatal(err) + } + select { + case err := <-chErr: + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "removed") { + t.Fatal(err) + } + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for event") + } +} + +func TestWaitEnabled(t *testing.T) { + b := newMockBackend() + c := newTestController(b, false) + ctx := context.Background() + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + if err := c.Start(ctx); err != nil { + t.Fatal(err) + } + + chErr := make(chan error) + go func() { + chErr <- c.Wait(ctx) + }() + + if err := b.Disable("test", nil); err != nil { + t.Fatal(err) + } + select { + case err := <-chErr: + if err == nil { + t.Fatal("expected error") + } + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for event") + } + + if err := c.Start(ctx); err != nil { + t.Fatal(err) + } + + ctxWaitReady, ctxWaitCancel := context.WithCancel(ctx) + c.signalWaitReady = ctxWaitCancel + defer ctxWaitCancel() + + go func() { + chErr <- c.Wait(ctx) + }() + + chEvent, cancel := b.SubscribeEvents(1) + defer cancel() + + if err := b.Enable("test", nil); err != nil { + t.Fatal(err) + } + + select { + case <-chEvent: + <-ctxWaitReady.Done() + if err := ctxWaitReady.Err(); err == context.DeadlineExceeded { + t.Fatal(err) + } + select { + case <-chErr: + t.Fatal("wait returned unexpectedly") + default: + // all good + } + case <-chErr: + t.Fatal("wait returned unexpectedly") + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for event") + } + + if err := b.Remove("test", nil); err != nil { + t.Fatal(err) + } + select { + case err := <-chErr: + if err == nil { + t.Fatal("expected error") + } + if !strings.Contains(err.Error(), "removed") { + t.Fatal(err) + } + case <-time.After(10 * time.Second): + t.Fatal("timeout waiting for event") + } +} + +func TestRemove(t *testing.T) { + b := newMockBackend() + c := newTestController(b, false) + ctx := context.Background() + + if err := c.Prepare(ctx); err != nil { + t.Fatal(err) + } + if err := c.Shutdown(ctx); err != nil { + t.Fatal(err) + } + + c2 := newTestController(b, false) + if err := c2.Prepare(ctx); err != nil { + t.Fatal(err) + } + + if err := c.Remove(ctx); err != nil { + t.Fatal(err) + } + if b.p == nil { + t.Fatal("plugin removed unexpectedly") + } + if err := c2.Shutdown(ctx); err != nil { + t.Fatal(err) + } + if err := c2.Remove(ctx); err != nil { + t.Fatal(err) + } + if b.p != nil { + t.Fatal("expected plugin to be removed") + } +} + +func newTestController(b Backend, disabled bool) *Controller { + return &Controller{ + logger: &logrus.Entry{Logger: &logrus.Logger{Out: ioutil.Discard}}, + backend: b, + spec: runtime.PluginSpec{ + Name: pluginTestName, + Remote: pluginTestRemote, + Disabled: disabled, + }, + } +} + +func newMockBackend() *mockBackend { + return &mockBackend{ + pub: pubsub.NewPublisher(0, 0), + } +} + +type mockBackend struct { + p *v2.Plugin + pub *pubsub.Publisher +} + +func (m *mockBackend) Disable(name string, config *enginetypes.PluginDisableConfig) error { + m.p.PluginObj.Enabled = false + m.pub.Publish(plugin.EventDisable{}) + return nil +} + +func (m *mockBackend) Enable(name string, config *enginetypes.PluginEnableConfig) error { + m.p.PluginObj.Enabled = true + m.pub.Publish(plugin.EventEnable{}) + return nil +} + +func (m *mockBackend) Remove(name string, config *enginetypes.PluginRmConfig) error { + m.p = nil + m.pub.Publish(plugin.EventRemove{}) + return nil +} + +func (m *mockBackend) Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error { + m.p = &v2.Plugin{ + PluginObj: enginetypes.Plugin{ + ID: "1234", + Name: name, + PluginReference: ref.String(), + }, + } + return nil +} + +func (m *mockBackend) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error { + m.p.PluginObj.PluginReference = pluginTestRemoteUpgrade + return nil +} + +func (m *mockBackend) Get(name string) (*v2.Plugin, error) { + if m.p == nil { + return nil, errors.New("not found") + } + return m.p, nil +} + +func (m *mockBackend) SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func()) { + ch := m.pub.SubscribeTopicWithBuffer(nil, buffer) + cancel = func() { m.pub.Evict(ch) } + return ch, cancel +} diff --git a/daemon/cluster/convert/container.go b/daemon/cluster/convert/container.go index a468c5f846..6ac6f331f2 100644 --- a/daemon/cluster/convert/container.go +++ b/daemon/cluster/convert/container.go @@ -13,8 +13,11 @@ import ( gogotypes "github.com/gogo/protobuf/types" ) -func containerSpecFromGRPC(c *swarmapi.ContainerSpec) types.ContainerSpec { - containerSpec := types.ContainerSpec{ +func containerSpecFromGRPC(c *swarmapi.ContainerSpec) *types.ContainerSpec { + if c == nil { + return nil + } + containerSpec := &types.ContainerSpec{ Image: c.Image, Labels: c.Labels, Command: c.Command, @@ -211,7 +214,7 @@ func configReferencesFromGRPC(sr []*swarmapi.ConfigReference) []*types.ConfigRef return refs } -func containerToGRPC(c types.ContainerSpec) (*swarmapi.ContainerSpec, error) { +func containerToGRPC(c *types.ContainerSpec) (*swarmapi.ContainerSpec, error) { containerSpec := &swarmapi.ContainerSpec{ Image: c.Image, Labels: c.Labels, diff --git a/daemon/cluster/convert/service.go b/daemon/cluster/convert/service.go index 3ab212927c..947debdf52 100644 --- a/daemon/cluster/convert/service.go +++ b/daemon/cluster/convert/service.go @@ -1,14 +1,16 @@ package convert import ( - "errors" "fmt" "strings" types "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/api/types/swarm/runtime" "github.com/docker/docker/pkg/namesgenerator" swarmapi "github.com/docker/swarmkit/api" + "github.com/gogo/protobuf/proto" gogotypes "github.com/gogo/protobuf/types" + "github.com/pkg/errors" ) var ( @@ -85,7 +87,10 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) (*types.ServiceSpec, error) } - taskTemplate := taskSpecFromGRPC(spec.Task) + taskTemplate, err := taskSpecFromGRPC(spec.Task) + if err != nil { + return nil, err + } switch t := spec.Task.GetRuntime().(type) { case *swarmapi.TaskSpec_Container: @@ -164,19 +169,34 @@ func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) { switch s.TaskTemplate.Runtime { case types.RuntimeContainer, "": // if empty runtime default to container - containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec) - if err != nil { - return swarmapi.ServiceSpec{}, err + if s.TaskTemplate.ContainerSpec != nil { + containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec) + if err != nil { + return swarmapi.ServiceSpec{}, err + } + spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec} } - spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec} case types.RuntimePlugin: - spec.Task.Runtime = &swarmapi.TaskSpec_Generic{ - Generic: &swarmapi.GenericRuntimeSpec{ - Kind: string(types.RuntimePlugin), - Payload: &gogotypes.Any{ - TypeUrl: string(types.RuntimeURLPlugin), + if s.Mode.Replicated != nil { + return swarmapi.ServiceSpec{}, errors.New("plugins must not use replicated mode") + } + + s.Mode.Global = &types.GlobalService{} // must always be global + + if s.TaskTemplate.PluginSpec != nil { + pluginSpec, err := proto.Marshal(s.TaskTemplate.PluginSpec) + if err != nil { + return swarmapi.ServiceSpec{}, err + } + spec.Task.Runtime = &swarmapi.TaskSpec_Generic{ + Generic: &swarmapi.GenericRuntimeSpec{ + Kind: string(types.RuntimePlugin), + Payload: &gogotypes.Any{ + TypeUrl: string(types.RuntimeURLPlugin), + Value: pluginSpec, + }, }, - }, + } } default: return swarmapi.ServiceSpec{}, ErrUnsupportedRuntime @@ -507,21 +527,14 @@ func updateConfigToGRPC(updateConfig *types.UpdateConfig) (*swarmapi.UpdateConfi return converted, nil } -func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) types.TaskSpec { +func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) (types.TaskSpec, error) { taskNetworks := make([]types.NetworkAttachmentConfig, 0, len(taskSpec.Networks)) for _, n := range taskSpec.Networks { netConfig := types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases, DriverOpts: n.DriverAttachmentOpts} taskNetworks = append(taskNetworks, netConfig) } - c := taskSpec.GetContainer() - cSpec := types.ContainerSpec{} - if c != nil { - cSpec = containerSpecFromGRPC(c) - } - - return types.TaskSpec{ - ContainerSpec: cSpec, + t := types.TaskSpec{ Resources: resourcesFromGRPC(taskSpec.Resources), RestartPolicy: restartPolicyFromGRPC(taskSpec.Restart), Placement: placementFromGRPC(taskSpec.Placement), @@ -529,4 +542,26 @@ func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) types.TaskSpec { Networks: taskNetworks, ForceUpdate: taskSpec.ForceUpdate, } + + switch taskSpec.GetRuntime().(type) { + case *swarmapi.TaskSpec_Container, nil: + c := taskSpec.GetContainer() + if c != nil { + t.ContainerSpec = containerSpecFromGRPC(c) + } + case *swarmapi.TaskSpec_Generic: + g := taskSpec.GetGeneric() + if g != nil { + switch g.Kind { + case string(types.RuntimePlugin): + var p runtime.PluginSpec + if err := proto.Unmarshal(g.Payload.Value, &p); err != nil { + return t, errors.Wrap(err, "error unmarshalling plugin spec") + } + t.PluginSpec = &p + } + } + } + + return t, nil } diff --git a/daemon/cluster/convert/service_test.go b/daemon/cluster/convert/service_test.go index 92d80d3323..1b6598974b 100644 --- a/daemon/cluster/convert/service_test.go +++ b/daemon/cluster/convert/service_test.go @@ -4,6 +4,7 @@ import ( "testing" swarmtypes "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/api/types/swarm/runtime" swarmapi "github.com/docker/swarmkit/api" google_protobuf3 "github.com/gogo/protobuf/types" ) @@ -82,7 +83,8 @@ func TestServiceConvertFromGRPCGenericRuntimePlugin(t *testing.T) { func TestServiceConvertToGRPCGenericRuntimePlugin(t *testing.T) { s := swarmtypes.ServiceSpec{ TaskTemplate: swarmtypes.TaskSpec{ - Runtime: swarmtypes.RuntimePlugin, + Runtime: swarmtypes.RuntimePlugin, + PluginSpec: &runtime.PluginSpec{}, }, Mode: swarmtypes.ServiceMode{ Global: &swarmtypes.GlobalService{}, @@ -108,7 +110,7 @@ func TestServiceConvertToGRPCContainerRuntime(t *testing.T) { image := "alpine:latest" s := swarmtypes.ServiceSpec{ TaskTemplate: swarmtypes.TaskSpec{ - ContainerSpec: swarmtypes.ContainerSpec{ + ContainerSpec: &swarmtypes.ContainerSpec{ Image: image, }, }, diff --git a/daemon/cluster/convert/task.go b/daemon/cluster/convert/task.go index b90d24e359..e301415c6c 100644 --- a/daemon/cluster/convert/task.go +++ b/daemon/cluster/convert/task.go @@ -9,19 +9,22 @@ import ( ) // TaskFromGRPC converts a grpc Task to a Task. -func TaskFromGRPC(t swarmapi.Task) types.Task { +func TaskFromGRPC(t swarmapi.Task) (types.Task, error) { if t.Spec.GetAttachment() != nil { - return types.Task{} + return types.Task{}, nil } containerStatus := t.Status.GetContainer() - + taskSpec, err := taskSpecFromGRPC(t.Spec) + if err != nil { + return types.Task{}, err + } task := types.Task{ ID: t.ID, Annotations: annotationsFromGRPC(t.Annotations), ServiceID: t.ServiceID, Slot: int(t.Slot), NodeID: t.NodeID, - Spec: taskSpecFromGRPC(t.Spec), + Spec: taskSpec, Status: types.TaskStatus{ State: types.TaskState(strings.ToLower(t.Status.State.String())), Message: t.Status.Message, @@ -49,7 +52,7 @@ func TaskFromGRPC(t swarmapi.Task) types.Task { } if t.Status.PortStatus == nil { - return task + return task, nil } for _, p := range t.Status.PortStatus.Ports { @@ -62,5 +65,5 @@ func TaskFromGRPC(t swarmapi.Task) types.Task { }) } - return task + return task, nil } diff --git a/daemon/cluster/executor/container/executor.go b/daemon/cluster/executor/container/executor.go index 03a00cc87b..a71a9412e3 100644 --- a/daemon/cluster/executor/container/executor.go +++ b/daemon/cluster/executor/container/executor.go @@ -22,15 +22,17 @@ import ( ) type executor struct { - backend executorpkg.Backend - dependencies exec.DependencyManager + backend executorpkg.Backend + pluginBackend plugin.Backend + dependencies exec.DependencyManager } // NewExecutor returns an executor from the docker client. -func NewExecutor(b executorpkg.Backend) exec.Executor { +func NewExecutor(b executorpkg.Backend, p plugin.Backend) exec.Executor { return &executor{ - backend: b, - dependencies: agent.NewDependencyManager(), + backend: b, + pluginBackend: p, + dependencies: agent.NewDependencyManager(), } } @@ -181,7 +183,7 @@ func (e *executor) Controller(t *api.Task) (exec.Controller, error) { } switch runtimeKind { case string(swarmtypes.RuntimePlugin): - c, err := plugin.NewController() + c, err := plugin.NewController(e.pluginBackend, t) if err != nil { return ctlr, err } diff --git a/daemon/cluster/filters.go b/daemon/cluster/filters.go index 0a004af223..efda7dc8d7 100644 --- a/daemon/cluster/filters.go +++ b/daemon/cluster/filters.go @@ -57,6 +57,7 @@ func newListTasksFilters(filter filters.Args, transformFunc func(filters.Args) e // internal use in checking create/update progress. Therefore, // we prefix it with a '_'. "_up-to-date": true, + "runtime": true, } if err := filter.Validate(accepted); err != nil { return nil, err @@ -73,6 +74,7 @@ func newListTasksFilters(filter filters.Args, transformFunc func(filters.Args) e ServiceIDs: filter.Get("service"), NodeIDs: filter.Get("node"), UpToDate: len(filter.Get("_up-to-date")) != 0, + Runtimes: filter.Get("runtime"), } for _, s := range filter.Get("desired-state") { diff --git a/daemon/cluster/noderunner.go b/daemon/cluster/noderunner.go index c0c7529ed9..a1eda066b2 100644 --- a/daemon/cluster/noderunner.go +++ b/daemon/cluster/noderunner.go @@ -118,7 +118,7 @@ func (n *nodeRunner) start(conf nodeStartConfig) error { JoinAddr: joinAddr, StateDir: n.cluster.root, JoinToken: conf.joinToken, - Executor: container.NewExecutor(n.cluster.config.Backend), + Executor: container.NewExecutor(n.cluster.config.Backend, n.cluster.config.PluginBackend), HeartbeatTick: 1, ElectionTick: 3, UnlockKey: conf.lockKey, diff --git a/daemon/cluster/services.go b/daemon/cluster/services.go index f4416c24c3..42397fa00b 100644 --- a/daemon/cluster/services.go +++ b/daemon/cluster/services.go @@ -50,14 +50,16 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv return nil, err } + if len(options.Filters.Get("runtime")) == 0 { + // Default to using the container runtime filter + options.Filters.Add("runtime", string(types.RuntimeContainer)) + } + filters := &swarmapi.ListServicesRequest_Filters{ NamePrefixes: options.Filters.Get("name"), IDPrefixes: options.Filters.Get("id"), Labels: runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")), - // (ehazlett): hardcode runtime for now. eventually we will - // be able to filter for the desired runtimes once more - // are supported. - Runtimes: []string{string(types.RuntimeContainer)}, + Runtimes: options.Filters.Get("runtime"), } ctx, cancel := c.getRequestContext() @@ -134,6 +136,20 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRe switch serviceSpec.Task.Runtime.(type) { // handle other runtimes here + case *swarmapi.TaskSpec_Generic: + switch serviceSpec.Task.GetGeneric().Kind { + case string(types.RuntimePlugin): + if s.TaskTemplate.PluginSpec == nil { + return errors.New("plugin spec must be set") + } + } + + r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec}) + if err != nil { + return err + } + + resp.ID = r.Service.ID case *swarmapi.TaskSpec_Container: ctnr := serviceSpec.Task.GetContainer() if ctnr == nil { @@ -146,7 +162,9 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRe // retrieve auth config from encoded auth authConfig := &apitypes.AuthConfig{} if encodedAuth != "" { - if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { + authReader := strings.NewReader(encodedAuth) + dec := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, authReader)) + if err := dec.Decode(authConfig); err != nil { logrus.Warnf("invalid authconfig: %v", err) } } @@ -216,75 +234,85 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ return err } - newCtnr := serviceSpec.Task.GetContainer() - if newCtnr == nil { - return errors.New("service does not use container tasks") - } - - encodedAuth := flags.EncodedRegistryAuth - if encodedAuth != "" { - newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth} - } else { - // this is needed because if the encodedAuth isn't being updated then we - // shouldn't lose it, and continue to use the one that was already present - var ctnr *swarmapi.ContainerSpec - switch flags.RegistryAuthFrom { - case apitypes.RegistryAuthFromSpec, "": - ctnr = currentService.Spec.Task.GetContainer() - case apitypes.RegistryAuthFromPreviousSpec: - if currentService.PreviousSpec == nil { - return errors.New("service does not have a previous spec") - } - ctnr = currentService.PreviousSpec.Task.GetContainer() - default: - return errors.New("unsupported registryAuthFrom value") - } - if ctnr == nil { - return errors.New("service does not use container tasks") - } - newCtnr.PullOptions = ctnr.PullOptions - // update encodedAuth so it can be used to pin image by digest - if ctnr.PullOptions != nil { - encodedAuth = ctnr.PullOptions.RegistryAuth - } - } - - // retrieve auth config from encoded auth - authConfig := &apitypes.AuthConfig{} - if encodedAuth != "" { - if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { - logrus.Warnf("invalid authconfig: %v", err) - } - } - resp = &apitypes.ServiceUpdateResponse{} - // pin image by digest for API versions < 1.30 - // TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE" - // should be removed in the future. Since integration tests only use the - // latest API version, so this is no longer required. - if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry { - digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig) - if err != nil { - logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()) - // warning in the client response should be concise - resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image)) - } else if newCtnr.Image != digestImage { - logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage) - newCtnr.Image = digestImage - } else { - logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image) + switch serviceSpec.Task.Runtime.(type) { + case *swarmapi.TaskSpec_Generic: + switch serviceSpec.Task.GetGeneric().Kind { + case string(types.RuntimePlugin): + if spec.TaskTemplate.PluginSpec == nil { + return errors.New("plugin spec must be set") + } + } + case *swarmapi.TaskSpec_Container: + newCtnr := serviceSpec.Task.GetContainer() + if newCtnr == nil { + return errors.New("service does not use container tasks") } - // Replace the context with a fresh one. - // If we timed out while communicating with the - // registry, then "ctx" will already be expired, which - // would cause UpdateService below to fail. Reusing - // "ctx" could make it impossible to update a service - // if the registry is slow or unresponsive. - var cancel func() - ctx, cancel = c.getRequestContext() - defer cancel() + encodedAuth := flags.EncodedRegistryAuth + if encodedAuth != "" { + newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth} + } else { + // this is needed because if the encodedAuth isn't being updated then we + // shouldn't lose it, and continue to use the one that was already present + var ctnr *swarmapi.ContainerSpec + switch flags.RegistryAuthFrom { + case apitypes.RegistryAuthFromSpec, "": + ctnr = currentService.Spec.Task.GetContainer() + case apitypes.RegistryAuthFromPreviousSpec: + if currentService.PreviousSpec == nil { + return errors.New("service does not have a previous spec") + } + ctnr = currentService.PreviousSpec.Task.GetContainer() + default: + return errors.New("unsupported registryAuthFrom value") + } + if ctnr == nil { + return errors.New("service does not use container tasks") + } + newCtnr.PullOptions = ctnr.PullOptions + // update encodedAuth so it can be used to pin image by digest + if ctnr.PullOptions != nil { + encodedAuth = ctnr.PullOptions.RegistryAuth + } + } + + // retrieve auth config from encoded auth + authConfig := &apitypes.AuthConfig{} + if encodedAuth != "" { + if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { + logrus.Warnf("invalid authconfig: %v", err) + } + } + + // pin image by digest for API versions < 1.30 + // TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE" + // should be removed in the future. Since integration tests only use the + // latest API version, so this is no longer required. + if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry { + digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig) + if err != nil { + logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()) + // warning in the client response should be concise + resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image)) + } else if newCtnr.Image != digestImage { + logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage) + newCtnr.Image = digestImage + } else { + logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image) + } + + // Replace the context with a fresh one. + // If we timed out while communicating with the + // registry, then "ctx" will already be expired, which + // would cause UpdateService below to fail. Reusing + // "ctx" could make it impossible to update a service + // if the registry is slow or unresponsive. + var cancel func() + ctx, cancel = c.getRequestContext() + defer cancel() + } } var rollback swarmapi.UpdateServiceRequest_Rollback diff --git a/daemon/cluster/tasks.go b/daemon/cluster/tasks.go index 47cd5563b9..f0d6621dc5 100644 --- a/daemon/cluster/tasks.go +++ b/daemon/cluster/tasks.go @@ -19,7 +19,7 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro return nil, c.errNoManager(state) } - byName := func(filter filters.Args) error { + filterTransform := func(filter filters.Args) error { if filter.Include("service") { serviceFilters := filter.Get("service") for _, serviceFilter := range serviceFilters { @@ -42,10 +42,15 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro filter.Add("node", node.ID) } } + if !filter.Include("runtime") { + // default to only showing container tasks + filter.Add("runtime", "container") + filter.Add("runtime", "") + } return nil } - filters, err := newListTasksFilters(options.Filters, byName) + filters, err := newListTasksFilters(options.Filters, filterTransform) if err != nil { return nil, err } @@ -61,11 +66,12 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro } tasks := make([]types.Task, 0, len(r.Tasks)) - for _, task := range r.Tasks { - if task.Spec.GetContainer() != nil { - tasks = append(tasks, convert.TaskFromGRPC(*task)) + t, err := convert.TaskFromGRPC(*task) + if err != nil { + return nil, err } + tasks = append(tasks, t) } return tasks, nil } @@ -83,5 +89,5 @@ func (c *Cluster) GetTask(input string) (types.Task, error) { }); err != nil { return types.Task{}, err } - return convert.TaskFromGRPC(*task), nil + return convert.TaskFromGRPC(*task) } diff --git a/docs/api/version-history.md b/docs/api/version-history.md index 26f7585860..4a9dddbfde 100644 --- a/docs/api/version-history.md +++ b/docs/api/version-history.md @@ -28,6 +28,7 @@ keywords: "API, Docker, rcli, REST, documentation" * `GET /images/(name)/get` now includes an `ImageMetadata` field which contains image metadata that is local to the engine and not part of the image config. * `POST /swarm/init` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic * `POST /swarm/join` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic +* `POST /services/create` now accepts a `PluginSpec` when `TaskTemplate.Runtime` is set to `plugin` ## v1.30 API changes diff --git a/integration-cli/daemon/daemon_swarm.go b/integration-cli/daemon/daemon_swarm.go index a7058c512d..ba414066cc 100644 --- a/integration-cli/daemon/daemon_swarm.go +++ b/integration-cli/daemon/daemon_swarm.go @@ -1,6 +1,7 @@ package daemon import ( + "context" "encoding/json" "fmt" "net/http" @@ -10,6 +11,7 @@ import ( "github.com/docker/docker/api/types" "github.com/docker/docker/api/types/filters" "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/client" "github.com/docker/docker/integration-cli/checker" "github.com/go-check/check" "github.com/pkg/errors" @@ -124,20 +126,29 @@ type ConfigConstructor func(*swarm.Config) // SpecConstructor defines a swarm spec constructor type SpecConstructor func(*swarm.Spec) -// CreateService creates a swarm service given the specified service constructor -func (d *Swarm) CreateService(c *check.C, f ...ServiceConstructor) string { +// CreateServiceWithOptions creates a swarm service given the specified service constructors +// and auth config +func (d *Swarm) CreateServiceWithOptions(c *check.C, opts types.ServiceCreateOptions, f ...ServiceConstructor) string { + cl, err := client.NewClient(d.Sock(), "", nil, nil) + c.Assert(err, checker.IsNil, check.Commentf("failed to create client")) + defer cl.Close() + var service swarm.Service for _, fn := range f { fn(&service) } - status, out, err := d.SockRequest("POST", "/services/create", service.Spec) - c.Assert(err, checker.IsNil, check.Commentf(string(out))) - c.Assert(status, checker.Equals, http.StatusCreated, check.Commentf("output: %q", string(out))) + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() - var scr types.ServiceCreateResponse - c.Assert(json.Unmarshal(out, &scr), checker.IsNil) - return scr.ID + res, err := cl.ServiceCreate(ctx, service.Spec, opts) + c.Assert(err, checker.IsNil) + return res.ID +} + +// CreateService creates a swarm service given the specified service constructor +func (d *Swarm) CreateService(c *check.C, f ...ServiceConstructor) string { + return d.CreateServiceWithOptions(c, types.ServiceCreateOptions{}, f...) } // GetService returns the swarm service corresponding to the specified id @@ -200,6 +211,37 @@ func (d *Swarm) CheckServiceUpdateState(service string) func(*check.C) (interfac } } +// CheckPluginRunning returns the runtime state of the plugin +func (d *Swarm) CheckPluginRunning(plugin string) func(c *check.C) (interface{}, check.CommentInterface) { + return func(c *check.C) (interface{}, check.CommentInterface) { + status, out, err := d.SockRequest("GET", "/plugins/"+plugin+"/json", nil) + c.Assert(err, checker.IsNil, check.Commentf(string(out))) + if status != http.StatusOK { + return false, nil + } + + var p types.Plugin + c.Assert(json.Unmarshal(out, &p), checker.IsNil, check.Commentf(string(out))) + + return p.Enabled, check.Commentf("%+v", p) + } +} + +// CheckPluginImage returns the runtime state of the plugin +func (d *Swarm) CheckPluginImage(plugin string) func(c *check.C) (interface{}, check.CommentInterface) { + return func(c *check.C) (interface{}, check.CommentInterface) { + status, out, err := d.SockRequest("GET", "/plugins/"+plugin+"/json", nil) + c.Assert(err, checker.IsNil, check.Commentf(string(out))) + if status != http.StatusOK { + return false, nil + } + + var p types.Plugin + c.Assert(json.Unmarshal(out, &p), checker.IsNil, check.Commentf(string(out))) + return p.PluginReference, check.Commentf("%+v", p) + } +} + // CheckServiceTasks returns the number of tasks for the specified service func (d *Swarm) CheckServiceTasks(service string) func(*check.C) (interface{}, check.CommentInterface) { return func(c *check.C) (interface{}, check.CommentInterface) { @@ -247,7 +289,7 @@ func (d *Swarm) CheckRunningTaskImages(c *check.C) (interface{}, check.CommentIn result := make(map[string]int) for _, task := range tasks { - if task.Status.State == swarm.TaskStateRunning { + if task.Status.State == swarm.TaskStateRunning && task.Spec.ContainerSpec != nil { result[task.Spec.ContainerSpec.Image]++ } } diff --git a/integration-cli/docker_api_swarm_service_test.go b/integration-cli/docker_api_swarm_service_test.go index 5360949362..66e17c4def 100644 --- a/integration-cli/docker_api_swarm_service_test.go +++ b/integration-cli/docker_api_swarm_service_test.go @@ -4,15 +4,19 @@ package main import ( "fmt" + "path" "strconv" "strings" "syscall" "time" "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/api/types/swarm/runtime" "github.com/docker/docker/integration-cli/checker" "github.com/docker/docker/integration-cli/daemon" + "github.com/docker/docker/integration-cli/fixtures/plugin" "github.com/go-check/check" + "golang.org/x/net/context" ) func setPortConfig(portConfig []swarm.PortConfig) daemon.ServiceConstructor { @@ -596,3 +600,77 @@ func (s *DockerSwarmSuite) TestAPISwarmServicesStateReporting(c *check.C) { } } } + +// Test plugins deployed via swarm services +func (s *DockerSwarmSuite) TestAPISwarmServicesPlugin(c *check.C) { + testRequires(c, DaemonIsLinux, IsAmd64) + reg := setupRegistry(c, false, "", "") + defer reg.Close() + + repo := path.Join(privateRegistryURL, "swarm", "test:v1") + repo2 := path.Join(privateRegistryURL, "swarm", "test:v2") + name := "test" + + err := plugin.CreateInRegistry(context.Background(), repo, nil) + c.Assert(err, checker.IsNil, check.Commentf("failed to create plugin")) + err = plugin.CreateInRegistry(context.Background(), repo2, nil) + c.Assert(err, checker.IsNil, check.Commentf("failed to create plugin")) + + d1 := s.AddDaemon(c, true, true) + d2 := s.AddDaemon(c, true, true) + d3 := s.AddDaemon(c, true, false) + + makePlugin := func(repo, name string, constraints []string) func(*swarm.Service) { + return func(s *swarm.Service) { + s.Spec.TaskTemplate.Runtime = "plugin" + s.Spec.TaskTemplate.PluginSpec = &runtime.PluginSpec{ + Name: name, + Remote: repo, + } + if constraints != nil { + s.Spec.TaskTemplate.Placement = &swarm.Placement{ + Constraints: constraints, + } + } + } + } + + id := d1.CreateService(c, makePlugin(repo, name, nil)) + waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True) + waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True) + waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.True) + + service := d1.GetService(c, id) + d1.UpdateService(c, service, makePlugin(repo2, name, nil)) + waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginImage(name), checker.Equals, repo2) + waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginImage(name), checker.Equals, repo2) + waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginImage(name), checker.Equals, repo2) + waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True) + waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True) + waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.True) + + d1.RemoveService(c, id) + waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.False) + waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.False) + waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False) + + // constrain to managers only + id = d1.CreateService(c, makePlugin(repo, name, []string{"node.role==manager"})) + waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True) + waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True) + waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False) // Not a manager, not running it + d1.RemoveService(c, id) + waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.False) + waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.False) + waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False) + + // with no name + id = d1.CreateService(c, makePlugin(repo, "", nil)) + waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(repo), checker.True) + waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(repo), checker.True) + waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(repo), checker.True) + d1.RemoveService(c, id) + waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(repo), checker.False) + waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(repo), checker.False) + waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(repo), checker.False) +} diff --git a/integration-cli/docker_api_swarm_test.go b/integration-cli/docker_api_swarm_test.go index a146bcc519..9d24757b4f 100644 --- a/integration-cli/docker_api_swarm_test.go +++ b/integration-cli/docker_api_swarm_test.go @@ -560,7 +560,7 @@ func simpleTestService(s *swarm.Service) { s.Spec = swarm.ServiceSpec{ TaskTemplate: swarm.TaskSpec{ - ContainerSpec: swarm.ContainerSpec{ + ContainerSpec: &swarm.ContainerSpec{ Image: "busybox:latest", Command: []string{"/bin/top"}, }, @@ -583,7 +583,7 @@ func serviceForUpdate(s *swarm.Service) { s.Spec = swarm.ServiceSpec{ TaskTemplate: swarm.TaskSpec{ - ContainerSpec: swarm.ContainerSpec{ + ContainerSpec: &swarm.ContainerSpec{ Image: "busybox:latest", Command: []string{"/bin/top"}, }, @@ -641,6 +641,9 @@ func setRollbackOrder(order string) daemon.ServiceConstructor { func setImage(image string) daemon.ServiceConstructor { return func(s *swarm.Service) { + if s.Spec.TaskTemplate.ContainerSpec == nil { + s.Spec.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{} + } s.Spec.TaskTemplate.ContainerSpec.Image = image } } @@ -921,6 +924,9 @@ func (s *DockerSwarmSuite) TestAPISwarmHealthcheckNone(c *check.C) { instances := 1 d.CreateService(c, simpleTestService, setInstances(instances), func(s *swarm.Service) { + if s.Spec.TaskTemplate.ContainerSpec == nil { + s.Spec.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{} + } s.Spec.TaskTemplate.ContainerSpec.Healthcheck = &container.HealthConfig{} s.Spec.TaskTemplate.Networks = []swarm.NetworkAttachmentConfig{ {Target: "lb"}, diff --git a/integration-cli/fixtures/plugin/basic/basic.go b/integration-cli/fixtures/plugin/basic/basic.go new file mode 100644 index 0000000000..892272826f --- /dev/null +++ b/integration-cli/fixtures/plugin/basic/basic.go @@ -0,0 +1,34 @@ +package main + +import ( + "fmt" + "net" + "net/http" + "os" + "path/filepath" +) + +func main() { + p, err := filepath.Abs(filepath.Join("run", "docker", "plugins")) + if err != nil { + panic(err) + } + if err := os.MkdirAll(p, 0755); err != nil { + panic(err) + } + l, err := net.Listen("unix", filepath.Join(p, "basic.sock")) + if err != nil { + panic(err) + } + + mux := http.NewServeMux() + server := http.Server{ + Addr: l.Addr().String(), + Handler: http.NewServeMux(), + } + mux.HandleFunc("/Plugin.Activate", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1.1+json") + fmt.Println(w, `{"Implements": ["dummy"]}`) + }) + server.Serve(l) +} diff --git a/integration-cli/fixtures/plugin/plugin.go b/integration-cli/fixtures/plugin/plugin.go new file mode 100644 index 0000000000..1be6169735 --- /dev/null +++ b/integration-cli/fixtures/plugin/plugin.go @@ -0,0 +1,183 @@ +package plugin + +import ( + "encoding/json" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "time" + + "github.com/docker/docker/api/types" + "github.com/docker/docker/libcontainerd" + "github.com/docker/docker/pkg/archive" + "github.com/docker/docker/plugin" + "github.com/docker/docker/registry" + "github.com/pkg/errors" + "golang.org/x/net/context" +) + +// CreateOpt is is passed used to change the defualt plugin config before +// creating it +type CreateOpt func(*Config) + +// Config wraps types.PluginConfig to provide some extra state for options +// extra customizations on the plugin details, such as using a custom binary to +// create the plugin with. +type Config struct { + *types.PluginConfig + binPath string +} + +// WithBinary is a CreateOpt to set an custom binary to create the plugin with. +// This binary must be statically compiled. +func WithBinary(bin string) CreateOpt { + return func(cfg *Config) { + cfg.binPath = bin + } +} + +// CreateClient is the interface used for `BuildPlugin` to interact with the +// daemon. +type CreateClient interface { + PluginCreate(context.Context, io.Reader, types.PluginCreateOptions) error +} + +// Create creates a new plugin with the specified name +func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error { + tmpDir, err := ioutil.TempDir("", "create-test-plugin") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + tar, err := makePluginBundle(tmpDir, opts...) + if err != nil { + return err + } + defer tar.Close() + + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + + return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name}) +} + +// TODO(@cpuguy83): we really shouldn't have to do this... +// The manager panics on init when `Executor` is not set. +type dummyExecutor struct{} + +func (dummyExecutor) Client(libcontainerd.Backend) (libcontainerd.Client, error) { return nil, nil } +func (dummyExecutor) Cleanup() {} +func (dummyExecutor) UpdateOptions(...libcontainerd.RemoteOption) error { return nil } + +// CreateInRegistry makes a plugin (locally) and pushes it to a registry. +// This does not use a dockerd instance to create or push the plugin. +// If you just want to create a plugin in some daemon, use `Create`. +// +// This can be useful when testing plugins on swarm where you don't really want +// the plugin to exist on any of the daemons (immediately) and there needs to be +// some way to distribute the plugin. +func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error { + tmpDir, err := ioutil.TempDir("", "create-test-plugin-local") + if err != nil { + return err + } + defer os.RemoveAll(tmpDir) + + inPath := filepath.Join(tmpDir, "plugin") + if err := os.MkdirAll(inPath, 0755); err != nil { + return errors.Wrap(err, "error creating plugin root") + } + + tar, err := makePluginBundle(inPath, opts...) + if err != nil { + return err + } + defer tar.Close() + + managerConfig := plugin.ManagerConfig{ + Store: plugin.NewStore(), + RegistryService: registry.NewService(registry.ServiceOptions{V2Only: true}), + Root: filepath.Join(tmpDir, "root"), + ExecRoot: "/run/docker", // manager init fails if not set + Executor: dummyExecutor{}, + LogPluginEvent: func(id, name, action string) {}, // panics when not set + } + manager, err := plugin.NewManager(managerConfig) + if err != nil { + return errors.Wrap(err, "error creating plugin manager") + } + + ctx, cancel := context.WithTimeout(ctx, 30*time.Second) + defer cancel() + if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil { + return err + } + + if auth == nil { + auth = &types.AuthConfig{} + } + err = manager.Push(ctx, repo, nil, auth, ioutil.Discard) + return errors.Wrap(err, "error pushing plugin") +} + +func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) { + p := &types.PluginConfig{ + Interface: types.PluginConfigInterface{ + Socket: "basic.sock", + Types: []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}}, + }, + Entrypoint: []string{"/basic"}, + } + cfg := &Config{ + PluginConfig: p, + } + for _, o := range opts { + o(cfg) + } + if cfg.binPath == "" { + binPath, err := ensureBasicPluginBin() + if err != nil { + return nil, err + } + cfg.binPath = binPath + } + + configJSON, err := json.Marshal(p) + if err != nil { + return nil, err + } + if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil { + return nil, err + } + if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil { + return nil, errors.Wrap(err, "error creating plugin rootfs dir") + } + if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil { + return nil, errors.Wrap(err, "error copying plugin binary to rootfs path") + } + tar, err := archive.Tar(inPath, archive.Uncompressed) + return tar, errors.Wrap(err, "error making plugin archive") +} + +func ensureBasicPluginBin() (string, error) { + name := "docker-basic-plugin" + p, err := exec.LookPath(name) + if err == nil { + return p, nil + } + + goBin, err := exec.LookPath("go") + if err != nil { + return "", err + } + installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name) + cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic")) + cmd.Env = append(cmd.Env, "CGO_ENABLED=0") + if out, err := cmd.CombinedOutput(); err != nil { + return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out)) + } + return installPath, nil +} diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index 09364617e4..8e30d16ae5 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -53,6 +53,16 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} { return ch } +// SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic. +// The returned channel has a buffer of the specified size. +func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} { + ch := make(chan interface{}, buffer) + p.m.Lock() + p.subscribers[ch] = topic + p.m.Unlock() + return ch +} + // Evict removes the specified subscriber from receiving any more messages. func (p *Publisher) Evict(sub chan interface{}) { p.m.Lock() diff --git a/plugin/backend_linux.go b/plugin/backend_linux.go index d65b90a2b7..055b8e3107 100644 --- a/plugin/backend_linux.go +++ b/plugin/backend_linux.go @@ -67,6 +67,7 @@ func (pm *Manager) Disable(refOrID string, config *types.PluginDisableConfig) er if err := pm.disable(p, c); err != nil { return err } + pm.publisher.Publish(EventDisable{Plugin: p.PluginObj}) pm.config.LogPluginEvent(p.GetID(), refOrID, "disable") return nil } @@ -82,6 +83,7 @@ func (pm *Manager) Enable(refOrID string, config *types.PluginEnableConfig) erro if err := pm.enable(p, c, false); err != nil { return err } + pm.publisher.Publish(EventEnable{Plugin: p.PluginObj}) pm.config.LogPluginEvent(p.GetID(), refOrID, "enable") return nil } @@ -296,7 +298,7 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string } // Pull pulls a plugin, check if the correct privileges are provided and install the plugin. -func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) { +func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...CreateOpt) (err error) { pm.muGC.RLock() defer pm.muGC.RUnlock() @@ -340,12 +342,19 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m return err } - p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges) + refOpt := func(p *v2.Plugin) { + p.PluginObj.PluginReference = ref.String() + } + optsList := make([]CreateOpt, 0, len(opts)+1) + optsList = append(optsList, opts...) + optsList = append(optsList, refOpt) + + p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges, optsList...) if err != nil { return err } - p.PluginObj.PluginReference = ref.String() + pm.publisher.Publish(EventCreate{Plugin: p.PluginObj}) return nil } @@ -640,6 +649,7 @@ func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error { } pm.config.Store.Remove(p) pm.config.LogPluginEvent(id, name, "remove") + pm.publisher.Publish(EventRemove{Plugin: p.PluginObj}) return nil } @@ -771,6 +781,7 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, } p.PluginObj.PluginReference = name + pm.publisher.Publish(EventCreate{Plugin: p.PluginObj}) pm.config.LogPluginEvent(p.PluginObj.ID, name, "create") return nil diff --git a/plugin/backend_unsupported.go b/plugin/backend_unsupported.go index 2d4850eeba..e69bb883d0 100644 --- a/plugin/backend_unsupported.go +++ b/plugin/backend_unsupported.go @@ -36,7 +36,7 @@ func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHead } // Pull pulls a plugin, check if the correct privileges are provided and install the plugin. -func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, out io.Writer) error { +func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, out io.Writer, opts ...CreateOpt) error { return errNotSupported } diff --git a/plugin/defs.go b/plugin/defs.go index cf44c97ec8..3e930de048 100644 --- a/plugin/defs.go +++ b/plugin/defs.go @@ -24,3 +24,14 @@ func NewStore() *Store { handlers: make(map[string][]func(string, *plugins.Client)), } } + +// CreateOpt is used to configure specific plugin details when created +type CreateOpt func(p *v2.Plugin) + +// WithSwarmService is a CreateOpt that flags the passed in a plugin as a plugin +// managed by swarm +func WithSwarmService(id string) CreateOpt { + return func(p *v2.Plugin) { + p.SwarmServiceID = id + } +} diff --git a/plugin/events.go b/plugin/events.go new file mode 100644 index 0000000000..92e603850d --- /dev/null +++ b/plugin/events.go @@ -0,0 +1,111 @@ +package plugin + +import ( + "fmt" + "reflect" + + "github.com/docker/docker/api/types" +) + +// Event is emitted for actions performed on the plugin manager +type Event interface { + matches(Event) bool +} + +// EventCreate is an event which is emitted when a plugin is created +// This is either by pull or create from context. +// +// Use the `Interfaces` field to match only plugins that implement a specific +// interface. +// These are matched against using "or" logic. +// If no interfaces are listed, all are matched. +type EventCreate struct { + Interfaces map[string]bool + Plugin types.Plugin +} + +func (e EventCreate) matches(observed Event) bool { + oe, ok := observed.(EventCreate) + if !ok { + return false + } + if len(e.Interfaces) == 0 { + return true + } + + var ifaceMatch bool + for _, in := range oe.Plugin.Config.Interface.Types { + if e.Interfaces[in.Capability] { + ifaceMatch = true + break + } + } + return ifaceMatch +} + +// EventRemove is an event which is emitted when a plugin is removed +// It maches on the passed in plugin's ID only. +type EventRemove struct { + Plugin types.Plugin +} + +func (e EventRemove) matches(observed Event) bool { + oe, ok := observed.(EventRemove) + if !ok { + return false + } + return e.Plugin.ID == oe.Plugin.ID +} + +// EventDisable is an event that is emitted when a plugin is disabled +// It maches on the passed in plugin's ID only. +type EventDisable struct { + Plugin types.Plugin +} + +func (e EventDisable) matches(observed Event) bool { + oe, ok := observed.(EventDisable) + if !ok { + return false + } + return e.Plugin.ID == oe.Plugin.ID +} + +// EventEnable is an event that is emitted when a plugin is disabled +// It maches on the passed in plugin's ID only. +type EventEnable struct { + Plugin types.Plugin +} + +func (e EventEnable) matches(observed Event) bool { + oe, ok := observed.(EventEnable) + if !ok { + return false + } + return e.Plugin.ID == oe.Plugin.ID +} + +// SubscribeEvents provides an event channel to listen for structured events from +// the plugin manager actions, CRUD operations. +// The caller must call the returned `cancel()` function once done with the channel +// or this will leak resources. +func (pm *Manager) SubscribeEvents(buffer int, watchEvents ...Event) (eventCh <-chan interface{}, cancel func()) { + topic := func(i interface{}) bool { + observed, ok := i.(Event) + if !ok { + panic(fmt.Sprintf("unexpected type passed to event channel: %v", reflect.TypeOf(i))) + } + for _, e := range watchEvents { + if e.matches(observed) { + return true + } + } + // If no specific events are specified always assume a matched event + // If some events were specified and none matched above, then the event + // doesn't match + return watchEvents == nil + } + ch := pm.publisher.SubscribeTopicWithBuffer(topic, buffer) + cancelFunc := func() { pm.publisher.Evict(ch) } + return ch, cancelFunc +} diff --git a/plugin/manager.go b/plugin/manager.go index 43b1e676ff..fada0d667a 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -22,6 +22,7 @@ import ( "github.com/docker/docker/pkg/authorization" "github.com/docker/docker/pkg/ioutils" "github.com/docker/docker/pkg/mount" + "github.com/docker/docker/pkg/pubsub" "github.com/docker/docker/pkg/system" "github.com/docker/docker/plugin/v2" "github.com/docker/docker/registry" @@ -63,6 +64,7 @@ type Manager struct { cMap map[*v2.Plugin]*controller containerdClient libcontainerd.Client blobStore *basicBlobStore + publisher *pubsub.Publisher } // controller represents the manager's control on a plugin. @@ -117,6 +119,8 @@ func NewManager(config ManagerConfig) (*Manager, error) { if err := manager.reload(); err != nil { return nil, errors.Wrap(err, "failed to restore plugins") } + + manager.publisher = pubsub.NewPublisher(0, 0) return manager, nil } @@ -268,6 +272,11 @@ func (pm *Manager) reload() error { // todo: restore return nil } +// Get looks up the requested plugin in the store. +func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) { + return pm.config.Store.GetV2Plugin(idOrName) +} + func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) { p := filepath.Join(pm.config.Root, id, configFileName) dt, err := ioutil.ReadFile(p) diff --git a/plugin/manager_linux.go b/plugin/manager_linux.go index 678be84b3f..7f79e6900e 100644 --- a/plugin/manager_linux.go +++ b/plugin/manager_linux.go @@ -274,7 +274,7 @@ func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest. } // createPlugin creates a new plugin. take lock before calling. -func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges) (p *v2.Plugin, err error) { +func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) { if err := pm.config.Store.validateName(name); err != nil { // todo: this check is wrong. remove store return nil, err } @@ -294,6 +294,9 @@ func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsum Blobsums: blobsums, } p.InitEmptySettings() + for _, o := range opts { + o(p) + } pdir := filepath.Join(pm.config.Root, p.PluginObj.ID) if err := os.MkdirAll(pdir, 0700); err != nil { diff --git a/plugin/v2/plugin.go b/plugin/v2/plugin.go index 74ff64080a..b77536c986 100644 --- a/plugin/v2/plugin.go +++ b/plugin/v2/plugin.go @@ -22,6 +22,8 @@ type Plugin struct { Config digest.Digest Blobsums []digest.Digest + + SwarmServiceID string } const defaultPluginRuntimeDestination = "/run/docker/plugins"