1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Vendor swarmkit for 1.12.0-rc5

Signed-off-by: Aaron Lehmann <aaron.lehmann@docker.com>
This commit is contained in:
Aaron Lehmann 2016-07-26 00:15:08 -07:00
parent 4144e11d32
commit 60496af711
8 changed files with 126 additions and 45 deletions

View file

@ -139,7 +139,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
clone git github.com/docker/containerd 0ac3cd1be170d180b2baed755e8f0da547ceb267 clone git github.com/docker/containerd 0ac3cd1be170d180b2baed755e8f0da547ceb267
# cluster # cluster
clone git github.com/docker/swarmkit 4d7e44321726f011d010cdb72d2230f5db2b604e clone git github.com/docker/swarmkit 9d4c2f73124e70f8fa85f9076635b827d17b109f
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9 clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028 clone git github.com/gogo/protobuf 43a2e0b1c32252bfbbdf81f7faa7a88fb3fa4028
clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b clone git github.com/cloudflare/cfssl b895b0549c0ff676f92cf09ba971ae02bb41367b

View file

@ -1,11 +1,11 @@
package agent package agent
import ( import (
"reflect"
"time" "time"
"github.com/docker/swarmkit/agent/exec" "github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api" "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/log" "github.com/docker/swarmkit/log"
"golang.org/x/net/context" "golang.org/x/net/context"
) )
@ -175,7 +175,7 @@ func (tm *taskManager) run(ctx context.Context) {
case status := <-statusq: case status := <-statusq:
tm.task.Status = *status tm.task.Status = *status
case task := <-tm.updateq: case task := <-tm.updateq:
if tasksEqual(task, tm.task) { if equality.TasksEqualStable(task, tm.task) {
continue // ignore the update continue // ignore the update
} }
@ -241,17 +241,3 @@ func (tm *taskManager) run(ctx context.Context) {
} }
} }
} }
// tasksEqual returns true if the tasks are functionaly equal, ignoring status,
// version and other superfluous fields.
//
// This used to decide whether or not to propagate a task update to a controller.
func tasksEqual(a, b *api.Task) bool {
// shallow copy
copyA, copyB := *a, *b
copyA.Status, copyB.Status = api.TaskStatus{}, api.TaskStatus{}
copyA.Meta, copyB.Meta = api.Meta{}, api.Meta{}
return reflect.DeepEqual(&copyA, &copyB)
}

View file

@ -0,0 +1,21 @@
package equality
import (
"reflect"
"github.com/docker/swarmkit/api"
)
// TasksEqualStable returns true if the tasks are functionaly equal, ignoring status,
// version and other superfluous fields.
//
// This used to decide whether or not to propagate a task update to a controller.
func TasksEqualStable(a, b *api.Task) bool {
// shallow copy
copyA, copyB := *a, *b
copyA.Status, copyB.Status = api.TaskStatus{}, api.TaskStatus{}
copyA.Meta, copyB.Meta = api.Meta{}, api.Meta{}
return reflect.DeepEqual(&copyA, &copyB)
}

View file

@ -14,6 +14,7 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/docker/swarmkit/api" "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/api/equality"
"github.com/docker/swarmkit/ca" "github.com/docker/swarmkit/ca"
"github.com/docker/swarmkit/log" "github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/state" "github.com/docker/swarmkit/manager/state"
@ -29,7 +30,7 @@ const (
DefaultHeartBeatPeriod = 5 * time.Second DefaultHeartBeatPeriod = 5 * time.Second
defaultHeartBeatEpsilon = 500 * time.Millisecond defaultHeartBeatEpsilon = 500 * time.Millisecond
defaultGracePeriodMultiplier = 3 defaultGracePeriodMultiplier = 3
defaultRateLimitPeriod = 16 * time.Second defaultRateLimitPeriod = 8 * time.Second
// maxBatchItems is the threshold of queued writes that should // maxBatchItems is the threshold of queued writes that should
// trigger an actual transaction to commit them to the shared store. // trigger an actual transaction to commit them to the shared store.
@ -572,20 +573,44 @@ func (d *Dispatcher) Tasks(r *api.TasksRequest, stream api.Dispatcher_TasksServe
return err return err
} }
select { // bursty events should be processed in batches and sent out snapshot
case event := <-nodeTasks: const modificationBatchLimit = 200
switch v := event.(type) { const eventPausedGap = 50 * time.Millisecond
case state.EventCreateTask: var modificationCnt int
tasksMap[v.Task.ID] = v.Task // eventPaused is true when there have been modifications
case state.EventUpdateTask: // but next event has not arrived within eventPausedGap
tasksMap[v.Task.ID] = v.Task eventPaused := false
case state.EventDeleteTask:
delete(tasksMap, v.Task.ID) for modificationCnt < modificationBatchLimit && !eventPaused {
select {
case event := <-nodeTasks:
switch v := event.(type) {
case state.EventCreateTask:
tasksMap[v.Task.ID] = v.Task
modificationCnt++
case state.EventUpdateTask:
if oldTask, exists := tasksMap[v.Task.ID]; exists {
if equality.TasksEqualStable(oldTask, v.Task) {
// this update should not trigger action at agent
tasksMap[v.Task.ID] = v.Task
continue
}
}
tasksMap[v.Task.ID] = v.Task
modificationCnt++
case state.EventDeleteTask:
delete(tasksMap, v.Task.ID)
modificationCnt++
}
case <-time.After(eventPausedGap):
if modificationCnt > 0 {
eventPaused = true
}
case <-stream.Context().Done():
return stream.Context().Err()
case <-d.ctx.Done():
return d.ctx.Err()
} }
case <-stream.Context().Done():
return stream.Context().Err()
case <-d.ctx.Done():
return d.ctx.Err()
} }
} }
} }

View file

@ -12,10 +12,13 @@ import (
"github.com/docker/swarmkit/manager/dispatcher/heartbeat" "github.com/docker/swarmkit/manager/dispatcher/heartbeat"
) )
const rateLimitCount = 3
type registeredNode struct { type registeredNode struct {
SessionID string SessionID string
Heartbeat *heartbeat.Heartbeat Heartbeat *heartbeat.Heartbeat
Registered time.Time Registered time.Time
Attempts int
Node *api.Node Node *api.Node
Disconnect chan struct{} // signal to disconnect Disconnect chan struct{} // signal to disconnect
mu sync.Mutex mu sync.Mutex
@ -86,9 +89,14 @@ func (s *nodeStore) CheckRateLimit(id string) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
if existRn, ok := s.nodes[id]; ok { if existRn, ok := s.nodes[id]; ok {
if time.Since(existRn.Registered) < s.rateLimitPeriod { if time.Since(existRn.Registered) > s.rateLimitPeriod {
return grpc.Errorf(codes.Unavailable, "node %s attempted registration too recently", id) existRn.Attempts = 0
} }
existRn.Attempts++
if existRn.Attempts > rateLimitCount {
return grpc.Errorf(codes.Unavailable, "node %s exceeded rate limit count of registrations", id)
}
existRn.Registered = time.Now()
} }
return nil return nil
} }
@ -97,14 +105,22 @@ func (s *nodeStore) CheckRateLimit(id string) error {
func (s *nodeStore) Add(n *api.Node, expireFunc func()) *registeredNode { func (s *nodeStore) Add(n *api.Node, expireFunc func()) *registeredNode {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
var attempts int
var registered time.Time
if existRn, ok := s.nodes[n.ID]; ok { if existRn, ok := s.nodes[n.ID]; ok {
attempts = existRn.Attempts
registered = existRn.Registered
existRn.Heartbeat.Stop() existRn.Heartbeat.Stop()
delete(s.nodes, n.ID) delete(s.nodes, n.ID)
} }
if registered.IsZero() {
registered = time.Now()
}
rn := &registeredNode{ rn := &registeredNode{
SessionID: identity.NewID(), // session ID is local to the dispatcher. SessionID: identity.NewID(), // session ID is local to the dispatcher.
Node: n, Node: n,
Registered: time.Now(), Registered: registered,
Attempts: attempts,
Disconnect: make(chan struct{}), Disconnect: make(chan struct{}),
} }
s.nodes[n.ID] = rn s.nodes[n.ID] = rn

View file

@ -3,7 +3,6 @@ package manager
import ( import (
"crypto/x509" "crypto/x509"
"encoding/pem" "encoding/pem"
"errors"
"fmt" "fmt"
"net" "net"
"os" "os"
@ -121,16 +120,26 @@ func New(config *Config) (*Manager, error) {
config.ProtoAddr["tcp"] = config.ProtoListener["tcp"].Addr().String() config.ProtoAddr["tcp"] = config.ProtoListener["tcp"].Addr().String()
} }
tcpAddr := config.ProtoAddr["tcp"] // If an AdvertiseAddr was specified, we use that as our
// externally-reachable address.
if config.AdvertiseAddr != "" { tcpAddr := config.AdvertiseAddr
tcpAddr = config.AdvertiseAddr
}
if tcpAddr == "" { if tcpAddr == "" {
return nil, errors.New("no tcp listen address or listener provided") // Otherwise, we know we are joining an existing swarm. Use a
// wildcard address to trigger remote autodetection of our
// address.
_, tcpAddrPort, err := net.SplitHostPort(config.ProtoAddr["tcp"])
if err != nil {
return nil, fmt.Errorf("missing or invalid listen address %s", config.ProtoAddr["tcp"])
}
// Even with an IPv6 listening address, it's okay to use
// 0.0.0.0 here. Any "unspecified" (wildcard) IP will
// be substituted with the actual source address.
tcpAddr = net.JoinHostPort("0.0.0.0", tcpAddrPort)
} }
// FIXME(aaronl): Remove this. It appears to be unused.
dispatcherConfig.Addr = tcpAddr dispatcherConfig.Addr = tcpAddr
err := os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700) err := os.MkdirAll(filepath.Dir(config.ProtoAddr["unix"]), 0700)

View file

@ -7,7 +7,12 @@ var (
// Always check for readiness first. // Always check for readiness first.
&ReadyFilter{}, &ReadyFilter{},
&ResourceFilter{}, &ResourceFilter{},
&PluginFilter{},
// TODO(stevvooe): Do not filter based on plugins since they are lazy
// loaded in the engine. We can add this back when we can schedule
// plugins in the future.
// &PluginFilter{},
&ConstraintFilter{}, &ConstraintFilter{},
} }
) )

View file

@ -401,7 +401,9 @@ func (n *Node) Run(ctx context.Context) error {
// restoring from the state, campaign to be the // restoring from the state, campaign to be the
// leader. // leader.
if !n.restored { if !n.restored {
if len(n.cluster.Members()) <= 1 { // Node ID should be in the progress list to Campaign
_, ok := n.Node.Status().Progress[n.Config.ID]
if len(n.cluster.Members()) <= 1 && ok {
if err := n.Campaign(n.Ctx); err != nil { if err := n.Campaign(n.Ctx); err != nil {
panic("raft: cannot campaign to be the leader on node restore") panic("raft: cannot campaign to be the leader on node restore")
} }
@ -779,10 +781,27 @@ func (n *Node) LeaderAddr() (string, error) {
// registerNode registers a new node on the cluster memberlist // registerNode registers a new node on the cluster memberlist
func (n *Node) registerNode(node *api.RaftMember) error { func (n *Node) registerNode(node *api.RaftMember) error {
if n.cluster.IsIDRemoved(node.RaftID) {
return nil
}
member := &membership.Member{} member := &membership.Member{}
if n.cluster.GetMember(node.RaftID) != nil || n.cluster.IsIDRemoved(node.RaftID) { existingMember := n.cluster.GetMember(node.RaftID)
// member already exists if existingMember != nil {
// Member already exists
// If the address is different from what we thought it was,
// update it. This can happen if we just joined a cluster
// and are adding ourself now with the remotely-reachable
// address.
if existingMember.Addr != node.Addr {
member.RaftMember = node
member.RaftClient = existingMember.RaftClient
member.Conn = existingMember.Conn
n.cluster.AddMember(member)
}
return nil return nil
} }