diff --git a/api/server/httputils/errors.go b/api/server/httputils/errors.go index 0f89dd90fc..da4db97915 100644 --- a/api/server/httputils/errors.go +++ b/api/server/httputils/errors.go @@ -8,6 +8,7 @@ import ( "github.com/docker/engine-api/types" "github.com/docker/engine-api/types/versions" "github.com/gorilla/mux" + "google.golang.org/grpc" ) // httpStatusError is an interface @@ -58,6 +59,7 @@ func GetHTTPErrorStatusCode(err error) int { "wrong login/password": http.StatusUnauthorized, "unauthorized": http.StatusUnauthorized, "hasn't been activated": http.StatusForbidden, + "this node": http.StatusNotAcceptable, } { if strings.Contains(errStr, keyword) { statusCode = status @@ -85,7 +87,7 @@ func MakeErrorHandler(err error) http.HandlerFunc { } WriteJSON(w, statusCode, response) } else { - http.Error(w, err.Error(), statusCode) + http.Error(w, grpc.ErrorDesc(err), statusCode) } } } diff --git a/api/server/router/network/backend.go b/api/server/router/network/backend.go index 4873e1ea28..6e322fa378 100644 --- a/api/server/router/network/backend.go +++ b/api/server/router/network/backend.go @@ -2,7 +2,6 @@ package network import ( "github.com/docker/engine-api/types" - "github.com/docker/engine-api/types/filters" "github.com/docker/engine-api/types/network" "github.com/docker/libnetwork" ) @@ -13,7 +12,7 @@ type Backend interface { FindNetwork(idName string) (libnetwork.Network, error) GetNetworkByName(idName string) (libnetwork.Network, error) GetNetworksByID(partialID string) []libnetwork.Network - FilterNetworks(netFilters filters.Args) ([]libnetwork.Network, error) + GetNetworks() []libnetwork.Network CreateNetwork(nc types.NetworkCreateRequest) (*types.NetworkCreateResponse, error) ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error DisconnectContainerFromNetwork(containerName string, network libnetwork.Network, force bool) error diff --git a/api/server/router/network/filter.go b/api/server/router/network/filter.go new file mode 100644 index 0000000000..b1c1dd187d --- /dev/null +++ b/api/server/router/network/filter.go @@ -0,0 +1,98 @@ +package network + +import ( + "fmt" + + "github.com/docker/docker/runconfig" + "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/filters" +) + +type filterHandler func([]types.NetworkResource, string) ([]types.NetworkResource, error) + +var ( + // AcceptedFilters is an acceptable filters for validation + AcceptedFilters = map[string]bool{ + "driver": true, + "type": true, + "name": true, + "id": true, + "label": true, + } +) + +func filterNetworkByType(nws []types.NetworkResource, netType string) (retNws []types.NetworkResource, err error) { + switch netType { + case "builtin": + for _, nw := range nws { + if runconfig.IsPreDefinedNetwork(nw.Name) { + retNws = append(retNws, nw) + } + } + case "custom": + for _, nw := range nws { + if !runconfig.IsPreDefinedNetwork(nw.Name) { + retNws = append(retNws, nw) + } + } + default: + return nil, fmt.Errorf("Invalid filter: 'type'='%s'", netType) + } + return retNws, nil +} + +// filterNetworks filters network list according to user specified filter +// and returns user chosen networks +func filterNetworks(nws []types.NetworkResource, filter filters.Args) ([]types.NetworkResource, error) { + // if filter is empty, return original network list + if filter.Len() == 0 { + return nws, nil + } + + if err := filter.Validate(AcceptedFilters); err != nil { + return nil, err + } + + var displayNet []types.NetworkResource + for _, nw := range nws { + if filter.Include("driver") { + if !filter.ExactMatch("driver", nw.Driver) { + continue + } + } + if filter.Include("name") { + if !filter.Match("name", nw.Name) { + continue + } + } + if filter.Include("id") { + if !filter.Match("id", nw.ID) { + continue + } + } + if filter.Include("label") { + if !filter.MatchKVList("label", nw.Labels) { + continue + } + } + displayNet = append(displayNet, nw) + } + + if filter.Include("type") { + var typeNet []types.NetworkResource + errFilter := filter.WalkValues("type", func(fval string) error { + passList, err := filterNetworkByType(displayNet, fval) + if err != nil { + return err + } + typeNet = append(typeNet, passList...) + return nil + }) + if errFilter != nil { + return nil, errFilter + } + displayNet = typeNet + } + + return displayNet, nil +} diff --git a/api/server/router/network/network.go b/api/server/router/network/network.go index 7c88089623..8688c3ed1f 100644 --- a/api/server/router/network/network.go +++ b/api/server/router/network/network.go @@ -1,17 +1,22 @@ package network -import "github.com/docker/docker/api/server/router" +import ( + "github.com/docker/docker/api/server/router" + "github.com/docker/docker/daemon/cluster" +) // networkRouter is a router to talk with the network controller type networkRouter struct { - backend Backend - routes []router.Route + backend Backend + clusterProvider *cluster.Cluster + routes []router.Route } // NewRouter initializes a new network router -func NewRouter(b Backend) router.Router { +func NewRouter(b Backend, c *cluster.Cluster) router.Router { r := &networkRouter{ - backend: b, + backend: b, + clusterProvider: c, } r.initRoutes() return r diff --git a/api/server/router/network/network_routes.go b/api/server/router/network/network_routes.go index 8da6de1577..7e5b94cb91 100644 --- a/api/server/router/network/network_routes.go +++ b/api/server/router/network/network_routes.go @@ -24,17 +24,30 @@ func (n *networkRouter) getNetworksList(ctx context.Context, w http.ResponseWrit return err } - list := []*types.NetworkResource{} + list := []types.NetworkResource{} - nwList, err := n.backend.FilterNetworks(netFilters) + if nr, err := n.clusterProvider.GetNetworks(); err == nil { + for _, nw := range nr { + list = append(list, nw) + } + } + + // Combine the network list returned by Docker daemon if it is not already + // returned by the cluster manager +SKIP: + for _, nw := range n.backend.GetNetworks() { + for _, nl := range list { + if nl.ID == nw.ID() { + continue SKIP + } + } + list = append(list, *n.buildNetworkResource(nw)) + } + + list, err = filterNetworks(list, netFilters) if err != nil { return err } - - for _, nw := range nwList { - list = append(list, buildNetworkResource(nw)) - } - return httputils.WriteJSON(w, http.StatusOK, list) } @@ -45,9 +58,12 @@ func (n *networkRouter) getNetwork(ctx context.Context, w http.ResponseWriter, r nw, err := n.backend.FindNetwork(vars["id"]) if err != nil { + if nr, err := n.clusterProvider.GetNetwork(vars["id"]); err == nil { + return httputils.WriteJSON(w, http.StatusOK, nr) + } return err } - return httputils.WriteJSON(w, http.StatusOK, buildNetworkResource(nw)) + return httputils.WriteJSON(w, http.StatusOK, n.buildNetworkResource(nw)) } func (n *networkRouter) postNetworkCreate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { @@ -67,7 +83,14 @@ func (n *networkRouter) postNetworkCreate(ctx context.Context, w http.ResponseWr nw, err := n.backend.CreateNetwork(create) if err != nil { - return err + if _, ok := err.(libnetwork.ManagerRedirectError); !ok { + return err + } + id, err := n.clusterProvider.CreateNetwork(create) + if err != nil { + return err + } + nw = &types.NetworkCreateResponse{ID: id} } return httputils.WriteJSON(w, http.StatusCreated, nw) @@ -121,6 +144,9 @@ func (n *networkRouter) deleteNetwork(ctx context.Context, w http.ResponseWriter if err := httputils.ParseForm(r); err != nil { return err } + if _, err := n.clusterProvider.GetNetwork(vars["id"]); err == nil { + return n.clusterProvider.RemoveNetwork(vars["id"]) + } if err := n.backend.DeleteNetwork(vars["id"]); err != nil { return err } @@ -128,7 +154,7 @@ func (n *networkRouter) deleteNetwork(ctx context.Context, w http.ResponseWriter return nil } -func buildNetworkResource(nw libnetwork.Network) *types.NetworkResource { +func (n *networkRouter) buildNetworkResource(nw libnetwork.Network) *types.NetworkResource { r := &types.NetworkResource{} if nw == nil { return r @@ -138,6 +164,13 @@ func buildNetworkResource(nw libnetwork.Network) *types.NetworkResource { r.Name = nw.Name() r.ID = nw.ID() r.Scope = info.Scope() + if n.clusterProvider.IsManager() { + if _, err := n.clusterProvider.GetNetwork(nw.Name()); err == nil { + r.Scope = "swarm" + } + } else if info.Dynamic() { + r.Scope = "swarm" + } r.Driver = nw.Type() r.EnableIPv6 = info.IPv6Enabled() r.Internal = info.Internal() diff --git a/api/server/router/swarm/backend.go b/api/server/router/swarm/backend.go new file mode 100644 index 0000000000..05fe00a0c2 --- /dev/null +++ b/api/server/router/swarm/backend.go @@ -0,0 +1,26 @@ +package swarm + +import ( + basictypes "github.com/docker/engine-api/types" + types "github.com/docker/engine-api/types/swarm" +) + +// Backend abstracts an swarm commands manager. +type Backend interface { + Init(req types.InitRequest) (string, error) + Join(req types.JoinRequest) error + Leave(force bool) error + Inspect() (types.Swarm, error) + Update(uint64, types.Spec) error + GetServices(basictypes.ServiceListOptions) ([]types.Service, error) + GetService(string) (types.Service, error) + CreateService(types.ServiceSpec) (string, error) + UpdateService(string, uint64, types.ServiceSpec) error + RemoveService(string) error + GetNodes(basictypes.NodeListOptions) ([]types.Node, error) + GetNode(string) (types.Node, error) + UpdateNode(string, uint64, types.NodeSpec) error + RemoveNode(string) error + GetTasks(basictypes.TaskListOptions) ([]types.Task, error) + GetTask(string) (types.Task, error) +} diff --git a/api/server/router/swarm/cluster.go b/api/server/router/swarm/cluster.go new file mode 100644 index 0000000000..a67ffa9632 --- /dev/null +++ b/api/server/router/swarm/cluster.go @@ -0,0 +1,44 @@ +package swarm + +import "github.com/docker/docker/api/server/router" + +// buildRouter is a router to talk with the build controller +type swarmRouter struct { + backend Backend + routes []router.Route +} + +// NewRouter initializes a new build router +func NewRouter(b Backend) router.Router { + r := &swarmRouter{ + backend: b, + } + r.initRoutes() + return r +} + +// Routes returns the available routers to the swarm controller +func (sr *swarmRouter) Routes() []router.Route { + return sr.routes +} + +func (sr *swarmRouter) initRoutes() { + sr.routes = []router.Route{ + router.NewPostRoute("/swarm/init", sr.initCluster), + router.NewPostRoute("/swarm/join", sr.joinCluster), + router.NewPostRoute("/swarm/leave", sr.leaveCluster), + router.NewGetRoute("/swarm", sr.inspectCluster), + router.NewPostRoute("/swarm/update", sr.updateCluster), + router.NewGetRoute("/services", sr.getServices), + router.NewGetRoute("/services/{id:.*}", sr.getService), + router.NewPostRoute("/services/create", sr.createService), + router.NewPostRoute("/services/{id:.*}/update", sr.updateService), + router.NewDeleteRoute("/services/{id:.*}", sr.removeService), + router.NewGetRoute("/nodes", sr.getNodes), + router.NewGetRoute("/nodes/{id:.*}", sr.getNode), + router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode), + router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode), + router.NewGetRoute("/tasks", sr.getTasks), + router.NewGetRoute("/tasks/{id:.*}", sr.getTask), + } +} diff --git a/api/server/router/swarm/cluster_routes.go b/api/server/router/swarm/cluster_routes.go new file mode 100644 index 0000000000..50f823e07e --- /dev/null +++ b/api/server/router/swarm/cluster_routes.go @@ -0,0 +1,229 @@ +package swarm + +import ( + "encoding/json" + "fmt" + "net/http" + "strconv" + + "github.com/Sirupsen/logrus" + "github.com/docker/docker/api/server/httputils" + basictypes "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/filters" + types "github.com/docker/engine-api/types/swarm" + "golang.org/x/net/context" +) + +func (sr *swarmRouter) initCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + var req types.InitRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return err + } + nodeID, err := sr.backend.Init(req) + if err != nil { + logrus.Errorf("Error initializing swarm: %v", err) + return err + } + return httputils.WriteJSON(w, http.StatusOK, nodeID) +} + +func (sr *swarmRouter) joinCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + var req types.JoinRequest + if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + return err + } + return sr.backend.Join(req) +} + +func (sr *swarmRouter) leaveCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := httputils.ParseForm(r); err != nil { + return err + } + + force := httputils.BoolValue(r, "force") + return sr.backend.Leave(force) +} + +func (sr *swarmRouter) inspectCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + swarm, err := sr.backend.Inspect() + if err != nil { + logrus.Errorf("Error getting swarm: %v", err) + return err + } + + return httputils.WriteJSON(w, http.StatusOK, swarm) +} + +func (sr *swarmRouter) updateCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + var swarm types.Spec + if err := json.NewDecoder(r.Body).Decode(&swarm); err != nil { + return err + } + + rawVersion := r.URL.Query().Get("version") + version, err := strconv.ParseUint(rawVersion, 10, 64) + if err != nil { + return fmt.Errorf("Invalid swarm version '%s': %s", rawVersion, err.Error()) + } + + if err := sr.backend.Update(version, swarm); err != nil { + logrus.Errorf("Error configuring swarm: %v", err) + return err + } + return nil +} + +func (sr *swarmRouter) getServices(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := httputils.ParseForm(r); err != nil { + return err + } + filter, err := filters.FromParam(r.Form.Get("filters")) + if err != nil { + return err + } + + services, err := sr.backend.GetServices(basictypes.ServiceListOptions{Filter: filter}) + if err != nil { + logrus.Errorf("Error getting services: %v", err) + return err + } + + return httputils.WriteJSON(w, http.StatusOK, services) +} + +func (sr *swarmRouter) getService(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + service, err := sr.backend.GetService(vars["id"]) + if err != nil { + logrus.Errorf("Error getting service %s: %v", vars["id"], err) + return err + } + + return httputils.WriteJSON(w, http.StatusOK, service) +} + +func (sr *swarmRouter) createService(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + var service types.ServiceSpec + if err := json.NewDecoder(r.Body).Decode(&service); err != nil { + return err + } + + id, err := sr.backend.CreateService(service) + if err != nil { + logrus.Errorf("Error reating service %s: %v", id, err) + return err + } + + return httputils.WriteJSON(w, http.StatusCreated, &basictypes.ServiceCreateResponse{ + ID: id, + }) +} + +func (sr *swarmRouter) updateService(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + var service types.ServiceSpec + if err := json.NewDecoder(r.Body).Decode(&service); err != nil { + return err + } + + rawVersion := r.URL.Query().Get("version") + version, err := strconv.ParseUint(rawVersion, 10, 64) + if err != nil { + return fmt.Errorf("Invalid service version '%s': %s", rawVersion, err.Error()) + } + + if err := sr.backend.UpdateService(vars["id"], version, service); err != nil { + logrus.Errorf("Error updating service %s: %v", vars["id"], err) + return err + } + return nil +} + +func (sr *swarmRouter) removeService(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := sr.backend.RemoveService(vars["id"]); err != nil { + logrus.Errorf("Error removing service %s: %v", vars["id"], err) + return err + } + return nil +} + +func (sr *swarmRouter) getNodes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := httputils.ParseForm(r); err != nil { + return err + } + filter, err := filters.FromParam(r.Form.Get("filters")) + if err != nil { + return err + } + + nodes, err := sr.backend.GetNodes(basictypes.NodeListOptions{Filter: filter}) + if err != nil { + logrus.Errorf("Error getting nodes: %v", err) + return err + } + + return httputils.WriteJSON(w, http.StatusOK, nodes) +} + +func (sr *swarmRouter) getNode(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + node, err := sr.backend.GetNode(vars["id"]) + if err != nil { + logrus.Errorf("Error getting node %s: %v", vars["id"], err) + return err + } + + return httputils.WriteJSON(w, http.StatusOK, node) +} + +func (sr *swarmRouter) updateNode(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + var node types.NodeSpec + if err := json.NewDecoder(r.Body).Decode(&node); err != nil { + return err + } + + rawVersion := r.URL.Query().Get("version") + version, err := strconv.ParseUint(rawVersion, 10, 64) + if err != nil { + return fmt.Errorf("Invalid node version '%s': %s", rawVersion, err.Error()) + } + + if err := sr.backend.UpdateNode(vars["id"], version, node); err != nil { + logrus.Errorf("Error updating node %s: %v", vars["id"], err) + return err + } + return nil +} + +func (sr *swarmRouter) removeNode(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := sr.backend.RemoveNode(vars["id"]); err != nil { + logrus.Errorf("Error removing node %s: %v", vars["id"], err) + return err + } + return nil +} + +func (sr *swarmRouter) getTasks(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if err := httputils.ParseForm(r); err != nil { + return err + } + filter, err := filters.FromParam(r.Form.Get("filters")) + if err != nil { + return err + } + + tasks, err := sr.backend.GetTasks(basictypes.TaskListOptions{Filter: filter}) + if err != nil { + logrus.Errorf("Error getting tasks: %v", err) + return err + } + + return httputils.WriteJSON(w, http.StatusOK, tasks) +} + +func (sr *swarmRouter) getTask(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + task, err := sr.backend.GetTask(vars["id"]) + if err != nil { + logrus.Errorf("Error getting task %s: %v", vars["id"], err) + return err + } + + return httputils.WriteJSON(w, http.StatusOK, task) +} diff --git a/api/server/router/system/system.go b/api/server/router/system/system.go index b34b8c3211..e5742c9fe8 100644 --- a/api/server/router/system/system.go +++ b/api/server/router/system/system.go @@ -1,18 +1,23 @@ package system -import "github.com/docker/docker/api/server/router" +import ( + "github.com/docker/docker/api/server/router" + "github.com/docker/docker/daemon/cluster" +) // systemRouter provides information about the Docker system overall. // It gathers information about host, daemon and container events. type systemRouter struct { - backend Backend - routes []router.Route + backend Backend + clusterProvider *cluster.Cluster + routes []router.Route } // NewRouter initializes a new system router -func NewRouter(b Backend) router.Router { +func NewRouter(b Backend, c *cluster.Cluster) router.Router { r := &systemRouter{ - backend: b, + backend: b, + clusterProvider: c, } r.routes = []router.Route{ diff --git a/api/server/router/system/system_routes.go b/api/server/router/system/system_routes.go index f921de303b..8050301c9d 100644 --- a/api/server/router/system/system_routes.go +++ b/api/server/router/system/system_routes.go @@ -33,6 +33,9 @@ func (s *systemRouter) getInfo(ctx context.Context, w http.ResponseWriter, r *ht if err != nil { return err } + if s.clusterProvider != nil { + info.Swarm = s.clusterProvider.Info() + } return httputils.WriteJSON(w, http.StatusOK, info) } diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index 63eb22b2b1..2b39aae8c4 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -20,12 +20,14 @@ import ( "github.com/docker/docker/api/server/router/container" "github.com/docker/docker/api/server/router/image" "github.com/docker/docker/api/server/router/network" + swarmrouter "github.com/docker/docker/api/server/router/swarm" systemrouter "github.com/docker/docker/api/server/router/system" "github.com/docker/docker/api/server/router/volume" "github.com/docker/docker/builder/dockerfile" cliflags "github.com/docker/docker/cli/flags" "github.com/docker/docker/cliconfig" "github.com/docker/docker/daemon" + "github.com/docker/docker/daemon/cluster" "github.com/docker/docker/daemon/logger" "github.com/docker/docker/dockerversion" "github.com/docker/docker/libcontainerd" @@ -208,6 +210,7 @@ func (cli *DaemonCli) start() (err error) { } api := apiserver.New(serverConfig) + cli.api = api for i := 0; i < len(cli.Config.Hosts); i++ { var err error @@ -264,6 +267,17 @@ func (cli *DaemonCli) start() (err error) { return fmt.Errorf("Error starting daemon: %v", err) } + name, _ := os.Hostname() + + c, err := cluster.New(cluster.Config{ + Root: cli.Config.Root, + Name: name, + Backend: d, + }) + if err != nil { + logrus.Fatalf("Error creating cluster component: %v", err) + } + logrus.Info("Daemon has completed initialization") logrus.WithFields(logrus.Fields{ @@ -273,7 +287,7 @@ func (cli *DaemonCli) start() (err error) { }).Info("Docker daemon") cli.initMiddlewares(api, serverConfig) - initRouter(api, d) + initRouter(api, d, c) cli.d = d cli.setupConfigReloadTrap() @@ -290,6 +304,7 @@ func (cli *DaemonCli) start() (err error) { // Daemon is fully initialized and handling API traffic // Wait for serve API to complete errAPI := <-serveAPIWait + c.Cleanup() shutdownDaemon(d, 15) containerdRemote.Cleanup() if errAPI != nil { @@ -385,18 +400,19 @@ func loadDaemonCliConfig(config *daemon.Config, flags *flag.FlagSet, commonConfi return config, nil } -func initRouter(s *apiserver.Server, d *daemon.Daemon) { +func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) { decoder := runconfig.ContainerDecoder{} routers := []router.Router{ container.NewRouter(d, decoder), image.NewRouter(d, decoder), - systemrouter.NewRouter(d), + systemrouter.NewRouter(d, c), volume.NewRouter(d), build.NewRouter(dockerfile.NewBuildManager(d)), + swarmrouter.NewRouter(c), } if d.NetworkControllerEnabled() { - routers = append(routers, network.NewRouter(d)) + routers = append(routers, network.NewRouter(d, c)) } s.InitRouter(utils.IsDebugEnabled(), routers...) diff --git a/container/container.go b/container/container.go index 2ef31d53eb..1300d96d9c 100644 --- a/container/container.go +++ b/container/container.go @@ -66,6 +66,7 @@ type CommonContainer struct { RWLayer layer.RWLayer `json:"-"` ID string Created time.Time + Managed bool Path string Args []string Config *containertypes.Config @@ -790,7 +791,7 @@ func (container *Container) BuildCreateEndpointOptions(n libnetwork.Network, epC ipam := epConfig.IPAMConfig if ipam != nil && (ipam.IPv4Address != "" || ipam.IPv6Address != "") { createOptions = append(createOptions, - libnetwork.CreateOptionIpam(net.ParseIP(ipam.IPv4Address), net.ParseIP(ipam.IPv6Address), nil)) + libnetwork.CreateOptionIpam(net.ParseIP(ipam.IPv4Address), net.ParseIP(ipam.IPv6Address), nil, nil)) } for _, alias := range epConfig.Aliases { @@ -798,6 +799,27 @@ func (container *Container) BuildCreateEndpointOptions(n libnetwork.Network, epC } } + if container.NetworkSettings.Service != nil { + svcCfg := container.NetworkSettings.Service + + var vip string + if svcCfg.VirtualAddresses[n.ID()] != nil { + vip = svcCfg.VirtualAddresses[n.ID()].IPv4 + } + + var portConfigs []*libnetwork.PortConfig + for _, portConfig := range svcCfg.ExposedPorts { + portConfigs = append(portConfigs, &libnetwork.PortConfig{ + Name: portConfig.Name, + Protocol: libnetwork.PortConfig_Protocol(portConfig.Protocol), + TargetPort: portConfig.TargetPort, + PublishedPort: portConfig.PublishedPort, + }) + } + + createOptions = append(createOptions, libnetwork.CreateOptionService(svcCfg.Name, svcCfg.ID, net.ParseIP(vip), portConfigs)) + } + if !containertypes.NetworkMode(n.Name()).IsUserDefined() { createOptions = append(createOptions, libnetwork.CreateOptionDisableResolution()) } diff --git a/container/state.go b/container/state.go index 852ca1d0e5..b01bb3806c 100644 --- a/container/state.go +++ b/container/state.go @@ -5,6 +5,8 @@ import ( "sync" "time" + "golang.org/x/net/context" + "github.com/docker/go-units" ) @@ -139,6 +141,32 @@ func (s *State) WaitStop(timeout time.Duration) (int, error) { return s.getExitCode(), nil } +// WaitWithContext waits for the container to stop. Optional context can be +// passed for canceling the request. +func (s *State) WaitWithContext(ctx context.Context) <-chan int { + // todo(tonistiigi): make other wait functions use this + c := make(chan int) + go func() { + s.Lock() + if !s.Running { + exitCode := s.ExitCode + s.Unlock() + c <- exitCode + close(c) + return + } + waitChan := s.waitChan + s.Unlock() + select { + case <-waitChan: + c <- s.getExitCode() + case <-ctx.Done(): + } + close(c) + }() + return c +} + // IsRunning returns whether the running flag is set. Used by Container to check whether a container is running. func (s *State) IsRunning() bool { s.Lock() diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go new file mode 100644 index 0000000000..1746e9b2cf --- /dev/null +++ b/daemon/cluster/cluster.go @@ -0,0 +1,1056 @@ +package cluster + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + "time" + + "google.golang.org/grpc" + + "github.com/Sirupsen/logrus" + "github.com/docker/docker/daemon/cluster/convert" + executorpkg "github.com/docker/docker/daemon/cluster/executor" + "github.com/docker/docker/daemon/cluster/executor/container" + "github.com/docker/docker/errors" + "github.com/docker/docker/pkg/ioutils" + "github.com/docker/docker/runconfig" + apitypes "github.com/docker/engine-api/types" + types "github.com/docker/engine-api/types/swarm" + swarmagent "github.com/docker/swarmkit/agent" + swarmapi "github.com/docker/swarmkit/api" + "golang.org/x/net/context" +) + +const swarmDirName = "swarm" +const controlSocket = "control.sock" +const swarmConnectTimeout = 5 * time.Second +const stateFile = "docker-state.json" + +const ( + initialReconnectDelay = 100 * time.Millisecond + maxReconnectDelay = 10 * time.Second +) + +// ErrNoManager is returned then a manager-only function is called on non-manager +var ErrNoManager = fmt.Errorf("this node is not participating as a Swarm manager") + +// ErrNoSwarm is returned on leaving a cluster that was never initialized +var ErrNoSwarm = fmt.Errorf("this node is not part of Swarm") + +// ErrSwarmExists is returned on initialize or join request for a cluster that has already been activated +var ErrSwarmExists = fmt.Errorf("this node is already part of a Swarm") + +// ErrSwarmJoinTimeoutReached is returned when cluster join could not complete before timeout was reached. +var ErrSwarmJoinTimeoutReached = fmt.Errorf("timeout reached before node was joined") + +type state struct { + ListenAddr string +} + +// Config provides values for Cluster. +type Config struct { + Root string + Name string + Backend executorpkg.Backend +} + +// Cluster provides capabilities to pariticipate in a cluster as worker or a +// manager and a worker. +type Cluster struct { + sync.RWMutex + root string + config Config + configEvent chan struct{} // todo: make this array and goroutine safe + node *swarmagent.Node + conn *grpc.ClientConn + client swarmapi.ControlClient + ready bool + listenAddr string + err error + reconnectDelay time.Duration + stop bool + cancelDelay func() +} + +// New creates a new Cluster instance using provided config. +func New(config Config) (*Cluster, error) { + root := filepath.Join(config.Root, swarmDirName) + if err := os.MkdirAll(root, 0700); err != nil { + return nil, err + } + c := &Cluster{ + root: root, + config: config, + configEvent: make(chan struct{}, 10), + reconnectDelay: initialReconnectDelay, + } + + dt, err := ioutil.ReadFile(filepath.Join(root, stateFile)) + if err != nil { + if os.IsNotExist(err) { + return c, nil + } + return nil, err + } + + var st state + if err := json.Unmarshal(dt, &st); err != nil { + return nil, err + } + + n, ctx, err := c.startNewNode(false, st.ListenAddr, "", "", "", false) + if err != nil { + return nil, err + } + + select { + case <-time.After(swarmConnectTimeout): + logrus.Errorf("swarm component could not be started before timeout was reached") + case <-n.Ready(context.Background()): + case <-ctx.Done(): + } + if ctx.Err() != nil { + return nil, fmt.Errorf("swarm component could not be started") + } + go c.reconnectOnFailure(ctx) + return c, nil +} + +func (c *Cluster) checkCompatibility() error { + info, _ := c.config.Backend.SystemInfo() + if info != nil && (info.ClusterStore != "" || info.ClusterAdvertise != "") { + return fmt.Errorf("swarm mode is incompatible with `--cluster-store` and `--cluster-advertise daemon configuration") + } + return nil +} + +func (c *Cluster) saveState() error { + dt, err := json.Marshal(state{ListenAddr: c.listenAddr}) + if err != nil { + return err + } + return ioutils.AtomicWriteFile(filepath.Join(c.root, stateFile), dt, 0600) +} + +func (c *Cluster) reconnectOnFailure(ctx context.Context) { + for { + <-ctx.Done() + c.Lock() + if c.stop || c.node != nil { + c.Unlock() + return + } + c.reconnectDelay *= 2 + if c.reconnectDelay > maxReconnectDelay { + c.reconnectDelay = maxReconnectDelay + } + logrus.Warnf("Restarting swarm in %.2f seconds", c.reconnectDelay.Seconds()) + delayCtx, cancel := context.WithTimeout(context.Background(), c.reconnectDelay) + c.cancelDelay = cancel + c.Unlock() + <-delayCtx.Done() + if delayCtx.Err() != context.DeadlineExceeded { + return + } + c.Lock() + if c.node != nil { + c.Unlock() + return + } + var err error + _, ctx, err = c.startNewNode(false, c.listenAddr, c.getRemoteAddress(), "", "", false) + if err != nil { + c.err = err + ctx = delayCtx + } + c.Unlock() + } +} + +func (c *Cluster) startNewNode(forceNewCluster bool, listenAddr, joinAddr, secret, cahash string, ismanager bool) (*swarmagent.Node, context.Context, error) { + if err := c.checkCompatibility(); err != nil { + return nil, nil, err + } + c.node = nil + c.cancelDelay = nil + node, err := swarmagent.NewNode(&swarmagent.NodeConfig{ + Hostname: c.config.Name, + ForceNewCluster: forceNewCluster, + ListenControlAPI: filepath.Join(c.root, controlSocket), + ListenRemoteAPI: listenAddr, + JoinAddr: joinAddr, + StateDir: c.root, + CAHash: cahash, + Secret: secret, + Executor: container.NewExecutor(c.config.Backend), + HeartbeatTick: 1, + ElectionTick: 3, + IsManager: ismanager, + }) + if err != nil { + return nil, nil, err + } + ctx, cancel := context.WithCancel(context.Background()) + if err := node.Start(ctx); err != nil { + return nil, nil, err + } + + c.node = node + c.listenAddr = listenAddr + c.saveState() + c.config.Backend.SetClusterProvider(c) + go func() { + err := node.Err(ctx) + if err != nil { + logrus.Errorf("cluster exited with error: %v", err) + } + c.Lock() + c.conn = nil + c.client = nil + c.node = nil + c.ready = false + c.err = err + c.Unlock() + cancel() + }() + + go func() { + select { + case <-node.Ready(context.Background()): + c.Lock() + c.reconnectDelay = initialReconnectDelay + c.Unlock() + case <-ctx.Done(): + } + if ctx.Err() == nil { + c.Lock() + c.ready = true + c.err = nil + c.Unlock() + } + c.configEvent <- struct{}{} + }() + + go func() { + for conn := range node.ListenControlSocket(ctx) { + c.Lock() + if c.conn != conn { + c.client = swarmapi.NewControlClient(conn) + } + if c.conn != nil { + c.client = nil + } + c.conn = conn + c.Unlock() + c.configEvent <- struct{}{} + } + }() + + return node, ctx, nil +} + +// Init initializes new cluster from user provided request. +func (c *Cluster) Init(req types.InitRequest) (string, error) { + c.Lock() + if c.node != nil { + c.Unlock() + if !req.ForceNewCluster { + return "", ErrSwarmExists + } + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := c.node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { + return "", err + } + c.Lock() + c.node = nil + c.conn = nil + c.ready = false + } + // todo: check current state existing + n, ctx, err := c.startNewNode(req.ForceNewCluster, req.ListenAddr, "", "", "", false) + if err != nil { + c.Unlock() + return "", err + } + c.Unlock() + + select { + case <-n.Ready(context.Background()): + if err := initAcceptancePolicy(n, req.Spec.AcceptancePolicy); err != nil { + return "", err + } + go c.reconnectOnFailure(ctx) + return n.NodeID(), nil + case <-ctx.Done(): + c.RLock() + defer c.RUnlock() + if c.err != nil { + if !req.ForceNewCluster { // if failure on first attempt don't keep state + if err := c.clearState(); err != nil { + return "", err + } + } + return "", c.err + } + return "", ctx.Err() + } +} + +// Join makes current Cluster part of an existing swarm cluster. +func (c *Cluster) Join(req types.JoinRequest) error { + c.Lock() + if c.node != nil { + c.Unlock() + return ErrSwarmExists + } + // todo: check current state existing + if len(req.RemoteAddrs) == 0 { + return fmt.Errorf("at least 1 RemoteAddr is required to join") + } + n, ctx, err := c.startNewNode(false, req.ListenAddr, req.RemoteAddrs[0], req.Secret, req.CACertHash, req.Manager) + if err != nil { + c.Unlock() + return err + } + c.Unlock() + + select { + case <-time.After(swarmConnectTimeout): + go c.reconnectOnFailure(ctx) + if nodeid := n.NodeID(); nodeid != "" { + return fmt.Errorf("Timeout reached before node was joined. Your cluster settings may be preventing this node from automatically joining. To accept this node into cluster run `docker node accept %v` in an existing cluster manager", nodeid) + } + return ErrSwarmJoinTimeoutReached + case <-n.Ready(context.Background()): + go c.reconnectOnFailure(ctx) + return nil + case <-ctx.Done(): + c.RLock() + defer c.RUnlock() + if c.err != nil { + return c.err + } + return ctx.Err() + } +} + +func (c *Cluster) cancelReconnect() { + c.stop = true + if c.cancelDelay != nil { + c.cancelDelay() + c.cancelDelay = nil + } +} + +// Leave shuts down Cluster and removes current state. +func (c *Cluster) Leave(force bool) error { + c.Lock() + node := c.node + if node == nil { + c.Unlock() + return ErrNoSwarm + } + + if node.Manager() != nil && !force { + msg := "You are attempting to leave cluster on a node that is participating as a manager. " + if c.isActiveManager() { + active, reachable, unreachable, err := c.managerStats() + if err == nil { + if active && reachable-2 <= unreachable { + if reachable == 1 && unreachable == 0 { + msg += "Leaving last manager will remove all current state of the cluster. Use `--force` to ignore this message. " + c.Unlock() + return fmt.Errorf(msg) + } + msg += fmt.Sprintf("Leaving cluster will leave you with %v managers out of %v. This means Raft quorum will be lost and your cluster will become inaccessible. ", reachable-1, reachable+unreachable) + } + } + } else { + msg += "Doing so may lose the consenus of your cluster. " + } + + msg += "Only way to restore a cluster that has lost consensus is to reinitialize it with `--force-new-cluster`. Use `--force` to ignore this message." + c.Unlock() + return fmt.Errorf(msg) + } + c.cancelReconnect() + c.Unlock() + + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + if err := node.Stop(ctx); err != nil && !strings.Contains(err.Error(), "context canceled") { + return err + } + nodeID := node.NodeID() + for _, id := range c.config.Backend.ListContainersForNode(nodeID) { + if err := c.config.Backend.ContainerRm(id, &apitypes.ContainerRmConfig{ForceRemove: true}); err != nil { + logrus.Errorf("error removing %v: %v", id, err) + } + } + c.Lock() + defer c.Unlock() + c.node = nil + c.conn = nil + c.ready = false + c.configEvent <- struct{}{} + // todo: cleanup optional? + if err := c.clearState(); err != nil { + return err + } + return nil +} + +func (c *Cluster) clearState() error { + if err := os.RemoveAll(c.root); err != nil { + return err + } + if err := os.MkdirAll(c.root, 0700); err != nil { + return err + } + c.config.Backend.SetClusterProvider(nil) + return nil +} + +func (c *Cluster) getRequestContext() context.Context { // TODO: not needed when requests don't block on qourum lost + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + return ctx +} + +// Inspect retrives the confuguration properties of managed swarm cluster. +func (c *Cluster) Inspect() (types.Swarm, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return types.Swarm{}, ErrNoManager + } + + swarm, err := getSwarm(c.getRequestContext(), c.client) + if err != nil { + return types.Swarm{}, err + } + + if err != nil { + return types.Swarm{}, err + } + + return convert.SwarmFromGRPC(*swarm), nil +} + +// Update updates configuration of a managed swarm cluster. +func (c *Cluster) Update(version uint64, spec types.Spec) error { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return ErrNoManager + } + + swarmSpec, err := convert.SwarmSpecToGRPC(spec) + if err != nil { + return err + } + + swarm, err := getSwarm(c.getRequestContext(), c.client) + if err != nil { + return err + } + + _, err = c.client.UpdateCluster( + c.getRequestContext(), + &swarmapi.UpdateClusterRequest{ + ClusterID: swarm.ID, + Spec: &swarmSpec, + ClusterVersion: &swarmapi.Version{ + Index: version, + }, + }, + ) + return err +} + +// IsManager returns true is Cluster is participating as a manager. +func (c *Cluster) IsManager() bool { + c.RLock() + defer c.RUnlock() + return c.isActiveManager() +} + +// IsAgent returns true is Cluster is participating as a worker/agent. +func (c *Cluster) IsAgent() bool { + c.RLock() + defer c.RUnlock() + return c.ready +} + +// GetListenAddress returns the listening address for current maanger's +// consensus and dispatcher APIs. +func (c *Cluster) GetListenAddress() string { + c.RLock() + defer c.RUnlock() + if c.conn != nil { + return c.listenAddr + } + return "" +} + +// GetRemoteAddress returns a known advertise address of a remote maanger if +// available. +// todo: change to array/connect with info +func (c *Cluster) GetRemoteAddress() string { + c.RLock() + defer c.RUnlock() + return c.getRemoteAddress() +} + +func (c *Cluster) getRemoteAddress() string { + if c.node == nil { + return "" + } + nodeID := c.node.NodeID() + for _, r := range c.node.Remotes() { + if r.NodeID != nodeID { + return r.Addr + } + } + return "" +} + +// ListenClusterEvents returns a channel that receives messages on cluster +// participation changes. +// todo: make cancelable and accessible to multiple callers +func (c *Cluster) ListenClusterEvents() <-chan struct{} { + return c.configEvent +} + +// Info returns information about the current cluster state. +func (c *Cluster) Info() types.Info { + var info types.Info + c.RLock() + defer c.RUnlock() + + if c.node == nil { + info.LocalNodeState = types.LocalNodeStateInactive + if c.cancelDelay != nil { + info.LocalNodeState = types.LocalNodeStateError + } + } else { + info.LocalNodeState = types.LocalNodeStatePending + if c.ready == true { + info.LocalNodeState = types.LocalNodeStateActive + } + } + if c.err != nil { + info.Error = c.err.Error() + } + + if c.isActiveManager() { + info.ControlAvailable = true + if r, err := c.client.ListNodes(c.getRequestContext(), &swarmapi.ListNodesRequest{}); err == nil { + info.Nodes = len(r.Nodes) + for _, n := range r.Nodes { + if n.ManagerStatus != nil { + info.Managers = info.Managers + 1 + } + } + } + + if swarm, err := getSwarm(c.getRequestContext(), c.client); err == nil && swarm != nil { + info.CACertHash = swarm.RootCA.CACertHash + } + } + + if c.node != nil { + for _, r := range c.node.Remotes() { + info.RemoteManagers = append(info.RemoteManagers, types.Peer{NodeID: r.NodeID, Addr: r.Addr}) + } + info.NodeID = c.node.NodeID() + } + + return info +} + +// isActiveManager should not be called without a read lock +func (c *Cluster) isActiveManager() bool { + return c.conn != nil +} + +// GetServices returns all services of a managed swarm cluster. +func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Service, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return nil, ErrNoManager + } + + filters, err := newListServicesFilters(options.Filter) + if err != nil { + return nil, err + } + r, err := c.client.ListServices( + c.getRequestContext(), + &swarmapi.ListServicesRequest{Filters: filters}) + if err != nil { + return nil, err + } + + var services []types.Service + + for _, service := range r.Services { + services = append(services, convert.ServiceFromGRPC(*service)) + } + + return services, nil +} + +// CreateService creates a new service in a managed swarm cluster. +func (c *Cluster) CreateService(s types.ServiceSpec) (string, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return "", ErrNoManager + } + + ctx := c.getRequestContext() + + err := populateNetworkID(ctx, c.client, &s) + if err != nil { + return "", err + } + + serviceSpec, err := convert.ServiceSpecToGRPC(s) + if err != nil { + return "", err + } + r, err := c.client.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec}) + if err != nil { + return "", err + } + + return r.Service.ID, nil +} + +// GetService returns a service based on a ID or name. +func (c *Cluster) GetService(input string) (types.Service, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return types.Service{}, ErrNoManager + } + + service, err := getService(c.getRequestContext(), c.client, input) + if err != nil { + return types.Service{}, err + } + return convert.ServiceFromGRPC(*service), nil +} + +// UpdateService updates existing service to match new properties. +func (c *Cluster) UpdateService(serviceID string, version uint64, spec types.ServiceSpec) error { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return ErrNoManager + } + + serviceSpec, err := convert.ServiceSpecToGRPC(spec) + if err != nil { + return err + } + + _, err = c.client.UpdateService( + c.getRequestContext(), + &swarmapi.UpdateServiceRequest{ + ServiceID: serviceID, + Spec: &serviceSpec, + ServiceVersion: &swarmapi.Version{ + Index: version, + }, + }, + ) + return err +} + +// RemoveService removes a service from a managed swarm cluster. +func (c *Cluster) RemoveService(input string) error { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return ErrNoManager + } + + service, err := getService(c.getRequestContext(), c.client, input) + if err != nil { + return err + } + + if _, err := c.client.RemoveService(c.getRequestContext(), &swarmapi.RemoveServiceRequest{ServiceID: service.ID}); err != nil { + return err + } + return nil +} + +// GetNodes returns a list of all nodes known to a cluster. +func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return nil, ErrNoManager + } + + filters, err := newListNodesFilters(options.Filter) + if err != nil { + return nil, err + } + r, err := c.client.ListNodes( + c.getRequestContext(), + &swarmapi.ListNodesRequest{Filters: filters}) + if err != nil { + return nil, err + } + + nodes := []types.Node{} + + for _, node := range r.Nodes { + nodes = append(nodes, convert.NodeFromGRPC(*node)) + } + return nodes, nil +} + +// GetNode returns a node based on a ID or name. +func (c *Cluster) GetNode(input string) (types.Node, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return types.Node{}, ErrNoManager + } + + node, err := getNode(c.getRequestContext(), c.client, input) + if err != nil { + return types.Node{}, err + } + return convert.NodeFromGRPC(*node), nil +} + +// UpdateNode updates existing nodes properties. +func (c *Cluster) UpdateNode(nodeID string, version uint64, spec types.NodeSpec) error { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return ErrNoManager + } + + nodeSpec, err := convert.NodeSpecToGRPC(spec) + if err != nil { + return err + } + + _, err = c.client.UpdateNode( + c.getRequestContext(), + &swarmapi.UpdateNodeRequest{ + NodeID: nodeID, + Spec: &nodeSpec, + NodeVersion: &swarmapi.Version{ + Index: version, + }, + }, + ) + return err +} + +// RemoveNode removes a node from a cluster +func (c *Cluster) RemoveNode(input string) error { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return ErrNoManager + } + + ctx := c.getRequestContext() + + node, err := getNode(ctx, c.client, input) + if err != nil { + return err + } + + if _, err := c.client.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID}); err != nil { + return err + } + return nil +} + +// GetTasks returns a list of tasks matching the filter options. +func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return nil, ErrNoManager + } + + filters, err := newListTasksFilters(options.Filter) + if err != nil { + return nil, err + } + r, err := c.client.ListTasks( + c.getRequestContext(), + &swarmapi.ListTasksRequest{Filters: filters}) + if err != nil { + return nil, err + } + + tasks := []types.Task{} + + for _, task := range r.Tasks { + tasks = append(tasks, convert.TaskFromGRPC(*task)) + } + return tasks, nil +} + +// GetTask returns a task by an ID. +func (c *Cluster) GetTask(input string) (types.Task, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return types.Task{}, ErrNoManager + } + + task, err := getTask(c.getRequestContext(), c.client, input) + if err != nil { + return types.Task{}, err + } + return convert.TaskFromGRPC(*task), nil +} + +// GetNetwork returns a cluster network by ID. +func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return apitypes.NetworkResource{}, ErrNoManager + } + + network, err := getNetwork(c.getRequestContext(), c.client, input) + if err != nil { + return apitypes.NetworkResource{}, err + } + return convert.BasicNetworkFromGRPC(*network), nil +} + +// GetNetworks returns all current cluster managed networks. +func (c *Cluster) GetNetworks() ([]apitypes.NetworkResource, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return nil, ErrNoManager + } + + r, err := c.client.ListNetworks(c.getRequestContext(), &swarmapi.ListNetworksRequest{}) + if err != nil { + return nil, err + } + + var networks []apitypes.NetworkResource + + for _, network := range r.Networks { + networks = append(networks, convert.BasicNetworkFromGRPC(*network)) + } + + return networks, nil +} + +// CreateNetwork creates a new cluster managed network. +func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return "", ErrNoManager + } + + if runconfig.IsPreDefinedNetwork(s.Name) { + err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name) + return "", errors.NewRequestForbiddenError(err) + } + + networkSpec := convert.BasicNetworkCreateToGRPC(s) + r, err := c.client.CreateNetwork(c.getRequestContext(), &swarmapi.CreateNetworkRequest{Spec: &networkSpec}) + if err != nil { + return "", err + } + + return r.Network.ID, nil +} + +// RemoveNetwork removes a cluster network. +func (c *Cluster) RemoveNetwork(input string) error { + c.RLock() + defer c.RUnlock() + + if !c.isActiveManager() { + return ErrNoManager + } + + network, err := getNetwork(c.getRequestContext(), c.client, input) + if err != nil { + return err + } + + if _, err := c.client.RemoveNetwork(c.getRequestContext(), &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}); err != nil { + return err + } + return nil +} + +func populateNetworkID(ctx context.Context, c swarmapi.ControlClient, s *types.ServiceSpec) error { + for i, n := range s.Networks { + apiNetwork, err := getNetwork(ctx, c, n.Target) + if err != nil { + return err + } + s.Networks[i] = types.NetworkAttachmentConfig{Target: apiNetwork.ID} + } + return nil +} + +func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Network, error) { + // GetNetwork to match via full ID. + rg, err := c.GetNetwork(ctx, &swarmapi.GetNetworkRequest{NetworkID: input}) + if err != nil { + // If any error (including NotFound), ListNetworks to match via ID prefix and full name. + rl, err := c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{Names: []string{input}}}) + if err != nil || len(rl.Networks) == 0 { + rl, err = c.ListNetworks(ctx, &swarmapi.ListNetworksRequest{Filters: &swarmapi.ListNetworksRequest_Filters{IDPrefixes: []string{input}}}) + } + + if err != nil { + return nil, err + } + + if len(rl.Networks) == 0 { + return nil, fmt.Errorf("network %s not found", input) + } + + if l := len(rl.Networks); l > 1 { + return nil, fmt.Errorf("network %s is ambigious (%d matches found)", input, l) + } + + return rl.Networks[0], nil + } + return rg.Network, nil +} + +// Cleanup stops active swarm node. This is run before daemon shutdown. +func (c *Cluster) Cleanup() { + c.Lock() + node := c.node + if node == nil { + c.Unlock() + return + } + + if c.isActiveManager() { + active, reachable, unreachable, err := c.managerStats() + if err == nil { + singlenode := active && reachable == 1 && unreachable == 0 + if active && !singlenode && reachable-2 <= unreachable { + logrus.Errorf("Leaving cluster with %v managers left out of %v. Raft quorum will be lost.", reachable-1, reachable+unreachable) + } + } + } + c.cancelReconnect() + c.Unlock() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + if err := node.Stop(ctx); err != nil { + logrus.Errorf("error cleaning up cluster: %v", err) + } + c.Lock() + c.node = nil + c.ready = false + c.conn = nil + c.Unlock() +} + +func (c *Cluster) managerStats() (current bool, reachable int, unreachable int, err error) { + ctx, _ := context.WithTimeout(context.Background(), 3*time.Second) + nodes, err := c.client.ListNodes(ctx, &swarmapi.ListNodesRequest{}) + if err != nil { + return false, 0, 0, err + } + for _, n := range nodes.Nodes { + if n.ManagerStatus != nil { + if n.ManagerStatus.Raft.Status.Reachability == swarmapi.RaftMemberStatus_REACHABLE { + reachable++ + if n.ID == c.node.NodeID() { + current = true + } + } + if n.ManagerStatus.Raft.Status.Reachability == swarmapi.RaftMemberStatus_UNREACHABLE { + unreachable++ + } + } + } + return +} + +func initAcceptancePolicy(node *swarmagent.Node, acceptancePolicy types.AcceptancePolicy) error { + ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) + for conn := range node.ListenControlSocket(ctx) { + if ctx.Err() != nil { + return ctx.Err() + } + if conn != nil { + client := swarmapi.NewControlClient(conn) + var cluster *swarmapi.Cluster + for i := 0; ; i++ { + lcr, err := client.ListClusters(ctx, &swarmapi.ListClustersRequest{}) + if err != nil { + return fmt.Errorf("error on listing clusters: %v", err) + } + if len(lcr.Clusters) == 0 { + if i < 10 { + time.Sleep(200 * time.Millisecond) + continue + } + return fmt.Errorf("empty list of clusters was returned") + } + cluster = lcr.Clusters[0] + break + } + spec := &cluster.Spec + + if err := convert.SwarmSpecUpdateAcceptancePolicy(spec, acceptancePolicy); err != nil { + return fmt.Errorf("error updating cluster settings: %v", err) + } + _, err := client.UpdateCluster(ctx, &swarmapi.UpdateClusterRequest{ + ClusterID: cluster.ID, + ClusterVersion: &cluster.Meta.Version, + Spec: spec, + }) + if err != nil { + return fmt.Errorf("error updating cluster settings: %v", err) + } + return nil + } + } + return ctx.Err() +} diff --git a/daemon/cluster/convert/container.go b/daemon/cluster/convert/container.go new file mode 100644 index 0000000000..c943537ad4 --- /dev/null +++ b/daemon/cluster/convert/container.go @@ -0,0 +1,116 @@ +package convert + +import ( + "fmt" + "strings" + + types "github.com/docker/engine-api/types/swarm" + swarmapi "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/protobuf/ptypes" +) + +func containerSpecFromGRPC(c *swarmapi.ContainerSpec) types.ContainerSpec { + containerSpec := types.ContainerSpec{ + Image: c.Image, + Labels: c.Labels, + Command: c.Command, + Args: c.Args, + Env: c.Env, + Dir: c.Dir, + User: c.User, + } + + // Mounts + for _, m := range c.Mounts { + mount := types.Mount{ + Target: m.Target, + Source: m.Source, + Type: types.MountType(strings.ToLower(swarmapi.Mount_MountType_name[int32(m.Type)])), + Writable: m.Writable, + } + + if m.BindOptions != nil { + mount.BindOptions = &types.BindOptions{ + Propagation: types.MountPropagation(strings.ToLower(swarmapi.Mount_BindOptions_MountPropagation_name[int32(m.BindOptions.Propagation)])), + } + } + + if m.VolumeOptions != nil { + mount.VolumeOptions = &types.VolumeOptions{ + Populate: m.VolumeOptions.Populate, + Labels: m.VolumeOptions.Labels, + } + if m.VolumeOptions.DriverConfig != nil { + mount.VolumeOptions.DriverConfig = &types.Driver{ + Name: m.VolumeOptions.DriverConfig.Name, + Options: m.VolumeOptions.DriverConfig.Options, + } + } + } + containerSpec.Mounts = append(containerSpec.Mounts, mount) + } + + if c.StopGracePeriod != nil { + grace, _ := ptypes.Duration(c.StopGracePeriod) + containerSpec.StopGracePeriod = &grace + } + return containerSpec +} + +func containerToGRPC(c types.ContainerSpec) (*swarmapi.ContainerSpec, error) { + containerSpec := &swarmapi.ContainerSpec{ + Image: c.Image, + Labels: c.Labels, + Command: c.Command, + Args: c.Args, + Env: c.Env, + Dir: c.Dir, + User: c.User, + } + + if c.StopGracePeriod != nil { + containerSpec.StopGracePeriod = ptypes.DurationProto(*c.StopGracePeriod) + } + + // Mounts + for _, m := range c.Mounts { + mount := swarmapi.Mount{ + Target: m.Target, + Source: m.Source, + Writable: m.Writable, + } + + if mountType, ok := swarmapi.Mount_MountType_value[strings.ToUpper(string(m.Type))]; ok { + mount.Type = swarmapi.Mount_MountType(mountType) + } else if string(m.Type) != "" { + return nil, fmt.Errorf("invalid MountType: %q", m.Type) + } + + if m.BindOptions != nil { + if mountPropagation, ok := swarmapi.Mount_BindOptions_MountPropagation_value[strings.ToUpper(string(m.BindOptions.Propagation))]; ok { + mount.BindOptions = &swarmapi.Mount_BindOptions{Propagation: swarmapi.Mount_BindOptions_MountPropagation(mountPropagation)} + } else if string(m.BindOptions.Propagation) != "" { + return nil, fmt.Errorf("invalid MountPropagation: %q", m.BindOptions.Propagation) + + } + + } + + if m.VolumeOptions != nil { + mount.VolumeOptions = &swarmapi.Mount_VolumeOptions{ + Populate: m.VolumeOptions.Populate, + Labels: m.VolumeOptions.Labels, + } + if m.VolumeOptions.DriverConfig != nil { + mount.VolumeOptions.DriverConfig = &swarmapi.Driver{ + Name: m.VolumeOptions.DriverConfig.Name, + Options: m.VolumeOptions.DriverConfig.Options, + } + } + } + + containerSpec.Mounts = append(containerSpec.Mounts, mount) + } + + return containerSpec, nil +} diff --git a/daemon/cluster/convert/network.go b/daemon/cluster/convert/network.go new file mode 100644 index 0000000000..53b952427a --- /dev/null +++ b/daemon/cluster/convert/network.go @@ -0,0 +1,194 @@ +package convert + +import ( + "strings" + + basictypes "github.com/docker/engine-api/types" + networktypes "github.com/docker/engine-api/types/network" + types "github.com/docker/engine-api/types/swarm" + swarmapi "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/protobuf/ptypes" +) + +func networkAttachementFromGRPC(na *swarmapi.NetworkAttachment) types.NetworkAttachment { + if na != nil { + return types.NetworkAttachment{ + Network: networkFromGRPC(na.Network), + Addresses: na.Addresses, + } + } + return types.NetworkAttachment{} +} + +func networkFromGRPC(n *swarmapi.Network) types.Network { + if n != nil { + network := types.Network{ + ID: n.ID, + Spec: types.NetworkSpec{ + IPv6Enabled: n.Spec.Ipv6Enabled, + Internal: n.Spec.Internal, + IPAMOptions: ipamFromGRPC(n.Spec.IPAM), + }, + IPAMOptions: ipamFromGRPC(n.IPAM), + } + + // Meta + network.Version.Index = n.Meta.Version.Index + network.CreatedAt, _ = ptypes.Timestamp(n.Meta.CreatedAt) + network.UpdatedAt, _ = ptypes.Timestamp(n.Meta.UpdatedAt) + + //Annotations + network.Spec.Name = n.Spec.Annotations.Name + network.Spec.Labels = n.Spec.Annotations.Labels + + //DriverConfiguration + if n.Spec.DriverConfig != nil { + network.Spec.DriverConfiguration = &types.Driver{ + Name: n.Spec.DriverConfig.Name, + Options: n.Spec.DriverConfig.Options, + } + } + + //DriverState + if n.DriverState != nil { + network.DriverState = types.Driver{ + Name: n.DriverState.Name, + Options: n.DriverState.Options, + } + } + + return network + } + return types.Network{} +} + +func ipamFromGRPC(i *swarmapi.IPAMOptions) *types.IPAMOptions { + var ipam *types.IPAMOptions + if i != nil { + ipam = &types.IPAMOptions{} + if i.Driver != nil { + ipam.Driver.Name = i.Driver.Name + ipam.Driver.Options = i.Driver.Options + } + + for _, config := range i.Configs { + ipam.Configs = append(ipam.Configs, types.IPAMConfig{ + Subnet: config.Subnet, + Range: config.Range, + Gateway: config.Gateway, + }) + } + } + return ipam +} + +func endpointSpecFromGRPC(es *swarmapi.EndpointSpec) *types.EndpointSpec { + var endpointSpec *types.EndpointSpec + if es != nil { + endpointSpec = &types.EndpointSpec{} + endpointSpec.Mode = types.ResolutionMode(strings.ToLower(es.Mode.String())) + + for _, portState := range es.Ports { + endpointSpec.Ports = append(endpointSpec.Ports, types.PortConfig{ + Name: portState.Name, + Protocol: types.PortConfigProtocol(strings.ToLower(swarmapi.PortConfig_Protocol_name[int32(portState.Protocol)])), + TargetPort: portState.TargetPort, + PublishedPort: portState.PublishedPort, + }) + } + } + return endpointSpec +} + +func endpointFromGRPC(e *swarmapi.Endpoint) types.Endpoint { + endpoint := types.Endpoint{} + if e != nil { + if espec := endpointSpecFromGRPC(e.Spec); espec != nil { + endpoint.Spec = *espec + } + + for _, portState := range e.Ports { + endpoint.Ports = append(endpoint.Ports, types.PortConfig{ + Name: portState.Name, + Protocol: types.PortConfigProtocol(strings.ToLower(swarmapi.PortConfig_Protocol_name[int32(portState.Protocol)])), + TargetPort: portState.TargetPort, + PublishedPort: portState.PublishedPort, + }) + } + + for _, v := range e.VirtualIPs { + endpoint.VirtualIPs = append(endpoint.VirtualIPs, types.EndpointVirtualIP{ + NetworkID: v.NetworkID, + Addr: v.Addr}) + } + + } + + return endpoint +} + +// BasicNetworkFromGRPC converts a grpc Network to a NetworkResource. +func BasicNetworkFromGRPC(n swarmapi.Network) basictypes.NetworkResource { + spec := n.Spec + var ipam networktypes.IPAM + if spec.IPAM != nil { + if spec.IPAM.Driver != nil { + ipam.Driver = spec.IPAM.Driver.Name + ipam.Options = spec.IPAM.Driver.Options + } + ipam.Config = make([]networktypes.IPAMConfig, 0, len(spec.IPAM.Configs)) + for _, ic := range spec.IPAM.Configs { + ipamConfig := networktypes.IPAMConfig{ + Subnet: ic.Subnet, + IPRange: ic.Range, + Gateway: ic.Gateway, + AuxAddress: ic.Reserved, + } + ipam.Config = append(ipam.Config, ipamConfig) + } + } + + return basictypes.NetworkResource{ + ID: n.ID, + Name: n.Spec.Annotations.Name, + Scope: "swarm", + Driver: n.DriverState.Name, + EnableIPv6: spec.Ipv6Enabled, + IPAM: ipam, + Internal: spec.Internal, + Options: n.DriverState.Options, + Labels: n.Spec.Annotations.Labels, + } +} + +// BasicNetworkCreateToGRPC converts a NetworkCreateRequest to a grpc NetworkSpec. +func BasicNetworkCreateToGRPC(create basictypes.NetworkCreateRequest) swarmapi.NetworkSpec { + ns := swarmapi.NetworkSpec{ + Annotations: swarmapi.Annotations{ + Name: create.Name, + Labels: create.Labels, + }, + DriverConfig: &swarmapi.Driver{ + Name: create.Driver, + Options: create.Options, + }, + Ipv6Enabled: create.EnableIPv6, + Internal: create.Internal, + IPAM: &swarmapi.IPAMOptions{ + Driver: &swarmapi.Driver{ + Name: create.IPAM.Driver, + Options: create.IPAM.Options, + }, + }, + } + ipamSpec := make([]*swarmapi.IPAMConfig, 0, len(create.IPAM.Config)) + for _, ipamConfig := range create.IPAM.Config { + ipamSpec = append(ipamSpec, &swarmapi.IPAMConfig{ + Subnet: ipamConfig.Subnet, + Range: ipamConfig.IPRange, + Gateway: ipamConfig.Gateway, + }) + } + ns.IPAM.Configs = ipamSpec + return ns +} diff --git a/daemon/cluster/convert/node.go b/daemon/cluster/convert/node.go new file mode 100644 index 0000000000..fb15b2b5fa --- /dev/null +++ b/daemon/cluster/convert/node.go @@ -0,0 +1,95 @@ +package convert + +import ( + "fmt" + "strings" + + types "github.com/docker/engine-api/types/swarm" + swarmapi "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/protobuf/ptypes" +) + +// NodeFromGRPC converts a grpc Node to a Node. +func NodeFromGRPC(n swarmapi.Node) types.Node { + node := types.Node{ + ID: n.ID, + Spec: types.NodeSpec{ + Role: types.NodeRole(strings.ToLower(n.Spec.Role.String())), + Membership: types.NodeMembership(strings.ToLower(n.Spec.Membership.String())), + Availability: types.NodeAvailability(strings.ToLower(n.Spec.Availability.String())), + }, + Status: types.NodeStatus{ + State: types.NodeState(strings.ToLower(n.Status.State.String())), + Message: n.Status.Message, + }, + } + + // Meta + node.Version.Index = n.Meta.Version.Index + node.CreatedAt, _ = ptypes.Timestamp(n.Meta.CreatedAt) + node.UpdatedAt, _ = ptypes.Timestamp(n.Meta.UpdatedAt) + + //Annotations + node.Spec.Name = n.Spec.Annotations.Name + node.Spec.Labels = n.Spec.Annotations.Labels + + //Description + if n.Description != nil { + node.Description.Hostname = n.Description.Hostname + if n.Description.Platform != nil { + node.Description.Platform.Architecture = n.Description.Platform.Architecture + node.Description.Platform.OS = n.Description.Platform.OS + } + if n.Description.Resources != nil { + node.Description.Resources.NanoCPUs = n.Description.Resources.NanoCPUs + node.Description.Resources.MemoryBytes = n.Description.Resources.MemoryBytes + } + if n.Description.Engine != nil { + node.Description.Engine.EngineVersion = n.Description.Engine.EngineVersion + node.Description.Engine.Labels = n.Description.Engine.Labels + for _, plugin := range n.Description.Engine.Plugins { + node.Description.Engine.Plugins = append(node.Description.Engine.Plugins, types.PluginDescription{Type: plugin.Type, Name: plugin.Name}) + } + } + } + + //Manager + if n.ManagerStatus != nil { + node.ManagerStatus = &types.ManagerStatus{ + Leader: n.ManagerStatus.Raft.Status.Leader, + Reachability: types.Reachability(strings.ToLower(n.ManagerStatus.Raft.Status.Reachability.String())), + Addr: n.ManagerStatus.Raft.Addr, + } + } + + return node +} + +// NodeSpecToGRPC converts a NodeSpec to a grpc NodeSpec. +func NodeSpecToGRPC(s types.NodeSpec) (swarmapi.NodeSpec, error) { + spec := swarmapi.NodeSpec{ + Annotations: swarmapi.Annotations{ + Name: s.Name, + Labels: s.Labels, + }, + } + if role, ok := swarmapi.NodeRole_value[strings.ToUpper(string(s.Role))]; ok { + spec.Role = swarmapi.NodeRole(role) + } else { + return swarmapi.NodeSpec{}, fmt.Errorf("invalid Role: %q", s.Role) + } + + if membership, ok := swarmapi.NodeSpec_Membership_value[strings.ToUpper(string(s.Membership))]; ok { + spec.Membership = swarmapi.NodeSpec_Membership(membership) + } else { + return swarmapi.NodeSpec{}, fmt.Errorf("invalid Membership: %q", s.Membership) + } + + if availability, ok := swarmapi.NodeSpec_Availability_value[strings.ToUpper(string(s.Availability))]; ok { + spec.Availability = swarmapi.NodeSpec_Availability(availability) + } else { + return swarmapi.NodeSpec{}, fmt.Errorf("invalid Availability: %q", s.Availability) + } + + return spec, nil +} diff --git a/daemon/cluster/convert/service.go b/daemon/cluster/convert/service.go new file mode 100644 index 0000000000..60df93a59e --- /dev/null +++ b/daemon/cluster/convert/service.go @@ -0,0 +1,252 @@ +package convert + +import ( + "fmt" + "strings" + + "github.com/docker/docker/pkg/namesgenerator" + types "github.com/docker/engine-api/types/swarm" + swarmapi "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/protobuf/ptypes" +) + +// ServiceFromGRPC converts a grpc Service to a Service. +func ServiceFromGRPC(s swarmapi.Service) types.Service { + spec := s.Spec + containerConfig := spec.Task.Runtime.(*swarmapi.TaskSpec_Container).Container + + networks := make([]types.NetworkAttachmentConfig, 0, len(spec.Networks)) + for _, n := range spec.Networks { + networks = append(networks, types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases}) + } + service := types.Service{ + ID: s.ID, + + Spec: types.ServiceSpec{ + TaskTemplate: types.TaskSpec{ + ContainerSpec: containerSpecFromGRPC(containerConfig), + Resources: resourcesFromGRPC(s.Spec.Task.Resources), + RestartPolicy: restartPolicyFromGRPC(s.Spec.Task.Restart), + Placement: placementFromGRPC(s.Spec.Task.Placement), + }, + + Networks: networks, + EndpointSpec: endpointSpecFromGRPC(s.Spec.Endpoint), + }, + Endpoint: endpointFromGRPC(s.Endpoint), + } + + // Meta + service.Version.Index = s.Meta.Version.Index + service.CreatedAt, _ = ptypes.Timestamp(s.Meta.CreatedAt) + service.UpdatedAt, _ = ptypes.Timestamp(s.Meta.UpdatedAt) + + // Annotations + service.Spec.Name = s.Spec.Annotations.Name + service.Spec.Labels = s.Spec.Annotations.Labels + + // UpdateConfig + if s.Spec.Update != nil { + service.Spec.UpdateConfig = &types.UpdateConfig{ + Parallelism: s.Spec.Update.Parallelism, + } + + service.Spec.UpdateConfig.Delay, _ = ptypes.Duration(&s.Spec.Update.Delay) + } + + //Mode + switch t := s.Spec.GetMode().(type) { + case *swarmapi.ServiceSpec_Global: + service.Spec.Mode.Global = &types.GlobalService{} + case *swarmapi.ServiceSpec_Replicated: + service.Spec.Mode.Replicated = &types.ReplicatedService{ + Replicas: &t.Replicated.Replicas, + } + } + + return service +} + +// ServiceSpecToGRPC converts a ServiceSpec to a grpc ServiceSpec. +func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) { + name := s.Name + if name == "" { + name = namesgenerator.GetRandomName(0) + } + + networks := make([]*swarmapi.ServiceSpec_NetworkAttachmentConfig, 0, len(s.Networks)) + for _, n := range s.Networks { + networks = append(networks, &swarmapi.ServiceSpec_NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases}) + } + + spec := swarmapi.ServiceSpec{ + Annotations: swarmapi.Annotations{ + Name: name, + Labels: s.Labels, + }, + Task: swarmapi.TaskSpec{ + Resources: resourcesToGRPC(s.TaskTemplate.Resources), + }, + Networks: networks, + } + + containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec) + if err != nil { + return swarmapi.ServiceSpec{}, err + } + spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec} + + restartPolicy, err := restartPolicyToGRPC(s.TaskTemplate.RestartPolicy) + if err != nil { + return swarmapi.ServiceSpec{}, err + } + spec.Task.Restart = restartPolicy + + if s.TaskTemplate.Placement != nil { + spec.Task.Placement = &swarmapi.Placement{ + Constraints: s.TaskTemplate.Placement.Constraints, + } + } + + if s.UpdateConfig != nil { + spec.Update = &swarmapi.UpdateConfig{ + Parallelism: s.UpdateConfig.Parallelism, + Delay: *ptypes.DurationProto(s.UpdateConfig.Delay), + } + } + + if s.EndpointSpec != nil { + if s.EndpointSpec.Mode != "" && + s.EndpointSpec.Mode != types.ResolutionModeVIP && + s.EndpointSpec.Mode != types.ResolutionModeDNSRR { + return swarmapi.ServiceSpec{}, fmt.Errorf("invalid resolution mode: %q", s.EndpointSpec.Mode) + } + + spec.Endpoint = &swarmapi.EndpointSpec{} + + spec.Endpoint.Mode = swarmapi.EndpointSpec_ResolutionMode(swarmapi.EndpointSpec_ResolutionMode_value[strings.ToUpper(string(s.EndpointSpec.Mode))]) + + for _, portConfig := range s.EndpointSpec.Ports { + spec.Endpoint.Ports = append(spec.Endpoint.Ports, &swarmapi.PortConfig{ + Name: portConfig.Name, + Protocol: swarmapi.PortConfig_Protocol(swarmapi.PortConfig_Protocol_value[strings.ToUpper(string(portConfig.Protocol))]), + TargetPort: portConfig.TargetPort, + PublishedPort: portConfig.PublishedPort, + }) + } + } + + //Mode + if s.Mode.Global != nil { + spec.Mode = &swarmapi.ServiceSpec_Global{ + Global: &swarmapi.GlobalService{}, + } + } else if s.Mode.Replicated != nil && s.Mode.Replicated.Replicas != nil { + spec.Mode = &swarmapi.ServiceSpec_Replicated{ + Replicated: &swarmapi.ReplicatedService{Replicas: *s.Mode.Replicated.Replicas}, + } + } else { + spec.Mode = &swarmapi.ServiceSpec_Replicated{ + Replicated: &swarmapi.ReplicatedService{Replicas: 1}, + } + } + + return spec, nil +} + +func resourcesFromGRPC(res *swarmapi.ResourceRequirements) *types.ResourceRequirements { + var resources *types.ResourceRequirements + if res != nil { + resources = &types.ResourceRequirements{} + if res.Limits != nil { + resources.Limits = &types.Resources{ + NanoCPUs: res.Limits.NanoCPUs, + MemoryBytes: res.Limits.MemoryBytes, + } + } + if res.Reservations != nil { + resources.Reservations = &types.Resources{ + NanoCPUs: res.Reservations.NanoCPUs, + MemoryBytes: res.Reservations.MemoryBytes, + } + } + } + + return resources +} + +func resourcesToGRPC(res *types.ResourceRequirements) *swarmapi.ResourceRequirements { + var reqs *swarmapi.ResourceRequirements + if res != nil { + reqs = &swarmapi.ResourceRequirements{} + if res.Limits != nil { + reqs.Limits = &swarmapi.Resources{ + NanoCPUs: res.Limits.NanoCPUs, + MemoryBytes: res.Limits.MemoryBytes, + } + } + if res.Reservations != nil { + reqs.Reservations = &swarmapi.Resources{ + NanoCPUs: res.Reservations.NanoCPUs, + MemoryBytes: res.Reservations.MemoryBytes, + } + + } + } + return reqs +} + +func restartPolicyFromGRPC(p *swarmapi.RestartPolicy) *types.RestartPolicy { + var rp *types.RestartPolicy + if p != nil { + rp = &types.RestartPolicy{} + rp.Condition = types.RestartPolicyCondition(strings.ToLower(p.Condition.String())) + if p.Delay != nil { + delay, _ := ptypes.Duration(p.Delay) + rp.Delay = &delay + } + if p.Window != nil { + window, _ := ptypes.Duration(p.Window) + rp.Window = &window + } + + rp.MaxAttempts = &p.MaxAttempts + } + return rp +} + +func restartPolicyToGRPC(p *types.RestartPolicy) (*swarmapi.RestartPolicy, error) { + var rp *swarmapi.RestartPolicy + if p != nil { + rp = &swarmapi.RestartPolicy{} + if condition, ok := swarmapi.RestartPolicy_RestartCondition_value[strings.ToUpper(string(p.Condition))]; ok { + rp.Condition = swarmapi.RestartPolicy_RestartCondition(condition) + } else if string(p.Condition) == "" { + rp.Condition = swarmapi.RestartOnAny + } else { + return nil, fmt.Errorf("invalid RestartCondition: %q", p.Condition) + } + + if p.Delay != nil { + rp.Delay = ptypes.DurationProto(*p.Delay) + } + if p.Window != nil { + rp.Window = ptypes.DurationProto(*p.Window) + } + if p.MaxAttempts != nil { + rp.MaxAttempts = *p.MaxAttempts + + } + } + return rp, nil +} + +func placementFromGRPC(p *swarmapi.Placement) *types.Placement { + var r *types.Placement + if p != nil { + r = &types.Placement{} + r.Constraints = p.Constraints + } + + return r +} diff --git a/daemon/cluster/convert/swarm.go b/daemon/cluster/convert/swarm.go new file mode 100644 index 0000000000..cb9d7d0821 --- /dev/null +++ b/daemon/cluster/convert/swarm.go @@ -0,0 +1,116 @@ +package convert + +import ( + "fmt" + "strings" + + "golang.org/x/crypto/bcrypt" + + types "github.com/docker/engine-api/types/swarm" + swarmapi "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/protobuf/ptypes" +) + +// SwarmFromGRPC converts a grpc Cluster to a Swarm. +func SwarmFromGRPC(c swarmapi.Cluster) types.Swarm { + swarm := types.Swarm{ + ID: c.ID, + Spec: types.Spec{ + Orchestration: types.OrchestrationConfig{ + TaskHistoryRetentionLimit: c.Spec.Orchestration.TaskHistoryRetentionLimit, + }, + Raft: types.RaftConfig{ + SnapshotInterval: c.Spec.Raft.SnapshotInterval, + KeepOldSnapshots: c.Spec.Raft.KeepOldSnapshots, + LogEntriesForSlowFollowers: c.Spec.Raft.LogEntriesForSlowFollowers, + HeartbeatTick: c.Spec.Raft.HeartbeatTick, + ElectionTick: c.Spec.Raft.ElectionTick, + }, + Dispatcher: types.DispatcherConfig{ + HeartbeatPeriod: c.Spec.Dispatcher.HeartbeatPeriod, + }, + }, + } + + swarm.Spec.CAConfig.NodeCertExpiry, _ = ptypes.Duration(c.Spec.CAConfig.NodeCertExpiry) + + // Meta + swarm.Version.Index = c.Meta.Version.Index + swarm.CreatedAt, _ = ptypes.Timestamp(c.Meta.CreatedAt) + swarm.UpdatedAt, _ = ptypes.Timestamp(c.Meta.UpdatedAt) + + // Annotations + swarm.Spec.Name = c.Spec.Annotations.Name + swarm.Spec.Labels = c.Spec.Annotations.Labels + + for _, policy := range c.Spec.AcceptancePolicy.Policies { + p := types.Policy{ + Role: types.NodeRole(strings.ToLower(policy.Role.String())), + Autoaccept: policy.Autoaccept, + } + if policy.Secret != nil { + p.Secret = string(policy.Secret.Data) + } + swarm.Spec.AcceptancePolicy.Policies = append(swarm.Spec.AcceptancePolicy.Policies, p) + } + + return swarm +} + +// SwarmSpecToGRPC converts a Spec to a grpc ClusterSpec. +func SwarmSpecToGRPC(s types.Spec) (swarmapi.ClusterSpec, error) { + spec := swarmapi.ClusterSpec{ + Annotations: swarmapi.Annotations{ + Name: s.Name, + Labels: s.Labels, + }, + Orchestration: swarmapi.OrchestrationConfig{ + TaskHistoryRetentionLimit: s.Orchestration.TaskHistoryRetentionLimit, + }, + Raft: swarmapi.RaftConfig{ + SnapshotInterval: s.Raft.SnapshotInterval, + KeepOldSnapshots: s.Raft.KeepOldSnapshots, + LogEntriesForSlowFollowers: s.Raft.LogEntriesForSlowFollowers, + HeartbeatTick: s.Raft.HeartbeatTick, + ElectionTick: s.Raft.ElectionTick, + }, + Dispatcher: swarmapi.DispatcherConfig{ + HeartbeatPeriod: s.Dispatcher.HeartbeatPeriod, + }, + CAConfig: swarmapi.CAConfig{ + NodeCertExpiry: ptypes.DurationProto(s.CAConfig.NodeCertExpiry), + }, + } + + if err := SwarmSpecUpdateAcceptancePolicy(&spec, s.AcceptancePolicy); err != nil { + return swarmapi.ClusterSpec{}, err + } + return spec, nil +} + +// SwarmSpecUpdateAcceptancePolicy updates a grpc ClusterSpec using AcceptancePolicy. +func SwarmSpecUpdateAcceptancePolicy(spec *swarmapi.ClusterSpec, acceptancePolicy types.AcceptancePolicy) error { + spec.AcceptancePolicy.Policies = nil + for _, p := range acceptancePolicy.Policies { + role, ok := swarmapi.NodeRole_value[strings.ToUpper(string(p.Role))] + if !ok { + return fmt.Errorf("invalid Role: %q", p.Role) + } + + policy := &swarmapi.AcceptancePolicy_RoleAdmissionPolicy{ + Role: swarmapi.NodeRole(role), + Autoaccept: p.Autoaccept, + } + + if p.Secret != "" { + hashPwd, _ := bcrypt.GenerateFromPassword([]byte(p.Secret), 0) + policy.Secret = &swarmapi.AcceptancePolicy_RoleAdmissionPolicy_HashedSecret{ + Data: hashPwd, + Alg: "bcrypt", + } + } + + spec.AcceptancePolicy.Policies = append(spec.AcceptancePolicy.Policies, policy) + } + return nil +} diff --git a/daemon/cluster/convert/task.go b/daemon/cluster/convert/task.go new file mode 100644 index 0000000000..b701ae36cf --- /dev/null +++ b/daemon/cluster/convert/task.go @@ -0,0 +1,53 @@ +package convert + +import ( + "strings" + + types "github.com/docker/engine-api/types/swarm" + swarmapi "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/protobuf/ptypes" +) + +// TaskFromGRPC converts a grpc Task to a Task. +func TaskFromGRPC(t swarmapi.Task) types.Task { + containerConfig := t.Spec.Runtime.(*swarmapi.TaskSpec_Container).Container + containerStatus := t.Status.GetContainer() + task := types.Task{ + ID: t.ID, + ServiceID: t.ServiceID, + Slot: int(t.Slot), + NodeID: t.NodeID, + Spec: types.TaskSpec{ + ContainerSpec: containerSpecFromGRPC(containerConfig), + Resources: resourcesFromGRPC(t.Spec.Resources), + RestartPolicy: restartPolicyFromGRPC(t.Spec.Restart), + Placement: placementFromGRPC(t.Spec.Placement), + }, + Status: types.TaskStatus{ + State: types.TaskState(strings.ToLower(t.Status.State.String())), + Message: t.Status.Message, + Err: t.Status.Err, + }, + DesiredState: types.TaskState(strings.ToLower(t.DesiredState.String())), + } + + // Meta + task.Version.Index = t.Meta.Version.Index + task.CreatedAt, _ = ptypes.Timestamp(t.Meta.CreatedAt) + task.UpdatedAt, _ = ptypes.Timestamp(t.Meta.UpdatedAt) + + task.Status.Timestamp, _ = ptypes.Timestamp(t.Status.Timestamp) + + if containerStatus != nil { + task.Status.ContainerStatus.ContainerID = containerStatus.ContainerID + task.Status.ContainerStatus.PID = int(containerStatus.PID) + task.Status.ContainerStatus.ExitCode = int(containerStatus.ExitCode) + } + + // NetworksAttachments + for _, na := range t.Networks { + task.NetworksAttachments = append(task.NetworksAttachments, networkAttachementFromGRPC(na)) + } + + return task +} diff --git a/daemon/cluster/executor/backend.go b/daemon/cluster/executor/backend.go new file mode 100644 index 0000000000..6b0d0e5a48 --- /dev/null +++ b/daemon/cluster/executor/backend.go @@ -0,0 +1,35 @@ +package executor + +import ( + "io" + + clustertypes "github.com/docker/docker/daemon/cluster/provider" + "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/container" + "github.com/docker/engine-api/types/network" + "github.com/docker/libnetwork/cluster" + networktypes "github.com/docker/libnetwork/types" + "golang.org/x/net/context" +) + +// Backend defines the executor component for a swarm agent. +type Backend interface { + CreateManagedNetwork(clustertypes.NetworkCreateRequest) error + DeleteManagedNetwork(name string) error + SetupIngress(req clustertypes.NetworkCreateRequest, nodeIP string) error + PullImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error + CreateManagedContainer(types.ContainerCreateConfig) (types.ContainerCreateResponse, error) + ContainerStart(name string, hostConfig *container.HostConfig) error + ContainerStop(name string, seconds int) error + ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error + UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error + ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error) + ContainerWaitWithContext(ctx context.Context, name string) (<-chan int, error) + ContainerRm(name string, config *types.ContainerRmConfig) error + ContainerKill(name string, sig uint64) error + SystemInfo() (*types.Info, error) + VolumeCreate(name, driverName string, opts, labels map[string]string) (*types.Volume, error) + ListContainersForNode(nodeID string) []string + SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) error + SetClusterProvider(provider cluster.Provider) +} diff --git a/daemon/cluster/executor/container/adapter.go b/daemon/cluster/executor/container/adapter.go new file mode 100644 index 0000000000..c9751caeff --- /dev/null +++ b/daemon/cluster/executor/container/adapter.go @@ -0,0 +1,229 @@ +package container + +import ( + "encoding/base64" + "encoding/json" + "fmt" + "io" + "strings" + "syscall" + + "github.com/Sirupsen/logrus" + executorpkg "github.com/docker/docker/daemon/cluster/executor" + "github.com/docker/engine-api/types" + "github.com/docker/libnetwork" + "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/log" + "golang.org/x/net/context" +) + +// containerAdapter conducts remote operations for a container. All calls +// are mostly naked calls to the client API, seeded with information from +// containerConfig. +type containerAdapter struct { + backend executorpkg.Backend + container *containerConfig +} + +func newContainerAdapter(b executorpkg.Backend, task *api.Task) (*containerAdapter, error) { + ctnr, err := newContainerConfig(task) + if err != nil { + return nil, err + } + + return &containerAdapter{ + container: ctnr, + backend: b, + }, nil +} + +func (c *containerAdapter) pullImage(ctx context.Context) error { + // if the image needs to be pulled, the auth config will be retrieved and updated + encodedAuthConfig := c.container.task.ServiceAnnotations.Labels[fmt.Sprintf("%v.registryauth", systemLabelPrefix)] + + authConfig := &types.AuthConfig{} + if encodedAuthConfig != "" { + if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil { + logrus.Warnf("invalid authconfig: %v", err) + } + } + + pr, pw := io.Pipe() + metaHeaders := map[string][]string{} + go func() { + err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw) + pw.CloseWithError(err) + }() + + dec := json.NewDecoder(pr) + m := map[string]interface{}{} + for { + if err := dec.Decode(&m); err != nil { + if err == io.EOF { + break + } + return err + } + // TOOD(stevvooe): Report this status somewhere. + logrus.Debugln("pull progress", m) + } + // if the final stream object contained an error, return it + if errMsg, ok := m["error"]; ok { + return fmt.Errorf("%v", errMsg) + } + return nil +} + +func (c *containerAdapter) createNetworks(ctx context.Context) error { + for _, network := range c.container.networks() { + ncr, err := c.container.networkCreateRequest(network) + if err != nil { + return err + } + + if err := c.backend.CreateManagedNetwork(ncr); err != nil { // todo name missing + if _, ok := err.(libnetwork.NetworkNameError); ok { + continue + } + + return err + } + } + + return nil +} + +func (c *containerAdapter) removeNetworks(ctx context.Context) error { + for _, nid := range c.container.networks() { + if err := c.backend.DeleteManagedNetwork(nid); err != nil { + if _, ok := err.(*libnetwork.ActiveEndpointsError); ok { + continue + } + log.G(ctx).Errorf("network %s remove failed: %v", nid, err) + return err + } + } + + return nil +} + +func (c *containerAdapter) create(ctx context.Context, backend executorpkg.Backend) error { + var cr types.ContainerCreateResponse + var err error + if cr, err = backend.CreateManagedContainer(types.ContainerCreateConfig{ + Name: c.container.name(), + Config: c.container.config(), + HostConfig: c.container.hostConfig(), + // Use the first network in container create + NetworkingConfig: c.container.createNetworkingConfig(), + }); err != nil { + return err + } + + // Docker daemon currently doesnt support multiple networks in container create + // Connect to all other networks + nc := c.container.connectNetworkingConfig() + + if nc != nil { + for n, ep := range nc.EndpointsConfig { + logrus.Errorf("CONNECT %s : %v", n, ep.IPAMConfig.IPv4Address) + if err := backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil { + return err + } + } + } + + if err := backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil { + return err + } + + return nil +} + +func (c *containerAdapter) start(ctx context.Context) error { + return c.backend.ContainerStart(c.container.name(), nil) +} + +func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) { + cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false) + if ctx.Err() != nil { + return types.ContainerJSON{}, ctx.Err() + } + if err != nil { + return types.ContainerJSON{}, err + } + return *cs, nil +} + +// events issues a call to the events API and returns a channel with all +// events. The stream of events can be shutdown by cancelling the context. +// +// A chan struct{} is returned that will be closed if the event procressing +// fails and needs to be restarted. +func (c *containerAdapter) wait(ctx context.Context) (<-chan int, error) { + return c.backend.ContainerWaitWithContext(ctx, c.container.name()) +} + +func (c *containerAdapter) shutdown(ctx context.Context) error { + // Default stop grace period to 10s. + stopgrace := 10 + spec := c.container.spec() + if spec.StopGracePeriod != nil { + stopgrace = int(spec.StopGracePeriod.Seconds) + } + return c.backend.ContainerStop(c.container.name(), stopgrace) +} + +func (c *containerAdapter) terminate(ctx context.Context) error { + return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL)) +} + +func (c *containerAdapter) remove(ctx context.Context) error { + return c.backend.ContainerRm(c.container.name(), &types.ContainerRmConfig{ + RemoveVolume: true, + ForceRemove: true, + }) +} + +func (c *containerAdapter) createVolumes(ctx context.Context, backend executorpkg.Backend) error { + // Create plugin volumes that are embedded inside a Mount + for _, mount := range c.container.task.Spec.GetContainer().Mounts { + if mount.Type != api.MountTypeVolume { + continue + } + + if mount.VolumeOptions != nil { + continue + } + + if mount.VolumeOptions.DriverConfig == nil { + continue + } + + req := c.container.volumeCreateRequest(&mount) + + // Check if this volume exists on the engine + if _, err := backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil { + // TODO(amitshukla): Today, volume create through the engine api does not return an error + // when the named volume with the same parameters already exists. + // It returns an error if the driver name is different - that is a valid error + return err + } + + } + + return nil +} + +// todo: typed/wrapped errors +func isContainerCreateNameConflict(err error) bool { + return strings.Contains(err.Error(), "Conflict. The name") +} + +func isUnknownContainer(err error) bool { + return strings.Contains(err.Error(), "No such container:") +} + +func isStoppedContainer(err error) bool { + return strings.Contains(err.Error(), "is already stopped") +} diff --git a/daemon/cluster/executor/container/container.go b/daemon/cluster/executor/container/container.go new file mode 100644 index 0000000000..1326bf1a8b --- /dev/null +++ b/daemon/cluster/executor/container/container.go @@ -0,0 +1,415 @@ +package container + +import ( + "errors" + "fmt" + "log" + "net" + "strings" + "time" + + clustertypes "github.com/docker/docker/daemon/cluster/provider" + "github.com/docker/docker/reference" + "github.com/docker/engine-api/types" + enginecontainer "github.com/docker/engine-api/types/container" + "github.com/docker/engine-api/types/network" + "github.com/docker/swarmkit/agent/exec" + "github.com/docker/swarmkit/api" +) + +const ( + // Explictly use the kernel's default setting for CPU quota of 100ms. + // https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt + cpuQuotaPeriod = 100 * time.Millisecond + + // systemLabelPrefix represents the reserved namespace for system labels. + systemLabelPrefix = "com.docker.swarm" +) + +// containerConfig converts task properties into docker container compatible +// components. +type containerConfig struct { + task *api.Task + networksAttachments map[string]*api.NetworkAttachment +} + +// newContainerConfig returns a validated container config. No methods should +// return an error if this function returns without error. +func newContainerConfig(t *api.Task) (*containerConfig, error) { + var c containerConfig + return &c, c.setTask(t) +} + +func (c *containerConfig) setTask(t *api.Task) error { + container := t.Spec.GetContainer() + if container == nil { + return exec.ErrRuntimeUnsupported + } + + if container.Image == "" { + return ErrImageRequired + } + + // index the networks by name + c.networksAttachments = make(map[string]*api.NetworkAttachment, len(t.Networks)) + for _, attachment := range t.Networks { + c.networksAttachments[attachment.Network.Spec.Annotations.Name] = attachment + } + + c.task = t + return nil +} + +func (c *containerConfig) endpoint() *api.Endpoint { + return c.task.Endpoint +} + +func (c *containerConfig) spec() *api.ContainerSpec { + return c.task.Spec.GetContainer() +} + +func (c *containerConfig) name() string { + if c.task.Annotations.Name != "" { + // if set, use the container Annotations.Name field, set in the orchestrator. + return c.task.Annotations.Name + } + + // fallback to service.slot.id. + return strings.Join([]string{c.task.ServiceAnnotations.Name, fmt.Sprint(c.task.Slot), c.task.ID}, ".") +} + +func (c *containerConfig) image() string { + raw := c.spec().Image + ref, err := reference.ParseNamed(raw) + if err != nil { + return raw + } + return reference.WithDefaultTag(ref).String() +} + +func (c *containerConfig) volumes() map[string]struct{} { + r := make(map[string]struct{}) + + for _, mount := range c.spec().Mounts { + // pick off all the volume mounts. + if mount.Type != api.MountTypeVolume { + continue + } + + r[fmt.Sprintf("%s:%s", mount.Target, getMountMask(&mount))] = struct{}{} + } + + return r +} + +func (c *containerConfig) config() *enginecontainer.Config { + config := &enginecontainer.Config{ + Labels: c.labels(), + User: c.spec().User, + Env: c.spec().Env, + WorkingDir: c.spec().Dir, + Image: c.image(), + Volumes: c.volumes(), + } + + if len(c.spec().Command) > 0 { + // If Command is provided, we replace the whole invocation with Command + // by replacing Entrypoint and specifying Cmd. Args is ignored in this + // case. + config.Entrypoint = append(config.Entrypoint, c.spec().Command[0]) + config.Cmd = append(config.Cmd, c.spec().Command[1:]...) + } else if len(c.spec().Args) > 0 { + // In this case, we assume the image has an Entrypoint and Args + // specifies the arguments for that entrypoint. + config.Cmd = c.spec().Args + } + + return config +} + +func (c *containerConfig) labels() map[string]string { + var ( + system = map[string]string{ + "task": "", // mark as cluster task + "task.id": c.task.ID, + "task.name": fmt.Sprintf("%v.%v", c.task.ServiceAnnotations.Name, c.task.Slot), + "node.id": c.task.NodeID, + "service.id": c.task.ServiceID, + "service.name": c.task.ServiceAnnotations.Name, + } + labels = make(map[string]string) + ) + + // base labels are those defined in the spec. + for k, v := range c.spec().Labels { + labels[k] = v + } + + // we then apply the overrides from the task, which may be set via the + // orchestrator. + for k, v := range c.task.Annotations.Labels { + labels[k] = v + } + + // finally, we apply the system labels, which override all labels. + for k, v := range system { + labels[strings.Join([]string{systemLabelPrefix, k}, ".")] = v + } + + return labels +} + +func (c *containerConfig) bindMounts() []string { + var r []string + + for _, val := range c.spec().Mounts { + mask := getMountMask(&val) + if val.Type == api.MountTypeBind { + r = append(r, fmt.Sprintf("%s:%s:%s", val.Source, val.Target, mask)) + } + } + + return r +} + +func getMountMask(m *api.Mount) string { + maskOpts := []string{"ro"} + if m.Writable { + maskOpts[0] = "rw" + } + + if m.BindOptions != nil { + switch m.BindOptions.Propagation { + case api.MountPropagationPrivate: + maskOpts = append(maskOpts, "private") + case api.MountPropagationRPrivate: + maskOpts = append(maskOpts, "rprivate") + case api.MountPropagationShared: + maskOpts = append(maskOpts, "shared") + case api.MountPropagationRShared: + maskOpts = append(maskOpts, "rshared") + case api.MountPropagationSlave: + maskOpts = append(maskOpts, "slave") + case api.MountPropagationRSlave: + maskOpts = append(maskOpts, "rslave") + } + } + + if m.VolumeOptions != nil { + if !m.VolumeOptions.Populate { + maskOpts = append(maskOpts, "nocopy") + } + } + return strings.Join(maskOpts, ",") +} + +func (c *containerConfig) hostConfig() *enginecontainer.HostConfig { + return &enginecontainer.HostConfig{ + Resources: c.resources(), + Binds: c.bindMounts(), + } +} + +// This handles the case of volumes that are defined inside a service Mount +func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *types.VolumeCreateRequest { + var ( + driverName string + driverOpts map[string]string + labels map[string]string + ) + + if mount.VolumeOptions != nil && mount.VolumeOptions.DriverConfig != nil { + driverName = mount.VolumeOptions.DriverConfig.Name + driverOpts = mount.VolumeOptions.DriverConfig.Options + labels = mount.VolumeOptions.Labels + } + + if mount.VolumeOptions != nil { + return &types.VolumeCreateRequest{ + Name: mount.Source, + Driver: driverName, + DriverOpts: driverOpts, + Labels: labels, + } + } + return nil +} + +func (c *containerConfig) resources() enginecontainer.Resources { + resources := enginecontainer.Resources{} + + // If no limits are specified let the engine use its defaults. + // + // TODO(aluzzardi): We might want to set some limits anyway otherwise + // "unlimited" tasks will step over the reservation of other tasks. + r := c.task.Spec.Resources + if r == nil || r.Limits == nil { + return resources + } + + if r.Limits.MemoryBytes > 0 { + resources.Memory = r.Limits.MemoryBytes + } + + if r.Limits.NanoCPUs > 0 { + // CPU Period must be set in microseconds. + resources.CPUPeriod = int64(cpuQuotaPeriod / time.Microsecond) + resources.CPUQuota = r.Limits.NanoCPUs * resources.CPUPeriod / 1e9 + } + + return resources +} + +// Docker daemon supports just 1 network during container create. +func (c *containerConfig) createNetworkingConfig() *network.NetworkingConfig { + var networks []*api.NetworkAttachment + if c.task.Spec.GetContainer() != nil { + networks = c.task.Networks + } + + epConfig := make(map[string]*network.EndpointSettings) + if len(networks) > 0 { + epConfig[networks[0].Network.Spec.Annotations.Name] = getEndpointConfig(networks[0]) + } + + return &network.NetworkingConfig{EndpointsConfig: epConfig} +} + +// TODO: Merge this function with createNetworkingConfig after daemon supports multiple networks in container create +func (c *containerConfig) connectNetworkingConfig() *network.NetworkingConfig { + var networks []*api.NetworkAttachment + if c.task.Spec.GetContainer() != nil { + networks = c.task.Networks + } + + // First network is used during container create. Other networks are used in "docker network connect" + if len(networks) < 2 { + return nil + } + + epConfig := make(map[string]*network.EndpointSettings) + for _, na := range networks[1:] { + epConfig[na.Network.Spec.Annotations.Name] = getEndpointConfig(na) + } + return &network.NetworkingConfig{EndpointsConfig: epConfig} +} + +func getEndpointConfig(na *api.NetworkAttachment) *network.EndpointSettings { + var ipv4, ipv6 string + for _, addr := range na.Addresses { + ip, _, err := net.ParseCIDR(addr) + if err != nil { + continue + } + + if ip.To4() != nil { + ipv4 = ip.String() + continue + } + + if ip.To16() != nil { + ipv6 = ip.String() + } + } + + return &network.EndpointSettings{ + IPAMConfig: &network.EndpointIPAMConfig{ + IPv4Address: ipv4, + IPv6Address: ipv6, + }, + } +} + +func (c *containerConfig) virtualIP(networkID string) string { + if c.task.Endpoint == nil { + return "" + } + + for _, eVip := range c.task.Endpoint.VirtualIPs { + // We only support IPv4 VIPs for now. + if eVip.NetworkID == networkID { + vip, _, err := net.ParseCIDR(eVip.Addr) + if err != nil { + return "" + } + + return vip.String() + } + } + + return "" +} + +func (c *containerConfig) serviceConfig() *clustertypes.ServiceConfig { + if len(c.task.Networks) == 0 { + return nil + } + + log.Printf("Creating service config in agent for t = %+v", c.task) + svcCfg := &clustertypes.ServiceConfig{ + Name: c.task.ServiceAnnotations.Name, + ID: c.task.ServiceID, + VirtualAddresses: make(map[string]*clustertypes.VirtualAddress), + } + + for _, na := range c.task.Networks { + svcCfg.VirtualAddresses[na.Network.ID] = &clustertypes.VirtualAddress{ + // We support only IPv4 virtual IP for now. + IPv4: c.virtualIP(na.Network.ID), + } + } + + if c.task.Endpoint != nil { + for _, ePort := range c.task.Endpoint.Ports { + svcCfg.ExposedPorts = append(svcCfg.ExposedPorts, &clustertypes.PortConfig{ + Name: ePort.Name, + Protocol: int32(ePort.Protocol), + TargetPort: ePort.TargetPort, + PublishedPort: ePort.PublishedPort, + }) + } + } + + return svcCfg +} + +// networks returns a list of network names attached to the container. The +// returned name can be used to lookup the corresponding network create +// options. +func (c *containerConfig) networks() []string { + var networks []string + + for name := range c.networksAttachments { + networks = append(networks, name) + } + + return networks +} + +func (c *containerConfig) networkCreateRequest(name string) (clustertypes.NetworkCreateRequest, error) { + na, ok := c.networksAttachments[name] + if !ok { + return clustertypes.NetworkCreateRequest{}, errors.New("container: unknown network referenced") + } + + options := types.NetworkCreate{ + // ID: na.Network.ID, + Driver: na.Network.DriverState.Name, + IPAM: network.IPAM{ + Driver: na.Network.IPAM.Driver.Name, + }, + Options: na.Network.DriverState.Options, + CheckDuplicate: true, + } + + for _, ic := range na.Network.IPAM.Configs { + c := network.IPAMConfig{ + Subnet: ic.Subnet, + IPRange: ic.Range, + Gateway: ic.Gateway, + } + options.IPAM.Config = append(options.IPAM.Config, c) + } + + return clustertypes.NetworkCreateRequest{na.Network.ID, types.NetworkCreateRequest{Name: name, NetworkCreate: options}}, nil +} diff --git a/daemon/cluster/executor/container/controller.go b/daemon/cluster/executor/container/controller.go new file mode 100644 index 0000000000..17aa454093 --- /dev/null +++ b/daemon/cluster/executor/container/controller.go @@ -0,0 +1,305 @@ +package container + +import ( + "errors" + "fmt" + "strings" + + executorpkg "github.com/docker/docker/daemon/cluster/executor" + "github.com/docker/engine-api/types" + "github.com/docker/swarmkit/agent/exec" + "github.com/docker/swarmkit/api" + "github.com/docker/swarmkit/log" + "golang.org/x/net/context" +) + +// controller implements agent.Controller against docker's API. +// +// Most operations against docker's API are done through the container name, +// which is unique to the task. +type controller struct { + backend executorpkg.Backend + task *api.Task + adapter *containerAdapter + closed chan struct{} + err error +} + +var _ exec.Controller = &controller{} + +// NewController returns a dockerexec runner for the provided task. +func newController(b executorpkg.Backend, task *api.Task) (*controller, error) { + adapter, err := newContainerAdapter(b, task) + if err != nil { + return nil, err + } + + return &controller{ + backend: b, + task: task, + adapter: adapter, + closed: make(chan struct{}), + }, nil +} + +func (r *controller) Task() (*api.Task, error) { + return r.task, nil +} + +// ContainerStatus returns the container-specific status for the task. +func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) { + ctnr, err := r.adapter.inspect(ctx) + if err != nil { + if isUnknownContainer(err) { + return nil, nil + } + return nil, err + } + return parseContainerStatus(ctnr) +} + +// Update tasks a recent task update and applies it to the container. +func (r *controller) Update(ctx context.Context, t *api.Task) error { + log.G(ctx).Warnf("task updates not yet supported") + // TODO(stevvooe): While assignment of tasks is idempotent, we do allow + // updates of metadata, such as labelling, as well as any other properties + // that make sense. + return nil +} + +// Prepare creates a container and ensures the image is pulled. +// +// If the container has already be created, exec.ErrTaskPrepared is returned. +func (r *controller) Prepare(ctx context.Context) error { + if err := r.checkClosed(); err != nil { + return err + } + + // Make sure all the networks that the task needs are created. + if err := r.adapter.createNetworks(ctx); err != nil { + return err + } + + // Make sure all the volumes that the task needs are created. + if err := r.adapter.createVolumes(ctx, r.backend); err != nil { + return err + } + + for { + if err := r.checkClosed(); err != nil { + return err + } + if err := r.adapter.create(ctx, r.backend); err != nil { + if isContainerCreateNameConflict(err) { + if _, err := r.adapter.inspect(ctx); err != nil { + return err + } + + // container is already created. success! + return exec.ErrTaskPrepared + } + + if !strings.Contains(err.Error(), "No such image") { // todo: better error detection + return err + } + if err := r.adapter.pullImage(ctx); err != nil { + return err + } + + continue // retry to create the container + } + + break + } + + return nil +} + +// Start the container. An error will be returned if the container is already started. +func (r *controller) Start(ctx context.Context) error { + if err := r.checkClosed(); err != nil { + return err + } + + ctnr, err := r.adapter.inspect(ctx) + if err != nil { + return err + } + + // Detect whether the container has *ever* been started. If so, we don't + // issue the start. + // + // TODO(stevvooe): This is very racy. While reading inspect, another could + // start the process and we could end up starting it twice. + if ctnr.State.Status != "created" { + return exec.ErrTaskStarted + } + + if err := r.adapter.start(ctx); err != nil { + return err + } + + return nil +} + +// Wait on the container to exit. +func (r *controller) Wait(pctx context.Context) error { + if err := r.checkClosed(); err != nil { + return err + } + + ctx, cancel := context.WithCancel(pctx) + defer cancel() + + c, err := r.adapter.wait(ctx) + if err != nil { + return err + } + + <-c + if ctx.Err() != nil { + return ctx.Err() + } + ctnr, err := r.adapter.inspect(ctx) + if err != nil { + // TODO(stevvooe): Need to handle missing container here. It is likely + // that a Wait call with a not found error should result in no waiting + // and no error at all. + return err + } + + if ctnr.State.ExitCode != 0 { + var cause error + if ctnr.State.Error != "" { + cause = errors.New(ctnr.State.Error) + } + cstatus, _ := parseContainerStatus(ctnr) + return &exitError{ + code: ctnr.State.ExitCode, + cause: cause, + containerStatus: cstatus, + } + } + return nil +} + +// Shutdown the container cleanly. +func (r *controller) Shutdown(ctx context.Context) error { + if err := r.checkClosed(); err != nil { + return err + } + + if err := r.adapter.shutdown(ctx); err != nil { + if isUnknownContainer(err) || isStoppedContainer(err) { + return nil + } + + return err + } + + return nil +} + +// Terminate the container, with force. +func (r *controller) Terminate(ctx context.Context) error { + if err := r.checkClosed(); err != nil { + return err + } + + if err := r.adapter.terminate(ctx); err != nil { + if isUnknownContainer(err) { + return nil + } + + return err + } + + return nil +} + +// Remove the container and its resources. +func (r *controller) Remove(ctx context.Context) error { + if err := r.checkClosed(); err != nil { + return err + } + + // It may be necessary to shut down the task before removing it. + if err := r.Shutdown(ctx); err != nil { + if isUnknownContainer(err) { + return nil + } + // This may fail if the task was already shut down. + log.G(ctx).WithError(err).Debug("shutdown failed on removal") + } + + // Try removing networks referenced in this task in case this + // task is the last one referencing it + if err := r.adapter.removeNetworks(ctx); err != nil { + if isUnknownContainer(err) { + return nil + } + return err + } + + if err := r.adapter.remove(ctx); err != nil { + if isUnknownContainer(err) { + return nil + } + + return err + } + return nil +} + +// Close the runner and clean up any ephemeral resources. +func (r *controller) Close() error { + select { + case <-r.closed: + return r.err + default: + r.err = exec.ErrControllerClosed + close(r.closed) + } + return nil +} + +func (r *controller) checkClosed() error { + select { + case <-r.closed: + return r.err + default: + return nil + } +} + +func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) { + status := &api.ContainerStatus{ + ContainerID: ctnr.ID, + PID: int32(ctnr.State.Pid), + ExitCode: int32(ctnr.State.ExitCode), + } + + return status, nil +} + +type exitError struct { + code int + cause error + containerStatus *api.ContainerStatus +} + +func (e *exitError) Error() string { + if e.cause != nil { + return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause) + } + + return fmt.Sprintf("task: non-zero exit (%v)", e.code) +} + +func (e *exitError) ExitCode() int { + return int(e.containerStatus.ExitCode) +} + +func (e *exitError) Cause() error { + return e.cause +} diff --git a/daemon/cluster/executor/container/errors.go b/daemon/cluster/executor/container/errors.go new file mode 100644 index 0000000000..6c03d36071 --- /dev/null +++ b/daemon/cluster/executor/container/errors.go @@ -0,0 +1,12 @@ +package container + +import "fmt" + +var ( + // ErrImageRequired returned if a task is missing the image definition. + ErrImageRequired = fmt.Errorf("dockerexec: image required") + + // ErrContainerDestroyed returned when a container is prematurely destroyed + // during a wait call. + ErrContainerDestroyed = fmt.Errorf("dockerexec: container destroyed") +) diff --git a/daemon/cluster/executor/container/executor.go b/daemon/cluster/executor/container/executor.go new file mode 100644 index 0000000000..bf5e248f62 --- /dev/null +++ b/daemon/cluster/executor/container/executor.go @@ -0,0 +1,139 @@ +package container + +import ( + "strings" + + executorpkg "github.com/docker/docker/daemon/cluster/executor" + clustertypes "github.com/docker/docker/daemon/cluster/provider" + "github.com/docker/engine-api/types" + "github.com/docker/engine-api/types/network" + networktypes "github.com/docker/libnetwork/types" + "github.com/docker/swarmkit/agent/exec" + "github.com/docker/swarmkit/api" + "golang.org/x/net/context" +) + +type executor struct { + backend executorpkg.Backend +} + +// NewExecutor returns an executor from the docker client. +func NewExecutor(b executorpkg.Backend) exec.Executor { + return &executor{ + backend: b, + } +} + +// Describe returns the underlying node description from the docker client. +func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) { + info, err := e.backend.SystemInfo() + if err != nil { + return nil, err + } + + var plugins []api.PluginDescription + addPlugins := func(typ string, names []string) { + for _, name := range names { + plugins = append(plugins, api.PluginDescription{ + Type: typ, + Name: name, + }) + } + } + + addPlugins("Volume", info.Plugins.Volume) + // Add builtin driver "overlay" (the only builtin multi-host driver) to + // the plugin list by default. + addPlugins("Network", append([]string{"overlay"}, info.Plugins.Network...)) + addPlugins("Authorization", info.Plugins.Authorization) + + // parse []string labels into a map[string]string + labels := map[string]string{} + for _, l := range info.Labels { + stringSlice := strings.SplitN(l, "=", 2) + // this will take the last value in the list for a given key + // ideally, one shouldn't assign multiple values to the same key + if len(stringSlice) > 1 { + labels[stringSlice[0]] = stringSlice[1] + } + } + + description := &api.NodeDescription{ + Hostname: info.Name, + Platform: &api.Platform{ + Architecture: info.Architecture, + OS: info.OSType, + }, + Engine: &api.EngineDescription{ + EngineVersion: info.ServerVersion, + Labels: labels, + Plugins: plugins, + }, + Resources: &api.Resources{ + NanoCPUs: int64(info.NCPU) * 1e9, + MemoryBytes: info.MemTotal, + }, + } + + return description, nil +} + +func (e *executor) Configure(ctx context.Context, node *api.Node) error { + na := node.Attachment + if na == nil { + return nil + } + + options := types.NetworkCreate{ + Driver: na.Network.DriverState.Name, + IPAM: network.IPAM{ + Driver: na.Network.IPAM.Driver.Name, + }, + Options: na.Network.DriverState.Options, + CheckDuplicate: true, + } + + for _, ic := range na.Network.IPAM.Configs { + c := network.IPAMConfig{ + Subnet: ic.Subnet, + IPRange: ic.Range, + Gateway: ic.Gateway, + } + options.IPAM.Config = append(options.IPAM.Config, c) + } + + return e.backend.SetupIngress(clustertypes.NetworkCreateRequest{ + na.Network.ID, + types.NetworkCreateRequest{ + Name: na.Network.Spec.Annotations.Name, + NetworkCreate: options, + }, + }, na.Addresses[0]) +} + +// Controller returns a docker container runner. +func (e *executor) Controller(t *api.Task) (exec.Controller, error) { + ctlr, err := newController(e.backend, t) + if err != nil { + return nil, err + } + + return ctlr, nil +} + +func (e *executor) SetNetworkBootstrapKeys(keys []*api.EncryptionKey) error { + nwKeys := []*networktypes.EncryptionKey{} + for _, key := range keys { + nwKey := &networktypes.EncryptionKey{ + Subsystem: key.Subsystem, + Algorithm: int32(key.Algorithm), + Key: make([]byte, len(key.Key)), + LamportTime: key.LamportTime, + } + copy(nwKey.Key, key.Key) + nwKeys = append(nwKeys, nwKey) + } + e.backend.SetNetworkBootstrapKeys(nwKeys) + + return nil +} diff --git a/daemon/cluster/filters.go b/daemon/cluster/filters.go new file mode 100644 index 0000000000..5890698d00 --- /dev/null +++ b/daemon/cluster/filters.go @@ -0,0 +1,93 @@ +package cluster + +import ( + "fmt" + "strings" + + runconfigopts "github.com/docker/docker/runconfig/opts" + "github.com/docker/engine-api/types/filters" + swarmapi "github.com/docker/swarmkit/api" +) + +func newListNodesFilters(filter filters.Args) (*swarmapi.ListNodesRequest_Filters, error) { + accepted := map[string]bool{ + "name": true, + "id": true, + "label": true, + "role": true, + "membership": true, + } + if err := filter.Validate(accepted); err != nil { + return nil, err + } + f := &swarmapi.ListNodesRequest_Filters{ + Names: filter.Get("name"), + IDPrefixes: filter.Get("id"), + Labels: runconfigopts.ConvertKVStringsToMap(filter.Get("label")), + } + + for _, r := range filter.Get("role") { + if role, ok := swarmapi.NodeRole_value[strings.ToUpper(r)]; ok { + f.Roles = append(f.Roles, swarmapi.NodeRole(role)) + } else if r != "" { + return nil, fmt.Errorf("Invalid role filter: '%s'", r) + } + } + + for _, a := range filter.Get("membership") { + if membership, ok := swarmapi.NodeSpec_Membership_value[strings.ToUpper(a)]; ok { + f.Memberships = append(f.Memberships, swarmapi.NodeSpec_Membership(membership)) + } else if a != "" { + return nil, fmt.Errorf("Invalid membership filter: '%s'", a) + } + } + + return f, nil +} + +func newListServicesFilters(filter filters.Args) (*swarmapi.ListServicesRequest_Filters, error) { + accepted := map[string]bool{ + "name": true, + "id": true, + "label": true, + } + if err := filter.Validate(accepted); err != nil { + return nil, err + } + return &swarmapi.ListServicesRequest_Filters{ + Names: filter.Get("name"), + IDPrefixes: filter.Get("id"), + Labels: runconfigopts.ConvertKVStringsToMap(filter.Get("label")), + }, nil +} + +func newListTasksFilters(filter filters.Args) (*swarmapi.ListTasksRequest_Filters, error) { + accepted := map[string]bool{ + "name": true, + "id": true, + "label": true, + "service": true, + "node": true, + "desired_state": true, + } + if err := filter.Validate(accepted); err != nil { + return nil, err + } + f := &swarmapi.ListTasksRequest_Filters{ + Names: filter.Get("name"), + IDPrefixes: filter.Get("id"), + Labels: runconfigopts.ConvertKVStringsToMap(filter.Get("label")), + ServiceIDs: filter.Get("service"), + NodeIDs: filter.Get("node"), + } + + for _, s := range filter.Get("desired_state") { + if state, ok := swarmapi.TaskState_value[strings.ToUpper(s)]; ok { + f.DesiredStates = append(f.DesiredStates, swarmapi.TaskState(state)) + } else if s != "" { + return nil, fmt.Errorf("Invalid desired_state filter: '%s'", s) + } + } + + return f, nil +} diff --git a/daemon/cluster/helpers.go b/daemon/cluster/helpers.go new file mode 100644 index 0000000000..bb9e10f1f5 --- /dev/null +++ b/daemon/cluster/helpers.go @@ -0,0 +1,108 @@ +package cluster + +import ( + "fmt" + + swarmapi "github.com/docker/swarmkit/api" + "golang.org/x/net/context" +) + +func getSwarm(ctx context.Context, c swarmapi.ControlClient) (*swarmapi.Cluster, error) { + rl, err := c.ListClusters(ctx, &swarmapi.ListClustersRequest{}) + if err != nil { + return nil, err + } + + if len(rl.Clusters) == 0 { + return nil, fmt.Errorf("swarm not found") + } + + // TODO: assume one cluster only + return rl.Clusters[0], nil +} + +func getNode(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Node, error) { + // GetNode to match via full ID. + rg, err := c.GetNode(ctx, &swarmapi.GetNodeRequest{NodeID: input}) + if err != nil { + // If any error (including NotFound), ListNodes to match via full name. + rl, err := c.ListNodes(ctx, &swarmapi.ListNodesRequest{Filters: &swarmapi.ListNodesRequest_Filters{Names: []string{input}}}) + + if err != nil || len(rl.Nodes) == 0 { + // If any error or 0 result, ListNodes to match via ID prefix. + rl, err = c.ListNodes(ctx, &swarmapi.ListNodesRequest{Filters: &swarmapi.ListNodesRequest_Filters{IDPrefixes: []string{input}}}) + } + + if err != nil { + return nil, err + } + + if len(rl.Nodes) == 0 { + return nil, fmt.Errorf("node %s not found", input) + } + + if l := len(rl.Nodes); l > 1 { + return nil, fmt.Errorf("node %s is ambigious (%d matches found)", input, l) + } + + return rl.Nodes[0], nil + } + return rg.Node, nil +} + +func getService(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Service, error) { + // GetService to match via full ID. + rg, err := c.GetService(ctx, &swarmapi.GetServiceRequest{ServiceID: input}) + if err != nil { + // If any error (including NotFound), ListServices to match via full name. + rl, err := c.ListServices(ctx, &swarmapi.ListServicesRequest{Filters: &swarmapi.ListServicesRequest_Filters{Names: []string{input}}}) + if err != nil || len(rl.Services) == 0 { + // If any error or 0 result, ListServices to match via ID prefix. + rl, err = c.ListServices(ctx, &swarmapi.ListServicesRequest{Filters: &swarmapi.ListServicesRequest_Filters{IDPrefixes: []string{input}}}) + } + + if err != nil { + return nil, err + } + + if len(rl.Services) == 0 { + return nil, fmt.Errorf("service %s not found", input) + } + + if l := len(rl.Services); l > 1 { + return nil, fmt.Errorf("service %s is ambigious (%d matches found)", input, l) + } + + return rl.Services[0], nil + } + return rg.Service, nil +} + +func getTask(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Task, error) { + // GetTask to match via full ID. + rg, err := c.GetTask(ctx, &swarmapi.GetTaskRequest{TaskID: input}) + if err != nil { + // If any error (including NotFound), ListTasks to match via full name. + rl, err := c.ListTasks(ctx, &swarmapi.ListTasksRequest{Filters: &swarmapi.ListTasksRequest_Filters{Names: []string{input}}}) + + if err != nil || len(rl.Tasks) == 0 { + // If any error or 0 result, ListTasks to match via ID prefix. + rl, err = c.ListTasks(ctx, &swarmapi.ListTasksRequest{Filters: &swarmapi.ListTasksRequest_Filters{IDPrefixes: []string{input}}}) + } + + if err != nil { + return nil, err + } + + if len(rl.Tasks) == 0 { + return nil, fmt.Errorf("task %s not found", input) + } + + if l := len(rl.Tasks); l > 1 { + return nil, fmt.Errorf("task %s is ambigious (%d matches found)", input, l) + } + + return rl.Tasks[0], nil + } + return rg.Task, nil +} diff --git a/daemon/cluster/provider/network.go b/daemon/cluster/provider/network.go new file mode 100644 index 0000000000..d959c15ceb --- /dev/null +++ b/daemon/cluster/provider/network.go @@ -0,0 +1,36 @@ +package provider + +import "github.com/docker/engine-api/types" + +// NetworkCreateRequest is a request when creating a network. +type NetworkCreateRequest struct { + ID string + types.NetworkCreateRequest +} + +// NetworkCreateResponse is a response when creating a network. +type NetworkCreateResponse struct { + ID string `json:"Id"` +} + +// VirtualAddress represents a virtual adress. +type VirtualAddress struct { + IPv4 string + IPv6 string +} + +// PortConfig represents a port configuration. +type PortConfig struct { + Name string + Protocol int32 + TargetPort uint32 + PublishedPort uint32 +} + +// ServiceConfig represents a service configuration. +type ServiceConfig struct { + ID string + Name string + VirtualAddresses map[string]*VirtualAddress + ExposedPorts []*PortConfig +} diff --git a/daemon/container.go b/daemon/container.go index 421b0c25d9..a2d1f47cda 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -101,7 +101,7 @@ func (daemon *Daemon) Register(c *container.Container) error { return nil } -func (daemon *Daemon) newContainer(name string, config *containertypes.Config, imgID image.ID) (*container.Container, error) { +func (daemon *Daemon) newContainer(name string, config *containertypes.Config, imgID image.ID, managed bool) (*container.Container, error) { var ( id string err error @@ -117,6 +117,7 @@ func (daemon *Daemon) newContainer(name string, config *containertypes.Config, i base := daemon.newBaseContainer(id) base.Created = time.Now().UTC() + base.Managed = managed base.Path = entrypoint base.Args = args //FIXME: de-duplicate from config base.Config = config diff --git a/daemon/container_operations.go b/daemon/container_operations.go index 658ca78649..c6fade219b 100644 --- a/daemon/container_operations.go +++ b/daemon/container_operations.go @@ -324,6 +324,10 @@ func (daemon *Daemon) updateNetwork(container *container.Container) error { return nil } +func errClusterNetworkOnRun(n string) error { + return fmt.Errorf("swarm-scoped network (%s) is not compatible with `docker create` or `docker run`. This network can be only used docker service", n) +} + // updateContainerNetworkSettings update the network settings func (daemon *Daemon) updateContainerNetworkSettings(container *container.Container, endpointsConfig map[string]*networktypes.EndpointSettings) error { var ( @@ -345,6 +349,9 @@ func (daemon *Daemon) updateContainerNetworkSettings(container *container.Contai if err != nil { return err } + if !container.Managed && n.Info().Dynamic() { + return errClusterNetworkOnRun(networkName) + } networkName = n.Name() } if container.NetworkSettings == nil { diff --git a/daemon/create.go b/daemon/create.go index 2a3baa0f24..48e7245916 100644 --- a/daemon/create.go +++ b/daemon/create.go @@ -19,8 +19,17 @@ import ( "github.com/opencontainers/runc/libcontainer/label" ) -// ContainerCreate creates a container. +// CreateManagedContainer creates a container that is managed by a Service +func (daemon *Daemon) CreateManagedContainer(params types.ContainerCreateConfig) (types.ContainerCreateResponse, error) { + return daemon.containerCreate(params, true) +} + +// ContainerCreate creates a regular container func (daemon *Daemon) ContainerCreate(params types.ContainerCreateConfig) (types.ContainerCreateResponse, error) { + return daemon.containerCreate(params, false) +} + +func (daemon *Daemon) containerCreate(params types.ContainerCreateConfig, managed bool) (types.ContainerCreateResponse, error) { if params.Config == nil { return types.ContainerCreateResponse{}, fmt.Errorf("Config cannot be empty in order to create a container") } @@ -43,7 +52,7 @@ func (daemon *Daemon) ContainerCreate(params types.ContainerCreateConfig) (types return types.ContainerCreateResponse{Warnings: warnings}, err } - container, err := daemon.create(params) + container, err := daemon.create(params, managed) if err != nil { return types.ContainerCreateResponse{Warnings: warnings}, daemon.imageNotExistToErrcode(err) } @@ -52,7 +61,7 @@ func (daemon *Daemon) ContainerCreate(params types.ContainerCreateConfig) (types } // Create creates a new container from the given configuration with a given name. -func (daemon *Daemon) create(params types.ContainerCreateConfig) (retC *container.Container, retErr error) { +func (daemon *Daemon) create(params types.ContainerCreateConfig, managed bool) (retC *container.Container, retErr error) { var ( container *container.Container img *image.Image @@ -76,7 +85,7 @@ func (daemon *Daemon) create(params types.ContainerCreateConfig) (retC *containe return nil, err } - if container, err = daemon.newContainer(params.Name, params.Config, imgID); err != nil { + if container, err = daemon.newContainer(params.Name, params.Config, imgID, managed); err != nil { return nil, err } defer func() { diff --git a/daemon/daemon.go b/daemon/daemon.go index f00e801328..ed37e0b30e 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -28,6 +28,7 @@ import ( "github.com/docker/docker/daemon/exec" "github.com/docker/engine-api/types" containertypes "github.com/docker/engine-api/types/container" + "github.com/docker/libnetwork/cluster" // register graph drivers _ "github.com/docker/docker/daemon/graphdriver/register" dmetadata "github.com/docker/docker/distribution/metadata" @@ -94,6 +95,7 @@ type Daemon struct { containerd libcontainerd.Client containerdRemote libcontainerd.Remote defaultIsolation containertypes.Isolation // Default isolation mode on Windows + clusterProvider cluster.Provider } func (daemon *Daemon) restore() error { @@ -344,6 +346,12 @@ func (daemon *Daemon) registerLink(parent, child *container.Container, alias str return nil } +// SetClusterProvider sets a component for quering the current cluster state. +func (daemon *Daemon) SetClusterProvider(clusterProvider cluster.Provider) { + daemon.clusterProvider = clusterProvider + daemon.netController.SetClusterProvider(clusterProvider) +} + // NewDaemon sets up everything for the daemon to be able to service // requests from the webserver. func NewDaemon(config *Config, registryService registry.Service, containerdRemote libcontainerd.Remote) (daemon *Daemon, err error) { @@ -893,6 +901,10 @@ func (daemon *Daemon) reloadClusterDiscovery(config *Config) error { return nil } + if daemon.clusterProvider != nil { + return fmt.Errorf("--cluster-store and --cluster-advertise daemon configurations are incompatible with swarm mode") + } + // enable discovery for the first time if it was not previously enabled if daemon.discoveryWatcher == nil { discoveryWatcher, err := initDiscovery(newClusterStore, newAdvertise, config.ClusterOpts) diff --git a/daemon/inspect.go b/daemon/inspect.go index e10402203f..ba9f6ecb2b 100644 --- a/daemon/inspect.go +++ b/daemon/inspect.go @@ -23,10 +23,12 @@ func (daemon *Daemon) ContainerInspect(name string, size bool, version string) ( case versions.Equal(version, "1.20"): return daemon.containerInspect120(name) } - return daemon.containerInspectCurrent(name, size) + return daemon.ContainerInspectCurrent(name, size) } -func (daemon *Daemon) containerInspectCurrent(name string, size bool) (*types.ContainerJSON, error) { +// ContainerInspectCurrent returns low-level information about a +// container in a most recent api version. +func (daemon *Daemon) ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error) { container, err := daemon.GetContainer(name) if err != nil { return nil, err diff --git a/daemon/inspect_windows.go b/daemon/inspect_windows.go index 22496e5b07..a23f703e09 100644 --- a/daemon/inspect_windows.go +++ b/daemon/inspect_windows.go @@ -28,7 +28,7 @@ func addMountPoints(container *container.Container) []types.MountPoint { // containerInspectPre120 get containers for pre 1.20 APIs. func (daemon *Daemon) containerInspectPre120(name string) (*types.ContainerJSON, error) { - return daemon.containerInspectCurrent(name, false) + return daemon.ContainerInspectCurrent(name, false) } func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig { diff --git a/daemon/list.go b/daemon/list.go index fd5b78dd60..48323d730c 100644 --- a/daemon/list.go +++ b/daemon/list.go @@ -91,6 +91,17 @@ func (daemon *Daemon) Containers(config *types.ContainerListOptions) ([]*types.C return daemon.reduceContainers(config, daemon.transformContainer) } +// ListContainersForNode returns all containerID that match the specified nodeID +func (daemon *Daemon) ListContainersForNode(nodeID string) []string { + var ids []string + for _, c := range daemon.List() { + if c.Config.Labels["com.docker.swarm.node.id"] == nodeID { + ids = append(ids, c.ID) + } + } + return ids +} + func (daemon *Daemon) filterByNameIDMatches(ctx *listContext) []*container.Container { idSearch := false names := ctx.filters.Get("name") diff --git a/daemon/network.go b/daemon/network.go index 91a9c0bea7..f3203621d9 100644 --- a/daemon/network.go +++ b/daemon/network.go @@ -5,13 +5,14 @@ import ( "net" "strings" - netsettings "github.com/docker/docker/daemon/network" + "github.com/Sirupsen/logrus" + clustertypes "github.com/docker/docker/daemon/cluster/provider" "github.com/docker/docker/errors" "github.com/docker/docker/runconfig" "github.com/docker/engine-api/types" - "github.com/docker/engine-api/types/filters" "github.com/docker/engine-api/types/network" "github.com/docker/libnetwork" + networktypes "github.com/docker/libnetwork/types" ) // NetworkControllerEnabled checks if the networking stack is enabled. @@ -92,9 +93,106 @@ func (daemon *Daemon) getAllNetworks() []libnetwork.Network { return list } +func isIngressNetwork(name string) bool { + return name == "ingress" +} + +var ingressChan = make(chan struct{}, 1) + +func ingressWait() func() { + ingressChan <- struct{}{} + return func() { <-ingressChan } +} + +// SetupIngress setups ingress networking. +func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) error { + ip, _, err := net.ParseCIDR(nodeIP) + if err != nil { + return err + } + + go func() { + controller := daemon.netController + controller.AgentInitWait() + + if n, err := daemon.GetNetworkByName(create.Name); err == nil && n != nil && n.ID() != create.ID { + if err := controller.SandboxDestroy("ingress-sbox"); err != nil { + logrus.Errorf("Failed to delete stale ingress sandbox: %v", err) + return + } + + if err := n.Delete(); err != nil { + logrus.Errorf("Failed to delete stale ingress network %s: %v", n.ID(), err) + return + } + } + + if _, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true); err != nil { + // If it is any other error other than already + // exists error log error and return. + if _, ok := err.(libnetwork.NetworkNameError); !ok { + logrus.Errorf("Failed creating ingress network: %v", err) + return + } + + // Otherwise continue down the call to create or recreate sandbox. + } + + n, err := daemon.GetNetworkByID(create.ID) + if err != nil { + logrus.Errorf("Failed getting ingress network by id after creating: %v", err) + return + } + + sb, err := controller.NewSandbox("ingress-sbox", libnetwork.OptionIngress()) + if err != nil { + logrus.Errorf("Failed creating ingress sanbox: %v", err) + return + } + + ep, err := n.CreateEndpoint("ingress-endpoint", libnetwork.CreateOptionIpam(ip, nil, nil, nil)) + if err != nil { + logrus.Errorf("Failed creating ingress endpoint: %v", err) + return + } + + if err := ep.Join(sb, nil); err != nil { + logrus.Errorf("Failed joining ingress sandbox to ingress endpoint: %v", err) + } + }() + + return nil +} + +// SetNetworkBootstrapKeys sets the bootstrap keys. +func (daemon *Daemon) SetNetworkBootstrapKeys(keys []*networktypes.EncryptionKey) error { + return daemon.netController.SetKeys(keys) +} + +// CreateManagedNetwork creates an agent network. +func (daemon *Daemon) CreateManagedNetwork(create clustertypes.NetworkCreateRequest) error { + _, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true) + return err +} + // CreateNetwork creates a network with the given name, driver and other optional parameters func (daemon *Daemon) CreateNetwork(create types.NetworkCreateRequest) (*types.NetworkCreateResponse, error) { - if runconfig.IsPreDefinedNetwork(create.Name) { + resp, err := daemon.createNetwork(create, "", false) + if err != nil { + return nil, err + } + return resp, err +} + +func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string, agent bool) (*types.NetworkCreateResponse, error) { + // If there is a pending ingress network creation wait here + // since ingress network creation can happen via node download + // from manager or task download. + if isIngressNetwork(create.Name) { + defer ingressWait()() + } + + if runconfig.IsPreDefinedNetwork(create.Name) && !agent { err := fmt.Errorf("%s is a pre-defined network and cannot be created", create.Name) return nil, errors.NewRequestForbiddenError(err) } @@ -134,7 +232,16 @@ func (daemon *Daemon) CreateNetwork(create types.NetworkCreateRequest) (*types.N if create.Internal { nwOptions = append(nwOptions, libnetwork.NetworkOptionInternalNetwork()) } - n, err := c.NewNetwork(driver, create.Name, "", nwOptions...) + if agent { + nwOptions = append(nwOptions, libnetwork.NetworkOptionDynamic()) + nwOptions = append(nwOptions, libnetwork.NetworkOptionPersist(false)) + } + + if isIngressNetwork(create.Name) { + nwOptions = append(nwOptions, libnetwork.NetworkOptionIngress()) + } + + n, err := c.NewNetwork(driver, create.Name, id, nwOptions...) if err != nil { return nil, err } @@ -168,6 +275,17 @@ func getIpamConfig(data []network.IPAMConfig) ([]*libnetwork.IpamConf, []*libnet return ipamV4Cfg, ipamV6Cfg, nil } +// UpdateContainerServiceConfig updates a service configuration. +func (daemon *Daemon) UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error { + container, err := daemon.GetContainer(containerName) + if err != nil { + return err + } + + container.NetworkSettings.Service = serviceConfig + return nil +} + // ConnectContainerToNetwork connects the given container to the given // network. If either cannot be found, an err is returned. If the // network cannot be set up, an err is returned. @@ -207,18 +325,29 @@ func (daemon *Daemon) GetNetworkDriverList() map[string]bool { driver := network.Type() pluginList[driver] = true } + // TODO : Replace this with proper libnetwork API + pluginList["overlay"] = true return pluginList } +// DeleteManagedNetwork deletes an agent network. +func (daemon *Daemon) DeleteManagedNetwork(networkID string) error { + return daemon.deleteNetwork(networkID, true) +} + // DeleteNetwork destroys a network unless it's one of docker's predefined networks. func (daemon *Daemon) DeleteNetwork(networkID string) error { + return daemon.deleteNetwork(networkID, false) +} + +func (daemon *Daemon) deleteNetwork(networkID string, dynamic bool) error { nw, err := daemon.FindNetwork(networkID) if err != nil { return err } - if runconfig.IsPreDefinedNetwork(nw.Name()) { + if runconfig.IsPreDefinedNetwork(nw.Name()) && !dynamic { err := fmt.Errorf("%s is a pre-defined network and cannot be removed", nw.Name()) return errors.NewRequestForbiddenError(err) } @@ -230,14 +359,7 @@ func (daemon *Daemon) DeleteNetwork(networkID string) error { return nil } -// FilterNetworks returns a list of networks filtered by the given arguments. -// It returns an error if the filters are not included in the list of accepted filters. -func (daemon *Daemon) FilterNetworks(netFilters filters.Args) ([]libnetwork.Network, error) { - if netFilters.Len() != 0 { - if err := netFilters.Validate(netsettings.AcceptedFilters); err != nil { - return nil, err - } - } - nwList := daemon.getAllNetworks() - return netsettings.FilterNetworks(nwList, netFilters) +// GetNetworks returns a list of all networks +func (daemon *Daemon) GetNetworks() []libnetwork.Network { + return daemon.getAllNetworks() } diff --git a/daemon/network/settings.go b/daemon/network/settings.go index 823bec2696..ff27cb0bbc 100644 --- a/daemon/network/settings.go +++ b/daemon/network/settings.go @@ -1,6 +1,7 @@ package network import ( + clustertypes "github.com/docker/docker/daemon/cluster/provider" networktypes "github.com/docker/engine-api/types/network" "github.com/docker/go-connections/nat" ) @@ -14,6 +15,7 @@ type Settings struct { LinkLocalIPv6Address string LinkLocalIPv6PrefixLen int Networks map[string]*networktypes.EndpointSettings + Service *clustertypes.ServiceConfig Ports nat.PortMap SandboxKey string SecondaryIPAddresses []networktypes.Address diff --git a/daemon/wait.go b/daemon/wait.go index 52b335cdd7..bf7e2c7149 100644 --- a/daemon/wait.go +++ b/daemon/wait.go @@ -1,6 +1,10 @@ package daemon -import "time" +import ( + "time" + + "golang.org/x/net/context" +) // ContainerWait stops processing until the given container is // stopped. If the container is not found, an error is returned. On a @@ -15,3 +19,14 @@ func (daemon *Daemon) ContainerWait(name string, timeout time.Duration) (int, er return container.WaitStop(timeout) } + +// ContainerWaitWithContext returns a channel where exit code is sent +// when container stops. Channel can be cancelled with a context. +func (daemon *Daemon) ContainerWaitWithContext(ctx context.Context, name string) (<-chan int, error) { + container, err := daemon.GetContainer(name) + if err != nil { + return nil, err + } + + return container.WaitWithContext(ctx), nil +} diff --git a/opts/opts.go b/opts/opts.go index 9bd8040d25..1b9d6b294a 100644 --- a/opts/opts.go +++ b/opts/opts.go @@ -5,6 +5,8 @@ import ( "net" "regexp" "strings" + + "github.com/docker/engine-api/types/filters" ) var ( @@ -282,3 +284,38 @@ func ValidateSysctl(val string) (string, error) { } return "", fmt.Errorf("sysctl '%s' is not whitelisted", val) } + +// FilterOpt is a flag type for validating filters +type FilterOpt struct { + filter filters.Args +} + +// NewFilterOpt returns a new FilterOpt +func NewFilterOpt() FilterOpt { + return FilterOpt{filter: filters.NewArgs()} +} + +func (o *FilterOpt) String() string { + repr, err := filters.ToParam(o.filter) + if err != nil { + return "invalid filters" + } + return repr +} + +// Set sets the value of the opt by parsing the command line value +func (o *FilterOpt) Set(value string) error { + var err error + o.filter, err = filters.ParseFlag(value, o.filter) + return err +} + +// Type returns the option type +func (o *FilterOpt) Type() string { + return "filter" +} + +// Value returns the value of this option +func (o *FilterOpt) Value() filters.Args { + return o.filter +} diff --git a/runconfig/hostconfig_unix.go b/runconfig/hostconfig_unix.go index 0568791ddd..c06b6ebfa1 100644 --- a/runconfig/hostconfig_unix.go +++ b/runconfig/hostconfig_unix.go @@ -19,7 +19,7 @@ func DefaultDaemonNetworkMode() container.NetworkMode { // IsPreDefinedNetwork indicates if a network is predefined by the daemon func IsPreDefinedNetwork(network string) bool { n := container.NetworkMode(network) - return n.IsBridge() || n.IsHost() || n.IsNone() || n.IsDefault() + return n.IsBridge() || n.IsHost() || n.IsNone() || n.IsDefault() || network == "ingress" } // ValidateNetMode ensures that the various combinations of requested diff --git a/volume/volume.go b/volume/volume.go index 1b57d85087..9bb8b7cdf0 100644 --- a/volume/volume.go +++ b/volume/volume.go @@ -140,6 +140,17 @@ func (m *MountPoint) Path() string { return m.Source } +// Type returns the type of mount point +func (m *MountPoint) Type() string { + if m.Name != "" { + return "VOLUME" + } + if m.Source != "" { + return "BIND" + } + return "EPHEMERAL" +} + // ParseVolumesFrom ensures that the supplied volumes-from is valid. func ParseVolumesFrom(spec string) (string, string, error) { if len(spec) == 0 {