
Re-vendor SwarmKit to 91c6e2db9c0c91c466a83529ed16649a1de7ccc4

Signed-off-by: Andrea Luzzardi <aluzzardi@gmail.com>
Andrea Luzzardi 2016-11-16 18:15:15 -08:00
parent 956ff8f773
commit fbe8790759
6 changed files with 238 additions and 49 deletions


@@ -100,7 +100,7 @@ github.com/docker/containerd 8517738ba4b82aff5662c97ca4627e7e4d03b531
github.com/tonistiigi/fifo 1405643975692217d6720f8b54aeee1bf2cd5cf4
# cluster
github.com/docker/swarmkit efd44df04cc0fd828de5947263858c3a5a2729b1
github.com/docker/swarmkit 91c6e2db9c0c91c466a83529ed16649a1de7ccc4
github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
github.com/gogo/protobuf v0.3
github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a


@@ -1,6 +1,7 @@
package controlapi
import (
"crypto/subtle"
"regexp"
"strings"
@@ -71,7 +72,10 @@ func (s *Server) UpdateSecret(ctx context.Context, request *api.UpdateSecretRequ
return nil
}
if secret.Spec.Annotations.Name != request.Spec.Annotations.Name || request.Spec.Data != nil {
// Check if the Name is different than the current name, or the secret is non-nil and different
// than the current secret
if secret.Spec.Annotations.Name != request.Spec.Annotations.Name ||
(request.Spec.Data != nil && subtle.ConstantTimeCompare(request.Spec.Data, secret.Spec.Data) == 0) {
return grpc.Errorf(codes.InvalidArgument, "only updates to Labels are allowed")
}
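The updated check above only rejects an UpdateSecret request when it actually changes the payload: crypto/subtle's ConstantTimeCompare returns 1 for byte slices of equal length and identical contents, so resubmitting the current data no longer counts as a change, and the comparison does not leak where two payloads first differ. A minimal, self-contained sketch of that comparison (the function and variable names here are illustrative, not taken from the vendored code):

package main

import (
	"crypto/subtle"
	"fmt"
)

// equalSecret reports whether two secret payloads are identical.
// subtle.ConstantTimeCompare returns 1 only when both slices have the same
// length and the same contents; for equal-length inputs its running time
// does not depend on where the bytes differ.
func equalSecret(current, requested []byte) bool {
	return subtle.ConstantTimeCompare(current, requested) == 1
}

func main() {
	current := []byte("s3cret")
	fmt.Println(equalSecret(current, []byte("s3cret"))) // true: same data, not treated as a data change
	fmt.Println(equalSecret(current, []byte("other")))  // false: UpdateSecret would reject this update
}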


@@ -203,10 +203,6 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error {
return nil
}
if len(epSpec.Ports) > 0 && epSpec.Mode == api.ResolutionModeDNSRoundRobin {
return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: ports can't be used with dnsrr mode")
}
type portSpec struct {
publishedPort uint32
protocol api.PortConfig_Protocol
@@ -214,6 +210,17 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error {
portSet := make(map[portSpec]struct{})
for _, port := range epSpec.Ports {
// Publish mode = "ingress" represents Routing-Mesh and current implementation
// of routing-mesh relies on IPVS based load-balancing with input=published-port.
// But Endpoint-Spec mode of DNSRR relies on multiple A records and cannot be used
// with routing-mesh (PublishMode="ingress") which cannot rely on DNSRR.
// But PublishMode="host" doesn't provide Routing-Mesh and the DNSRR is applicable
// for the backend network and hence we accept that configuration.
if epSpec.Mode == api.ResolutionModeDNSRoundRobin && port.PublishMode == api.PublishModeIngress {
return grpc.Errorf(codes.InvalidArgument, "EndpointSpec: port published with ingress mode can't be used with dnsrr mode")
}
// If published port is not specified, it does not conflict
// with any others.
if port.PublishedPort == 0 {
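The comment above captures the relaxed rule introduced here: a DNSRR endpoint is still incompatible with ports published through the routing mesh (ingress), because IPVS load balancing keys on the published port, but host-published ports no longer trigger a validation error. A standalone sketch of the same rule, using simplified stand-in types rather than the real github.com/docker/swarmkit/api structs:

package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the swarmkit API types used by validateEndpointSpec.
type publishMode int
type resolutionMode int

const (
	publishModeIngress publishMode = iota
	publishModeHost
)

const (
	resolutionModeVIP resolutionMode = iota
	resolutionModeDNSRR
)

type portConfig struct {
	publishedPort uint32
	publishMode   publishMode
}

type endpointSpec struct {
	mode  resolutionMode
	ports []portConfig
}

// validate mirrors the per-port check above: DNSRR is rejected only for
// ingress-published ports, while host-published ports are accepted.
func validate(spec endpointSpec) error {
	for _, p := range spec.ports {
		if spec.mode == resolutionModeDNSRR && p.publishMode == publishModeIngress {
			return errors.New("port published with ingress mode can't be used with dnsrr mode")
		}
	}
	return nil
}

func main() {
	ingress := endpointSpec{mode: resolutionModeDNSRR, ports: []portConfig{{publishedPort: 8080, publishMode: publishModeIngress}}}
	host := endpointSpec{mode: resolutionModeDNSRR, ports: []portConfig{{publishedPort: 8080, publishMode: publishModeHost}}}
	fmt.Println(validate(ingress)) // rejected
	fmt.Println(validate(host))    // <nil>: accepted
}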


@@ -14,6 +14,7 @@ import (
"github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/watch"
"golang.org/x/net/context"
)
@@ -23,7 +24,7 @@ var (
errNotRunning = errors.New("broker is not running")
)
// LogBroker coordinates log subscriptions to services and tasks. Çlients can
// LogBroker coordinates log subscriptions to services and tasks. Clients can
// publish and subscribe to logs channels.
//
// Log subscriptions are pushed to the worker nodes by creating log subscription
@@ -33,15 +34,19 @@ type LogBroker struct {
logQueue *watch.Queue
subscriptionQueue *watch.Queue
registeredSubscriptions map[string]*api.SubscriptionMessage
registeredSubscriptions map[string]*subscription
pctx context.Context
cancelAll context.CancelFunc
store *store.MemoryStore
}
// New initializes and returns a new LogBroker
func New() *LogBroker {
return &LogBroker{}
func New(store *store.MemoryStore) *LogBroker {
return &LogBroker{
store: store,
}
}
// Run the log broker
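The broker now carries a reference to the memory store in addition to its two watch queues: logQueue carries published log messages and subscriptionQueue fans subscription changes out to listening nodes. As a rough mental model only (not the real github.com/docker/swarmkit/watch implementation), such a fan-out queue can be sketched as a mutex-guarded set of subscriber channels:

package main

import (
	"fmt"
	"sync"
)

// queue is a toy fan-out queue: each Watch call registers a buffered
// subscriber channel, and Publish delivers every event to all of them.
type queue struct {
	mu   sync.Mutex
	subs map[chan interface{}]struct{}
}

func newQueue() *queue {
	return &queue{subs: make(map[chan interface{}]struct{})}
}

func (q *queue) Watch() (<-chan interface{}, func()) {
	ch := make(chan interface{}, 16)
	q.mu.Lock()
	q.subs[ch] = struct{}{}
	q.mu.Unlock()
	cancel := func() {
		q.mu.Lock()
		delete(q.subs, ch)
		q.mu.Unlock()
	}
	return ch, cancel
}

func (q *queue) Publish(ev interface{}) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for ch := range q.subs {
		select {
		case ch <- ev:
		default: // drop if the subscriber is slow; a toy policy, not necessarily what the real queue does
		}
	}
}

func main() {
	q := newQueue()
	ch, cancel := q.Watch()
	defer cancel()
	q.Publish("log message")
	fmt.Println(<-ch)
}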
@@ -56,7 +61,7 @@ func (lb *LogBroker) Run(ctx context.Context) error {
lb.pctx, lb.cancelAll = context.WithCancel(ctx)
lb.logQueue = watch.NewQueue()
lb.subscriptionQueue = watch.NewQueue()
lb.registeredSubscriptions = make(map[string]*api.SubscriptionMessage)
lb.registeredSubscriptions = make(map[string]*subscription)
lb.mu.Unlock()
select {
@@ -94,36 +99,60 @@ func validateSelector(selector *api.LogSelector) error {
return nil
}
func (lb *LogBroker) registerSubscription(subscription *api.SubscriptionMessage) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.registeredSubscriptions[subscription.ID] = subscription
lb.subscriptionQueue.Publish(subscription)
}
func (lb *LogBroker) unregisterSubscription(subscription *api.SubscriptionMessage) {
subscription = subscription.Copy()
subscription.Close = true
lb.mu.Lock()
defer lb.mu.Unlock()
delete(lb.registeredSubscriptions, subscription.ID)
lb.subscriptionQueue.Publish(subscription)
}
func (lb *LogBroker) watchSubscriptions() ([]*api.SubscriptionMessage, chan events.Event, func()) {
func (lb *LogBroker) newSubscription(selector *api.LogSelector, options *api.LogSubscriptionOptions) *subscription {
lb.mu.RLock()
defer lb.mu.RUnlock()
subs := make([]*api.SubscriptionMessage, 0, len(lb.registeredSubscriptions))
for _, sub := range lb.registeredSubscriptions {
subs = append(subs, sub)
subscription := newSubscription(lb.store, &api.SubscriptionMessage{
ID: identity.NewID(),
Selector: selector,
Options: options,
}, lb.subscriptionQueue)
return subscription
}
func (lb *LogBroker) registerSubscription(subscription *subscription) {
lb.mu.Lock()
defer lb.mu.Unlock()
lb.registeredSubscriptions[subscription.message.ID] = subscription
lb.subscriptionQueue.Publish(subscription)
}
func (lb *LogBroker) unregisterSubscription(subscription *subscription) {
lb.mu.Lock()
defer lb.mu.Unlock()
delete(lb.registeredSubscriptions, subscription.message.ID)
subscription.message.Close = true
lb.subscriptionQueue.Publish(subscription)
}
// watchSubscriptions grabs all current subscriptions and notifies of any
// subscription change for this node.
//
// Subscriptions may fire multiple times and the caller has to protect against
// dupes.
func (lb *LogBroker) watchSubscriptions(nodeID string) ([]*subscription, chan events.Event, func()) {
lb.mu.RLock()
defer lb.mu.RUnlock()
// Watch for subscription changes for this node.
ch, cancel := lb.subscriptionQueue.CallbackWatch(events.MatcherFunc(func(event events.Event) bool {
s := event.(*subscription)
return s.Contains(nodeID)
}))
// Grab current subscriptions.
subscriptions := make([]*subscription, 0, len(lb.registeredSubscriptions))
for _, s := range lb.registeredSubscriptions {
if s.Contains(nodeID) {
subscriptions = append(subscriptions, s)
}
}
ch, cancel := lb.subscriptionQueue.Watch()
return subs, ch, cancel
return subscriptions, ch, cancel
}
func (lb *LogBroker) subscribe(id string) (chan events.Event, func()) {
@@ -151,22 +180,20 @@ func (lb *LogBroker) SubscribeLogs(request *api.SubscribeLogsRequest, stream api
return err
}
subscription := &api.SubscriptionMessage{
ID: identity.NewID(),
Selector: request.Selector,
Options: request.Options,
}
subscription := lb.newSubscription(request.Selector, request.Options)
subscription.Run(lb.pctx)
defer subscription.Stop()
log := log.G(ctx).WithFields(
logrus.Fields{
"method": "(*LogBroker).SubscribeLogs",
"subscription.id": subscription.ID,
"subscription.id": subscription.message.ID,
},
)
log.Debug("subscribed")
publishCh, publishCancel := lb.subscribe(subscription.ID)
publishCh, publishCancel := lb.subscribe(subscription.message.ID)
defer publishCancel()
lb.registerSubscription(subscription)
@@ -202,11 +229,13 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
"node": remote.NodeID,
},
)
subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions()
subscriptions, subscriptionCh, subscriptionCancel := lb.watchSubscriptions(remote.NodeID)
defer subscriptionCancel()
log.Debug("node registered")
activeSubscriptions := make(map[string]struct{})
// Start by sending down all active subscriptions.
for _, subscription := range subscriptions {
select {
@@ -217,19 +246,30 @@ func (lb *LogBroker) ListenSubscriptions(request *api.ListenSubscriptionsRequest
default:
}
if err := stream.Send(subscription); err != nil {
if err := stream.Send(subscription.message); err != nil {
log.Error(err)
return err
}
activeSubscriptions[subscription.message.ID] = struct{}{}
}
// Send down new subscriptions.
// TODO(aluzzardi): We should filter by relevant tasks for this node rather
for {
select {
case v := <-subscriptionCh:
subscription := v.(*api.SubscriptionMessage)
if err := stream.Send(subscription); err != nil {
subscription := v.(*subscription)
if subscription.message.Close {
log.WithField("subscription.id", subscription.message.ID).Debug("subscription closed")
} else {
// Avoid sending down the same subscription multiple times
if _, ok := activeSubscriptions[subscription.message.ID]; ok {
continue
}
activeSubscriptions[subscription.message.ID] = struct{}{}
log.WithField("subscription.id", subscription.message.ID).Debug("subscription added")
}
if err := stream.Send(subscription.message); err != nil {
log.Error(err)
return err
}
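watchSubscriptions is documented above as at-least-once: the initial snapshot and the change feed can both deliver the same subscription, so ListenSubscriptions keeps an activeSubscriptions set keyed by subscription ID and always forwards close notifications. A minimal sketch of that dedup pattern in isolation (msg is a hypothetical stand-in for api.SubscriptionMessage):

package main

import "fmt"

// msg is a simplified stand-in for api.SubscriptionMessage: just an ID and
// a Close flag.
type msg struct {
	ID    string
	Close bool
}

// dedupe forwards each open subscription at most once and forwards closes
// unconditionally, mirroring the activeSubscriptions bookkeeping above.
func dedupe(in <-chan msg, send func(msg)) {
	active := make(map[string]struct{})
	for m := range in {
		if !m.Close {
			if _, ok := active[m.ID]; ok {
				continue // duplicate delivery: already sent down to this node
			}
			active[m.ID] = struct{}{}
		}
		send(m)
	}
}

func main() {
	in := make(chan msg, 4)
	in <- msg{ID: "a"}
	in <- msg{ID: "a"} // delivered again by the change feed: suppressed
	in <- msg{ID: "a", Close: true}
	in <- msg{ID: "b"}
	close(in)
	dedupe(in, func(m msg) { fmt.Printf("send id=%s close=%v\n", m.ID, m.Close) })
}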


@@ -0,0 +1,138 @@
package logbroker
import (
"context"
"sync"
events "github.com/docker/go-events"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/watch"
)
type subscription struct {
mu sync.RWMutex
store *store.MemoryStore
message *api.SubscriptionMessage
changed *watch.Queue
ctx context.Context
cancel context.CancelFunc
nodes map[string]struct{}
}
func newSubscription(store *store.MemoryStore, message *api.SubscriptionMessage, changed *watch.Queue) *subscription {
return &subscription{
store: store,
message: message,
changed: changed,
nodes: make(map[string]struct{}),
}
}
func (s *subscription) Contains(nodeID string) bool {
s.mu.RLock()
defer s.mu.RUnlock()
_, ok := s.nodes[nodeID]
return ok
}
func (s *subscription) Run(ctx context.Context) {
s.ctx, s.cancel = context.WithCancel(ctx)
wq := s.store.WatchQueue()
ch, cancel := state.Watch(wq, state.EventCreateTask{}, state.EventUpdateTask{})
go func() {
defer cancel()
s.watch(ch)
}()
s.match()
}
func (s *subscription) Stop() {
if s.cancel != nil {
s.cancel()
}
}
func (s *subscription) match() {
s.mu.Lock()
defer s.mu.Unlock()
s.store.View(func(tx store.ReadTx) {
for _, nid := range s.message.Selector.NodeIDs {
s.nodes[nid] = struct{}{}
}
for _, tid := range s.message.Selector.TaskIDs {
if task := store.GetTask(tx, tid); task != nil {
s.nodes[task.NodeID] = struct{}{}
}
}
for _, sid := range s.message.Selector.ServiceIDs {
tasks, err := store.FindTasks(tx, store.ByServiceID(sid))
if err != nil {
log.L.Warning(err)
continue
}
for _, task := range tasks {
s.nodes[task.NodeID] = struct{}{}
}
}
})
}
func (s *subscription) watch(ch <-chan events.Event) error {
matchTasks := map[string]struct{}{}
for _, tid := range s.message.Selector.TaskIDs {
matchTasks[tid] = struct{}{}
}
matchServices := map[string]struct{}{}
for _, sid := range s.message.Selector.ServiceIDs {
matchServices[sid] = struct{}{}
}
add := func(nodeID string) {
s.mu.Lock()
defer s.mu.Unlock()
if _, ok := s.nodes[nodeID]; !ok {
s.nodes[nodeID] = struct{}{}
s.changed.Publish(s)
}
}
for {
var t *api.Task
select {
case <-s.ctx.Done():
return s.ctx.Err()
case event := <-ch:
switch v := event.(type) {
case state.EventCreateTask:
t = v.Task
case state.EventUpdateTask:
t = v.Task
}
}
if t == nil {
panic("received invalid task from the watch queue")
}
if _, ok := matchTasks[t.ID]; ok {
add(t.NodeID)
}
if _, ok := matchServices[t.ServiceID]; ok {
add(t.NodeID)
}
}
}
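Taken together, match and watch resolve a LogSelector to the set of nodes that should receive the subscription: match seeds the node set from a store snapshot (explicit node IDs plus the nodes of any selected tasks or service tasks), and watch extends it as task events arrive, publishing the subscription on the changed queue whenever a new node joins the set so Contains and the broker's per-node filtering stay current. The following standalone sketch distills just the selector-to-nodes resolution using plain maps; the types are simplified stand-ins for the swarmkit API structs, and the real code works against the memory store and the task event stream:

package main

import "fmt"

// task is a stand-in for api.Task with only the fields the matcher needs.
type task struct {
	ID        string
	ServiceID string
	NodeID    string
}

// selector is a stand-in for api.LogSelector.
type selector struct {
	NodeIDs    []string
	TaskIDs    []string
	ServiceIDs []string
}

// nodesFor resolves a selector to the set of node IDs that should receive
// the subscription, given the tasks currently known.
func nodesFor(sel selector, tasks []task) map[string]struct{} {
	nodes := make(map[string]struct{})
	for _, n := range sel.NodeIDs {
		nodes[n] = struct{}{}
	}
	byTask := make(map[string]struct{}, len(sel.TaskIDs))
	for _, t := range sel.TaskIDs {
		byTask[t] = struct{}{}
	}
	byService := make(map[string]struct{}, len(sel.ServiceIDs))
	for _, s := range sel.ServiceIDs {
		byService[s] = struct{}{}
	}
	for _, t := range tasks {
		if _, ok := byTask[t.ID]; ok {
			nodes[t.NodeID] = struct{}{}
		}
		if _, ok := byService[t.ServiceID]; ok {
			nodes[t.NodeID] = struct{}{}
		}
	}
	return nodes
}

func main() {
	tasks := []task{
		{ID: "t1", ServiceID: "web", NodeID: "node-1"},
		{ID: "t2", ServiceID: "web", NodeID: "node-2"},
		{ID: "t3", ServiceID: "db", NodeID: "node-3"},
	}
	fmt.Println(nodesFor(selector{ServiceIDs: []string{"web"}}, tasks)) // node-1 and node-2
}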


@@ -256,7 +256,7 @@ func New(config *Config) (*Manager, error) {
listeners: listeners,
caserver: ca.NewServer(raftNode.MemoryStore(), config.SecurityConfig),
dispatcher: dispatcher.New(raftNode, dispatcherConfig),
logbroker: logbroker.New(),
logbroker: logbroker.New(raftNode.MemoryStore()),
server: grpc.NewServer(opts...),
localserver: grpc.NewServer(opts...),
raftNode: raftNode,