diff --git a/daemon/daemon.go b/daemon/daemon.go index 5837a10aea..04ef61b039 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -47,6 +47,7 @@ import ( "github.com/docker/docker/pkg/sysinfo" "github.com/docker/docker/pkg/system" "github.com/docker/docker/pkg/truncindex" + plugingetter "github.com/docker/docker/plugin/getter" pluginstore "github.com/docker/docker/plugin/store" "github.com/docker/docker/reference" "github.com/docker/docker/registry" @@ -1096,7 +1097,7 @@ func (daemon *Daemon) reloadClusterDiscovery(config *Config) error { if daemon.netController == nil { return nil } - netOptions, err := daemon.networkOptions(daemon.configStore, nil) + netOptions, err := daemon.networkOptions(daemon.configStore, daemon.pluginStore, nil) if err != nil { logrus.WithError(err).Warnf("failed to get options with network controller") return nil @@ -1113,7 +1114,7 @@ func isBridgeNetworkDisabled(config *Config) bool { return config.bridgeConfig.Iface == disableNetworkBridge } -func (daemon *Daemon) networkOptions(dconfig *Config, activeSandboxes map[string]interface{}) ([]nwconfig.Option, error) { +func (daemon *Daemon) networkOptions(dconfig *Config, pg plugingetter.PluginGetter, activeSandboxes map[string]interface{}) ([]nwconfig.Option, error) { options := []nwconfig.Option{} if dconfig == nil { return options, nil @@ -1154,6 +1155,10 @@ func (daemon *Daemon) networkOptions(dconfig *Config, activeSandboxes map[string options = append(options, nwconfig.OptionActiveSandboxes(activeSandboxes)) } + if pg != nil { + options = append(options, nwconfig.OptionPluginGetter(pg)) + } + return options, nil } diff --git a/daemon/daemon_unix.go b/daemon/daemon_unix.go index ba0c423aaf..c35b485aff 100644 --- a/daemon/daemon_unix.go +++ b/daemon/daemon_unix.go @@ -609,7 +609,7 @@ func configureKernelSecuritySupport(config *Config, driverName string) error { } func (daemon *Daemon) initNetworkController(config *Config, activeSandboxes map[string]interface{}) (libnetwork.NetworkController, error) { - netOptions, err := daemon.networkOptions(config, activeSandboxes) + netOptions, err := daemon.networkOptions(config, daemon.pluginStore, activeSandboxes) if err != nil { return nil, err } diff --git a/daemon/daemon_unix_test.go b/daemon/daemon_unix_test.go index 3b15547b9a..9766584b8f 100644 --- a/daemon/daemon_unix_test.go +++ b/daemon/daemon_unix_test.go @@ -188,7 +188,7 @@ func TestNetworkOptions(t *testing.T) { }, } - if _, err := daemon.networkOptions(dconfigCorrect, nil); err != nil { + if _, err := daemon.networkOptions(dconfigCorrect, nil, nil); err != nil { t.Fatalf("Expect networkOptions success, got error: %v", err) } @@ -198,7 +198,7 @@ func TestNetworkOptions(t *testing.T) { }, } - if _, err := daemon.networkOptions(dconfigWrong, nil); err == nil { + if _, err := daemon.networkOptions(dconfigWrong, nil, nil); err == nil { t.Fatalf("Expected networkOptions error, got nil") } } diff --git a/daemon/daemon_windows.go b/daemon/daemon_windows.go index c65c597ac1..af45ab84fe 100644 --- a/daemon/daemon_windows.go +++ b/daemon/daemon_windows.go @@ -186,7 +186,7 @@ func configureMaxThreads(config *Config) error { } func (daemon *Daemon) initNetworkController(config *Config, activeSandboxes map[string]interface{}) (libnetwork.NetworkController, error) { - netOptions, err := daemon.networkOptions(config, nil) + netOptions, err := daemon.networkOptions(config, nil, nil) if err != nil { return nil, err } diff --git a/hack/vendor.sh b/hack/vendor.sh index 6c3a311297..b3fe38d5c5 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -70,7 +70,7 @@ clone git github.com/RackSec/srslog 365bf33cd9acc21ae1c355209865f17228ca534e clone git github.com/imdario/mergo 0.2.1 #get libnetwork packages -clone git github.com/docker/libnetwork bf3d9ccfb8ebf768843691143c66d137743cc5e9 +clone git github.com/docker/libnetwork 66764992b5bff765a5aa2318ca3768ad22c4ce95 clone git github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894 clone git github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec @@ -143,7 +143,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0 clone git github.com/docker/containerd 2545227b0357eb55e369fa0072baef9ad91cdb69 # cluster -clone git github.com/docker/swarmkit b79d41fa99c137181d8f58ef76a6e8a25bc2e72f +clone git github.com/docker/swarmkit 7e63bdefb94e5bea2641e8bdebae2cfa61a0ed44 clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 clone git github.com/gogo/protobuf v0.3 clone git github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a diff --git a/vendor/src/github.com/docker/libnetwork/agent.go b/vendor/src/github.com/docker/libnetwork/agent.go index fb0c342257..4c8980b2e1 100644 --- a/vendor/src/github.com/docker/libnetwork/agent.go +++ b/vendor/src/github.com/docker/libnetwork/agent.go @@ -191,8 +191,7 @@ func (c *controller) agentSetup() error { if remoteAddr != "" { if err := c.agentJoin(remoteAddr); err != nil { - logrus.Errorf("Error in agentJoin : %v", err) - return nil + logrus.Errorf("Error in joining gossip cluster : %v(join will be retried in background)", err) } } diff --git a/vendor/src/github.com/docker/libnetwork/config/config.go b/vendor/src/github.com/docker/libnetwork/config/config.go index 832412ec74..0e4780e489 100644 --- a/vendor/src/github.com/docker/libnetwork/config/config.go +++ b/vendor/src/github.com/docker/libnetwork/config/config.go @@ -6,6 +6,7 @@ import ( "github.com/BurntSushi/toml" log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/discovery" + "github.com/docker/docker/plugin/getter" "github.com/docker/go-connections/tlsconfig" "github.com/docker/libkv/store" "github.com/docker/libnetwork/cluster" @@ -20,6 +21,7 @@ type Config struct { Cluster ClusterCfg Scopes map[string]*datastore.ScopeCfg ActiveSandboxes map[string]interface{} + PluginGetter getter.PluginGetter } // DaemonCfg represents libnetwork core configuration @@ -205,6 +207,13 @@ func OptionExecRoot(execRoot string) Option { } } +// OptionPluginGetter returns a plugingetter for remote drivers. +func OptionPluginGetter(pg getter.PluginGetter) Option { + return func(c *Config) { + c.PluginGetter = pg + } +} + // ProcessOptions processes options and stores it in config func (c *Config) ProcessOptions(options ...Option) { for _, opt := range options { diff --git a/vendor/src/github.com/docker/libnetwork/controller.go b/vendor/src/github.com/docker/libnetwork/controller.go index a1906da322..50ff1195a9 100644 --- a/vendor/src/github.com/docker/libnetwork/controller.go +++ b/vendor/src/github.com/docker/libnetwork/controller.go @@ -55,6 +55,7 @@ import ( "github.com/docker/docker/pkg/locker" "github.com/docker/docker/pkg/plugins" "github.com/docker/docker/pkg/stringid" + "github.com/docker/docker/plugin/getter" "github.com/docker/libnetwork/cluster" "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" @@ -178,7 +179,7 @@ func New(cfgOptions ...config.Option) (NetworkController, error) { return nil, err } - drvRegistry, err := drvregistry.New(c.getStore(datastore.LocalScope), c.getStore(datastore.GlobalScope), c.RegisterDriver, nil) + drvRegistry, err := drvregistry.New(c.getStore(datastore.LocalScope), c.getStore(datastore.GlobalScope), c.RegisterDriver, nil, c.cfg.PluginGetter) if err != nil { return nil, err } @@ -601,6 +602,10 @@ func (c *controller) isDistributedControl() bool { return !c.isManager() && !c.isAgent() } +func (c *controller) GetPluginGetter() getter.PluginGetter { + return c.drvRegistry.GetPluginGetter() +} + func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver, capability driverapi.Capability) error { c.Lock() hd := c.discovery @@ -1074,7 +1079,7 @@ func (c *controller) loadDriver(networkType string) error { } func (c *controller) loadIPAMDriver(name string) error { - if _, err := plugins.Get(name, ipamapi.PluginEndpointType); err != nil { + if _, err := c.GetPluginGetter().Get(name, ipamapi.PluginEndpointType, getter.LOOKUP); err != nil { if err == plugins.ErrNotFound { return types.NotFoundErrorf(err.Error()) } diff --git a/vendor/src/github.com/docker/libnetwork/driverapi/driverapi.go b/vendor/src/github.com/docker/libnetwork/driverapi/driverapi.go index ccb7936e2a..51a43e780b 100644 --- a/vendor/src/github.com/docker/libnetwork/driverapi/driverapi.go +++ b/vendor/src/github.com/docker/libnetwork/driverapi/driverapi.go @@ -3,6 +3,7 @@ package driverapi import ( "net" + "github.com/docker/docker/plugin/getter" "github.com/docker/libnetwork/discoverapi" ) @@ -139,6 +140,8 @@ type JoinInfo interface { // DriverCallback provides a Callback interface for Drivers into LibNetwork type DriverCallback interface { + // GetPluginGetter returns the pluginv2 getter. + GetPluginGetter() getter.PluginGetter // RegisterDriver provides a way for Remote drivers to dynamically register new NetworkType and associate with a driver instance RegisterDriver(name string, driver Driver, capability Capability) error } diff --git a/vendor/src/github.com/docker/libnetwork/drivers/bridge/bridge.go b/vendor/src/github.com/docker/libnetwork/drivers/bridge/bridge.go index 0b93eb60b0..777f7b162e 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/bridge/bridge.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/bridge/bridge.go @@ -50,6 +50,7 @@ type configuration struct { EnableIPForwarding bool EnableIPTables bool EnableUserlandProxy bool + UserlandProxyPath string } // networkConfiguration for network specific configuration @@ -638,7 +639,7 @@ func (d *driver) createNetwork(config *networkConfiguration) error { id: config.ID, endpoints: make(map[string]*bridgeEndpoint), config: config, - portMapper: portmapper.New(), + portMapper: portmapper.New(d.config.UserlandProxyPath), driver: d, } diff --git a/vendor/src/github.com/docker/libnetwork/drivers/remote/driver.go b/vendor/src/github.com/docker/libnetwork/drivers/remote/driver.go index 9fa253bb29..7794d1964f 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/remote/driver.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/remote/driver.go @@ -29,7 +29,12 @@ func newDriver(name string, client *plugins.Client) driverapi.Driver { // Init makes sure a remote driver is registered when a network driver // plugin is activated. func Init(dc driverapi.DriverCallback, config map[string]interface{}) error { - plugins.Handle(driverapi.NetworkPluginEndpointType, func(name string, client *plugins.Client) { + // Unit test code is unaware of a true PluginStore. So we fall back to v1 plugins. + handleFunc := plugins.Handle + if pg := dc.GetPluginGetter(); pg != nil { + handleFunc = pg.Handle + } + handleFunc(driverapi.NetworkPluginEndpointType, func(name string, client *plugins.Client) { // negotiate driver capability with client d := newDriver(name, client) c, err := d.(*driver).getCapabilities() diff --git a/vendor/src/github.com/docker/libnetwork/drvregistry/drvregistry.go b/vendor/src/github.com/docker/libnetwork/drvregistry/drvregistry.go index d2cf781193..af4dee4264 100644 --- a/vendor/src/github.com/docker/libnetwork/drvregistry/drvregistry.go +++ b/vendor/src/github.com/docker/libnetwork/drvregistry/drvregistry.go @@ -5,6 +5,7 @@ import ( "strings" "sync" + "github.com/docker/docker/plugin/getter" "github.com/docker/libnetwork/driverapi" "github.com/docker/libnetwork/ipamapi" "github.com/docker/libnetwork/types" @@ -28,10 +29,11 @@ type ipamTable map[string]*ipamData // DrvRegistry holds the registry of all network drivers and IPAM drivers that it knows about. type DrvRegistry struct { sync.Mutex - drivers driverTable - ipamDrivers ipamTable - dfn DriverNotifyFunc - ifn IPAMNotifyFunc + drivers driverTable + ipamDrivers ipamTable + dfn DriverNotifyFunc + ifn IPAMNotifyFunc + pluginGetter getter.PluginGetter } // Functors definition @@ -52,12 +54,13 @@ type IPAMNotifyFunc func(name string, driver ipamapi.Ipam, cap *ipamapi.Capabili type DriverNotifyFunc func(name string, driver driverapi.Driver, capability driverapi.Capability) error // New retruns a new driver registry handle. -func New(lDs, gDs interface{}, dfn DriverNotifyFunc, ifn IPAMNotifyFunc) (*DrvRegistry, error) { +func New(lDs, gDs interface{}, dfn DriverNotifyFunc, ifn IPAMNotifyFunc, pg getter.PluginGetter) (*DrvRegistry, error) { r := &DrvRegistry{ - drivers: make(driverTable), - ipamDrivers: make(ipamTable), - dfn: dfn, - ifn: ifn, + drivers: make(driverTable), + ipamDrivers: make(ipamTable), + dfn: dfn, + ifn: ifn, + pluginGetter: pg, } return r, nil @@ -149,6 +152,11 @@ func (r *DrvRegistry) IPAMDefaultAddressSpaces(name string) (string, string, err return i.defaultLocalAddressSpace, i.defaultGlobalAddressSpace, nil } +// GetPluginGetter returns the plugingetter +func (r *DrvRegistry) GetPluginGetter() getter.PluginGetter { + return r.pluginGetter +} + // RegisterDriver registers the network driver when it gets discovered. func (r *DrvRegistry) RegisterDriver(ntype string, driver driverapi.Driver, capability driverapi.Capability) error { if strings.TrimSpace(ntype) == "" { diff --git a/vendor/src/github.com/docker/libnetwork/ipamapi/contract.go b/vendor/src/github.com/docker/libnetwork/ipamapi/contract.go index a34d4a5c85..2800282ee0 100644 --- a/vendor/src/github.com/docker/libnetwork/ipamapi/contract.go +++ b/vendor/src/github.com/docker/libnetwork/ipamapi/contract.go @@ -4,6 +4,7 @@ package ipamapi import ( "net" + "github.com/docker/docker/plugin/getter" "github.com/docker/libnetwork/discoverapi" "github.com/docker/libnetwork/types" ) @@ -25,6 +26,8 @@ const ( // Callback provides a Callback interface for registering an IPAM instance into LibNetwork type Callback interface { + // GetPluginGetter returns the pluginv2 getter. + GetPluginGetter() getter.PluginGetter // RegisterIpamDriver provides a way for Remote drivers to dynamically register with libnetwork RegisterIpamDriver(name string, driver Ipam) error // RegisterIpamDriverWithCapabilities provides a way for Remote drivers to dynamically register with libnetwork and specify capabilities diff --git a/vendor/src/github.com/docker/libnetwork/ipams/remote/remote.go b/vendor/src/github.com/docker/libnetwork/ipams/remote/remote.go index 4ad3287e66..ab00fc6fe7 100644 --- a/vendor/src/github.com/docker/libnetwork/ipams/remote/remote.go +++ b/vendor/src/github.com/docker/libnetwork/ipams/remote/remote.go @@ -30,7 +30,13 @@ func newAllocator(name string, client *plugins.Client) ipamapi.Ipam { // Init registers a remote ipam when its plugin is activated func Init(cb ipamapi.Callback, l, g interface{}) error { - plugins.Handle(ipamapi.PluginEndpointType, func(name string, client *plugins.Client) { + + // Unit test code is unaware of a true PluginStore. So we fall back to v1 plugins. + handleFunc := plugins.Handle + if pg := cb.GetPluginGetter(); pg != nil { + handleFunc = pg.Handle + } + handleFunc(ipamapi.PluginEndpointType, func(name string, client *plugins.Client) { a := newAllocator(name, client) if cps, err := a.(*allocator).getCapabilities(); err == nil { if err := cb.RegisterIpamDriverWithCapabilities(name, a, cps); err != nil { diff --git a/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go b/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go index 3b624c9a27..c3bfdd4051 100644 --- a/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go +++ b/vendor/src/github.com/docker/libnetwork/networkdb/cluster.go @@ -161,6 +161,10 @@ func (nDB *NetworkDB) retryJoin(members []string, stop <-chan struct{}) { logrus.Errorf("Failed to join memberlist %s on retry: %v", members, err) continue } + if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil { + logrus.Errorf("failed to send node join on retry: %v", err) + continue + } return case <-stop: return diff --git a/vendor/src/github.com/docker/libnetwork/portmapper/mapper.go b/vendor/src/github.com/docker/libnetwork/portmapper/mapper.go index 6a1bb08ffb..7f2a67c89f 100644 --- a/vendor/src/github.com/docker/libnetwork/portmapper/mapper.go +++ b/vendor/src/github.com/docker/libnetwork/portmapper/mapper.go @@ -38,19 +38,22 @@ type PortMapper struct { currentMappings map[string]*mapping lock sync.Mutex + proxyPath string + Allocator *portallocator.PortAllocator } // New returns a new instance of PortMapper -func New() *PortMapper { - return NewWithPortAllocator(portallocator.Get()) +func New(proxyPath string) *PortMapper { + return NewWithPortAllocator(portallocator.Get(), proxyPath) } // NewWithPortAllocator returns a new instance of PortMapper which will use the specified PortAllocator -func NewWithPortAllocator(allocator *portallocator.PortAllocator) *PortMapper { +func NewWithPortAllocator(allocator *portallocator.PortAllocator, proxyPath string) *PortMapper { return &PortMapper{ currentMappings: make(map[string]*mapping), Allocator: allocator, + proxyPath: proxyPath, } } @@ -90,7 +93,7 @@ func (pm *PortMapper) MapRange(container net.Addr, hostIP net.IP, hostPortStart, } if useProxy { - m.userlandProxy, err = newProxy(proto, hostIP, allocatedHostPort, container.(*net.TCPAddr).IP, container.(*net.TCPAddr).Port) + m.userlandProxy, err = newProxy(proto, hostIP, allocatedHostPort, container.(*net.TCPAddr).IP, container.(*net.TCPAddr).Port, pm.proxyPath) if err != nil { return nil, err } @@ -110,7 +113,7 @@ func (pm *PortMapper) MapRange(container net.Addr, hostIP net.IP, hostPortStart, } if useProxy { - m.userlandProxy, err = newProxy(proto, hostIP, allocatedHostPort, container.(*net.UDPAddr).IP, container.(*net.UDPAddr).Port) + m.userlandProxy, err = newProxy(proto, hostIP, allocatedHostPort, container.(*net.UDPAddr).IP, container.(*net.UDPAddr).Port, pm.proxyPath) if err != nil { return nil, err } diff --git a/vendor/src/github.com/docker/libnetwork/portmapper/mock_proxy.go b/vendor/src/github.com/docker/libnetwork/portmapper/mock_proxy.go index 587026f9ce..ceb7b02926 100644 --- a/vendor/src/github.com/docker/libnetwork/portmapper/mock_proxy.go +++ b/vendor/src/github.com/docker/libnetwork/portmapper/mock_proxy.go @@ -2,7 +2,7 @@ package portmapper import "net" -func newMockProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net.IP, containerPort int) (userlandProxy, error) { +func newMockProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net.IP, containerPort int, userlandProxyPath string) (userlandProxy, error) { return &mockProxyCommand{}, nil } diff --git a/vendor/src/github.com/docker/libnetwork/portmapper/proxy.go b/vendor/src/github.com/docker/libnetwork/portmapper/proxy.go index 25a341a98f..6a4adbb872 100644 --- a/vendor/src/github.com/docker/libnetwork/portmapper/proxy.go +++ b/vendor/src/github.com/docker/libnetwork/portmapper/proxy.go @@ -25,15 +25,18 @@ type proxyCommand struct { cmd *exec.Cmd } -func newProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net.IP, containerPort int) (userlandProxy, error) { - cmd, err := exec.LookPath(userlandProxyCommandName) - - if err != nil { - return nil, err +func newProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net.IP, containerPort int, proxyPath string) (userlandProxy, error) { + path := proxyPath + if proxyPath == "" { + cmd, err := exec.LookPath(userlandProxyCommandName) + if err != nil { + return nil, err + } + path = cmd } args := []string{ - cmd, + path, "-proto", proto, "-host-ip", hostIP.String(), "-host-port", strconv.Itoa(hostPort), @@ -43,7 +46,7 @@ func newProxyCommand(proto string, hostIP net.IP, hostPort int, containerIP net. return &proxyCommand{ cmd: &exec.Cmd{ - Path: cmd, + Path: path, Args: args, SysProcAttr: &syscall.SysProcAttr{ Pdeathsig: syscall.SIGTERM, // send a sigterm to the proxy if the daemon process dies diff --git a/vendor/src/github.com/docker/swarmkit/agent/agent.go b/vendor/src/github.com/docker/swarmkit/agent/agent.go index 8f686a0a07..a96ac2d38a 100644 --- a/vendor/src/github.com/docker/swarmkit/agent/agent.go +++ b/vendor/src/github.com/docker/swarmkit/agent/agent.go @@ -220,7 +220,6 @@ func (a *Agent) run(ctx context.Context) { } session = newSession(ctx, a, delay, session.sessionID, nodeDescription) registered = session.registered - sessionq = a.sessionq case <-nodeUpdateTicker.C: // skip this case if the registration isn't finished if registered != nil { @@ -246,9 +245,7 @@ func (a *Agent) run(ctx context.Context) { nodeDescription = newNodeDescription // close the session log.G(ctx).Info("agent: found node update") - if err := session.close(); err != nil { - log.G(ctx).WithError(err).Error("agent: closing session for node update failed") - } + session.sendError(nil) } case <-a.stopped: // TODO(stevvooe): Wait on shutdown and cleanup. May need to pump @@ -365,7 +362,7 @@ func (a *Agent) UpdateTaskStatus(ctx context.Context, taskID string, status *api err = nil // dispatcher no longer cares about this task. } else { log.G(ctx).WithError(err).Error("closing session after fatal error") - session.close() + session.sendError(err) } } else { log.G(ctx).Debug("task status reported") diff --git a/vendor/src/github.com/docker/swarmkit/agent/session.go b/vendor/src/github.com/docker/swarmkit/agent/session.go index fc1a4582ce..e7efdd3a21 100644 --- a/vendor/src/github.com/docker/swarmkit/agent/session.go +++ b/vendor/src/github.com/docker/swarmkit/agent/session.go @@ -339,6 +339,16 @@ func (s *session) sendTaskStatuses(ctx context.Context, updates ...*api.UpdateTa return updates[n:], nil } +// sendError is used to send errors to errs channel and trigger session recreation +func (s *session) sendError(err error) { + select { + case s.errs <- err: + case <-s.closed: + } +} + +// close closing session. It should be called only in <-session.errs branch +// of event loop. func (s *session) close() error { s.closeOnce.Do(func() { if s.conn != nil { diff --git a/vendor/src/github.com/docker/swarmkit/api/control.pb.go b/vendor/src/github.com/docker/swarmkit/api/control.pb.go index bc5e449744..8ad69313bf 100644 --- a/vendor/src/github.com/docker/swarmkit/api/control.pb.go +++ b/vendor/src/github.com/docker/swarmkit/api/control.pb.go @@ -2414,8 +2414,6 @@ type ControlClient interface { // on the provided `CreateSecretRequest.SecretSpec`. // - Returns `InvalidArgument` if the `CreateSecretRequest.SecretSpec` is malformed, // or if the secret data is too long or contains invalid characters. - // - Returns `ResourceExhausted` if there are already the maximum number of allowed - // secrets in the system. // - Returns an error if the creation fails. CreateSecret(ctx context.Context, in *CreateSecretRequest, opts ...grpc.CallOption) (*CreateSecretResponse, error) // RemoveSecret removes the secret referenced by `RemoveSecretRequest.ID`. @@ -2678,8 +2676,6 @@ type ControlServer interface { // on the provided `CreateSecretRequest.SecretSpec`. // - Returns `InvalidArgument` if the `CreateSecretRequest.SecretSpec` is malformed, // or if the secret data is too long or contains invalid characters. - // - Returns `ResourceExhausted` if there are already the maximum number of allowed - // secrets in the system. // - Returns an error if the creation fails. CreateSecret(context.Context, *CreateSecretRequest) (*CreateSecretResponse, error) // RemoveSecret removes the secret referenced by `RemoveSecretRequest.ID`. diff --git a/vendor/src/github.com/docker/swarmkit/api/control.proto b/vendor/src/github.com/docker/swarmkit/api/control.proto index 630612916b..3be83d5600 100644 --- a/vendor/src/github.com/docker/swarmkit/api/control.proto +++ b/vendor/src/github.com/docker/swarmkit/api/control.proto @@ -95,8 +95,6 @@ service Control { // on the provided `CreateSecretRequest.SecretSpec`. // - Returns `InvalidArgument` if the `CreateSecretRequest.SecretSpec` is malformed, // or if the secret data is too long or contains invalid characters. - // - Returns `ResourceExhausted` if there are already the maximum number of allowed - // secrets in the system. // - Returns an error if the creation fails. rpc CreateSecret(CreateSecretRequest) returns (CreateSecretResponse) { option (docker.protobuf.plugin.tls_authorization) = { roles: "swarm-manager" }; diff --git a/vendor/src/github.com/docker/swarmkit/api/objects.pb.go b/vendor/src/github.com/docker/swarmkit/api/objects.pb.go index 54c88f2254..3cad399730 100644 --- a/vendor/src/github.com/docker/swarmkit/api/objects.pb.go +++ b/vendor/src/github.com/docker/swarmkit/api/objects.pb.go @@ -247,9 +247,9 @@ type Secret struct { // the form ":": for example "sha256:DEADBEEF...". It // is calculated from the data contained in `Secret.Spec.data`. Digest string `protobuf:"bytes,4,opt,name=digest,proto3" json:"digest,omitempty"` - // Size represents the size (number of bytes) of the secret data, and is - // calculated from the data contained in `Secret.Spec.data`.. - SecretSize uint32 `protobuf:"varint,5,opt,name=size,proto3" json:"size,omitempty"` + // SecretSize represents the size (number of bytes) of the secret data, and is + // calculated from the data contained in `Secret.Spec.data`. + SecretSize int64 `protobuf:"varint,5,opt,name=size,proto3" json:"size,omitempty"` // Whether the secret is an internal secret (not set by a user) or not. Internal bool `protobuf:"varint,6,opt,name=internal,proto3" json:"internal,omitempty"` } @@ -3851,7 +3851,7 @@ func (m *Secret) Unmarshal(data []byte) error { } b := data[iNdEx] iNdEx++ - m.SecretSize |= (uint32(b) & 0x7F) << shift + m.SecretSize |= (int64(b) & 0x7F) << shift if b < 0x80 { break } @@ -4070,9 +4070,9 @@ var fileDescriptorObjects = []byte{ 0x36, 0x17, 0xdf, 0xa6, 0xcb, 0x52, 0x5f, 0xa6, 0x88, 0xae, 0x06, 0x34, 0xff, 0x74, 0xa0, 0x74, 0x48, 0x43, 0x41, 0xd5, 0x5b, 0x2d, 0xf8, 0xa3, 0x0b, 0x05, 0xaf, 0xe7, 0xbf, 0xc5, 0xda, 0xeb, 0x4a, 0xbd, 0xb7, 0xa0, 0x14, 0xb1, 0x01, 0x95, 0xe9, 0xd7, 0x84, 0x87, 0xad, 0x85, 0x9a, 0xe0, - 0x4a, 0xf6, 0x9a, 0x9a, 0xce, 0xaa, 0xa5, 0x0f, 0x9f, 0x55, 0x60, 0xaf, 0x29, 0x36, 0x7b, 0x68, + 0x4a, 0xf6, 0x9a, 0x9a, 0xce, 0x2a, 0xa6, 0x0f, 0x9f, 0x55, 0x60, 0xaf, 0x29, 0x36, 0x7b, 0x68, 0x1b, 0x2a, 0x2c, 0x56, 0x54, 0xc4, 0x64, 0x64, 0x32, 0xaf, 0xe0, 0x85, 0xdd, 0xd9, 0x39, 0x3d, 0xab, 0xdf, 0xf8, 0xfd, 0xac, 0x7e, 0xe3, 0x9f, 0xb3, 0xba, 0xf3, 0xfd, 0xbc, 0xee, 0x9c, 0xce, 0xeb, 0xce, 0xaf, 0xf3, 0xba, 0xf3, 0xc7, 0xbc, 0xee, 0x1c, 0x97, 0xcc, 0xbf, 0x81, 0x8f, 0xfe, - 0x0d, 0x00, 0x00, 0xff, 0xff, 0xfd, 0x7d, 0x96, 0x72, 0x7d, 0x0c, 0x00, 0x00, + 0x0d, 0x00, 0x00, 0xff, 0xff, 0x4e, 0x76, 0x6f, 0x66, 0x7d, 0x0c, 0x00, 0x00, } diff --git a/vendor/src/github.com/docker/swarmkit/api/objects.proto b/vendor/src/github.com/docker/swarmkit/api/objects.proto index 7317988292..ab47b1a6fb 100644 --- a/vendor/src/github.com/docker/swarmkit/api/objects.proto +++ b/vendor/src/github.com/docker/swarmkit/api/objects.proto @@ -245,9 +245,9 @@ message Secret { // is calculated from the data contained in `Secret.Spec.data`. string digest = 4; - // Size represents the size (number of bytes) of the secret data, and is - // calculated from the data contained in `Secret.Spec.data`.. - uint32 size = 5 [(gogoproto.customname) = "SecretSize"]; + // SecretSize represents the size (number of bytes) of the secret data, and is + // calculated from the data contained in `Secret.Spec.data`. + int64 size = 5 [(gogoproto.customname) = "SecretSize"]; // Whether the secret is an internal secret (not set by a user) or not. bool internal = 6; diff --git a/vendor/src/github.com/docker/swarmkit/ca/certificates.go b/vendor/src/github.com/docker/swarmkit/ca/certificates.go index 33e7b81319..a29dbda245 100644 --- a/vendor/src/github.com/docker/swarmkit/ca/certificates.go +++ b/vendor/src/github.com/docker/swarmkit/ca/certificates.go @@ -669,25 +669,26 @@ func GetRemoteSignedCertificate(ctx context.Context, csr []byte, token string, r } } -// readCertExpiration returns the number of months left for certificate expiration -func readCertExpiration(paths CertPaths) (time.Duration, error) { +// readCertValidity returns the certificate issue and expiration time +func readCertValidity(paths CertPaths) (time.Time, time.Time, error) { + var zeroTime time.Time // Read the Cert cert, err := ioutil.ReadFile(paths.Cert) if err != nil { - return time.Hour, err + return zeroTime, zeroTime, err } // Create an x509 certificate out of the contents on disk certBlock, _ := pem.Decode([]byte(cert)) if certBlock == nil { - return time.Hour, errors.New("failed to decode certificate block") + return zeroTime, zeroTime, errors.New("failed to decode certificate block") } X509Cert, err := x509.ParseCertificate(certBlock.Bytes) if err != nil { - return time.Hour, err + return zeroTime, zeroTime, err } - return X509Cert.NotAfter.Sub(time.Now()), nil + return X509Cert.NotBefore, X509Cert.NotAfter, nil } diff --git a/vendor/src/github.com/docker/swarmkit/ca/config.go b/vendor/src/github.com/docker/swarmkit/ca/config.go index 93839ee109..872ab5e1cb 100644 --- a/vendor/src/github.com/docker/swarmkit/ca/config.go +++ b/vendor/src/github.com/docker/swarmkit/ca/config.go @@ -336,8 +336,8 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, baseCertDir string, // Since the expiration of the certificate is managed remotely we should update our // retry timer on every iteration of this loop. - // Retrieve the time until the certificate expires. - expiresIn, err := readCertExpiration(paths.Node) + // Retrieve the current certificate expiration information. + validFrom, validUntil, err := readCertValidity(paths.Node) if err != nil { // We failed to read the expiration, let's stick with the starting default log.Errorf("failed to read the expiration of the TLS certificate in: %s", paths.Node.Cert) @@ -345,12 +345,12 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, baseCertDir string, } else { // If we have an expired certificate, we let's stick with the starting default in // the hope that this is a temporary clock skew. - if expiresIn.Minutes() < 0 { + if validUntil.Before(time.Now()) { log.WithError(err).Errorf("failed to create a new client TLS config") updates <- CertificateUpdate{Err: errors.New("TLS certificate is expired")} } else { // Random retry time between 50% and 80% of the total time to expiration - retry = calculateRandomExpiry(expiresIn) + retry = calculateRandomExpiry(validFrom, validUntil) } } @@ -420,18 +420,16 @@ func RenewTLSConfig(ctx context.Context, s *SecurityConfig, baseCertDir string, return updates } -// calculateRandomExpiry returns a random duration between 50% and 80% of the original -// duration -func calculateRandomExpiry(expiresIn time.Duration) time.Duration { - if expiresIn.Minutes() <= 1 { - return time.Second - } +// calculateRandomExpiry returns a random duration between 50% and 80% of the +// original validity period +func calculateRandomExpiry(validFrom, validUntil time.Time) time.Duration { + duration := validUntil.Sub(validFrom) var randomExpiry int // Our lower bound of renewal will be half of the total expiration time - minValidity := int(expiresIn.Minutes() * CertLowerRotationRange) + minValidity := int(duration.Minutes() * CertLowerRotationRange) // Our upper bound of renewal will be 80% of the total expiration time - maxValidity := int(expiresIn.Minutes() * CertUpperRotationRange) + maxValidity := int(duration.Minutes() * CertUpperRotationRange) // Let's select a random number of minutes between min and max, and set our retry for that // Using randomly selected rotation allows us to avoid certificate thundering herds. if maxValidity-minValidity < 1 { @@ -440,7 +438,11 @@ func calculateRandomExpiry(expiresIn time.Duration) time.Duration { randomExpiry = rand.Intn(maxValidity-minValidity) + int(minValidity) } - return time.Duration(randomExpiry) * time.Minute + expiry := validFrom.Add(time.Duration(randomExpiry) * time.Minute).Sub(time.Now()) + if expiry < 0 { + return 0 + } + return expiry } // LoadTLSCreds loads tls credentials from the specified path and verifies that diff --git a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go index 9e132cb43c..d8ea7830dc 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go +++ b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go @@ -79,7 +79,7 @@ func New() (*NetworkAllocator, error) { // There are no driver configurations and notification // functions as of now. - reg, err := drvregistry.New(nil, nil, nil, nil) + reg, err := drvregistry.New(nil, nil, nil, nil, nil) if err != nil { return nil, err } diff --git a/vendor/src/github.com/docker/swarmkit/manager/controlapi/common.go b/vendor/src/github.com/docker/swarmkit/manager/controlapi/common.go index 3dacf98e22..4551878e6a 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/controlapi/common.go +++ b/vendor/src/github.com/docker/swarmkit/manager/controlapi/common.go @@ -66,7 +66,7 @@ func validateAnnotations(m api.Annotations) error { return grpc.Errorf(codes.InvalidArgument, "meta: name must be provided") } else if !isValidName.MatchString(m.Name) { // if the name doesn't match the regex - return grpc.Errorf(codes.InvalidArgument, "invalid name, only [a-zA-Z0-9][a-zA-Z0-9-]*[a-zA-Z0-9] are allowed") + return grpc.Errorf(codes.InvalidArgument, "name must be valid as a DNS name component") } return nil } diff --git a/vendor/src/github.com/docker/swarmkit/manager/controlapi/secret.go b/vendor/src/github.com/docker/swarmkit/manager/controlapi/secret.go index a44183e324..4a2a04981e 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/controlapi/secret.go +++ b/vendor/src/github.com/docker/swarmkit/manager/controlapi/secret.go @@ -1,7 +1,12 @@ package controlapi import ( + "regexp" + + "github.com/docker/distribution/digest" "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/identity" + "github.com/docker/swarmkit/manager/state/store" "golang.org/x/net/context" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -12,13 +17,39 @@ import ( // MaxSecretSize is the maximum byte length of the `Secret.Spec.Data` field. const MaxSecretSize = 500 * 1024 // 500KB +var validSecretNameRegexp = regexp.MustCompile(`^[a-zA-Z0-9]+(?:[a-zA-Z0-9-_.]*[a-zA-Z0-9])?$`) + +// assumes spec is not nil +func secretFromSecretSpec(spec *api.SecretSpec) *api.Secret { + return &api.Secret{ + ID: identity.NewID(), + Spec: *spec, + SecretSize: int64(len(spec.Data)), + Digest: digest.FromBytes(spec.Data).String(), + } +} + // GetSecret returns a `GetSecretResponse` with a `Secret` with the same // id as `GetSecretRequest.SecretID` // - Returns `NotFound` if the Secret with the given id is not found. // - Returns `InvalidArgument` if the `GetSecretRequest.SecretID` is empty. // - Returns an error if getting fails. func (s *Server) GetSecret(ctx context.Context, request *api.GetSecretRequest) (*api.GetSecretResponse, error) { - return nil, grpc.Errorf(codes.Unimplemented, "Not yet implemented") + if request.SecretID == "" { + return nil, grpc.Errorf(codes.InvalidArgument, "secret ID must be provided") + } + + var secret *api.Secret + s.store.View(func(tx store.ReadTx) { + secret = store.GetSecret(tx, request.SecretID) + }) + + if secret == nil { + return nil, grpc.Errorf(codes.NotFound, "secret %s not found", request.SecretID) + } + + secret.Spec.Data = nil // clean the actual secret data so it's never returned + return &api.GetSecretResponse{Secret: secret}, nil } // ListSecrets returns a `ListSecretResponse` with a list all non-internal `Secret`s being @@ -27,18 +58,81 @@ func (s *Server) GetSecret(ctx context.Context, request *api.GetSecretRequest) ( // `ListSecretsRequest.SecretIDs`, or any id prefix in `ListSecretsRequest.IDPrefixes`. // - Returns an error if listing fails. func (s *Server) ListSecrets(ctx context.Context, request *api.ListSecretsRequest) (*api.ListSecretsResponse, error) { - return nil, grpc.Errorf(codes.Unimplemented, "Not yet implemented") + var ( + secrets []*api.Secret + respSecrets []*api.Secret + err error + byFilters []store.By + by store.By + labels map[string]string + ) + + // return all secrets that match either any of the names or any of the name prefixes (why would you give both?) + if request.Filters != nil { + for _, name := range request.Filters.Names { + byFilters = append(byFilters, store.ByName(name)) + } + for _, prefix := range request.Filters.NamePrefixes { + byFilters = append(byFilters, store.ByNamePrefix(prefix)) + } + for _, prefix := range request.Filters.IDPrefixes { + byFilters = append(byFilters, store.ByIDPrefix(prefix)) + } + labels = request.Filters.Labels + } + + switch len(byFilters) { + case 0: + by = store.All + case 1: + by = byFilters[0] + default: + by = store.Or(byFilters...) + } + + s.store.View(func(tx store.ReadTx) { + secrets, err = store.FindSecrets(tx, by) + }) + if err != nil { + return nil, err + } + + // strip secret data from the secret, filter by label, and filter out all internal secrets + for _, secret := range secrets { + if secret.Internal || !filterMatchLabels(secret.Spec.Annotations.Labels, labels) { + continue + } + secret.Spec.Data = nil // clean the actual secret data so it's never returned + respSecrets = append(respSecrets, secret) + } + + return &api.ListSecretsResponse{Secrets: respSecrets}, nil } // CreateSecret creates and return a `CreateSecretResponse` with a `Secret` based // on the provided `CreateSecretRequest.SecretSpec`. // - Returns `InvalidArgument` if the `CreateSecretRequest.SecretSpec` is malformed, // or if the secret data is too long or contains invalid characters. -// - Returns `ResourceExhausted` if there are already the maximum number of allowed -// secrets in the system. // - Returns an error if the creation fails. func (s *Server) CreateSecret(ctx context.Context, request *api.CreateSecretRequest) (*api.CreateSecretResponse, error) { - return nil, grpc.Errorf(codes.Unimplemented, "Not yet implemented") + if err := validateSecretSpec(request.Spec); err != nil { + return nil, err + } + + secret := secretFromSecretSpec(request.Spec) // the store will handle name conflicts + err := s.store.Update(func(tx store.Tx) error { + return store.CreateSecret(tx, secret) + }) + + switch err { + case store.ErrNameConflict: + return nil, grpc.Errorf(codes.AlreadyExists, "secret %s already exists", request.Spec.Annotations.Name) + case nil: + secret.Spec.Data = nil // clean the actual secret data so it's never returned + return &api.CreateSecretResponse{Secret: secret}, nil + default: + return nil, err + } } // RemoveSecret removes the secret referenced by `RemoveSecretRequest.ID`. @@ -46,5 +140,44 @@ func (s *Server) CreateSecret(ctx context.Context, request *api.CreateSecretRequ // - Returns `NotFound` if the a secret named `RemoveSecretRequest.ID` is not found. // - Returns an error if the deletion fails. func (s *Server) RemoveSecret(ctx context.Context, request *api.RemoveSecretRequest) (*api.RemoveSecretResponse, error) { - return nil, grpc.Errorf(codes.Unimplemented, "Not yet implemented") + if request.SecretID == "" { + return nil, grpc.Errorf(codes.InvalidArgument, "secret ID must be provided") + } + + err := s.store.Update(func(tx store.Tx) error { + return store.DeleteSecret(tx, request.SecretID) + }) + switch err { + case store.ErrNotExist: + return nil, grpc.Errorf(codes.NotFound, "secret %s not found", request.SecretID) + case nil: + return &api.RemoveSecretResponse{}, nil + default: + return nil, err + } +} + +func validateSecretSpec(spec *api.SecretSpec) error { + if spec == nil { + return grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error()) + } + if err := validateSecretAnnotations(spec.Annotations); err != nil { + return err + } + + if len(spec.Data) >= MaxSecretSize || len(spec.Data) < 1 { + return grpc.Errorf(codes.InvalidArgument, "secret data must be larger than 0 and less than %d bytes", MaxSecretSize) + } + return nil +} + +func validateSecretAnnotations(m api.Annotations) error { + if m.Name == "" { + return grpc.Errorf(codes.InvalidArgument, "name must be provided") + } else if len(m.Name) > 64 || !validSecretNameRegexp.MatchString(m.Name) { + // if the name doesn't match the regex + return grpc.Errorf(codes.InvalidArgument, + "invalid name, only 64 [a-zA-Z0-9-_.] characters allowed, and the start and end character must be [a-zA-Z0-9]") + } + return nil } diff --git a/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go b/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go index 9336462562..24b64356c5 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go +++ b/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go @@ -136,6 +136,15 @@ func validateTask(taskSpec api.TaskSpec) error { if _, err := reference.ParseNamed(container.Image); err != nil { return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: %q is not a valid repository/tag", container.Image) } + + mountMap := make(map[string]bool) + for _, mount := range container.Mounts { + if _, exists := mountMap[mount.Target]; exists { + return grpc.Errorf(codes.InvalidArgument, "ContainerSpec: duplicate mount point: %s", mount.Target) + } + mountMap[mount.Target] = true + } + return nil } @@ -327,26 +336,24 @@ func (s *Server) UpdateService(ctx context.Context, request *api.UpdateServiceRe return nil } // temporary disable network update - if request.Spec != nil { - requestSpecNetworks := request.Spec.Task.Networks - if len(requestSpecNetworks) == 0 { - requestSpecNetworks = request.Spec.Networks - } + requestSpecNetworks := request.Spec.Task.Networks + if len(requestSpecNetworks) == 0 { + requestSpecNetworks = request.Spec.Networks + } - specNetworks := service.Spec.Task.Networks - if len(specNetworks) == 0 { - specNetworks = service.Spec.Networks - } + specNetworks := service.Spec.Task.Networks + if len(specNetworks) == 0 { + specNetworks = service.Spec.Networks + } - if !reflect.DeepEqual(requestSpecNetworks, specNetworks) { - return errNetworkUpdateNotSupported - } + if !reflect.DeepEqual(requestSpecNetworks, specNetworks) { + return errNetworkUpdateNotSupported } // orchestrator is designed to be stateless, so it should not deal // with service mode change (comparing current config with previous config). // proper way to change service mode is to delete and re-add. - if request.Spec != nil && reflect.TypeOf(service.Spec.Mode) != reflect.TypeOf(request.Spec.Mode) { + if reflect.TypeOf(service.Spec.Mode) != reflect.TypeOf(request.Spec.Mode) { return errModeChangeNotAllowed } service.Meta.Version = *request.ServiceVersion diff --git a/vendor/src/github.com/docker/swarmkit/manager/orchestrator/services.go b/vendor/src/github.com/docker/swarmkit/manager/orchestrator/services.go index 549896489a..60a611c6e4 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/orchestrator/services.go +++ b/vendor/src/github.com/docker/swarmkit/manager/orchestrator/services.go @@ -86,7 +86,7 @@ func (r *ReplicatedOrchestrator) resolveService(ctx context.Context, task *api.T } func (r *ReplicatedOrchestrator) reconcile(ctx context.Context, service *api.Service) { - runningSlots, err := getRunnableSlots(r.store, service.ID) + runningSlots, deadSlots, err := getRunnableAndDeadSlots(r.store, service.ID) if err != nil { log.G(ctx).WithError(err).Errorf("reconcile failed finding tasks") return @@ -108,7 +108,8 @@ func (r *ReplicatedOrchestrator) reconcile(ctx context.Context, service *api.Ser // Update all current tasks then add missing tasks r.updater.Update(ctx, r.cluster, service, slotsSlice) _, err = r.store.Batch(func(batch *store.Batch) error { - r.addTasks(ctx, batch, service, runningSlots, specifiedSlots-numSlots) + r.addTasks(ctx, batch, service, runningSlots, deadSlots, specifiedSlots-numSlots) + r.deleteTasksMap(ctx, batch, deadSlots) return nil }) if err != nil { @@ -154,7 +155,8 @@ func (r *ReplicatedOrchestrator) reconcile(ctx context.Context, service *api.Ser r.updater.Update(ctx, r.cluster, service, sortedSlots[:specifiedSlots]) _, err = r.store.Batch(func(batch *store.Batch) error { - r.removeTasks(ctx, batch, service, sortedSlots[specifiedSlots:]) + r.deleteTasksMap(ctx, batch, deadSlots) + r.deleteTasks(ctx, batch, sortedSlots[specifiedSlots:]) return nil }) if err != nil { @@ -162,12 +164,16 @@ func (r *ReplicatedOrchestrator) reconcile(ctx context.Context, service *api.Ser } case specifiedSlots == numSlots: + _, err = r.store.Batch(func(batch *store.Batch) error { + r.deleteTasksMap(ctx, batch, deadSlots) + return nil + }) // Simple update, no scaling - update all tasks. r.updater.Update(ctx, r.cluster, service, slotsSlice) } } -func (r *ReplicatedOrchestrator) addTasks(ctx context.Context, batch *store.Batch, service *api.Service, runningSlots map[uint64]slot, count int) { +func (r *ReplicatedOrchestrator) addTasks(ctx context.Context, batch *store.Batch, service *api.Service, runningSlots map[uint64]slot, deadSlots map[uint64]slot, count int) { slot := uint64(0) for i := 0; i < count; i++ { // Find an slot number that is missing a running task @@ -178,6 +184,7 @@ func (r *ReplicatedOrchestrator) addTasks(ctx context.Context, batch *store.Batc } } + delete(deadSlots, slot) err := batch.Update(func(tx store.Tx) error { return store.CreateTask(tx, newTask(r.cluster, service, slot, "")) }) @@ -187,28 +194,36 @@ func (r *ReplicatedOrchestrator) addTasks(ctx context.Context, batch *store.Batc } } -func (r *ReplicatedOrchestrator) removeTasks(ctx context.Context, batch *store.Batch, service *api.Service, slots []slot) { +func (r *ReplicatedOrchestrator) deleteTasks(ctx context.Context, batch *store.Batch, slots []slot) { for _, slot := range slots { for _, t := range slot { - err := batch.Update(func(tx store.Tx) error { - // TODO(aaronl): optimistic update? - t = store.GetTask(tx, t.ID) - if t != nil && t.DesiredState < api.TaskStateShutdown { - t.DesiredState = api.TaskStateShutdown - return store.UpdateTask(tx, t) - } - return nil - }) - if err != nil { - log.G(ctx).WithError(err).Errorf("removing task %s failed", t.ID) - } + r.deleteTask(ctx, batch, t) } } } -// getRunnableSlots returns a map of slots that have at least one task with -// a desired state above NEW and lesser or equal to RUNNING. -func getRunnableSlots(s *store.MemoryStore, serviceID string) (map[uint64]slot, error) { +func (r *ReplicatedOrchestrator) deleteTasksMap(ctx context.Context, batch *store.Batch, slots map[uint64]slot) { + for _, slot := range slots { + for _, t := range slot { + r.deleteTask(ctx, batch, t) + } + } +} + +func (r *ReplicatedOrchestrator) deleteTask(ctx context.Context, batch *store.Batch, t *api.Task) { + err := batch.Update(func(tx store.Tx) error { + return store.DeleteTask(tx, t.ID) + }) + if err != nil { + log.G(ctx).WithError(err).Errorf("deleting task %s failed", t.ID) + } +} + +// getRunnableAndDeadSlots returns two maps of slots. The first contains slots +// that have at least one task with a desired state above NEW and lesser or +// equal to RUNNING. The second is for slots that only contain tasks with a +// desired state above RUNNING. +func getRunnableAndDeadSlots(s *store.MemoryStore, serviceID string) (map[uint64]slot, map[uint64]slot, error) { var ( tasks []*api.Task err error @@ -217,18 +232,22 @@ func getRunnableSlots(s *store.MemoryStore, serviceID string) (map[uint64]slot, tasks, err = store.FindTasks(tx, store.ByServiceID(serviceID)) }) if err != nil { - return nil, err + return nil, nil, err } runningSlots := make(map[uint64]slot) for _, t := range tasks { - // Technically the check below could just be - // t.DesiredState <= api.TaskStateRunning, but ignoring tasks - // with DesiredState == NEW simplifies the drainer unit tests. - if t.DesiredState > api.TaskStateNew && t.DesiredState <= api.TaskStateRunning { + if t.DesiredState <= api.TaskStateRunning { runningSlots[t.Slot] = append(runningSlots[t.Slot], t) } } - return runningSlots, nil + deadSlots := make(map[uint64]slot) + for _, t := range tasks { + if _, exists := runningSlots[t.Slot]; !exists { + deadSlots[t.Slot] = append(deadSlots[t.Slot], t) + } + } + + return runningSlots, deadSlots, nil } diff --git a/vendor/src/github.com/docker/swarmkit/manager/scheduler/scheduler.go b/vendor/src/github.com/docker/swarmkit/manager/scheduler/scheduler.go index 1b29a2ac05..6d3b0d93e6 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/scheduler/scheduler.go +++ b/vendor/src/github.com/docker/swarmkit/manager/scheduler/scheduler.go @@ -505,7 +505,6 @@ func (s *Scheduler) scheduleTaskGroup(ctx context.Context, taskGroup map[string] nextNode := nodes[(nodeIter+1)%len(nodes)] if nodeLess(&nextNode, &nodeInfo) { nodeIter++ - continue } } else { // In later passes, we just assign one task at a time diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go b/vendor/src/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go index c3962b0235..354c57b48d 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go +++ b/vendor/src/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go @@ -2,6 +2,7 @@ package membership import ( "errors" + "fmt" "sync" "google.golang.org/grpc" @@ -10,6 +11,7 @@ import ( "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/manager/state/watch" "github.com/gogo/protobuf/proto" + "golang.org/x/net/context" ) var ( @@ -44,9 +46,23 @@ type Member struct { *api.RaftMember api.RaftClient - Conn *grpc.ClientConn - tick int - active bool + Conn *grpc.ClientConn + tick int + active bool + lastSeenHost string +} + +// HealthCheck sends a health check RPC to the member and returns the response. +func (member *Member) HealthCheck(ctx context.Context) error { + healthClient := api.NewHealthClient(member.Conn) + resp, err := healthClient.Check(ctx, &api.HealthCheckRequest{Service: "Raft"}) + if err != nil { + return err + } + if resp.Status != api.HealthCheckResponse_SERVING { + return fmt.Errorf("health check returned status %s", resp.Status.String()) + } + return nil } // NewCluster creates a new Cluster neighbors list for a raft Member. @@ -171,7 +187,7 @@ func (c *Cluster) clearMember(id uint64) error { // ReplaceMemberConnection replaces the member's GRPC connection and GRPC // client. -func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member) error { +func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *Member, newAddr string, force bool) error { c.mu.Lock() defer c.mu.Unlock() @@ -180,15 +196,19 @@ func (c *Cluster) ReplaceMemberConnection(id uint64, oldConn *Member, newConn *M return ErrIDNotFound } - if oldConn.Conn != oldMember.Conn { + if !force && oldConn.Conn != oldMember.Conn { // The connection was already replaced. Don't do it again. newConn.Conn.Close() return nil } - oldMember.Conn.Close() + if oldMember.Conn != nil { + oldMember.Conn.Close() + } newMember := *oldMember + newMember.RaftMember = oldMember.RaftMember.Copy() + newMember.RaftMember.Addr = newAddr newMember.Conn = newConn.Conn newMember.RaftClient = newConn.RaftClient c.members[id] = &newMember @@ -217,8 +237,8 @@ func (c *Cluster) Clear() { c.mu.Unlock() } -// ReportActive reports that member is acive (called ProcessRaftMessage), -func (c *Cluster) ReportActive(id uint64) { +// ReportActive reports that member is active (called ProcessRaftMessage), +func (c *Cluster) ReportActive(id uint64, sourceHost string) { c.mu.Lock() defer c.mu.Unlock() m, ok := c.members[id] @@ -227,6 +247,9 @@ func (c *Cluster) ReportActive(id uint64) { } m.tick = 0 m.active = true + if sourceHost != "" { + m.lastSeenHost = sourceHost + } } // Active returns true if node is active. @@ -240,6 +263,18 @@ func (c *Cluster) Active(id uint64) bool { return m.active } +// LastSeenHost returns the last observed source address that the specified +// member connected from. +func (c *Cluster) LastSeenHost(id uint64) string { + c.mu.RLock() + defer c.mu.RUnlock() + m, ok := c.members[id] + if ok { + return m.lastSeenHost + } + return "" +} + // ValidateConfigurationChange takes a proposed ConfChange and // ensures that it is valid. func (c *Cluster) ValidateConfigurationChange(cc raftpb.ConfChange) error { diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go index 9bead11084..a4271454d3 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go +++ b/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go @@ -12,6 +12,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/peer" "golang.org/x/net/context" @@ -35,9 +36,6 @@ import ( ) var ( - // ErrHealthCheckFailure is returned when there is an issue with the initial handshake which means - // that the address provided must be invalid or there is ongoing connectivity issues at join time. - ErrHealthCheckFailure = errors.New("raft: could not connect to prospective new cluster member using its advertised address") // ErrNoRaftMember is thrown when the node is not yet part of a raft cluster ErrNoRaftMember = errors.New("raft: node is not yet part of a raft cluster") // ErrConfChangeRefused is returned when there is an issue with the configuration change @@ -62,6 +60,9 @@ var ( ErrMemberRemoved = errors.New("raft: member was removed from the cluster") // ErrNoClusterLeader is thrown when the cluster has no elected leader ErrNoClusterLeader = errors.New("raft: no elected cluster leader") + // ErrMemberUnknown is sent in response to a message from an + // unrecognized peer. + ErrMemberUnknown = errors.New("raft: member unknown") ) // LeadershipState indicates whether the node is a leader or follower. @@ -317,6 +318,7 @@ func DefaultNodeConfig() *raft.Config { MaxSizePerMsg: math.MaxUint16, MaxInflightMsgs: 256, Logger: log.L, + CheckQuorum: true, } } @@ -670,7 +672,7 @@ func (n *Node) Join(ctx context.Context, req *api.JoinRequest) (*api.JoinRespons // checkHealth tries to contact an aspiring member through its advertised address // and checks if its raft server is running. func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Duration) error { - conn, err := dial(addr, "tcp", n.tlsCredentials, timeout) + conn, err := n.ConnectToMember(addr, timeout) if err != nil { return err } @@ -681,15 +683,10 @@ func (n *Node) checkHealth(ctx context.Context, addr string, timeout time.Durati ctx = tctx } - client := api.NewHealthClient(conn) - defer conn.Close() + defer conn.Conn.Close() - resp, err := client.Check(ctx, &api.HealthCheckRequest{Service: "Raft"}) - if err != nil { - return ErrHealthCheckFailure - } - if resp != nil && resp.Status != api.HealthCheckResponse_SERVING { - return ErrHealthCheckFailure + if err := conn.HealthCheck(ctx); err != nil { + return errors.Wrap(err, "could not connect to prospective new cluster member using its advertised address") } return nil @@ -715,8 +712,30 @@ func (n *Node) addMember(ctx context.Context, addr string, raftID uint64, nodeID } // Wait for a raft round to process the configuration change - err = n.configure(ctx, cc) - return err + return n.configure(ctx, cc) +} + +// updateMember submits a configuration change to change a member's address. +func (n *Node) updateMember(ctx context.Context, addr string, raftID uint64, nodeID string) error { + node := api.RaftMember{ + RaftID: raftID, + NodeID: nodeID, + Addr: addr, + } + + meta, err := node.Marshal() + if err != nil { + return err + } + + cc := raftpb.ConfChange{ + Type: raftpb.ConfChangeUpdateNode, + NodeID: raftID, + Context: meta, + } + + // Wait for a raft round to process the configuration change + return n.configure(ctx, cc) } // Leave asks to a member of the raft to remove @@ -799,7 +818,30 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa return nil, ErrMemberRemoved } - n.cluster.ReportActive(msg.Message.From) + var sourceHost string + peer, ok := peer.FromContext(ctx) + if ok { + sourceHost, _, _ = net.SplitHostPort(peer.Addr.String()) + } + + n.cluster.ReportActive(msg.Message.From, sourceHost) + + // Reject vote requests from unreachable peers + if msg.Message.Type == raftpb.MsgVote { + member := n.cluster.GetMember(msg.Message.From) + if member == nil || member.Conn == nil { + n.Config.Logger.Errorf("received vote request from unknown member %x", msg.Message.From) + return nil, ErrMemberUnknown + } + + healthCtx, cancel := context.WithTimeout(ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval) + defer cancel() + + if err := member.HealthCheck(healthCtx); err != nil { + n.Config.Logger.Warningf("member %x which sent vote request failed health check: %v", msg.Message.From, err) + return nil, errors.Wrap(err, "member unreachable") + } + } if msg.Message.Type == raftpb.MsgProp { // We don't accepted forwarded proposals. Our @@ -1178,21 +1220,64 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess } n.ReportUnreachable(m.To) + lastSeenHost := n.cluster.LastSeenHost(m.To) + if lastSeenHost != "" { + // Check if address has changed + officialHost, officialPort, _ := net.SplitHostPort(conn.Addr) + if officialHost != lastSeenHost { + reconnectAddr := net.JoinHostPort(lastSeenHost, officialPort) + n.Config.Logger.Warningf("detected address change for %x (%s -> %s)", m.To, conn.Addr, reconnectAddr) + if err := n.handleAddressChange(conn, reconnectAddr); err != nil { + n.Config.Logger.Error(err) + } + return + } + } + // Bounce the connection newConn, err := n.ConnectToMember(conn.Addr, 0) if err != nil { n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, conn.Addr, err) - } else { - err = n.cluster.ReplaceMemberConnection(m.To, conn, newConn) - if err != nil { - newConn.Conn.Close() - } + return + } + err = n.cluster.ReplaceMemberConnection(m.To, conn, newConn, conn.Addr, false) + if err != nil { + n.Config.Logger.Errorf("failed to replace connection to raft member: %v", err) + newConn.Conn.Close() } } else if m.Type == raftpb.MsgSnap { n.ReportSnapshot(m.To, raft.SnapshotFinish) } } +func (n *Node) handleAddressChange(member *membership.Member, reconnectAddr string) error { + newConn, err := n.ConnectToMember(reconnectAddr, 0) + if err != nil { + return errors.Wrapf(err, "could connect to member ID %x at observed address %s", member.RaftID, reconnectAddr) + } + + healthCtx, cancelHealth := context.WithTimeout(n.Ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval) + defer cancelHealth() + + if err := newConn.HealthCheck(healthCtx); err != nil { + return errors.Wrapf(err, "%x failed health check at observed address %s", member.RaftID, reconnectAddr) + } + + if err := n.cluster.ReplaceMemberConnection(member.RaftID, member, newConn, reconnectAddr, false); err != nil { + newConn.Conn.Close() + return errors.Wrap(err, "failed to replace connection to raft member") + } + + // If we're the leader, write the address change to raft + updateCtx, cancelUpdate := context.WithTimeout(n.Ctx, time.Duration(n.Config.ElectionTick)*n.opts.TickInterval) + defer cancelUpdate() + if err := n.updateMember(updateCtx, reconnectAddr, member.RaftID, member.NodeID); err != nil { + return errors.Wrap(err, "failed to update member address in raft") + } + + return nil +} + type applyResult struct { resp proto.Message err error @@ -1354,6 +1439,8 @@ func (n *Node) processConfChange(entry raftpb.Entry) { switch cc.Type { case raftpb.ConfChangeAddNode: err = n.applyAddNode(cc) + case raftpb.ConfChangeUpdateNode: + err = n.applyUpdateNode(cc) case raftpb.ConfChangeRemoveNode: err = n.applyRemoveNode(cc) } @@ -1387,6 +1474,43 @@ func (n *Node) applyAddNode(cc raftpb.ConfChange) error { return nil } +// applyUpdateNode is called when we receive a ConfChange from a member in the +// raft cluster which update the address of an existing node. +func (n *Node) applyUpdateNode(cc raftpb.ConfChange) error { + newMember := &api.RaftMember{} + err := proto.Unmarshal(cc.Context, newMember) + if err != nil { + return err + } + + oldMember := n.cluster.GetMember(newMember.RaftID) + + if oldMember == nil { + return ErrMemberUnknown + } + if oldMember.NodeID != newMember.NodeID { + // Should never happen; this is a sanity check + n.Config.Logger.Errorf("node ID mismatch on node update (old: %x, new: %x)", oldMember.NodeID, newMember.NodeID) + return errors.New("node ID mismatch match on node update") + } + + if oldMember.Addr == newMember.Addr || oldMember.Conn == nil { + // nothing to do + return nil + } + + newConn, err := n.ConnectToMember(newMember.Addr, 0) + if err != nil { + return errors.Errorf("could connect to member ID %x at %s: %v", newMember.RaftID, newMember.Addr, err) + } + if err := n.cluster.ReplaceMemberConnection(newMember.RaftID, oldMember, newConn, newMember.Addr, true); err != nil { + newConn.Conn.Close() + return err + } + + return nil +} + // applyRemoveNode is called when we receive a ConfChange // from a member in the raft cluster, this removes a node // from the existing raft cluster diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/store/secrets.go b/vendor/src/github.com/docker/swarmkit/manager/state/store/secrets.go new file mode 100644 index 0000000000..b8b25488af --- /dev/null +++ b/vendor/src/github.com/docker/swarmkit/manager/state/store/secrets.go @@ -0,0 +1,225 @@ +package store + +import ( + "strings" + + "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/manager/state" + memdb "github.com/hashicorp/go-memdb" +) + +const tableSecret = "secret" + +func init() { + register(ObjectStoreConfig{ + Name: tableSecret, + Table: &memdb.TableSchema{ + Name: tableSecret, + Indexes: map[string]*memdb.IndexSchema{ + indexID: { + Name: indexID, + Unique: true, + Indexer: secretIndexerByID{}, + }, + indexName: { + Name: indexName, + Unique: true, + Indexer: secretIndexerByName{}, + }, + }, + }, + Save: func(tx ReadTx, snapshot *api.StoreSnapshot) error { + var err error + snapshot.Secrets, err = FindSecrets(tx, All) + return err + }, + Restore: func(tx Tx, snapshot *api.StoreSnapshot) error { + secrets, err := FindSecrets(tx, All) + if err != nil { + return err + } + for _, s := range secrets { + if err := DeleteSecret(tx, s.ID); err != nil { + return err + } + } + for _, s := range snapshot.Secrets { + if err := CreateSecret(tx, s); err != nil { + return err + } + } + return nil + }, + ApplyStoreAction: func(tx Tx, sa *api.StoreAction) error { + switch v := sa.Target.(type) { + case *api.StoreAction_Secret: + obj := v.Secret + switch sa.Action { + case api.StoreActionKindCreate: + return CreateSecret(tx, obj) + case api.StoreActionKindUpdate: + return UpdateSecret(tx, obj) + case api.StoreActionKindRemove: + return DeleteSecret(tx, obj.ID) + } + } + return errUnknownStoreAction + }, + NewStoreAction: func(c state.Event) (api.StoreAction, error) { + var sa api.StoreAction + switch v := c.(type) { + case state.EventCreateSecret: + sa.Action = api.StoreActionKindCreate + sa.Target = &api.StoreAction_Secret{ + Secret: v.Secret, + } + case state.EventUpdateSecret: + sa.Action = api.StoreActionKindUpdate + sa.Target = &api.StoreAction_Secret{ + Secret: v.Secret, + } + case state.EventDeleteSecret: + sa.Action = api.StoreActionKindRemove + sa.Target = &api.StoreAction_Secret{ + Secret: v.Secret, + } + default: + return api.StoreAction{}, errUnknownStoreAction + } + return sa, nil + }, + }) +} + +type secretEntry struct { + *api.Secret +} + +func (s secretEntry) ID() string { + return s.Secret.ID +} + +func (s secretEntry) Meta() api.Meta { + return s.Secret.Meta +} + +func (s secretEntry) SetMeta(meta api.Meta) { + s.Secret.Meta = meta +} + +func (s secretEntry) Copy() Object { + return secretEntry{s.Secret.Copy()} +} + +func (s secretEntry) EventCreate() state.Event { + return state.EventCreateSecret{Secret: s.Secret} +} + +func (s secretEntry) EventUpdate() state.Event { + return state.EventUpdateSecret{Secret: s.Secret} +} + +func (s secretEntry) EventDelete() state.Event { + return state.EventDeleteSecret{Secret: s.Secret} +} + +// CreateSecret adds a new secret to the store. +// Returns ErrExist if the ID is already taken. +func CreateSecret(tx Tx, s *api.Secret) error { + // Ensure the name is not already in use. + if tx.lookup(tableSecret, indexName, strings.ToLower(s.Spec.Annotations.Name)) != nil { + return ErrNameConflict + } + + return tx.create(tableSecret, secretEntry{s}) +} + +// UpdateSecret updates an existing secret in the store. +// Returns ErrNotExist if the secret doesn't exist. +func UpdateSecret(tx Tx, s *api.Secret) error { + // Ensure the name is either not in use or already used by this same Secret. + if existing := tx.lookup(tableSecret, indexName, strings.ToLower(s.Spec.Annotations.Name)); existing != nil { + if existing.ID() != s.ID { + return ErrNameConflict + } + } + + return tx.update(tableSecret, secretEntry{s}) +} + +// DeleteSecret removes a secret from the store. +// Returns ErrNotExist if the secret doesn't exist. +func DeleteSecret(tx Tx, id string) error { + return tx.delete(tableSecret, id) +} + +// GetSecret looks up a secret by ID. +// Returns nil if the secret doesn't exist. +func GetSecret(tx ReadTx, id string) *api.Secret { + n := tx.get(tableSecret, id) + if n == nil { + return nil + } + return n.(secretEntry).Secret +} + +// FindSecrets selects a set of secrets and returns them. +func FindSecrets(tx ReadTx, by By) ([]*api.Secret, error) { + checkType := func(by By) error { + switch by.(type) { + case byName, byNamePrefix, byIDPrefix: + return nil + default: + return ErrInvalidFindBy + } + } + + secretList := []*api.Secret{} + appendResult := func(o Object) { + secretList = append(secretList, o.(secretEntry).Secret) + } + + err := tx.find(tableSecret, by, checkType, appendResult) + return secretList, err +} + +type secretIndexerByID struct{} + +func (ci secretIndexerByID) FromArgs(args ...interface{}) ([]byte, error) { + return fromArgs(args...) +} + +func (ci secretIndexerByID) FromObject(obj interface{}) (bool, []byte, error) { + s, ok := obj.(secretEntry) + if !ok { + panic("unexpected type passed to FromObject") + } + + // Add the null character as a terminator + val := s.Secret.ID + "\x00" + return true, []byte(val), nil +} + +func (ci secretIndexerByID) PrefixFromArgs(args ...interface{}) ([]byte, error) { + return prefixFromArgs(args...) +} + +type secretIndexerByName struct{} + +func (ci secretIndexerByName) FromArgs(args ...interface{}) ([]byte, error) { + return fromArgs(args...) +} + +func (ci secretIndexerByName) FromObject(obj interface{}) (bool, []byte, error) { + s, ok := obj.(secretEntry) + if !ok { + panic("unexpected type passed to FromObject") + } + + // Add the null character as a terminator + return true, []byte(strings.ToLower(s.Spec.Annotations.Name) + "\x00"), nil +} + +func (ci secretIndexerByName) PrefixFromArgs(args ...interface{}) ([]byte, error) { + return prefixFromArgs(args...) +} diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/watch.go b/vendor/src/github.com/docker/swarmkit/manager/state/watch.go index 0d0a742c4d..cf1f29b5e1 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/state/watch.go +++ b/vendor/src/github.com/docker/swarmkit/manager/state/watch.go @@ -451,6 +451,87 @@ func (e EventDeleteCluster) matches(watchEvent events.Event) bool { return true } +// SecretCheckFunc is the type of function used to perform filtering checks on +// api.Secret structures. +type SecretCheckFunc func(v1, v2 *api.Secret) bool + +// SecretCheckID is a SecretCheckFunc for matching volume IDs. +func SecretCheckID(v1, v2 *api.Secret) bool { + return v1.ID == v2.ID +} + +// EventCreateSecret is the type used to put CreateSecret events on the +// publish/subscribe queue and filter these events in calls to Watch. +type EventCreateSecret struct { + Secret *api.Secret + // Checks is a list of functions to call to filter events for a watch + // stream. They are applied with AND logic. They are only applicable for + // calls to Watch. + Checks []SecretCheckFunc +} + +func (e EventCreateSecret) matches(watchEvent events.Event) bool { + typedEvent, ok := watchEvent.(EventCreateSecret) + if !ok { + return false + } + + for _, check := range e.Checks { + if !check(e.Secret, typedEvent.Secret) { + return false + } + } + return true +} + +// EventUpdateSecret is the type used to put UpdateSecret events on the +// publish/subscribe queue and filter these events in calls to Watch. +type EventUpdateSecret struct { + Secret *api.Secret + // Checks is a list of functions to call to filter events for a watch + // stream. They are applied with AND logic. They are only applicable for + // calls to Watch. + Checks []SecretCheckFunc +} + +func (e EventUpdateSecret) matches(watchEvent events.Event) bool { + typedEvent, ok := watchEvent.(EventUpdateSecret) + if !ok { + return false + } + + for _, check := range e.Checks { + if !check(e.Secret, typedEvent.Secret) { + return false + } + } + return true +} + +// EventDeleteSecret is the type used to put DeleteSecret events on the +// publish/subscribe queue and filter these events in calls to Watch. +type EventDeleteSecret struct { + Secret *api.Secret + // Checks is a list of functions to call to filter events for a watch + // stream. They are applied with AND logic. They are only applicable for + // calls to Watch. + Checks []SecretCheckFunc +} + +func (e EventDeleteSecret) matches(watchEvent events.Event) bool { + typedEvent, ok := watchEvent.(EventDeleteSecret) + if !ok { + return false + } + + for _, check := range e.Checks { + if !check(e.Secret, typedEvent.Secret) { + return false + } + } + return true +} + // Watch takes a variable number of events to match against. The subscriber // will receive events that match any of the arguments passed to Watch. //