diff --git a/hack/vendor.sh b/hack/vendor.sh
index f826489b86..61899c756c 100755
--- a/hack/vendor.sh
+++ b/hack/vendor.sh
@@ -143,7 +143,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
 clone git github.com/docker/containerd 837e8c5e1cad013ed57f5c2090c8591c10cbbdae
 
 # cluster
-clone git github.com/docker/swarmkit 7e63bdefb94e5bea2641e8bdebae2cfa61a0ed44
+clone git github.com/docker/swarmkit 1fed8d2a2ccd2a9b6d6fb864d4ad3461fc6dc3eb
 clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
 clone git github.com/gogo/protobuf v0.3
 clone git github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a
diff --git a/vendor/src/github.com/docker/swarmkit/agent/agent.go b/vendor/src/github.com/docker/swarmkit/agent/agent.go
index a96ac2d38a..22188132da 100644
--- a/vendor/src/github.com/docker/swarmkit/agent/agent.go
+++ b/vendor/src/github.com/docker/swarmkit/agent/agent.go
@@ -171,11 +171,12 @@ func (a *Agent) run(ctx context.Context) {
		case msg := <-session.assignments:
			switch msg.Type {
			case api.AssignmentsMessage_COMPLETE:
-				if err := a.worker.AssignTasks(ctx, msg.UpdateTasks); err != nil {
+				// Need to assign secrets before tasks, because tasks might depend on new secrets
+				if err := a.worker.Assign(ctx, msg.Changes); err != nil {
					log.G(ctx).WithError(err).Error("failed to synchronize worker assignments")
				}
			case api.AssignmentsMessage_INCREMENTAL:
-				if err := a.worker.UpdateTasks(ctx, msg.UpdateTasks, msg.RemoveTasks); err != nil {
+				if err := a.worker.Update(ctx, msg.Changes); err != nil {
					log.G(ctx).WithError(err).Error("failed to update worker assignments")
				}
			}
diff --git a/vendor/src/github.com/docker/swarmkit/agent/node.go b/vendor/src/github.com/docker/swarmkit/agent/node.go
index 7992b4f24e..35c720dc18 100644
--- a/vendor/src/github.com/docker/swarmkit/agent/node.go
+++ b/vendor/src/github.com/docker/swarmkit/agent/node.go
@@ -101,6 +101,21 @@ type Node struct {
	roleChangeReq chan api.NodeRole // used to send role updates from the dispatcher api on promotion/demotion
 }
 
+// RemoteAPIAddr returns the address on which the remote manager API listens.
+// It returns an error if the node is not a manager.
+func (n *Node) RemoteAPIAddr() (string, error) {
+	n.RLock()
+	defer n.RUnlock()
+	if n.manager == nil {
+		return "", errors.Errorf("node is not manager")
+	}
+	addr := n.manager.Addr()
+	if addr == nil {
+		return "", errors.Errorf("manager addr is not set")
+	}
+	return addr.String(), nil
+}
+
 // NewNode returns new Node instance.
 func NewNode(c *NodeConfig) (*Node, error) {
	if err := os.MkdirAll(c.StateDir, 0700); err != nil {
@@ -627,7 +642,12 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
	go func(ready chan struct{}) {
		select {
		case <-ready:
-			n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: n.config.ListenRemoteAPI}, remotes.DefaultObservationWeight)
+			addr, err := n.RemoteAPIAddr()
+			if err != nil {
+				log.G(ctx).WithError(err).Errorf("get remote api addr")
+			} else {
+				n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
+			}
		case <-connCtx.Done():
		}
	}(ready)
diff --git a/vendor/src/github.com/docker/swarmkit/agent/secrets.go b/vendor/src/github.com/docker/swarmkit/agent/secrets.go
new file mode 100644
index 0000000000..a3faabfa18
--- /dev/null
+++ b/vendor/src/github.com/docker/swarmkit/agent/secrets.go
@@ -0,0 +1,53 @@
+package agent
+
+import (
+	"sync"
+
+	"github.com/docker/swarmkit/api"
+)
+
+// secrets is a map that keeps all the secrets currently available to the
+// agent, mapped by secret ID
+type secrets struct {
+	mu sync.RWMutex
+	m  map[string]api.Secret
+}
+
+func newSecrets() *secrets {
+	return &secrets{
+		m: make(map[string]api.Secret),
+	}
+}
+
+// Get returns a secret by ID. If the secret doesn't exist, returns an empty secret.
+func (s *secrets) Get(secretID string) api.Secret {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	return s.m[secretID]
+}
+
+// Add adds one or more secrets to the secret map
+func (s *secrets) Add(secrets ...api.Secret) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for _, secret := range secrets {
+		s.m[secret.ID] = secret
+	}
+}
+
+// Remove removes one or more secrets by ID from the secret map. Succeeds
+// whether or not the given IDs are in the map.
+func (s *secrets) Remove(secrets []string) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	for _, secret := range secrets {
+		delete(s.m, secret)
+	}
+}
+
+// Reset removes all the secrets
+func (s *secrets) Reset() {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	s.m = make(map[string]api.Secret)
+}
diff --git a/vendor/src/github.com/docker/swarmkit/agent/session.go b/vendor/src/github.com/docker/swarmkit/agent/session.go
index e7efdd3a21..035766c15b 100644
--- a/vendor/src/github.com/docker/swarmkit/agent/session.go
+++ b/vendor/src/github.com/docker/swarmkit/agent/session.go
@@ -252,12 +252,26 @@ func (s *session) watch(ctx context.Context) error {
			}
		}
		if tasksWatch != nil {
+			// When falling back to Tasks because of an old manager, we wrap the tasks in assignments.
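+			// Each task is wrapped below as an AssignmentChange with an update
+			// action, so the worker receives the same COMPLETE snapshot shape
+			// that newer managers send natively.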
			var taskResp *api.TasksMessage
+			var assignmentChanges []*api.AssignmentChange
			taskResp, err = tasksWatch.Recv()
			if err != nil {
				return err
			}
-			resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, UpdateTasks: taskResp.Tasks}
+			for _, t := range taskResp.Tasks {
+				taskChange := &api.AssignmentChange{
+					Assignment: &api.Assignment{
+						Item: &api.Assignment_Task{
+							Task: t,
+						},
+					},
+					Action: api.AssignmentChange_AssignmentActionUpdate,
+				}
+
+				assignmentChanges = append(assignmentChanges, taskChange)
+			}
+			resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, Changes: assignmentChanges}
		}
 
		// If there seems to be a gap in the stream, let's break out of the inner for and
diff --git a/vendor/src/github.com/docker/swarmkit/agent/worker.go b/vendor/src/github.com/docker/swarmkit/agent/worker.go
index f19c9c957b..2029f6c233 100644
--- a/vendor/src/github.com/docker/swarmkit/agent/worker.go
+++ b/vendor/src/github.com/docker/swarmkit/agent/worker.go
@@ -17,13 +17,13 @@ type Worker interface {
	// Init prepares the worker for task assignment.
	Init(ctx context.Context) error
 
-	// AssignTasks assigns a complete set of tasks to a worker. Any task not included in
+	// Assign assigns a complete set of tasks and secrets to a worker. Any task or secret not included in
	// this set will be removed.
-	AssignTasks(ctx context.Context, tasks []*api.Task) error
+	Assign(ctx context.Context, assignments []*api.AssignmentChange) error
 
-	// UpdateTasks updates an incremental set of tasks to the worker. Any task not included
+	// Update updates an incremental set of tasks or secrets of the worker. Any task/secret not included
	// either in added or removed will remain untouched.
-	UpdateTasks(ctx context.Context, added []*api.Task, removed []string) error
+	Update(ctx context.Context, assignments []*api.AssignmentChange) error
 
	// Listen to updates about tasks controlled by the worker. When first
	// called, the reporter will receive all updates for all tasks controlled
@@ -42,6 +42,7 @@ type worker struct {
	db           *bolt.DB
	executor     exec.Executor
	listeners    map[*statusReporterKey]struct{}
+	secrets      *secrets
	taskManagers map[string]*taskManager
	mu           sync.RWMutex
@@ -53,6 +54,7 @@ func newWorker(db *bolt.DB, executor exec.Executor) *worker {
		executor:     executor,
		listeners:    make(map[*statusReporterKey]struct{}),
		taskManagers: make(map[string]*taskManager),
+		secrets:      newSecrets(),
	}
 }
@@ -90,37 +92,70 @@ func (w *worker) Init(ctx context.Context) error {
	})
 }
 
-// AssignTasks assigns the set of tasks to the worker. Any tasks not previously known will
-// be started. Any tasks that are in the task set and already running will be
-// updated, if possible. Any tasks currently running on the
-// worker outside the task set will be terminated.
-func (w *worker) AssignTasks(ctx context.Context, tasks []*api.Task) error {
+// Assign assigns a full set of tasks and secrets to the worker.
+// Any tasks not previously known will be started. Any tasks that are in the task set
+// and already running will be updated, if possible. Any tasks currently running on
+// the worker outside the task set will be terminated.
+// Any secrets not in the set of assignments will be removed.
+func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange) error {
	w.mu.Lock()
	defer w.mu.Unlock()
 
	log.G(ctx).WithFields(logrus.Fields{
-		"len(tasks)": len(tasks),
-	}).Debug("(*worker).AssignTasks")
+		"len(assignments)": len(assignments),
+	}).Debug("(*worker).Assign")
 
-	return reconcileTaskState(ctx, w, tasks, nil, true)
+	// Need to update secrets before tasks, because tasks might depend on new secrets
+	err := reconcileSecrets(ctx, w, assignments, true)
+	if err != nil {
+		return err
+	}
+
+	return reconcileTaskState(ctx, w, assignments, true)
 }
 
-// UpdateTasks the set of tasks to the worker.
+// Update updates the set of tasks and secrets for the worker.
 // Tasks in the added set will be added to the worker, and tasks in the removed set
 // will be removed from the worker
-func (w *worker) UpdateTasks(ctx context.Context, added []*api.Task, removed []string) error {
+// Secrets in the added set will be added to the worker, and secrets in the removed set
+// will be removed from the worker.
+func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange) error {
	w.mu.Lock()
	defer w.mu.Unlock()
 
	log.G(ctx).WithFields(logrus.Fields{
-		"len(added)":   len(added),
-		"len(removed)": len(removed),
-	}).Debug("(*worker).UpdateTasks")
+		"len(assignments)": len(assignments),
+	}).Debug("(*worker).Update")
 
-	return reconcileTaskState(ctx, w, added, removed, false)
+	err := reconcileSecrets(ctx, w, assignments, false)
+	if err != nil {
+		return err
+	}
+
+	return reconcileTaskState(ctx, w, assignments, false)
 }
 
-func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, removed []string, fullSnapshot bool) error {
+func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
+	var (
+		updatedTasks []*api.Task
+		removedTasks []*api.Task
+	)
+	for _, a := range assignments {
+		if t := a.Assignment.GetTask(); t != nil {
+			switch a.Action {
+			case api.AssignmentChange_AssignmentActionUpdate:
+				updatedTasks = append(updatedTasks, t)
+			case api.AssignmentChange_AssignmentActionRemove:
+				removedTasks = append(removedTasks, t)
+			}
+		}
+	}
+
+	log.G(ctx).WithFields(logrus.Fields{
+		"len(updatedTasks)": len(updatedTasks),
+		"len(removedTasks)": len(removedTasks),
+	}).Debug("(*worker).reconcileTaskState")
+
	tx, err := w.db.Begin(true)
	if err != nil {
		log.G(ctx).WithError(err).Error("failed starting transaction against task database")
@@ -130,7 +165,7 @@ func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, remov
 
	assigned := map[string]struct{}{}
 
-	for _, task := range added {
+	for _, task := range updatedTasks {
		log.G(ctx).WithFields(
			logrus.Fields{
				"task.id": task.ID,
@@ -202,15 +237,15 @@ func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, remov
	} else {
		// If this was an incremental set of assignments, we're going to remove only the tasks
		// in the removed set
-		for _, taskID := range removed {
-			err := removeTaskAssignment(taskID)
+		for _, task := range removedTasks {
+			err := removeTaskAssignment(task.ID)
			if err != nil {
				continue
			}
 
-			tm, ok := w.taskManagers[taskID]
+			tm, ok := w.taskManagers[task.ID]
			if ok {
-				delete(w.taskManagers, taskID)
+				delete(w.taskManagers, task.ID)
				go closeManager(tm)
			}
		}
@@ -219,6 +254,39 @@ func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, remov
	return tx.Commit()
 }
 
+func reconcileSecrets(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
+	var (
+		updatedSecrets []api.Secret
+		removedSecrets []string
+	)
+	for _, a := range assignments {
+		if s := a.Assignment.GetSecret(); s != nil {
+			switch a.Action {
+			case api.AssignmentChange_AssignmentActionUpdate:
+				updatedSecrets = append(updatedSecrets, *s)
+			case api.AssignmentChange_AssignmentActionRemove:
+				removedSecrets = append(removedSecrets, s.ID)
+			}
+
+		}
+	}
+
+	log.G(ctx).WithFields(logrus.Fields{
+		"len(updatedSecrets)": len(updatedSecrets),
+		"len(removedSecrets)": len(removedSecrets),
+	}).Debug("(*worker).reconcileSecrets")
+
+	// If this was a complete set of secrets, we're going to clear the secrets map and add all of them
+	if fullSnapshot {
+		w.secrets.Reset()
+	} else {
+		w.secrets.Remove(removedSecrets)
+	}
+	w.secrets.Add(updatedSecrets...)
+
+	return nil
+}
+
 func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
	w.mu.Lock()
	defer w.mu.Unlock()
diff --git a/vendor/src/github.com/docker/swarmkit/api/dispatcher.pb.go b/vendor/src/github.com/docker/swarmkit/api/dispatcher.pb.go
index e0bf2655fa..7e952176fa 100644
--- a/vendor/src/github.com/docker/swarmkit/api/dispatcher.pb.go
+++ b/vendor/src/github.com/docker/swarmkit/api/dispatcher.pb.go
@@ -35,6 +35,29 @@
 var _ = proto.Marshal
 var _ = fmt.Errorf
 var _ = math.Inf
 
+type AssignmentChange_AssignmentAction int32
+
+const (
+	AssignmentChange_AssignmentActionUpdate AssignmentChange_AssignmentAction = 0
+	AssignmentChange_AssignmentActionRemove AssignmentChange_AssignmentAction = 1
+)
+
+var AssignmentChange_AssignmentAction_name = map[int32]string{
+	0: "UPDATE",
+	1: "REMOVE",
+}
+var AssignmentChange_AssignmentAction_value = map[string]int32{
+	"UPDATE": 0,
+	"REMOVE": 1,
+}
+
+func (x AssignmentChange_AssignmentAction) String() string {
+	return proto.EnumName(AssignmentChange_AssignmentAction_name, int32(x))
+}
+func (AssignmentChange_AssignmentAction) EnumDescriptor() ([]byte, []int) {
+	return fileDescriptorDispatcher, []int{10, 0}
+}
+
 // AssignmentType specifies whether this assignment message carries
 // the full state, or is an update to an existing state.
 type AssignmentsMessage_Type int32
@@ -57,7 +80,7 @@ func (x AssignmentsMessage_Type) String() string {
	return proto.EnumName(AssignmentsMessage_Type_name, int32(x))
 }
 func (AssignmentsMessage_Type) EnumDescriptor() ([]byte, []int) {
-	return fileDescriptorDispatcher, []int{9, 0}
+	return fileDescriptorDispatcher, []int{11, 0}
 }
 
 // SessionRequest starts a session.
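Taken together, the AssignmentAction enum above and the Assignment/AssignmentChange messages below replace the four parallel UpdateTasks/RemoveTasks/UpdateSecrets/RemoveSecrets fields with a single ordered change list. A minimal sketch of how a sender might build such a list, using only types defined in this patch (the helper name is illustrative, not part of the patch):

	// buildChanges wraps tasks and secrets in AssignmentChange entries with an
	// update action, the same shape the session fallback constructs above.
	func buildChanges(tasks []*api.Task, secrets []*api.Secret) []*api.AssignmentChange {
		changes := make([]*api.AssignmentChange, 0, len(tasks)+len(secrets))
		for _, t := range tasks {
			changes = append(changes, &api.AssignmentChange{
				Assignment: &api.Assignment{Item: &api.Assignment_Task{Task: t}},
				Action:     api.AssignmentChange_AssignmentActionUpdate,
			})
		}
		for _, s := range secrets {
			changes = append(changes, &api.AssignmentChange{
				Assignment: &api.Assignment{Item: &api.Assignment_Secret{Secret: s}},
				Action:     api.AssignmentChange_AssignmentActionUpdate,
			})
		}
		return changes
	}

On the receiving side, (*worker).Assign applies secret changes first (reconcileSecrets) and only then task changes, since starting a task may require a secret delivered in the same message.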
@@ -214,6 +237,137 @@ func (m *AssignmentsRequest) Reset() { *m = AssignmentsReques func (*AssignmentsRequest) ProtoMessage() {} func (*AssignmentsRequest) Descriptor() ([]byte, []int) { return fileDescriptorDispatcher, []int{8} } +type Assignment struct { + // Types that are valid to be assigned to Item: + // *Assignment_Task + // *Assignment_Secret + Item isAssignment_Item `protobuf_oneof:"item"` +} + +func (m *Assignment) Reset() { *m = Assignment{} } +func (*Assignment) ProtoMessage() {} +func (*Assignment) Descriptor() ([]byte, []int) { return fileDescriptorDispatcher, []int{9} } + +type isAssignment_Item interface { + isAssignment_Item() + MarshalTo([]byte) (int, error) + Size() int +} + +type Assignment_Task struct { + Task *Task `protobuf:"bytes,1,opt,name=task,oneof"` +} +type Assignment_Secret struct { + Secret *Secret `protobuf:"bytes,2,opt,name=secret,oneof"` +} + +func (*Assignment_Task) isAssignment_Item() {} +func (*Assignment_Secret) isAssignment_Item() {} + +func (m *Assignment) GetItem() isAssignment_Item { + if m != nil { + return m.Item + } + return nil +} + +func (m *Assignment) GetTask() *Task { + if x, ok := m.GetItem().(*Assignment_Task); ok { + return x.Task + } + return nil +} + +func (m *Assignment) GetSecret() *Secret { + if x, ok := m.GetItem().(*Assignment_Secret); ok { + return x.Secret + } + return nil +} + +// XXX_OneofFuncs is for the internal use of the proto package. +func (*Assignment) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) { + return _Assignment_OneofMarshaler, _Assignment_OneofUnmarshaler, _Assignment_OneofSizer, []interface{}{ + (*Assignment_Task)(nil), + (*Assignment_Secret)(nil), + } +} + +func _Assignment_OneofMarshaler(msg proto.Message, b *proto.Buffer) error { + m := msg.(*Assignment) + // item + switch x := m.Item.(type) { + case *Assignment_Task: + _ = b.EncodeVarint(1<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Task); err != nil { + return err + } + case *Assignment_Secret: + _ = b.EncodeVarint(2<<3 | proto.WireBytes) + if err := b.EncodeMessage(x.Secret); err != nil { + return err + } + case nil: + default: + return fmt.Errorf("Assignment.Item has unexpected type %T", x) + } + return nil +} + +func _Assignment_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) { + m := msg.(*Assignment) + switch tag { + case 1: // item.task + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Task) + err := b.DecodeMessage(msg) + m.Item = &Assignment_Task{msg} + return true, err + case 2: // item.secret + if wire != proto.WireBytes { + return true, proto.ErrInternalBadWireType + } + msg := new(Secret) + err := b.DecodeMessage(msg) + m.Item = &Assignment_Secret{msg} + return true, err + default: + return false, nil + } +} + +func _Assignment_OneofSizer(msg proto.Message) (n int) { + m := msg.(*Assignment) + // item + switch x := m.Item.(type) { + case *Assignment_Task: + s := proto.Size(x.Task) + n += proto.SizeVarint(1<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case *Assignment_Secret: + s := proto.Size(x.Secret) + n += proto.SizeVarint(2<<3 | proto.WireBytes) + n += proto.SizeVarint(uint64(s)) + n += s + case nil: + default: + panic(fmt.Sprintf("proto: unexpected type %T in oneof", x)) + } + return n +} + +type AssignmentChange struct { + Assignment *Assignment `protobuf:"bytes,1,opt,name=assignment" 
json:"assignment,omitempty"` + Action AssignmentChange_AssignmentAction `protobuf:"varint,2,opt,name=action,proto3,enum=docker.swarmkit.v1.AssignmentChange_AssignmentAction" json:"action,omitempty"` +} + +func (m *AssignmentChange) Reset() { *m = AssignmentChange{} } +func (*AssignmentChange) ProtoMessage() {} +func (*AssignmentChange) Descriptor() ([]byte, []int) { return fileDescriptorDispatcher, []int{10} } + type AssignmentsMessage struct { Type AssignmentsMessage_Type `protobuf:"varint,1,opt,name=type,proto3,enum=docker.swarmkit.v1.AssignmentsMessage_Type" json:"type,omitempty"` // AppliesTo references the previous ResultsIn value, to chain @@ -226,28 +380,13 @@ type AssignmentsMessage struct { // match against the next message's AppliesTo value and protect // against missed messages. ResultsIn string `protobuf:"bytes,3,opt,name=results_in,json=resultsIn,proto3" json:"results_in,omitempty"` - // UpdateTasks is a set of new or updated tasks to run on this node. - // In the first assignments message, it contains all of the tasks - // to run on this node. Tasks outside of this set running on the node - // should be terminated. - UpdateTasks []*Task `protobuf:"bytes,4,rep,name=update_tasks,json=updateTasks" json:"update_tasks,omitempty"` - // RemoveTasks is a set of previously-assigned task IDs to remove from the - // assignment set. It is not used in the first assignments message of - // a stream. - RemoveTasks []string `protobuf:"bytes,5,rep,name=remove_tasks,json=removeTasks" json:"remove_tasks,omitempty"` - // UpdateSecrets is a set of new or updated secrets for this node. - // In the first assignments message, it contains all of the secrets - // the node needs for itself and its assigned tasks. - UpdateSecrets []*Secret `protobuf:"bytes,6,rep,name=update_secrets,json=updateSecrets" json:"update_secrets,omitempty"` - // RemoveSecrets is a set of previously-assigned secret names to remove - // from memory. It is not used in the first assignments message of - // a stream. - RemoveSecrets []string `protobuf:"bytes,7,rep,name=remove_secrets,json=removeSecrets" json:"remove_secrets,omitempty"` + // AssignmentChange is a set of changes to apply on this node. 
+ Changes []*AssignmentChange `protobuf:"bytes,4,rep,name=changes" json:"changes,omitempty"` } func (m *AssignmentsMessage) Reset() { *m = AssignmentsMessage{} } func (*AssignmentsMessage) ProtoMessage() {} -func (*AssignmentsMessage) Descriptor() ([]byte, []int) { return fileDescriptorDispatcher, []int{9} } +func (*AssignmentsMessage) Descriptor() ([]byte, []int) { return fileDescriptorDispatcher, []int{11} } func init() { proto.RegisterType((*SessionRequest)(nil), "docker.swarmkit.v1.SessionRequest") @@ -260,7 +399,10 @@ func init() { proto.RegisterType((*TasksRequest)(nil), "docker.swarmkit.v1.TasksRequest") proto.RegisterType((*TasksMessage)(nil), "docker.swarmkit.v1.TasksMessage") proto.RegisterType((*AssignmentsRequest)(nil), "docker.swarmkit.v1.AssignmentsRequest") + proto.RegisterType((*Assignment)(nil), "docker.swarmkit.v1.Assignment") + proto.RegisterType((*AssignmentChange)(nil), "docker.swarmkit.v1.AssignmentChange") proto.RegisterType((*AssignmentsMessage)(nil), "docker.swarmkit.v1.AssignmentsMessage") + proto.RegisterEnum("docker.swarmkit.v1.AssignmentChange_AssignmentAction", AssignmentChange_AssignmentAction_name, AssignmentChange_AssignmentAction_value) proto.RegisterEnum("docker.swarmkit.v1.AssignmentsMessage_Type", AssignmentsMessage_Type_name, AssignmentsMessage_Type_value) } @@ -463,6 +605,44 @@ func (m *AssignmentsRequest) Copy() *AssignmentsRequest { return o } +func (m *Assignment) Copy() *Assignment { + if m == nil { + return nil + } + + o := &Assignment{} + + switch m.Item.(type) { + case *Assignment_Task: + i := &Assignment_Task{ + Task: m.GetTask().Copy(), + } + + o.Item = i + case *Assignment_Secret: + i := &Assignment_Secret{ + Secret: m.GetSecret().Copy(), + } + + o.Item = i + } + + return o +} + +func (m *AssignmentChange) Copy() *AssignmentChange { + if m == nil { + return nil + } + + o := &AssignmentChange{ + Assignment: m.Assignment.Copy(), + Action: m.Action, + } + + return o +} + func (m *AssignmentsMessage) Copy() *AssignmentsMessage { if m == nil { return nil @@ -474,31 +654,10 @@ func (m *AssignmentsMessage) Copy() *AssignmentsMessage { ResultsIn: m.ResultsIn, } - if m.UpdateTasks != nil { - o.UpdateTasks = make([]*Task, 0, len(m.UpdateTasks)) - for _, v := range m.UpdateTasks { - o.UpdateTasks = append(o.UpdateTasks, v.Copy()) - } - } - - if m.RemoveTasks != nil { - o.RemoveTasks = make([]string, 0, len(m.RemoveTasks)) - for _, v := range m.RemoveTasks { - o.RemoveTasks = append(o.RemoveTasks, v) - } - } - - if m.UpdateSecrets != nil { - o.UpdateSecrets = make([]*Secret, 0, len(m.UpdateSecrets)) - for _, v := range m.UpdateSecrets { - o.UpdateSecrets = append(o.UpdateSecrets, v.Copy()) - } - } - - if m.RemoveSecrets != nil { - o.RemoveSecrets = make([]string, 0, len(m.RemoveSecrets)) - for _, v := range m.RemoveSecrets { - o.RemoveSecrets = append(o.RemoveSecrets, v) + if m.Changes != nil { + o.Changes = make([]*AssignmentChange, 0, len(m.Changes)) + for _, v := range m.Changes { + o.Changes = append(o.Changes, v.Copy()) } } @@ -624,23 +783,59 @@ func (this *AssignmentsRequest) GoString() string { s = append(s, "}") return strings.Join(s, "") } +func (this *Assignment) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&api.Assignment{") + if this.Item != nil { + s = append(s, "Item: "+fmt.Sprintf("%#v", this.Item)+",\n") + } + s = append(s, "}") + return strings.Join(s, "") +} +func (this *Assignment_Task) GoString() string { + if this == nil { + return "nil" + } + s := 
strings.Join([]string{`&api.Assignment_Task{` + + `Task:` + fmt.Sprintf("%#v", this.Task) + `}`}, ", ") + return s +} +func (this *Assignment_Secret) GoString() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&api.Assignment_Secret{` + + `Secret:` + fmt.Sprintf("%#v", this.Secret) + `}`}, ", ") + return s +} +func (this *AssignmentChange) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 6) + s = append(s, "&api.AssignmentChange{") + if this.Assignment != nil { + s = append(s, "Assignment: "+fmt.Sprintf("%#v", this.Assignment)+",\n") + } + s = append(s, "Action: "+fmt.Sprintf("%#v", this.Action)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} func (this *AssignmentsMessage) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 11) + s := make([]string, 0, 8) s = append(s, "&api.AssignmentsMessage{") s = append(s, "Type: "+fmt.Sprintf("%#v", this.Type)+",\n") s = append(s, "AppliesTo: "+fmt.Sprintf("%#v", this.AppliesTo)+",\n") s = append(s, "ResultsIn: "+fmt.Sprintf("%#v", this.ResultsIn)+",\n") - if this.UpdateTasks != nil { - s = append(s, "UpdateTasks: "+fmt.Sprintf("%#v", this.UpdateTasks)+",\n") + if this.Changes != nil { + s = append(s, "Changes: "+fmt.Sprintf("%#v", this.Changes)+",\n") } - s = append(s, "RemoveTasks: "+fmt.Sprintf("%#v", this.RemoveTasks)+",\n") - if this.UpdateSecrets != nil { - s = append(s, "UpdateSecrets: "+fmt.Sprintf("%#v", this.UpdateSecrets)+",\n") - } - s = append(s, "RemoveSecrets: "+fmt.Sprintf("%#v", this.RemoveSecrets)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -1313,6 +1508,92 @@ func (m *AssignmentsRequest) MarshalTo(data []byte) (int, error) { return i, nil } +func (m *Assignment) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *Assignment) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Item != nil { + nn5, err := m.Item.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += nn5 + } + return i, nil +} + +func (m *Assignment_Task) MarshalTo(data []byte) (int, error) { + i := 0 + if m.Task != nil { + data[i] = 0xa + i++ + i = encodeVarintDispatcher(data, i, uint64(m.Task.Size())) + n6, err := m.Task.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n6 + } + return i, nil +} +func (m *Assignment_Secret) MarshalTo(data []byte) (int, error) { + i := 0 + if m.Secret != nil { + data[i] = 0x12 + i++ + i = encodeVarintDispatcher(data, i, uint64(m.Secret.Size())) + n7, err := m.Secret.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n7 + } + return i, nil +} +func (m *AssignmentChange) Marshal() (data []byte, err error) { + size := m.Size() + data = make([]byte, size) + n, err := m.MarshalTo(data) + if err != nil { + return nil, err + } + return data[:n], nil +} + +func (m *AssignmentChange) MarshalTo(data []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Assignment != nil { + data[i] = 0xa + i++ + i = encodeVarintDispatcher(data, i, uint64(m.Assignment.Size())) + n8, err := m.Assignment.MarshalTo(data[i:]) + if err != nil { + return 0, err + } + i += n8 + } + if m.Action != 0 { + data[i] = 0x10 + i++ + i = encodeVarintDispatcher(data, i, uint64(m.Action)) + } + return i, nil +} + func (m *AssignmentsMessage) Marshal() (data []byte, err error) { size := m.Size() data = make([]byte, size) @@ -1345,8 +1626,8 @@ 
func (m *AssignmentsMessage) MarshalTo(data []byte) (int, error) { i = encodeVarintDispatcher(data, i, uint64(len(m.ResultsIn))) i += copy(data[i:], m.ResultsIn) } - if len(m.UpdateTasks) > 0 { - for _, msg := range m.UpdateTasks { + if len(m.Changes) > 0 { + for _, msg := range m.Changes { data[i] = 0x22 i++ i = encodeVarintDispatcher(data, i, uint64(msg.Size())) @@ -1357,48 +1638,6 @@ func (m *AssignmentsMessage) MarshalTo(data []byte) (int, error) { i += n } } - if len(m.RemoveTasks) > 0 { - for _, s := range m.RemoveTasks { - data[i] = 0x2a - i++ - l = len(s) - for l >= 1<<7 { - data[i] = uint8(uint64(l)&0x7f | 0x80) - l >>= 7 - i++ - } - data[i] = uint8(l) - i++ - i += copy(data[i:], s) - } - } - if len(m.UpdateSecrets) > 0 { - for _, msg := range m.UpdateSecrets { - data[i] = 0x32 - i++ - i = encodeVarintDispatcher(data, i, uint64(msg.Size())) - n, err := msg.MarshalTo(data[i:]) - if err != nil { - return 0, err - } - i += n - } - } - if len(m.RemoveSecrets) > 0 { - for _, s := range m.RemoveSecrets { - data[i] = 0x3a - i++ - l = len(s) - for l >= 1<<7 { - data[i] = uint8(uint64(l)&0x7f | 0x80) - l >>= 7 - i++ - } - data[i] = uint8(l) - i++ - i += copy(data[i:], s) - } - } return i, nil } @@ -1789,6 +2028,46 @@ func (m *AssignmentsRequest) Size() (n int) { return n } +func (m *Assignment) Size() (n int) { + var l int + _ = l + if m.Item != nil { + n += m.Item.Size() + } + return n +} + +func (m *Assignment_Task) Size() (n int) { + var l int + _ = l + if m.Task != nil { + l = m.Task.Size() + n += 1 + l + sovDispatcher(uint64(l)) + } + return n +} +func (m *Assignment_Secret) Size() (n int) { + var l int + _ = l + if m.Secret != nil { + l = m.Secret.Size() + n += 1 + l + sovDispatcher(uint64(l)) + } + return n +} +func (m *AssignmentChange) Size() (n int) { + var l int + _ = l + if m.Assignment != nil { + l = m.Assignment.Size() + n += 1 + l + sovDispatcher(uint64(l)) + } + if m.Action != 0 { + n += 1 + sovDispatcher(uint64(m.Action)) + } + return n +} + func (m *AssignmentsMessage) Size() (n int) { var l int _ = l @@ -1803,30 +2082,12 @@ func (m *AssignmentsMessage) Size() (n int) { if l > 0 { n += 1 + l + sovDispatcher(uint64(l)) } - if len(m.UpdateTasks) > 0 { - for _, e := range m.UpdateTasks { + if len(m.Changes) > 0 { + for _, e := range m.Changes { l = e.Size() n += 1 + l + sovDispatcher(uint64(l)) } } - if len(m.RemoveTasks) > 0 { - for _, s := range m.RemoveTasks { - l = len(s) - n += 1 + l + sovDispatcher(uint64(l)) - } - } - if len(m.UpdateSecrets) > 0 { - for _, e := range m.UpdateSecrets { - l = e.Size() - n += 1 + l + sovDispatcher(uint64(l)) - } - } - if len(m.RemoveSecrets) > 0 { - for _, s := range m.RemoveSecrets { - l = len(s) - n += 1 + l + sovDispatcher(uint64(l)) - } - } return n } @@ -1948,6 +2209,47 @@ func (this *AssignmentsRequest) String() string { }, "") return s } +func (this *Assignment) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Assignment{`, + `Item:` + fmt.Sprintf("%v", this.Item) + `,`, + `}`, + }, "") + return s +} +func (this *Assignment_Task) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Assignment_Task{`, + `Task:` + strings.Replace(fmt.Sprintf("%v", this.Task), "Task", "Task", 1) + `,`, + `}`, + }, "") + return s +} +func (this *Assignment_Secret) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&Assignment_Secret{`, + `Secret:` + strings.Replace(fmt.Sprintf("%v", this.Secret), "Secret", "Secret", 1) + `,`, + `}`, + }, "") + 
return s +} +func (this *AssignmentChange) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&AssignmentChange{`, + `Assignment:` + strings.Replace(fmt.Sprintf("%v", this.Assignment), "Assignment", "Assignment", 1) + `,`, + `Action:` + fmt.Sprintf("%v", this.Action) + `,`, + `}`, + }, "") + return s +} func (this *AssignmentsMessage) String() string { if this == nil { return "nil" @@ -1956,10 +2258,7 @@ func (this *AssignmentsMessage) String() string { `Type:` + fmt.Sprintf("%v", this.Type) + `,`, `AppliesTo:` + fmt.Sprintf("%v", this.AppliesTo) + `,`, `ResultsIn:` + fmt.Sprintf("%v", this.ResultsIn) + `,`, - `UpdateTasks:` + strings.Replace(fmt.Sprintf("%v", this.UpdateTasks), "Task", "Task", 1) + `,`, - `RemoveTasks:` + fmt.Sprintf("%v", this.RemoveTasks) + `,`, - `UpdateSecrets:` + strings.Replace(fmt.Sprintf("%v", this.UpdateSecrets), "Secret", "Secret", 1) + `,`, - `RemoveSecrets:` + fmt.Sprintf("%v", this.RemoveSecrets) + `,`, + `Changes:` + strings.Replace(fmt.Sprintf("%v", this.Changes), "AssignmentChange", "AssignmentChange", 1) + `,`, `}`, }, "") return s @@ -2928,6 +3227,222 @@ func (m *AssignmentsRequest) Unmarshal(data []byte) error { } return nil } +func (m *Assignment) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Assignment: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Assignment: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Task", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDispatcher + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &Task{} + if err := v.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + m.Item = &Assignment_Task{v} + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Secret", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDispatcher + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + v := &Secret{} + if err := v.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + m.Item = &Assignment_Secret{v} + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipDispatcher(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDispatcher + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m 
*AssignmentChange) Unmarshal(data []byte) error { + l := len(data) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: AssignmentChange: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: AssignmentChange: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Assignment", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + msglen |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthDispatcher + } + postIndex := iNdEx + msglen + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.Assignment == nil { + m.Assignment = &Assignment{} + } + if err := m.Assignment.Unmarshal(data[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Action", wireType) + } + m.Action = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowDispatcher + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := data[iNdEx] + iNdEx++ + m.Action |= (AssignmentChange_AssignmentAction(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipDispatcher(data[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthDispatcher + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *AssignmentsMessage) Unmarshal(data []byte) error { l := len(data) iNdEx := 0 @@ -3036,7 +3551,7 @@ func (m *AssignmentsMessage) Unmarshal(data []byte) error { iNdEx = postIndex case 4: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field UpdateTasks", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Changes", wireType) } var msglen int for shift := uint(0); ; shift += 7 { @@ -3060,100 +3575,11 @@ func (m *AssignmentsMessage) Unmarshal(data []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.UpdateTasks = append(m.UpdateTasks, &Task{}) - if err := m.UpdateTasks[len(m.UpdateTasks)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { + m.Changes = append(m.Changes, &AssignmentChange{}) + if err := m.Changes[len(m.Changes)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { return err } iNdEx = postIndex - case 5: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field RemoveTasks", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowDispatcher - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthDispatcher - } - postIndex := iNdEx + intStringLen - if postIndex > l { - return 
io.ErrUnexpectedEOF - } - m.RemoveTasks = append(m.RemoveTasks, string(data[iNdEx:postIndex])) - iNdEx = postIndex - case 6: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field UpdateSecrets", wireType) - } - var msglen int - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowDispatcher - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - msglen |= (int(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - if msglen < 0 { - return ErrInvalidLengthDispatcher - } - postIndex := iNdEx + msglen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.UpdateSecrets = append(m.UpdateSecrets, &Secret{}) - if err := m.UpdateSecrets[len(m.UpdateSecrets)-1].Unmarshal(data[iNdEx:postIndex]); err != nil { - return err - } - iNdEx = postIndex - case 7: - if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field RemoveSecrets", wireType) - } - var stringLen uint64 - for shift := uint(0); ; shift += 7 { - if shift >= 64 { - return ErrIntOverflowDispatcher - } - if iNdEx >= l { - return io.ErrUnexpectedEOF - } - b := data[iNdEx] - iNdEx++ - stringLen |= (uint64(b) & 0x7F) << shift - if b < 0x80 { - break - } - } - intStringLen := int(stringLen) - if intStringLen < 0 { - return ErrInvalidLengthDispatcher - } - postIndex := iNdEx + intStringLen - if postIndex > l { - return io.ErrUnexpectedEOF - } - m.RemoveSecrets = append(m.RemoveSecrets, string(data[iNdEx:postIndex])) - iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipDispatcher(data[iNdEx:]) @@ -3283,59 +3709,64 @@ var ( func init() { proto.RegisterFile("dispatcher.proto", fileDescriptorDispatcher) } var fileDescriptorDispatcher = []byte{ - // 858 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x56, 0x41, 0x6f, 0x1b, 0x45, - 0x14, 0xce, 0xd8, 0x8e, 0x53, 0xbf, 0xb5, 0x83, 0x19, 0x2a, 0xba, 0xb2, 0x5a, 0xc7, 0xdd, 0x90, - 0x28, 0x52, 0x83, 0x53, 0x8c, 0xc4, 0x01, 0x22, 0x20, 0xae, 0x2d, 0x61, 0xb5, 0x49, 0xab, 0x8d, - 0xa1, 0x47, 0x6b, 0xe3, 0x7d, 0x72, 0x17, 0x27, 0x3b, 0xcb, 0xcc, 0x6c, 0x8b, 0x0f, 0x48, 0x48, - 0x14, 0x89, 0x23, 0xe2, 0xd4, 0x5f, 0xc1, 0xef, 0x88, 0x38, 0x71, 0xe4, 0x14, 0x11, 0xff, 0x00, - 0xc4, 0x4f, 0x40, 0xbb, 0x33, 0xeb, 0x18, 0x67, 0xdd, 0x38, 0x39, 0x65, 0xe7, 0xcd, 0xf7, 0x7d, - 0xef, 0xd3, 0x7b, 0xf3, 0x5e, 0x0c, 0x65, 0xd7, 0x13, 0x81, 0x23, 0xfb, 0x2f, 0x90, 0xd7, 0x03, - 0xce, 0x24, 0xa3, 0xd4, 0x65, 0xfd, 0x21, 0xf2, 0xba, 0x78, 0xe5, 0xf0, 0x93, 0xa1, 0x27, 0xeb, - 0x2f, 0x3f, 0xaa, 0x18, 0x72, 0x14, 0xa0, 0x50, 0x80, 0x4a, 0x89, 0x1d, 0x7d, 0x8b, 0x7d, 0x99, - 0x1c, 0x6f, 0x0f, 0xd8, 0x80, 0xc5, 0x9f, 0x3b, 0xd1, 0x97, 0x8e, 0xbe, 0x17, 0x1c, 0x87, 0x03, - 0xcf, 0xdf, 0x51, 0x7f, 0x74, 0xf0, 0x8e, 0x1b, 0x72, 0x47, 0x7a, 0xcc, 0xdf, 0x49, 0x3e, 0xd4, - 0x85, 0xf5, 0x33, 0x81, 0xd5, 0x43, 0x14, 0xc2, 0x63, 0xbe, 0x8d, 0xdf, 0x85, 0x28, 0x24, 0x6d, - 0x83, 0xe1, 0xa2, 0xe8, 0x73, 0x2f, 0x88, 0x70, 0x26, 0xa9, 0x91, 0x2d, 0xa3, 0xb1, 0x5e, 0xbf, - 0x6c, 0xae, 0x7e, 0xc0, 0x5c, 0x6c, 0x5d, 0x40, 0xed, 0x69, 0x1e, 0xdd, 0x06, 0x10, 0x4a, 0xb8, - 0xe7, 0xb9, 0x66, 0xa6, 0x46, 0xb6, 0x0a, 0xcd, 0xd2, 0xf8, 0x6c, 0xad, 0xa0, 0xd3, 0x75, 0x5a, - 0x76, 0x41, 0x03, 0x3a, 0xae, 0xf5, 0x53, 0x66, 0xe2, 0x63, 0x1f, 0x85, 0x70, 0x06, 0x38, 0x23, - 0x40, 0xde, 0x2e, 0x40, 0xb7, 0x21, 0xe7, 0x33, 0x17, 0xe3, 0x44, 0x46, 0xc3, 0x9c, 0x67, 0xd7, - 0x8e, 0x51, 0x74, 0x17, 0x6e, 0x9d, 0x38, 0xbe, 0x33, 0x40, 0x2e, 0xcc, 0x6c, 0x2d, 0xbb, 0x65, - 0x34, 0x6a, 
0x69, 0x8c, 0xe7, 0xe8, 0x0d, 0x5e, 0x48, 0x74, 0x9f, 0x21, 0x72, 0x7b, 0xc2, 0xa0, - 0xcf, 0xe1, 0x7d, 0x1f, 0xe5, 0x2b, 0xc6, 0x87, 0xbd, 0x23, 0xc6, 0xa4, 0x90, 0xdc, 0x09, 0x7a, - 0x43, 0x1c, 0x09, 0x33, 0x17, 0x6b, 0xdd, 0x4f, 0xd3, 0x6a, 0xfb, 0x7d, 0x3e, 0x8a, 0x4b, 0xf3, - 0x18, 0x47, 0xf6, 0x6d, 0x2d, 0xd0, 0x4c, 0xf8, 0x8f, 0x71, 0x24, 0xac, 0x2f, 0xa1, 0xfc, 0x15, - 0x3a, 0x5c, 0x1e, 0xa1, 0x23, 0x93, 0x76, 0x5c, 0xab, 0x0c, 0xd6, 0x53, 0x78, 0x77, 0x4a, 0x41, - 0x04, 0xcc, 0x17, 0x48, 0x3f, 0x85, 0x7c, 0x80, 0xdc, 0x63, 0xae, 0x6e, 0xe6, 0xdd, 0x34, 0x7f, - 0x2d, 0xfd, 0x30, 0x9a, 0xb9, 0xd3, 0xb3, 0xb5, 0x25, 0x5b, 0x33, 0xac, 0x5f, 0x33, 0x70, 0xe7, - 0xeb, 0xc0, 0x75, 0x24, 0x76, 0x1d, 0x31, 0x3c, 0x94, 0x8e, 0x0c, 0xc5, 0x8d, 0xac, 0xd1, 0x6f, - 0x60, 0x25, 0x8c, 0x85, 0x92, 0x92, 0xef, 0xa6, 0xd9, 0x98, 0x93, 0xab, 0x7e, 0x11, 0x51, 0x08, - 0x3b, 0x11, 0xab, 0x30, 0x28, 0xcf, 0x5e, 0xd2, 0x75, 0x58, 0x91, 0x8e, 0x18, 0x5e, 0xd8, 0x82, - 0xf1, 0xd9, 0x5a, 0x3e, 0x82, 0x75, 0x5a, 0x76, 0x3e, 0xba, 0xea, 0xb8, 0xf4, 0x13, 0xc8, 0x8b, - 0x98, 0xa4, 0x1f, 0x4d, 0x35, 0xcd, 0xcf, 0x94, 0x13, 0x8d, 0xb6, 0x2a, 0x60, 0x5e, 0x76, 0xa9, - 0x4a, 0x6d, 0xed, 0x42, 0x31, 0x8a, 0xde, 0xac, 0x44, 0xd6, 0xe7, 0x9a, 0x9d, 0x8c, 0x40, 0x1d, - 0x96, 0x23, 0xaf, 0xc2, 0x24, 0x71, 0xc1, 0xcc, 0x79, 0x06, 0x6d, 0x05, 0xb3, 0x9a, 0x40, 0xf7, - 0x84, 0xf0, 0x06, 0xfe, 0x09, 0xfa, 0xf2, 0x86, 0x1e, 0x5e, 0x67, 0xff, 0x27, 0x92, 0x58, 0xf9, - 0x02, 0x72, 0xd1, 0x2a, 0x8a, 0xe9, 0xab, 0x8d, 0x07, 0x69, 0x4e, 0x2e, 0xb3, 0xea, 0xdd, 0x51, - 0x80, 0x76, 0x4c, 0xa4, 0xf7, 0x00, 0x9c, 0x20, 0x38, 0xf6, 0x50, 0xf4, 0x24, 0x53, 0xfb, 0xc0, - 0x2e, 0xe8, 0x48, 0x97, 0x45, 0xd7, 0x1c, 0x45, 0x78, 0x2c, 0x45, 0xcf, 0xf3, 0xcd, 0xac, 0xba, - 0xd6, 0x91, 0x8e, 0x4f, 0x3f, 0x83, 0xa2, 0xea, 0x77, 0x4f, 0x15, 0x24, 0x77, 0x45, 0x41, 0x8c, - 0x70, 0xd2, 0x21, 0x41, 0xef, 0x43, 0x91, 0xe3, 0x09, 0x7b, 0x99, 0x90, 0x97, 0x6b, 0xd9, 0xad, - 0x82, 0x6d, 0xa8, 0x98, 0x82, 0xec, 0xc1, 0xaa, 0xd6, 0x17, 0xd8, 0xe7, 0x28, 0x85, 0x99, 0x8f, - 0x33, 0x54, 0xd2, 0x32, 0x1c, 0xc6, 0x10, 0xbb, 0xa4, 0x18, 0xea, 0x24, 0xe8, 0x06, 0xac, 0xea, - 0x2c, 0x89, 0xc4, 0x4a, 0x9c, 0xa7, 0xa4, 0xa2, 0x1a, 0x66, 0x6d, 0x40, 0x2e, 0xaa, 0x0a, 0x2d, - 0xc2, 0xad, 0x47, 0x4f, 0xf7, 0x9f, 0x3d, 0x69, 0x77, 0xdb, 0xe5, 0x25, 0xfa, 0x0e, 0x18, 0x9d, - 0x83, 0x47, 0x76, 0x7b, 0xbf, 0x7d, 0xd0, 0xdd, 0x7b, 0x52, 0x26, 0x8d, 0x37, 0xcb, 0x00, 0xad, - 0xc9, 0x7f, 0x08, 0xfa, 0x3d, 0xac, 0xe8, 0x6e, 0x51, 0x2b, 0xdd, 0xd2, 0xf4, 0x0e, 0xaf, 0xbc, - 0x0d, 0xa3, 0x7b, 0x63, 0xad, 0xff, 0xf1, 0xfb, 0x3f, 0x6f, 0x32, 0xf7, 0xa0, 0x18, 0x63, 0x3e, - 0x8c, 0xb6, 0x11, 0x72, 0x28, 0xa9, 0x93, 0xde, 0x75, 0x0f, 0x09, 0xfd, 0x01, 0x0a, 0x93, 0x8d, - 0x42, 0x3f, 0x48, 0xd3, 0x9d, 0x5d, 0x59, 0x95, 0x8d, 0x2b, 0x50, 0x7a, 0x56, 0x16, 0x31, 0x40, - 0x7f, 0x23, 0x50, 0x9e, 0x9d, 0x36, 0xfa, 0xe0, 0x1a, 0x9b, 0xa3, 0xb2, 0xbd, 0x18, 0xf8, 0x3a, - 0xa6, 0x42, 0x58, 0x56, 0xcf, 0xa6, 0x36, 0xef, 0x01, 0x4e, 0xb2, 0xcf, 0x47, 0x24, 0x7d, 0xd8, - 0x5c, 0x20, 0xe3, 0x2f, 0x19, 0xf2, 0x90, 0xd0, 0xd7, 0x04, 0x8c, 0xa9, 0x21, 0xa3, 0x9b, 0x57, - 0x4c, 0x61, 0xe2, 0x61, 0x73, 0xb1, 0x69, 0x5d, 0xf0, 0x45, 0x34, 0xef, 0x9e, 0x9e, 0x57, 0x97, - 0xfe, 0x3a, 0xaf, 0x2e, 0xfd, 0x7b, 0x5e, 0x25, 0x3f, 0x8e, 0xab, 0xe4, 0x74, 0x5c, 0x25, 0x7f, - 0x8e, 0xab, 0xe4, 0xef, 0x71, 0x95, 0x1c, 0xe5, 0xe3, 0x1f, 0x16, 0x1f, 0xff, 0x17, 0x00, 0x00, - 0xff, 0xff, 0xf5, 0xa0, 0x46, 0x49, 0xe0, 0x08, 0x00, 0x00, + // 939 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 
0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x9c, 0x56, 0x4f, 0x6f, 0xdb, 0xc6, + 0x13, 0xd5, 0xca, 0x32, 0x1d, 0x8f, 0x6c, 0xff, 0xf4, 0xdb, 0x06, 0x89, 0x40, 0x24, 0xb2, 0x4a, + 0x37, 0x82, 0x81, 0xb8, 0x72, 0xaa, 0xfe, 0x39, 0x14, 0x86, 0x5b, 0xcb, 0x12, 0x60, 0x21, 0x91, + 0x6d, 0xac, 0x95, 0xe4, 0x28, 0x50, 0xe2, 0x40, 0x66, 0x65, 0x71, 0x59, 0xee, 0x2a, 0xa9, 0x0a, + 0x14, 0x28, 0xd0, 0x06, 0x28, 0x7a, 0x2a, 0x7a, 0xca, 0xa5, 0x5f, 0xa1, 0x9f, 0xc3, 0xe8, 0xa9, + 0xc7, 0x9e, 0x8c, 0x5a, 0x1f, 0xa0, 0xe8, 0xb9, 0xa7, 0x82, 0xe4, 0x52, 0x52, 0x15, 0xca, 0x91, + 0x7d, 0x12, 0x39, 0xf3, 0xde, 0xcc, 0xe3, 0xec, 0xf0, 0x51, 0x90, 0xb1, 0x6c, 0xe1, 0x9a, 0xb2, + 0x7d, 0x8a, 0x5e, 0xd1, 0xf5, 0xb8, 0xe4, 0x94, 0x5a, 0xbc, 0xdd, 0x45, 0xaf, 0x28, 0x5e, 0x9a, + 0x5e, 0xaf, 0x6b, 0xcb, 0xe2, 0x8b, 0x0f, 0xf4, 0xb4, 0x1c, 0xb8, 0x28, 0x42, 0x80, 0xbe, 0xca, + 0x5b, 0x5f, 0x60, 0x5b, 0x46, 0xb7, 0xb7, 0x3b, 0xbc, 0xc3, 0x83, 0xcb, 0x6d, 0xff, 0x4a, 0x45, + 0xdf, 0x71, 0xcf, 0xfa, 0x1d, 0xdb, 0xd9, 0x0e, 0x7f, 0x54, 0xf0, 0xae, 0xd5, 0xf7, 0x4c, 0x69, + 0x73, 0x67, 0x3b, 0xba, 0x08, 0x13, 0xc6, 0x2b, 0x02, 0x6b, 0x27, 0x28, 0x84, 0xcd, 0x1d, 0x86, + 0x5f, 0xf6, 0x51, 0x48, 0x5a, 0x85, 0xb4, 0x85, 0xa2, 0xed, 0xd9, 0xae, 0x8f, 0xcb, 0x92, 0x3c, + 0xd9, 0x4c, 0x97, 0x36, 0x8a, 0x6f, 0x8a, 0x2b, 0x1e, 0x72, 0x0b, 0x2b, 0x63, 0x28, 0x9b, 0xe4, + 0xd1, 0x2d, 0x00, 0x11, 0x16, 0x6e, 0xda, 0x56, 0x36, 0x99, 0x27, 0x9b, 0xcb, 0xe5, 0xd5, 0xe1, + 0xc5, 0xfa, 0xb2, 0x6a, 0x57, 0xab, 0xb0, 0x65, 0x05, 0xa8, 0x59, 0xc6, 0x77, 0xc9, 0x91, 0x8e, + 0x3a, 0x0a, 0x61, 0x76, 0x70, 0xaa, 0x00, 0xb9, 0xba, 0x00, 0xdd, 0x82, 0x94, 0xc3, 0x2d, 0x0c, + 0x1a, 0xa5, 0x4b, 0xd9, 0x59, 0x72, 0x59, 0x80, 0xa2, 0x3b, 0x70, 0xab, 0x67, 0x3a, 0x66, 0x07, + 0x3d, 0x91, 0x5d, 0xc8, 0x2f, 0x6c, 0xa6, 0x4b, 0xf9, 0x38, 0xc6, 0x73, 0xb4, 0x3b, 0xa7, 0x12, + 0xad, 0x63, 0x44, 0x8f, 0x8d, 0x18, 0xf4, 0x39, 0xdc, 0x71, 0x50, 0xbe, 0xe4, 0x5e, 0xb7, 0xd9, + 0xe2, 0x5c, 0x0a, 0xe9, 0x99, 0x6e, 0xb3, 0x8b, 0x03, 0x91, 0x4d, 0x05, 0xb5, 0xde, 0x8d, 0xab, + 0x55, 0x75, 0xda, 0xde, 0x20, 0x18, 0xcd, 0x63, 0x1c, 0xb0, 0xdb, 0xaa, 0x40, 0x39, 0xe2, 0x3f, + 0xc6, 0x81, 0x30, 0x3e, 0x87, 0xcc, 0x01, 0x9a, 0x9e, 0x6c, 0xa1, 0x29, 0xa3, 0xe3, 0xb8, 0xd6, + 0x18, 0x8c, 0x23, 0xf8, 0xff, 0x44, 0x05, 0xe1, 0x72, 0x47, 0x20, 0xfd, 0x14, 0x34, 0x17, 0x3d, + 0x9b, 0x5b, 0xea, 0x30, 0xef, 0xc5, 0xe9, 0xab, 0xa8, 0xc5, 0x28, 0xa7, 0xce, 0x2f, 0xd6, 0x13, + 0x4c, 0x31, 0x8c, 0x9f, 0x92, 0x70, 0xf7, 0xa9, 0x6b, 0x99, 0x12, 0x1b, 0xa6, 0xe8, 0x9e, 0x48, + 0x53, 0xf6, 0xc5, 0x8d, 0xa4, 0xd1, 0x67, 0xb0, 0xd4, 0x0f, 0x0a, 0x45, 0x23, 0xdf, 0x89, 0x93, + 0x31, 0xa3, 0x57, 0x71, 0x1c, 0x09, 0x11, 0x2c, 0x2a, 0xa6, 0x73, 0xc8, 0x4c, 0x27, 0xe9, 0x06, + 0x2c, 0x49, 0x53, 0x74, 0xc7, 0xb2, 0x60, 0x78, 0xb1, 0xae, 0xf9, 0xb0, 0x5a, 0x85, 0x69, 0x7e, + 0xaa, 0x66, 0xd1, 0x4f, 0x40, 0x13, 0x01, 0x49, 0x2d, 0x4d, 0x2e, 0x4e, 0xcf, 0x84, 0x12, 0x85, + 0x36, 0x74, 0xc8, 0xbe, 0xa9, 0x32, 0x1c, 0xb5, 0xb1, 0x03, 0x2b, 0x7e, 0xf4, 0x66, 0x23, 0x32, + 0x76, 0x15, 0x3b, 0x7a, 0x05, 0x8a, 0xb0, 0xe8, 0x6b, 0x15, 0x59, 0x12, 0x0c, 0x2c, 0x3b, 0x4b, + 0x20, 0x0b, 0x61, 0x46, 0x19, 0xe8, 0x9e, 0x10, 0x76, 0xc7, 0xe9, 0xa1, 0x23, 0x6f, 0xa8, 0xe1, + 0x6b, 0x80, 0x71, 0x0d, 0x5a, 0x84, 0x94, 0x5f, 0x5a, 0x2d, 0xce, 0x4c, 0x01, 0x07, 0x09, 0x16, + 0xe0, 0xe8, 0x47, 0xa0, 0x09, 0x6c, 0x7b, 0x28, 0xd5, 0x4c, 0xf5, 0x38, 0xc6, 0x49, 0x80, 0x38, + 0x48, 0x30, 0x85, 0x2d, 0x6b, 0x90, 0xb2, 0x25, 0xf6, 0x8c, 0x57, 0x49, 0xc8, 0x8c, 0x9b, 0xef, + 0x9f, 0x9a, 0x4e, 0x07, 0xe9, 0x2e, 0x80, 0x39, 
0x8a, 0x29, 0x21, 0xb1, 0x47, 0x35, 0x66, 0xb2, + 0x09, 0x06, 0xad, 0x83, 0x66, 0xb6, 0x03, 0x2b, 0xf3, 0x25, 0xad, 0x95, 0x3e, 0xbe, 0x9a, 0x1b, + 0x76, 0x9d, 0x08, 0xec, 0x05, 0x64, 0xa6, 0x8a, 0x18, 0xad, 0x49, 0x89, 0x61, 0x8e, 0x16, 0x40, + 0x7b, 0x7a, 0x5c, 0xd9, 0x6b, 0x54, 0x33, 0x09, 0x5d, 0xff, 0xf1, 0x97, 0xfc, 0x9d, 0x69, 0x84, + 0x5a, 0xcb, 0x02, 0x68, 0xac, 0x5a, 0x3f, 0x7a, 0x56, 0xcd, 0x90, 0x78, 0x1c, 0xc3, 0x1e, 0x7f, + 0x81, 0xc6, 0x3f, 0xe4, 0x3f, 0x07, 0x19, 0xad, 0xc3, 0x67, 0x90, 0xf2, 0x3f, 0x07, 0xc1, 0x0c, + 0xd6, 0x4a, 0x0f, 0xaf, 0x7e, 0x8e, 0x88, 0x55, 0x6c, 0x0c, 0x5c, 0x64, 0x01, 0x91, 0xde, 0x07, + 0x30, 0x5d, 0xf7, 0xcc, 0x46, 0xd1, 0x94, 0x3c, 0xf4, 0x64, 0xb6, 0xac, 0x22, 0x0d, 0xee, 0xa7, + 0x3d, 0x14, 0xfd, 0x33, 0x29, 0x9a, 0xb6, 0x93, 0x5d, 0x08, 0xd3, 0x2a, 0x52, 0x73, 0xe8, 0x2e, + 0x2c, 0xb5, 0x83, 0xe1, 0x44, 0x3e, 0xf7, 0xde, 0x3c, 0x93, 0x64, 0x11, 0xc9, 0x78, 0x00, 0x29, + 0x5f, 0x0b, 0x5d, 0x81, 0x5b, 0xfb, 0x47, 0xf5, 0xe3, 0x27, 0x55, 0x7f, 0x5e, 0xf4, 0x7f, 0x90, + 0xae, 0x1d, 0xee, 0xb3, 0x6a, 0xbd, 0x7a, 0xd8, 0xd8, 0x7b, 0x92, 0x21, 0xa5, 0xd7, 0x8b, 0x00, + 0x95, 0xd1, 0xb7, 0x91, 0x7e, 0x05, 0x4b, 0x6a, 0x4f, 0xa9, 0x11, 0xbf, 0x4c, 0x93, 0x5f, 0x2f, + 0xfd, 0x2a, 0x8c, 0x9a, 0x88, 0xb1, 0xf1, 0xdb, 0xaf, 0x7f, 0xbd, 0x4e, 0xde, 0x87, 0x95, 0x00, + 0xf3, 0xbe, 0xef, 0xc3, 0xe8, 0xc1, 0x6a, 0x78, 0xa7, 0x5c, 0xfe, 0x11, 0xa1, 0xdf, 0xc0, 0xf2, + 0xc8, 0x4b, 0x69, 0xec, 0xb3, 0x4e, 0x9b, 0xb5, 0xfe, 0xe0, 0x2d, 0x28, 0xe5, 0x12, 0xf3, 0x08, + 0xa0, 0x3f, 0x13, 0xc8, 0x4c, 0xfb, 0x0c, 0x7d, 0x78, 0x0d, 0xcf, 0xd4, 0xb7, 0xe6, 0x03, 0x5f, + 0x47, 0x54, 0x1f, 0x16, 0x03, 0x87, 0xa2, 0xf9, 0x59, 0x56, 0x30, 0xea, 0x3e, 0x1b, 0x11, 0x9d, + 0x43, 0x61, 0x8e, 0x8e, 0x3f, 0x24, 0xc9, 0x23, 0x42, 0xbf, 0x27, 0x90, 0x9e, 0x58, 0x6d, 0x5a, + 0x78, 0xcb, 0xee, 0x47, 0x1a, 0x0a, 0xf3, 0xbd, 0x23, 0x73, 0x6e, 0x44, 0xf9, 0xde, 0xf9, 0x65, + 0x2e, 0xf1, 0xc7, 0x65, 0x2e, 0xf1, 0xf7, 0x65, 0x8e, 0x7c, 0x3b, 0xcc, 0x91, 0xf3, 0x61, 0x8e, + 0xfc, 0x3e, 0xcc, 0x91, 0x3f, 0x87, 0x39, 0xd2, 0xd2, 0x82, 0xbf, 0x54, 0x1f, 0xfe, 0x1b, 0x00, + 0x00, 0xff, 0xff, 0x1d, 0x6e, 0x3d, 0x14, 0xda, 0x09, 0x00, 0x00, } diff --git a/vendor/src/github.com/docker/swarmkit/api/dispatcher.proto b/vendor/src/github.com/docker/swarmkit/api/dispatcher.proto index 317234a5df..8c3a9965bc 100644 --- a/vendor/src/github.com/docker/swarmkit/api/dispatcher.proto +++ b/vendor/src/github.com/docker/swarmkit/api/dispatcher.proto @@ -170,6 +170,23 @@ message AssignmentsRequest { string session_id = 1 [(gogoproto.customname) = "SessionID"]; } +message Assignment { + oneof item { + Task task = 1; + Secret secret = 2; + } +} + +message AssignmentChange { + enum AssignmentAction { + UPDATE = 0 [(gogoproto.enumvalue_customname) = "AssignmentActionUpdate"]; + REMOVE = 1 [(gogoproto.enumvalue_customname) = "AssignmentActionRemove"]; + } + + Assignment assignment = 1; + AssignmentAction action = 2; +} + message AssignmentsMessage { // AssignmentType specifies whether this assignment message carries // the full state, or is an update to an existing state. @@ -192,24 +209,6 @@ message AssignmentsMessage { // against missed messages. string results_in = 3; - // UpdateTasks is a set of new or updated tasks to run on this node. - // In the first assignments message, it contains all of the tasks - // to run on this node. Tasks outside of this set running on the node - // should be terminated. 
- repeated Task update_tasks = 4; - - // RemoveTasks is a set of previously-assigned task IDs to remove from the - // assignment set. It is not used in the first assignments message of - // a stream. - repeated string remove_tasks = 5; - - // UpdateSecrets is a set of new or updated secrets for this node. - // In the first assignments message, it contains all of the secrets - // the node needs for itself and its assigned tasks. - repeated Secret update_secrets = 6; - - // RemoveSecrets is a set of previously-assigned secret names to remove - // from memory. It is not used in the first assignments message of - // a stream. - repeated string remove_secrets = 7; + // AssignmentChange is a set of changes to apply on this node. + repeated AssignmentChange changes = 4; } diff --git a/vendor/src/github.com/docker/swarmkit/api/specs.pb.go b/vendor/src/github.com/docker/swarmkit/api/specs.pb.go index 2e1ce2e5c5..81f9b477a9 100644 --- a/vendor/src/github.com/docker/swarmkit/api/specs.pb.go +++ b/vendor/src/github.com/docker/swarmkit/api/specs.pb.go @@ -473,7 +473,7 @@ type ContainerSpec struct { StopGracePeriod *docker_swarmkit_v11.Duration `protobuf:"bytes,9,opt,name=stop_grace_period,json=stopGracePeriod" json:"stop_grace_period,omitempty"` // PullOptions parameterize the behavior of image pulls. PullOptions *ContainerSpec_PullOptions `protobuf:"bytes,10,opt,name=pull_options,json=pullOptions" json:"pull_options,omitempty"` - // Secrets contains references to zero or more secrets that + // SecretReference contains references to zero or more secrets that // will be exposed to the container. Secrets []*SecretReference `protobuf:"bytes,12,rep,name=secrets" json:"secrets,omitempty"` } diff --git a/vendor/src/github.com/docker/swarmkit/api/specs.proto b/vendor/src/github.com/docker/swarmkit/api/specs.proto index 4a42a63a54..ddf7eae10e 100644 --- a/vendor/src/github.com/docker/swarmkit/api/specs.proto +++ b/vendor/src/github.com/docker/swarmkit/api/specs.proto @@ -189,7 +189,7 @@ message ContainerSpec { // PullOptions parameterize the behavior of image pulls. PullOptions pull_options = 10; - // Secrets contains references to zero or more secrets that + // SecretReference contains references to zero or more secrets that // will be exposed to the container. repeated SecretReference secrets = 12; } diff --git a/vendor/src/github.com/docker/swarmkit/api/types.pb.go b/vendor/src/github.com/docker/swarmkit/api/types.pb.go index 11fd41fe53..2065f07a99 100644 --- a/vendor/src/github.com/docker/swarmkit/api/types.pb.go +++ b/vendor/src/github.com/docker/swarmkit/api/types.pb.go @@ -133,6 +133,8 @@ TasksRequest TasksMessage AssignmentsRequest + Assignment + AssignmentChange AssignmentsMessage NodeCertificateStatusRequest NodeCertificateStatusResponse @@ -1053,8 +1055,8 @@ func _TaskStatus_OneofSizer(msg proto.Message) (n int) { // instructing Swarm on how this service should work on the particular // network. type NetworkAttachmentConfig struct { - // Target specifies the target network for attachment. This value may be a - // network name or identifier. Only identifiers are supported at this time. + // Target specifies the target network for attachment. This value must be a + // network ID. Target string `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"` // Aliases specifies a list of discoverable alternate names for the service on this Target. 
Aliases []string `protobuf:"bytes,2,rep,name=aliases" json:"aliases,omitempty"` diff --git a/vendor/src/github.com/docker/swarmkit/api/types.proto b/vendor/src/github.com/docker/swarmkit/api/types.proto index e85ec666b3..61bf84e83b 100644 --- a/vendor/src/github.com/docker/swarmkit/api/types.proto +++ b/vendor/src/github.com/docker/swarmkit/api/types.proto @@ -447,8 +447,8 @@ message TaskStatus { // instructing Swarm on how this service should work on the particular // network. message NetworkAttachmentConfig { - // Target specifies the target network for attachment. This value may be a - // network name or identifier. Only identifiers are supported at this time. + // Target specifies the target network for attachment. This value must be a + // network ID. string target = 1; // Aliases specifies a list of discoverable alternate names for the service on this Target. repeated string aliases = 2; diff --git a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_ipam.go b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_ipam.go new file mode 100644 index 0000000000..56b944a1a7 --- /dev/null +++ b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_ipam.go @@ -0,0 +1,23 @@ +package networkallocator + +import ( + "github.com/docker/libnetwork/drvregistry" + "github.com/docker/libnetwork/ipamapi" + builtinIpam "github.com/docker/libnetwork/ipams/builtin" + nullIpam "github.com/docker/libnetwork/ipams/null" + remoteIpam "github.com/docker/libnetwork/ipams/remote" +) + +func initIPAMDrivers(r *drvregistry.DrvRegistry) error { + for _, fn := range [](func(ipamapi.Callback, interface{}, interface{}) error){ + builtinIpam.Init, + remoteIpam.Init, + nullIpam.Init, + } { + if err := fn(r, nil, nil); err != nil { + return err + } + } + + return nil +} diff --git a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_linux.go b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_linux.go new file mode 100644 index 0000000000..b8ad76b092 --- /dev/null +++ b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_linux.go @@ -0,0 +1,13 @@ +package networkallocator + +import ( + "github.com/docker/libnetwork/drivers/overlay/ovmanager" + "github.com/docker/libnetwork/drivers/remote" +) + +func getInitializers() []initializer { + return []initializer{ + {remote.Init, "remote"}, + {ovmanager.Init, "overlay"}, + } +} diff --git a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_unsupported.go b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_unsupported.go new file mode 100644 index 0000000000..4bca7583c5 --- /dev/null +++ b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/drivers_unsupported.go @@ -0,0 +1,7 @@ +// +build !linux + +package networkallocator + +func getInitializers() []initializer { + return nil +} diff --git a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go index d8ea7830dc..c989070975 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go +++ b/vendor/src/github.com/docker/swarmkit/manager/allocator/networkallocator/networkallocator.go @@ -4,12 +4,11 @@ import ( "fmt" "net" + "github.com/docker/docker/pkg/plugins" + 
"github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/driverapi" - "github.com/docker/libnetwork/drivers/overlay/ovmanager" "github.com/docker/libnetwork/drvregistry" "github.com/docker/libnetwork/ipamapi" - builtinIpam "github.com/docker/libnetwork/ipams/builtin" - nullIpam "github.com/docker/libnetwork/ipams/null" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" "github.com/pkg/errors" @@ -23,10 +22,6 @@ const ( DefaultDriver = "overlay" ) -var ( - defaultDriverInitFunc = ovmanager.Init -) - // NetworkAllocator acts as the controller for all network related operations // like managing network and IPAM drivers and also creating and // deleting networks and the associated resources. @@ -68,6 +63,11 @@ type network struct { endpoints map[string]string } +type initializer struct { + fn drvregistry.InitFunc + ntype string +} + // New returns a new NetworkAllocator handle func New() (*NetworkAllocator, error) { na := &NetworkAllocator{ @@ -84,18 +84,12 @@ func New() (*NetworkAllocator, error) { return nil, err } - // Add the manager component of overlay driver to the registry. - if err := reg.AddDriver(DefaultDriver, defaultDriverInitFunc, nil); err != nil { + if err := initializeDrivers(reg); err != nil { return nil, err } - for _, fn := range [](func(ipamapi.Callback, interface{}, interface{}) error){ - builtinIpam.Init, - nullIpam.Init, - } { - if err := fn(reg, nil, nil); err != nil { - return nil, err - } + if err = initIPAMDrivers(reg); err != nil { + return nil, err } pa, err := newPortAllocator() @@ -631,14 +625,33 @@ func (na *NetworkAllocator) resolveDriver(n *api.Network) (driverapi.Driver, str dName = n.Spec.DriverConfig.Name } - d, _ := na.drvRegistry.Driver(dName) + d, drvcap := na.drvRegistry.Driver(dName) if d == nil { - return nil, "", fmt.Errorf("could not resolve network driver %s", dName) + var err error + err = na.loadDriver(dName) + if err != nil { + return nil, "", err + } + + d, drvcap = na.drvRegistry.Driver(dName) + if d == nil { + return nil, "", fmt.Errorf("could not resolve network driver %s", dName) + } + + } + + if drvcap.DataScope != datastore.GlobalScope { + return nil, "", fmt.Errorf("swarm can allocate network resources only for global scoped networks. network driver (%s) is scoped %s", dName, drvcap.DataScope) } return d, dName, nil } +func (na *NetworkAllocator) loadDriver(name string) error { + _, err := plugins.Get(name, driverapi.NetworkPluginEndpointType) + return err +} + // Resolve the IPAM driver func (na *NetworkAllocator) resolveIPAM(n *api.Network) (ipamapi.Ipam, string, error) { dName := ipamapi.DefaultIPAM @@ -746,3 +759,12 @@ func (na *NetworkAllocator) allocatePools(n *api.Network) (map[string]string, er return pools, nil } + +func initializeDrivers(reg *drvregistry.DrvRegistry) error { + for _, i := range getInitializers() { + if err := reg.AddDriver(i.ntype, i.fn, nil); err != nil { + return err + } + } + return nil +} diff --git a/vendor/src/github.com/docker/swarmkit/manager/constraint/constraint.go b/vendor/src/github.com/docker/swarmkit/manager/constraint/constraint.go new file mode 100644 index 0000000000..acd4245c61 --- /dev/null +++ b/vendor/src/github.com/docker/swarmkit/manager/constraint/constraint.go @@ -0,0 +1,164 @@ +package constraint + +import ( + "fmt" + "regexp" + "strings" + + "github.com/docker/swarmkit/api" +) + +const ( + eq = iota + noteq + + nodeLabelPrefix = "node.labels." + engineLabelPrefix = "engine.labels." 
+) + +var ( + alphaNumeric = regexp.MustCompile(`^(?i)[a-z_][a-z0-9\-_.]+$`) + // value can be alphanumeric and some special characters. it shouldn't contain + // current or future operators like '>, <, ~', etc. + valuePattern = regexp.MustCompile(`^(?i)[a-z0-9:\-_\s\.\*\(\)\?\+\[\]\\\^\$\|\/]+$`) + + // operators defines the list of accepted operators + operators = []string{"==", "!="} +) + +// Constraint defines a single placement constraint of the form "key op value". +type Constraint struct { + key string + operator int + exp string +} + +// Parse parses a list of constraints. +func Parse(env []string) ([]Constraint, error) { + exprs := []Constraint{} + for _, e := range env { + found := false + // each expr is in the form of "key op value" + for i, op := range operators { + if !strings.Contains(e, op) { + continue + } + // split with the op + parts := strings.SplitN(e, op, 2) + + if len(parts) < 2 { + return nil, fmt.Errorf("invalid expr: %s", e) + } + + part0 := strings.TrimSpace(parts[0]) + // validate key + matched := alphaNumeric.MatchString(part0) + if matched == false { + return nil, fmt.Errorf("key '%s' is invalid", part0) + } + + part1 := strings.TrimSpace(parts[1]) + + // validate value + matched = valuePattern.MatchString(part1) + if matched == false { + return nil, fmt.Errorf("value '%s' is invalid", part1) + } + // TODO(dongluochen): revisit requirements to see if globbing or regex are useful + exprs = append(exprs, Constraint{key: part0, operator: i, exp: part1}) + + found = true + break // found an op, move to next entry + } + if !found { + return nil, fmt.Errorf("constraint expected one operator from %s", strings.Join(operators, ", ")) + } + } + return exprs, nil +} + +// Match checks if the Constraint matches the target strings. +func (c *Constraint) Match(whats ...string) bool { + var match bool + + // full string match + for _, what := range whats { + // case insensitive compare + if strings.EqualFold(c.exp, what) { + match = true + break + } + } + + switch c.operator { + case eq: + return match + case noteq: + return !match + } + + return false +} + +// NodeMatches returns true if the node satisfies the given constraints.
+func NodeMatches(constraints []Constraint, n *api.Node) bool { + for _, constraint := range constraints { + switch { + case strings.EqualFold(constraint.key, "node.id"): + if !constraint.Match(n.ID) { + return false + } + case strings.EqualFold(constraint.key, "node.hostname"): + // if this node doesn't have hostname + // it's equivalent to match an empty hostname + // where '==' would fail, '!=' matches + if n.Description == nil { + if !constraint.Match("") { + return false + } + continue + } + if !constraint.Match(n.Description.Hostname) { + return false + } + case strings.EqualFold(constraint.key, "node.role"): + if !constraint.Match(n.Spec.Role.String()) { + return false + } + + // node labels constraint in form like 'node.labels.key==value' + case len(constraint.key) > len(nodeLabelPrefix) && strings.EqualFold(constraint.key[:len(nodeLabelPrefix)], nodeLabelPrefix): + if n.Spec.Annotations.Labels == nil { + if !constraint.Match("") { + return false + } + continue + } + label := constraint.key[len(nodeLabelPrefix):] + // label itself is case sensitive + val := n.Spec.Annotations.Labels[label] + if !constraint.Match(val) { + return false + } + + // engine labels constraint in form like 'engine.labels.key!=value' + case len(constraint.key) > len(engineLabelPrefix) && strings.EqualFold(constraint.key[:len(engineLabelPrefix)], engineLabelPrefix): + if n.Description == nil || n.Description.Engine == nil || n.Description.Engine.Labels == nil { + if !constraint.Match("") { + return false + } + continue + } + label := constraint.key[len(engineLabelPrefix):] + val := n.Description.Engine.Labels[label] + if !constraint.Match(val) { + return false + } + default: + // key doesn't match predefined syntax + return false + } + } + + return true +} diff --git a/vendor/src/github.com/docker/swarmkit/manager/controlapi/network.go b/vendor/src/github.com/docker/swarmkit/manager/controlapi/network.go index 6f38be8dbc..53801fd7c9 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/controlapi/network.go +++ b/vendor/src/github.com/docker/swarmkit/manager/controlapi/network.go @@ -4,10 +4,8 @@ import ( "fmt" "net" - "github.com/docker/libnetwork/ipamapi" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/identity" - "github.com/docker/swarmkit/manager/allocator/networkallocator" "github.com/docker/swarmkit/manager/state/store" "golang.org/x/net/context" "google.golang.org/grpc" @@ -60,10 +58,6 @@ func validateIPAM(ipam *api.IPAMOptions) error { return err } - if ipam.Driver != nil && ipam.Driver.Name != ipamapi.DefaultIPAM { - return grpc.Errorf(codes.InvalidArgument, "invalid IPAM specified") - } - for _, ipamConf := range ipam.Configs { if err := validateIPAMConfiguration(ipamConf); err != nil { return err @@ -86,10 +80,6 @@ func validateNetworkSpec(spec *api.NetworkSpec) error { return err } - if spec.DriverConfig != nil && spec.DriverConfig.Name != networkallocator.DefaultDriver { - return grpc.Errorf(codes.InvalidArgument, "invalid driver specified") - } - if err := validateIPAM(spec.IPAM); err != nil { return err } diff --git a/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go b/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go index 24b64356c5..6cb69ff063 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go +++ b/vendor/src/github.com/docker/swarmkit/manager/controlapi/service.go @@ -8,7 +8,7 @@ import ( "github.com/docker/distribution/reference" "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/identity" - 
"github.com/docker/swarmkit/manager/scheduler" + "github.com/docker/swarmkit/manager/constraint" "github.com/docker/swarmkit/manager/state/store" "github.com/docker/swarmkit/protobuf/ptypes" "golang.org/x/net/context" @@ -81,7 +81,7 @@ func validatePlacement(placement *api.Placement) error { if placement == nil { return nil } - _, err := scheduler.ParseExprs(placement.Constraints) + _, err := constraint.Parse(placement.Constraints) return err } @@ -170,6 +170,24 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error { return nil } +func (s *Server) validateNetworks(networks []*api.NetworkAttachmentConfig) error { + for _, na := range networks { + var network *api.Network + s.store.View(func(tx store.ReadTx) { + network = store.GetNetwork(tx, na.Target) + }) + if network == nil { + continue + } + if _, ok := network.Spec.Annotations.Labels["com.docker.swarm.internal"]; ok { + return grpc.Errorf(codes.InvalidArgument, + "Service cannot be explicitly attached to %q network which is a swarm internal network", + network.Spec.Annotations.Name) + } + } + return nil +} + func validateServiceSpec(spec *api.ServiceSpec) error { if spec == nil { return grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error()) @@ -259,6 +277,10 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe return nil, err } + if err := s.validateNetworks(request.Spec.Networks); err != nil { + return nil, err + } + if err := s.checkPortConflicts(request.Spec, ""); err != nil { return nil, err } diff --git a/vendor/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go b/vendor/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go index e3f4bfd181..7e463a24ca 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go +++ b/vendor/src/github.com/docker/swarmkit/manager/dispatcher/dispatcher.go @@ -759,6 +759,7 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche initial api.AssignmentsMessage ) tasksMap := make(map[string]*api.Task) + tasksUsingSecret := make(map[string]map[string]struct{}) sendMessage := func(msg api.AssignmentsMessage, assignmentType api.AssignmentsMessage_Type) error { sequence++ @@ -773,6 +774,45 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche return nil } + // returns a slice of new secrets to send down + addSecretsForTask := func(readTx store.ReadTx, t *api.Task) []*api.Secret { + container := t.Spec.GetContainer() + if container == nil { + return nil + } + var newSecrets []*api.Secret + for _, secretRef := range container.Secrets { + secretID := secretRef.SecretID + log := log.WithFields(logrus.Fields{ + "secret.id": secretID, + "secret.name": secretRef.SecretName, + }) + + if tasksUsingSecret[secretID] == nil { + tasksUsingSecret[secretID] = make(map[string]struct{}) + + secrets, err := store.FindSecrets(readTx, store.ByIDPrefix(secretID)) + if err != nil { + log.WithError(err).Errorf("error retrieving secret") + continue + } + if len(secrets) != 1 { + log.Debugf("secret not found") + continue + } + + // If the secret was found and there was one result + // (there should never be more than one because of the + // uniqueness constraint), add this secret to our + // initial set that we send down. + newSecrets = append(newSecrets, secrets[0]) + } + tasksUsingSecret[secretID][t.ID] = struct{}{} + } + + return newSecrets + } + // TODO(aaronl): Also send node secrets that should be exposed to // this node. 
nodeTasks, cancel, err := store.ViewAndWatch( @@ -794,7 +834,31 @@ } tasksMap[t.ID] = t - initial.UpdateTasks = append(initial.UpdateTasks, t) + taskChange := &api.AssignmentChange{ + Assignment: &api.Assignment{ + Item: &api.Assignment_Task{ + Task: t, + }, + }, + Action: api.AssignmentChange_AssignmentActionUpdate, + } + initial.Changes = append(initial.Changes, taskChange) + // Only send secrets down if the task's state is <= RUNNING + if t.Status.State <= api.TaskStateRunning { + newSecrets := addSecretsForTask(readTx, t) + for _, secret := range newSecrets { + secretChange := &api.AssignmentChange{ + Assignment: &api.Assignment{ + Item: &api.Assignment_Secret{ + Secret: secret, + }, + }, + Action: api.AssignmentChange_AssignmentActionUpdate, + } + + initial.Changes = append(initial.Changes, secretChange) + } + } } return nil }, @@ -802,6 +866,8 @@ Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}}, state.EventDeleteTask{Task: &api.Task{NodeID: nodeID}, Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}}, + state.EventUpdateSecret{}, + state.EventDeleteSecret{}, ) if err != nil { return err } @@ -825,7 +891,9 @@ batchingTimer *time.Timer batchingTimeout <-chan time.Time updateTasks = make(map[string]*api.Task) + updateSecrets = make(map[string]*api.Secret) removeTasks = make(map[string]struct{}) + removeSecrets = make(map[string]struct{}) ) oneModification := func() { @@ -839,6 +907,28 @@ } } + // Release the secret references held by this task + releaseSecretsForTask := func(t *api.Task) bool { + var modified bool + container := t.Spec.GetContainer() + if container == nil { + return modified + } + + for _, secretRef := range container.Secrets { + secretID := secretRef.SecretID + delete(tasksUsingSecret[secretID], t.ID) + if len(tasksUsingSecret[secretID]) == 0 { + // No tasks are using the secret anymore + delete(tasksUsingSecret, secretID) + removeSecrets[secretID] = struct{}{} + modified = true + } + } + + return modified + } + // The batching loop waits for 50 ms after the most recent // change, or until modificationBatchLimit is reached. The // worst case latency is modificationBatchLimit * batchingWaitTime, @@ -867,15 +957,35 @@ if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned { // this update should not trigger a task change for the agent tasksMap[v.Task.ID] = v.Task + // If this task got updated to a final state, let's release + // the secrets that are being used by the task + if v.Task.Status.State > api.TaskStateRunning { + // If releasing the secrets caused a secret to be + // removed from an agent, mark one modification + if releaseSecretsForTask(v.Task) { + oneModification() + } + } continue } + } else if v.Task.Status.State <= api.TaskStateRunning { + // If this task wasn't part of the assignment set before, and it's <= RUNNING, + // add the secrets it references to the secrets assignment. + // Task states > RUNNING are worker-reported only; tasks are never + // created in a > RUNNING state.
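releaseSecretsForTask, defined above, is the decrement side of that index: when the last task referencing a secret reaches a terminal state or is deleted, the secret is queued for removal from the agent. Continuing the illustrative secretIndex sketch from earlier:

```go
// dropRef removes taskID's reference to secretID and reports whether the
// secret is now unused and should be removed from the node.
// Deleting from a nil inner map is a no-op in Go, so unknown IDs are safe.
func (idx secretIndex) dropRef(secretID, taskID string) (unused bool) {
	delete(idx[secretID], taskID)
	if len(idx[secretID]) == 0 {
		delete(idx, secretID)
		return true
	}
	return false
}
```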
+ var newSecrets []*api.Secret + d.store.View(func(readTx store.ReadTx) { + newSecrets = addSecretsForTask(readTx, v.Task) + }) + for _, secret := range newSecrets { + updateSecrets[secret.ID] = secret + } } tasksMap[v.Task.ID] = v.Task updateTasks[v.Task.ID] = v.Task oneModification() case state.EventDeleteTask: - if _, exists := tasksMap[v.Task.ID]; !exists { continue } @@ -884,7 +994,28 @@ delete(tasksMap, v.Task.ID) + // Release the secrets being used by this task + // Ignoring the return here. We will always mark + // this as a modification, since a task is being + // removed. + releaseSecretsForTask(v.Task) + oneModification() + // TODO(aaronl): For node secrets, we'll need to handle + // EventCreateSecret. + case state.EventUpdateSecret: + if _, exists := tasksUsingSecret[v.Secret.ID]; !exists { + continue + } + log.Debugf("Secret %s (ID: %s) was updated though it was still referenced by one or more tasks", + v.Secret.Spec.Annotations.Name, v.Secret.ID) + + case state.EventDeleteSecret: + if _, exists := tasksUsingSecret[v.Secret.ID]; !exists { + continue + } + log.Debugf("Secret %s (ID: %s) was deleted though it was still referenced by one or more tasks", + v.Secret.Spec.Annotations.Name, v.Secret.ID) } case <-batchingTimeout: break batchingLoop @@ -902,12 +1033,57 @@ if modificationCnt > 0 { for id, task := range updateTasks { if _, ok := removeTasks[id]; !ok { - update.UpdateTasks = append(update.UpdateTasks, task) + taskChange := &api.AssignmentChange{ + Assignment: &api.Assignment{ + Item: &api.Assignment_Task{ + Task: task, + }, + }, + Action: api.AssignmentChange_AssignmentActionUpdate, + } + + update.Changes = append(update.Changes, taskChange) + } + } + for id, secret := range updateSecrets { + if _, ok := removeSecrets[id]; !ok { + secretChange := &api.AssignmentChange{ + Assignment: &api.Assignment{ + Item: &api.Assignment_Secret{ + Secret: secret, + }, + }, + Action: api.AssignmentChange_AssignmentActionUpdate, + } + + update.Changes = append(update.Changes, secretChange) } } for id := range removeTasks { - update.RemoveTasks = append(update.RemoveTasks, id) + taskChange := &api.AssignmentChange{ + Assignment: &api.Assignment{ + Item: &api.Assignment_Task{ + Task: &api.Task{ID: id}, + }, + }, + Action: api.AssignmentChange_AssignmentActionRemove, + } + + update.Changes = append(update.Changes, taskChange) } + for id := range removeSecrets { + secretChange := &api.AssignmentChange{ + Assignment: &api.Assignment{ + Item: &api.Assignment_Secret{ + Secret: &api.Secret{ID: id}, + }, + }, + Action: api.AssignmentChange_AssignmentActionRemove, + } + + update.Changes = append(update.Changes, secretChange) + } + if err := sendMessage(update, api.AssignmentsMessage_INCREMENTAL); err != nil { return err } diff --git a/vendor/src/github.com/docker/swarmkit/manager/manager.go b/vendor/src/github.com/docker/swarmkit/manager/manager.go index b11afc8ba0..0727c0270f 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/manager.go +++ b/vendor/src/github.com/docker/swarmkit/manager/manager.go @@ -184,6 +184,10 @@ func New(config *Config) (*Manager, error) { } else if err != nil { return nil, err } + if proto == "tcp" { + // remember the actual address in case port 0 was requested + tcpAddr = l.Addr().String() + } listeners[proto] = l } } @@ -197,7 +201,7 @@ func New(config *Config) (*Manager, error) { raftCfg.HeartbeatTick = int(config.HeartbeatTick) } -
newNodeOpts := raft.NewNodeOptions{ + newNodeOpts := raft.NodeOptions{ ID: config.SecurityConfig.ClientTLSCreds.NodeID(), Addr: tcpAddr, JoinAddr: config.JoinRaft, @@ -226,6 +230,14 @@ func New(config *Config) (*Manager, error) { return m, nil } +// Addr returns tcp address on which remote api listens. +func (m *Manager) Addr() net.Addr { + if l, ok := m.listeners["tcp"]; ok { + return l.Addr() + } + return nil +} + // Run starts all manager sub-systems and the gRPC server at the configured // address. // The call never returns unless an error occurs or `Stop()` is called. diff --git a/vendor/src/github.com/docker/swarmkit/manager/orchestrator/global.go b/vendor/src/github.com/docker/swarmkit/manager/orchestrator/global.go index 31f94cf1b2..dc160e6d62 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/orchestrator/global.go +++ b/vendor/src/github.com/docker/swarmkit/manager/orchestrator/global.go @@ -3,19 +3,27 @@ package orchestrator import ( "github.com/docker/swarmkit/api" "github.com/docker/swarmkit/log" + "github.com/docker/swarmkit/manager/constraint" "github.com/docker/swarmkit/manager/state" "github.com/docker/swarmkit/manager/state/store" "golang.org/x/net/context" ) +type globalService struct { + *api.Service + + // Compiled constraints + constraints []constraint.Constraint +} + // GlobalOrchestrator runs a reconciliation loop to create and destroy // tasks as necessary for global services. type GlobalOrchestrator struct { store *store.MemoryStore - // nodes contains nodeID of all valid nodes in the cluster - nodes map[string]struct{} - // globalServices have all the global services in the cluster, indexed by ServiceID - globalServices map[string]*api.Service + // nodes is the set of non-drained nodes in the cluster, indexed by node ID + nodes map[string]*api.Node + // globalServices has all the global services in the cluster, indexed by ServiceID + globalServices map[string]globalService // stopChan signals to the state machine to stop running. 
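Manager.Addr and the tcpAddr capture above exist because the manager may be configured to listen on port 0, in which case the kernel picks the port and only the live listener knows the real address worth advertising to peers. A minimal, runnable illustration of recovering the bound address after listening on port 0:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	// Ask the kernel for any free port.
	l, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	defer l.Close()

	// l.Addr() carries the port that was actually bound, e.g.
	// "127.0.0.1:49152"; this is the value to advertise, not ":0".
	fmt.Println(l.Addr().String())
}
```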
stopChan chan struct{} @@ -34,8 +42,8 @@ func NewGlobalOrchestrator(store *store.MemoryStore) *GlobalOrchestrator { updater := NewUpdateSupervisor(store, restartSupervisor) return &GlobalOrchestrator{ store: store, - nodes: make(map[string]struct{}), - globalServices: make(map[string]*api.Service), + nodes: make(map[string]*api.Node), + globalServices: make(map[string]globalService), stopChan: make(chan struct{}), doneChan: make(chan struct{}), updater: updater, @@ -76,10 +84,7 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error { return err } for _, n := range nodes { - // if a node is in drain state, do not add it - if isValidNode(n) { - g.nodes[n.ID] = struct{}{} - } + g.updateNode(n) } // Lookup global services @@ -90,12 +95,15 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error { if err != nil { return err } + + var reconcileServiceIDs []string for _, s := range existingServices { if isGlobalService(s) { - g.globalServices[s.ID] = s - g.reconcileOneService(ctx, s) + g.updateService(s) + reconcileServiceIDs = append(reconcileServiceIDs, s.ID) } } + g.reconcileServices(ctx, reconcileServiceIDs) for { select { @@ -108,14 +116,14 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error { if !isGlobalService(v.Service) { continue } - g.globalServices[v.Service.ID] = v.Service - g.reconcileOneService(ctx, v.Service) + g.updateService(v.Service) + g.reconcileServices(ctx, []string{v.Service.ID}) case state.EventUpdateService: if !isGlobalService(v.Service) { continue } - g.globalServices[v.Service.ID] = v.Service - g.reconcileOneService(ctx, v.Service) + g.updateService(v.Service) + g.reconcileServices(ctx, []string{v.Service.ID}) case state.EventDeleteService: if !isGlobalService(v.Service) { continue @@ -125,8 +133,10 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error { delete(g.globalServices, v.Service.ID) g.restarts.ClearServiceHistory(v.Service.ID) case state.EventCreateNode: + g.updateNode(v.Node) g.reconcileOneNode(ctx, v.Node) case state.EventUpdateNode: + g.updateNode(v.Node) switch v.Node.Status.State { // NodeStatus_DISCONNECTED is a transient state, no need to make any change case api.NodeStatus_DOWN: @@ -153,7 +163,7 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error { if _, exists := g.globalServices[v.Task.ServiceID]; !exists { continue } - g.reconcileServiceOneNode(ctx, v.Task.ServiceID, v.Task.NodeID) + g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID) } case <-g.stopChan: return nil @@ -196,138 +206,225 @@ func (g *GlobalOrchestrator) removeTasksFromNode(ctx context.Context, node *api. 
} } -func (g *GlobalOrchestrator) reconcileOneService(ctx context.Context, service *api.Service) { - var ( - tasks []*api.Task - err error - ) +func (g *GlobalOrchestrator) reconcileServices(ctx context.Context, serviceIDs []string) { + nodeCompleted := make(map[string]map[string]struct{}) + nodeTasks := make(map[string]map[string][]*api.Task) + g.store.View(func(tx store.ReadTx) { - tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID)) + for _, serviceID := range serviceIDs { + tasks, err := store.FindTasks(tx, store.ByServiceID(serviceID)) + if err != nil { + log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices failed finding tasks for service %s", serviceID) + continue + } + + // a node may have completed this service + nodeCompleted[serviceID] = make(map[string]struct{}) + // nodeID -> task list + nodeTasks[serviceID] = make(map[string][]*api.Task) + + for _, t := range tasks { + if isTaskRunning(t) { + // Collect all running instances of this service + nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t) + } else { + // for finished tasks, check restartPolicy + if isTaskCompleted(t, restartCondition(t)) { + nodeCompleted[serviceID][t.NodeID] = struct{}{} + } + } + } + } }) - if err != nil { - log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileOneService failed finding tasks") - return - } - // a node may have completed this service - nodeCompleted := make(map[string]struct{}) - // nodeID -> task list - nodeTasks := make(map[string][]*api.Task) - for _, t := range tasks { - if isTaskRunning(t) { - // Collect all running instances of this service - nodeTasks[t.NodeID] = append(nodeTasks[t.NodeID], t) - } else { - // for finished tasks, check restartPolicy - if isTaskCompleted(t, restartCondition(t)) { - nodeCompleted[t.NodeID] = struct{}{} - } - } - } - - _, err = g.store.Batch(func(batch *store.Batch) error { + _, err := g.store.Batch(func(batch *store.Batch) error { var updateTasks []slot - for nodeID := range g.nodes { - ntasks := nodeTasks[nodeID] - // if restart policy considers this node has finished its task - // it should remove all running tasks - if _, exists := nodeCompleted[nodeID]; exists { - g.removeTasks(ctx, batch, service, ntasks) - return nil + for _, serviceID := range serviceIDs { + if _, exists := nodeTasks[serviceID]; !exists { + continue } - // this node needs to run 1 copy of the task - if len(ntasks) == 0 { - g.addTask(ctx, batch, service, nodeID) - } else { - updateTasks = append(updateTasks, ntasks) + + service := g.globalServices[serviceID] + + for nodeID, node := range g.nodes { + meetsConstraints := constraint.NodeMatches(service.constraints, node) + ntasks := nodeTasks[serviceID][nodeID] + delete(nodeTasks[serviceID], nodeID) + + // if restart policy considers this node has finished its task + // it should remove all running tasks + if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints { + g.removeTasks(ctx, batch, ntasks) + continue + } + + if node.Spec.Availability == api.NodeAvailabilityPause { + // the node is paused, so we won't add or update + // any tasks + continue + } + + // this node needs to run 1 copy of the task + if len(ntasks) == 0 { + g.addTask(ctx, batch, service.Service, nodeID) + } else { + updateTasks = append(updateTasks, ntasks) + } + } + if len(updateTasks) > 0 { + g.updater.Update(ctx, g.cluster, service.Service, updateTasks) + } + + // Remove any tasks assigned to nodes not found in g.nodes. 
+ // These must be associated with nodes that are drained, or + // nodes that no longer exist. + for _, ntasks := range nodeTasks[serviceID] { + g.removeTasks(ctx, batch, ntasks) } - } - if len(updateTasks) > 0 { - g.updater.Update(ctx, g.cluster, service, updateTasks) } return nil }) if err != nil { - log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileOneService transaction failed") + log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed") + } +} + +// updateNode updates g.nodes based on the current node value +func (g *GlobalOrchestrator) updateNode(node *api.Node) { + if node.Spec.Availability == api.NodeAvailabilityDrain { + delete(g.nodes, node.ID) + } else { + g.nodes[node.ID] = node + } +} + +// updateService updates g.globalServices based on the current service value +func (g *GlobalOrchestrator) updateService(service *api.Service) { + var constraints []constraint.Constraint + + if service.Spec.Task.Placement != nil && len(service.Spec.Task.Placement.Constraints) != 0 { + constraints, _ = constraint.Parse(service.Spec.Task.Placement.Constraints) + } + + g.globalServices[service.ID] = globalService{ + Service: service, + constraints: constraints, } } // reconcileOneNode checks all global services on one node func (g *GlobalOrchestrator) reconcileOneNode(ctx context.Context, node *api.Node) { - switch node.Spec.Availability { - case api.NodeAvailabilityDrain: + if node.Spec.Availability == api.NodeAvailabilityDrain { log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID) g.removeTasksFromNode(ctx, node) - delete(g.nodes, node.ID) - return - case api.NodeAvailabilityActive: - if _, exists := g.nodes[node.ID]; !exists { - log.G(ctx).Debugf("global orchestrator: node %s not in current node list, adding it", node.ID) - g.nodes[node.ID] = struct{}{} - } - default: - log.G(ctx).Debugf("global orchestrator: node %s in %s state, doing nothing", node.ID, node.Spec.Availability.String()) return } - // typically there are only a few global services on a node - // iterate through all of them one by one. If raft store visits become a concern, - // it can be optimized. 
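The reconciliation above parses each service's placement constraints once (in updateService) and then evaluates them cheaply for every candidate node with constraint.NodeMatches. A hedged usage sketch of that API, assuming the constraint package introduced in this diff and hand-built api values:

```go
package main

import (
	"fmt"

	"github.com/docker/swarmkit/api"
	"github.com/docker/swarmkit/manager/constraint"
)

func main() {
	// Compile once, when the service is created or updated...
	constraints, err := constraint.Parse([]string{"node.labels.region==west"})
	if err != nil {
		panic(err)
	}

	// ...then evaluate per node during reconciliation.
	node := &api.Node{
		Spec: api.NodeSpec{
			Annotations: api.Annotations{
				Labels: map[string]string{"region": "west"},
			},
		},
	}
	fmt.Println(constraint.NodeMatches(constraints, node)) // true
}
```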
- for _, service := range g.globalServices { - g.reconcileServiceOneNode(ctx, service.ID, node.ID) + + var serviceIDs []string + for id := range g.globalServices { + serviceIDs = append(serviceIDs, id) } + g.reconcileServicesOneNode(ctx, serviceIDs, node.ID) } -// reconcileServiceOneNode checks one service on one node -func (g *GlobalOrchestrator) reconcileServiceOneNode(ctx context.Context, serviceID string, nodeID string) { - _, exists := g.nodes[nodeID] +// reconcileServicesOneNode checks the specified services on one node +func (g *GlobalOrchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs []string, nodeID string) { + node, exists := g.nodes[nodeID] if !exists { return } - service, exists := g.globalServices[serviceID] - if !exists { - return - } - // the node has completed this servie - completed := false - // tasks for this node and service + + // whether each service has completed on the node + completed := make(map[string]bool) + // tasks by service + tasks := make(map[string][]*api.Task) + var ( - tasks []*api.Task - err error + tasksOnNode []*api.Task + err error ) + g.store.View(func(tx store.ReadTx) { - var tasksOnNode []*api.Task tasksOnNode, err = store.FindTasks(tx, store.ByNodeID(nodeID)) - if err != nil { - return - } + }) + if err != nil { + log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks on node %s", nodeID) + return + } + + for _, serviceID := range serviceIDs { for _, t := range tasksOnNode { - // only interested in one service if t.ServiceID != serviceID { continue } if isTaskRunning(t) { - tasks = append(tasks, t) + tasks[serviceID] = append(tasks[serviceID], t) } else { if isTaskCompleted(t, restartCondition(t)) { - completed = true + completed[serviceID] = true } } } - }) - if err != nil { - log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks") - return } _, err = g.store.Batch(func(batch *store.Batch) error { - // if restart policy considers this node has finished its task - // it should remove all running tasks - if completed { - g.removeTasks(ctx, batch, service, tasks) - return nil - } - if len(tasks) == 0 { - g.addTask(ctx, batch, service, nodeID) + for _, serviceID := range serviceIDs { + service, exists := g.globalServices[serviceID] + if !exists { + continue + } + + meetsConstraints := constraint.NodeMatches(service.constraints, node) + + // if restart policy considers this node has finished its task + // it should remove all running tasks + if completed[serviceID] || !meetsConstraints { + g.removeTasks(ctx, batch, tasks[serviceID]) + continue + } + + if node.Spec.Availability == api.NodeAvailabilityPause { + // the node is paused, so we won't add or update tasks + continue + } + + if len(tasks) == 0 { + g.addTask(ctx, batch, service.Service, nodeID) + } else { + // If task is out of date, update it. This can happen + // on node reconciliation if, for example, we pause a + // node, update the service, and then activate the node + // later. + + // We don't use g.updater here for two reasons: + // - This is not a rolling update. Since it was not + // triggered directly by updating the service, it + // should not observe the rolling update parameters + // or show status in UpdateStatus. + // - Calling Update cancels any current rolling updates + // for the service, such as one triggered by service + // reconciliation. 
+ + var ( + dirtyTasks []*api.Task + cleanTasks []*api.Task + ) + + for _, t := range tasks[serviceID] { + if isTaskDirty(service.Service, t) { + dirtyTasks = append(dirtyTasks, t) + } else { + cleanTasks = append(cleanTasks, t) + } + } + + if len(cleanTasks) == 0 { + g.addTask(ctx, batch, service.Service, nodeID) + } else { + dirtyTasks = append(dirtyTasks, cleanTasks[1:]...) + } + g.removeTasks(ctx, batch, dirtyTasks) + } } return nil }) @@ -383,7 +480,7 @@ func (g *GlobalOrchestrator) addTask(ctx context.Context, batch *store.Batch, se } } -func (g *GlobalOrchestrator) removeTasks(ctx context.Context, batch *store.Batch, service *api.Service, tasks []*api.Task) { +func (g *GlobalOrchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) { for _, t := range tasks { g.removeTask(ctx, batch, t) } @@ -393,11 +490,6 @@ func isTaskRunning(t *api.Task) bool { return t != nil && t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning } -func isValidNode(n *api.Node) bool { - // current simulation spec could be nil - return n != nil && n.Spec.Availability != api.NodeAvailabilityDrain -} - func isTaskCompleted(t *api.Task, restartPolicy api.RestartPolicy_RestartCondition) bool { if t == nil || isTaskRunning(t) { return false diff --git a/vendor/src/github.com/docker/swarmkit/manager/orchestrator/updater.go b/vendor/src/github.com/docker/swarmkit/manager/orchestrator/updater.go index 68bbcb2684..e62b15c97a 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/orchestrator/updater.go +++ b/vendor/src/github.com/docker/swarmkit/manager/orchestrator/updater.go @@ -489,9 +489,13 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove return removedTask, nil } +func isTaskDirty(s *api.Service, t *api.Task) bool { + return !reflect.DeepEqual(s.Spec.Task, t.Spec) || + (t.Endpoint != nil && !reflect.DeepEqual(s.Spec.Endpoint, t.Endpoint.Spec)) +} + func (u *Updater) isTaskDirty(t *api.Task) bool { - return !reflect.DeepEqual(u.newService.Spec.Task, t.Spec) || - (t.Endpoint != nil && !reflect.DeepEqual(u.newService.Spec.Endpoint, t.Endpoint.Spec)) + return isTaskDirty(u.newService, t) } func (u *Updater) isSlotDirty(slot slot) bool { diff --git a/vendor/src/github.com/docker/swarmkit/manager/scheduler/constraint.go b/vendor/src/github.com/docker/swarmkit/manager/scheduler/constraint.go deleted file mode 100644 index 61be3b907a..0000000000 --- a/vendor/src/github.com/docker/swarmkit/manager/scheduler/constraint.go +++ /dev/null @@ -1,97 +0,0 @@ -package scheduler - -import ( - "strings" - - "github.com/docker/swarmkit/api" -) - -const ( - nodeLabelPrefix = "node.labels." - engineLabelPrefix = "engine.labels." -) - -// ConstraintFilter selects only nodes that match certain labels. -type ConstraintFilter struct { - constraints []Expr -} - -// SetTask returns true when the filter is enable for a given task. -func (f *ConstraintFilter) SetTask(t *api.Task) bool { - if t.Spec.Placement == nil || len(t.Spec.Placement.Constraints) == 0 { - return false - } - - constraints, err := ParseExprs(t.Spec.Placement.Constraints) - if err != nil { - // constraints have been validated at controlapi - // if in any case it finds an error here, treat this task - // as constraint filter disabled. - return false - } - f.constraints = constraints - return true -} - -// Check returns true if the task's constraint is supported by the given node. 
-func (f *ConstraintFilter) Check(n *NodeInfo) bool { - for _, constraint := range f.constraints { - switch { - case strings.EqualFold(constraint.Key, "node.id"): - if !constraint.Match(n.ID) { - return false - } - case strings.EqualFold(constraint.Key, "node.hostname"): - // if this node doesn't have hostname - // it's equivalent to match an empty hostname - // where '==' would fail, '!=' matches - if n.Description == nil { - if !constraint.Match("") { - return false - } - continue - } - if !constraint.Match(n.Description.Hostname) { - return false - } - case strings.EqualFold(constraint.Key, "node.role"): - if !constraint.Match(n.Spec.Role.String()) { - return false - } - - // node labels constraint in form like 'node.labels.key==value' - case len(constraint.Key) > len(nodeLabelPrefix) && strings.EqualFold(constraint.Key[:len(nodeLabelPrefix)], nodeLabelPrefix): - if n.Spec.Annotations.Labels == nil { - if !constraint.Match("") { - return false - } - continue - } - label := constraint.Key[len(nodeLabelPrefix):] - // label itself is case sensitive - val := n.Spec.Annotations.Labels[label] - if !constraint.Match(val) { - return false - } - - // engine labels constraint in form like 'engine.labels.key!=value' - case len(constraint.Key) > len(engineLabelPrefix) && strings.EqualFold(constraint.Key[:len(engineLabelPrefix)], engineLabelPrefix): - if n.Description == nil || n.Description.Engine == nil || n.Description.Engine.Labels == nil { - if !constraint.Match("") { - return false - } - continue - } - label := constraint.Key[len(engineLabelPrefix):] - val := n.Description.Engine.Labels[label] - if !constraint.Match(val) { - return false - } - default: - // key doesn't match predefined syntax - return false - } - } - - return true -} diff --git a/vendor/src/github.com/docker/swarmkit/manager/scheduler/expr.go b/vendor/src/github.com/docker/swarmkit/manager/scheduler/expr.go deleted file mode 100644 index 5a6697117c..0000000000 --- a/vendor/src/github.com/docker/swarmkit/manager/scheduler/expr.go +++ /dev/null @@ -1,96 +0,0 @@ -package scheduler - -import ( - "fmt" - "regexp" - "strings" -) - -const ( - eq = iota - noteq -) - -var ( - alphaNumeric = regexp.MustCompile(`^(?i)[a-z_][a-z0-9\-_.]+$`) - // value can be alphanumeric and some special characters. it shouldn't container - // current or future operators like '>, <, ~', etc. 
- valuePattern = regexp.MustCompile(`^(?i)[a-z0-9:\-_\s\.\*\(\)\?\+\[\]\\\^\$\|\/]+$`) - - // operators defines list of accepted operators - operators = []string{"==", "!="} -) - -// Expr defines a constraint -type Expr struct { - Key string - operator int - exp string -} - -// ParseExprs parses list of constraints into Expr list -func ParseExprs(env []string) ([]Expr, error) { - exprs := []Expr{} - for _, e := range env { - found := false - // each expr is in the form of "key op value" - for i, op := range operators { - if !strings.Contains(e, op) { - continue - } - // split with the op - parts := strings.SplitN(e, op, 2) - - if len(parts) < 2 { - return nil, fmt.Errorf("invalid expr: %s", e) - } - - part0 := strings.TrimSpace(parts[0]) - // validate Key - matched := alphaNumeric.MatchString(part0) - if matched == false { - return nil, fmt.Errorf("key '%s' is invalid", part0) - } - - part1 := strings.TrimSpace(parts[1]) - - // validate Value - matched = valuePattern.MatchString(part1) - if matched == false { - return nil, fmt.Errorf("value '%s' is invalid", part1) - } - // TODO(dongluochen): revisit requirements to see if globing or regex are useful - exprs = append(exprs, Expr{Key: part0, operator: i, exp: part1}) - - found = true - break // found an op, move to next entry - } - if !found { - return nil, fmt.Errorf("constraint expected one operator from %s", strings.Join(operators, ", ")) - } - } - return exprs, nil -} - -// Match checks if the Expr matches the target strings. -func (e *Expr) Match(whats ...string) bool { - var match bool - - // full string match - for _, what := range whats { - // case insensitive compare - if strings.EqualFold(e.exp, what) { - match = true - break - } - } - - switch e.operator { - case eq: - return match - case noteq: - return !match - } - - return false -} diff --git a/vendor/src/github.com/docker/swarmkit/manager/scheduler/filter.go b/vendor/src/github.com/docker/swarmkit/manager/scheduler/filter.go index c0d23d3c5c..77a43ce512 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/scheduler/filter.go +++ b/vendor/src/github.com/docker/swarmkit/manager/scheduler/filter.go @@ -1,6 +1,9 @@ package scheduler -import "github.com/docker/swarmkit/api" +import ( + "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/manager/constraint" +) // Filter checks whether the given task can run on the given node. // A filter may only operate @@ -129,3 +132,30 @@ func (f *PluginFilter) pluginExistsOnNode(pluginType string, pluginName string, } return false } + +// ConstraintFilter selects only nodes that match certain labels. +type ConstraintFilter struct { + constraints []constraint.Constraint +} + +// SetTask returns true when the filter is enabled for a given task. +func (f *ConstraintFilter) SetTask(t *api.Task) bool { + if t.Spec.Placement == nil || len(t.Spec.Placement.Constraints) == 0 { + return false + } + + constraints, err := constraint.Parse(t.Spec.Placement.Constraints) + if err != nil { + // constraints have already been validated at the controlapi layer; + // if an error still occurs here, treat this task as having the + // constraint filter disabled. + return false + } + f.constraints = constraints + return true +} + +// Check returns true if the task's constraint is supported by the given node.
+func (f *ConstraintFilter) Check(n *NodeInfo) bool { + return constraint.NodeMatches(f.constraints, n.Node) +} diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go b/vendor/src/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go index 354c57b48d..65e753c71f 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go +++ b/vendor/src/github.com/docker/swarmkit/manager/state/raft/membership/cluster.go @@ -27,11 +27,19 @@ var ( ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change") ) +// deferredConn is used to store a removed member's connection for some time. +// We need this in case the removed node is the redirector or endpoint of a ControlAPI call. +type deferredConn struct { + tick int + conn *grpc.ClientConn +} + // Cluster represents a set of active // raft Members type Cluster struct { - mu sync.RWMutex - members map[uint64]*Member + mu sync.RWMutex + members map[uint64]*Member + deferedConns map[*deferredConn]struct{} // removed contains the list of removed Members, // those ids cannot be reused @@ -73,16 +81,13 @@ func NewCluster(heartbeatTicks int) *Cluster { return &Cluster{ members: make(map[uint64]*Member), removed: make(map[uint64]bool), + deferedConns: make(map[*deferredConn]struct{}), heartbeatTicks: heartbeatTicks, PeersBroadcast: watch.NewQueue(), } } -// Tick increases ticks for all members. After heartbeatTicks node marked as -// inactive. -func (c *Cluster) Tick() { - c.mu.Lock() - defer c.mu.Unlock() +func (c *Cluster) handleInactive() { for _, m := range c.members { if !m.active { continue @@ -97,6 +102,25 @@ } } +func (c *Cluster) handleDeferredConns() { + for dc := range c.deferedConns { + dc.tick++ + if dc.tick > c.heartbeatTicks { + dc.conn.Close() + delete(c.deferedConns, dc) + } + } +} + +// Tick increases ticks for all members. After heartbeatTicks, a node is marked +// as inactive. +func (c *Cluster) Tick() { + c.mu.Lock() + defer c.mu.Unlock() + c.handleInactive() + c.handleDeferredConns() +} + // Members returns the list of raft Members in the Cluster. func (c *Cluster) Members() map[uint64]*Member { members := make(map[uint64]*Member) @@ -177,7 +201,9 @@ func (c *Cluster) clearMember(id uint64) error { m, ok := c.members[id] if ok { if m.Conn != nil { + // defer closing the connection until heartbeatTicks have elapsed - m.Conn.Close() + dConn := &deferredConn{conn: m.Conn} + c.deferedConns[dConn] = struct{}{} } delete(c.members, id) } @@ -232,8 +258,13 @@ func (c *Cluster) Clear() { } } + for dc := range c.deferedConns { + dc.conn.Close() + } + c.members = make(map[uint64]*Member) c.removed = make(map[uint64]bool) + c.deferedConns = make(map[*deferredConn]struct{}) c.mu.Unlock() } diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go b/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go index a4271454d3..3185d4b4c1 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go +++ b/vendor/src/github.com/docker/swarmkit/manager/state/raft/raft.go @@ -78,29 +78,24 @@ const ( // Node represents the Raft Node useful // configuration.
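The deferred-close mechanism above keeps a removed member's gRPC connection alive for heartbeatTicks more ticks, so in-flight ControlAPI calls still being redirected through that member can complete. A self-contained sketch of the same tick-driven cleanup (io.Closer stands in for the gRPC connection; names are illustrative):

```go
package main

import (
	"fmt"
	"io"
)

type fakeConn struct{ name string }

func (c *fakeConn) Close() error {
	fmt.Println("closed", c.name)
	return nil
}

// deferredCloser ages retired connections by ticks and closes each one once
// it has been retired for more than maxTicks.
type deferredCloser struct {
	maxTicks int
	pending  map[io.Closer]int
}

func (d *deferredCloser) retire(c io.Closer) { d.pending[c] = 0 }

func (d *deferredCloser) tick() {
	for c := range d.pending {
		d.pending[c]++
		if d.pending[c] > d.maxTicks {
			c.Close()
			delete(d.pending, c)
		}
	}
}

func main() {
	d := &deferredCloser{maxTicks: 2, pending: map[io.Closer]int{}}
	d.retire(&fakeConn{name: "removed-member"})
	for i := 0; i < 3; i++ {
		d.tick() // the connection is closed on the third tick
	}
}
```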
type Node struct { - raft.Node - cluster *membership.Cluster + raftNode raft.Node + cluster *membership.Cluster - Server *grpc.Server - Ctx context.Context - cancel func() - tlsCredentials credentials.TransportCredentials - - Address string - StateDir string + Server *grpc.Server + Ctx context.Context + cancel func() raftStore *raft.MemoryStorage memoryStore *store.MemoryStore Config *raft.Config - opts NewNodeOptions + opts NodeOptions reqIDGen *idutil.Generator wait *wait wal *wal.WAL snapshotter *snap.Snapshotter - restored bool + campaignWhenAble bool signalledLeadership uint32 isMember uint32 - joinAddr string // waitProp waits for all the proposals to be terminated before // shutting down the node. @@ -110,10 +105,9 @@ type Node struct { appliedIndex uint64 snapshotIndex uint64 - ticker clock.Ticker - sendTimeout time.Duration - stopCh chan struct{} - doneCh chan struct{} + ticker clock.Ticker + stopCh chan struct{} + doneCh chan struct{} // removeRaftCh notifies about node deletion from raft cluster removeRaftCh chan struct{} removeRaftFunc func() @@ -129,8 +123,8 @@ type Node struct { asyncTasks sync.WaitGroup } -// NewNodeOptions provides arguments for NewNode -type NewNodeOptions struct { +// NodeOptions provides node-level options. +type NodeOptions struct { // ID is the node's ID, from its certificate's CN field. ID string // Addr is the address of this node's listener @@ -161,8 +155,8 @@ func init() { rand.Seed(time.Now().UnixNano()) } -// NewNode generates a new Raft node -func NewNode(ctx context.Context, opts NewNodeOptions) *Node { +// NewNode generates a new Raft node. +func NewNode(ctx context.Context, opts NodeOptions) *Node { cfg := opts.Config if cfg == nil { cfg = DefaultNodeConfig() @@ -170,19 +164,20 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node { if opts.TickInterval == 0 { opts.TickInterval = time.Second } + if opts.SendTimeout == 0 { + opts.SendTimeout = 2 * time.Second + } raftStore := raft.NewMemoryStorage() ctx, cancel := context.WithCancel(ctx) n := &Node{ - Ctx: ctx, - cancel: cancel, - cluster: membership.NewCluster(2 * cfg.ElectionTick), - tlsCredentials: opts.TLSCredentials, - raftStore: raftStore, - Address: opts.Addr, - opts: opts, + Ctx: ctx, + cancel: cancel, + cluster: membership.NewCluster(2 * cfg.ElectionTick), + raftStore: raftStore, + opts: opts, Config: &raft.Config{ ElectionTick: cfg.ElectionTick, HeartbeatTick: cfg.HeartbeatTick, @@ -194,9 +189,6 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node { stopCh: make(chan struct{}), doneCh: make(chan struct{}), removeRaftCh: make(chan struct{}), - StateDir: opts.StateDir, - joinAddr: opts.JoinAddr, - sendTimeout: 2 * time.Second, leadershipBroadcast: watch.NewQueue(), } n.memoryStore = store.NewMemoryStore(n) @@ -206,9 +198,6 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node { } else { n.ticker = opts.ClockSource.NewTicker(opts.TickInterval) } - if opts.SendTimeout != 0 { - n.sendTimeout = opts.SendTimeout - } n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now()) n.wait = newWait() @@ -249,8 +238,8 @@ func (n *Node) JoinAndStart() (err error) { n.snapshotIndex = snapshot.Metadata.Index if loadAndStartErr == errNoWAL { - if n.joinAddr != "" { - c, err := n.ConnectToMember(n.joinAddr, 10*time.Second) + if n.opts.JoinAddr != "" { + c, err := n.ConnectToMember(n.opts.JoinAddr, 10*time.Second) if err != nil { return err } @@ -262,7 +251,7 @@ func (n *Node) JoinAndStart() (err error) { ctx, cancel := context.WithTimeout(n.Ctx, 10*time.Second) defer 
cancel() resp, err := client.Join(ctx, &api.JoinRequest{ - Addr: n.Address, + Addr: n.opts.Addr, }) if err != nil { return err @@ -274,7 +263,7 @@ func (n *Node) JoinAndStart() (err error) { return err } - n.Node = raft.StartNode(n.Config, []raft.Peer{}) + n.raftNode = raft.StartNode(n.Config, []raft.Peer{}) if err := n.registerNodes(resp.Members); err != nil { if walErr := n.wal.Close(); err != nil { @@ -289,22 +278,18 @@ func (n *Node) JoinAndStart() (err error) { if err != nil { return err } - n.Node = raft.StartNode(n.Config, []raft.Peer{peer}) - if err := n.Campaign(n.Ctx); err != nil { - if walErr := n.wal.Close(); err != nil { - n.Config.Logger.Errorf("raft: error closing WAL: %v", walErr) - } - return err - } + n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer}) + n.campaignWhenAble = true } atomic.StoreUint32(&n.isMember, 1) return nil } - if n.joinAddr != "" { + if n.opts.JoinAddr != "" { n.Config.Logger.Warning("ignoring request to join cluster, because raft state already exists") } - n.Node = raft.RestartNode(n.Config) + n.campaignWhenAble = true + n.raftNode = raft.RestartNode(n.Config) atomic.StoreUint32(&n.isMember, 1) return nil } @@ -362,9 +347,9 @@ func (n *Node) Run(ctx context.Context) error { for { select { case <-n.ticker.C(): - n.Tick() + n.raftNode.Tick() n.cluster.Tick() - case rd := <-n.Ready(): + case rd := <-n.raftNode.Ready(): raftConfig := DefaultRaftConfig() n.memoryStore.View(func(readTx store.ReadTx) { clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName)) @@ -457,19 +442,21 @@ func (n *Node) Run(ctx context.Context) error { } // Advance the state machine - n.Advance() + n.raftNode.Advance() - // If we are the only registered member after - // restoring from the state, campaign to be the - // leader. - if !n.restored { - // Node ID should be in the progress list to Campaign - if len(n.cluster.Members()) <= 1 { - if err := n.Campaign(n.Ctx); err != nil { + // On the first startup, or if we are the only + // registered member after restoring from the state, + // campaign to be the leader. + if n.campaignWhenAble { + members := n.cluster.Members() + if len(members) >= 1 { + n.campaignWhenAble = false + } + if len(members) == 1 && members[n.Config.ID] != nil { + if err := n.raftNode.Campaign(n.Ctx); err != nil { panic("raft: cannot campaign to be the leader on node restore") } } - n.restored = true } case snapshotIndex := <-n.snapshotInProgress: @@ -517,7 +504,7 @@ func (n *Node) stop() { n.waitProp.Wait() n.asyncTasks.Wait() - n.Stop() + n.raftNode.Stop() n.ticker.Stop() if err := n.wal.Close(); err != nil { n.Config.Logger.Errorf("raft: error closing WAL: %v", err) @@ -532,7 +519,7 @@ func (n *Node) isLeader() bool { return false } - if n.Node.Status().Lead == n.Config.ID { + if n.Status().Lead == n.Config.ID { return true } return false @@ -549,7 +536,7 @@ func (n *Node) IsLeader() bool { // leader returns the id of the leader, without the protection of lock and // membership check, so it's caller task. 
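JoinAndStart above no longer campaigns synchronously; it sets campaignWhenAble and lets the Run loop decide once the restored member list is visible. A toy model of that decision rule (not the vendored code):

```go
package main

import "fmt"

// shouldCampaign models the Run-loop rule: we still owe an election, the
// member list is non-empty, and we are its only member, so no existing
// leader could possibly reach us.
func shouldCampaign(campaignWhenAble bool, members map[uint64]bool, self uint64) bool {
	return campaignWhenAble && len(members) == 1 && members[self]
}

func main() {
	fmt.Println(shouldCampaign(true, map[uint64]bool{1: true}, 1))          // true: single-node cluster
	fmt.Println(shouldCampaign(true, map[uint64]bool{1: true, 2: true}, 1)) // false: wait for the leader
}
```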
func (n *Node) leader() uint64 { - return n.Node.Status().Lead + return n.Status().Lead } // Leader returns the id of the leader, with the protection of lock @@ -859,7 +846,7 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa return nil, ErrNoRaftMember } - if err := n.Step(n.Ctx, *msg.Message); err != nil { + if err := n.raftNode.Step(n.Ctx, *msg.Message); err != nil { return nil, err } @@ -988,6 +975,7 @@ func (n *Node) registerNode(node *api.RaftMember) error { } return err } + return nil } @@ -1021,7 +1009,7 @@ func (n *Node) GetVersion() *api.Version { return nil } - status := n.Node.Status() + status := n.Status() return &api.Version{Index: status.Commit} } @@ -1068,6 +1056,11 @@ func (n *Node) GetMemberlist() map[uint64]*api.RaftMember { return memberlist } +// Status returns status of underlying etcd.Node. +func (n *Node) Status() raft.Status { + return n.raftNode.Status() +} + // GetMemberByNodeID returns member information based // on its generic Node ID. func (n *Node) GetMemberByNodeID(nodeID string) *membership.Member { @@ -1131,7 +1124,7 @@ func (n *Node) send(messages []raftpb.Message) error { for _, m := range messages { // Process locally if m.To == n.Config.ID { - if err := n.Step(n.Ctx, m); err != nil { + if err := n.raftNode.Step(n.Ctx, m); err != nil { return err } continue @@ -1160,7 +1153,7 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess return } - ctx, cancel := context.WithTimeout(n.Ctx, n.sendTimeout) + ctx, cancel := context.WithTimeout(n.Ctx, n.opts.SendTimeout) defer cancel() var ( @@ -1195,7 +1188,7 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess n.Config.Logger.Errorf("could not resolve address of member ID %x: %v", m.To, err) return } - conn, err = n.ConnectToMember(resp.Addr, n.sendTimeout) + conn, err = n.ConnectToMember(resp.Addr, n.opts.SendTimeout) if err != nil { n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, resp.Addr, err) return @@ -1212,13 +1205,13 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess n.removeRaftFunc() } if m.Type == raftpb.MsgSnap { - n.ReportSnapshot(m.To, raft.SnapshotFailure) + n.raftNode.ReportSnapshot(m.To, raft.SnapshotFailure) } if !n.IsMember() { // node is removed from cluster or stopped return } - n.ReportUnreachable(m.To) + n.raftNode.ReportUnreachable(m.To) lastSeenHost := n.cluster.LastSeenHost(m.To) if lastSeenHost != "" { @@ -1246,7 +1239,7 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess newConn.Conn.Close() } } else if m.Type == raftpb.MsgSnap { - n.ReportSnapshot(m.To, raft.SnapshotFinish) + n.raftNode.ReportSnapshot(m.To, raft.SnapshotFinish) } } @@ -1323,7 +1316,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa return nil, ErrRequestTooLarge } - err = n.Propose(waitCtx, data) + err = n.raftNode.Propose(waitCtx, data) if err != nil { n.wait.cancel(r.ID) return nil, err @@ -1351,7 +1344,7 @@ func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error { ctx, cancel := context.WithCancel(ctx) ch := n.wait.register(cc.ID, nil, cancel) - if err := n.ProposeConfChange(ctx, cc); err != nil { + if err := n.raftNode.ProposeConfChange(ctx, cc); err != nil { n.wait.cancel(cc.ID) return err } @@ -1449,7 +1442,7 @@ func (n *Node) processConfChange(entry raftpb.Entry) { n.wait.trigger(cc.ID, err) } - n.confState = *n.ApplyConfChange(cc) + n.confState = *n.raftNode.ApplyConfChange(cc) 
n.wait.trigger(cc.ID, nil) } @@ -1520,7 +1513,7 @@ func (n *Node) applyRemoveNode(cc raftpb.ConfChange) (err error) { // to be the leader. if cc.NodeID == n.leader() && !n.isLeader() { - if err = n.Campaign(n.Ctx); err != nil { + if err = n.raftNode.Campaign(n.Ctx); err != nil { return err } } @@ -1548,7 +1541,7 @@ func (n *Node) applyRemoveNode(cc raftpb.ConfChange) (err error) { // ConnectToMember returns a member object with an initialized // connection to communicate with other raft members func (n *Node) ConnectToMember(addr string, timeout time.Duration) (*membership.Member, error) { - conn, err := dial(addr, "tcp", n.tlsCredentials, timeout) + conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout) if err != nil { return nil, err } diff --git a/vendor/src/github.com/docker/swarmkit/manager/state/raft/storage.go b/vendor/src/github.com/docker/swarmkit/manager/state/raft/storage.go index d9237ab38a..3209cb40f2 100644 --- a/vendor/src/github.com/docker/swarmkit/manager/state/raft/storage.go +++ b/vendor/src/github.com/docker/swarmkit/manager/state/raft/storage.go @@ -26,19 +26,19 @@ import ( var errNoWAL = errors.New("no WAL present") func (n *Node) legacyWALDir() string { - return filepath.Join(n.StateDir, "wal") + return filepath.Join(n.opts.StateDir, "wal") } func (n *Node) walDir() string { - return filepath.Join(n.StateDir, "wal-v3") + return filepath.Join(n.opts.StateDir, "wal-v3") } func (n *Node) legacySnapDir() string { - return filepath.Join(n.StateDir, "snap") + return filepath.Join(n.opts.StateDir, "snap") } func (n *Node) snapDir() string { - return filepath.Join(n.StateDir, "snap-v3") + return filepath.Join(n.opts.StateDir, "snap-v3") } func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error { @@ -189,7 +189,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) { raftNode := &api.RaftMember{ RaftID: n.Config.ID, NodeID: nodeID, - Addr: n.Address, + Addr: n.opts.Addr, } metadata, err := raftNode.Marshal() if err != nil { @@ -207,7 +207,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) { // moveWALAndSnap moves away the WAL and snapshot because we were removed // from the cluster and will need to recreate them if we are readded. func (n *Node) moveWALAndSnap() error { - newWALDir, err := ioutil.TempDir(n.StateDir, "wal.") + newWALDir, err := ioutil.TempDir(n.opts.StateDir, "wal.") if err != nil { return err } @@ -216,7 +216,7 @@ func (n *Node) moveWALAndSnap() error { return err } - newSnapDir, err := ioutil.TempDir(n.StateDir, "snap.") + newSnapDir, err := ioutil.TempDir(n.opts.StateDir, "snap.") if err != nil { return err }
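moveWALAndSnap above implements a move-aside rather than a delete: the live WAL and snapshot directories are renamed into freshly created temp directories under the state dir, so the old raft state is preserved while a re-added node starts clean. A minimal sketch of the same pattern (stateDir and the directory name are placeholders):

```go
package main

import (
	"io/ioutil"
	"os"
	"path/filepath"
)

// moveAside renames <stateDir>/<name> into a uniquely named sibling
// directory such as <stateDir>/wal-v3.123456, mirroring the
// ioutil.TempDir + os.Rename pattern in moveWALAndSnap.
func moveAside(stateDir, name string) error {
	backupDir, err := ioutil.TempDir(stateDir, name+".")
	if err != nil {
		return err
	}
	// On Linux the rename succeeds because the freshly created target
	// directory is still empty.
	return os.Rename(filepath.Join(stateDir, name), backupDir)
}

func main() {
	stateDir, _ := ioutil.TempDir("", "state")
	defer os.RemoveAll(stateDir)

	os.MkdirAll(filepath.Join(stateDir, "wal-v3"), 0700)
	if err := moveAside(stateDir, "wal-v3"); err != nil {
		panic(err)
	}
}
```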