diff --git a/hack/vendor.sh b/hack/vendor.sh index f4b079b5d5..a03d13c69b 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -139,7 +139,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0 clone git github.com/docker/containerd 0ac3cd1be170d180b2baed755e8f0da547ceb267 # cluster -clone git github.com/docker/swarmkit 4d7e44321726f011d010cdb72d2230f5db2b604e +clone git github.com/docker/swarmkit 9d4c2f73124e70f8fa85f9076635b827d17b109f clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028 clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b diff --git a/vendor/src/github.com/docker/swarmkit/agent/task.go b/vendor/src/github.com/docker/swarmkit/agent/task.go index d10e253afd..005ffdf973 100644 --- a/vendor/src/github.com/docker/swarmkit/agent/task.go +++ b/vendor/src/github.com/docker/swarmkit/agent/task.go @@ -1,11 +1,11 @@ package agent import ( - "reflect" "time" "github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/api/equality" "github.com/docker/swarmkit/log" "golang.org/x/net/context" ) @@ -175,7 +175,7 @@ func (tm *taskManager) run(ctx context.Context) { case status := <-statusq: tm.task.Status = *status case task := <-tm.updateq: - if tasksEqual(task, tm.task) { + if equality.TasksEqualStable(task, tm.task) { continue // ignore the update } @@ -241,17 +241,3 @@ func (tm *taskManager) run(ctx context.Context) { } } } - -// tasksEqual returns true if the tasks are functionaly equal, ignoring status, -// version and other superfluous fields. -// -// This used to decide whether or not to propagate a task update to a controller. -func tasksEqual(a, b *api.Task) bool { - // shallow copy - copyA, copyB := *a, *b - - copyA.Status, copyB.Status = api.TaskStatus{}, api.TaskStatus{} - copyA.Meta, copyB.Meta = api.Meta{}, api.Meta{} - - return reflect.DeepEqual(©A, ©B) -} diff --git a/vendor/src/github.com/docker/swarmkit/api/equality/equality.go b/vendor/src/github.com/docker/swarmkit/api/equality/equality.go new file mode 100644 index 0000000000..624f4f689a --- /dev/null +++ b/vendor/src/github.com/docker/swarmkit/api/equality/equality.go @@ -0,0 +1,21 @@ +package equality + +import ( + "reflect" + + "github.com/docker/swarmkit/api" +) + +// TasksEqualStable returns true if the tasks are functionaly equal, ignoring status, +// version and other superfluous fields. +// +// This used to decide whether or not to propagate a task update to a controller. +func TasksEqualStable(a, b *api.Task) bool { + // shallow copy + copyA, copyB := *a, *b + + copyA.Status, copyB.Status = api.TaskStatus{}, api.TaskStatus{} + copyA.Meta, copyB.Meta = api.Meta{}, api.Meta{} + + return reflect.DeepEqual(©A, ©B) +} diff --git a/vendor/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/vendor/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go index 1c27dd9920..3617d7e756 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go +++ b/vendor/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go @@ -14,6 +14,7 @@ import ( "github.com/Sirupsen/logrus" "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/api/equality" "github.com/docker/swarmkit/ca" "github.com/docker/swarmkit/log" "github.com/docker/swarmkit/manager/state" @@ -29,7 +30,7 @@ const ( DefaultHeartBeatPeriod = 5 * time.Second defaultHeartBeatEpsilon = 500 * time.Millisecond defaultGracePeriodMultiplier = 3 - defaultRateLimitPeriod = 16 * time.Second + defaultRateLimitPeriod = 8 * time.Second // maxBatchItems is the threshold of queued writes that should // trigger an actual transaction to commit them to the shared store. @@ -572,20 +573,44 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe return err } - select { - case event := <-nodeTasks: - switch v := event.(type) { - case state.EventCreateTask: - tasksMap[v.Task.ID] = v.Task - case state.EventUpdateTask: - tasksMap[v.Task.ID] = v.Task - case state.EventDeleteTask: - delete(tasksMap, v.Task.ID) + // bursty events should be processed in batches and sent out snapshot + const modificationBatchLimit = 200 + const eventPausedGap = 50 * time.Millisecond + var modificationCnt int + // eventPaused is true when there have been modifications + // but next event has not arrived within eventPausedGap + eventPaused := false + + for modificationCnt < modificationBatchLimit && !eventPaused { + select { + case event := <-nodeTasks: + switch v := event.(type) { + case state.EventCreateTask: + tasksMap[v.Task.ID] = v.Task + modificationCnt++ + case state.EventUpdateTask: + if oldTask, exists := tasksMap[v.Task.ID]; exists { + if equality.TasksEqualStable(oldTask, v.Task) { + // this update should not trigger action at agent + tasksMap[v.Task.ID] = v.Task + continue + } + } + tasksMap[v.Task.ID] = v.Task + modificationCnt++ + case state.EventDeleteTask: + delete(tasksMap, v.Task.ID) + modificationCnt++ + } + case <-time.After(eventPausedGap): + if modificationCnt > 0 { + eventPaused = true + } + case <-stream.Context().Done(): + return stream.Context().Err() + case <-d.ctx.Done(): + return d.ctx.Err() } - case <-stream.Context().Done(): - return stream.Context().Err() - case <-d.ctx.Done(): - return d.ctx.Err() } } } diff --git a/vendor/src/github.com/docker/swarmkit/manager/dispatcher/nodes.go b/vendor/src/github.com/docker/swarmkit/manager/dispatcher/nodes.go index b69b4be2ad..2d0ea076a5 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/dispatcher/nodes.go +++ b/vendor/src/github.com/docker/swarmkit/manager/dispatcher/nodes.go @@ -12,10 +12,13 @@ import ( "github.com/docker/swarmkit/manager/dispatcher/heartbeat" ) +const rateLimitCount = 3 + type registeredNode struct { SessionID string Heartbeat *heartbeat.Heartbeat Registered time.Time + Attempts int Node *api.Node Disconnect chan struct{} // signal to disconnect mu sync.Mutex @@ -86,9 +89,14 @@ func (s *nodeStore) CheckRateLimit(id string) error { s.mu.Lock() defer s.mu.Unlock() if existRn, ok := s.nodes[id]; ok { - if time.Since(existRn.Registered) < s.rateLimitPeriod { - return grpc.Errorf(codes.Unavailable, "node %s attempted registration too recently", id) + if time.Since(existRn.Registered) > s.rateLimitPeriod { + existRn.Attempts = 0 } + existRn.Attempts++ + if existRn.Attempts > rateLimitCount { + return grpc.Errorf(codes.Unavailable, "node %s exceeded rate limit count of registrations", id) + } + existRn.Registered = time.Now() } return nil } @@ -97,14 +105,22 @@ func (s *nodeStore) CheckRateLimit(id string) error { func (s *nodeStore) Add(n *api.Node, expireFunc func()) *registeredNode { s.mu.Lock() defer s.mu.Unlock() + var attempts int + var registered time.Time if existRn, ok := s.nodes[n.ID]; ok { + attempts = existRn.Attempts + registered = existRn.Registered existRn.Heartbeat.Stop() delete(s.nodes, n.ID) } + if registered.IsZero() { + registered = time.Now() + } rn := ®isteredNode{ SessionID: identity.NewID(), // session ID is local to the dispatcher. Node: n, - Registered: time.Now(), + Registered: registered, + Attempts: attempts, Disconnect: make(chan struct{}), } s.nodes[n.ID] = rn diff --git a/vendor/src/github.com/docker/swarmkit/manager/manager.go b/vendor/src/github.com/docker/swarmkit/manager/manager.go index 861e071d79..15e1f94981 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/manager.go +++ b/vendor/src/github.com/docker/swarmkit/manager/manager.go @@ -3,7 +3,6 @@ package manager import ( "crypto/x509" "encoding/pem" - "errors" "fmt" "net" "os" @@ -121,16 +120,26 @@ func New(config *Config) (*Manager, error) { config.ProtoAddr["tcp"] = config.ProtoListener["tcp"].Addr().String() } - tcpAddr := config.ProtoAddr["tcp"] - - if config.AdvertiseAddr != "" { - tcpAddr = config.AdvertiseAddr - } + // If an AdvertiseAddr was specified, we use that as our + // externally-reachable address. + tcpAddr := config.AdvertiseAddr if tcpAddr == "" { - return nil, errors.New("no tcp listen address or listener provided") + // Otherwise, we know we are joining an existing swarm. Use a + // wildcard address to trigger remote autodetection of our + // address. + _, tcpAddrPort, err := net.SplitHostPort(config.ProtoAddr["tcp"]) + if err != nil { + return nil, fmt.Errorf("missing or invalid listen address %s", config.ProtoAddr["tcp"]) + } + + // Even with an IPv6 listening address, it's okay to use + // 0.0.0.0 here. Any "unspecified" (wildcard) IP will + // be substituted with the actual source address. + tcpAddr = net.JoinHostPort("0.0.0.0", tcpAddrPort) } + // FIXME(aaronl): Remove this. It appears to be unused. dispatcherConfig.Addr = tcpAddr err := os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700) diff --git a/vendor/src/github.com/docker/swarmkit/manager/scheduler/pipeline.go b/vendor/src/github.com/docker/swarmkit/manager/scheduler/pipeline.go index 517319376c..b1517db961 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/scheduler/pipeline.go +++ b/vendor/src/github.com/docker/swarmkit/manager/scheduler/pipeline.go @@ -7,7 +7,12 @@ var ( // Always check for readiness first. &ReadyFilter{}, &ResourceFilter{}, - &PluginFilter{}, + + // TODO(stevvooe): Do not filter based on plugins since they are lazy + // loaded in the engine. We can add this back when we can schedule + // plugins in the future. + // &PluginFilter{}, + &ConstraintFilter{}, } ) diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go index 4389db7848..212c09e1e0 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go +++ b/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go @@ -401,7 +401,9 @@ func (n *Node) Run(ctx context.Context) error { // restoring from the state, campaign to be the // leader. if !n.restored { - if len(n.cluster.Members()) <= 1 { + // Node ID should be in the progress list to Campaign + _, ok := n.Node.Status().Progress[n.Config.ID] + if len(n.cluster.Members()) <= 1 && ok { if err := n.Campaign(n.Ctx); err != nil { panic("raft: cannot campaign to be the leader on node restore") } @@ -779,10 +781,27 @@ func (n *Node) LeaderAddr() (string, error) { // registerNode registers a new node on the cluster memberlist func (n *Node) registerNode(node *api.RaftMember) error { + if n.cluster.IsIDRemoved(node.RaftID) { + return nil + } + member := &membership.Member{} - if n.cluster.GetMember(node.RaftID) != nil || n.cluster.IsIDRemoved(node.RaftID) { - // member already exists + existingMember := n.cluster.GetMember(node.RaftID) + if existingMember != nil { + // Member already exists + + // If the address is different from what we thought it was, + // update it. This can happen if we just joined a cluster + // and are adding ourself now with the remotely-reachable + // address. + if existingMember.Addr != node.Addr { + member.RaftMember = node + member.RaftClient = existingMember.RaftClient + member.Conn = existingMember.Conn + n.cluster.AddMember(member) + } + return nil }