diff --git a/vendor.conf b/vendor.conf index aa9a3e1a89..33e905d25a 100644 --- a/vendor.conf +++ b/vendor.conf @@ -100,7 +100,7 @@ github.com/docker/containerd 8517738ba4b82aff5662c97ca4627e7e4d03b531 github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4 # cluster -github.com/docker/swarmkit efd44df04cc0fd828de5947263858c3a5a2729b1 +github.com/docker/swarmkit 91c6e2db9c0c91c466a83529ed16649a1de7ccc4 github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 github.com/gogo/protobuf v0.3 github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a diff --git a/vendor/github.com/docker/swarmkit/manager/controlapi/secret.go b/vendor/github.com/docker/swarmkit/manager/controlapi/secret.go index 7ddbaf580a..36fdd85ad9 100644 --- a/vendor/github.com/docker/swarmkit/manager/controlapi/secret.go +++ b/vendor/github.com/docker/swarmkit/manager/controlapi/secret.go @@ -1,6 +1,7 @@ package controlapi import ( + "crypto/subtle" "regexp" "strings" @@ -71,7 +72,10 @@ func (s *Server) UpdateSecret(ctx context.Context, request *api.UpdateSecretRequ return nil } - if secret.Spec.Annotations.Name != request.Spec.Annotations.Name || request.Spec.Data != nil { + // Check if the Name is different than the current name, or the secret is non-nil and different + // than the current secret + if secret.Spec.Annotations.Name != request.Spec.Annotations.Name || + (request.Spec.Data != nil && subtle.ConstantTimeCompare(request.Spec.Data, secret.Spec.Data) == 0) { return grpc.Errorf(codes.InvalidArgument, "only updates to Labels are allowed") } diff --git a/vendor/github.com/docker/swarmkit/manager/controlapi/service.go b/vendor/github.com/docker/swarmkit/manager/controlapi/service.go index 915574a92d..f241836059 100644 --- a/vendor/github.com/docker/swarmkit/manager/controlapi/service.go +++ b/vendor/github.com/docker/swarmkit/manager/controlapi/service.go @@ -203,10 +203,6 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error { return nil } - if len(epSpec.Ports) > 0 && epSpec.Mode == api.ResolutionModeDNSRoundRobin { - return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: ports can't be used with dnsrr mode") - } - type portSpec struct { publishedPort uint32 protocol api.PortConfig_Protocol @@ -214,6 +210,17 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error { portSet := make(map[portSpec]struct{}) for _, port := range epSpec.Ports { + // Publish mode = "ingress" represents Routing-Mesh and current implementation + // of routing-mesh relies on IPVS based load-balancing with input=published-port. + // But Endpoint-Spec mode of DNSRR relies on multiple A records and cannot be used + // with routing-mesh (PublishMode="ingress") which cannot rely on DNSRR. + // But PublishMode="host" doesn't provide Routing-Mesh and the DNSRR is applicable + // for the backend network and hence we accept that configuration. + + if epSpec.Mode == api.ResolutionModeDNSRoundRobin && port.PublishMode == api.PublishModeIngress { + return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: port published with ingress mode can't be used with dnsrr mode") + } + // If published port is not specified, it does not conflict // with any others. if port.PublishedPort == 0 { diff --git a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go index d0bfebc0a5..1db89307e4 100644 --- a/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go +++ b/vendor/github.com/docker/swarmkit/manager/logbroker/broker.go @@ -14,6 +14,7 @@ import ( "github.com/docker/swarmkit/ca" "github.com/docker/swarmkit/identity" "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/manager/state/store" "github.com/docker/swarmkit/watch" "golang.org/x/net/context" ) @@ -23,7 +24,7 @@ var ( errNotRunning = errors.New("broker is not running") ) -// LogBroker coordinates log subscriptions to services and tasks. Çlients can +// LogBroker coordinates log subscriptions to services and tasks. Clients can // publish and subscribe to logs channels. // // Log subscriptions are pushed to the work nodes by creating log subscsription @@ -33,15 +34,19 @@ type LogBroker struct { logQueue *watch.Queue subscriptionQueue *watch.Queue - registeredSubscriptions map[string]*api.SubscriptionMessage + registeredSubscriptions map[string]*subscription pctx context.Context cancelAll context.CancelFunc + + store *store.MemoryStore } // New initializes and returns a new LogBroker -func New() *LogBroker { - return &LogBroker{} +func New(store *store.MemoryStore) *LogBroker { + return &LogBroker{ + store: store, + } } // Run the log broker @@ -56,7 +61,7 @@ func (lb *LogBroker) Run(ctx context.Context) error { lb.pctx, lb.cancelAll = context.WithCancel(ctx) lb.logQueue = watch.NewQueue() lb.subscriptionQueue = watch.NewQueue() - lb.registeredSubscriptions = make(map[string]*api.SubscriptionMessage) + lb.registeredSubscriptions = make(map[string]*subscription) lb.mu.Unlock() select { @@ -94,36 +99,60 @@ func validateSelector(selector *api.LogSelector) error { return nil } -func (lb *LogBroker) registerSubscription(subscription *api.SubscriptionMessage) { - lb.mu.Lock() - defer lb.mu.Unlock() - - lb.registeredSubscriptions[subscription.ID] = subscription - lb.subscriptionQueue.Publish(subscription) -} - -func (lb *LogBroker) unregisterSubscription(subscription *api.SubscriptionMessage) { - subscription = subscription.Copy() - subscription.Close = true - - lb.mu.Lock() - defer lb.mu.Unlock() - - delete(lb.registeredSubscriptions, subscription.ID) - lb.subscriptionQueue.Publish(subscription) -} - -func (lb *LogBroker) watchSubscriptions() ([]*api.SubscriptionMessage, chan events.Event, func()) { +func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.LogSubscriptionOptions) *subscription { lb.mu.RLock() defer lb.mu.RUnlock() - subs := make([]*api.SubscriptionMessage, 0, len(lb.registeredSubscriptions)) - for _, sub := range lb.registeredSubscriptions { - subs = append(subs, sub) + subscription := newSubscription(lb.store, &api.SubscriptionMessage{ + ID: identity.NewID(), + Selector: selector, + Options: options, + }, lb.subscriptionQueue) + + return subscription +} + +func (lb *LogBroker) registerSubscription(subscription *subscription) { + lb.mu.Lock() + defer lb.mu.Unlock() + + lb.registeredSubscriptions[subscription.message.ID] = subscription + lb.subscriptionQueue.Publish(subscription) +} + +func (lb *LogBroker) unregisterSubscription(subscription *subscription) { + lb.mu.Lock() + defer lb.mu.Unlock() + + delete(lb.registeredSubscriptions, subscription.message.ID) + subscription.message.Close = true + lb.subscriptionQueue.Publish(subscription) +} + +// watchSubscriptions grabs all current subscriptions and notifies of any +// subscription change for this node. +// +// Subscriptions may fire multiple times and the caller has to protect against +// dupes. +func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan events.Event, func()) { + lb.mu.RLock() + defer lb.mu.RUnlock() + + // Watch for subscription changes for this node. + ch, cancel := lb.subscriptionQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool { + s := event.(*subscription) + return s.Contains(nodeID) + })) + + // Grab current subscriptions. + subscriptions := make([]*subscription, 0, len(lb.registeredSubscriptions)) + for _, s := range lb.registeredSubscriptions { + if s.Contains(nodeID) { + subscriptions = append(subscriptions, s) + } } - ch, cancel := lb.subscriptionQueue.Watch() - return subs, ch, cancel + return subscriptions, ch, cancel } func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) { @@ -151,22 +180,20 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api return err } - subscription := &api.SubscriptionMessage{ - ID: identity.NewID(), - Selector: request.Selector, - Options: request.Options, - } + subscription := lb.newSubscription(request.Selector, request.Options) + subscription.Run(lb.pctx) + defer subscription.Stop() log := log.G(ctx).WithFields( logrus.Fields{ "method": "(*LogBroker).SubscribeLogs", - "subscription.id": subscription.ID, + "subscription.id": subscription.message.ID, }, ) log.Debug("subscribed") - publishCh, publishCancel := lb.subscribe(subscription.ID) + publishCh, publishCancel := lb.subscribe(subscription.message.ID) defer publishCancel() lb.registerSubscription(subscription) @@ -202,11 +229,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest "node": remote.NodeID, }, ) - subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions() + subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions(remote.NodeID) defer subscriptionCancel() log.Debug("node registered") + activeSubscriptions := make(map[string]struct{}) + // Start by sending down all active subscriptions. for _, subscription := range subscriptions { select { @@ -217,19 +246,30 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest default: } - if err := stream.Send(subscription); err != nil { + if err := stream.Send(subscription.message); err != nil { log.Error(err) return err } + activeSubscriptions[subscription.message.ID] = struct{}{} } // Send down new subscriptions. - // TODO(aluzzardi): We should filter by relevant tasks for this node rather for { select { case v := <-subscriptionCh: - subscription := v.(*api.SubscriptionMessage) - if err := stream.Send(subscription); err != nil { + subscription := v.(*subscription) + + if subscription.message.Close { + log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed") + } else { + // Avoid sending down the same subscription multiple times + if _, ok := activeSubscriptions[subscription.message.ID]; ok { + continue + } + activeSubscriptions[subscription.message.ID] = struct{}{} + log.WithField("subscription.id", subscription.message.ID).Debug("subscription added") + } + if err := stream.Send(subscription.message); err != nil { log.Error(err) return err } diff --git a/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go b/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go new file mode 100644 index 0000000000..ac945e234b --- /dev/null +++ b/vendor/github.com/docker/swarmkit/manager/logbroker/subscription.go @@ -0,0 +1,138 @@ +package logbroker + +import ( + "context" + "sync" + + events "github.com/docker/go-events" + "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/manager/state" + "github.com/docker/swarmkit/manager/state/store" + "github.com/docker/swarmkit/watch" +) + +type subscription struct { + mu sync.RWMutex + + store *store.MemoryStore + message *api.SubscriptionMessage + changed *watch.Queue + + ctx context.Context + cancel context.CancelFunc + + nodes map[string]struct{} +} + +func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription { + return &subscription{ + store: store, + message: message, + changed: changed, + nodes: make(map[string]struct{}), + } +} + +func (s *subscription) Contains(nodeID string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + + _, ok := s.nodes[nodeID] + return ok +} + +func (s *subscription) Run(ctx context.Context) { + s.ctx, s.cancel = context.WithCancel(ctx) + + wq := s.store.WatchQueue() + ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{}) + go func() { + defer cancel() + s.watch(ch) + }() + + s.match() +} + +func (s *subscription) Stop() { + if s.cancel != nil { + s.cancel() + } +} + +func (s *subscription) match() { + s.mu.Lock() + defer s.mu.Unlock() + + s.store.View(func(tx store.ReadTx) { + for _, nid := range s.message.Selector.NodeIDs { + s.nodes[nid] = struct{}{} + } + + for _, tid := range s.message.Selector.TaskIDs { + if task := store.GetTask(tx, tid); task != nil { + s.nodes[task.NodeID] = struct{}{} + } + } + + for _, sid := range s.message.Selector.ServiceIDs { + tasks, err := store.FindTasks(tx, store.ByServiceID(sid)) + if err != nil { + log.L.Warning(err) + continue + } + for _, task := range tasks { + s.nodes[task.NodeID] = struct{}{} + } + } + }) +} + +func (s *subscription) watch(ch <-chan events.Event) error { + matchTasks := map[string]struct{}{} + for _, tid := range s.message.Selector.TaskIDs { + matchTasks[tid] = struct{}{} + } + + matchServices := map[string]struct{}{} + for _, sid := range s.message.Selector.ServiceIDs { + matchServices[sid] = struct{}{} + } + + add := func(nodeID string) { + s.mu.Lock() + defer s.mu.Unlock() + + if _, ok := s.nodes[nodeID]; !ok { + s.nodes[nodeID] = struct{}{} + s.changed.Publish(s) + } + } + + for { + var t *api.Task + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case event := <-ch: + switch v := event.(type) { + case state.EventCreateTask: + t = v.Task + case state.EventUpdateTask: + t = v.Task + } + } + + if t == nil { + panic("received invalid task from the watch queue") + } + + if _, ok := matchTasks[t.ID]; ok { + add(t.NodeID) + } + if _, ok := matchServices[t.ServiceID]; ok { + add(t.NodeID) + } + } +} diff --git a/vendor/github.com/docker/swarmkit/manager/manager.go b/vendor/github.com/docker/swarmkit/manager/manager.go index 975ca57fc5..edf807cdd8 100644 --- a/vendor/github.com/docker/swarmkit/manager/manager.go +++ b/vendor/github.com/docker/swarmkit/manager/manager.go @@ -256,7 +256,7 @@ func New(config *Config) (*Manager, error) { listeners: listeners, caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig), dispatcher: dispatcher.New(raftNode, dispatcherConfig), - logbroker: logbroker.New(), + logbroker: logbroker.New(raftNode.MemoryStore()), server: grpc.NewServer(opts...), localserver: grpc.NewServer(opts...), raftNode: raftNode,