package cluster

import (
	"encoding/json"
	"fmt"
	"io/ioutil"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"google.golang.org/grpc"

	"github.com/Sirupsen/logrus"
	"github.com/docker/docker/daemon/cluster/convert"
	executorpkg "github.com/docker/docker/daemon/cluster/executor"
	"github.com/docker/docker/daemon/cluster/executor/container"
	"github.com/docker/docker/errors"
	"github.com/docker/docker/pkg/ioutils"
	"github.com/docker/docker/runconfig"
	apitypes "github.com/docker/engine-api/types"
	types "github.com/docker/engine-api/types/swarm"
	swarmagent "github.com/docker/swarmkit/agent"
	swarmapi "github.com/docker/swarmkit/api"
	"golang.org/x/net/context"
)

const swarmDirName = "swarm"
const controlSocket = "control.sock"
const swarmConnectTimeout = 20 * time.Second
const stateFile = "docker-state.json"

const (
	initialReconnectDelay = 100 * time.Millisecond
	maxReconnectDelay     = 30 * time.Second
)

// ErrNoManager is returned when a manager-only function is called on a non-manager node.
var ErrNoManager = fmt.Errorf("This node is not participating as a Swarm manager")

// ErrNoSwarm is returned on leaving a cluster that was never initialized.
var ErrNoSwarm = fmt.Errorf("This node is not part of Swarm")

// ErrSwarmExists is returned on an initialize or join request for a cluster that has already been activated.
var ErrSwarmExists = fmt.Errorf("This node is already part of a Swarm cluster. Use \"docker swarm leave\" to leave this cluster and join another one.")

// ErrPendingSwarmExists is returned on an initialize or join request for a cluster that is already processing a similar request but has not succeeded yet.
var ErrPendingSwarmExists = fmt.Errorf("This node is processing an existing join request that has not succeeded yet. Use \"docker swarm leave\" to cancel the current request.")

// ErrSwarmJoinTimeoutReached is returned when a cluster join could not complete before the timeout was reached.
var ErrSwarmJoinTimeoutReached = fmt.Errorf("Timeout was reached before node was joined. Attempt to join the cluster will continue in the background. Use \"docker info\" command to see the current Swarm status of your node.")

// state holds the cluster settings that are persisted to disk between daemon restarts.
type state struct {
	ListenAddr string
}

// Config provides values for Cluster.
type Config struct {
	Root    string
	Name    string
	Backend executorpkg.Backend
}

// Cluster provides capabilities to participate in a cluster as a worker, or as
// a manager and a worker.
type Cluster struct {
	sync.RWMutex
	root           string
	config         Config
	configEvent    chan struct{} // todo: make this array and goroutine safe
	node           *swarmagent.Node
	conn           *grpc.ClientConn
	client         swarmapi.ControlClient
	ready          bool
	listenAddr     string
	err            error
	reconnectDelay time.Duration
	stop           bool
	cancelDelay    func()
}

// New creates a new Cluster instance using the provided config.
func New(config Config) (*Cluster, error) {
	root := filepath.Join(config.Root, swarmDirName)
	if err := os.MkdirAll(root, 0700); err != nil {
		return nil, err
	}
	c := &Cluster{
		root:           root,
		config:         config,
		configEvent:    make(chan struct{}, 10),
		reconnectDelay: initialReconnectDelay,
	}
	st, err := c.loadState()
	if err != nil {
		if os.IsNotExist(err) {
			return c, nil
		}
		return nil, err
	}
	n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false)
	if err != nil {
		return nil, err
	}
	select {
	case <-time.After(swarmConnectTimeout):
		logrus.Errorf("swarm component could not be started before timeout was reached")
	case <-n.Ready():
	case <-ctx.Done():
	}
	if ctx.Err() != nil {
		return nil, fmt.Errorf("swarm component could not be started")
	}
	go c.reconnectOnFailure(ctx)
	return c, nil
}
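
// loadState restores the persisted listen address from docker-state.json. A
// missing swarm node certificate means there is no usable state: the state
// directory is cleared and the not-exist error is returned so New starts fresh.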
func (c *Cluster) loadState() (*state, error) {
	dt, err := ioutil.ReadFile(filepath.Join(c.root, stateFile))
	if err != nil {
		return nil, err
	}
	// missing certificate means no actual state to restore from
	if _, err := os.Stat(filepath.Join(c.root, "certificates/swarm-node.crt")); err != nil {
		if os.IsNotExist(err) {
			c.clearState()
		}
		return nil, err
	}
	var st state
	if err := json.Unmarshal(dt, &st); err != nil {
		return nil, err
	}
	return &st, nil
}

func (c *Cluster) saveState() error {
	dt, err := json.Marshal(state{ListenAddr: c.listenAddr})
	if err != nil {
		return err
	}
	return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600)
}
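
// reconnectOnFailure waits for the node's context to be cancelled and then
// restarts the swarm node, doubling the delay between attempts from 100ms up
// to 30s. It returns once the cluster was stopped explicitly or a node is
// already running again.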
func (c *Cluster) reconnectOnFailure(ctx context.Context) {
	for {
		<-ctx.Done()
		c.Lock()
		if c.stop || c.node != nil {
			c.Unlock()
			return
		}
		c.reconnectDelay *= 2
		if c.reconnectDelay > maxReconnectDelay {
			c.reconnectDelay = maxReconnectDelay
		}
		logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds())
		delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay)
		c.cancelDelay = cancel
		c.Unlock()
		<-delayCtx.Done()
		if delayCtx.Err() != context.DeadlineExceeded {
			return
		}
		c.Lock()
		if c.node != nil {
			c.Unlock()
			return
		}
		var err error
		_, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false)
		if err != nil {
			c.err = err
			ctx = delayCtx
		}
		c.Unlock()
	}
}
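
// startNewNode creates and starts a swarmkit node with the given options,
// persists the listen address, and spawns goroutines that track node errors,
// readiness, and the local control socket. Callers are expected to hold the
// cluster write lock (New runs before the Cluster is shared).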
func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) {
	if err := c.config.Backend.IsSwarmCompatible(); err != nil {
		return nil, nil, err
	}
	c.node = nil
	c.cancelDelay = nil
	node, err := swarmagent.NewNode(&swarmagent.NodeConfig{
		Hostname:         c.config.Name,
		ForceNewCluster:  forceNewCluster,
		ListenControlAPI: filepath.Join(c.root, controlSocket),
		ListenRemoteAPI:  listenAddr,
		JoinAddr:         joinAddr,
		StateDir:         c.root,
		CAHash:           cahash,
		Secret:           secret,
		Executor:         container.NewExecutor(c.config.Backend),
		HeartbeatTick:    1,
		ElectionTick:     3,
		IsManager:        ismanager,
	})
	if err != nil {
		return nil, nil, err
	}
	ctx, cancel := context.WithCancel(context.Background())
	if err := node.Start(ctx); err != nil {
		return nil, nil, err
	}
	c.node = node
	c.listenAddr = listenAddr
	c.saveState()
	c.config.Backend.SetClusterProvider(c)
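
	// Propagate a fatal node error: record it, drop the node and control
	// connection, and cancel ctx so reconnectOnFailure (if running) kicks in.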
	go func() {
		err := node.Err(ctx)
		if err != nil {
			logrus.Errorf("cluster exited with error: %v", err)
		}
		c.Lock()
		c.conn = nil
		c.client = nil
		c.node = nil
		c.ready = false
		c.err = err
		c.Unlock()
		cancel()
	}()
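
	// Mark the cluster ready once the node reports readiness (resetting the
	// reconnect backoff) and notify configEvent listeners either way.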
	go func() {
		select {
		case <-node.Ready():
			c.Lock()
			c.reconnectDelay = initialReconnectDelay
			c.Unlock()
		case <-ctx.Done():
		}
		if ctx.Err() == nil {
			c.Lock()
			c.ready = true
			c.err = nil
			c.Unlock()
		}
		c.configEvent <- struct{}{}
	}()
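
	// Track the local control socket: each connection update swaps in a fresh
	// ControlClient, and a dropped (nil) connection clears it.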
	go func() {
		for conn := range node.ListenControlSocket(ctx) {
			c.Lock()
			if c.conn != conn {
				c.client = swarmapi.NewControlClient(conn)
			}
			if conn == nil {
				c.client = nil
			}
			c.conn = conn
			c.Unlock()
			c.configEvent <- struct{}{}
		}
	}()
	return node, ctx, nil
}

// Init initializes a new cluster from a user-provided request.
func (c *Cluster) Init(req types.InitRequest) (string, error) {
	c.Lock()
	if node := c.node; node != nil {
		c.Unlock()
		if !req.ForceNewCluster {
			return "", errSwarmExists(node)
		}
		ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
		defer cancel()
		c.cancelReconnect()
		if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
			return "", err
		}
		c.Lock()
		c.node = nil
		c.conn = nil
		c.ready = false
	}
	// todo: check current state existing
	n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false)
	if err != nil {
		c.Unlock()
		return "", err
	}
	c.Unlock()
	select {
	case <-n.Ready():
		if err := initAcceptancePolicy(n, req.Spec.AcceptancePolicy); err != nil {
			return "", err
		}
		go c.reconnectOnFailure(ctx)
		return n.NodeID(), nil
	case <-ctx.Done():
		c.RLock()
		defer c.RUnlock()
		if c.err != nil {
			if !req.ForceNewCluster { // if failure on first attempt don't keep state
				if err := c.clearState(); err != nil {
					return "", err
				}
			}
			return "", c.err
		}
		return "", ctx.Err()
	}
}

// Join makes the current Cluster part of an existing swarm cluster.
func (c *Cluster) Join(req types.JoinRequest) error {
	c.Lock()
	if node := c.node; node != nil {
		c.Unlock()
		return errSwarmExists(node)
	}
	// todo: check current state existing
	if len(req.RemoteAddrs) == 0 {
		return fmt.Errorf("at least 1 RemoteAddr is required to join")
	}
	n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager)
	if err != nil {
		c.Unlock()
		return err
	}
	c.Unlock()
	certificateRequested := n.CertificateRequested()
	for {
		select {
		case <-certificateRequested:
			if n.NodeMembership() == swarmapi.NodeMembershipPending {
				return fmt.Errorf("Your node is in the process of joining the cluster but needs to be accepted by existing cluster member.\nTo accept this node into cluster run \"docker node accept %v\" in an existing cluster manager. Use \"docker info\" command to see the current Swarm status of your node.", n.NodeID())
			}
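			// Setting the channel to nil disables this case: a nil channel
			// blocks forever, so the select keeps waiting on the other cases.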
			certificateRequested = nil
		case <-time.After(swarmConnectTimeout):
			// attempt to connect will continue in background, also reconnecting
			go c.reconnectOnFailure(ctx)
			return ErrSwarmJoinTimeoutReached
		case <-n.Ready():
			go c.reconnectOnFailure(ctx)
			return nil
		case <-ctx.Done():
			c.RLock()
			defer c.RUnlock()
			if c.err != nil {
				return c.err
			}
			return ctx.Err()
		}
	}
}

func (c *Cluster) cancelReconnect() {
	c.stop = true
	if c.cancelDelay != nil {
		c.cancelDelay()
		c.cancelDelay = nil
	}
}

// Leave shuts down Cluster and removes current state.
func (c *Cluster) Leave(force bool) error {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return ErrNoSwarm
	}
	if node.Manager() != nil && !force {
		msg := "You are attempting to leave cluster on a node that is participating as a manager. "
		if c.isActiveManager() {
			active, reachable, unreachable, err := c.managerStats()
			if err == nil {
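				// Leaving does not demote the node, so it stays in the Raft
				// member list. reachable-2 <= unreachable is the point where
				// the remaining reachable managers (reachable-1) drop below
				// the quorum of all reachable+unreachable managers.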
				if active && reachable-2 <= unreachable {
					if reachable == 1 && unreachable == 0 {
						msg += "Leaving last manager will remove all current state of the cluster. Use `--force` to ignore this message. "
						c.Unlock()
						return fmt.Errorf(msg)
					}
					msg += fmt.Sprintf("Leaving cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable)
				}
			}
		} else {
			msg += "Doing so may lose the consensus of your cluster. "
		}
		msg += "The only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message."
		c.Unlock()
		return fmt.Errorf(msg)
	}
	c.cancelReconnect()
	c.Unlock()
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	defer cancel()
	if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") {
		return err
	}
	if nodeID := node.NodeID(); nodeID != "" {
		for _, id := range c.config.Backend.ListContainersForNode(nodeID) {
			if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil {
				logrus.Errorf("error removing %v: %v", id, err)
			}
		}
	}
	c.Lock()
	defer c.Unlock()
	c.node = nil
	c.conn = nil
	c.ready = false
	c.configEvent <- struct{}{}
	// todo: cleanup optional?
	if err := c.clearState(); err != nil {
		return err
	}
	return nil
}

func (c *Cluster) clearState() error {
	// todo: backup this data instead of removing?
	if err := os.RemoveAll(c.root); err != nil {
		return err
	}
	if err := os.MkdirAll(c.root, 0700); err != nil {
		return err
	}
	c.config.Backend.SetClusterProvider(nil)
	return nil
}

func (c *Cluster) getRequestContext() context.Context { // TODO: not needed when requests don't block on quorum lost
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	return ctx
}

// Inspect retrieves the configuration properties of a managed swarm cluster.
func (c *Cluster) Inspect() (types.Swarm, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return types.Swarm{}, ErrNoManager
	}
	swarm, err := getSwarm(c.getRequestContext(), c.client)
	if err != nil {
		return types.Swarm{}, err
	}
	return convert.SwarmFromGRPC(*swarm), nil
}

// Update updates configuration of a managed swarm cluster.
func (c *Cluster) Update(version uint64, spec types.Spec) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	swarm, err := getSwarm(c.getRequestContext(), c.client)
	if err != nil {
		return err
	}
	swarmSpec, err := convert.SwarmSpecToGRPCandMerge(spec, &swarm.Spec)
	if err != nil {
		return err
	}
	_, err = c.client.UpdateCluster(
		c.getRequestContext(),
		&swarmapi.UpdateClusterRequest{
			ClusterID: swarm.ID,
			Spec:      &swarmSpec,
			ClusterVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// IsManager returns true if Cluster is participating as a manager.
func (c *Cluster) IsManager() bool {
	c.RLock()
	defer c.RUnlock()
	return c.isActiveManager()
}

// IsAgent returns true if Cluster is participating as a worker/agent.
func (c *Cluster) IsAgent() bool {
	c.RLock()
	defer c.RUnlock()
	return c.ready
}

// GetListenAddress returns the listening address for the current manager's
// consensus and dispatcher APIs.
func (c *Cluster) GetListenAddress() string {
	c.RLock()
	defer c.RUnlock()
	if c.conn != nil {
		return c.listenAddr
	}
	return ""
}

// GetRemoteAddress returns a known advertise address of a remote manager if
// available.
// todo: change to array/connect with info
func (c *Cluster) GetRemoteAddress() string {
	c.RLock()
	defer c.RUnlock()
	return c.getRemoteAddress()
}

func (c *Cluster) getRemoteAddress() string {
	if c.node == nil {
		return ""
	}
	nodeID := c.node.NodeID()
	for _, r := range c.node.Remotes() {
		if r.NodeID != nodeID {
			return r.Addr
		}
	}
	return ""
}

// ListenClusterEvents returns a channel that receives messages on cluster
// participation changes.
// todo: make cancelable and accessible to multiple callers
func (c *Cluster) ListenClusterEvents() <-chan struct{} {
	return c.configEvent
}

// Info returns information about the current cluster state.
func (c *Cluster) Info() types.Info {
	var info types.Info
	c.RLock()
	defer c.RUnlock()
	if c.node == nil {
		info.LocalNodeState = types.LocalNodeStateInactive
		if c.cancelDelay != nil {
			info.LocalNodeState = types.LocalNodeStateError
		}
	} else {
		info.LocalNodeState = types.LocalNodeStatePending
		if c.ready {
			info.LocalNodeState = types.LocalNodeStateActive
		}
	}
	if c.err != nil {
		info.Error = c.err.Error()
	}
	if c.isActiveManager() {
		info.ControlAvailable = true
		if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil {
			info.Nodes = len(r.Nodes)
			for _, n := range r.Nodes {
				if n.ManagerStatus != nil {
					info.Managers++
				}
			}
		}
		if swarm, err := getSwarm(c.getRequestContext(), c.client); err == nil && swarm != nil {
			info.CACertHash = swarm.RootCA.CACertHash
		}
	}
	if c.node != nil {
		for _, r := range c.node.Remotes() {
			info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr})
		}
		info.NodeID = c.node.NodeID()
	}
	return info
}

// isActiveManager should not be called without a read lock
func (c *Cluster) isActiveManager() bool {
	return c.conn != nil
}

// GetServices returns all services of a managed swarm cluster.
func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, ErrNoManager
	}
	filters, err := newListServicesFilters(options.Filter)
	if err != nil {
		return nil, err
	}
	r, err := c.client.ListServices(
		c.getRequestContext(),
		&swarmapi.ListServicesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	var services []types.Service
	for _, service := range r.Services {
		services = append(services, convert.ServiceFromGRPC(*service))
	}
	return services, nil
}

// CreateService creates a new service in a managed swarm cluster.
func (c *Cluster) CreateService(s types.ServiceSpec) (string, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return "", ErrNoManager
	}
	ctx := c.getRequestContext()
	err := populateNetworkID(ctx, c.client, &s)
	if err != nil {
		return "", err
	}
	serviceSpec, err := convert.ServiceSpecToGRPC(s)
	if err != nil {
		return "", err
	}
	r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec})
	if err != nil {
		return "", err
	}
	return r.Service.ID, nil
}

// GetService returns a service based on an ID or name.
func (c *Cluster) GetService(input string) (types.Service, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return types.Service{}, ErrNoManager
	}
	service, err := getService(c.getRequestContext(), c.client, input)
	if err != nil {
		return types.Service{}, err
	}
	return convert.ServiceFromGRPC(*service), nil
}

// UpdateService updates an existing service to match new properties.
func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	serviceSpec, err := convert.ServiceSpecToGRPC(spec)
	if err != nil {
		return err
	}
	_, err = c.client.UpdateService(
		c.getRequestContext(),
		&swarmapi.UpdateServiceRequest{
			ServiceID: serviceID,
			Spec:      &serviceSpec,
			ServiceVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// RemoveService removes a service from a managed swarm cluster.
func (c *Cluster) RemoveService(input string) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	service, err := getService(c.getRequestContext(), c.client, input)
	if err != nil {
		return err
	}
	if _, err := c.client.RemoveService(c.getRequestContext(), &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil {
		return err
	}
	return nil
}

// GetNodes returns a list of all nodes known to a cluster.
func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, ErrNoManager
	}
	filters, err := newListNodesFilters(options.Filter)
	if err != nil {
		return nil, err
	}
	r, err := c.client.ListNodes(
		c.getRequestContext(),
		&swarmapi.ListNodesRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	nodes := []types.Node{}
	for _, node := range r.Nodes {
		nodes = append(nodes, convert.NodeFromGRPC(*node))
	}
	return nodes, nil
}

// GetNode returns a node based on an ID or name.
func (c *Cluster) GetNode(input string) (types.Node, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return types.Node{}, ErrNoManager
	}
	node, err := getNode(c.getRequestContext(), c.client, input)
	if err != nil {
		return types.Node{}, err
	}
	return convert.NodeFromGRPC(*node), nil
}

// UpdateNode updates an existing node's properties.
func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	nodeSpec, err := convert.NodeSpecToGRPC(spec)
	if err != nil {
		return err
	}
	_, err = c.client.UpdateNode(
		c.getRequestContext(),
		&swarmapi.UpdateNodeRequest{
			NodeID: nodeID,
			Spec:   &nodeSpec,
			NodeVersion: &swarmapi.Version{
				Index: version,
			},
		},
	)
	return err
}

// RemoveNode removes a node from a cluster.
func (c *Cluster) RemoveNode(input string) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	ctx := c.getRequestContext()
	node, err := getNode(ctx, c.client, input)
	if err != nil {
		return err
	}
	if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil {
		return err
	}
	return nil
}

// GetTasks returns a list of tasks matching the filter options.
func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, ErrNoManager
	}
	filters, err := newListTasksFilters(options.Filter)
	if err != nil {
		return nil, err
	}
	r, err := c.client.ListTasks(
		c.getRequestContext(),
		&swarmapi.ListTasksRequest{Filters: filters})
	if err != nil {
		return nil, err
	}
	tasks := []types.Task{}
	for _, task := range r.Tasks {
		tasks = append(tasks, convert.TaskFromGRPC(*task))
	}
	return tasks, nil
}

// GetTask returns a task by an ID.
func (c *Cluster) GetTask(input string) (types.Task, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return types.Task{}, ErrNoManager
	}
	task, err := getTask(c.getRequestContext(), c.client, input)
	if err != nil {
		return types.Task{}, err
	}
	return convert.TaskFromGRPC(*task), nil
}

// GetNetwork returns a cluster network by ID.
func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return apitypes.NetworkResource{}, ErrNoManager
	}
	network, err := getNetwork(c.getRequestContext(), c.client, input)
	if err != nil {
		return apitypes.NetworkResource{}, err
	}
	return convert.BasicNetworkFromGRPC(*network), nil
}

// GetNetworks returns all current cluster managed networks.
func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return nil, ErrNoManager
	}
	r, err := c.client.ListNetworks(c.getRequestContext(), &swarmapi.ListNetworksRequest{})
	if err != nil {
		return nil, err
	}
	var networks []apitypes.NetworkResource
	for _, network := range r.Networks {
		networks = append(networks, convert.BasicNetworkFromGRPC(*network))
	}
	return networks, nil
}

// CreateNetwork creates a new cluster managed network.
func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return "", ErrNoManager
	}
	if runconfig.IsPreDefinedNetwork(s.Name) {
		err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name)
		return "", errors.NewRequestForbiddenError(err)
	}
	networkSpec := convert.BasicNetworkCreateToGRPC(s)
	r, err := c.client.CreateNetwork(c.getRequestContext(), &swarmapi.CreateNetworkRequest{Spec: &networkSpec})
	if err != nil {
		return "", err
	}
	return r.Network.ID, nil
}

// RemoveNetwork removes a cluster network.
func (c *Cluster) RemoveNetwork(input string) error {
	c.RLock()
	defer c.RUnlock()
	if !c.isActiveManager() {
		return ErrNoManager
	}
	network, err := getNetwork(c.getRequestContext(), c.client, input)
	if err != nil {
		return err
	}
	if _, err := c.client.RemoveNetwork(c.getRequestContext(), &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil {
		return err
	}
	return nil
}
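
// populateNetworkID resolves each network reference in a service spec (name,
// ID, or ID prefix) to the full network ID expected by swarmkit.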
func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error {
	for i, n := range s.Networks {
		apiNetwork, err := getNetwork(ctx, c, n.Target)
		if err != nil {
			return err
		}
		s.Networks[i].Target = apiNetwork.ID
	}
	return nil
}

func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) {
	// GetNetwork to match via full ID.
	rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input})
	if err != nil {
		// If any error (including NotFound), ListNetworks to match via ID prefix and full name.
		rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}})
		if err != nil || len(rl.Networks) == 0 {
			rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}})
		}
		if err != nil {
			return nil, err
		}
		if len(rl.Networks) == 0 {
			return nil, fmt.Errorf("network %s not found", input)
		}
		if l := len(rl.Networks); l > 1 {
			return nil, fmt.Errorf("network %s is ambiguous (%d matches found)", input, l)
		}
		return rl.Networks[0], nil
	}
	return rg.Network, nil
}

// Cleanup stops active swarm node. This is run before daemon shutdown.
func (c *Cluster) Cleanup() {
	c.Lock()
	node := c.node
	if node == nil {
		c.Unlock()
		return
	}
	if c.isActiveManager() {
		active, reachable, unreachable, err := c.managerStats()
		if err == nil {
			singlenode := active && reachable == 1 && unreachable == 0
			if active && !singlenode && reachable-2 <= unreachable {
				logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable)
			}
		}
	}
	c.cancelReconnect()
	c.Unlock()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	if err := node.Stop(ctx); err != nil {
		logrus.Errorf("error cleaning up cluster: %v", err)
	}
	c.Lock()
	c.node = nil
	c.ready = false
	c.conn = nil
	c.Unlock()
}
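
// managerStats lists the cluster nodes (with a 3-second budget) and reports
// whether this node is an active reachable manager, plus the counts of
// reachable and unreachable managers.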
func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) {
	ctx, _ := context.WithTimeout(context.Background(), 3*time.Second)
	nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{})
	if err != nil {
		return false, 0, 0, err
	}
	for _, n := range nodes.Nodes {
		if n.ManagerStatus != nil {
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_REACHABLE {
				reachable++
				if n.ID == c.node.NodeID() {
					current = true
				}
			}
			if n.ManagerStatus.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE {
				unreachable++
			}
		}
	}
	return
}
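
// errSwarmExists returns the error matching the node's membership state: a
// node whose membership is still pending has a join in flight, anything else
// is already part of a swarm.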
func errSwarmExists(node *swarmagent.Node) error {
	if node.NodeMembership() != swarmapi.NodeMembershipAccepted {
		return ErrPendingSwarmExists
	}
	return ErrSwarmExists
}
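
// initAcceptancePolicy waits for the local control socket, retries listing the
// cluster object (up to 10 times, 200ms apart) while the manager finishes
// bootstrapping, and then writes the requested acceptance policy into the
// cluster spec.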
func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error {
	ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
	for conn := range node.ListenControlSocket(ctx) {
		if ctx.Err() != nil {
			return ctx.Err()
		}
		if conn != nil {
			client := swarmapi.NewControlClient(conn)
			var cluster *swarmapi.Cluster
			for i := 0; ; i++ {
				lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{})
				if err != nil {
					return fmt.Errorf("error on listing clusters: %v", err)
				}
				if len(lcr.Clusters) == 0 {
					if i < 10 {
						time.Sleep(200 * time.Millisecond)
						continue
					}
					return fmt.Errorf("empty list of clusters was returned")
				}
				cluster = lcr.Clusters[0]
				break
			}
			spec := &cluster.Spec
			if err := convert.SwarmSpecUpdateAcceptancePolicy(spec, acceptancePolicy, nil); err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			_, err := client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{
				ClusterID:      cluster.ID,
				ClusterVersion: &cluster.Meta.Version,
				Spec:           spec,
			})
			if err != nil {
				return fmt.Errorf("error updating cluster settings: %v", err)
			}
			return nil
		}
	}
	return ctx.Err()
}