diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index cdfe0b41ba..7a9cbace30 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -28,25 +28,28 @@ import ( const swarmDirName = "swarm" const controlSocket = "control.sock" -const swarmConnectTimeout = 10 * time.Second +const swarmConnectTimeout = 20 * time.Second const stateFile = "docker-state.json" const ( initialReconnectDelay = 100 * time.Millisecond - maxReconnectDelay = 10 * time.Second + maxReconnectDelay = 30 * time.Second ) // ErrNoManager is returned then a manager-only function is called on non-manager -var ErrNoManager = fmt.Errorf("this node is not participating as a Swarm manager") +var ErrNoManager = fmt.Errorf("This node is not participating as a Swarm manager") // ErrNoSwarm is returned on leaving a cluster that was never initialized -var ErrNoSwarm = fmt.Errorf("this node is not part of Swarm") +var ErrNoSwarm = fmt.Errorf("This node is not part of Swarm") // ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated -var ErrSwarmExists = fmt.Errorf("this node is already part of a Swarm") +var ErrSwarmExists = fmt.Errorf("This node is already part of a Swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.") + +// ErrPendingSwarmExists is returned on initialize or join request for a cluster that is already processing a similar request but has not succeeded yet. +var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.") // ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached. -var ErrSwarmJoinTimeoutReached = fmt.Errorf("timeout reached before node was joined") +var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.") type state struct { ListenAddr string @@ -249,13 +252,14 @@ func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secre // Init initializes new cluster from user provided request. func (c *Cluster) Init(req types.InitRequest) (string, error) { c.Lock() - if c.node != nil { + if node := c.node; node != nil { c.Unlock() if !req.ForceNewCluster { - return "", ErrSwarmExists + return "", errSwarmExists(node) } ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) defer cancel() + c.cancelReconnect() if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { return "", err } @@ -297,9 +301,9 @@ func (c *Cluster) Init(req types.InitRequest) (string, error) { // Join makes current Cluster part of an existing swarm cluster. func (c *Cluster) Join(req types.JoinRequest) error { c.Lock() - if c.node != nil { + if node := c.node; node != nil { c.Unlock() - return ErrSwarmExists + return errSwarmExists(node) } // todo: check current state existing if len(req.RemoteAddrs) == 0 { @@ -312,23 +316,29 @@ func (c *Cluster) Join(req types.JoinRequest) error { } c.Unlock() - select { - case <-time.After(swarmConnectTimeout): - go c.reconnectOnFailure(ctx) - if nodeid := n.NodeID(); nodeid != "" { - return fmt.Errorf("Timeout reached before node was joined. Your cluster settings may be preventing this node from automatically joining. To accept this node into cluster run `docker node accept %v` in an existing cluster manager", nodeid) + certificateRequested := n.CertificateRequested() + for { + select { + case <-certificateRequested: + if n.NodeMembership() == swarmapi.NodeMembershipPending { + return fmt.Errorf("Your node is in the process of joining the cluster but needs to be accepted by existing cluster member.\nTo accept this node into cluster run \"docker node accept %v\" in an existing cluster manager. Use \"docker info\" command to see the current Swarm status of your node.", n.NodeID()) + } + certificateRequested = nil + case <-time.After(swarmConnectTimeout): + // attempt to connect will continue in background, also reconnecting + go c.reconnectOnFailure(ctx) + return ErrSwarmJoinTimeoutReached + case <-n.Ready(): + go c.reconnectOnFailure(ctx) + return nil + case <-ctx.Done(): + c.RLock() + defer c.RUnlock() + if c.err != nil { + return c.err + } + return ctx.Err() } - return ErrSwarmJoinTimeoutReached - case <-n.Ready(): - go c.reconnectOnFailure(ctx) - return nil - case <-ctx.Done(): - c.RLock() - defer c.RUnlock() - if c.err != nil { - return c.err - } - return ctx.Err() } } @@ -1004,6 +1014,13 @@ func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, return } +func errSwarmExists(node *swarmagent.Node) error { + if node.NodeMembership() != swarmapi.NodeMembershipAccepted { + return ErrPendingSwarmExists + } + return ErrSwarmExists +} + func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error { ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) for conn := range node.ListenControlSocket(ctx) { diff --git a/integration-cli/daemon_swarm.go b/integration-cli/daemon_swarm.go index ac54cbfb86..6c18daee19 100644 --- a/integration-cli/daemon_swarm.go +++ b/integration-cli/daemon_swarm.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strings" + "time" "github.com/docker/docker/pkg/integration/checker" "github.com/docker/engine-api/types" @@ -167,14 +168,22 @@ func (d *SwarmDaemon) getNode(c *check.C, id string) *swarm.Node { return &node } -func (d *SwarmDaemon) updateNode(c *check.C, node *swarm.Node, f ...nodeConstructor) { - for _, fn := range f { - fn(node) +func (d *SwarmDaemon) updateNode(c *check.C, id string, f ...nodeConstructor) { + for i := 0; ; i++ { + node := d.getNode(c, id) + for _, fn := range f { + fn(node) + } + url := fmt.Sprintf("/nodes/%s/update?version=%d", node.ID, node.Version.Index) + status, out, err := d.SockRequest("POST", url, node.Spec) + if i < 10 && strings.Contains(string(out), "update out of sequence") { + time.Sleep(100 * time.Millisecond) + continue + } + c.Assert(err, checker.IsNil) + c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out))) + return } - url := fmt.Sprintf("/nodes/%s/update?version=%d", node.ID, node.Version.Index) - status, out, err := d.SockRequest("POST", url, node.Spec) - c.Assert(err, checker.IsNil) - c.Assert(status, checker.Equals, http.StatusOK, check.Commentf("output: %q", string(out))) } func (d *SwarmDaemon) listNodes(c *check.C) []swarm.Node { diff --git a/integration-cli/docker_api_swarm_test.go b/integration-cli/docker_api_swarm_test.go index d76c6ac0e4..ca3aed235c 100644 --- a/integration-cli/docker_api_swarm_test.go +++ b/integration-cli/docker_api_swarm_test.go @@ -82,7 +82,7 @@ func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret strin err := d2.Join(d1.listenAddr, "", "", false) c.Assert(err, checker.NotNil) if secret == "" { - c.Assert(err.Error(), checker.Contains, "Timeout reached") + c.Assert(err.Error(), checker.Contains, "needs to be accepted") info, err := d2.info() c.Assert(err, checker.IsNil) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending) @@ -97,23 +97,25 @@ func (s *DockerSwarmSuite) testAPISwarmManualAcceptance(c *check.C, secret strin c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateInactive) } d3 := s.AddDaemon(c, false, false) - go func() { - for i := 0; ; i++ { - info, err := d3.info() - c.Assert(err, checker.IsNil) - if info.NodeID != "" { - d1.updateNode(c, d1.getNode(c, info.NodeID), func(n *swarm.Node) { - n.Spec.Membership = swarm.NodeMembershipAccepted - }) - return - } - if i >= 10 { - c.Errorf("could not find nodeID") - } - time.Sleep(300 * time.Millisecond) + c.Assert(d3.Join(d1.listenAddr, secret, "", false), checker.NotNil) + info, err := d3.info() + c.Assert(err, checker.IsNil) + c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStatePending) + c.Assert(len(info.NodeID), checker.GreaterThan, 5) + d1.updateNode(c, info.NodeID, func(n *swarm.Node) { + n.Spec.Membership = swarm.NodeMembershipAccepted + }) + for i := 0; ; i++ { + info, err := d3.info() + c.Assert(err, checker.IsNil) + if info.LocalNodeState == swarm.LocalNodeStateActive { + break } - }() - c.Assert(d3.Join(d1.listenAddr, secret, "", false), checker.IsNil) + if i > 10 { + c.Errorf("node did not become active") + } + time.Sleep(200 * time.Millisecond) + } } func (s *DockerSwarmSuite) TestApiSwarmSecretAcceptance(c *check.C) { @@ -236,7 +238,7 @@ func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) { c.Assert(info.ControlAvailable, checker.Equals, false) c.Assert(info.LocalNodeState, checker.Equals, swarm.LocalNodeStateActive) - d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) { + d1.updateNode(c, d2.NodeID, func(n *swarm.Node) { n.Spec.Role = swarm.NodeRoleManager }) @@ -255,7 +257,7 @@ func (s *DockerSwarmSuite) TestApiSwarmPromoteDemote(c *check.C) { time.Sleep(100 * time.Millisecond) } - d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) { + d1.updateNode(c, d2.NodeID, func(n *swarm.Node) { n.Spec.Role = swarm.NodeRoleWorker }) @@ -466,7 +468,7 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeUpdate(c *check.C) { nodes := d.listNodes(c) - d.updateNode(c, d.getNode(c, nodes[0].ID), func(n *swarm.Node) { + d.updateNode(c, nodes[0].ID, func(n *swarm.Node) { n.Spec.Availability = swarm.NodeAvailabilityPause }) @@ -489,14 +491,14 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeDrainPause(c *check.C) { waitAndAssert(c, defaultReconciliationTimeout, reducedCheck(sumAsIntegers, d1.checkActiveContainerCount, d2.checkActiveContainerCount), checker.Equals, instances) // drain d2, all containers should move to d1 - d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) { + d1.updateNode(c, d2.NodeID, func(n *swarm.Node) { n.Spec.Availability = swarm.NodeAvailabilityDrain }) waitAndAssert(c, defaultReconciliationTimeout, d1.checkActiveContainerCount, checker.Equals, instances) waitAndAssert(c, defaultReconciliationTimeout, d2.checkActiveContainerCount, checker.Equals, 0) // set d2 back to active - d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) { + d1.updateNode(c, d2.NodeID, func(n *swarm.Node) { n.Spec.Availability = swarm.NodeAvailabilityActive }) @@ -516,7 +518,7 @@ func (s *DockerSwarmSuite) TestApiSwarmNodeDrainPause(c *check.C) { d2ContainerCount := len(d2.activeContainers()) // set d2 to paused, scale service up, only d1 gets new tasks - d1.updateNode(c, d1.getNode(c, d2.NodeID), func(n *swarm.Node) { + d1.updateNode(c, d2.NodeID, func(n *swarm.Node) { n.Spec.Availability = swarm.NodeAvailabilityPause })