2017-02-11 13:53:03 -05:00
package cluster
import (
"fmt"
"net"
"strings"
"time"
"github.com/Sirupsen/logrus"
apierrors "github.com/docker/docker/api/errors"
apitypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
types "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/daemon/cluster/convert"
"github.com/docker/docker/opts"
"github.com/docker/docker/pkg/signal"
swarmapi "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/manager/encryption"
swarmnode "github.com/docker/swarmkit/node"
"github.com/pkg/errors"
"golang.org/x/net/context"
)
// Init initializes new cluster from user provided request.
func ( c * Cluster ) Init ( req types . InitRequest ) ( string , error ) {
c . controlMutex . Lock ( )
defer c . controlMutex . Unlock ( )
if c . nr != nil {
if req . ForceNewCluster {
2017-04-07 21:27:35 -04:00
// Take c.mu temporarily to wait for presently running
// API handlers to finish before shutting down the node.
c . mu . Lock ( )
c . mu . Unlock ( )
2017-02-11 13:53:03 -05:00
if err := c . nr . Stop ( ) ; err != nil {
return "" , err
}
} else {
return "" , errSwarmExists
}
}
if err := validateAndSanitizeInitRequest ( & req ) ; err != nil {
return "" , apierrors . NewBadRequestError ( err )
}
listenHost , listenPort , err := resolveListenAddr ( req . ListenAddr )
if err != nil {
return "" , err
}
advertiseHost , advertisePort , err := c . resolveAdvertiseAddr ( req . AdvertiseAddr , listenPort )
if err != nil {
return "" , err
}
2017-04-14 19:54:17 -04:00
dataPathAddr , err := resolveDataPathAddr ( req . DataPathAddr )
if err != nil {
return "" , err
}
2017-02-11 13:53:03 -05:00
localAddr := listenHost
// If the local address is undetermined, the advertise address
// will be used as local address, if it belongs to this system.
// If the advertise address is not local, then we try to find
// a system address to use as local address. If this fails,
// we give up and ask the user to pass the listen address.
if net . ParseIP ( localAddr ) . IsUnspecified ( ) {
advertiseIP := net . ParseIP ( advertiseHost )
found := false
for _ , systemIP := range listSystemIPs ( ) {
if systemIP . Equal ( advertiseIP ) {
localAddr = advertiseIP . String ( )
found = true
break
}
}
if ! found {
ip , err := c . resolveSystemAddr ( )
if err != nil {
logrus . Warnf ( "Could not find a local address: %v" , err )
return "" , errMustSpecifyListenAddr
}
localAddr = ip . String ( )
}
}
if ! req . ForceNewCluster {
clearPersistentState ( c . root )
}
nr , err := c . newNodeRunner ( nodeStartConfig {
forceNewCluster : req . ForceNewCluster ,
autolock : req . AutoLockManagers ,
LocalAddr : localAddr ,
ListenAddr : net . JoinHostPort ( listenHost , listenPort ) ,
AdvertiseAddr : net . JoinHostPort ( advertiseHost , advertisePort ) ,
2017-04-14 19:54:17 -04:00
DataPathAddr : dataPathAddr ,
2017-02-11 13:53:03 -05:00
availability : req . Availability ,
} )
if err != nil {
return "" , err
}
c . mu . Lock ( )
c . nr = nr
c . mu . Unlock ( )
if err := <- nr . Ready ( ) ; err != nil {
if ! req . ForceNewCluster { // if failure on first attempt don't keep state
if err := clearPersistentState ( c . root ) ; err != nil {
return "" , err
}
}
if err != nil {
c . mu . Lock ( )
c . nr = nil
c . mu . Unlock ( )
}
return "" , err
}
state := nr . State ( )
if state . swarmNode == nil { // should never happen but protect from panic
return "" , errors . New ( "invalid cluster state for spec initialization" )
}
if err := initClusterSpec ( state . swarmNode , req . Spec ) ; err != nil {
return "" , err
}
return state . NodeID ( ) , nil
}
// Join makes current Cluster part of an existing swarm cluster.
func ( c * Cluster ) Join ( req types . JoinRequest ) error {
c . controlMutex . Lock ( )
defer c . controlMutex . Unlock ( )
c . mu . Lock ( )
if c . nr != nil {
c . mu . Unlock ( )
return errSwarmExists
}
c . mu . Unlock ( )
if err := validateAndSanitizeJoinRequest ( & req ) ; err != nil {
return apierrors . NewBadRequestError ( err )
}
listenHost , listenPort , err := resolveListenAddr ( req . ListenAddr )
if err != nil {
return err
}
var advertiseAddr string
if req . AdvertiseAddr != "" {
advertiseHost , advertisePort , err := c . resolveAdvertiseAddr ( req . AdvertiseAddr , listenPort )
// For joining, we don't need to provide an advertise address,
// since the remote side can detect it.
if err == nil {
advertiseAddr = net . JoinHostPort ( advertiseHost , advertisePort )
}
}
2017-04-14 19:54:17 -04:00
dataPathAddr , err := resolveDataPathAddr ( req . DataPathAddr )
if err != nil {
return err
}
2017-02-11 13:53:03 -05:00
clearPersistentState ( c . root )
nr , err := c . newNodeRunner ( nodeStartConfig {
RemoteAddr : req . RemoteAddrs [ 0 ] ,
ListenAddr : net . JoinHostPort ( listenHost , listenPort ) ,
AdvertiseAddr : advertiseAddr ,
2017-04-14 19:54:17 -04:00
DataPathAddr : dataPathAddr ,
2017-02-11 13:53:03 -05:00
joinAddr : req . RemoteAddrs [ 0 ] ,
joinToken : req . JoinToken ,
availability : req . Availability ,
} )
if err != nil {
return err
}
c . mu . Lock ( )
c . nr = nr
c . mu . Unlock ( )
select {
case <- time . After ( swarmConnectTimeout ) :
return errSwarmJoinTimeoutReached
case err := <- nr . Ready ( ) :
if err != nil {
c . mu . Lock ( )
c . nr = nil
c . mu . Unlock ( )
}
return err
}
}
// Inspect retrieves the configuration properties of a managed swarm cluster.
func ( c * Cluster ) Inspect ( ) ( types . Swarm , error ) {
2017-02-28 05:12:11 -05:00
var swarm * swarmapi . Cluster
if err := c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
s , err := getSwarm ( ctx , state . controlClient )
if err != nil {
return err
}
swarm = s
return nil
} ) ; err != nil {
2017-02-11 13:53:03 -05:00
return types . Swarm { } , err
}
return convert . SwarmFromGRPC ( * swarm ) , nil
}
// Update updates configuration of a managed swarm cluster.
func ( c * Cluster ) Update ( version uint64 , spec types . Spec , flags types . UpdateFlags ) error {
2017-02-28 05:12:11 -05:00
return c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
swarm , err := getSwarm ( ctx , state . controlClient )
if err != nil {
return err
}
2017-02-11 13:53:03 -05:00
2017-02-28 05:12:11 -05:00
// In update, client should provide the complete spec of the swarm, including
// Name and Labels. If a field is specified with 0 or nil, then the default value
// will be used to swarmkit.
clusterSpec , err := convert . SwarmSpecToGRPC ( spec )
if err != nil {
return apierrors . NewBadRequestError ( err )
}
2017-02-11 13:53:03 -05:00
2017-02-28 05:12:11 -05:00
_ , err = state . controlClient . UpdateCluster (
ctx ,
& swarmapi . UpdateClusterRequest {
ClusterID : swarm . ID ,
Spec : & clusterSpec ,
ClusterVersion : & swarmapi . Version {
Index : version ,
} ,
Rotation : swarmapi . KeyRotation {
WorkerJoinToken : flags . RotateWorkerToken ,
ManagerJoinToken : flags . RotateManagerToken ,
ManagerUnlockKey : flags . RotateManagerUnlockKey ,
} ,
2017-02-11 13:53:03 -05:00
} ,
2017-02-28 05:12:11 -05:00
)
return err
} )
2017-02-11 13:53:03 -05:00
}
// GetUnlockKey returns the unlock key for the swarm.
func ( c * Cluster ) GetUnlockKey ( ) ( string , error ) {
2017-02-28 05:12:11 -05:00
var resp * swarmapi . GetUnlockKeyResponse
if err := c . lockedManagerAction ( func ( ctx context . Context , state nodeState ) error {
client := swarmapi . NewCAClient ( state . grpcConn )
2017-02-11 13:53:03 -05:00
2017-02-28 05:12:11 -05:00
r , err := client . GetUnlockKey ( ctx , & swarmapi . GetUnlockKeyRequest { } )
if err != nil {
return err
}
resp = r
return nil
} ) ; err != nil {
2017-02-11 13:53:03 -05:00
return "" , err
}
2017-02-28 05:12:11 -05:00
if len ( resp . UnlockKey ) == 0 {
2017-02-11 13:53:03 -05:00
// no key
return "" , nil
}
2017-02-28 05:12:11 -05:00
return encryption . HumanReadableKey ( resp . UnlockKey ) , nil
2017-02-11 13:53:03 -05:00
}
// UnlockSwarm provides a key to decrypt data that is encrypted at rest.
func ( c * Cluster ) UnlockSwarm ( req types . UnlockRequest ) error {
c . controlMutex . Lock ( )
defer c . controlMutex . Unlock ( )
c . mu . RLock ( )
state := c . currentNodeState ( )
if ! state . IsActiveManager ( ) {
// when manager is not active,
// unless it is locked, otherwise return error.
if err := c . errNoManager ( state ) ; err != errSwarmLocked {
c . mu . RUnlock ( )
return err
}
} else {
// when manager is active, return an error of "not locked"
c . mu . RUnlock ( )
return errors . New ( "swarm is not locked" )
}
// only when swarm is locked, code running reaches here
nr := c . nr
c . mu . RUnlock ( )
key , err := encryption . ParseHumanReadableKey ( req . UnlockKey )
if err != nil {
return err
}
config := nr . config
config . lockKey = key
if err := nr . Stop ( ) ; err != nil {
return err
}
nr , err = c . newNodeRunner ( config )
if err != nil {
return err
}
c . mu . Lock ( )
c . nr = nr
c . mu . Unlock ( )
if err := <- nr . Ready ( ) ; err != nil {
if errors . Cause ( err ) == errSwarmLocked {
return errors . New ( "swarm could not be unlocked: invalid key provided" )
}
return fmt . Errorf ( "swarm component could not be started: %v" , err )
}
return nil
}
// Leave shuts down Cluster and removes current state.
func ( c * Cluster ) Leave ( force bool ) error {
c . controlMutex . Lock ( )
defer c . controlMutex . Unlock ( )
c . mu . Lock ( )
nr := c . nr
if nr == nil {
c . mu . Unlock ( )
return errNoSwarm
}
state := c . currentNodeState ( )
2017-04-07 21:27:35 -04:00
c . mu . Unlock ( )
2017-02-11 13:53:03 -05:00
if errors . Cause ( state . err ) == errSwarmLocked && ! force {
// leave a locked swarm without --force is not allowed
return errors . New ( "Swarm is encrypted and locked. Please unlock it first or use `--force` to ignore this message." )
}
if state . IsManager ( ) && ! force {
msg := "You are attempting to leave the swarm on a node that is participating as a manager. "
if state . IsActiveManager ( ) {
active , reachable , unreachable , err := managerStats ( state . controlClient , state . NodeID ( ) )
if err == nil {
if active && removingManagerCausesLossOfQuorum ( reachable , unreachable ) {
if isLastManager ( reachable , unreachable ) {
msg += "Removing the last manager erases all current state of the swarm. Use `--force` to ignore this message. "
return errors . New ( msg )
}
msg += fmt . Sprintf ( "Removing this node leaves %v managers out of %v. Without a Raft quorum your swarm will be inaccessible. " , reachable - 1 , reachable + unreachable )
}
}
} else {
msg += "Doing so may lose the consensus of your cluster. "
}
msg += "The only way to restore a swarm that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to suppress this message."
return errors . New ( msg )
}
// release readers in here
if err := nr . Stop ( ) ; err != nil {
logrus . Errorf ( "failed to shut down cluster node: %v" , err )
signal . DumpStacks ( "" )
return err
}
2017-04-07 21:27:35 -04:00
c . mu . Lock ( )
2017-02-11 13:53:03 -05:00
c . nr = nil
c . mu . Unlock ( )
2017-04-07 21:27:35 -04:00
2017-02-11 13:53:03 -05:00
if nodeID := state . NodeID ( ) ; nodeID != "" {
nodeContainers , err := c . listContainerForNode ( nodeID )
if err != nil {
return err
}
for _ , id := range nodeContainers {
if err := c . config . Backend . ContainerRm ( id , & apitypes . ContainerRmConfig { ForceRemove : true } ) ; err != nil {
logrus . Errorf ( "error removing %v: %v" , id , err )
}
}
}
// todo: cleanup optional?
if err := clearPersistentState ( c . root ) ; err != nil {
return err
}
c . config . Backend . DaemonLeavesCluster ( )
return nil
}
// Info returns information about the current cluster state.
func ( c * Cluster ) Info ( ) types . Info {
info := types . Info {
NodeAddr : c . GetAdvertiseAddress ( ) ,
}
c . mu . RLock ( )
defer c . mu . RUnlock ( )
state := c . currentNodeState ( )
info . LocalNodeState = state . status
if state . err != nil {
info . Error = state . err . Error ( )
}
ctx , cancel := c . getRequestContext ( )
defer cancel ( )
if state . IsActiveManager ( ) {
info . ControlAvailable = true
swarm , err := c . Inspect ( )
if err != nil {
info . Error = err . Error ( )
}
2017-03-28 17:20:25 -04:00
info . Cluster = & swarm . ClusterInfo
2017-02-11 13:53:03 -05:00
if r , err := state . controlClient . ListNodes ( ctx , & swarmapi . ListNodesRequest { } ) ; err != nil {
info . Error = err . Error ( )
} else {
info . Nodes = len ( r . Nodes )
for _ , n := range r . Nodes {
if n . ManagerStatus != nil {
info . Managers = info . Managers + 1
}
}
}
}
if state . swarmNode != nil {
for _ , r := range state . swarmNode . Remotes ( ) {
info . RemoteManagers = append ( info . RemoteManagers , types . Peer { NodeID : r . NodeID , Addr : r . Addr } )
}
info . NodeID = state . swarmNode . NodeID ( )
}
return info
}
func validateAndSanitizeInitRequest ( req * types . InitRequest ) error {
var err error
req . ListenAddr , err = validateAddr ( req . ListenAddr )
if err != nil {
return fmt . Errorf ( "invalid ListenAddr %q: %v" , req . ListenAddr , err )
}
if req . Spec . Annotations . Name == "" {
req . Spec . Annotations . Name = "default"
} else if req . Spec . Annotations . Name != "default" {
return errors . New ( ` swarm spec must be named "default" ` )
}
return nil
}
func validateAndSanitizeJoinRequest ( req * types . JoinRequest ) error {
var err error
req . ListenAddr , err = validateAddr ( req . ListenAddr )
if err != nil {
return fmt . Errorf ( "invalid ListenAddr %q: %v" , req . ListenAddr , err )
}
if len ( req . RemoteAddrs ) == 0 {
return errors . New ( "at least 1 RemoteAddr is required to join" )
}
for i := range req . RemoteAddrs {
req . RemoteAddrs [ i ] , err = validateAddr ( req . RemoteAddrs [ i ] )
if err != nil {
return fmt . Errorf ( "invalid remoteAddr %q: %v" , req . RemoteAddrs [ i ] , err )
}
}
return nil
}
func validateAddr ( addr string ) ( string , error ) {
if addr == "" {
return addr , errors . New ( "invalid empty address" )
}
newaddr , err := opts . ParseTCPAddr ( addr , defaultAddr )
if err != nil {
return addr , nil
}
return strings . TrimPrefix ( newaddr , "tcp://" ) , nil
}
func initClusterSpec ( node * swarmnode . Node , spec types . Spec ) error {
ctx , _ := context . WithTimeout ( context . Background ( ) , 5 * time . Second )
for conn := range node . ListenControlSocket ( ctx ) {
if ctx . Err ( ) != nil {
return ctx . Err ( )
}
if conn != nil {
client := swarmapi . NewControlClient ( conn )
var cluster * swarmapi . Cluster
for i := 0 ; ; i ++ {
lcr , err := client . ListClusters ( ctx , & swarmapi . ListClustersRequest { } )
if err != nil {
return fmt . Errorf ( "error on listing clusters: %v" , err )
}
if len ( lcr . Clusters ) == 0 {
if i < 10 {
time . Sleep ( 200 * time . Millisecond )
continue
}
return errors . New ( "empty list of clusters was returned" )
}
cluster = lcr . Clusters [ 0 ]
break
}
// In init, we take the initial default values from swarmkit, and merge
// any non nil or 0 value from spec to GRPC spec. This will leave the
// default value alone.
// Note that this is different from Update(), as in Update() we expect
// user to specify the complete spec of the cluster (as they already know
// the existing one and knows which field to update)
clusterSpec , err := convert . MergeSwarmSpecToGRPC ( spec , cluster . Spec )
if err != nil {
return fmt . Errorf ( "error updating cluster settings: %v" , err )
}
_ , err = client . UpdateCluster ( ctx , & swarmapi . UpdateClusterRequest {
ClusterID : cluster . ID ,
ClusterVersion : & cluster . Meta . Version ,
Spec : & clusterSpec ,
} )
if err != nil {
return fmt . Errorf ( "error updating cluster settings: %v" , err )
}
return nil
}
}
return ctx . Err ( )
}
func ( c * Cluster ) listContainerForNode ( nodeID string ) ( [ ] string , error ) {
var ids [ ] string
filters := filters . NewArgs ( )
filters . Add ( "label" , fmt . Sprintf ( "com.docker.swarm.node.id=%s" , nodeID ) )
containers , err := c . config . Backend . Containers ( & apitypes . ContainerListOptions {
Filters : filters ,
} )
if err != nil {
return [ ] string { } , err
}
for _ , c := range containers {
ids = append ( ids , c . ID )
}
return ids , nil
}