Vendoring Swarmkit 1fed8d2a2ccd2a9b6d6fb864d4ad3461fc6dc3eb

Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
Madhu Venugopal 2016-10-15 08:49:04 -07:00
parent 6f54d70a54
commit 88cae7412d
29 changed files with 1728 additions and 754 deletions

View File

@ -143,7 +143,7 @@ clone git github.com/docker/docker-credential-helpers v0.3.0
clone git github.com/docker/containerd 837e8c5e1cad013ed57f5c2090c8591c10cbbdae
# cluster
clone git github.com/docker/swarmkit 7e63bdefb94e5bea2641e8bdebae2cfa61a0ed44
clone git github.com/docker/swarmkit 1fed8d2a2ccd2a9b6d6fb864d4ad3461fc6dc3eb
clone git github.com/golang/mock bd3c8e81be01eef76d4b503f5e687d2d1354d2d9
clone git github.com/gogo/protobuf v0.3
clone git github.com/cloudflare/cfssl 7fb22c8cba7ecaf98e4082d22d65800cf45e042a

View File

@ -171,11 +171,12 @@ func (a *Agent) run(ctx context.Context) {
case msg := <-session.assignments:
switch msg.Type {
case api.AssignmentsMessage_COMPLETE:
if err := a.worker.AssignTasks(ctx, msg.UpdateTasks); err != nil {
// Need to assign secrets before tasks, because tasks might depend on new secrets
if err := a.worker.Assign(ctx, msg.Changes); err != nil {
log.G(ctx).WithError(err).Error("failed to synchronize worker assignments")
}
case api.AssignmentsMessage_INCREMENTAL:
if err := a.worker.UpdateTasks(ctx, msg.UpdateTasks, msg.RemoveTasks); err != nil {
if err := a.worker.Update(ctx, msg.Changes); err != nil {
log.G(ctx).WithError(err).Error("failed to update worker assignments")
}
}

View File

@ -101,6 +101,21 @@ type Node struct {
roleChangeReq chan api.NodeRole // used to send role updates from the dispatcher api on promotion/demotion
}
// RemoteAPIAddr returns the address on which the remote manager API listens.
// It returns an empty string and a non-nil error when the node has no
// manager, or when the manager reports no address.
func (n *Node) RemoteAPIAddr() (string, error) {
	n.RLock()
	defer n.RUnlock()

	if n.manager == nil {
		return "", errors.New("node is not manager")
	}
	addr := n.manager.Addr()
	if addr == nil {
		return "", errors.New("manager addr is not set")
	}
	return addr.String(), nil
}
// NewNode returns new Node instance.
func NewNode(c *NodeConfig) (*Node, error) {
if err := os.MkdirAll(c.StateDir, 0700); err != nil {
@ -627,7 +642,12 @@ func (n *Node) runManager(ctx context.Context, securityConfig *ca.SecurityConfig
go func(ready chan struct{}) {
select {
case <-ready:
n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: n.config.ListenRemoteAPI}, remotes.DefaultObservationWeight)
addr, err := n.RemoteAPIAddr()
if err != nil {
log.G(ctx).WithError(err).Errorf("get remote api addr")
} else {
n.remotes.Observe(api.Peer{NodeID: n.NodeID(), Addr: addr}, remotes.DefaultObservationWeight)
}
case <-connCtx.Done():
}
}(ready)

View File

@ -0,0 +1,53 @@
package agent
import (
"sync"
"github.com/docker/swarmkit/api"
)
// secrets is a map that keeps all the currently available secrets to the agent
// mapped by secret ID
type secrets struct {
// mu guards m.
mu sync.RWMutex
// m holds the secrets keyed by secret ID.
m map[string]api.Secret
}
// newSecrets returns an empty, ready-to-use secrets store.
func newSecrets() *secrets {
	store := new(secrets)
	store.m = map[string]api.Secret{}
	return store
}
// Get returns the secret stored under secretID. The result is a value, not a
// pointer: if no secret with that ID is stored, the zero api.Secret is
// returned, so callers cannot distinguish a miss by comparing against nil.
func (s *secrets) Get(secretID string) api.Secret {
	s.mu.RLock()
	defer s.mu.RUnlock()

	return s.m[secretID]
}
// Add stores the given secrets in the map, keyed by their ID. A secret whose
// ID is already present replaces the stored one.
func (s *secrets) Add(secrets ...api.Secret) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for i := range secrets {
		s.m[secrets[i].ID] = secrets[i]
	}
}
// Remove drops the secrets with the given IDs from the map. IDs that are not
// present are ignored, so the call always succeeds.
func (s *secrets) Remove(secrets []string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	for i := range secrets {
		delete(s.m, secrets[i])
	}
}
// Reset discards every stored secret by swapping in a fresh, empty map.
func (s *secrets) Reset() {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.m = map[string]api.Secret{}
}

View File

@ -252,12 +252,26 @@ func (s *session) watch(ctx context.Context) error {
}
}
if tasksWatch != nil {
// When falling back to Tasks because of an old manager, we wrap the tasks in assignments.
var taskResp *api.TasksMessage
var assignmentChanges []*api.AssignmentChange
taskResp, err = tasksWatch.Recv()
if err != nil {
return err
}
resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, UpdateTasks: taskResp.Tasks}
for _, t := range taskResp.Tasks {
taskChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Task{
Task: t,
},
},
Action: api.AssignmentChange_AssignmentActionUpdate,
}
assignmentChanges = append(assignmentChanges, taskChange)
}
resp = &api.AssignmentsMessage{Type: api.AssignmentsMessage_COMPLETE, Changes: assignmentChanges}
}
// If there seems to be a gap in the stream, let's break out of the inner for and

View File

@ -17,13 +17,13 @@ type Worker interface {
// Init prepares the worker for task assignment.
Init(ctx context.Context) error
// AssignTasks assigns a complete set of tasks to a worker. Any task not included in
// Assign assigns a complete set of tasks and secrets to a worker. Any task or secrets not included in
// this set will be removed.
AssignTasks(ctx context.Context, tasks []*api.Task) error
Assign(ctx context.Context, assignments []*api.AssignmentChange) error
// UpdateTasks updates an incremental set of tasks to the worker. Any task not included
// Update updates an incremental set of tasks or secrets of the worker. Any task/secret not included
// either in added or removed will remain untouched.
UpdateTasks(ctx context.Context, added []*api.Task, removed []string) error
Update(ctx context.Context, assignments []*api.AssignmentChange) error
// Listen to updates about tasks controlled by the worker. When first
// called, the reporter will receive all updates for all tasks controlled
@ -42,6 +42,7 @@ type worker struct {
db *bolt.DB
executor exec.Executor
listeners map[*statusReporterKey]struct{}
secrets *secrets
taskManagers map[string]*taskManager
mu sync.RWMutex
@ -53,6 +54,7 @@ func newWorker(db *bolt.DB, executor exec.Executor) *worker {
executor: executor,
listeners: make(map[*statusReporterKey]struct{}),
taskManagers: make(map[string]*taskManager),
secrets: newSecrets(),
}
}
@ -90,37 +92,70 @@ func (w *worker) Init(ctx context.Context) error {
})
}
// AssignTasks assigns the set of tasks to the worker. Any tasks not previously known will
// be started. Any tasks that are in the task set and already running will be
// updated, if possible. Any tasks currently running on the
// worker outside the task set will be terminated.
func (w *worker) AssignTasks(ctx context.Context, tasks []*api.Task) error {
// Assign assigns a full set of tasks and secrets to the worker.
// Any tasks not previously known will be started. Any tasks that are in the task set
// and already running will be updated, if possible. Any tasks currently running on
// the worker outside the task set will be terminated.
// Any secrets not in the set of assignments will be removed.
func (w *worker) Assign(ctx context.Context, assignments []*api.AssignmentChange) error {
w.mu.Lock()
defer w.mu.Unlock()
log.G(ctx).WithFields(logrus.Fields{
"len(tasks)": len(tasks),
}).Debug("(*worker).AssignTasks")
"len(assignments)": len(assignments),
}).Debug("(*worker).Assign")
return reconcileTaskState(ctx, w, tasks, nil, true)
// Need to update secrets before tasks, because tasks might depend on new secrets
err := reconcileSecrets(ctx, w, assignments, true)
if err != nil {
return err
}
return reconcileTaskState(ctx, w, assignments, true)
}
// UpdateTasks the set of tasks to the worker.
// Update updates the set of tasks and secrets for the worker.
// Tasks in the added set will be added to the worker, and tasks in the removed set
// will be removed from the worker
func (w *worker) UpdateTasks(ctx context.Context, added []*api.Task, removed []string) error {
// Secrets in the added set will be added to the worker, and secrets in the removed set
// will be removed from the worker.
func (w *worker) Update(ctx context.Context, assignments []*api.AssignmentChange) error {
w.mu.Lock()
defer w.mu.Unlock()
log.G(ctx).WithFields(logrus.Fields{
"len(added)": len(added),
"len(removed)": len(removed),
}).Debug("(*worker).UpdateTasks")
"len(assignments)": len(assignments),
}).Debug("(*worker).Update")
return reconcileTaskState(ctx, w, added, removed, false)
err := reconcileSecrets(ctx, w, assignments, false)
if err != nil {
return err
}
return reconcileTaskState(ctx, w, assignments, false)
}
func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, removed []string, fullSnapshot bool) error {
func reconcileTaskState(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
var (
updatedTasks []*api.Task
removedTasks []*api.Task
)
for _, a := range assignments {
if t := a.Assignment.GetTask(); t != nil {
switch a.Action {
case api.AssignmentChange_AssignmentActionUpdate:
updatedTasks = append(updatedTasks, t)
case api.AssignmentChange_AssignmentActionRemove:
removedTasks = append(removedTasks, t)
}
}
}
log.G(ctx).WithFields(logrus.Fields{
"len(updatedTasks)": len(updatedTasks),
"len(removedTasks)": len(removedTasks),
}).Debug("(*worker).reconcileTaskState")
tx, err := w.db.Begin(true)
if err != nil {
log.G(ctx).WithError(err).Error("failed starting transaction against task database")
@ -130,7 +165,7 @@ func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, remov
assigned := map[string]struct{}{}
for _, task := range added {
for _, task := range updatedTasks {
log.G(ctx).WithFields(
logrus.Fields{
"task.id": task.ID,
@ -202,15 +237,15 @@ func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, remov
} else {
// If this was an incremental set of assignments, we're going to remove only the tasks
// in the removed set
for _, taskID := range removed {
err := removeTaskAssignment(taskID)
for _, task := range removedTasks {
err := removeTaskAssignment(task.ID)
if err != nil {
continue
}
tm, ok := w.taskManagers[taskID]
tm, ok := w.taskManagers[task.ID]
if ok {
delete(w.taskManagers, taskID)
delete(w.taskManagers, task.ID)
go closeManager(tm)
}
}
@ -219,6 +254,39 @@ func reconcileTaskState(ctx context.Context, w *worker, added []*api.Task, remov
return tx.Commit()
}
// reconcileSecrets applies the secret-related entries of assignments to the
// worker's secret store. Entries that do not carry a secret are skipped.
//
// With fullSnapshot set, the store is emptied and then filled with the
// secrets marked for update; removals in the set are not applied separately.
// Otherwise the secrets marked for removal are dropped first and the updated
// ones are added afterwards. The function currently always returns nil.
func reconcileSecrets(ctx context.Context, w *worker, assignments []*api.AssignmentChange, fullSnapshot bool) error {
	var (
		toAdd    []api.Secret
		toRemove []string
	)

	for _, change := range assignments {
		secret := change.Assignment.GetSecret()
		if secret == nil {
			continue
		}
		if change.Action == api.AssignmentChange_AssignmentActionUpdate {
			toAdd = append(toAdd, *secret)
		} else if change.Action == api.AssignmentChange_AssignmentActionRemove {
			toRemove = append(toRemove, secret.ID)
		}
	}

	fields := logrus.Fields{
		"len(updatedSecrets)": len(toAdd),
		"len(removedSecrets)": len(toRemove),
	}
	log.G(ctx).WithFields(fields).Debug("(*worker).reconcileSecrets")

	// A complete set replaces everything we know; an incremental set only
	// drops what it explicitly names.
	if !fullSnapshot {
		w.secrets.Remove(toRemove)
	} else {
		w.secrets.Reset()
	}
	w.secrets.Add(toAdd...)

	return nil
}
func (w *worker) Listen(ctx context.Context, reporter StatusReporter) {
w.mu.Lock()
defer w.mu.Unlock()

File diff suppressed because it is too large Load Diff

View File

@ -170,6 +170,23 @@ message AssignmentsRequest {
string session_id = 1 [(gogoproto.customname) = "SessionID"];
}
// Assignment is a single item that can be assigned to a node: either a Task
// or a Secret.
message Assignment {
oneof item {
Task task = 1;
Secret secret = 2;
}
}
// AssignmentChange pairs an Assignment with the action to apply to it.
message AssignmentChange {
// AssignmentAction is the kind of change carried by an AssignmentChange.
enum AssignmentAction {
UPDATE = 0 [(gogoproto.enumvalue_customname) = "AssignmentActionUpdate"];
REMOVE = 1 [(gogoproto.enumvalue_customname) = "AssignmentActionRemove"];
}
// assignment is the item the action applies to.
Assignment assignment = 1;
// action selects whether the item is updated or removed.
AssignmentAction action = 2;
}
message AssignmentsMessage {
// AssignmentType specifies whether this assignment message carries
// the full state, or is an update to an existing state.
@ -192,24 +209,6 @@ message AssignmentsMessage {
// against missed messages.
string results_in = 3;
// UpdateTasks is a set of new or updated tasks to run on this node.
// In the first assignments message, it contains all of the tasks
// to run on this node. Tasks outside of this set running on the node
// should be terminated.
repeated Task update_tasks = 4;
// RemoveTasks is a set of previously-assigned task IDs to remove from the
// assignment set. It is not used in the first assignments message of
// a stream.
repeated string remove_tasks = 5;
// UpdateSecrets is a set of new or updated secrets for this node.
// In the first assignments message, it contains all of the secrets
// the node needs for itself and its assigned tasks.
repeated Secret update_secrets = 6;
// RemoveSecrets is a set of previously-assigned secret names to remove
// from memory. It is not used in the first assignments message of
// a stream.
repeated string remove_secrets = 7;
// AssignmentChange is a set of changes to apply on this node.
repeated AssignmentChange changes = 4;
}

View File

@ -473,7 +473,7 @@ type ContainerSpec struct {
StopGracePeriod *docker_swarmkit_v11.Duration `protobuf:"bytes,9,opt,name=stop_grace_period,json=stopGracePeriod" json:"stop_grace_period,omitempty"`
// PullOptions parameterize the behavior of image pulls.
PullOptions *ContainerSpec_PullOptions `protobuf:"bytes,10,opt,name=pull_options,json=pullOptions" json:"pull_options,omitempty"`
// Secrets contains references to zero or more secrets that
// SecretReference contains references to zero or more secrets that
// will be exposed to the container.
Secrets []*SecretReference `protobuf:"bytes,12,rep,name=secrets" json:"secrets,omitempty"`
}

View File

@ -189,7 +189,7 @@ message ContainerSpec {
// PullOptions parameterize the behavior of image pulls.
PullOptions pull_options = 10;
// Secrets contains references to zero or more secrets that
// SecretReference contains references to zero or more secrets that
// will be exposed to the container.
repeated SecretReference secrets = 12;
}

View File

@ -133,6 +133,8 @@
TasksRequest
TasksMessage
AssignmentsRequest
Assignment
AssignmentChange
AssignmentsMessage
NodeCertificateStatusRequest
NodeCertificateStatusResponse
@ -1053,8 +1055,8 @@ func _TaskStatus_OneofSizer(msg proto.Message) (n int) {
// instructing Swarm on how this service should work on the particular
// network.
type NetworkAttachmentConfig struct {
// Target specifies the target network for attachment. This value may be a
// network name or identifier. Only identifiers are supported at this time.
// Target specifies the target network for attachment. This value must be a
// network ID.
Target string `protobuf:"bytes,1,opt,name=target,proto3" json:"target,omitempty"`
// Aliases specifies a list of discoverable alternate names for the service on this Target.
Aliases []string `protobuf:"bytes,2,rep,name=aliases" json:"aliases,omitempty"`

View File

@ -447,8 +447,8 @@ message TaskStatus {
// instructing Swarm on how this service should work on the particular
// network.
message NetworkAttachmentConfig {
// Target specifies the target network for attachment. This value may be a
// network name or identifier. Only identifiers are supported at this time.
// Target specifies the target network for attachment. This value must be a
// network ID.
string target = 1;
// Aliases specifies a list of discoverable alternate names for the service on this Target.
repeated string aliases = 2;

View File

@ -0,0 +1,23 @@
package networkallocator
import (
"github.com/docker/libnetwork/drvregistry"
"github.com/docker/libnetwork/ipamapi"
builtinIpam "github.com/docker/libnetwork/ipams/builtin"
nullIpam "github.com/docker/libnetwork/ipams/null"
remoteIpam "github.com/docker/libnetwork/ipams/remote"
)
// initIPAMDrivers registers the builtin, remote and null IPAM drivers with
// the given driver registry, in that order. It stops at and returns the
// first registration error.
func initIPAMDrivers(r *drvregistry.DrvRegistry) error {
	ipamInits := [](func(ipamapi.Callback, interface{}, interface{}) error){
		builtinIpam.Init,
		remoteIpam.Init,
		nullIpam.Init,
	}

	for idx := range ipamInits {
		err := ipamInits[idx](r, nil, nil)
		if err != nil {
			return err
		}
	}

	return nil
}

View File

@ -0,0 +1,13 @@
package networkallocator
import (
"github.com/docker/libnetwork/drivers/overlay/ovmanager"
"github.com/docker/libnetwork/drivers/remote"
)
// getInitializers lists the network driver initializers to register: the
// remote driver first, then the overlay manager driver.
func getInitializers() []initializer {
	remoteDriver := initializer{fn: remote.Init, ntype: "remote"}
	overlayDriver := initializer{fn: ovmanager.Init, ntype: "overlay"}

	return []initializer{remoteDriver, overlayDriver}
}

View File

@ -0,0 +1,7 @@
// +build !linux
package networkallocator
// getInitializers returns no network driver initializers; this variant is
// compiled for non-Linux builds only (see the build tag above).
func getInitializers() []initializer {
return nil
}

View File

@ -4,12 +4,11 @@ import (
"fmt"
"net"
"github.com/docker/docker/pkg/plugins"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/drivers/overlay/ovmanager"
"github.com/docker/libnetwork/drvregistry"
"github.com/docker/libnetwork/ipamapi"
builtinIpam "github.com/docker/libnetwork/ipams/builtin"
nullIpam "github.com/docker/libnetwork/ipams/null"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/pkg/errors"
@ -23,10 +22,6 @@ const (
DefaultDriver = "overlay"
)
var (
defaultDriverInitFunc = ovmanager.Init
)
// NetworkAllocator acts as the controller for all network related operations
// like managing network and IPAM drivers and also creating and
// deleting networks and the associated resources.
@ -68,6 +63,11 @@ type network struct {
endpoints map[string]string
}
// initializer pairs a network driver init function with the network type
// name it is registered under in the driver registry.
type initializer struct {
// fn is the driver's registration function.
fn drvregistry.InitFunc
// ntype is the network type name passed to the registry with fn.
ntype string
}
// New returns a new NetworkAllocator handle
func New() (*NetworkAllocator, error) {
na := &NetworkAllocator{
@ -84,18 +84,12 @@ func New() (*NetworkAllocator, error) {
return nil, err
}
// Add the manager component of overlay driver to the registry.
if err := reg.AddDriver(DefaultDriver, defaultDriverInitFunc, nil); err != nil {
if err := initializeDrivers(reg); err != nil {
return nil, err
}
for _, fn := range [](func(ipamapi.Callback, interface{}, interface{}) error){
builtinIpam.Init,
nullIpam.Init,
} {
if err := fn(reg, nil, nil); err != nil {
return nil, err
}
if err = initIPAMDrivers(reg); err != nil {
return nil, err
}
pa, err := newPortAllocator()
@ -631,14 +625,33 @@ func (na *NetworkAllocator) resolveDriver(n *api.Network) (driverapi.Driver, str
dName = n.Spec.DriverConfig.Name
}
d, _ := na.drvRegistry.Driver(dName)
d, drvcap := na.drvRegistry.Driver(dName)
if d == nil {
return nil, "", fmt.Errorf("could not resolve network driver %s", dName)
var err error
err = na.loadDriver(dName)
if err != nil {
return nil, "", err
}
d, drvcap = na.drvRegistry.Driver(dName)
if d == nil {
return nil, "", fmt.Errorf("could not resolve network driver %s", dName)
}
}
if drvcap.DataScope != datastore.GlobalScope {
return nil, "", fmt.Errorf("swarm can allocate network resources only for global scoped networks. network driver (%s) is scoped %s", dName, drvcap.DataScope)
}
return d, dName, nil
}
// loadDriver asks the plugin subsystem for a network driver plugin with the
// given name and reports any error from that lookup. The plugin handle itself
// is not used here.
func (na *NetworkAllocator) loadDriver(name string) error {
	if _, err := plugins.Get(name, driverapi.NetworkPluginEndpointType); err != nil {
		return err
	}
	return nil
}
// Resolve the IPAM driver
func (na *NetworkAllocator) resolveIPAM(n *api.Network) (ipamapi.Ipam, string, error) {
dName := ipamapi.DefaultIPAM
@ -746,3 +759,12 @@ func (na *NetworkAllocator) allocatePools(n *api.Network) (map[string]string, er
return pools, nil
}
// initializeDrivers adds every driver returned by getInitializers to the
// registry under its network type name, returning the first error it hits.
func initializeDrivers(reg *drvregistry.DrvRegistry) error {
	drivers := getInitializers()
	for idx := range drivers {
		if err := reg.AddDriver(drivers[idx].ntype, drivers[idx].fn, nil); err != nil {
			return err
		}
	}

	return nil
}

View File

@ -0,0 +1,164 @@
package constraint
import (
"fmt"
"regexp"
"strings"
"github.com/docker/swarmkit/api"
)
// Operator codes stored in Constraint.operator. Each value is the index of
// the matching entry in operators, so the two must stay in the same order.
const (
	eq = iota
	noteq
)

// Key prefixes that select label-based constraints.
const (
	nodeLabelPrefix   = "node.labels."
	engineLabelPrefix = "engine.labels."
)

var (
	// alphaNumeric validates a constraint key.
	alphaNumeric = regexp.MustCompile(`^(?i)[a-z_][a-z0-9\-_.]+$`)
	// valuePattern validates a constraint value. A value can be alphanumeric
	// plus some special characters; it must not contain current or future
	// operators like '>, <, ~', etc.
	valuePattern = regexp.MustCompile(`^(?i)[a-z0-9:\-_\s\.\*\(\)\?\+\[\]\\\^\$\|\/]+$`)
	// operators is the list of accepted operators, indexed by operator code.
	operators = []string{"==", "!="}
)

// Constraint is a single parsed "key op value" expression.
type Constraint struct {
	key      string
	operator int
	exp      string
}

// Parse turns a list of "key op value" expressions into Constraints. The
// first operator from operators that occurs in an expression is the one used
// to split it; key and value are trimmed of surrounding whitespace and then
// validated. Parsing stops at the first invalid expression and returns its
// error. An empty input yields an empty, non-nil slice.
func Parse(env []string) ([]Constraint, error) {
	parsed := []Constraint{}

	for _, expr := range env {
		// Pick the first accepted operator present in the expression.
		opIndex := -1
		for i, op := range operators {
			if strings.Contains(expr, op) {
				opIndex = i
				break
			}
		}
		if opIndex < 0 {
			return nil, fmt.Errorf("constraint expected one operator from %s", strings.Join(operators, ", "))
		}

		parts := strings.SplitN(expr, operators[opIndex], 2)
		if len(parts) < 2 {
			return nil, fmt.Errorf("invalid expr: %s", expr)
		}

		key := strings.TrimSpace(parts[0])
		if !alphaNumeric.MatchString(key) {
			return nil, fmt.Errorf("key '%s' is invalid", key)
		}

		value := strings.TrimSpace(parts[1])
		if !valuePattern.MatchString(value) {
			return nil, fmt.Errorf("value '%s' is invalid", value)
		}

		// TODO(dongluochen): revisit requirements to see if globing or regex are useful
		parsed = append(parsed, Constraint{key: key, operator: opIndex, exp: value})
	}

	return parsed, nil
}

// Match reports whether the constraint is satisfied by the target strings.
// A target matches when it equals the constraint value, ignoring case. For
// "==" the result is true if any target matches; for "!=" it is true only if
// none does. An unknown operator never matches.
func (c *Constraint) Match(whats ...string) bool {
	matched := false
	for i := range whats {
		if strings.EqualFold(c.exp, whats[i]) {
			matched = true
			break
		}
	}

	if c.operator == eq {
		return matched
	}
	if c.operator == noteq {
		return !matched
	}
	return false
}
// NodeMatches reports whether node n satisfies every constraint in the list.
//
// Each constraint key selects the node attribute to compare against:
// "node.id", "node.hostname", "node.role", "node.labels.<label>" or
// "engine.labels.<label>" (key names are compared case-insensitively, label
// names are case sensitive). An attribute that is not available on the node
// is compared as the empty string, so '==' fails and '!=' holds. A key that
// fits none of these forms makes the whole match fail.
func NodeMatches(constraints []Constraint, n *api.Node) bool {
	for i := range constraints {
		c := &constraints[i]
		key := c.key

		// target is the node attribute the constraint is evaluated against;
		// it stays empty when the node does not carry that attribute.
		var target string

		switch {
		case strings.EqualFold(key, "node.id"):
			target = n.ID

		case strings.EqualFold(key, "node.hostname"):
			if n.Description != nil {
				target = n.Description.Hostname
			}

		case strings.EqualFold(key, "node.role"):
			target = n.Spec.Role.String()

		// node labels constraint in form like 'node.labels.key==value'
		case len(key) > len(nodeLabelPrefix) && strings.EqualFold(key[:len(nodeLabelPrefix)], nodeLabelPrefix):
			// Reading a missing label (or a nil label map) yields "".
			target = n.Spec.Annotations.Labels[key[len(nodeLabelPrefix):]]

		// engine labels constraint in form like 'engine.labels.key!=value'
		case len(key) > len(engineLabelPrefix) && strings.EqualFold(key[:len(engineLabelPrefix)], engineLabelPrefix):
			if desc := n.Description; desc != nil && desc.Engine != nil {
				target = desc.Engine.Labels[key[len(engineLabelPrefix):]]
			}

		default:
			// key doesn't match predefined syntax
			return false
		}

		if !c.Match(target) {
			return false
		}
	}

	return true
}

View File

@ -4,10 +4,8 @@ import (
"fmt"
"net"
"github.com/docker/libnetwork/ipamapi"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/manager/allocator/networkallocator"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
"google.golang.org/grpc"
@ -60,10 +58,6 @@ func validateIPAM(ipam *api.IPAMOptions) error {
return err
}
if ipam.Driver != nil && ipam.Driver.Name != ipamapi.DefaultIPAM {
return grpc.Errorf(codes.InvalidArgument, "invalid IPAM specified")
}
for _, ipamConf := range ipam.Configs {
if err := validateIPAMConfiguration(ipamConf); err != nil {
return err
@ -86,10 +80,6 @@ func validateNetworkSpec(spec *api.NetworkSpec) error {
return err
}
if spec.DriverConfig != nil && spec.DriverConfig.Name != networkallocator.DefaultDriver {
return grpc.Errorf(codes.InvalidArgument, "invalid driver specified")
}
if err := validateIPAM(spec.IPAM); err != nil {
return err
}

View File

@ -8,7 +8,7 @@ import (
"github.com/docker/distribution/reference"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/identity"
"github.com/docker/swarmkit/manager/scheduler"
"github.com/docker/swarmkit/manager/constraint"
"github.com/docker/swarmkit/manager/state/store"
"github.com/docker/swarmkit/protobuf/ptypes"
"golang.org/x/net/context"
@ -81,7 +81,7 @@ func validatePlacement(placement *api.Placement) error {
if placement == nil {
return nil
}
_, err := scheduler.ParseExprs(placement.Constraints)
_, err := constraint.Parse(placement.Constraints)
return err
}
@ -170,6 +170,24 @@ func validateEndpointSpec(epSpec *api.EndpointSpec) error {
return nil
}
// validateNetworks rejects attachments to swarm internal networks. Each
// attachment target is looked up in the store; targets that are not found
// are skipped. A network carrying the "com.docker.swarm.internal" label
// yields an InvalidArgument error.
func (s *Server) validateNetworks(networks []*api.NetworkAttachmentConfig) error {
	for _, attachment := range networks {
		var target *api.Network
		s.store.View(func(tx store.ReadTx) {
			target = store.GetNetwork(tx, attachment.Target)
		})
		if target == nil {
			continue
		}

		_, internal := target.Spec.Annotations.Labels["com.docker.swarm.internal"]
		if !internal {
			continue
		}
		return grpc.Errorf(codes.InvalidArgument,
			"Service cannot be explicitly attached to %q network which is a swarm internal network",
			target.Spec.Annotations.Name)
	}

	return nil
}
func validateServiceSpec(spec *api.ServiceSpec) error {
if spec == nil {
return grpc.Errorf(codes.InvalidArgument, errInvalidArgument.Error())
@ -259,6 +277,10 @@ func (s *Server) CreateService(ctx context.Context, request *api.CreateServiceRe
return nil, err
}
if err := s.validateNetworks(request.Spec.Networks); err != nil {
return nil, err
}
if err := s.checkPortConflicts(request.Spec, ""); err != nil {
return nil, err
}

View File

@ -759,6 +759,7 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
initial api.AssignmentsMessage
)
tasksMap := make(map[string]*api.Task)
tasksUsingSecret := make(map[string]map[string]struct{})
sendMessage := func(msg api.AssignmentsMessage, assignmentType api.AssignmentsMessage_Type) error {
sequence++
@ -773,6 +774,45 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
return nil
}
// addSecretsForTask records task t as a user of every secret its container
// spec references and returns the secrets that were not tracked before and
// therefore have to be sent down. Tasks without a container spec reference
// no secrets.
//
// A secret only becomes tracked once it has been found in the store. If the
// lookup fails, nothing is recorded, so a later task that references the same
// secret triggers a fresh lookup instead of being treated as already served.
addSecretsForTask := func(readTx store.ReadTx, t *api.Task) []*api.Secret {
	container := t.Spec.GetContainer()
	if container == nil {
		return nil
	}

	var newSecrets []*api.Secret
	for _, secretRef := range container.Secrets {
		secretID := secretRef.SecretID
		log := log.WithFields(logrus.Fields{
			"secret.id":   secretID,
			"secret.name": secretRef.SecretName,
		})

		if len(tasksUsingSecret[secretID]) == 0 {
			secrets, err := store.FindSecrets(readTx, store.ByIDPrefix(secretID))
			if err != nil {
				log.WithError(err).Errorf("error retrieving secret")
				continue
			}
			if len(secrets) != 1 {
				log.Debugf("secret not found")
				continue
			}

			// If the secret was found and there was one result
			// (there should never be more than one because of the
			// uniqueness constraint), start tracking it and add it
			// to the set that we send down.
			tasksUsingSecret[secretID] = make(map[string]struct{})
			newSecrets = append(newSecrets, secrets[0])
		}
		tasksUsingSecret[secretID][t.ID] = struct{}{}
	}

	return newSecrets
}
// TODO(aaronl): Also send node secrets that should be exposed to
// this node.
nodeTasks, cancel, err := store.ViewAndWatch(
@ -794,7 +834,31 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
}
tasksMap[t.ID] = t
initial.UpdateTasks = append(initial.UpdateTasks, t)
taskChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Task{
Task: t,
},
},
Action: api.AssignmentChange_AssignmentActionUpdate,
}
initial.Changes = append(initial.Changes, taskChange)
// Only send secrets down if these tasks are in <= RUNNING
if t.Status.State <= api.TaskStateRunning {
newSecrets := addSecretsForTask(readTx, t)
for _, secret := range newSecrets {
secretChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Secret{
Secret: secret,
},
},
Action: api.AssignmentChange_AssignmentActionUpdate,
}
initial.Changes = append(initial.Changes, secretChange)
}
}
}
return nil
},
@ -802,6 +866,8 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
state.EventDeleteTask{Task: &api.Task{NodeID: nodeID},
Checks: []state.TaskCheckFunc{state.TaskCheckNodeID}},
state.EventUpdateSecret{},
state.EventDeleteSecret{},
)
if err != nil {
return err
@ -825,7 +891,9 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
batchingTimer *time.Timer
batchingTimeout <-chan time.Time
updateTasks = make(map[string]*api.Task)
updateSecrets = make(map[string]*api.Secret)
removeTasks = make(map[string]struct{})
removeSecrets = make(map[string]struct{})
)
oneModification := func() {
@ -839,6 +907,28 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
}
}
// Release the secrets references from this task
releaseSecretsForTask := func(t *api.Task) bool {
var modified bool
container := t.Spec.GetContainer()
if container == nil {
return modified
}
for _, secretRef := range container.Secrets {
secretID := secretRef.SecretID
delete(tasksUsingSecret[secretID], t.ID)
if len(tasksUsingSecret[secretID]) == 0 {
// No tasks are using the secret anymore
delete(tasksUsingSecret, secretID)
removeSecrets[secretID] = struct{}{}
modified = true
}
}
return modified
}
// The batching loop waits for 50 ms after the most recent
// change, or until modificationBatchLimit is reached. The
// worst case latency is modificationBatchLimit * batchingWaitTime,
@ -867,15 +957,35 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
if equality.TasksEqualStable(oldTask, v.Task) && v.Task.Status.State > api.TaskStateAssigned {
// this update should not trigger a task change for the agent
tasksMap[v.Task.ID] = v.Task
// If this task got updated to a final state, let's release
// the secrets that are being used by the task
if v.Task.Status.State > api.TaskStateRunning {
// If releasing the secrets caused a secret to be
// removed from an agent, mark one modification
if releaseSecretsForTask(v.Task) {
oneModification()
}
}
continue
}
} else if v.Task.Status.State <= api.TaskStateRunning {
// If this task wasn't part of the assignment set before, and it's <= RUNNING
// add the secrets it references to the secrets assignment.
// Task states > RUNNING are worker reported only, are never created in
// a > RUNNING state.
var newSecrets []*api.Secret
d.store.View(func(readTx store.ReadTx) {
newSecrets = addSecretsForTask(readTx, v.Task)
})
for _, secret := range newSecrets {
updateSecrets[secret.ID] = secret
}
}
tasksMap[v.Task.ID] = v.Task
updateTasks[v.Task.ID] = v.Task
oneModification()
case state.EventDeleteTask:
if _, exists := tasksMap[v.Task.ID]; !exists {
continue
}
@ -884,7 +994,28 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
delete(tasksMap, v.Task.ID)
// Release the secrets being used by this task
// Ignoring the return here. We will always mark
// this as a modification, since a task is being
// removed.
releaseSecretsForTask(v.Task)
oneModification()
// TODO(aaronl): For node secrets, we'll need to handle
// EventCreateSecret.
case state.EventUpdateSecret:
	// Only secrets currently tracked for this node's tasks are of
	// interest; the change is logged, nothing is sent to the agent.
	if _, exists := tasksUsingSecret[v.Secret.ID]; !exists {
		continue
	}
	// Secret IDs are strings, so they are formatted with %s.
	log.Debugf("Secret %s (ID: %s) was updated though it was still referenced by one or more tasks",
		v.Secret.Spec.Annotations.Name, v.Secret.ID)
case state.EventDeleteSecret:
	// Same as above for deletions of a still-referenced secret.
	if _, exists := tasksUsingSecret[v.Secret.ID]; !exists {
		continue
	}
	log.Debugf("Secret %s (ID: %s) was deleted though it was still referenced by one or more tasks",
		v.Secret.Spec.Annotations.Name, v.Secret.ID)
}
case <-batchingTimeout:
break batchingLoop
@ -902,12 +1033,57 @@ func (d *Dispatcher) Assignments(r *api.AssignmentsRequest, stream api.Dispatche
if modificationCnt > 0 {
for id, task := range updateTasks {
if _, ok := removeTasks[id]; !ok {
update.UpdateTasks = append(update.UpdateTasks, task)
taskChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Task{
Task: task,
},
},
Action: api.AssignmentChange_AssignmentActionUpdate,
}
update.Changes = append(update.Changes, taskChange)
}
}
for id, secret := range updateSecrets {
if _, ok := removeSecrets[id]; !ok {
secretChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Secret{
Secret: secret,
},
},
Action: api.AssignmentChange_AssignmentActionUpdate,
}
update.Changes = append(update.Changes, secretChange)
}
}
for id := range removeTasks {
update.RemoveTasks = append(update.RemoveTasks, id)
taskChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Task{
Task: &api.Task{ID: id},
},
},
Action: api.AssignmentChange_AssignmentActionRemove,
}
update.Changes = append(update.Changes, taskChange)
}
for id := range removeSecrets {
secretChange := &api.AssignmentChange{
Assignment: &api.Assignment{
Item: &api.Assignment_Secret{
Secret: &api.Secret{ID: id},
},
},
Action: api.AssignmentChange_AssignmentActionRemove,
}
update.Changes = append(update.Changes, secretChange)
}
if err := sendMessage(update, api.AssignmentsMessage_INCREMENTAL); err != nil {
return err
}

View File

@ -184,6 +184,10 @@ func New(config *Config) (*Manager, error) {
} else if err != nil {
return nil, err
}
if proto == "tcp" {
// in case of 0 port
tcpAddr = l.Addr().String()
}
listeners[proto] = l
}
}
@ -197,7 +201,7 @@ func New(config *Config) (*Manager, error) {
raftCfg.HeartbeatTick = int(config.HeartbeatTick)
}
newNodeOpts := raft.NewNodeOptions{
newNodeOpts := raft.NodeOptions{
ID: config.SecurityConfig.ClientTLSCreds.NodeID(),
Addr: tcpAddr,
JoinAddr: config.JoinRaft,
@ -226,6 +230,14 @@ func New(config *Config) (*Manager, error) {
return m, nil
}
// Addr reports the address of the manager's "tcp" listener, i.e. the one the
// remote API is served on. It returns nil when no listener is registered
// under the "tcp" key.
func (m *Manager) Addr() net.Addr {
	tcpListener, found := m.listeners["tcp"]
	if !found {
		return nil
	}
	return tcpListener.Addr()
}
// Run starts all manager sub-systems and the gRPC server at the configured
// address.
// The call never returns unless an error occurs or `Stop()` is called.

View File

@ -3,19 +3,27 @@ package orchestrator
import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"github.com/docker/swarmkit/manager/constraint"
"github.com/docker/swarmkit/manager/state"
"github.com/docker/swarmkit/manager/state/store"
"golang.org/x/net/context"
)
// globalService bundles a global service with the placement constraints
// parsed from its spec, so the constraints do not have to be re-parsed on
// every reconciliation pass.
type globalService struct {
*api.Service
// Compiled constraints, produced by constraint.Parse from
// Spec.Task.Placement.Constraints; nil when the service declares none.
constraints []constraint.Constraint
}
// GlobalOrchestrator runs a reconciliation loop to create and destroy
// tasks as necessary for global services.
type GlobalOrchestrator struct {
store *store.MemoryStore
// nodes contains nodeID of all valid nodes in the cluster
nodes map[string]struct{}
// globalServices have all the global services in the cluster, indexed by ServiceID
globalServices map[string]*api.Service
// nodes is the set of non-drained nodes in the cluster, indexed by node ID
nodes map[string]*api.Node
// globalServices has all the global services in the cluster, indexed by ServiceID
globalServices map[string]globalService
// stopChan signals to the state machine to stop running.
stopChan chan struct{}
@ -34,8 +42,8 @@ func NewGlobalOrchestrator(store *store.MemoryStore) *GlobalOrchestrator {
updater := NewUpdateSupervisor(store, restartSupervisor)
return &GlobalOrchestrator{
store: store,
nodes: make(map[string]struct{}),
globalServices: make(map[string]*api.Service),
nodes: make(map[string]*api.Node),
globalServices: make(map[string]globalService),
stopChan: make(chan struct{}),
doneChan: make(chan struct{}),
updater: updater,
@ -76,10 +84,7 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
return err
}
for _, n := range nodes {
// if a node is in drain state, do not add it
if isValidNode(n) {
g.nodes[n.ID] = struct{}{}
}
g.updateNode(n)
}
// Lookup global services
@ -90,12 +95,15 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
if err != nil {
return err
}
var reconcileServiceIDs []string
for _, s := range existingServices {
if isGlobalService(s) {
g.globalServices[s.ID] = s
g.reconcileOneService(ctx, s)
g.updateService(s)
reconcileServiceIDs = append(reconcileServiceIDs, s.ID)
}
}
g.reconcileServices(ctx, reconcileServiceIDs)
for {
select {
@ -108,14 +116,14 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
if !isGlobalService(v.Service) {
continue
}
g.globalServices[v.Service.ID] = v.Service
g.reconcileOneService(ctx, v.Service)
g.updateService(v.Service)
g.reconcileServices(ctx, []string{v.Service.ID})
case state.EventUpdateService:
if !isGlobalService(v.Service) {
continue
}
g.globalServices[v.Service.ID] = v.Service
g.reconcileOneService(ctx, v.Service)
g.updateService(v.Service)
g.reconcileServices(ctx, []string{v.Service.ID})
case state.EventDeleteService:
if !isGlobalService(v.Service) {
continue
@ -125,8 +133,10 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
delete(g.globalServices, v.Service.ID)
g.restarts.ClearServiceHistory(v.Service.ID)
case state.EventCreateNode:
g.updateNode(v.Node)
g.reconcileOneNode(ctx, v.Node)
case state.EventUpdateNode:
g.updateNode(v.Node)
switch v.Node.Status.State {
// NodeStatus_DISCONNECTED is a transient state, no need to make any change
case api.NodeStatus_DOWN:
@ -153,7 +163,7 @@ func (g *GlobalOrchestrator) Run(ctx context.Context) error {
if _, exists := g.globalServices[v.Task.ServiceID]; !exists {
continue
}
g.reconcileServiceOneNode(ctx, v.Task.ServiceID, v.Task.NodeID)
g.reconcileServicesOneNode(ctx, []string{v.Task.ServiceID}, v.Task.NodeID)
}
case <-g.stopChan:
return nil
@ -196,138 +206,225 @@ func (g *GlobalOrchestrator) removeTasksFromNode(ctx context.Context, node *api.
}
}
func (g *GlobalOrchestrator) reconcileOneService(ctx context.Context, service *api.Service) {
var (
tasks []*api.Task
err error
)
func (g *GlobalOrchestrator) reconcileServices(ctx context.Context, serviceIDs []string) {
nodeCompleted := make(map[string]map[string]struct{})
nodeTasks := make(map[string]map[string][]*api.Task)
g.store.View(func(tx store.ReadTx) {
tasks, err = store.FindTasks(tx, store.ByServiceID(service.ID))
for _, serviceID := range serviceIDs {
tasks, err := store.FindTasks(tx, store.ByServiceID(serviceID))
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices failed finding tasks for service %s", serviceID)
continue
}
// a node may have completed this service
nodeCompleted[serviceID] = make(map[string]struct{})
// nodeID -> task list
nodeTasks[serviceID] = make(map[string][]*api.Task)
for _, t := range tasks {
if isTaskRunning(t) {
// Collect all running instances of this service
nodeTasks[serviceID][t.NodeID] = append(nodeTasks[serviceID][t.NodeID], t)
} else {
// for finished tasks, check restartPolicy
if isTaskCompleted(t, restartCondition(t)) {
nodeCompleted[serviceID][t.NodeID] = struct{}{}
}
}
}
}
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileOneService failed finding tasks")
return
}
// a node may have completed this service
nodeCompleted := make(map[string]struct{})
// nodeID -> task list
nodeTasks := make(map[string][]*api.Task)
for _, t := range tasks {
if isTaskRunning(t) {
// Collect all running instances of this service
nodeTasks[t.NodeID] = append(nodeTasks[t.NodeID], t)
} else {
// for finished tasks, check restartPolicy
if isTaskCompleted(t, restartCondition(t)) {
nodeCompleted[t.NodeID] = struct{}{}
}
}
}
_, err = g.store.Batch(func(batch *store.Batch) error {
_, err := g.store.Batch(func(batch *store.Batch) error {
var updateTasks []slot
for nodeID := range g.nodes {
ntasks := nodeTasks[nodeID]
// if restart policy considers this node has finished its task
// it should remove all running tasks
if _, exists := nodeCompleted[nodeID]; exists {
g.removeTasks(ctx, batch, service, ntasks)
return nil
for _, serviceID := range serviceIDs {
if _, exists := nodeTasks[serviceID]; !exists {
continue
}
// this node needs to run 1 copy of the task
if len(ntasks) == 0 {
g.addTask(ctx, batch, service, nodeID)
} else {
updateTasks = append(updateTasks, ntasks)
service := g.globalServices[serviceID]
for nodeID, node := range g.nodes {
meetsConstraints := constraint.NodeMatches(service.constraints, node)
ntasks := nodeTasks[serviceID][nodeID]
delete(nodeTasks[serviceID], nodeID)
// if restart policy considers this node has finished its task
// it should remove all running tasks
if _, exists := nodeCompleted[serviceID][nodeID]; exists || !meetsConstraints {
g.removeTasks(ctx, batch, ntasks)
continue
}
if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update
// any tasks
continue
}
// this node needs to run 1 copy of the task
if len(ntasks) == 0 {
g.addTask(ctx, batch, service.Service, nodeID)
} else {
updateTasks = append(updateTasks, ntasks)
}
}
if len(updateTasks) > 0 {
g.updater.Update(ctx, g.cluster, service.Service, updateTasks)
}
// Remove any tasks assigned to nodes not found in g.nodes.
// These must be associated with nodes that are drained, or
// nodes that no longer exist.
for _, ntasks := range nodeTasks[serviceID] {
g.removeTasks(ctx, batch, ntasks)
}
}
if len(updateTasks) > 0 {
g.updater.Update(ctx, g.cluster, service, updateTasks)
}
return nil
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileOneService transaction failed")
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcileServices transaction failed")
}
}
// updateNode refreshes the orchestrator's node set from the given node.
// A node whose availability is DRAIN is dropped from g.nodes; any other node
// is stored (or overwritten) under its ID.
func (g *GlobalOrchestrator) updateNode(node *api.Node) {
	if node.Spec.Availability != api.NodeAvailabilityDrain {
		g.nodes[node.ID] = node
		return
	}
	delete(g.nodes, node.ID)
}
// updateService records the current value of a global service in
// g.globalServices together with its parsed placement constraints.
// Services without placement constraints are stored with a nil constraint
// list.
func (g *GlobalOrchestrator) updateService(service *api.Service) {
	entry := globalService{Service: service}
	if placement := service.Spec.Task.Placement; placement != nil && len(placement.Constraints) != 0 {
		// The parse error is intentionally discarded: the entry simply keeps
		// whatever constraint list Parse returned.
		// NOTE(review): presumably constraints were already validated before
		// the service reached the store — confirm.
		entry.constraints, _ = constraint.Parse(placement.Constraints)
	}
	g.globalServices[service.ID] = entry
}
// reconcileOneNode checks all global services on one node
func (g *GlobalOrchestrator) reconcileOneNode(ctx context.Context, node *api.Node) {
switch node.Spec.Availability {
case api.NodeAvailabilityDrain:
if node.Spec.Availability == api.NodeAvailabilityDrain {
log.G(ctx).Debugf("global orchestrator: node %s in drain state, removing tasks from it", node.ID)
g.removeTasksFromNode(ctx, node)
delete(g.nodes, node.ID)
return
case api.NodeAvailabilityActive:
if _, exists := g.nodes[node.ID]; !exists {
log.G(ctx).Debugf("global orchestrator: node %s not in current node list, adding it", node.ID)
g.nodes[node.ID] = struct{}{}
}
default:
log.G(ctx).Debugf("global orchestrator: node %s in %s state, doing nothing", node.ID, node.Spec.Availability.String())
return
}
// typically there are only a few global services on a node
// iterate through all of them one by one. If raft store visits become a concern,
// it can be optimized.
for _, service := range g.globalServices {
g.reconcileServiceOneNode(ctx, service.ID, node.ID)
var serviceIDs []string
for id := range g.globalServices {
serviceIDs = append(serviceIDs, id)
}
g.reconcileServicesOneNode(ctx, serviceIDs, node.ID)
}
// reconcileServiceOneNode checks one service on one node
func (g *GlobalOrchestrator) reconcileServiceOneNode(ctx context.Context, serviceID string, nodeID string) {
_, exists := g.nodes[nodeID]
// reconcileServicesOneNode checks the specified services on one node
func (g *GlobalOrchestrator) reconcileServicesOneNode(ctx context.Context, serviceIDs []string, nodeID string) {
node, exists := g.nodes[nodeID]
if !exists {
return
}
service, exists := g.globalServices[serviceID]
if !exists {
return
}
// the node has completed this servie
completed := false
// tasks for this node and service
// whether each service has completed on the node
completed := make(map[string]bool)
// tasks by service
tasks := make(map[string][]*api.Task)
var (
tasks []*api.Task
err error
tasksOnNode []*api.Task
err error
)
g.store.View(func(tx store.ReadTx) {
var tasksOnNode []*api.Task
tasksOnNode, err = store.FindTasks(tx, store.ByNodeID(nodeID))
if err != nil {
return
}
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks on node %s", nodeID)
return
}
for _, serviceID := range serviceIDs {
for _, t := range tasksOnNode {
// only interested in one service
if t.ServiceID != serviceID {
continue
}
if isTaskRunning(t) {
tasks = append(tasks, t)
tasks[serviceID] = append(tasks[serviceID], t)
} else {
if isTaskCompleted(t, restartCondition(t)) {
completed = true
completed[serviceID] = true
}
}
}
})
if err != nil {
log.G(ctx).WithError(err).Errorf("global orchestrator: reconcile failed finding tasks")
return
}
_, err = g.store.Batch(func(batch *store.Batch) error {
// if restart policy considers this node has finished its task
// it should remove all running tasks
if completed {
g.removeTasks(ctx, batch, service, tasks)
return nil
}
if len(tasks) == 0 {
g.addTask(ctx, batch, service, nodeID)
for _, serviceID := range serviceIDs {
service, exists := g.globalServices[serviceID]
if !exists {
continue
}
meetsConstraints := constraint.NodeMatches(service.constraints, node)
// if restart policy considers this node has finished its task
// it should remove all running tasks
if completed[serviceID] || !meetsConstraints {
g.removeTasks(ctx, batch, tasks[serviceID])
continue
}
if node.Spec.Availability == api.NodeAvailabilityPause {
// the node is paused, so we won't add or update tasks
continue
}
if len(tasks) == 0 {
g.addTask(ctx, batch, service.Service, nodeID)
} else {
// If task is out of date, update it. This can happen
// on node reconciliation if, for example, we pause a
// node, update the service, and then activate the node
// later.
// We don't use g.updater here for two reasons:
// - This is not a rolling update. Since it was not
// triggered directly by updating the service, it
// should not observe the rolling update parameters
// or show status in UpdateStatus.
// - Calling Update cancels any current rolling updates
// for the service, such as one triggered by service
// reconciliation.
var (
dirtyTasks []*api.Task
cleanTasks []*api.Task
)
for _, t := range tasks[serviceID] {
if isTaskDirty(service.Service, t) {
dirtyTasks = append(dirtyTasks, t)
} else {
cleanTasks = append(cleanTasks, t)
}
}
if len(cleanTasks) == 0 {
g.addTask(ctx, batch, service.Service, nodeID)
} else {
dirtyTasks = append(dirtyTasks, cleanTasks[1:]...)
}
g.removeTasks(ctx, batch, dirtyTasks)
}
}
return nil
})
@ -383,7 +480,7 @@ func (g *GlobalOrchestrator) addTask(ctx context.Context, batch *store.Batch, se
}
}
func (g *GlobalOrchestrator) removeTasks(ctx context.Context, batch *store.Batch, service *api.Service, tasks []*api.Task) {
func (g *GlobalOrchestrator) removeTasks(ctx context.Context, batch *store.Batch, tasks []*api.Task) {
for _, t := range tasks {
g.removeTask(ctx, batch, t)
}
@ -393,11 +490,6 @@ func isTaskRunning(t *api.Task) bool {
return t != nil && t.DesiredState <= api.TaskStateRunning && t.Status.State <= api.TaskStateRunning
}
// isValidNode reports whether n may receive global-service tasks: it must be
// non-nil and its availability must not be DRAIN.
func isValidNode(n *api.Node) bool {
	// A nil node is never valid.
	if n == nil {
		return false
	}
	return n.Spec.Availability != api.NodeAvailabilityDrain
}
func isTaskCompleted(t *api.Task, restartPolicy api.RestartPolicy_RestartCondition) bool {
if t == nil || isTaskRunning(t) {
return false

View File

@ -489,9 +489,13 @@ func (u *Updater) removeOldTasks(ctx context.Context, batch *store.Batch, remove
return removedTask, nil
}
// isTaskDirty reports whether task t is out of date with respect to service
// s. A task is dirty when its spec differs from the service's task spec, or
// when it has an endpoint whose spec differs from the service's endpoint
// spec. Tasks without an endpoint are only compared on the task spec.
func isTaskDirty(s *api.Service, t *api.Task) bool {
	if !reflect.DeepEqual(s.Spec.Task, t.Spec) {
		return true
	}
	if t.Endpoint == nil {
		return false
	}
	return !reflect.DeepEqual(s.Spec.Endpoint, t.Endpoint.Spec)
}
func (u *Updater) isTaskDirty(t *api.Task) bool {
return !reflect.DeepEqual(u.newService.Spec.Task, t.Spec) ||
(t.Endpoint != nil && !reflect.DeepEqual(u.newService.Spec.Endpoint, t.Endpoint.Spec))
return isTaskDirty(u.newService, t)
}
func (u *Updater) isSlotDirty(slot slot) bool {

View File

@ -1,97 +0,0 @@
package scheduler
import (
"strings"
"github.com/docker/swarmkit/api"
)
// Key prefixes recognized by ConstraintFilter.Check. The remainder of the
// constraint key after the prefix is used as the label name to look up.
const (
// nodeLabelPrefix selects a label from the node spec annotations,
// e.g. 'node.labels.key==value'.
nodeLabelPrefix = "node.labels."
// engineLabelPrefix selects a label from the node's engine description,
// e.g. 'engine.labels.key!=value'.
engineLabelPrefix = "engine.labels."
)
// ConstraintFilter selects only nodes that match certain labels.
type ConstraintFilter struct {
// constraints holds the expressions parsed by SetTask from the task's
// placement constraints; Check requires a node to satisfy all of them.
constraints []Expr
}
// SetTask prepares the filter for task t and reports whether the filter is
// enabled for it. The filter is disabled (false) when the task has no
// placement constraints or when they cannot be parsed; in those cases the
// previously stored constraints are left untouched.
func (f *ConstraintFilter) SetTask(t *api.Task) bool {
	placement := t.Spec.Placement
	if placement == nil || len(placement.Constraints) == 0 {
		return false
	}

	// Constraints have been validated at controlapi; if parsing fails here
	// anyway, treat the constraint filter as disabled for this task.
	if parsed, err := ParseExprs(placement.Constraints); err == nil {
		f.constraints = parsed
		return true
	}
	return false
}
// Check returns true if every constraint of the current task is satisfied by
// node n.
//
// Constraint keys are compared case-insensitively and select the node-side
// value as follows:
//   - "node.id"          -> the node ID
//   - "node.hostname"    -> the described hostname
//   - "node.role"        -> the string form of the node's role
//   - "node.labels.X"    -> label X from the node spec annotations
//   - "engine.labels.X"  -> label X from the engine description
//
// Label names themselves are case sensitive. Whenever the description,
// engine or label is missing, the constraint is evaluated against the empty
// string (so '==' fails and '!=' matches). A key of any other form makes the
// node fail the check.
func (f *ConstraintFilter) Check(n *NodeInfo) bool {
	for _, c := range f.constraints {
		key := c.Key

		// actual is the node-side value the constraint is compared with;
		// it stays empty when the node does not carry the information.
		var actual string

		switch {
		case strings.EqualFold(key, "node.id"):
			actual = n.ID

		case strings.EqualFold(key, "node.hostname"):
			if n.Description != nil {
				actual = n.Description.Hostname
			}

		case strings.EqualFold(key, "node.role"):
			actual = n.Spec.Role.String()

		// node labels constraint in form like 'node.labels.key==value'
		case len(key) > len(nodeLabelPrefix) && strings.EqualFold(key[:len(nodeLabelPrefix)], nodeLabelPrefix):
			// Reading from a nil label map yields "", same as a missing label.
			actual = n.Spec.Annotations.Labels[key[len(nodeLabelPrefix):]]

		// engine labels constraint in form like 'engine.labels.key!=value'
		case len(key) > len(engineLabelPrefix) && strings.EqualFold(key[:len(engineLabelPrefix)], engineLabelPrefix):
			if n.Description != nil && n.Description.Engine != nil {
				actual = n.Description.Engine.Labels[key[len(engineLabelPrefix):]]
			}

		default:
			// key doesn't match predefined syntax
			return false
		}

		if !c.Match(actual) {
			return false
		}
	}

	return true
}

View File

@ -1,96 +0,0 @@
package scheduler
import (
"fmt"
"regexp"
"strings"
)
// Operator codes stored in Expr.operator. Each code equals the index of the
// corresponding token in the operators slice.
const (
	eq = iota
	noteq
)

var (
	// alphaNumeric validates a constraint key: a letter or underscore followed
	// by one or more letters, digits, '-', '_' or '.', case-insensitively.
	alphaNumeric = regexp.MustCompile(`^(?i)[a-z_][a-z0-9\-_.]+$`)
	// valuePattern validates a constraint value. A value may be alphanumeric
	// plus some special characters; it shouldn't contain current or future
	// operators like '>, <, ~', etc.
	valuePattern = regexp.MustCompile(`^(?i)[a-z0-9:\-_\s\.\*\(\)\?\+\[\]\\\^\$\|\/]+$`)
	// operators lists the accepted operator tokens, in the order they are
	// searched for within a constraint.
	operators = []string{"==", "!="}
)

// Expr is a single parsed constraint of the form "key op value".
type Expr struct {
	// Key is the left-hand side of the constraint, with surrounding
	// whitespace removed.
	Key string
	// operator is eq or noteq.
	operator int
	// exp is the right-hand side the key's value is compared against.
	exp string
}

// ParseExprs converts a list of constraint strings into Exprs.
//
// Each entry must contain one of the accepted operators; the text before the
// first occurrence of that operator is the key and the text after it is the
// value, both trimmed of surrounding whitespace. An error is returned for the
// first entry that has no operator, an invalid key or an invalid value. On
// success the returned slice is non-nil, even for empty input.
func ParseExprs(env []string) ([]Expr, error) {
	parsed := []Expr{}

nextEntry:
	for _, entry := range env {
		// each entry is in the form of "key op value"
		for opCode, token := range operators {
			if !strings.Contains(entry, token) {
				continue
			}

			// split around the first occurrence of the operator
			pieces := strings.SplitN(entry, token, 2)
			if len(pieces) < 2 {
				return nil, fmt.Errorf("invalid expr: %s", entry)
			}

			key := strings.TrimSpace(pieces[0])
			if !alphaNumeric.MatchString(key) {
				return nil, fmt.Errorf("key '%s' is invalid", key)
			}

			value := strings.TrimSpace(pieces[1])
			if !valuePattern.MatchString(value) {
				return nil, fmt.Errorf("value '%s' is invalid", value)
			}

			// TODO(dongluochen): revisit requirements to see if globing or regex are useful
			parsed = append(parsed, Expr{Key: key, operator: opCode, exp: value})

			// found an operator, move on to the next entry
			continue nextEntry
		}

		// none of the accepted operators occurs in this entry
		return nil, fmt.Errorf("constraint expected one operator from %s", strings.Join(operators, ", "))
	}

	return parsed, nil
}

// Match evaluates the expression against the candidate strings.
//
// Candidates are compared with the expression's value case-insensitively and
// in full. With the '==' operator the result is true when at least one
// candidate equals the value; with '!=' it is true when none does. An
// unknown operator code never matches.
func (e *Expr) Match(whats ...string) bool {
	found := false
	for i := 0; i < len(whats) && !found; i++ {
		found = strings.EqualFold(e.exp, whats[i])
	}

	if e.operator == eq {
		return found
	}
	if e.operator == noteq {
		return !found
	}
	return false
}

View File

@ -1,6 +1,9 @@
package scheduler
import "github.com/docker/swarmkit/api"
import (
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/constraint"
)
// Filter checks whether the given task can run on the given node.
// A filter may only operate
@ -129,3 +132,30 @@ func (f *PluginFilter) pluginExistsOnNode(pluginType string, pluginName string,
}
return false
}
// ConstraintFilter selects only nodes that match certain labels.
type ConstraintFilter struct {
// constraints holds the result of constraint.Parse on the placement
// constraints of the task last accepted by SetTask; Check passes them to
// constraint.NodeMatches.
constraints []constraint.Constraint
}
// SetTask prepares the filter for task t and reports whether the filter is
// enabled for it. The filter is disabled (false) when the task has no
// placement constraints or when they cannot be parsed; in those cases the
// previously stored constraints are left untouched.
func (f *ConstraintFilter) SetTask(t *api.Task) bool {
	placement := t.Spec.Placement
	if placement == nil || len(placement.Constraints) == 0 {
		return false
	}

	// Constraints have been validated at controlapi; if parsing fails here
	// anyway, treat the constraint filter as disabled for this task.
	if parsed, err := constraint.Parse(placement.Constraints); err == nil {
		f.constraints = parsed
		return true
	}
	return false
}
// Check returns true if the task's constraint is supported by the given node.
// It delegates to constraint.NodeMatches, passing the constraints stored by
// SetTask and the node wrapped by n.
func (f *ConstraintFilter) Check(n *NodeInfo) bool {
return constraint.NodeMatches(f.constraints, n.Node)
}

View File

@ -27,11 +27,19 @@ var (
ErrCannotUnmarshalConfig = errors.New("membership: cannot unmarshal configuration change")
)
// deferredConn keeps the connection of a removed member open for some time
// instead of closing it immediately.
// This is needed in case the removed node is the redirector or the endpoint
// of a ControlAPI call.
type deferredConn struct {
// tick is incremented by Cluster.handleDeferredConns on every pass; the
// connection is closed once it exceeds the cluster's heartbeatTicks.
tick int
// conn is the removed member's connection awaiting its deferred Close.
conn *grpc.ClientConn
}
// Cluster represents a set of active
// raft Members
type Cluster struct {
mu sync.RWMutex
members map[uint64]*Member
mu sync.RWMutex
members map[uint64]*Member
deferedConns map[*deferredConn]struct{}
// removed contains the list of removed Members,
// those ids cannot be reused
@ -73,16 +81,13 @@ func NewCluster(heartbeatTicks int) *Cluster {
return &Cluster{
members: make(map[uint64]*Member),
removed: make(map[uint64]bool),
deferedConns: make(map[*deferredConn]struct{}),
heartbeatTicks: heartbeatTicks,
PeersBroadcast: watch.NewQueue(),
}
}
// Tick increases ticks for all members. After heartbeatTicks node marked as
// inactive.
func (c *Cluster) Tick() {
c.mu.Lock()
defer c.mu.Unlock()
func (c *Cluster) handleInactive() {
for _, m := range c.members {
if !m.active {
continue
@ -97,6 +102,25 @@ func (c *Cluster) Tick() {
}
}
// handleDeferredConns ages every deferred connection by one tick and closes
// and forgets those whose age exceeds heartbeatTicks. Tick calls it with c.mu
// held.
func (c *Cluster) handleDeferredConns() {
	for pending := range c.deferedConns {
		pending.tick++
		if pending.tick <= c.heartbeatTicks {
			// not old enough yet, keep the connection open
			continue
		}
		pending.conn.Close()
		// Removing the entry currently being visited is safe during a range
		// over a map.
		delete(c.deferedConns, pending)
	}
}
// Tick increases ticks for all members. After heartbeatTicks a node is marked
// as inactive.
// It also ages the deferred connections of removed members, closing those
// that have been kept for more than heartbeatTicks ticks. Both steps run
// while c.mu is held for writing.
func (c *Cluster) Tick() {
c.mu.Lock()
defer c.mu.Unlock()
c.handleInactive()
c.handleDeferredConns()
}
// Members returns the list of raft Members in the Cluster.
func (c *Cluster) Members() map[uint64]*Member {
members := make(map[uint64]*Member)
@ -177,7 +201,9 @@ func (c *Cluster) clearMember(id uint64) error {
m, ok := c.members[id]
if ok {
if m.Conn != nil {
m.Conn.Close()
// defer connection close to after heartbeatTicks
dConn := &deferredConn{conn: m.Conn}
c.deferedConns[dConn] = struct{}{}
}
delete(c.members, id)
}
@ -232,8 +258,13 @@ func (c *Cluster) Clear() {
}
}
for dc := range c.deferedConns {
dc.conn.Close()
}
c.members = make(map[uint64]*Member)
c.removed = make(map[uint64]bool)
c.deferedConns = make(map[*deferredConn]struct{})
c.mu.Unlock()
}

View File

@ -78,29 +78,24 @@ const (
// Node represents the Raft Node useful
// configuration.
type Node struct {
raft.Node
cluster *membership.Cluster
raftNode raft.Node
cluster *membership.Cluster
Server *grpc.Server
Ctx context.Context
cancel func()
tlsCredentials credentials.TransportCredentials
Address string
StateDir string
Server *grpc.Server
Ctx context.Context
cancel func()
raftStore *raft.MemoryStorage
memoryStore *store.MemoryStore
Config *raft.Config
opts NewNodeOptions
opts NodeOptions
reqIDGen *idutil.Generator
wait *wait
wal *wal.WAL
snapshotter *snap.Snapshotter
restored bool
campaignWhenAble bool
signalledLeadership uint32
isMember uint32
joinAddr string
// waitProp waits for all the proposals to be terminated before
// shutting down the node.
@ -110,10 +105,9 @@ type Node struct {
appliedIndex uint64
snapshotIndex uint64
ticker clock.Ticker
sendTimeout time.Duration
stopCh chan struct{}
doneCh chan struct{}
ticker clock.Ticker
stopCh chan struct{}
doneCh chan struct{}
// removeRaftCh notifies about node deletion from raft cluster
removeRaftCh chan struct{}
removeRaftFunc func()
@ -129,8 +123,8 @@ type Node struct {
asyncTasks sync.WaitGroup
}
// NewNodeOptions provides arguments for NewNode
type NewNodeOptions struct {
// NodeOptions provides node-level options.
type NodeOptions struct {
// ID is the node's ID, from its certificate's CN field.
ID string
// Addr is the address of this node's listener
@ -161,8 +155,8 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
// NewNode generates a new Raft node
func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
// NewNode generates a new Raft node.
func NewNode(ctx context.Context, opts NodeOptions) *Node {
cfg := opts.Config
if cfg == nil {
cfg = DefaultNodeConfig()
@ -170,19 +164,20 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
if opts.TickInterval == 0 {
opts.TickInterval = time.Second
}
if opts.SendTimeout == 0 {
opts.SendTimeout = 2 * time.Second
}
raftStore := raft.NewMemoryStorage()
ctx, cancel := context.WithCancel(ctx)
n := &Node{
Ctx: ctx,
cancel: cancel,
cluster: membership.NewCluster(2 * cfg.ElectionTick),
tlsCredentials: opts.TLSCredentials,
raftStore: raftStore,
Address: opts.Addr,
opts: opts,
Ctx: ctx,
cancel: cancel,
cluster: membership.NewCluster(2 * cfg.ElectionTick),
raftStore: raftStore,
opts: opts,
Config: &raft.Config{
ElectionTick: cfg.ElectionTick,
HeartbeatTick: cfg.HeartbeatTick,
@ -194,9 +189,6 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
stopCh: make(chan struct{}),
doneCh: make(chan struct{}),
removeRaftCh: make(chan struct{}),
StateDir: opts.StateDir,
joinAddr: opts.JoinAddr,
sendTimeout: 2 * time.Second,
leadershipBroadcast: watch.NewQueue(),
}
n.memoryStore = store.NewMemoryStore(n)
@ -206,9 +198,6 @@ func NewNode(ctx context.Context, opts NewNodeOptions) *Node {
} else {
n.ticker = opts.ClockSource.NewTicker(opts.TickInterval)
}
if opts.SendTimeout != 0 {
n.sendTimeout = opts.SendTimeout
}
n.reqIDGen = idutil.NewGenerator(uint16(n.Config.ID), time.Now())
n.wait = newWait()
@ -249,8 +238,8 @@ func (n *Node) JoinAndStart() (err error) {
n.snapshotIndex = snapshot.Metadata.Index
if loadAndStartErr == errNoWAL {
if n.joinAddr != "" {
c, err := n.ConnectToMember(n.joinAddr, 10*time.Second)
if n.opts.JoinAddr != "" {
c, err := n.ConnectToMember(n.opts.JoinAddr, 10*time.Second)
if err != nil {
return err
}
@ -262,7 +251,7 @@ func (n *Node) JoinAndStart() (err error) {
ctx, cancel := context.WithTimeout(n.Ctx, 10*time.Second)
defer cancel()
resp, err := client.Join(ctx, &api.JoinRequest{
Addr: n.Address,
Addr: n.opts.Addr,
})
if err != nil {
return err
@ -274,7 +263,7 @@ func (n *Node) JoinAndStart() (err error) {
return err
}
n.Node = raft.StartNode(n.Config, []raft.Peer{})
n.raftNode = raft.StartNode(n.Config, []raft.Peer{})
if err := n.registerNodes(resp.Members); err != nil {
if walErr := n.wal.Close(); err != nil {
@ -289,22 +278,18 @@ func (n *Node) JoinAndStart() (err error) {
if err != nil {
return err
}
n.Node = raft.StartNode(n.Config, []raft.Peer{peer})
if err := n.Campaign(n.Ctx); err != nil {
if walErr := n.wal.Close(); err != nil {
n.Config.Logger.Errorf("raft: error closing WAL: %v", walErr)
}
return err
}
n.raftNode = raft.StartNode(n.Config, []raft.Peer{peer})
n.campaignWhenAble = true
}
atomic.StoreUint32(&n.isMember, 1)
return nil
}
if n.joinAddr != "" {
if n.opts.JoinAddr != "" {
n.Config.Logger.Warning("ignoring request to join cluster, because raft state already exists")
}
n.Node = raft.RestartNode(n.Config)
n.campaignWhenAble = true
n.raftNode = raft.RestartNode(n.Config)
atomic.StoreUint32(&n.isMember, 1)
return nil
}
@ -362,9 +347,9 @@ func (n *Node) Run(ctx context.Context) error {
for {
select {
case <-n.ticker.C():
n.Tick()
n.raftNode.Tick()
n.cluster.Tick()
case rd := <-n.Ready():
case rd := <-n.raftNode.Ready():
raftConfig := DefaultRaftConfig()
n.memoryStore.View(func(readTx store.ReadTx) {
clusters, err := store.FindClusters(readTx, store.ByName(store.DefaultClusterName))
@ -457,19 +442,21 @@ func (n *Node) Run(ctx context.Context) error {
}
// Advance the state machine
n.Advance()
n.raftNode.Advance()
// If we are the only registered member after
// restoring from the state, campaign to be the
// leader.
if !n.restored {
// Node ID should be in the progress list to Campaign
if len(n.cluster.Members()) <= 1 {
if err := n.Campaign(n.Ctx); err != nil {
// On the first startup, or if we are the only
// registered member after restoring from the state,
// campaign to be the leader.
if n.campaignWhenAble {
members := n.cluster.Members()
if len(members) >= 1 {
n.campaignWhenAble = false
}
if len(members) == 1 && members[n.Config.ID] != nil {
if err := n.raftNode.Campaign(n.Ctx); err != nil {
panic("raft: cannot campaign to be the leader on node restore")
}
}
n.restored = true
}
case snapshotIndex := <-n.snapshotInProgress:
@ -517,7 +504,7 @@ func (n *Node) stop() {
n.waitProp.Wait()
n.asyncTasks.Wait()
n.Stop()
n.raftNode.Stop()
n.ticker.Stop()
if err := n.wal.Close(); err != nil {
n.Config.Logger.Errorf("raft: error closing WAL: %v", err)
@ -532,7 +519,7 @@ func (n *Node) isLeader() bool {
return false
}
if n.Node.Status().Lead == n.Config.ID {
if n.Status().Lead == n.Config.ID {
return true
}
return false
@ -549,7 +536,7 @@ func (n *Node) IsLeader() bool {
// leader returns the id of the leader, without the protection of lock and
// membership check, so it's caller task.
func (n *Node) leader() uint64 {
return n.Node.Status().Lead
return n.Status().Lead
}
// Leader returns the id of the leader, with the protection of lock
@ -859,7 +846,7 @@ func (n *Node) ProcessRaftMessage(ctx context.Context, msg *api.ProcessRaftMessa
return nil, ErrNoRaftMember
}
if err := n.Step(n.Ctx, *msg.Message); err != nil {
if err := n.raftNode.Step(n.Ctx, *msg.Message); err != nil {
return nil, err
}
@ -988,6 +975,7 @@ func (n *Node) registerNode(node *api.RaftMember) error {
}
return err
}
return nil
}
@ -1021,7 +1009,7 @@ func (n *Node) GetVersion() *api.Version {
return nil
}
status := n.Node.Status()
status := n.Status()
return &api.Version{Index: status.Commit}
}
@ -1068,6 +1056,11 @@ func (n *Node) GetMemberlist() map[uint64]*api.RaftMember {
return memberlist
}
// Status returns the status of the underlying etcd raft node, as reported by
// n.raftNode.Status().
func (n *Node) Status() raft.Status {
return n.raftNode.Status()
}
// GetMemberByNodeID returns member information based
// on its generic Node ID.
func (n *Node) GetMemberByNodeID(nodeID string) *membership.Member {
@ -1131,7 +1124,7 @@ func (n *Node) send(messages []raftpb.Message) error {
for _, m := range messages {
// Process locally
if m.To == n.Config.ID {
if err := n.Step(n.Ctx, m); err != nil {
if err := n.raftNode.Step(n.Ctx, m); err != nil {
return err
}
continue
@ -1160,7 +1153,7 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
return
}
ctx, cancel := context.WithTimeout(n.Ctx, n.sendTimeout)
ctx, cancel := context.WithTimeout(n.Ctx, n.opts.SendTimeout)
defer cancel()
var (
@ -1195,7 +1188,7 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
n.Config.Logger.Errorf("could not resolve address of member ID %x: %v", m.To, err)
return
}
conn, err = n.ConnectToMember(resp.Addr, n.sendTimeout)
conn, err = n.ConnectToMember(resp.Addr, n.opts.SendTimeout)
if err != nil {
n.Config.Logger.Errorf("could connect to member ID %x at %s: %v", m.To, resp.Addr, err)
return
@ -1212,13 +1205,13 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
n.removeRaftFunc()
}
if m.Type == raftpb.MsgSnap {
n.ReportSnapshot(m.To, raft.SnapshotFailure)
n.raftNode.ReportSnapshot(m.To, raft.SnapshotFailure)
}
if !n.IsMember() {
// node is removed from cluster or stopped
return
}
n.ReportUnreachable(m.To)
n.raftNode.ReportUnreachable(m.To)
lastSeenHost := n.cluster.LastSeenHost(m.To)
if lastSeenHost != "" {
@ -1246,7 +1239,7 @@ func (n *Node) sendToMember(members map[uint64]*membership.Member, m raftpb.Mess
newConn.Conn.Close()
}
} else if m.Type == raftpb.MsgSnap {
n.ReportSnapshot(m.To, raft.SnapshotFinish)
n.raftNode.ReportSnapshot(m.To, raft.SnapshotFinish)
}
}
@ -1323,7 +1316,7 @@ func (n *Node) processInternalRaftRequest(ctx context.Context, r *api.InternalRa
return nil, ErrRequestTooLarge
}
err = n.Propose(waitCtx, data)
err = n.raftNode.Propose(waitCtx, data)
if err != nil {
n.wait.cancel(r.ID)
return nil, err
@ -1351,7 +1344,7 @@ func (n *Node) configure(ctx context.Context, cc raftpb.ConfChange) error {
ctx, cancel := context.WithCancel(ctx)
ch := n.wait.register(cc.ID, nil, cancel)
if err := n.ProposeConfChange(ctx, cc); err != nil {
if err := n.raftNode.ProposeConfChange(ctx, cc); err != nil {
n.wait.cancel(cc.ID)
return err
}
@ -1449,7 +1442,7 @@ func (n *Node) processConfChange(entry raftpb.Entry) {
n.wait.trigger(cc.ID, err)
}
n.confState = *n.ApplyConfChange(cc)
n.confState = *n.raftNode.ApplyConfChange(cc)
n.wait.trigger(cc.ID, nil)
}
@ -1520,7 +1513,7 @@ func (n *Node) applyRemoveNode(cc raftpb.ConfChange) (err error) {
// to be the leader.
if cc.NodeID == n.leader() && !n.isLeader() {
if err = n.Campaign(n.Ctx); err != nil {
if err = n.raftNode.Campaign(n.Ctx); err != nil {
return err
}
}
@ -1548,7 +1541,7 @@ func (n *Node) applyRemoveNode(cc raftpb.ConfChange) (err error) {
// ConnectToMember returns a member object with an initialized
// connection to communicate with other raft members
func (n *Node) ConnectToMember(addr string, timeout time.Duration) (*membership.Member, error) {
conn, err := dial(addr, "tcp", n.tlsCredentials, timeout)
conn, err := dial(addr, "tcp", n.opts.TLSCredentials, timeout)
if err != nil {
return nil, err
}

View File

@ -26,19 +26,19 @@ import (
// errNoWAL is a sentinel error carrying the message "no WAL present".
// NOTE(review): presumably returned when no write-ahead log exists on disk —
// confirm against the code that returns it.
var errNoWAL = errors.New("no WAL present")
// legacyWALDir returns the path of the "wal" directory inside the node's
// state directory.
// NOTE(review): judging by the name, this is the older WAL location kept for
// migration — confirm against loadAndStart. The two returns look like the
// old/new pair of this diff hunk (n.StateDir moved to n.opts.StateDir).
func (n *Node) legacyWALDir() string {
return filepath.Join(n.StateDir, "wal")
return filepath.Join(n.opts.StateDir, "wal")
}
// walDir returns the path of the "wal-v3" directory inside the node's state
// directory.
// NOTE(review): the two returns look like the old/new pair of this diff hunk
// (n.StateDir moved to n.opts.StateDir) — confirm which one the applied file
// keeps.
func (n *Node) walDir() string {
return filepath.Join(n.StateDir, "wal-v3")
return filepath.Join(n.opts.StateDir, "wal-v3")
}
// legacySnapDir returns the path of the "snap" directory inside the node's
// state directory.
// NOTE(review): judging by the name, this is the older snapshot location kept
// for migration — confirm against loadAndStart. The two returns look like the
// old/new pair of this diff hunk (n.StateDir moved to n.opts.StateDir).
func (n *Node) legacySnapDir() string {
return filepath.Join(n.StateDir, "snap")
return filepath.Join(n.opts.StateDir, "snap")
}
// snapDir returns the path of the "snap-v3" directory inside the node's state
// directory.
// NOTE(review): the two returns look like the old/new pair of this diff hunk
// (n.StateDir moved to n.opts.StateDir) — confirm which one the applied file
// keeps.
func (n *Node) snapDir() string {
return filepath.Join(n.StateDir, "snap-v3")
return filepath.Join(n.opts.StateDir, "snap-v3")
}
func (n *Node) loadAndStart(ctx context.Context, forceNewCluster bool) error {
@ -189,7 +189,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
raftNode := &api.RaftMember{
RaftID: n.Config.ID,
NodeID: nodeID,
Addr: n.Address,
Addr: n.opts.Addr,
}
metadata, err := raftNode.Marshal()
if err != nil {
@ -207,7 +207,7 @@ func (n *Node) createWAL(nodeID string) (raft.Peer, error) {
// moveWALAndSnap moves away the WAL and snapshot because we were removed
// from the cluster and will need to recreate them if we are readded.
func (n *Node) moveWALAndSnap() error {
newWALDir, err := ioutil.TempDir(n.StateDir, "wal.")
newWALDir, err := ioutil.TempDir(n.opts.StateDir, "wal.")
if err != nil {
return err
}
@ -216,7 +216,7 @@ func (n *Node) moveWALAndSnap() error {
return err
}
newSnapDir, err := ioutil.TempDir(n.StateDir, "snap.")
newSnapDir, err := ioutil.TempDir(n.opts.StateDir, "snap.")
if err != nil {
return err
}