package cluster // import "github.com/docker/docker/daemon/cluster"

import (
	"context"
	"fmt"
	"path/filepath"
	"strings"
	"sync"
	"time"

	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/executor/container"
	lncluster "github.com/docker/docker/libnetwork/cluster"
	swarmapi "github.com/moby/swarmkit/v2/api"
	swarmallocator "github.com/moby/swarmkit/v2/manager/allocator/cnmallocator"
	swarmnode "github.com/moby/swarmkit/v2/node"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// nodeRunner implements a manager for a continuously running swarmkit node,
// restarting it with backoff delays if needed.
type nodeRunner struct {
	nodeState
	mu              sync.RWMutex
	done            chan struct{} // closed when swarmNode exits
	ready           chan struct{} // closed when swarmNode becomes active
	reconnectDelay  time.Duration
	config          nodeStartConfig
	repeatedRun     bool
	cancelReconnect func()
	stopping        bool
	cluster         *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
}

// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string
	// DataPathAddr is the address that has to be used for the data path.
	DataPathAddr string
	// DefaultAddressPool contains the list of subnets.
	DefaultAddressPool []string
	// SubnetSize contains the subnet size of DefaultAddressPool.
	SubnetSize uint32
	// DataPathPort contains the data path port (VXLAN UDP port) number that is used for data traffic.
	DataPathPort uint32
	// JoinInProgress is set to true if a join operation has started, but
	// not completed yet.
	JoinInProgress bool

	joinAddr        string
	forceNewCluster bool
	joinToken       string
	lockKey         []byte
	autolock        bool
	availability    types.NodeAvailability
}
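
// Ready returns a channel that yields nil once the node becomes active, or
// the startup error if the node exits before becoming ready. The channel is
// closed after the result is delivered.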
func (n *nodeRunner) Ready() chan error {
	c := make(chan error, 1)
	n.mu.RLock()
	ready, done := n.ready, n.done
	n.mu.RUnlock()
	go func() {
		select {
		case <-ready:
		case <-done:
		}
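		// If the node exited before ever becoming ready, surface the
		// startup error on the channel; otherwise the channel is closed
		// without a value, so receivers observe nil.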
		select {
		case <-ready:
		default:
			n.mu.RLock()
			c <- n.err
			n.mu.RUnlock()
		}
		close(c)
	}()
	return c
}
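
// Start starts a new swarmkit node with the given configuration, resetting
// the reconnect backoff to its initial delay.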
func (n *nodeRunner) Start(conf nodeStartConfig) error {
	n.mu.Lock()
	defer n.mu.Unlock()
	n.reconnectDelay = initialReconnectDelay
	return n.start(conf)
}
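
// start creates and starts the underlying swarmkit node, then spawns the
// goroutines that track its lifecycle: exit handling, the ready event, and
// control socket changes. Callers must hold n.mu.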
func (n *nodeRunner) start(conf nodeStartConfig) error {
	var control string
	if isWindows {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
	}

	joinAddr := conf.joinAddr
	if joinAddr == "" && conf.JoinInProgress {
		// We must have been restarted while trying to join a cluster.
		// Continue trying to join instead of forming our own cluster.
		joinAddr = conf.RemoteAddr
	}

	// Hostname is not set here. Instead, it is obtained from
	// the node description that is reported periodically.
	swarmnodeConfig := swarmnode.Config{
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		NetworkConfig: &swarmallocator.NetworkConfig{
			DefaultAddrPool: conf.DefaultAddressPool,
			SubnetSize:      conf.SubnetSize,
			VXLANUDPPort:    conf.DataPathPort,
		},
		JoinAddr:  joinAddr,
		StateDir:  n.cluster.root,
		JoinToken: conf.joinToken,
		Executor: container.NewExecutor(
			n.cluster.config.Backend,
			n.cluster.config.PluginBackend,
			n.cluster.config.ImageBackend,
			n.cluster.config.VolumeBackend,
		),
		HeartbeatTick: n.cluster.config.RaftHeartbeatTick,
		// Recommended value in etcd/raft is 10 x (HeartbeatTick).
		// Lower values were seen to have caused instability because of
		// frequent leader elections when running on flaky networks.
		ElectionTick:     n.cluster.config.RaftElectionTick,
		UnlockKey:        conf.lockKey,
		AutoLockManagers: conf.autolock,
		PluginGetter:     n.cluster.config.Backend.PluginGetter(),
	}
	if conf.availability != "" {
		avail, ok := swarmapi.NodeSpec_Availability_value[strings.ToUpper(string(conf.availability))]
		if !ok {
			return fmt.Errorf("invalid Availability: %q", conf.availability)
		}
		swarmnodeConfig.Availability = swarmapi.NodeSpec_Availability(avail)
	}
	node, err := swarmnode.New(&swarmnodeConfig)
	if err != nil {
		return err
	}
	if err := node.Start(context.Background()); err != nil {
		return err
	}

	n.done = make(chan struct{})
	n.ready = make(chan struct{})
	n.swarmNode = node
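	// Mark the join as in progress so that a daemon restart resumes the
	// join (see the JoinInProgress handling above) instead of forming a
	// new cluster.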
	if conf.joinAddr != "" {
		conf.JoinInProgress = true
	}
	n.config = conf
	savePersistentState(n.cluster.root, conf)

	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		n.handleNodeExit(node)
		cancel()
	}()

	go n.handleReadyEvent(ctx, node, n.ready)
	go n.handleControlSocketChange(ctx, node)

	return nil
}
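
// handleControlSocketChange updates the cached gRPC clients whenever the
// connection to the control socket changes, and notifies libnetwork of the
// change.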
func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
	for conn := range node.ListenControlSocket(ctx) {
		n.mu.Lock()
		if n.grpcConn != conn {
			if conn == nil {
				n.controlClient = nil
				n.logsClient = nil
			} else {
				n.controlClient = swarmapi.NewControlClient(conn)
				n.logsClient = swarmapi.NewLogsClient(conn)
				// push store changes to daemon
				go n.watchClusterEvents(ctx, conn)
			}
		}
		n.grpcConn = conn
		n.mu.Unlock()
		n.cluster.SendClusterEvent(lncluster.EventSocketChange)
	}
}
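
// watchClusterEvents subscribes to create, update, and remove events for
// nodes, services, networks, secrets, and configs, and forwards each change
// to the cluster's watch stream until the watch or the context ends.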
func (n *nodeRunner) watchClusterEvents(ctx context.Context, conn *grpc.ClientConn) {
	client := swarmapi.NewWatchClient(conn)
	watch, err := client.Watch(ctx, &swarmapi.WatchRequest{
		Entries: []*swarmapi.WatchRequest_WatchEntry{
			{
				Kind:   "node",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "service",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "network",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "secret",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
			{
				Kind:   "config",
				Action: swarmapi.WatchActionKindCreate | swarmapi.WatchActionKindUpdate | swarmapi.WatchActionKindRemove,
			},
		},
		IncludeOldObject: true,
	})
	if err != nil {
		logrus.WithError(err).Error("failed to watch cluster store")
		return
	}
	for {
		msg, err := watch.Recv()
		if err != nil {
			// store watch is broken
			errStatus, ok := status.FromError(err)
			if !ok || errStatus.Code() != codes.Canceled {
				logrus.WithError(err).Error("failed to receive changes from store watch API")
			}
			return
		}
		select {
		case <-ctx.Done():
			return
		case n.cluster.watchStream <- msg:
		}
	}
}
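
// handleReadyEvent waits for the node to become ready (or for ctx to be
// canceled), persists the completed join if one was in progress, closes the
// ready channel, and emits a node-ready cluster event.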
func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
	select {
	case <-node.Ready():
		n.mu.Lock()
		n.err = nil
		if n.config.JoinInProgress {
			n.config.JoinInProgress = false
			savePersistentState(n.cluster.root, n.config)
		}
		n.mu.Unlock()
		close(ready)
	case <-ctx.Done():
	}
	n.cluster.SendClusterEvent(lncluster.EventNodeReady)
}
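
// handleNodeExit records the node's exit error, closes the done channel, and
// schedules a restart with backoff when appropriate.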
func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
	err := detectLockedError(node.Err(context.Background()))
	if err != nil {
		logrus.Errorf("cluster exited with error: %v", err)
	}
	n.mu.Lock()
	n.swarmNode = nil
	n.err = err
	close(n.done)
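	// Only schedule an automatic restart if the node had become ready at
	// least once, or if this is already a repeated run; a node that failed
	// on its very first start is not retried.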
	select {
	case <-n.ready:
		n.enableReconnectWatcher()
	default:
		if n.repeatedRun {
			n.enableReconnectWatcher()
		}
	}
	n.repeatedRun = true
	n.mu.Unlock()
}

// Stop stops the current swarm node if it is running.
func (n *nodeRunner) Stop() error {
	n.mu.Lock()
	if n.cancelReconnect != nil { // between restarts
		n.cancelReconnect()
		n.cancelReconnect = nil
	}
	if n.swarmNode == nil {
		// even though the swarm node is nil we still may need
		// to send a node leave event to perform any cleanup required.
		if n.cluster != nil {
			n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
		}
		n.mu.Unlock()
		return nil
	}
	n.stopping = true
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	n.mu.Unlock()
	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	n.cluster.SendClusterEvent(lncluster.EventNodeLeave)
	<-n.done
	return nil
}
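
// State returns a snapshot of the current node state. A nil receiver is
// reported as inactive; otherwise the status is derived from the last error
// and the ready channel: locked, error, active, or pending.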
func (n *nodeRunner) State() nodeState {
	if n == nil {
		return nodeState{status: types.LocalNodeStateInactive}
	}
	n.mu.RLock()
	defer n.mu.RUnlock()

	ns := n.nodeState

	if ns.err != nil || n.cancelReconnect != nil {
		if errors.Is(ns.err, errSwarmLocked) {
			ns.status = types.LocalNodeStateLocked
		} else {
			ns.status = types.LocalNodeStateError
		}
	} else {
		select {
		case <-n.ready:
			ns.status = types.LocalNodeStateActive
		default:
			ns.status = types.LocalNodeStatePending
		}
	}

	return ns
}
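
// enableReconnectWatcher schedules a restart of the node after the current
// backoff delay, doubling the delay up to maxReconnectDelay. It must be
// called with n.mu held.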
func (n *nodeRunner) enableReconnectWatcher() {
	if n.stopping {
		return
	}
	n.reconnectDelay *= 2
	if n.reconnectDelay > maxReconnectDelay {
		n.reconnectDelay = maxReconnectDelay
	}
	logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
	delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
	n.cancelReconnect = cancel

	go func() {
		<-delayCtx.Done()
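		// A canceled context means the pending restart was aborted;
		// restart only when the backoff delay actually expired.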
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		n.mu.Lock()
		defer n.mu.Unlock()
		if n.stopping {
			return
		}

		if err := n.start(n.config); err != nil {
			n.err = err
		}
	}()
}

// nodeState represents information about the current state of the cluster and
// provides access to the grpc clients.
type nodeState struct {
	swarmNode       *swarmnode.Node
	grpcConn        *grpc.ClientConn
	controlClient   swarmapi.ControlClient
	logsClient      swarmapi.LogsClient
	status          types.LocalNodeState
	actualLocalAddr string
	err             error
}

// IsActiveManager returns true if node is a manager ready to accept control
// requests. It is safe to access the client properties if this returns true.
func (ns nodeState) IsActiveManager() bool {
	return ns.controlClient != nil
}

// IsManager returns true if node is a manager.
func (ns nodeState) IsManager() bool {
	return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
}

// NodeID returns node's ID or empty string if node is inactive.
func (ns nodeState) NodeID() string {
	if ns.swarmNode != nil {
		return ns.swarmNode.NodeID()
	}
	return ""
}