Make plugin emit strongly typed, consumable events

Enables other subsystems to watch actions for a plugin(s).

This will be used specifically for implementing plugins on swarm where a
swarm controller needs to watch the state of a plugin.

Signed-off-by: Brian Goff <cpuguy83@gmail.com>
This commit is contained in:
Brian Goff 2017-06-07 13:07:01 -04:00
parent 9d95740dbf
commit 72c3bcf2a5
37 changed files with 2204 additions and 213 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/docker/distribution/reference"
enginetypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/plugin"
"golang.org/x/net/context"
)
@ -19,7 +20,7 @@ type Backend interface {
Remove(name string, config *enginetypes.PluginRmConfig) error
Set(name string, args []string) error
Privileges(ctx context.Context, ref reference.Named, metaHeaders http.Header, authConfig *enginetypes.AuthConfig) (enginetypes.PluginPrivileges, error)
Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
Push(ctx context.Context, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, outStream io.Writer) error
Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
CreateFromContext(ctx context.Context, tarCtx io.ReadCloser, options *enginetypes.PluginCreateOptions) error

View File

@ -44,7 +44,7 @@ func (sr *swarmRouter) swarmLogs(ctx context.Context, w http.ResponseWriter, r *
// maybe should return some context with this error?
return err
}
tty = s.Spec.TaskTemplate.ContainerSpec.TTY || tty
tty = (s.Spec.TaskTemplate.ContainerSpec != nil && s.Spec.TaskTemplate.ContainerSpec.TTY) || tty
}
for _, task := range selector.Tasks {
t, err := sr.backend.GetTask(task)

View File

@ -1975,11 +1975,39 @@ definitions:
description: "User modifiable task configuration."
type: "object"
properties:
PluginSpec:
type: "object"
description: "Invalid when specified with `ContainerSpec`."
properties:
Name:
description: "The name or 'alias' to use for the plugin."
type: "string"
Remote:
description: "The plugin image reference to use."
type: "string"
Disabled:
description: "Disable the plugin once scheduled."
type: "boolean"
PluginPrivilege:
type: "array"
items:
description: "Describes a permission accepted by the user upon installing the plugin."
type: "object"
properties:
Name:
type: "string"
Description:
type: "string"
Value:
type: "array"
items:
type: "string"
ContainerSpec:
type: "object"
description: "Invalid when specified with `PluginSpec`."
properties:
Image:
description: "The image name to use for the container."
description: "The image name to use for the container"
type: "string"
Labels:
description: "User-defined key/value data."

View File

@ -0,0 +1,3 @@
//go:generate protoc -I . --gogofast_out=import_path=github.com/docker/docker/api/types/swarm/runtime:. plugin.proto
package runtime

View File

@ -0,0 +1,712 @@
// Code generated by protoc-gen-gogo.
// source: plugin.proto
// DO NOT EDIT!
/*
Package runtime is a generated protocol buffer package.
It is generated from these files:
plugin.proto
It has these top-level messages:
PluginSpec
PluginPrivilege
*/
package runtime
import proto "github.com/gogo/protobuf/proto"
import fmt "fmt"
import math "math"
import io "io"
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
// PluginSpec defines the base payload which clients can specify for creating
// a service with the plugin runtime.
type PluginSpec struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Remote string `protobuf:"bytes,2,opt,name=remote,proto3" json:"remote,omitempty"`
Privileges []*PluginPrivilege `protobuf:"bytes,3,rep,name=privileges" json:"privileges,omitempty"`
Disabled bool `protobuf:"varint,4,opt,name=disabled,proto3" json:"disabled,omitempty"`
}
func (m *PluginSpec) Reset() { *m = PluginSpec{} }
func (m *PluginSpec) String() string { return proto.CompactTextString(m) }
func (*PluginSpec) ProtoMessage() {}
func (*PluginSpec) Descriptor() ([]byte, []int) { return fileDescriptorPlugin, []int{0} }
func (m *PluginSpec) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *PluginSpec) GetRemote() string {
if m != nil {
return m.Remote
}
return ""
}
func (m *PluginSpec) GetPrivileges() []*PluginPrivilege {
if m != nil {
return m.Privileges
}
return nil
}
func (m *PluginSpec) GetDisabled() bool {
if m != nil {
return m.Disabled
}
return false
}
// PluginPrivilege describes a permission the user has to accept
// upon installing a plugin.
type PluginPrivilege struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Description string `protobuf:"bytes,2,opt,name=description,proto3" json:"description,omitempty"`
Value []string `protobuf:"bytes,3,rep,name=value" json:"value,omitempty"`
}
func (m *PluginPrivilege) Reset() { *m = PluginPrivilege{} }
func (m *PluginPrivilege) String() string { return proto.CompactTextString(m) }
func (*PluginPrivilege) ProtoMessage() {}
func (*PluginPrivilege) Descriptor() ([]byte, []int) { return fileDescriptorPlugin, []int{1} }
func (m *PluginPrivilege) GetName() string {
if m != nil {
return m.Name
}
return ""
}
func (m *PluginPrivilege) GetDescription() string {
if m != nil {
return m.Description
}
return ""
}
func (m *PluginPrivilege) GetValue() []string {
if m != nil {
return m.Value
}
return nil
}
func init() {
proto.RegisterType((*PluginSpec)(nil), "PluginSpec")
proto.RegisterType((*PluginPrivilege)(nil), "PluginPrivilege")
}
func (m *PluginSpec) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PluginSpec) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Name) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintPlugin(dAtA, i, uint64(len(m.Name)))
i += copy(dAtA[i:], m.Name)
}
if len(m.Remote) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintPlugin(dAtA, i, uint64(len(m.Remote)))
i += copy(dAtA[i:], m.Remote)
}
if len(m.Privileges) > 0 {
for _, msg := range m.Privileges {
dAtA[i] = 0x1a
i++
i = encodeVarintPlugin(dAtA, i, uint64(msg.Size()))
n, err := msg.MarshalTo(dAtA[i:])
if err != nil {
return 0, err
}
i += n
}
}
if m.Disabled {
dAtA[i] = 0x20
i++
if m.Disabled {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i++
}
return i, nil
}
func (m *PluginPrivilege) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalTo(dAtA)
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *PluginPrivilege) MarshalTo(dAtA []byte) (int, error) {
var i int
_ = i
var l int
_ = l
if len(m.Name) > 0 {
dAtA[i] = 0xa
i++
i = encodeVarintPlugin(dAtA, i, uint64(len(m.Name)))
i += copy(dAtA[i:], m.Name)
}
if len(m.Description) > 0 {
dAtA[i] = 0x12
i++
i = encodeVarintPlugin(dAtA, i, uint64(len(m.Description)))
i += copy(dAtA[i:], m.Description)
}
if len(m.Value) > 0 {
for _, s := range m.Value {
dAtA[i] = 0x1a
i++
l = len(s)
for l >= 1<<7 {
dAtA[i] = uint8(uint64(l)&0x7f | 0x80)
l >>= 7
i++
}
dAtA[i] = uint8(l)
i++
i += copy(dAtA[i:], s)
}
}
return i, nil
}
func encodeFixed64Plugin(dAtA []byte, offset int, v uint64) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
dAtA[offset+4] = uint8(v >> 32)
dAtA[offset+5] = uint8(v >> 40)
dAtA[offset+6] = uint8(v >> 48)
dAtA[offset+7] = uint8(v >> 56)
return offset + 8
}
func encodeFixed32Plugin(dAtA []byte, offset int, v uint32) int {
dAtA[offset] = uint8(v)
dAtA[offset+1] = uint8(v >> 8)
dAtA[offset+2] = uint8(v >> 16)
dAtA[offset+3] = uint8(v >> 24)
return offset + 4
}
func encodeVarintPlugin(dAtA []byte, offset int, v uint64) int {
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return offset + 1
}
func (m *PluginSpec) Size() (n int) {
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovPlugin(uint64(l))
}
l = len(m.Remote)
if l > 0 {
n += 1 + l + sovPlugin(uint64(l))
}
if len(m.Privileges) > 0 {
for _, e := range m.Privileges {
l = e.Size()
n += 1 + l + sovPlugin(uint64(l))
}
}
if m.Disabled {
n += 2
}
return n
}
func (m *PluginPrivilege) Size() (n int) {
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovPlugin(uint64(l))
}
l = len(m.Description)
if l > 0 {
n += 1 + l + sovPlugin(uint64(l))
}
if len(m.Value) > 0 {
for _, s := range m.Value {
l = len(s)
n += 1 + l + sovPlugin(uint64(l))
}
}
return n
}
func sovPlugin(x uint64) (n int) {
for {
n++
x >>= 7
if x == 0 {
break
}
}
return n
}
func sozPlugin(x uint64) (n int) {
return sovPlugin(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *PluginSpec) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PluginSpec: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PluginSpec: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPlugin
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Remote", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPlugin
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Remote = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Privileges", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthPlugin
}
postIndex := iNdEx + msglen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Privileges = append(m.Privileges, &PluginPrivilege{})
if err := m.Privileges[len(m.Privileges)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 4:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Disabled", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
m.Disabled = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipPlugin(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPlugin
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *PluginPrivilege) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: PluginPrivilege: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: PluginPrivilege: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPlugin
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Description", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPlugin
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Description = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 3:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowPlugin
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthPlugin
}
postIndex := iNdEx + intStringLen
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = append(m.Value, string(dAtA[iNdEx:postIndex]))
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipPlugin(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthPlugin
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipPlugin(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPlugin
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPlugin
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPlugin
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
iNdEx += length
if length < 0 {
return 0, ErrInvalidLengthPlugin
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowPlugin
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipPlugin(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthPlugin = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowPlugin = fmt.Errorf("proto: integer overflow")
)
func init() { proto.RegisterFile("plugin.proto", fileDescriptorPlugin) }
var fileDescriptorPlugin = []byte{
// 196 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x29, 0xc8, 0x29, 0x4d,
0xcf, 0xcc, 0xd3, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x57, 0x6a, 0x63, 0xe4, 0xe2, 0x0a, 0x00, 0x0b,
0x04, 0x17, 0xa4, 0x26, 0x0b, 0x09, 0x71, 0xb1, 0xe4, 0x25, 0xe6, 0xa6, 0x4a, 0x30, 0x2a, 0x30,
0x6a, 0x70, 0x06, 0x81, 0xd9, 0x42, 0x62, 0x5c, 0x6c, 0x45, 0xa9, 0xb9, 0xf9, 0x25, 0xa9, 0x12,
0x4c, 0x60, 0x51, 0x28, 0x4f, 0xc8, 0x80, 0x8b, 0xab, 0xa0, 0x28, 0xb3, 0x2c, 0x33, 0x27, 0x35,
0x3d, 0xb5, 0x58, 0x82, 0x59, 0x81, 0x59, 0x83, 0xdb, 0x48, 0x40, 0x0f, 0x62, 0x58, 0x00, 0x4c,
0x22, 0x08, 0x49, 0x8d, 0x90, 0x14, 0x17, 0x47, 0x4a, 0x66, 0x71, 0x62, 0x52, 0x4e, 0x6a, 0x8a,
0x04, 0x8b, 0x02, 0xa3, 0x06, 0x47, 0x10, 0x9c, 0xaf, 0x14, 0xcb, 0xc5, 0x8f, 0xa6, 0x15, 0xab,
0x63, 0x14, 0xb8, 0xb8, 0x53, 0x52, 0x8b, 0x93, 0x8b, 0x32, 0x0b, 0x4a, 0x32, 0xf3, 0xf3, 0xa0,
0x2e, 0x42, 0x16, 0x12, 0x12, 0xe1, 0x62, 0x2d, 0x4b, 0xcc, 0x29, 0x4d, 0x05, 0xbb, 0x88, 0x33,
0x08, 0xc2, 0x71, 0xe2, 0x39, 0xf1, 0x48, 0x8e, 0xf1, 0xc2, 0x23, 0x39, 0xc6, 0x07, 0x8f, 0xe4,
0x18, 0x93, 0xd8, 0xc0, 0x9e, 0x37, 0x06, 0x04, 0x00, 0x00, 0xff, 0xff, 0xb8, 0x84, 0xad, 0x79,
0x0c, 0x01, 0x00, 0x00,
}

View File

@ -0,0 +1,18 @@
syntax = "proto3";
// PluginSpec defines the base payload which clients can specify for creating
// a service with the plugin runtime.
message PluginSpec {
string name = 1;
string remote = 2;
repeated PluginPrivilege privileges = 3;
bool disabled = 4;
}
// PluginPrivilege describes a permission the user has to accept
// upon installing a plugin.
message PluginPrivilege {
string name = 1;
string description = 2;
repeated string value = 3;
}

View File

@ -1,6 +1,10 @@
package swarm
import "time"
import (
"time"
"github.com/docker/docker/api/types/swarm/runtime"
)
// TaskState represents the state of a task.
type TaskState string
@ -51,7 +55,11 @@ type Task struct {
// TaskSpec represents the spec of a task.
type TaskSpec struct {
ContainerSpec ContainerSpec `json:",omitempty"`
// ContainerSpec and PluginSpec are mutually exclusive.
// PluginSpec will only be used when the `Runtime` field is set to `plugin`
ContainerSpec *ContainerSpec `json:",omitempty"`
PluginSpec *runtime.PluginSpec `json:",omitempty"`
Resources *ResourceRequirements `json:",omitempty"`
RestartPolicy *RestartPolicy `json:",omitempty"`
Placement *Placement `json:",omitempty"`

View File

@ -6,9 +6,9 @@ import (
"github.com/docker/distribution/reference"
"github.com/docker/docker/api/types"
registrytypes "github.com/docker/docker/api/types/registry"
"github.com/docker/docker/api/types/swarm"
"github.com/opencontainers/go-digest"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
@ -24,24 +24,51 @@ func (cli *Client) ServiceCreate(ctx context.Context, service swarm.ServiceSpec,
headers["X-Registry-Auth"] = []string{options.EncodedRegistryAuth}
}
// ensure that the image is tagged
if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" {
service.TaskTemplate.ContainerSpec.Image = taggedImg
// Make sure containerSpec is not nil when no runtime is set or the runtime is set to container
if service.TaskTemplate.ContainerSpec == nil && (service.TaskTemplate.Runtime == "" || service.TaskTemplate.Runtime == swarm.RuntimeContainer) {
service.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{}
}
// Contact the registry to retrieve digest and platform information
if options.QueryRegistry {
distributionInspect, err := cli.DistributionInspect(ctx, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth)
distErr = err
if err == nil {
// now pin by digest if the image doesn't already contain a digest
if img := imageWithDigestString(service.TaskTemplate.ContainerSpec.Image, distributionInspect.Descriptor.Digest); img != "" {
if err := validateServiceSpec(service); err != nil {
return types.ServiceCreateResponse{}, err
}
// ensure that the image is tagged
var imgPlatforms []swarm.Platform
if service.TaskTemplate.ContainerSpec != nil {
if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" {
service.TaskTemplate.ContainerSpec.Image = taggedImg
}
if options.QueryRegistry {
var img string
img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth)
if img != "" {
service.TaskTemplate.ContainerSpec.Image = img
}
// add platforms that are compatible with the service
service.TaskTemplate.Placement = setServicePlatforms(service.TaskTemplate.Placement, distributionInspect)
}
}
// ensure that the image is tagged
if service.TaskTemplate.PluginSpec != nil {
if taggedImg := imageWithTagString(service.TaskTemplate.PluginSpec.Remote); taggedImg != "" {
service.TaskTemplate.PluginSpec.Remote = taggedImg
}
if options.QueryRegistry {
var img string
img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.PluginSpec.Remote, options.EncodedRegistryAuth)
if img != "" {
service.TaskTemplate.PluginSpec.Remote = img
}
}
}
if service.TaskTemplate.Placement == nil && len(imgPlatforms) > 0 {
service.TaskTemplate.Placement = &swarm.Placement{}
}
if len(imgPlatforms) > 0 {
service.TaskTemplate.Placement.Platforms = imgPlatforms
}
var response types.ServiceCreateResponse
resp, err := cli.post(ctx, "/services/create", nil, service, headers)
if err != nil {
@ -58,6 +85,28 @@ func (cli *Client) ServiceCreate(ctx context.Context, service swarm.ServiceSpec,
return response, err
}
func imageDigestAndPlatforms(ctx context.Context, cli *Client, image, encodedAuth string) (string, []swarm.Platform, error) {
distributionInspect, err := cli.DistributionInspect(ctx, image, encodedAuth)
imageWithDigest := image
var platforms []swarm.Platform
if err != nil {
return "", nil, err
}
imageWithDigest = imageWithDigestString(image, distributionInspect.Descriptor.Digest)
if len(distributionInspect.Platforms) > 0 {
platforms = make([]swarm.Platform, 0, len(distributionInspect.Platforms))
for _, p := range distributionInspect.Platforms {
platforms = append(platforms, swarm.Platform{
Architecture: p.Architecture,
OS: p.OS,
})
}
}
return imageWithDigest, platforms, err
}
// imageWithDigestString takes an image string and a digest, and updates
// the image string if it didn't originally contain a digest. It returns
// an empty string if there are no updates.
@ -86,27 +135,22 @@ func imageWithTagString(image string) string {
return ""
}
// setServicePlatforms sets Platforms in swarm.Placement to list all
// compatible platforms for the service, as found in distributionInspect
// and returns a pointer to the new or updated swarm.Placement struct.
func setServicePlatforms(placement *swarm.Placement, distributionInspect registrytypes.DistributionInspect) *swarm.Placement {
if placement == nil {
placement = &swarm.Placement{}
}
// reset any existing listed platforms
placement.Platforms = []swarm.Platform{}
for _, p := range distributionInspect.Platforms {
placement.Platforms = append(placement.Platforms, swarm.Platform{
Architecture: p.Architecture,
OS: p.OS,
})
}
return placement
}
// digestWarning constructs a formatted warning string using the
// image name that could not be pinned by digest. The formatting
// is hardcoded, but could me made smarter in the future
func digestWarning(image string) string {
return fmt.Sprintf("image %s could not be accessed on a registry to record\nits digest. Each node will access %s independently,\npossibly leading to different nodes running different\nversions of the image.\n", image, image)
}
func validateServiceSpec(s swarm.ServiceSpec) error {
if s.TaskTemplate.ContainerSpec != nil && s.TaskTemplate.PluginSpec != nil {
return errors.New("must not specify both a container spec and a plugin spec in the task template")
}
if s.TaskTemplate.PluginSpec != nil && s.TaskTemplate.Runtime != swarm.RuntimePlugin {
return errors.New("mismatched runtime with plugin spec")
}
if s.TaskTemplate.ContainerSpec != nil && (s.TaskTemplate.Runtime != "" && s.TaskTemplate.Runtime != swarm.RuntimeContainer) {
return errors.New("mismatched runtime with container spec")
}
return nil
}

View File

@ -112,7 +112,7 @@ func TestServiceCreateCompatiblePlatforms(t *testing.T) {
}),
}
spec := swarm.ServiceSpec{TaskTemplate: swarm.TaskSpec{ContainerSpec: swarm.ContainerSpec{Image: "foobar:1.0"}}}
spec := swarm.ServiceSpec{TaskTemplate: swarm.TaskSpec{ContainerSpec: &swarm.ContainerSpec{Image: "foobar:1.0"}}}
r, err := client.ServiceCreate(context.Background(), spec, types.ServiceCreateOptions{QueryRegistry: true})
assert.NoError(t, err)
@ -189,7 +189,7 @@ func TestServiceCreateDigestPinning(t *testing.T) {
for _, p := range pinByDigestTests {
r, err := client.ServiceCreate(context.Background(), swarm.ServiceSpec{
TaskTemplate: swarm.TaskSpec{
ContainerSpec: swarm.ContainerSpec{
ContainerSpec: &swarm.ContainerSpec{
Image: p.img,
},
},

View File

@ -35,26 +35,46 @@ func (cli *Client) ServiceUpdate(ctx context.Context, serviceID string, version
query.Set("version", strconv.FormatUint(version.Index, 10))
// ensure that the image is tagged
if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" {
service.TaskTemplate.ContainerSpec.Image = taggedImg
if err := validateServiceSpec(service); err != nil {
return types.ServiceUpdateResponse{}, err
}
// Contact the registry to retrieve digest and platform information
// This happens only when the image has changed
if options.QueryRegistry {
distributionInspect, err := cli.DistributionInspect(ctx, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth)
distErr = err
if err == nil {
// now pin by digest if the image doesn't already contain a digest
if img := imageWithDigestString(service.TaskTemplate.ContainerSpec.Image, distributionInspect.Descriptor.Digest); img != "" {
var imgPlatforms []swarm.Platform
// ensure that the image is tagged
if service.TaskTemplate.ContainerSpec != nil {
if taggedImg := imageWithTagString(service.TaskTemplate.ContainerSpec.Image); taggedImg != "" {
service.TaskTemplate.ContainerSpec.Image = taggedImg
}
if options.QueryRegistry {
var img string
img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.ContainerSpec.Image, options.EncodedRegistryAuth)
if img != "" {
service.TaskTemplate.ContainerSpec.Image = img
}
// add platforms that are compatible with the service
service.TaskTemplate.Placement = setServicePlatforms(service.TaskTemplate.Placement, distributionInspect)
}
}
// ensure that the image is tagged
if service.TaskTemplate.PluginSpec != nil {
if taggedImg := imageWithTagString(service.TaskTemplate.PluginSpec.Remote); taggedImg != "" {
service.TaskTemplate.PluginSpec.Remote = taggedImg
}
if options.QueryRegistry {
var img string
img, imgPlatforms, distErr = imageDigestAndPlatforms(ctx, cli, service.TaskTemplate.PluginSpec.Remote, options.EncodedRegistryAuth)
if img != "" {
service.TaskTemplate.PluginSpec.Remote = img
}
}
}
if service.TaskTemplate.Placement == nil && len(imgPlatforms) > 0 {
service.TaskTemplate.Placement = &swarm.Placement{}
}
if len(imgPlatforms) > 0 {
service.TaskTemplate.Placement.Platforms = imgPlatforms
}
var response types.ServiceUpdateResponse
resp, err := cli.post(ctx, "/services/"+serviceID+"/update", query, service, headers)
if err != nil {

View File

@ -253,6 +253,7 @@ func (cli *DaemonCli) start(opts *daemonOptions) (err error) {
Root: cli.Config.Root,
Name: name,
Backend: d,
PluginBackend: d.PluginManager(),
NetworkSubnetsProvider: d,
DefaultAdvertiseAddr: cli.Config.SwarmDefaultAdvertiseAddr,
RuntimeRoot: cli.getSwarmRunRoot(),

View File

@ -49,6 +49,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types/network"
types "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/daemon/cluster/controllers/plugin"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
"github.com/docker/docker/pkg/signal"
lncluster "github.com/docker/libnetwork/cluster"
@ -97,6 +98,7 @@ type Config struct {
Root string
Name string
Backend executorpkg.Backend
PluginBackend plugin.Backend
NetworkSubnetsProvider NetworkSubnetsProvider
// DefaultAdvertiseAddr is the default host/IP or network interface to use

View File

@ -1,79 +1,261 @@
package plugin
import (
"io"
"io/ioutil"
"net/http"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/reference"
enginetypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm/runtime"
"github.com/docker/docker/plugin"
"github.com/docker/docker/plugin/v2"
"github.com/docker/swarmkit/api"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// Controller is the controller for the plugin backend
type Controller struct{}
// Controller is the controller for the plugin backend.
// Plugins are managed as a singleton object with a desired state (different from containers).
// With the the plugin controller instead of having a strict create->start->stop->remove
// task lifecycle like containers, we manage the desired state of the plugin and let
// the plugin manager do what it already does and monitor the plugin.
// We'll also end up with many tasks all pointing to the same plugin ID.
//
// TODO(@cpuguy83): registry auth is intentionally not supported until we work out
// the right way to pass registry crednetials via secrets.
type Controller struct {
backend Backend
spec runtime.PluginSpec
logger *logrus.Entry
pluginID string
serviceID string
taskID string
// hook used to signal tests that `Wait()` is actually ready and waiting
signalWaitReady func()
}
// Backend is the interface for interacting with the plugin manager
// Controller actions are passed to the configured backend to do the real work.
type Backend interface {
Disable(name string, config *enginetypes.PluginDisableConfig) error
Enable(name string, config *enginetypes.PluginEnableConfig) error
Remove(name string, config *enginetypes.PluginRmConfig) error
Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error
Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error
Get(name string) (*v2.Plugin, error)
SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func())
}
// NewController returns a new cluster plugin controller
func NewController() (*Controller, error) {
return &Controller{}, nil
func NewController(backend Backend, t *api.Task) (*Controller, error) {
spec, err := readSpec(t)
if err != nil {
return nil, err
}
return &Controller{
backend: backend,
spec: spec,
serviceID: t.ServiceID,
logger: logrus.WithFields(logrus.Fields{
"controller": "plugin",
"task": t.ID,
"plugin": spec.Name,
})}, nil
}
func readSpec(t *api.Task) (runtime.PluginSpec, error) {
var cfg runtime.PluginSpec
generic := t.Spec.GetGeneric()
if err := proto.Unmarshal(generic.Payload.Value, &cfg); err != nil {
return cfg, errors.Wrap(err, "error reading plugin spec")
}
return cfg, nil
}
// Update is the update phase from swarmkit
func (p *Controller) Update(ctx context.Context, t *api.Task) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Update")
p.logger.Debug("Update")
return nil
}
// Prepare is the prepare phase from swarmkit
func (p *Controller) Prepare(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Prepare")
func (p *Controller) Prepare(ctx context.Context) (err error) {
p.logger.Debug("Prepare")
remote, err := reference.ParseNormalizedNamed(p.spec.Remote)
if err != nil {
return errors.Wrapf(err, "error parsing remote reference %q", p.spec.Remote)
}
if p.spec.Name == "" {
p.spec.Name = remote.String()
}
var authConfig enginetypes.AuthConfig
privs := convertPrivileges(p.spec.Privileges)
pl, err := p.backend.Get(p.spec.Name)
defer func() {
if pl != nil && err == nil {
pl.Acquire()
}
}()
if err == nil && pl != nil {
if pl.SwarmServiceID != p.serviceID {
return errors.Errorf("plugin already exists: %s", p.spec.Name)
}
if pl.IsEnabled() {
if err := p.backend.Disable(pl.GetID(), &enginetypes.PluginDisableConfig{ForceDisable: true}); err != nil {
p.logger.WithError(err).Debug("could not disable plugin before running upgrade")
}
}
p.pluginID = pl.GetID()
return p.backend.Upgrade(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard)
}
if err := p.backend.Pull(ctx, remote, p.spec.Name, nil, &authConfig, privs, ioutil.Discard, plugin.WithSwarmService(p.serviceID)); err != nil {
return err
}
pl, err = p.backend.Get(p.spec.Name)
if err != nil {
return err
}
p.pluginID = pl.GetID()
return nil
}
// Start is the start phase from swarmkit
func (p *Controller) Start(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Start")
p.logger.Debug("Start")
pl, err := p.backend.Get(p.pluginID)
if err != nil {
return err
}
if p.spec.Disabled {
if pl.IsEnabled() {
return p.backend.Disable(p.pluginID, &enginetypes.PluginDisableConfig{ForceDisable: false})
}
return nil
}
if !pl.IsEnabled() {
return p.backend.Enable(p.pluginID, &enginetypes.PluginEnableConfig{Timeout: 30})
}
return nil
}
// Wait causes the task to wait until returned
func (p *Controller) Wait(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Wait")
return nil
p.logger.Debug("Wait")
pl, err := p.backend.Get(p.pluginID)
if err != nil {
return err
}
events, cancel := p.backend.SubscribeEvents(1, plugin.EventDisable{Plugin: pl.PluginObj}, plugin.EventRemove{Plugin: pl.PluginObj}, plugin.EventEnable{Plugin: pl.PluginObj})
defer cancel()
if p.signalWaitReady != nil {
p.signalWaitReady()
}
if !p.spec.Disabled != pl.IsEnabled() {
return errors.New("mismatched plugin state")
}
for {
select {
case <-ctx.Done():
return ctx.Err()
case e := <-events:
p.logger.Debugf("got event %#T", e)
switch e.(type) {
case plugin.EventEnable:
if p.spec.Disabled {
return errors.New("plugin enabled")
}
case plugin.EventRemove:
return errors.New("plugin removed")
case plugin.EventDisable:
if !p.spec.Disabled {
return errors.New("plugin disabled")
}
}
}
}
}
func isNotFound(err error) bool {
_, ok := errors.Cause(err).(plugin.ErrNotFound)
return ok
}
// Shutdown is the shutdown phase from swarmkit
func (p *Controller) Shutdown(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Shutdown")
p.logger.Debug("Shutdown")
return nil
}
// Terminate is the terminate phase from swarmkit
func (p *Controller) Terminate(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Terminate")
p.logger.Debug("Terminate")
return nil
}
// Remove is the remove phase from swarmkit
func (p *Controller) Remove(ctx context.Context) error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Remove")
return nil
p.logger.Debug("Remove")
pl, err := p.backend.Get(p.pluginID)
if err != nil {
if isNotFound(err) {
return nil
}
return err
}
pl.Release()
if pl.GetRefCount() > 0 {
p.logger.Debug("skipping remove due to ref count")
return nil
}
// This may error because we have exactly 1 plugin, but potentially multiple
// tasks which are calling remove.
err = p.backend.Remove(p.pluginID, &enginetypes.PluginRmConfig{ForceRemove: true})
if isNotFound(err) {
return nil
}
return err
}
// Close is the close phase from swarmkit
func (p *Controller) Close() error {
logrus.WithFields(logrus.Fields{
"controller": "plugin",
}).Debug("Close")
p.logger.Debug("Close")
return nil
}
func convertPrivileges(ls []*runtime.PluginPrivilege) enginetypes.PluginPrivileges {
var out enginetypes.PluginPrivileges
for _, p := range ls {
pp := enginetypes.PluginPrivilege{
Name: p.Name,
Description: p.Description,
Value: p.Value,
}
out = append(out, pp)
}
return out
}

View File

@ -0,0 +1,390 @@
package plugin
import (
"errors"
"io"
"io/ioutil"
"net/http"
"strings"
"testing"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/distribution/reference"
enginetypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm/runtime"
"github.com/docker/docker/pkg/pubsub"
"github.com/docker/docker/plugin"
"github.com/docker/docker/plugin/v2"
"golang.org/x/net/context"
)
const (
pluginTestName = "test"
pluginTestRemote = "testremote"
pluginTestRemoteUpgrade = "testremote2"
)
func TestPrepare(t *testing.T) {
b := newMockBackend()
c := newTestController(b, false)
ctx := context.Background()
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if b.p == nil {
t.Fatal("pull not performed")
}
c = newTestController(b, false)
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if b.p == nil {
t.Fatal("unexpected nil")
}
if b.p.PluginObj.PluginReference != pluginTestRemoteUpgrade {
t.Fatal("upgrade not performed")
}
c = newTestController(b, false)
c.serviceID = "1"
if err := c.Prepare(ctx); err == nil {
t.Fatal("expected error on prepare")
}
}
func TestStart(t *testing.T) {
b := newMockBackend()
c := newTestController(b, false)
ctx := context.Background()
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if err := c.Start(ctx); err != nil {
t.Fatal(err)
}
if !b.p.IsEnabled() {
t.Fatal("expected plugin to be enabled")
}
c = newTestController(b, true)
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if err := c.Start(ctx); err != nil {
t.Fatal(err)
}
if b.p.IsEnabled() {
t.Fatal("expected plugin to be disabled")
}
c = newTestController(b, false)
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if err := c.Start(ctx); err != nil {
t.Fatal(err)
}
if !b.p.IsEnabled() {
t.Fatal("expected plugin to be enabled")
}
}
func TestWaitCancel(t *testing.T) {
b := newMockBackend()
c := newTestController(b, true)
ctx := context.Background()
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if err := c.Start(ctx); err != nil {
t.Fatal(err)
}
ctxCancel, cancel := context.WithCancel(ctx)
chErr := make(chan error)
go func() {
chErr <- c.Wait(ctxCancel)
}()
cancel()
select {
case err := <-chErr:
if err != context.Canceled {
t.Fatal(err)
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for cancelation")
}
}
func TestWaitDisabled(t *testing.T) {
b := newMockBackend()
c := newTestController(b, true)
ctx := context.Background()
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if err := c.Start(ctx); err != nil {
t.Fatal(err)
}
chErr := make(chan error)
go func() {
chErr <- c.Wait(ctx)
}()
if err := b.Enable("test", nil); err != nil {
t.Fatal(err)
}
select {
case err := <-chErr:
if err == nil {
t.Fatal("expected error")
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for event")
}
if err := c.Start(ctx); err != nil {
t.Fatal(err)
}
ctxWaitReady, cancelCtxWaitReady := context.WithTimeout(ctx, 30*time.Second)
c.signalWaitReady = cancelCtxWaitReady
defer cancelCtxWaitReady()
go func() {
chErr <- c.Wait(ctx)
}()
chEvent, cancel := b.SubscribeEvents(1)
defer cancel()
if err := b.Disable("test", nil); err != nil {
t.Fatal(err)
}
select {
case <-chEvent:
<-ctxWaitReady.Done()
if err := ctxWaitReady.Err(); err == context.DeadlineExceeded {
t.Fatal(err)
}
select {
case <-chErr:
t.Fatal("wait returned unexpectedly")
default:
// all good
}
case <-chErr:
t.Fatal("wait returned unexpectedly")
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for event")
}
if err := b.Remove("test", nil); err != nil {
t.Fatal(err)
}
select {
case err := <-chErr:
if err == nil {
t.Fatal("expected error")
}
if !strings.Contains(err.Error(), "removed") {
t.Fatal(err)
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for event")
}
}
func TestWaitEnabled(t *testing.T) {
b := newMockBackend()
c := newTestController(b, false)
ctx := context.Background()
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if err := c.Start(ctx); err != nil {
t.Fatal(err)
}
chErr := make(chan error)
go func() {
chErr <- c.Wait(ctx)
}()
if err := b.Disable("test", nil); err != nil {
t.Fatal(err)
}
select {
case err := <-chErr:
if err == nil {
t.Fatal("expected error")
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for event")
}
if err := c.Start(ctx); err != nil {
t.Fatal(err)
}
ctxWaitReady, ctxWaitCancel := context.WithCancel(ctx)
c.signalWaitReady = ctxWaitCancel
defer ctxWaitCancel()
go func() {
chErr <- c.Wait(ctx)
}()
chEvent, cancel := b.SubscribeEvents(1)
defer cancel()
if err := b.Enable("test", nil); err != nil {
t.Fatal(err)
}
select {
case <-chEvent:
<-ctxWaitReady.Done()
if err := ctxWaitReady.Err(); err == context.DeadlineExceeded {
t.Fatal(err)
}
select {
case <-chErr:
t.Fatal("wait returned unexpectedly")
default:
// all good
}
case <-chErr:
t.Fatal("wait returned unexpectedly")
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for event")
}
if err := b.Remove("test", nil); err != nil {
t.Fatal(err)
}
select {
case err := <-chErr:
if err == nil {
t.Fatal("expected error")
}
if !strings.Contains(err.Error(), "removed") {
t.Fatal(err)
}
case <-time.After(10 * time.Second):
t.Fatal("timeout waiting for event")
}
}
func TestRemove(t *testing.T) {
b := newMockBackend()
c := newTestController(b, false)
ctx := context.Background()
if err := c.Prepare(ctx); err != nil {
t.Fatal(err)
}
if err := c.Shutdown(ctx); err != nil {
t.Fatal(err)
}
c2 := newTestController(b, false)
if err := c2.Prepare(ctx); err != nil {
t.Fatal(err)
}
if err := c.Remove(ctx); err != nil {
t.Fatal(err)
}
if b.p == nil {
t.Fatal("plugin removed unexpectedly")
}
if err := c2.Shutdown(ctx); err != nil {
t.Fatal(err)
}
if err := c2.Remove(ctx); err != nil {
t.Fatal(err)
}
if b.p != nil {
t.Fatal("expected plugin to be removed")
}
}
func newTestController(b Backend, disabled bool) *Controller {
return &Controller{
logger: &logrus.Entry{Logger: &logrus.Logger{Out: ioutil.Discard}},
backend: b,
spec: runtime.PluginSpec{
Name: pluginTestName,
Remote: pluginTestRemote,
Disabled: disabled,
},
}
}
func newMockBackend() *mockBackend {
return &mockBackend{
pub: pubsub.NewPublisher(0, 0),
}
}
type mockBackend struct {
p *v2.Plugin
pub *pubsub.Publisher
}
func (m *mockBackend) Disable(name string, config *enginetypes.PluginDisableConfig) error {
m.p.PluginObj.Enabled = false
m.pub.Publish(plugin.EventDisable{})
return nil
}
func (m *mockBackend) Enable(name string, config *enginetypes.PluginEnableConfig) error {
m.p.PluginObj.Enabled = true
m.pub.Publish(plugin.EventEnable{})
return nil
}
func (m *mockBackend) Remove(name string, config *enginetypes.PluginRmConfig) error {
m.p = nil
m.pub.Publish(plugin.EventRemove{})
return nil
}
func (m *mockBackend) Pull(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer, opts ...plugin.CreateOpt) error {
m.p = &v2.Plugin{
PluginObj: enginetypes.Plugin{
ID: "1234",
Name: name,
PluginReference: ref.String(),
},
}
return nil
}
func (m *mockBackend) Upgrade(ctx context.Context, ref reference.Named, name string, metaHeaders http.Header, authConfig *enginetypes.AuthConfig, privileges enginetypes.PluginPrivileges, outStream io.Writer) error {
m.p.PluginObj.PluginReference = pluginTestRemoteUpgrade
return nil
}
func (m *mockBackend) Get(name string) (*v2.Plugin, error) {
if m.p == nil {
return nil, errors.New("not found")
}
return m.p, nil
}
func (m *mockBackend) SubscribeEvents(buffer int, events ...plugin.Event) (eventCh <-chan interface{}, cancel func()) {
ch := m.pub.SubscribeTopicWithBuffer(nil, buffer)
cancel = func() { m.pub.Evict(ch) }
return ch, cancel
}

View File

@ -13,8 +13,11 @@ import (
gogotypes "github.com/gogo/protobuf/types"
)
func containerSpecFromGRPC(c *swarmapi.ContainerSpec) types.ContainerSpec {
containerSpec := types.ContainerSpec{
func containerSpecFromGRPC(c *swarmapi.ContainerSpec) *types.ContainerSpec {
if c == nil {
return nil
}
containerSpec := &types.ContainerSpec{
Image: c.Image,
Labels: c.Labels,
Command: c.Command,
@ -211,7 +214,7 @@ func configReferencesFromGRPC(sr []*swarmapi.ConfigReference) []*types.ConfigRef
return refs
}
func containerToGRPC(c types.ContainerSpec) (*swarmapi.ContainerSpec, error) {
func containerToGRPC(c *types.ContainerSpec) (*swarmapi.ContainerSpec, error) {
containerSpec := &swarmapi.ContainerSpec{
Image: c.Image,
Labels: c.Labels,

View File

@ -1,14 +1,16 @@
package convert
import (
"errors"
"fmt"
"strings"
types "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/swarm/runtime"
"github.com/docker/docker/pkg/namesgenerator"
swarmapi "github.com/docker/swarmkit/api"
"github.com/gogo/protobuf/proto"
gogotypes "github.com/gogo/protobuf/types"
"github.com/pkg/errors"
)
var (
@ -85,7 +87,10 @@ func serviceSpecFromGRPC(spec *swarmapi.ServiceSpec) (*types.ServiceSpec, error)
}
taskTemplate := taskSpecFromGRPC(spec.Task)
taskTemplate, err := taskSpecFromGRPC(spec.Task)
if err != nil {
return nil, err
}
switch t := spec.Task.GetRuntime().(type) {
case *swarmapi.TaskSpec_Container:
@ -164,19 +169,34 @@ func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) {
switch s.TaskTemplate.Runtime {
case types.RuntimeContainer, "": // if empty runtime default to container
containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
if err != nil {
return swarmapi.ServiceSpec{}, err
if s.TaskTemplate.ContainerSpec != nil {
containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
if err != nil {
return swarmapi.ServiceSpec{}, err
}
spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
}
spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
case types.RuntimePlugin:
spec.Task.Runtime = &swarmapi.TaskSpec_Generic{
Generic: &swarmapi.GenericRuntimeSpec{
Kind: string(types.RuntimePlugin),
Payload: &gogotypes.Any{
TypeUrl: string(types.RuntimeURLPlugin),
if s.Mode.Replicated != nil {
return swarmapi.ServiceSpec{}, errors.New("plugins must not use replicated mode")
}
s.Mode.Global = &types.GlobalService{} // must always be global
if s.TaskTemplate.PluginSpec != nil {
pluginSpec, err := proto.Marshal(s.TaskTemplate.PluginSpec)
if err != nil {
return swarmapi.ServiceSpec{}, err
}
spec.Task.Runtime = &swarmapi.TaskSpec_Generic{
Generic: &swarmapi.GenericRuntimeSpec{
Kind: string(types.RuntimePlugin),
Payload: &gogotypes.Any{
TypeUrl: string(types.RuntimeURLPlugin),
Value: pluginSpec,
},
},
},
}
}
default:
return swarmapi.ServiceSpec{}, ErrUnsupportedRuntime
@ -507,21 +527,14 @@ func updateConfigToGRPC(updateConfig *types.UpdateConfig) (*swarmapi.UpdateConfi
return converted, nil
}
func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) types.TaskSpec {
func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) (types.TaskSpec, error) {
taskNetworks := make([]types.NetworkAttachmentConfig, 0, len(taskSpec.Networks))
for _, n := range taskSpec.Networks {
netConfig := types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases, DriverOpts: n.DriverAttachmentOpts}
taskNetworks = append(taskNetworks, netConfig)
}
c := taskSpec.GetContainer()
cSpec := types.ContainerSpec{}
if c != nil {
cSpec = containerSpecFromGRPC(c)
}
return types.TaskSpec{
ContainerSpec: cSpec,
t := types.TaskSpec{
Resources: resourcesFromGRPC(taskSpec.Resources),
RestartPolicy: restartPolicyFromGRPC(taskSpec.Restart),
Placement: placementFromGRPC(taskSpec.Placement),
@ -529,4 +542,26 @@ func taskSpecFromGRPC(taskSpec swarmapi.TaskSpec) types.TaskSpec {
Networks: taskNetworks,
ForceUpdate: taskSpec.ForceUpdate,
}
switch taskSpec.GetRuntime().(type) {
case *swarmapi.TaskSpec_Container, nil:
c := taskSpec.GetContainer()
if c != nil {
t.ContainerSpec = containerSpecFromGRPC(c)
}
case *swarmapi.TaskSpec_Generic:
g := taskSpec.GetGeneric()
if g != nil {
switch g.Kind {
case string(types.RuntimePlugin):
var p runtime.PluginSpec
if err := proto.Unmarshal(g.Payload.Value, &p); err != nil {
return t, errors.Wrap(err, "error unmarshalling plugin spec")
}
t.PluginSpec = &p
}
}
}
return t, nil
}

View File

@ -4,6 +4,7 @@ import (
"testing"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/swarm/runtime"
swarmapi "github.com/docker/swarmkit/api"
google_protobuf3 "github.com/gogo/protobuf/types"
)
@ -82,7 +83,8 @@ func TestServiceConvertFromGRPCGenericRuntimePlugin(t *testing.T) {
func TestServiceConvertToGRPCGenericRuntimePlugin(t *testing.T) {
s := swarmtypes.ServiceSpec{
TaskTemplate: swarmtypes.TaskSpec{
Runtime: swarmtypes.RuntimePlugin,
Runtime: swarmtypes.RuntimePlugin,
PluginSpec: &runtime.PluginSpec{},
},
Mode: swarmtypes.ServiceMode{
Global: &swarmtypes.GlobalService{},
@ -108,7 +110,7 @@ func TestServiceConvertToGRPCContainerRuntime(t *testing.T) {
image := "alpine:latest"
s := swarmtypes.ServiceSpec{
TaskTemplate: swarmtypes.TaskSpec{
ContainerSpec: swarmtypes.ContainerSpec{
ContainerSpec: &swarmtypes.ContainerSpec{
Image: image,
},
},

View File

@ -9,19 +9,22 @@ import (
)
// TaskFromGRPC converts a grpc Task to a Task.
func TaskFromGRPC(t swarmapi.Task) types.Task {
func TaskFromGRPC(t swarmapi.Task) (types.Task, error) {
if t.Spec.GetAttachment() != nil {
return types.Task{}
return types.Task{}, nil
}
containerStatus := t.Status.GetContainer()
taskSpec, err := taskSpecFromGRPC(t.Spec)
if err != nil {
return types.Task{}, err
}
task := types.Task{
ID: t.ID,
Annotations: annotationsFromGRPC(t.Annotations),
ServiceID: t.ServiceID,
Slot: int(t.Slot),
NodeID: t.NodeID,
Spec: taskSpecFromGRPC(t.Spec),
Spec: taskSpec,
Status: types.TaskStatus{
State: types.TaskState(strings.ToLower(t.Status.State.String())),
Message: t.Status.Message,
@ -49,7 +52,7 @@ func TaskFromGRPC(t swarmapi.Task) types.Task {
}
if t.Status.PortStatus == nil {
return task
return task, nil
}
for _, p := range t.Status.PortStatus.Ports {
@ -62,5 +65,5 @@ func TaskFromGRPC(t swarmapi.Task) types.Task {
})
}
return task
return task, nil
}

View File

@ -22,15 +22,17 @@ import (
)
type executor struct {
backend executorpkg.Backend
dependencies exec.DependencyManager
backend executorpkg.Backend
pluginBackend plugin.Backend
dependencies exec.DependencyManager
}
// NewExecutor returns an executor from the docker client.
func NewExecutor(b executorpkg.Backend) exec.Executor {
func NewExecutor(b executorpkg.Backend, p plugin.Backend) exec.Executor {
return &executor{
backend: b,
dependencies: agent.NewDependencyManager(),
backend: b,
pluginBackend: p,
dependencies: agent.NewDependencyManager(),
}
}
@ -181,7 +183,7 @@ func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
}
switch runtimeKind {
case string(swarmtypes.RuntimePlugin):
c, err := plugin.NewController()
c, err := plugin.NewController(e.pluginBackend, t)
if err != nil {
return ctlr, err
}

View File

@ -57,6 +57,7 @@ func newListTasksFilters(filter filters.Args, transformFunc func(filters.Args) e
// internal use in checking create/update progress. Therefore,
// we prefix it with a '_'.
"_up-to-date": true,
"runtime": true,
}
if err := filter.Validate(accepted); err != nil {
return nil, err
@ -73,6 +74,7 @@ func newListTasksFilters(filter filters.Args, transformFunc func(filters.Args) e
ServiceIDs: filter.Get("service"),
NodeIDs: filter.Get("node"),
UpToDate: len(filter.Get("_up-to-date")) != 0,
Runtimes: filter.Get("runtime"),
}
for _, s := range filter.Get("desired-state") {

View File

@ -118,7 +118,7 @@ func (n *nodeRunner) start(conf nodeStartConfig) error {
JoinAddr: joinAddr,
StateDir: n.cluster.root,
JoinToken: conf.joinToken,
Executor: container.NewExecutor(n.cluster.config.Backend),
Executor: container.NewExecutor(n.cluster.config.Backend, n.cluster.config.PluginBackend),
HeartbeatTick: 1,
ElectionTick: 3,
UnlockKey: conf.lockKey,

View File

@ -50,14 +50,16 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv
return nil, err
}
if len(options.Filters.Get("runtime")) == 0 {
// Default to using the container runtime filter
options.Filters.Add("runtime", string(types.RuntimeContainer))
}
filters := &swarmapi.ListServicesRequest_Filters{
NamePrefixes: options.Filters.Get("name"),
IDPrefixes: options.Filters.Get("id"),
Labels: runconfigopts.ConvertKVStringsToMap(options.Filters.Get("label")),
// (ehazlett): hardcode runtime for now. eventually we will
// be able to filter for the desired runtimes once more
// are supported.
Runtimes: []string{string(types.RuntimeContainer)},
Runtimes: options.Filters.Get("runtime"),
}
ctx, cancel := c.getRequestContext()
@ -134,6 +136,20 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRe
switch serviceSpec.Task.Runtime.(type) {
// handle other runtimes here
case *swarmapi.TaskSpec_Generic:
switch serviceSpec.Task.GetGeneric().Kind {
case string(types.RuntimePlugin):
if s.TaskTemplate.PluginSpec == nil {
return errors.New("plugin spec must be set")
}
}
r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
if err != nil {
return err
}
resp.ID = r.Service.ID
case *swarmapi.TaskSpec_Container:
ctnr := serviceSpec.Task.GetContainer()
if ctnr == nil {
@ -146,7 +162,9 @@ func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string, queryRe
// retrieve auth config from encoded auth
authConfig := &apitypes.AuthConfig{}
if encodedAuth != "" {
if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
authReader := strings.NewReader(encodedAuth)
dec := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, authReader))
if err := dec.Decode(authConfig); err != nil {
logrus.Warnf("invalid authconfig: %v", err)
}
}
@ -216,75 +234,85 @@ func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec typ
return err
}
newCtnr := serviceSpec.Task.GetContainer()
if newCtnr == nil {
return errors.New("service does not use container tasks")
}
encodedAuth := flags.EncodedRegistryAuth
if encodedAuth != "" {
newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
} else {
// this is needed because if the encodedAuth isn't being updated then we
// shouldn't lose it, and continue to use the one that was already present
var ctnr *swarmapi.ContainerSpec
switch flags.RegistryAuthFrom {
case apitypes.RegistryAuthFromSpec, "":
ctnr = currentService.Spec.Task.GetContainer()
case apitypes.RegistryAuthFromPreviousSpec:
if currentService.PreviousSpec == nil {
return errors.New("service does not have a previous spec")
}
ctnr = currentService.PreviousSpec.Task.GetContainer()
default:
return errors.New("unsupported registryAuthFrom value")
}
if ctnr == nil {
return errors.New("service does not use container tasks")
}
newCtnr.PullOptions = ctnr.PullOptions
// update encodedAuth so it can be used to pin image by digest
if ctnr.PullOptions != nil {
encodedAuth = ctnr.PullOptions.RegistryAuth
}
}
// retrieve auth config from encoded auth
authConfig := &apitypes.AuthConfig{}
if encodedAuth != "" {
if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
logrus.Warnf("invalid authconfig: %v", err)
}
}
resp = &apitypes.ServiceUpdateResponse{}
// pin image by digest for API versions < 1.30
// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
// should be removed in the future. Since integration tests only use the
// latest API version, so this is no longer required.
if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
if err != nil {
logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
// warning in the client response should be concise
resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
} else if newCtnr.Image != digestImage {
logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
newCtnr.Image = digestImage
} else {
logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
switch serviceSpec.Task.Runtime.(type) {
case *swarmapi.TaskSpec_Generic:
switch serviceSpec.Task.GetGeneric().Kind {
case string(types.RuntimePlugin):
if spec.TaskTemplate.PluginSpec == nil {
return errors.New("plugin spec must be set")
}
}
case *swarmapi.TaskSpec_Container:
newCtnr := serviceSpec.Task.GetContainer()
if newCtnr == nil {
return errors.New("service does not use container tasks")
}
// Replace the context with a fresh one.
// If we timed out while communicating with the
// registry, then "ctx" will already be expired, which
// would cause UpdateService below to fail. Reusing
// "ctx" could make it impossible to update a service
// if the registry is slow or unresponsive.
var cancel func()
ctx, cancel = c.getRequestContext()
defer cancel()
encodedAuth := flags.EncodedRegistryAuth
if encodedAuth != "" {
newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth}
} else {
// this is needed because if the encodedAuth isn't being updated then we
// shouldn't lose it, and continue to use the one that was already present
var ctnr *swarmapi.ContainerSpec
switch flags.RegistryAuthFrom {
case apitypes.RegistryAuthFromSpec, "":
ctnr = currentService.Spec.Task.GetContainer()
case apitypes.RegistryAuthFromPreviousSpec:
if currentService.PreviousSpec == nil {
return errors.New("service does not have a previous spec")
}
ctnr = currentService.PreviousSpec.Task.GetContainer()
default:
return errors.New("unsupported registryAuthFrom value")
}
if ctnr == nil {
return errors.New("service does not use container tasks")
}
newCtnr.PullOptions = ctnr.PullOptions
// update encodedAuth so it can be used to pin image by digest
if ctnr.PullOptions != nil {
encodedAuth = ctnr.PullOptions.RegistryAuth
}
}
// retrieve auth config from encoded auth
authConfig := &apitypes.AuthConfig{}
if encodedAuth != "" {
if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil {
logrus.Warnf("invalid authconfig: %v", err)
}
}
// pin image by digest for API versions < 1.30
// TODO(nishanttotla): The check on "DOCKER_SERVICE_PREFER_OFFLINE_IMAGE"
// should be removed in the future. Since integration tests only use the
// latest API version, so this is no longer required.
if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" && queryRegistry {
digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig)
if err != nil {
logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())
// warning in the client response should be concise
resp.Warnings = append(resp.Warnings, digestWarning(newCtnr.Image))
} else if newCtnr.Image != digestImage {
logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage)
newCtnr.Image = digestImage
} else {
logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image)
}
// Replace the context with a fresh one.
// If we timed out while communicating with the
// registry, then "ctx" will already be expired, which
// would cause UpdateService below to fail. Reusing
// "ctx" could make it impossible to update a service
// if the registry is slow or unresponsive.
var cancel func()
ctx, cancel = c.getRequestContext()
defer cancel()
}
}
var rollback swarmapi.UpdateServiceRequest_Rollback

View File

@ -19,7 +19,7 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
return nil, c.errNoManager(state)
}
byName := func(filter filters.Args) error {
filterTransform := func(filter filters.Args) error {
if filter.Include("service") {
serviceFilters := filter.Get("service")
for _, serviceFilter := range serviceFilters {
@ -42,10 +42,15 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
filter.Add("node", node.ID)
}
}
if !filter.Include("runtime") {
// default to only showing container tasks
filter.Add("runtime", "container")
filter.Add("runtime", "")
}
return nil
}
filters, err := newListTasksFilters(options.Filters, byName)
filters, err := newListTasksFilters(options.Filters, filterTransform)
if err != nil {
return nil, err
}
@ -61,11 +66,12 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro
}
tasks := make([]types.Task, 0, len(r.Tasks))
for _, task := range r.Tasks {
if task.Spec.GetContainer() != nil {
tasks = append(tasks, convert.TaskFromGRPC(*task))
t, err := convert.TaskFromGRPC(*task)
if err != nil {
return nil, err
}
tasks = append(tasks, t)
}
return tasks, nil
}
@ -83,5 +89,5 @@ func (c *Cluster) GetTask(input string) (types.Task, error) {
}); err != nil {
return types.Task{}, err
}
return convert.TaskFromGRPC(*task), nil
return convert.TaskFromGRPC(*task)
}

View File

@ -28,6 +28,7 @@ keywords: "API, Docker, rcli, REST, documentation"
* `GET /images/(name)/get` now includes an `ImageMetadata` field which contains image metadata that is local to the engine and not part of the image config.
* `POST /swarm/init` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic
* `POST /swarm/join` now accepts a `DataPathAddr` property to set the IP-address or network interface to use for data traffic
* `POST /services/create` now accepts a `PluginSpec` when `TaskTemplate.Runtime` is set to `plugin`
## v1.30 API changes

View File

@ -1,6 +1,7 @@
package daemon
import (
"context"
"encoding/json"
"fmt"
"net/http"
@ -10,6 +11,7 @@ import (
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/client"
"github.com/docker/docker/integration-cli/checker"
"github.com/go-check/check"
"github.com/pkg/errors"
@ -124,20 +126,29 @@ type ConfigConstructor func(*swarm.Config)
// SpecConstructor defines a swarm spec constructor
type SpecConstructor func(*swarm.Spec)
// CreateService creates a swarm service given the specified service constructor
func (d *Swarm) CreateService(c *check.C, f ...ServiceConstructor) string {
// CreateServiceWithOptions creates a swarm service given the specified service constructors
// and auth config
func (d *Swarm) CreateServiceWithOptions(c *check.C, opts types.ServiceCreateOptions, f ...ServiceConstructor) string {
cl, err := client.NewClient(d.Sock(), "", nil, nil)
c.Assert(err, checker.IsNil, check.Commentf("failed to create client"))
defer cl.Close()
var service swarm.Service
for _, fn := range f {
fn(&service)
}
status, out, err := d.SockRequest("POST", "/services/create", service.Spec)
c.Assert(err, checker.IsNil, check.Commentf(string(out)))
c.Assert(status, checker.Equals, http.StatusCreated, check.Commentf("output: %q", string(out)))
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
var scr types.ServiceCreateResponse
c.Assert(json.Unmarshal(out, &scr), checker.IsNil)
return scr.ID
res, err := cl.ServiceCreate(ctx, service.Spec, opts)
c.Assert(err, checker.IsNil)
return res.ID
}
// CreateService creates a swarm service given the specified service constructor
func (d *Swarm) CreateService(c *check.C, f ...ServiceConstructor) string {
return d.CreateServiceWithOptions(c, types.ServiceCreateOptions{}, f...)
}
// GetService returns the swarm service corresponding to the specified id
@ -200,6 +211,37 @@ func (d *Swarm) CheckServiceUpdateState(service string) func(*check.C) (interfac
}
}
// CheckPluginRunning returns the runtime state of the plugin
func (d *Swarm) CheckPluginRunning(plugin string) func(c *check.C) (interface{}, check.CommentInterface) {
return func(c *check.C) (interface{}, check.CommentInterface) {
status, out, err := d.SockRequest("GET", "/plugins/"+plugin+"/json", nil)
c.Assert(err, checker.IsNil, check.Commentf(string(out)))
if status != http.StatusOK {
return false, nil
}
var p types.Plugin
c.Assert(json.Unmarshal(out, &p), checker.IsNil, check.Commentf(string(out)))
return p.Enabled, check.Commentf("%+v", p)
}
}
// CheckPluginImage returns the runtime state of the plugin
func (d *Swarm) CheckPluginImage(plugin string) func(c *check.C) (interface{}, check.CommentInterface) {
return func(c *check.C) (interface{}, check.CommentInterface) {
status, out, err := d.SockRequest("GET", "/plugins/"+plugin+"/json", nil)
c.Assert(err, checker.IsNil, check.Commentf(string(out)))
if status != http.StatusOK {
return false, nil
}
var p types.Plugin
c.Assert(json.Unmarshal(out, &p), checker.IsNil, check.Commentf(string(out)))
return p.PluginReference, check.Commentf("%+v", p)
}
}
// CheckServiceTasks returns the number of tasks for the specified service
func (d *Swarm) CheckServiceTasks(service string) func(*check.C) (interface{}, check.CommentInterface) {
return func(c *check.C) (interface{}, check.CommentInterface) {
@ -247,7 +289,7 @@ func (d *Swarm) CheckRunningTaskImages(c *check.C) (interface{}, check.CommentIn
result := make(map[string]int)
for _, task := range tasks {
if task.Status.State == swarm.TaskStateRunning {
if task.Status.State == swarm.TaskStateRunning && task.Spec.ContainerSpec != nil {
result[task.Spec.ContainerSpec.Image]++
}
}

View File

@ -4,15 +4,19 @@ package main
import (
"fmt"
"path"
"strconv"
"strings"
"syscall"
"time"
"github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/swarm/runtime"
"github.com/docker/docker/integration-cli/checker"
"github.com/docker/docker/integration-cli/daemon"
"github.com/docker/docker/integration-cli/fixtures/plugin"
"github.com/go-check/check"
"golang.org/x/net/context"
)
func setPortConfig(portConfig []swarm.PortConfig) daemon.ServiceConstructor {
@ -596,3 +600,77 @@ func (s *DockerSwarmSuite) TestAPISwarmServicesStateReporting(c *check.C) {
}
}
}
// Test plugins deployed via swarm services
func (s *DockerSwarmSuite) TestAPISwarmServicesPlugin(c *check.C) {
testRequires(c, DaemonIsLinux, IsAmd64)
reg := setupRegistry(c, false, "", "")
defer reg.Close()
repo := path.Join(privateRegistryURL, "swarm", "test:v1")
repo2 := path.Join(privateRegistryURL, "swarm", "test:v2")
name := "test"
err := plugin.CreateInRegistry(context.Background(), repo, nil)
c.Assert(err, checker.IsNil, check.Commentf("failed to create plugin"))
err = plugin.CreateInRegistry(context.Background(), repo2, nil)
c.Assert(err, checker.IsNil, check.Commentf("failed to create plugin"))
d1 := s.AddDaemon(c, true, true)
d2 := s.AddDaemon(c, true, true)
d3 := s.AddDaemon(c, true, false)
makePlugin := func(repo, name string, constraints []string) func(*swarm.Service) {
return func(s *swarm.Service) {
s.Spec.TaskTemplate.Runtime = "plugin"
s.Spec.TaskTemplate.PluginSpec = &runtime.PluginSpec{
Name: name,
Remote: repo,
}
if constraints != nil {
s.Spec.TaskTemplate.Placement = &swarm.Placement{
Constraints: constraints,
}
}
}
}
id := d1.CreateService(c, makePlugin(repo, name, nil))
waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True)
waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True)
waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.True)
service := d1.GetService(c, id)
d1.UpdateService(c, service, makePlugin(repo2, name, nil))
waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginImage(name), checker.Equals, repo2)
waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginImage(name), checker.Equals, repo2)
waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginImage(name), checker.Equals, repo2)
waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True)
waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True)
waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.True)
d1.RemoveService(c, id)
waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.False)
waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.False)
waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False)
// constrain to managers only
id = d1.CreateService(c, makePlugin(repo, name, []string{"node.role==manager"}))
waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.True)
waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.True)
waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False) // Not a manager, not running it
d1.RemoveService(c, id)
waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(name), checker.False)
waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(name), checker.False)
waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(name), checker.False)
// with no name
id = d1.CreateService(c, makePlugin(repo, "", nil))
waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(repo), checker.True)
waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(repo), checker.True)
waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(repo), checker.True)
d1.RemoveService(c, id)
waitAndAssert(c, defaultReconciliationTimeout, d1.CheckPluginRunning(repo), checker.False)
waitAndAssert(c, defaultReconciliationTimeout, d2.CheckPluginRunning(repo), checker.False)
waitAndAssert(c, defaultReconciliationTimeout, d3.CheckPluginRunning(repo), checker.False)
}

View File

@ -560,7 +560,7 @@ func simpleTestService(s *swarm.Service) {
s.Spec = swarm.ServiceSpec{
TaskTemplate: swarm.TaskSpec{
ContainerSpec: swarm.ContainerSpec{
ContainerSpec: &swarm.ContainerSpec{
Image: "busybox:latest",
Command: []string{"/bin/top"},
},
@ -583,7 +583,7 @@ func serviceForUpdate(s *swarm.Service) {
s.Spec = swarm.ServiceSpec{
TaskTemplate: swarm.TaskSpec{
ContainerSpec: swarm.ContainerSpec{
ContainerSpec: &swarm.ContainerSpec{
Image: "busybox:latest",
Command: []string{"/bin/top"},
},
@ -641,6 +641,9 @@ func setRollbackOrder(order string) daemon.ServiceConstructor {
func setImage(image string) daemon.ServiceConstructor {
return func(s *swarm.Service) {
if s.Spec.TaskTemplate.ContainerSpec == nil {
s.Spec.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{}
}
s.Spec.TaskTemplate.ContainerSpec.Image = image
}
}
@ -921,6 +924,9 @@ func (s *DockerSwarmSuite) TestAPISwarmHealthcheckNone(c *check.C) {
instances := 1
d.CreateService(c, simpleTestService, setInstances(instances), func(s *swarm.Service) {
if s.Spec.TaskTemplate.ContainerSpec == nil {
s.Spec.TaskTemplate.ContainerSpec = &swarm.ContainerSpec{}
}
s.Spec.TaskTemplate.ContainerSpec.Healthcheck = &container.HealthConfig{}
s.Spec.TaskTemplate.Networks = []swarm.NetworkAttachmentConfig{
{Target: "lb"},

View File

@ -0,0 +1,34 @@
package main
import (
"fmt"
"net"
"net/http"
"os"
"path/filepath"
)
func main() {
p, err := filepath.Abs(filepath.Join("run", "docker", "plugins"))
if err != nil {
panic(err)
}
if err := os.MkdirAll(p, 0755); err != nil {
panic(err)
}
l, err := net.Listen("unix", filepath.Join(p, "basic.sock"))
if err != nil {
panic(err)
}
mux := http.NewServeMux()
server := http.Server{
Addr: l.Addr().String(),
Handler: http.NewServeMux(),
}
mux.HandleFunc("/Plugin.Activate", func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/vnd.docker.plugins.v1.1+json")
fmt.Println(w, `{"Implements": ["dummy"]}`)
})
server.Serve(l)
}

View File

@ -0,0 +1,183 @@
package plugin
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"time"
"github.com/docker/docker/api/types"
"github.com/docker/docker/libcontainerd"
"github.com/docker/docker/pkg/archive"
"github.com/docker/docker/plugin"
"github.com/docker/docker/registry"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// CreateOpt is is passed used to change the defualt plugin config before
// creating it
type CreateOpt func(*Config)
// Config wraps types.PluginConfig to provide some extra state for options
// extra customizations on the plugin details, such as using a custom binary to
// create the plugin with.
type Config struct {
*types.PluginConfig
binPath string
}
// WithBinary is a CreateOpt to set an custom binary to create the plugin with.
// This binary must be statically compiled.
func WithBinary(bin string) CreateOpt {
return func(cfg *Config) {
cfg.binPath = bin
}
}
// CreateClient is the interface used for `BuildPlugin` to interact with the
// daemon.
type CreateClient interface {
PluginCreate(context.Context, io.Reader, types.PluginCreateOptions) error
}
// Create creates a new plugin with the specified name
func Create(ctx context.Context, c CreateClient, name string, opts ...CreateOpt) error {
tmpDir, err := ioutil.TempDir("", "create-test-plugin")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
tar, err := makePluginBundle(tmpDir, opts...)
if err != nil {
return err
}
defer tar.Close()
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
return c.PluginCreate(ctx, tar, types.PluginCreateOptions{RepoName: name})
}
// TODO(@cpuguy83): we really shouldn't have to do this...
// The manager panics on init when `Executor` is not set.
type dummyExecutor struct{}
func (dummyExecutor) Client(libcontainerd.Backend) (libcontainerd.Client, error) { return nil, nil }
func (dummyExecutor) Cleanup() {}
func (dummyExecutor) UpdateOptions(...libcontainerd.RemoteOption) error { return nil }
// CreateInRegistry makes a plugin (locally) and pushes it to a registry.
// This does not use a dockerd instance to create or push the plugin.
// If you just want to create a plugin in some daemon, use `Create`.
//
// This can be useful when testing plugins on swarm where you don't really want
// the plugin to exist on any of the daemons (immediately) and there needs to be
// some way to distribute the plugin.
func CreateInRegistry(ctx context.Context, repo string, auth *types.AuthConfig, opts ...CreateOpt) error {
tmpDir, err := ioutil.TempDir("", "create-test-plugin-local")
if err != nil {
return err
}
defer os.RemoveAll(tmpDir)
inPath := filepath.Join(tmpDir, "plugin")
if err := os.MkdirAll(inPath, 0755); err != nil {
return errors.Wrap(err, "error creating plugin root")
}
tar, err := makePluginBundle(inPath, opts...)
if err != nil {
return err
}
defer tar.Close()
managerConfig := plugin.ManagerConfig{
Store: plugin.NewStore(),
RegistryService: registry.NewService(registry.ServiceOptions{V2Only: true}),
Root: filepath.Join(tmpDir, "root"),
ExecRoot: "/run/docker", // manager init fails if not set
Executor: dummyExecutor{},
LogPluginEvent: func(id, name, action string) {}, // panics when not set
}
manager, err := plugin.NewManager(managerConfig)
if err != nil {
return errors.Wrap(err, "error creating plugin manager")
}
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if err := manager.CreateFromContext(ctx, tar, &types.PluginCreateOptions{RepoName: repo}); err != nil {
return err
}
if auth == nil {
auth = &types.AuthConfig{}
}
err = manager.Push(ctx, repo, nil, auth, ioutil.Discard)
return errors.Wrap(err, "error pushing plugin")
}
func makePluginBundle(inPath string, opts ...CreateOpt) (io.ReadCloser, error) {
p := &types.PluginConfig{
Interface: types.PluginConfigInterface{
Socket: "basic.sock",
Types: []types.PluginInterfaceType{{Capability: "docker.dummy/1.0"}},
},
Entrypoint: []string{"/basic"},
}
cfg := &Config{
PluginConfig: p,
}
for _, o := range opts {
o(cfg)
}
if cfg.binPath == "" {
binPath, err := ensureBasicPluginBin()
if err != nil {
return nil, err
}
cfg.binPath = binPath
}
configJSON, err := json.Marshal(p)
if err != nil {
return nil, err
}
if err := ioutil.WriteFile(filepath.Join(inPath, "config.json"), configJSON, 0644); err != nil {
return nil, err
}
if err := os.MkdirAll(filepath.Join(inPath, "rootfs", filepath.Dir(p.Entrypoint[0])), 0755); err != nil {
return nil, errors.Wrap(err, "error creating plugin rootfs dir")
}
if err := archive.NewDefaultArchiver().CopyFileWithTar(cfg.binPath, filepath.Join(inPath, "rootfs", p.Entrypoint[0])); err != nil {
return nil, errors.Wrap(err, "error copying plugin binary to rootfs path")
}
tar, err := archive.Tar(inPath, archive.Uncompressed)
return tar, errors.Wrap(err, "error making plugin archive")
}
func ensureBasicPluginBin() (string, error) {
name := "docker-basic-plugin"
p, err := exec.LookPath(name)
if err == nil {
return p, nil
}
goBin, err := exec.LookPath("go")
if err != nil {
return "", err
}
installPath := filepath.Join(os.Getenv("GOPATH"), "bin", name)
cmd := exec.Command(goBin, "build", "-o", installPath, "./"+filepath.Join("fixtures", "plugin", "basic"))
cmd.Env = append(cmd.Env, "CGO_ENABLED=0")
if out, err := cmd.CombinedOutput(); err != nil {
return "", errors.Wrapf(err, "error building basic plugin bin: %s", string(out))
}
return installPath, nil
}

View File

@ -53,6 +53,16 @@ func (p *Publisher) SubscribeTopic(topic topicFunc) chan interface{} {
return ch
}
// SubscribeTopicWithBuffer adds a new subscriber that filters messages sent by a topic.
// The returned channel has a buffer of the specified size.
func (p *Publisher) SubscribeTopicWithBuffer(topic topicFunc, buffer int) chan interface{} {
ch := make(chan interface{}, buffer)
p.m.Lock()
p.subscribers[ch] = topic
p.m.Unlock()
return ch
}
// Evict removes the specified subscriber from receiving any more messages.
func (p *Publisher) Evict(sub chan interface{}) {
p.m.Lock()

View File

@ -67,6 +67,7 @@ func (pm *Manager) Disable(refOrID string, config *types.PluginDisableConfig) er
if err := pm.disable(p, c); err != nil {
return err
}
pm.publisher.Publish(EventDisable{Plugin: p.PluginObj})
pm.config.LogPluginEvent(p.GetID(), refOrID, "disable")
return nil
}
@ -82,6 +83,7 @@ func (pm *Manager) Enable(refOrID string, config *types.PluginEnableConfig) erro
if err := pm.enable(p, c, false); err != nil {
return err
}
pm.publisher.Publish(EventEnable{Plugin: p.PluginObj})
pm.config.LogPluginEvent(p.GetID(), refOrID, "enable")
return nil
}
@ -296,7 +298,7 @@ func (pm *Manager) Upgrade(ctx context.Context, ref reference.Named, name string
}
// Pull pulls a plugin, check if the correct privileges are provided and install the plugin.
func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer) (err error) {
func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, outStream io.Writer, opts ...CreateOpt) (err error) {
pm.muGC.RLock()
defer pm.muGC.RUnlock()
@ -340,12 +342,19 @@ func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, m
return err
}
p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges)
refOpt := func(p *v2.Plugin) {
p.PluginObj.PluginReference = ref.String()
}
optsList := make([]CreateOpt, 0, len(opts)+1)
optsList = append(optsList, opts...)
optsList = append(optsList, refOpt)
p, err := pm.createPlugin(name, dm.configDigest, dm.blobs, tmpRootFSDir, &privileges, optsList...)
if err != nil {
return err
}
p.PluginObj.PluginReference = ref.String()
pm.publisher.Publish(EventCreate{Plugin: p.PluginObj})
return nil
}
@ -640,6 +649,7 @@ func (pm *Manager) Remove(name string, config *types.PluginRmConfig) error {
}
pm.config.Store.Remove(p)
pm.config.LogPluginEvent(id, name, "remove")
pm.publisher.Publish(EventRemove{Plugin: p.PluginObj})
return nil
}
@ -771,6 +781,7 @@ func (pm *Manager) CreateFromContext(ctx context.Context, tarCtx io.ReadCloser,
}
p.PluginObj.PluginReference = name
pm.publisher.Publish(EventCreate{Plugin: p.PluginObj})
pm.config.LogPluginEvent(p.PluginObj.ID, name, "create")
return nil

View File

@ -36,7 +36,7 @@ func (pm *Manager) Privileges(ctx context.Context, ref reference.Named, metaHead
}
// Pull pulls a plugin, check if the correct privileges are provided and install the plugin.
func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, out io.Writer) error {
func (pm *Manager) Pull(ctx context.Context, ref reference.Named, name string, metaHeader http.Header, authConfig *types.AuthConfig, privileges types.PluginPrivileges, out io.Writer, opts ...CreateOpt) error {
return errNotSupported
}

View File

@ -24,3 +24,14 @@ func NewStore() *Store {
handlers: make(map[string][]func(string, *plugins.Client)),
}
}
// CreateOpt is used to configure specific plugin details when created
type CreateOpt func(p *v2.Plugin)
// WithSwarmService is a CreateOpt that flags the passed in a plugin as a plugin
// managed by swarm
func WithSwarmService(id string) CreateOpt {
return func(p *v2.Plugin) {
p.SwarmServiceID = id
}
}

111
plugin/events.go Normal file
View File

@ -0,0 +1,111 @@
package plugin
import (
"fmt"
"reflect"
"github.com/docker/docker/api/types"
)
// Event is emitted for actions performed on the plugin manager
type Event interface {
matches(Event) bool
}
// EventCreate is an event which is emitted when a plugin is created
// This is either by pull or create from context.
//
// Use the `Interfaces` field to match only plugins that implement a specific
// interface.
// These are matched against using "or" logic.
// If no interfaces are listed, all are matched.
type EventCreate struct {
Interfaces map[string]bool
Plugin types.Plugin
}
func (e EventCreate) matches(observed Event) bool {
oe, ok := observed.(EventCreate)
if !ok {
return false
}
if len(e.Interfaces) == 0 {
return true
}
var ifaceMatch bool
for _, in := range oe.Plugin.Config.Interface.Types {
if e.Interfaces[in.Capability] {
ifaceMatch = true
break
}
}
return ifaceMatch
}
// EventRemove is an event which is emitted when a plugin is removed
// It maches on the passed in plugin's ID only.
type EventRemove struct {
Plugin types.Plugin
}
func (e EventRemove) matches(observed Event) bool {
oe, ok := observed.(EventRemove)
if !ok {
return false
}
return e.Plugin.ID == oe.Plugin.ID
}
// EventDisable is an event that is emitted when a plugin is disabled
// It maches on the passed in plugin's ID only.
type EventDisable struct {
Plugin types.Plugin
}
func (e EventDisable) matches(observed Event) bool {
oe, ok := observed.(EventDisable)
if !ok {
return false
}
return e.Plugin.ID == oe.Plugin.ID
}
// EventEnable is an event that is emitted when a plugin is disabled
// It maches on the passed in plugin's ID only.
type EventEnable struct {
Plugin types.Plugin
}
func (e EventEnable) matches(observed Event) bool {
oe, ok := observed.(EventEnable)
if !ok {
return false
}
return e.Plugin.ID == oe.Plugin.ID
}
// SubscribeEvents provides an event channel to listen for structured events from
// the plugin manager actions, CRUD operations.
// The caller must call the returned `cancel()` function once done with the channel
// or this will leak resources.
func (pm *Manager) SubscribeEvents(buffer int, watchEvents ...Event) (eventCh <-chan interface{}, cancel func()) {
topic := func(i interface{}) bool {
observed, ok := i.(Event)
if !ok {
panic(fmt.Sprintf("unexpected type passed to event channel: %v", reflect.TypeOf(i)))
}
for _, e := range watchEvents {
if e.matches(observed) {
return true
}
}
// If no specific events are specified always assume a matched event
// If some events were specified and none matched above, then the event
// doesn't match
return watchEvents == nil
}
ch := pm.publisher.SubscribeTopicWithBuffer(topic, buffer)
cancelFunc := func() { pm.publisher.Evict(ch) }
return ch, cancelFunc
}

View File

@ -22,6 +22,7 @@ import (
"github.com/docker/docker/pkg/authorization"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/docker/pkg/mount"
"github.com/docker/docker/pkg/pubsub"
"github.com/docker/docker/pkg/system"
"github.com/docker/docker/plugin/v2"
"github.com/docker/docker/registry"
@ -63,6 +64,7 @@ type Manager struct {
cMap map[*v2.Plugin]*controller
containerdClient libcontainerd.Client
blobStore *basicBlobStore
publisher *pubsub.Publisher
}
// controller represents the manager's control on a plugin.
@ -117,6 +119,8 @@ func NewManager(config ManagerConfig) (*Manager, error) {
if err := manager.reload(); err != nil {
return nil, errors.Wrap(err, "failed to restore plugins")
}
manager.publisher = pubsub.NewPublisher(0, 0)
return manager, nil
}
@ -268,6 +272,11 @@ func (pm *Manager) reload() error { // todo: restore
return nil
}
// Get looks up the requested plugin in the store.
func (pm *Manager) Get(idOrName string) (*v2.Plugin, error) {
return pm.config.Store.GetV2Plugin(idOrName)
}
func (pm *Manager) loadPlugin(id string) (*v2.Plugin, error) {
p := filepath.Join(pm.config.Root, id, configFileName)
dt, err := ioutil.ReadFile(p)

View File

@ -274,7 +274,7 @@ func (pm *Manager) setupNewPlugin(configDigest digest.Digest, blobsums []digest.
}
// createPlugin creates a new plugin. take lock before calling.
func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges) (p *v2.Plugin, err error) {
func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsums []digest.Digest, rootFSDir string, privileges *types.PluginPrivileges, opts ...CreateOpt) (p *v2.Plugin, err error) {
if err := pm.config.Store.validateName(name); err != nil { // todo: this check is wrong. remove store
return nil, err
}
@ -294,6 +294,9 @@ func (pm *Manager) createPlugin(name string, configDigest digest.Digest, blobsum
Blobsums: blobsums,
}
p.InitEmptySettings()
for _, o := range opts {
o(p)
}
pdir := filepath.Join(pm.config.Root, p.PluginObj.ID)
if err := os.MkdirAll(pdir, 0700); err != nil {

View File

@ -22,6 +22,8 @@ type Plugin struct {
Config digest.Digest
Blobsums []digest.Digest
SwarmServiceID string
}
const defaultPluginRuntimeDestination = "/run/docker/plugins"