package cluster

import (
	"path/filepath"
	"runtime"
	"strings"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	types "github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/daemon/cluster/executor/container"
	swarmapi "github.com/docker/swarmkit/api"
	swarmnode "github.com/docker/swarmkit/node"
	"github.com/pkg/errors"
	"golang.org/x/net/context"
	"google.golang.org/grpc"
)

// nodeRunner implements a manager for a continuously running swarmkit node,
// restarting it with backoff delays if needed.
type nodeRunner struct {
	nodeState
	mu              sync.RWMutex
	done            chan struct{} // closed when swarmNode exits
	ready           chan struct{} // closed when swarmNode becomes active
	reconnectDelay  time.Duration
	config          nodeStartConfig
	repeatedRun     bool
	cancelReconnect func()
	stopping        bool
	cluster         *Cluster // only for accessing config helpers, never call any methods. TODO: change to config struct
}

// nodeStartConfig holds configuration needed to start a new node. Exported
// fields of this structure are saved to disk in json. Unexported fields
// contain data that shouldn't be persisted between daemon reloads.
type nodeStartConfig struct {
	// LocalAddr is this machine's local IP or hostname, if specified.
	LocalAddr string
	// RemoteAddr is the address that was given to "swarm join". It is used
	// to find LocalAddr if necessary.
	RemoteAddr string
	// ListenAddr is the address we bind to, including a port.
	ListenAddr string
	// AdvertiseAddr is the address other nodes should connect to,
	// including a port.
	AdvertiseAddr string

	joinAddr        string
	forceNewCluster bool
	joinToken       string
	lockKey         []byte
	autolock        bool
}
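
// Ready returns a channel that is closed once the node either becomes
// active or exits. If the node failed before becoming ready, the startup
// error is sent on the channel before it is closed, so callers can wait
// for startup like:
//
//	if err := <-n.Ready(); err != nil {
//		// node failed to start
//	}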
func (n *nodeRunner) Ready() chan error {
	c := make(chan error, 1)
	n.mu.RLock()
	ready, done := n.ready, n.done
	n.mu.RUnlock()
	go func() {
		select {
		case <-ready:
		case <-done:
		}
		select {
		case <-ready:
		default:
			n.mu.RLock()
			c <- n.err
			n.mu.RUnlock()
		}
		close(c)
	}()
	return c
}
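
// Start starts a new swarmkit node with the given configuration, resetting
// the reconnect backoff to its initial delay.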
func (n *nodeRunner) Start(conf nodeStartConfig) error {
	n.mu.Lock()
	defer n.mu.Unlock()

	n.reconnectDelay = initialReconnectDelay

	return n.start(conf)
}
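
// start creates and starts a swarmkit node with the given configuration,
// wires up its exit, ready, and control-socket handlers, and persists the
// start configuration. Callers must hold n.mu.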
func (n *nodeRunner) start(conf nodeStartConfig) error {
	var control string
	if runtime.GOOS == "windows" {
		control = `\\.\pipe\` + controlSocket
	} else {
		control = filepath.Join(n.cluster.runtimeRoot, controlSocket)
	}

	node, err := swarmnode.New(&swarmnode.Config{
		Hostname:           n.cluster.config.Name,
		ForceNewCluster:    conf.forceNewCluster,
		ListenControlAPI:   control,
		ListenRemoteAPI:    conf.ListenAddr,
		AdvertiseRemoteAPI: conf.AdvertiseAddr,
		JoinAddr:           conf.joinAddr,
		StateDir:           n.cluster.root,
		JoinToken:          conf.joinToken,
		Executor:           container.NewExecutor(n.cluster.config.Backend),
		HeartbeatTick:      1,
		ElectionTick:       3,
		UnlockKey:          conf.lockKey,
		AutoLockManagers:   conf.autolock,
	})
	if err != nil {
		return err
	}
	if err := node.Start(context.Background()); err != nil {
		return err
	}

	n.done = make(chan struct{})
	n.ready = make(chan struct{})
	n.swarmNode = node
	n.config = conf
	savePersistentState(n.cluster.root, conf)

	ctx, cancel := context.WithCancel(context.Background())

	go func() {
		n.handleNodeExit(node)
		cancel()
	}()

	go n.handleReadyEvent(ctx, node, n.ready)
	go n.handleControlSocketChange(ctx, node)

	return nil
}
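
// handleControlSocketChange updates the cached grpc connection and API
// clients whenever the node's control socket connection changes, and
// notifies the cluster of each change.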
func (n *nodeRunner) handleControlSocketChange(ctx context.Context, node *swarmnode.Node) {
	for conn := range node.ListenControlSocket(ctx) {
		n.mu.Lock()
		if n.grpcConn != conn {
			if conn == nil {
				n.controlClient = nil
				n.logsClient = nil
			} else {
				n.controlClient = swarmapi.NewControlClient(conn)
				n.logsClient = swarmapi.NewLogsClient(conn)
			}
		}
		n.grpcConn = conn
		n.mu.Unlock()
		n.cluster.configEvent <- struct{}{}
	}
}
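
// handleReadyEvent clears any stored error and closes the ready channel
// once the node reports it is ready, then notifies the cluster of the
// state change.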
func (n *nodeRunner) handleReadyEvent(ctx context.Context, node *swarmnode.Node, ready chan struct{}) {
	select {
	case <-node.Ready():
		n.mu.Lock()
		n.err = nil
		n.mu.Unlock()
		close(ready)
	case <-ctx.Done():
	}
	n.cluster.configEvent <- struct{}{}
}
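
// handleNodeExit records the error the node exited with, closes the done
// channel, and schedules a reconnect attempt if the node had previously
// become ready or this was already a restarted run.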
func (n *nodeRunner) handleNodeExit(node *swarmnode.Node) {
	err := detectLockedError(node.Err(context.Background()))
	if err != nil {
		logrus.Errorf("cluster exited with error: %v", err)
	}
	n.mu.Lock()
	n.swarmNode = nil
	n.err = err
	close(n.done)
	select {
	case <-n.ready:
		n.enableReconnectWatcher()
	default:
		if n.repeatedRun {
			n.enableReconnectWatcher()
		}
	}
	n.repeatedRun = true
	n.mu.Unlock()
}

// Stop stops the current swarm node if it is running.
func (n *nodeRunner) Stop() error {
	n.mu.Lock()
	if n.cancelReconnect != nil { // between restarts
		n.cancelReconnect()
		n.cancelReconnect = nil
	}
	if n.swarmNode == nil {
		n.mu.Unlock()
		return nil
	}
	n.stopping = true
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	if err := n.swarmNode.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		n.mu.Unlock()
		return err
	}
	n.mu.Unlock()
	<-n.done
	return nil
}
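
// State returns a snapshot of the current node state, deriving the local
// node status from the stored error, reconnect state, and readiness.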
func (n *nodeRunner) State() nodeState {
	if n == nil {
		return nodeState{status: types.LocalNodeStateInactive}
	}
	n.mu.RLock()
	defer n.mu.RUnlock()

	ns := n.nodeState

	if ns.err != nil || n.cancelReconnect != nil {
		if errors.Cause(ns.err) == errSwarmLocked {
			ns.status = types.LocalNodeStateLocked
		} else {
			ns.status = types.LocalNodeStateError
		}
	} else {
		select {
		case <-n.ready:
			ns.status = types.LocalNodeStateActive
		default:
			ns.status = types.LocalNodeStatePending
		}
	}

	return ns
}
func (n *nodeRunner) enableReconnectWatcher() {
	if n.stopping {
		return
	}
	n.reconnectDelay *= 2
	if n.reconnectDelay > maxReconnectDelay {
		n.reconnectDelay = maxReconnectDelay
	}
	logrus.Warnf("Restarting swarm in %.2f seconds", n.reconnectDelay.Seconds())
	delayCtx, cancel := context.WithTimeout(context.Background(), n.reconnectDelay)
	n.cancelReconnect = cancel

	config := n.config
	go func() {
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		n.mu.Lock()
		defer n.mu.Unlock()
		if n.stopping {
			return
		}
		config.RemoteAddr = n.cluster.getRemoteAddress()
		config.joinAddr = config.RemoteAddr
		if err := n.start(config); err != nil {
			n.err = err
		}
	}()
}

// nodeState represents information about the current state of the cluster and
// provides access to the grpc clients.
type nodeState struct {
	swarmNode       *swarmnode.Node
	grpcConn        *grpc.ClientConn
	controlClient   swarmapi.ControlClient
	logsClient      swarmapi.LogsClient
	status          types.LocalNodeState
	actualLocalAddr string
	err             error
}

// IsActiveManager returns true if node is a manager ready to accept control
// requests. It is safe to access the client properties if this returns true.
func (ns nodeState) IsActiveManager() bool {
	return ns.controlClient != nil
}

// IsManager returns true if node is a manager.
func (ns nodeState) IsManager() bool {
	return ns.swarmNode != nil && ns.swarmNode.Manager() != nil
}

// NodeID returns node's ID or empty string if node is inactive.
func (ns nodeState) NodeID() string {
	if ns.swarmNode != nil {
		return ns.swarmNode.NodeID()
	}
	return ""
}