1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Add Swarm management backend

As described in our ROADMAP.md, introduce new Swarm management API
endpoints relying on swarmkit to deploy services. It currently vendors
docker/engine-api changes.

This PR is fully backward compatible (joining a Swarm is an optional
feature of the Engine, and existing commands are not impacted).

Signed-off-by: Tonis Tiigi <tonistiigi@gmail.com>
Signed-off-by: Victor Vieux <vieux@docker.com>
Signed-off-by: Daniel Nephin <dnephin@docker.com>
Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
Signed-off-by: Madhu Venugopal <madhu@docker.com>
This commit is contained in:
Tonis Tiigi 2016-06-13 19:52:49 -07:00
parent 44793049ce
commit 534a90a993
42 changed files with 4046 additions and 53 deletions

View file

@ -8,6 +8,7 @@ import (
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/versions"
"github.com/gorilla/mux"
"google.golang.org/grpc"
)
// httpStatusError is an interface
@ -58,6 +59,7 @@ func GetHTTPErrorStatusCode(err error) int {
"wrong login/password": http.StatusUnauthorized,
"unauthorized": http.StatusUnauthorized,
"hasn't been activated": http.StatusForbidden,
"this node": http.StatusNotAcceptable,
} {
if strings.Contains(errStr, keyword) {
statusCode = status
@ -85,7 +87,7 @@ func MakeErrorHandler(err error) http.HandlerFunc {
}
WriteJSON(w, statusCode, response)
} else {
http.Error(w, err.Error(), statusCode)
http.Error(w, grpc.ErrorDesc(err), statusCode)
}
}
}

View file

@ -2,7 +2,6 @@ package network
import (
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/filters"
"github.com/docker/engine-api/types/network"
"github.com/docker/libnetwork"
)
@ -13,7 +12,7 @@ type Backend interface {
FindNetwork(idName string) (libnetwork.Network, error)
GetNetworkByName(idName string) (libnetwork.Network, error)
GetNetworksByID(partialID string) []libnetwork.Network
FilterNetworks(netFilters filters.Args) ([]libnetwork.Network, error)
GetNetworks() []libnetwork.Network
CreateNetwork(nc types.NetworkCreateRequest) (*types.NetworkCreateResponse, error)
ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
DisconnectContainerFromNetwork(containerName string, network libnetwork.Network, force bool) error

View file

@ -0,0 +1,98 @@
package network
import (
"fmt"
"github.com/docker/docker/runconfig"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/filters"
)
// filterHandler is the signature of a per-key network filter function.
// NOTE(review): it does not appear to be referenced in this file — confirm
// it is used elsewhere before removing.
type filterHandler func([]types.NetworkResource, string) ([]types.NetworkResource, error)

var (
	// AcceptedFilters is the set of filter keys accepted when validating
	// a network-list filter argument.
	AcceptedFilters = map[string]bool{
		"driver": true,
		"type":   true,
		"name":   true,
		"id":     true,
		"label":  true,
	}
)
// filterNetworkByType keeps only the networks matching netType, which must
// be either "builtin" (engine pre-defined networks) or "custom" (everything
// else); any other value is rejected with an error.
func filterNetworkByType(nws []types.NetworkResource, netType string) ([]types.NetworkResource, error) {
	// Translate the filter value into the predefined/not-predefined
	// predicate value we must match against.
	var wantPredefined bool
	switch netType {
	case "builtin":
		wantPredefined = true
	case "custom":
		wantPredefined = false
	default:
		return nil, fmt.Errorf("Invalid filter: 'type'='%s'", netType)
	}

	var matched []types.NetworkResource
	for _, nw := range nws {
		if runconfig.IsPreDefinedNetwork(nw.Name) == wantPredefined {
			matched = append(matched, nw)
		}
	}
	return matched, nil
}
// filterNetworks filters the network list according to the user-specified
// filter and returns the networks that pass every requested criterion.
func filterNetworks(nws []types.NetworkResource, filter filters.Args) ([]types.NetworkResource, error) {
	// No filters at all: hand back the original list untouched.
	if filter.Len() == 0 {
		return nws, nil
	}
	if err := filter.Validate(AcceptedFilters); err != nil {
		return nil, err
	}

	var selected []types.NetworkResource
	for _, nw := range nws {
		if networkMatchesFilter(nw, filter) {
			selected = append(selected, nw)
		}
	}

	// The "type" filter (builtin/custom) is applied as a second pass over
	// the networks that already passed the per-field filters above.
	if filter.Include("type") {
		var byType []types.NetworkResource
		walkErr := filter.WalkValues("type", func(fval string) error {
			matched, err := filterNetworkByType(selected, fval)
			if err != nil {
				return err
			}
			byType = append(byType, matched...)
			return nil
		})
		if walkErr != nil {
			return nil, walkErr
		}
		selected = byType
	}
	return selected, nil
}

// networkMatchesFilter reports whether nw passes the driver, name, id and
// label filters (each checked only when present in the filter arguments).
func networkMatchesFilter(nw types.NetworkResource, filter filters.Args) bool {
	if filter.Include("driver") && !filter.ExactMatch("driver", nw.Driver) {
		return false
	}
	if filter.Include("name") && !filter.Match("name", nw.Name) {
		return false
	}
	if filter.Include("id") && !filter.Match("id", nw.ID) {
		return false
	}
	if filter.Include("label") && !filter.MatchKVList("label", nw.Labels) {
		return false
	}
	return true
}

View file

@ -1,17 +1,22 @@
package network
import "github.com/docker/docker/api/server/router"
import (
"github.com/docker/docker/api/server/router"
"github.com/docker/docker/daemon/cluster"
)
// networkRouter is a router to talk with the network controller
type networkRouter struct {
backend Backend
routes []router.Route
backend Backend
clusterProvider *cluster.Cluster
routes []router.Route
}
// NewRouter initializes a new network router
func NewRouter(b Backend) router.Router {
func NewRouter(b Backend, c *cluster.Cluster) router.Router {
r := &networkRouter{
backend: b,
backend: b,
clusterProvider: c,
}
r.initRoutes()
return r

View file

@ -24,17 +24,30 @@ func (n *networkRouter) getNetworksList(ctx context.Context, w http.ResponseWrit
return err
}
list := []*types.NetworkResource{}
list := []types.NetworkResource{}
nwList, err := n.backend.FilterNetworks(netFilters)
if nr, err := n.clusterProvider.GetNetworks(); err == nil {
for _, nw := range nr {
list = append(list, nw)
}
}
// Combine the network list returned by Docker daemon if it is not already
// returned by the cluster manager
SKIP:
for _, nw := range n.backend.GetNetworks() {
for _, nl := range list {
if nl.ID == nw.ID() {
continue SKIP
}
}
list = append(list, *n.buildNetworkResource(nw))
}
list, err = filterNetworks(list, netFilters)
if err != nil {
return err
}
for _, nw := range nwList {
list = append(list, buildNetworkResource(nw))
}
return httputils.WriteJSON(w, http.StatusOK, list)
}
@ -45,9 +58,12 @@ func (n *networkRouter) getNetwork(ctx context.Context, w http.ResponseWriter, r
nw, err := n.backend.FindNetwork(vars["id"])
if err != nil {
if nr, err := n.clusterProvider.GetNetwork(vars["id"]); err == nil {
return httputils.WriteJSON(w, http.StatusOK, nr)
}
return err
}
return httputils.WriteJSON(w, http.StatusOK, buildNetworkResource(nw))
return httputils.WriteJSON(w, http.StatusOK, n.buildNetworkResource(nw))
}
func (n *networkRouter) postNetworkCreate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
@ -67,7 +83,14 @@ func (n *networkRouter) postNetworkCreate(ctx context.Context, w http.ResponseWr
nw, err := n.backend.CreateNetwork(create)
if err != nil {
return err
if _, ok := err.(libnetwork.ManagerRedirectError); !ok {
return err
}
id, err := n.clusterProvider.CreateNetwork(create)
if err != nil {
return err
}
nw = &types.NetworkCreateResponse{ID: id}
}
return httputils.WriteJSON(w, http.StatusCreated, nw)
@ -121,6 +144,9 @@ func (n *networkRouter) deleteNetwork(ctx context.Context, w http.ResponseWriter
if err := httputils.ParseForm(r); err != nil {
return err
}
if _, err := n.clusterProvider.GetNetwork(vars["id"]); err == nil {
return n.clusterProvider.RemoveNetwork(vars["id"])
}
if err := n.backend.DeleteNetwork(vars["id"]); err != nil {
return err
}
@ -128,7 +154,7 @@ func (n *networkRouter) deleteNetwork(ctx context.Context, w http.ResponseWriter
return nil
}
func buildNetworkResource(nw libnetwork.Network) *types.NetworkResource {
func (n *networkRouter) buildNetworkResource(nw libnetwork.Network) *types.NetworkResource {
r := &types.NetworkResource{}
if nw == nil {
return r
@ -138,6 +164,13 @@ func buildNetworkResource(nw libnetwork.Network) *types.NetworkResource {
r.Name = nw.Name()
r.ID = nw.ID()
r.Scope = info.Scope()
if n.clusterProvider.IsManager() {
if _, err := n.clusterProvider.GetNetwork(nw.Name()); err == nil {
r.Scope = "swarm"
}
} else if info.Dynamic() {
r.Scope = "swarm"
}
r.Driver = nw.Type()
r.EnableIPv6 = info.IPv6Enabled()
r.Internal = info.Internal()

View file

@ -0,0 +1,26 @@
package swarm
import (
basictypes "github.com/docker/engine-api/types"
types "github.com/docker/engine-api/types/swarm"
)
// Backend abstracts a swarm manager: cluster membership operations plus
// CRUD over services, nodes and tasks, as exposed by the swarm API router.
type Backend interface {
	// Cluster membership and configuration.
	Init(req types.InitRequest) (string, error)
	Join(req types.JoinRequest) error
	Leave(force bool) error
	Inspect() (types.Swarm, error)
	Update(uint64, types.Spec) error
	// Service management. CreateService returns the new service's ID;
	// UpdateService takes the expected object version for optimistic locking.
	GetServices(basictypes.ServiceListOptions) ([]types.Service, error)
	GetService(string) (types.Service, error)
	CreateService(types.ServiceSpec) (string, error)
	UpdateService(string, uint64, types.ServiceSpec) error
	RemoveService(string) error
	// Node management.
	GetNodes(basictypes.NodeListOptions) ([]types.Node, error)
	GetNode(string) (types.Node, error)
	UpdateNode(string, uint64, types.NodeSpec) error
	RemoveNode(string) error
	// Task inspection (read-only).
	GetTasks(basictypes.TaskListOptions) ([]types.Task, error)
	GetTask(string) (types.Task, error)
}

View file

@ -0,0 +1,44 @@
package swarm
import "github.com/docker/docker/api/server/router"
// swarmRouter is a router to talk with the swarm controller.
// (Previous comment said "buildRouter … build controller" — copy-paste slip.)
type swarmRouter struct {
	backend Backend
	routes  []router.Route
}
// NewRouter initializes a new swarm router wired to the given backend.
func NewRouter(b Backend) router.Router {
	r := &swarmRouter{
		backend: b,
	}
	r.initRoutes()
	return r
}
// Routes returns the available routes to the swarm controller.
func (sr *swarmRouter) Routes() []router.Route {
	return sr.routes
}
// initRoutes registers the swarm, service, node and task HTTP endpoints.
func (sr *swarmRouter) initRoutes() {
	sr.routes = []router.Route{
		router.NewPostRoute("/swarm/init", sr.initCluster),
		router.NewPostRoute("/swarm/join", sr.joinCluster),
		router.NewPostRoute("/swarm/leave", sr.leaveCluster),
		router.NewGetRoute("/swarm", sr.inspectCluster),
		router.NewPostRoute("/swarm/update", sr.updateCluster),
		router.NewGetRoute("/services", sr.getServices),
		router.NewGetRoute("/services/{id:.*}", sr.getService),
		router.NewPostRoute("/services/create", sr.createService),
		router.NewPostRoute("/services/{id:.*}/update", sr.updateService),
		router.NewDeleteRoute("/services/{id:.*}", sr.removeService),
		router.NewGetRoute("/nodes", sr.getNodes),
		router.NewGetRoute("/nodes/{id:.*}", sr.getNode),
		router.NewDeleteRoute("/nodes/{id:.*}", sr.removeNode),
		router.NewPostRoute("/nodes/{id:.*}/update", sr.updateNode),
		router.NewGetRoute("/tasks", sr.getTasks),
		router.NewGetRoute("/tasks/{id:.*}", sr.getTask),
	}
}

View file

@ -0,0 +1,229 @@
package swarm
import (
"encoding/json"
"fmt"
"net/http"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/server/httputils"
basictypes "github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/filters"
types "github.com/docker/engine-api/types/swarm"
"golang.org/x/net/context"
)
// initCluster handles POST /swarm/init: decode the init request, bootstrap
// the swarm and return the local node ID.
func (sr *swarmRouter) initCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	var initReq types.InitRequest
	if err := json.NewDecoder(r.Body).Decode(&initReq); err != nil {
		return err
	}
	nodeID, err := sr.backend.Init(initReq)
	if err != nil {
		logrus.Errorf("Error initializing swarm: %v", err)
		return err
	}
	return httputils.WriteJSON(w, http.StatusOK, nodeID)
}
// joinCluster handles POST /swarm/join: decode the join request and hand
// it straight to the backend.
func (sr *swarmRouter) joinCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	var joinReq types.JoinRequest
	err := json.NewDecoder(r.Body).Decode(&joinReq)
	if err != nil {
		return err
	}
	return sr.backend.Join(joinReq)
}
// leaveCluster handles POST /swarm/leave. The "force" form value is passed
// through to the backend.
func (sr *swarmRouter) leaveCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	if err := httputils.ParseForm(r); err != nil {
		return err
	}
	return sr.backend.Leave(httputils.BoolValue(r, "force"))
}
// inspectCluster handles GET /swarm and returns the swarm description.
func (sr *swarmRouter) inspectCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	cluster, err := sr.backend.Inspect()
	if err != nil {
		logrus.Errorf("Error getting swarm: %v", err)
		return err
	}
	return httputils.WriteJSON(w, http.StatusOK, cluster)
}
// updateCluster handles POST /swarm/update: decode the new spec and apply
// it at the object version given by the "version" query parameter.
func (sr *swarmRouter) updateCluster(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	var spec types.Spec
	if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
		return err
	}

	rawVersion := r.URL.Query().Get("version")
	version, err := strconv.ParseUint(rawVersion, 10, 64)
	if err != nil {
		return fmt.Errorf("Invalid swarm version '%s': %s", rawVersion, err.Error())
	}

	if err := sr.backend.Update(version, spec); err != nil {
		logrus.Errorf("Error configuring swarm: %v", err)
		return err
	}
	return nil
}
// getServices handles GET /services with an optional "filters" parameter.
func (sr *swarmRouter) getServices(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	if err := httputils.ParseForm(r); err != nil {
		return err
	}
	serviceFilters, err := filters.FromParam(r.Form.Get("filters"))
	if err != nil {
		return err
	}

	services, err := sr.backend.GetServices(basictypes.ServiceListOptions{Filter: serviceFilters})
	if err != nil {
		logrus.Errorf("Error getting services: %v", err)
		return err
	}
	return httputils.WriteJSON(w, http.StatusOK, services)
}
// getService handles GET /services/{id} and returns a single service.
func (sr *swarmRouter) getService(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	svc, err := sr.backend.GetService(vars["id"])
	if err != nil {
		logrus.Errorf("Error getting service %s: %v", vars["id"], err)
		return err
	}
	return httputils.WriteJSON(w, http.StatusOK, svc)
}
// createService handles POST /services/create: decode a ServiceSpec from the
// body, ask the backend to create the service and return the new service ID
// with a 201 Created status.
func (sr *swarmRouter) createService(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	var service types.ServiceSpec
	if err := json.NewDecoder(r.Body).Decode(&service); err != nil {
		return err
	}

	id, err := sr.backend.CreateService(service)
	if err != nil {
		// Fixed log-message typo: "reating" -> "creating".
		logrus.Errorf("Error creating service %s: %v", id, err)
		return err
	}

	return httputils.WriteJSON(w, http.StatusCreated, &basictypes.ServiceCreateResponse{
		ID: id,
	})
}
// updateService handles POST /services/{id}/update at the object version
// given by the "version" query parameter.
func (sr *swarmRouter) updateService(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	var spec types.ServiceSpec
	if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
		return err
	}

	rawVersion := r.URL.Query().Get("version")
	version, err := strconv.ParseUint(rawVersion, 10, 64)
	if err != nil {
		return fmt.Errorf("Invalid service version '%s': %s", rawVersion, err.Error())
	}

	if err := sr.backend.UpdateService(vars["id"], version, spec); err != nil {
		logrus.Errorf("Error updating service %s: %v", vars["id"], err)
		return err
	}
	return nil
}
// removeService handles DELETE /services/{id}.
func (sr *swarmRouter) removeService(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	serviceID := vars["id"]
	if err := sr.backend.RemoveService(serviceID); err != nil {
		logrus.Errorf("Error removing service %s: %v", serviceID, err)
		return err
	}
	return nil
}
// getNodes handles GET /nodes with an optional "filters" parameter.
func (sr *swarmRouter) getNodes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	if err := httputils.ParseForm(r); err != nil {
		return err
	}
	nodeFilters, err := filters.FromParam(r.Form.Get("filters"))
	if err != nil {
		return err
	}

	nodes, err := sr.backend.GetNodes(basictypes.NodeListOptions{Filter: nodeFilters})
	if err != nil {
		logrus.Errorf("Error getting nodes: %v", err)
		return err
	}
	return httputils.WriteJSON(w, http.StatusOK, nodes)
}
// getNode handles GET /nodes/{id} and returns a single node.
func (sr *swarmRouter) getNode(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	nodeID := vars["id"]
	node, err := sr.backend.GetNode(nodeID)
	if err != nil {
		logrus.Errorf("Error getting node %s: %v", nodeID, err)
		return err
	}
	return httputils.WriteJSON(w, http.StatusOK, node)
}
// updateNode handles POST /nodes/{id}/update at the object version given by
// the "version" query parameter.
func (sr *swarmRouter) updateNode(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	var spec types.NodeSpec
	if err := json.NewDecoder(r.Body).Decode(&spec); err != nil {
		return err
	}

	rawVersion := r.URL.Query().Get("version")
	version, err := strconv.ParseUint(rawVersion, 10, 64)
	if err != nil {
		return fmt.Errorf("Invalid node version '%s': %s", rawVersion, err.Error())
	}

	if err := sr.backend.UpdateNode(vars["id"], version, spec); err != nil {
		logrus.Errorf("Error updating node %s: %v", vars["id"], err)
		return err
	}
	return nil
}
// removeNode handles DELETE /nodes/{id}.
func (sr *swarmRouter) removeNode(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	nodeID := vars["id"]
	if err := sr.backend.RemoveNode(nodeID); err != nil {
		logrus.Errorf("Error removing node %s: %v", nodeID, err)
		return err
	}
	return nil
}
// getTasks handles GET /tasks with an optional "filters" parameter.
func (sr *swarmRouter) getTasks(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	if err := httputils.ParseForm(r); err != nil {
		return err
	}
	taskFilters, err := filters.FromParam(r.Form.Get("filters"))
	if err != nil {
		return err
	}

	tasks, err := sr.backend.GetTasks(basictypes.TaskListOptions{Filter: taskFilters})
	if err != nil {
		logrus.Errorf("Error getting tasks: %v", err)
		return err
	}
	return httputils.WriteJSON(w, http.StatusOK, tasks)
}
// getTask handles GET /tasks/{id} and returns a single task.
func (sr *swarmRouter) getTask(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error {
	taskID := vars["id"]
	task, err := sr.backend.GetTask(taskID)
	if err != nil {
		logrus.Errorf("Error getting task %s: %v", taskID, err)
		return err
	}
	return httputils.WriteJSON(w, http.StatusOK, task)
}

View file

@ -1,18 +1,23 @@
package system
import "github.com/docker/docker/api/server/router"
import (
"github.com/docker/docker/api/server/router"
"github.com/docker/docker/daemon/cluster"
)
// systemRouter provides information about the Docker system overall.
// It gathers information about host, daemon and container events.
type systemRouter struct {
backend Backend
routes []router.Route
backend Backend
clusterProvider *cluster.Cluster
routes []router.Route
}
// NewRouter initializes a new system router
func NewRouter(b Backend) router.Router {
func NewRouter(b Backend, c *cluster.Cluster) router.Router {
r := &systemRouter{
backend: b,
backend: b,
clusterProvider: c,
}
r.routes = []router.Route{

View file

@ -33,6 +33,9 @@ func (s *systemRouter) getInfo(ctx context.Context, w http.ResponseWriter, r *ht
if err != nil {
return err
}
if s.clusterProvider != nil {
info.Swarm = s.clusterProvider.Info()
}
return httputils.WriteJSON(w, http.StatusOK, info)
}

View file

@ -20,12 +20,14 @@ import (
"github.com/docker/docker/api/server/router/container"
"github.com/docker/docker/api/server/router/image"
"github.com/docker/docker/api/server/router/network"
swarmrouter "github.com/docker/docker/api/server/router/swarm"
systemrouter "github.com/docker/docker/api/server/router/system"
"github.com/docker/docker/api/server/router/volume"
"github.com/docker/docker/builder/dockerfile"
cliflags "github.com/docker/docker/cli/flags"
"github.com/docker/docker/cliconfig"
"github.com/docker/docker/daemon"
"github.com/docker/docker/daemon/cluster"
"github.com/docker/docker/daemon/logger"
"github.com/docker/docker/dockerversion"
"github.com/docker/docker/libcontainerd"
@ -208,6 +210,7 @@ func (cli *DaemonCli) start() (err error) {
}
api := apiserver.New(serverConfig)
cli.api = api
for i := 0; i < len(cli.Config.Hosts); i++ {
var err error
@ -264,6 +267,17 @@ func (cli *DaemonCli) start() (err error) {
return fmt.Errorf("Error starting daemon: %v", err)
}
name, _ := os.Hostname()
c, err := cluster.New(cluster.Config{
Root: cli.Config.Root,
Name: name,
Backend: d,
})
if err != nil {
logrus.Fatalf("Error creating cluster component: %v", err)
}
logrus.Info("Daemon has completed initialization")
logrus.WithFields(logrus.Fields{
@ -273,7 +287,7 @@ func (cli *DaemonCli) start() (err error) {
}).Info("Docker daemon")
cli.initMiddlewares(api, serverConfig)
initRouter(api, d)
initRouter(api, d, c)
cli.d = d
cli.setupConfigReloadTrap()
@ -290,6 +304,7 @@ func (cli *DaemonCli) start() (err error) {
// Daemon is fully initialized and handling API traffic
// Wait for serve API to complete
errAPI := <-serveAPIWait
c.Cleanup()
shutdownDaemon(d, 15)
containerdRemote.Cleanup()
if errAPI != nil {
@ -385,18 +400,19 @@ func loadDaemonCliConfig(config *daemon.Config, flags *flag.FlagSet, commonConfi
return config, nil
}
func initRouter(s *apiserver.Server, d *daemon.Daemon) {
func initRouter(s *apiserver.Server, d *daemon.Daemon, c *cluster.Cluster) {
decoder := runconfig.ContainerDecoder{}
routers := []router.Router{
container.NewRouter(d, decoder),
image.NewRouter(d, decoder),
systemrouter.NewRouter(d),
systemrouter.NewRouter(d, c),
volume.NewRouter(d),
build.NewRouter(dockerfile.NewBuildManager(d)),
swarmrouter.NewRouter(c),
}
if d.NetworkControllerEnabled() {
routers = append(routers, network.NewRouter(d))
routers = append(routers, network.NewRouter(d, c))
}
s.InitRouter(utils.IsDebugEnabled(), routers...)

View file

@ -66,6 +66,7 @@ type CommonContainer struct {
RWLayer layer.RWLayer `json:"-"`
ID string
Created time.Time
Managed bool
Path string
Args []string
Config *containertypes.Config
@ -790,7 +791,7 @@ func (container *Container) BuildCreateEndpointOptions(n libnetwork.Network, epC
ipam := epConfig.IPAMConfig
if ipam != nil && (ipam.IPv4Address != "" || ipam.IPv6Address != "") {
createOptions = append(createOptions,
libnetwork.CreateOptionIpam(net.ParseIP(ipam.IPv4Address), net.ParseIP(ipam.IPv6Address), nil))
libnetwork.CreateOptionIpam(net.ParseIP(ipam.IPv4Address), net.ParseIP(ipam.IPv6Address), nil, nil))
}
for _, alias := range epConfig.Aliases {
@ -798,6 +799,27 @@ func (container *Container) BuildCreateEndpointOptions(n libnetwork.Network, epC
}
}
if container.NetworkSettings.Service != nil {
svcCfg := container.NetworkSettings.Service
var vip string
if svcCfg.VirtualAddresses[n.ID()] != nil {
vip = svcCfg.VirtualAddresses[n.ID()].IPv4
}
var portConfigs []*libnetwork.PortConfig
for _, portConfig := range svcCfg.ExposedPorts {
portConfigs = append(portConfigs, &libnetwork.PortConfig{
Name: portConfig.Name,
Protocol: libnetwork.PortConfig_Protocol(portConfig.Protocol),
TargetPort: portConfig.TargetPort,
PublishedPort: portConfig.PublishedPort,
})
}
createOptions = append(createOptions, libnetwork.CreateOptionService(svcCfg.Name, svcCfg.ID, net.ParseIP(vip), portConfigs))
}
if !containertypes.NetworkMode(n.Name()).IsUserDefined() {
createOptions = append(createOptions, libnetwork.CreateOptionDisableResolution())
}

View file

@ -5,6 +5,8 @@ import (
"sync"
"time"
"golang.org/x/net/context"
"github.com/docker/go-units"
)
@ -139,6 +141,32 @@ func (s *State) WaitStop(timeout time.Duration) (int, error) {
return s.getExitCode(), nil
}
// WaitWithContext waits for the container to stop. Optional context can be
// passed for canceling the request. The returned channel receives the exit
// code once the container stops, then is closed; on cancellation it is
// closed without a value ever being sent.
func (s *State) WaitWithContext(ctx context.Context) <-chan int {
	// todo(tonistiigi): make other wait functions use this
	c := make(chan int)
	go func() {
		s.Lock()
		if !s.Running {
			// Already stopped: deliver the recorded exit code right away.
			exitCode := s.ExitCode
			s.Unlock()
			c <- exitCode
			close(c)
			return
		}
		// Snapshot waitChan while holding the lock; presumably it is
		// closed when the container transitions to stopped (same channel
		// WaitStop relies on) — confirm against State's other methods.
		waitChan := s.waitChan
		s.Unlock()
		select {
		case <-waitChan:
			c <- s.getExitCode()
		case <-ctx.Done():
			// Canceled: fall through and close c without sending.
		}
		close(c)
	}()
	return c
}
// IsRunning returns whether the running flag is set. Used by Container to check whether a container is running.
func (s *State) IsRunning() bool {
s.Lock()

1056
daemon/cluster/cluster.go Normal file

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,116 @@
package convert
import (
"fmt"
"strings"
types "github.com/docker/engine-api/types/swarm"
swarmapi "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/protobuf/ptypes"
)
// containerSpecFromGRPC converts a swarmkit ContainerSpec into the
// engine-api representation.
func containerSpecFromGRPC(c *swarmapi.ContainerSpec) types.ContainerSpec {
	// Scalar fields map one-to-one.
	spec := types.ContainerSpec{
		Image:   c.Image,
		Labels:  c.Labels,
		Command: c.Command,
		Args:    c.Args,
		Env:     c.Env,
		Dir:     c.Dir,
		User:    c.User,
	}

	// Convert each mount, including the optional bind and volume options.
	for _, m := range c.Mounts {
		converted := types.Mount{
			Target:   m.Target,
			Source:   m.Source,
			Type:     types.MountType(strings.ToLower(swarmapi.Mount_MountType_name[int32(m.Type)])),
			Writable: m.Writable,
		}

		if bo := m.BindOptions; bo != nil {
			converted.BindOptions = &types.BindOptions{
				Propagation: types.MountPropagation(strings.ToLower(swarmapi.Mount_BindOptions_MountPropagation_name[int32(bo.Propagation)])),
			}
		}

		if vo := m.VolumeOptions; vo != nil {
			converted.VolumeOptions = &types.VolumeOptions{
				Populate: vo.Populate,
				Labels:   vo.Labels,
			}
			if dc := vo.DriverConfig; dc != nil {
				converted.VolumeOptions.DriverConfig = &types.Driver{
					Name:    dc.Name,
					Options: dc.Options,
				}
			}
		}

		spec.Mounts = append(spec.Mounts, converted)
	}

	if c.StopGracePeriod != nil {
		grace, _ := ptypes.Duration(c.StopGracePeriod)
		spec.StopGracePeriod = &grace
	}
	return spec
}
// containerToGRPC converts an engine-api ContainerSpec into the swarmkit
// representation, rejecting unknown mount types and propagation modes.
func containerToGRPC(c types.ContainerSpec) (*swarmapi.ContainerSpec, error) {
	spec := &swarmapi.ContainerSpec{
		Image:   c.Image,
		Labels:  c.Labels,
		Command: c.Command,
		Args:    c.Args,
		Env:     c.Env,
		Dir:     c.Dir,
		User:    c.User,
	}

	if c.StopGracePeriod != nil {
		spec.StopGracePeriod = ptypes.DurationProto(*c.StopGracePeriod)
	}

	// Convert each mount; the string enum values must round-trip through
	// the generated protobuf name tables (upper-cased on this direction).
	for _, m := range c.Mounts {
		converted := swarmapi.Mount{
			Target:   m.Target,
			Source:   m.Source,
			Writable: m.Writable,
		}

		if mountType, ok := swarmapi.Mount_MountType_value[strings.ToUpper(string(m.Type))]; ok {
			converted.Type = swarmapi.Mount_MountType(mountType)
		} else if string(m.Type) != "" {
			return nil, fmt.Errorf("invalid MountType: %q", m.Type)
		}

		if bo := m.BindOptions; bo != nil {
			if propagation, ok := swarmapi.Mount_BindOptions_MountPropagation_value[strings.ToUpper(string(bo.Propagation))]; ok {
				converted.BindOptions = &swarmapi.Mount_BindOptions{Propagation: swarmapi.Mount_BindOptions_MountPropagation(propagation)}
			} else if string(bo.Propagation) != "" {
				return nil, fmt.Errorf("invalid MountPropagation: %q", bo.Propagation)
			}
		}

		if vo := m.VolumeOptions; vo != nil {
			converted.VolumeOptions = &swarmapi.Mount_VolumeOptions{
				Populate: vo.Populate,
				Labels:   vo.Labels,
			}
			if dc := vo.DriverConfig; dc != nil {
				converted.VolumeOptions.DriverConfig = &swarmapi.Driver{
					Name:    dc.Name,
					Options: dc.Options,
				}
			}
		}

		spec.Mounts = append(spec.Mounts, converted)
	}

	return spec, nil
}

View file

@ -0,0 +1,194 @@
package convert
import (
"strings"
basictypes "github.com/docker/engine-api/types"
networktypes "github.com/docker/engine-api/types/network"
types "github.com/docker/engine-api/types/swarm"
swarmapi "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/protobuf/ptypes"
)
// networkAttachementFromGRPC converts a swarmkit NetworkAttachment into the
// engine-api type; a nil input yields the zero value.
func networkAttachementFromGRPC(na *swarmapi.NetworkAttachment) types.NetworkAttachment {
	if na == nil {
		return types.NetworkAttachment{}
	}
	return types.NetworkAttachment{
		Network:   networkFromGRPC(na.Network),
		Addresses: na.Addresses,
	}
}
// networkFromGRPC converts a swarmkit Network into the engine-api swarm
// Network type; a nil input yields the zero value.
func networkFromGRPC(n *swarmapi.Network) types.Network {
	if n == nil {
		return types.Network{}
	}

	network := types.Network{
		ID: n.ID,
		Spec: types.NetworkSpec{
			IPv6Enabled: n.Spec.Ipv6Enabled,
			Internal:    n.Spec.Internal,
			IPAMOptions: ipamFromGRPC(n.Spec.IPAM),
		},
		IPAMOptions: ipamFromGRPC(n.IPAM),
	}

	// Meta: object version plus create/update timestamps.
	network.Version.Index = n.Meta.Version.Index
	network.CreatedAt, _ = ptypes.Timestamp(n.Meta.CreatedAt)
	network.UpdatedAt, _ = ptypes.Timestamp(n.Meta.UpdatedAt)

	// Annotations carry the user-visible name and labels.
	network.Spec.Name = n.Spec.Annotations.Name
	network.Spec.Labels = n.Spec.Annotations.Labels

	// Driver configuration as requested in the spec.
	if dc := n.Spec.DriverConfig; dc != nil {
		network.Spec.DriverConfiguration = &types.Driver{
			Name:    dc.Name,
			Options: dc.Options,
		}
	}

	// Driver state as reported at runtime.
	if ds := n.DriverState; ds != nil {
		network.DriverState = types.Driver{
			Name:    ds.Name,
			Options: ds.Options,
		}
	}

	return network
}
// ipamFromGRPC converts swarmkit IPAM options into the engine-api type,
// returning nil for nil input.
func ipamFromGRPC(i *swarmapi.IPAMOptions) *types.IPAMOptions {
	if i == nil {
		return nil
	}

	ipam := &types.IPAMOptions{}
	if i.Driver != nil {
		ipam.Driver.Name = i.Driver.Name
		ipam.Driver.Options = i.Driver.Options
	}
	for _, cfg := range i.Configs {
		ipam.Configs = append(ipam.Configs, types.IPAMConfig{
			Subnet:  cfg.Subnet,
			Range:   cfg.Range,
			Gateway: cfg.Gateway,
		})
	}
	return ipam
}
// endpointSpecFromGRPC converts a swarmkit EndpointSpec into the engine-api
// type, returning nil for nil input.
func endpointSpecFromGRPC(es *swarmapi.EndpointSpec) *types.EndpointSpec {
	if es == nil {
		return nil
	}

	spec := &types.EndpointSpec{
		Mode: types.ResolutionMode(strings.ToLower(es.Mode.String())),
	}
	for _, p := range es.Ports {
		spec.Ports = append(spec.Ports, types.PortConfig{
			Name:          p.Name,
			Protocol:      types.PortConfigProtocol(strings.ToLower(swarmapi.PortConfig_Protocol_name[int32(p.Protocol)])),
			TargetPort:    p.TargetPort,
			PublishedPort: p.PublishedPort,
		})
	}
	return spec
}
// endpointFromGRPC converts a swarmkit Endpoint into the engine-api type;
// a nil input yields the zero value.
func endpointFromGRPC(e *swarmapi.Endpoint) types.Endpoint {
	if e == nil {
		return types.Endpoint{}
	}

	endpoint := types.Endpoint{}
	if spec := endpointSpecFromGRPC(e.Spec); spec != nil {
		endpoint.Spec = *spec
	}
	for _, p := range e.Ports {
		endpoint.Ports = append(endpoint.Ports, types.PortConfig{
			Name:          p.Name,
			Protocol:      types.PortConfigProtocol(strings.ToLower(swarmapi.PortConfig_Protocol_name[int32(p.Protocol)])),
			TargetPort:    p.TargetPort,
			PublishedPort: p.PublishedPort,
		})
	}
	for _, vip := range e.VirtualIPs {
		endpoint.VirtualIPs = append(endpoint.VirtualIPs, types.EndpointVirtualIP{
			NetworkID: vip.NetworkID,
			Addr:      vip.Addr,
		})
	}
	return endpoint
}
// BasicNetworkFromGRPC converts a grpc Network to a NetworkResource.
func BasicNetworkFromGRPC(n swarmapi.Network) basictypes.NetworkResource {
	spec := n.Spec

	// Translate IPAM options; absent IPAM leaves the zero value.
	var ipam networktypes.IPAM
	if opts := spec.IPAM; opts != nil {
		if opts.Driver != nil {
			ipam.Driver = opts.Driver.Name
			ipam.Options = opts.Driver.Options
		}
		ipam.Config = make([]networktypes.IPAMConfig, 0, len(opts.Configs))
		for _, ic := range opts.Configs {
			ipam.Config = append(ipam.Config, networktypes.IPAMConfig{
				Subnet:     ic.Subnet,
				IPRange:    ic.Range,
				Gateway:    ic.Gateway,
				AuxAddress: ic.Reserved,
			})
		}
	}

	// Cluster-managed networks always report "swarm" scope.
	return basictypes.NetworkResource{
		ID:         n.ID,
		Name:       n.Spec.Annotations.Name,
		Scope:      "swarm",
		Driver:     n.DriverState.Name,
		EnableIPv6: spec.Ipv6Enabled,
		IPAM:       ipam,
		Internal:   spec.Internal,
		Options:    n.DriverState.Options,
		Labels:     n.Spec.Annotations.Labels,
	}
}
// BasicNetworkCreateToGRPC converts a NetworkCreateRequest to a grpc NetworkSpec.
func BasicNetworkCreateToGRPC(create basictypes.NetworkCreateRequest) swarmapi.NetworkSpec {
	// Convert the IPAM pool configuration up front.
	configs := make([]*swarmapi.IPAMConfig, 0, len(create.IPAM.Config))
	for _, cfg := range create.IPAM.Config {
		configs = append(configs, &swarmapi.IPAMConfig{
			Subnet:  cfg.Subnet,
			Range:   cfg.IPRange,
			Gateway: cfg.Gateway,
		})
	}

	return swarmapi.NetworkSpec{
		Annotations: swarmapi.Annotations{
			Name:   create.Name,
			Labels: create.Labels,
		},
		DriverConfig: &swarmapi.Driver{
			Name:    create.Driver,
			Options: create.Options,
		},
		Ipv6Enabled: create.EnableIPv6,
		Internal:    create.Internal,
		IPAM: &swarmapi.IPAMOptions{
			Driver: &swarmapi.Driver{
				Name:    create.IPAM.Driver,
				Options: create.IPAM.Options,
			},
			Configs: configs,
		},
	}
}

View file

@ -0,0 +1,95 @@
package convert
import (
"fmt"
"strings"
types "github.com/docker/engine-api/types/swarm"
swarmapi "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/protobuf/ptypes"
)
// NodeFromGRPC converts a grpc Node to a Node.
func NodeFromGRPC(n swarmapi.Node) types.Node {
	node := types.Node{
		ID: n.ID,
		Spec: types.NodeSpec{
			Role:         types.NodeRole(strings.ToLower(n.Spec.Role.String())),
			Membership:   types.NodeMembership(strings.ToLower(n.Spec.Membership.String())),
			Availability: types.NodeAvailability(strings.ToLower(n.Spec.Availability.String())),
		},
		Status: types.NodeStatus{
			State:   types.NodeState(strings.ToLower(n.Status.State.String())),
			Message: n.Status.Message,
		},
	}

	// Meta: object version plus create/update timestamps.
	node.Version.Index = n.Meta.Version.Index
	node.CreatedAt, _ = ptypes.Timestamp(n.Meta.CreatedAt)
	node.UpdatedAt, _ = ptypes.Timestamp(n.Meta.UpdatedAt)

	// Annotations carry the user-visible name and labels.
	node.Spec.Name = n.Spec.Annotations.Name
	node.Spec.Labels = n.Spec.Annotations.Labels

	// Description: hostname, platform, resources and engine info, each
	// section optional on the wire.
	if desc := n.Description; desc != nil {
		node.Description.Hostname = desc.Hostname
		if platform := desc.Platform; platform != nil {
			node.Description.Platform.Architecture = platform.Architecture
			node.Description.Platform.OS = platform.OS
		}
		if res := desc.Resources; res != nil {
			node.Description.Resources.NanoCPUs = res.NanoCPUs
			node.Description.Resources.MemoryBytes = res.MemoryBytes
		}
		if engine := desc.Engine; engine != nil {
			node.Description.Engine.EngineVersion = engine.EngineVersion
			node.Description.Engine.Labels = engine.Labels
			for _, plugin := range engine.Plugins {
				node.Description.Engine.Plugins = append(node.Description.Engine.Plugins, types.PluginDescription{Type: plugin.Type, Name: plugin.Name})
			}
		}
	}

	// Manager status is only present for manager nodes.
	if ms := n.ManagerStatus; ms != nil {
		node.ManagerStatus = &types.ManagerStatus{
			Leader:       ms.Raft.Status.Leader,
			Reachability: types.Reachability(strings.ToLower(ms.Raft.Status.Reachability.String())),
			Addr:         ms.Raft.Addr,
		}
	}

	return node
}
// NodeSpecToGRPC converts a NodeSpec to a grpc NodeSpec.
func NodeSpecToGRPC(s types.NodeSpec) (swarmapi.NodeSpec, error) {
	spec := swarmapi.NodeSpec{
		Annotations: swarmapi.Annotations{
			Name:   s.Name,
			Labels: s.Labels,
		},
	}
	// Each enum field is resolved by an upper-cased lookup into the generated
	// protobuf value map; an unrecognized value is a caller error.
	role, ok := swarmapi.NodeRole_value[strings.ToUpper(string(s.Role))]
	if !ok {
		return swarmapi.NodeSpec{}, fmt.Errorf("invalid Role: %q", s.Role)
	}
	spec.Role = swarmapi.NodeRole(role)

	membership, ok := swarmapi.NodeSpec_Membership_value[strings.ToUpper(string(s.Membership))]
	if !ok {
		return swarmapi.NodeSpec{}, fmt.Errorf("invalid Membership: %q", s.Membership)
	}
	spec.Membership = swarmapi.NodeSpec_Membership(membership)

	availability, ok := swarmapi.NodeSpec_Availability_value[strings.ToUpper(string(s.Availability))]
	if !ok {
		return swarmapi.NodeSpec{}, fmt.Errorf("invalid Availability: %q", s.Availability)
	}
	spec.Availability = swarmapi.NodeSpec_Availability(availability)

	return spec, nil
}

View file

@ -0,0 +1,252 @@
package convert
import (
"fmt"
"strings"
"github.com/docker/docker/pkg/namesgenerator"
types "github.com/docker/engine-api/types/swarm"
swarmapi "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/protobuf/ptypes"
)
// ServiceFromGRPC converts a grpc Service to a Service.
func ServiceFromGRPC(s swarmapi.Service) types.Service {
	spec := s.Spec
	// NOTE(review): this type assertion panics if the task runtime is not a
	// container runtime — presumably all services here are container-based;
	// confirm before reusing with other runtimes.
	containerConfig := spec.Task.Runtime.(*swarmapi.TaskSpec_Container).Container
	networks := make([]types.NetworkAttachmentConfig, 0, len(spec.Networks))
	for _, n := range spec.Networks {
		networks = append(networks, types.NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases})
	}
	service := types.Service{
		ID: s.ID,
		Spec: types.ServiceSpec{
			TaskTemplate: types.TaskSpec{
				ContainerSpec: containerSpecFromGRPC(containerConfig),
				Resources:     resourcesFromGRPC(s.Spec.Task.Resources),
				RestartPolicy: restartPolicyFromGRPC(s.Spec.Task.Restart),
				Placement:     placementFromGRPC(s.Spec.Task.Placement),
			},
			Networks:     networks,
			EndpointSpec: endpointSpecFromGRPC(s.Spec.Endpoint),
		},
		Endpoint: endpointFromGRPC(s.Endpoint),
	}
	// Meta: version index and timestamps (conversion errors ignored).
	service.Version.Index = s.Meta.Version.Index
	service.CreatedAt, _ = ptypes.Timestamp(s.Meta.CreatedAt)
	service.UpdatedAt, _ = ptypes.Timestamp(s.Meta.UpdatedAt)
	// Annotations
	service.Spec.Name = s.Spec.Annotations.Name
	service.Spec.Labels = s.Spec.Annotations.Labels
	// UpdateConfig is optional on the grpc side.
	if s.Spec.Update != nil {
		service.Spec.UpdateConfig = &types.UpdateConfig{
			Parallelism: s.Spec.Update.Parallelism,
		}
		service.Spec.UpdateConfig.Delay, _ = ptypes.Duration(&s.Spec.Update.Delay)
	}
	// Mode: exactly one of Global / Replicated is set on the grpc oneof.
	switch t := s.Spec.GetMode().(type) {
	case *swarmapi.ServiceSpec_Global:
		service.Spec.Mode.Global = &types.GlobalService{}
	case *swarmapi.ServiceSpec_Replicated:
		service.Spec.Mode.Replicated = &types.ReplicatedService{
			Replicas: &t.Replicated.Replicas,
		}
	}
	return service
}
// ServiceSpecToGRPC converts a ServiceSpec to a grpc ServiceSpec.
func ServiceSpecToGRPC(s types.ServiceSpec) (swarmapi.ServiceSpec, error) {
	name := s.Name
	if name == "" {
		// Unnamed services get a generated docker-style random name.
		name = namesgenerator.GetRandomName(0)
	}
	networks := make([]*swarmapi.ServiceSpec_NetworkAttachmentConfig, 0, len(s.Networks))
	for _, n := range s.Networks {
		networks = append(networks, &swarmapi.ServiceSpec_NetworkAttachmentConfig{Target: n.Target, Aliases: n.Aliases})
	}
	spec := swarmapi.ServiceSpec{
		Annotations: swarmapi.Annotations{
			Name:   name,
			Labels: s.Labels,
		},
		Task: swarmapi.TaskSpec{
			Resources: resourcesToGRPC(s.TaskTemplate.Resources),
		},
		Networks: networks,
	}
	containerSpec, err := containerToGRPC(s.TaskTemplate.ContainerSpec)
	if err != nil {
		return swarmapi.ServiceSpec{}, err
	}
	spec.Task.Runtime = &swarmapi.TaskSpec_Container{Container: containerSpec}
	restartPolicy, err := restartPolicyToGRPC(s.TaskTemplate.RestartPolicy)
	if err != nil {
		return swarmapi.ServiceSpec{}, err
	}
	spec.Task.Restart = restartPolicy
	if s.TaskTemplate.Placement != nil {
		spec.Task.Placement = &swarmapi.Placement{
			Constraints: s.TaskTemplate.Placement.Constraints,
		}
	}
	if s.UpdateConfig != nil {
		spec.Update = &swarmapi.UpdateConfig{
			Parallelism: s.UpdateConfig.Parallelism,
			Delay:       *ptypes.DurationProto(s.UpdateConfig.Delay),
		}
	}
	if s.EndpointSpec != nil {
		// Only known resolution modes (or empty, meaning "default") are valid.
		if s.EndpointSpec.Mode != "" &&
			s.EndpointSpec.Mode != types.ResolutionModeVIP &&
			s.EndpointSpec.Mode != types.ResolutionModeDNSRR {
			return swarmapi.ServiceSpec{}, fmt.Errorf("invalid resolution mode: %q", s.EndpointSpec.Mode)
		}
		spec.Endpoint = &swarmapi.EndpointSpec{}
		spec.Endpoint.Mode = swarmapi.EndpointSpec_ResolutionMode(swarmapi.EndpointSpec_ResolutionMode_value[strings.ToUpper(string(s.EndpointSpec.Mode))])
		for _, portConfig := range s.EndpointSpec.Ports {
			spec.Endpoint.Ports = append(spec.Endpoint.Ports, &swarmapi.PortConfig{
				Name:          portConfig.Name,
				Protocol:      swarmapi.PortConfig_Protocol(swarmapi.PortConfig_Protocol_value[strings.ToUpper(string(portConfig.Protocol))]),
				TargetPort:    portConfig.TargetPort,
				PublishedPort: portConfig.PublishedPort,
			})
		}
	}
	// Mode: defaults to a replicated service with one replica when the caller
	// specified neither global mode nor an explicit replica count.
	if s.Mode.Global != nil {
		spec.Mode = &swarmapi.ServiceSpec_Global{
			Global: &swarmapi.GlobalService{},
		}
	} else if s.Mode.Replicated != nil && s.Mode.Replicated.Replicas != nil {
		spec.Mode = &swarmapi.ServiceSpec_Replicated{
			Replicated: &swarmapi.ReplicatedService{Replicas: *s.Mode.Replicated.Replicas},
		}
	} else {
		spec.Mode = &swarmapi.ServiceSpec_Replicated{
			Replicated: &swarmapi.ReplicatedService{Replicas: 1},
		}
	}
	return spec, nil
}
// resourcesFromGRPC converts grpc resource requirements to engine-api form;
// a nil input yields a nil result.
func resourcesFromGRPC(res *swarmapi.ResourceRequirements) *types.ResourceRequirements {
	if res == nil {
		return nil
	}
	out := &types.ResourceRequirements{}
	if lim := res.Limits; lim != nil {
		out.Limits = &types.Resources{
			NanoCPUs:    lim.NanoCPUs,
			MemoryBytes: lim.MemoryBytes,
		}
	}
	if rsv := res.Reservations; rsv != nil {
		out.Reservations = &types.Resources{
			NanoCPUs:    rsv.NanoCPUs,
			MemoryBytes: rsv.MemoryBytes,
		}
	}
	return out
}
// resourcesToGRPC converts engine-api resource requirements to grpc form;
// a nil input yields a nil result.
func resourcesToGRPC(res *types.ResourceRequirements) *swarmapi.ResourceRequirements {
	if res == nil {
		return nil
	}
	out := &swarmapi.ResourceRequirements{}
	if lim := res.Limits; lim != nil {
		out.Limits = &swarmapi.Resources{
			NanoCPUs:    lim.NanoCPUs,
			MemoryBytes: lim.MemoryBytes,
		}
	}
	if rsv := res.Reservations; rsv != nil {
		out.Reservations = &swarmapi.Resources{
			NanoCPUs:    rsv.NanoCPUs,
			MemoryBytes: rsv.MemoryBytes,
		}
	}
	return out
}
// restartPolicyFromGRPC converts a grpc restart policy to engine-api form;
// a nil input yields a nil result.
func restartPolicyFromGRPC(p *swarmapi.RestartPolicy) *types.RestartPolicy {
	if p == nil {
		return nil
	}
	out := &types.RestartPolicy{
		Condition: types.RestartPolicyCondition(strings.ToLower(p.Condition.String())),
	}
	if p.Delay != nil {
		d, _ := ptypes.Duration(p.Delay)
		out.Delay = &d
	}
	if p.Window != nil {
		w, _ := ptypes.Duration(p.Window)
		out.Window = &w
	}
	out.MaxAttempts = &p.MaxAttempts
	return out
}
// restartPolicyToGRPC converts an engine-api restart policy to grpc form.
// An empty condition maps to RestartOnAny; an unknown condition is an error.
func restartPolicyToGRPC(p *types.RestartPolicy) (*swarmapi.RestartPolicy, error) {
	if p == nil {
		return nil, nil
	}
	out := &swarmapi.RestartPolicy{}
	switch condition, ok := swarmapi.RestartPolicy_RestartCondition_value[strings.ToUpper(string(p.Condition))]; {
	case ok:
		out.Condition = swarmapi.RestartPolicy_RestartCondition(condition)
	case string(p.Condition) == "":
		out.Condition = swarmapi.RestartOnAny
	default:
		return nil, fmt.Errorf("invalid RestartCondition: %q", p.Condition)
	}
	if p.Delay != nil {
		out.Delay = ptypes.DurationProto(*p.Delay)
	}
	if p.Window != nil {
		out.Window = ptypes.DurationProto(*p.Window)
	}
	if p.MaxAttempts != nil {
		out.MaxAttempts = *p.MaxAttempts
	}
	return out, nil
}
// placementFromGRPC converts grpc placement constraints to engine-api form;
// a nil input yields a nil result.
func placementFromGRPC(p *swarmapi.Placement) *types.Placement {
	if p == nil {
		return nil
	}
	return &types.Placement{Constraints: p.Constraints}
}

View file

@ -0,0 +1,116 @@
package convert
import (
"fmt"
"strings"
"golang.org/x/crypto/bcrypt"
types "github.com/docker/engine-api/types/swarm"
swarmapi "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/protobuf/ptypes"
)
// SwarmFromGRPC converts a grpc Cluster to a Swarm.
func SwarmFromGRPC(c swarmapi.Cluster) types.Swarm {
	swarm := types.Swarm{
		ID: c.ID,
		Spec: types.Spec{
			Orchestration: types.OrchestrationConfig{
				TaskHistoryRetentionLimit: c.Spec.Orchestration.TaskHistoryRetentionLimit,
			},
			Raft: types.RaftConfig{
				SnapshotInterval:           c.Spec.Raft.SnapshotInterval,
				KeepOldSnapshots:           c.Spec.Raft.KeepOldSnapshots,
				LogEntriesForSlowFollowers: c.Spec.Raft.LogEntriesForSlowFollowers,
				HeartbeatTick:              c.Spec.Raft.HeartbeatTick,
				ElectionTick:               c.Spec.Raft.ElectionTick,
			},
			Dispatcher: types.DispatcherConfig{
				HeartbeatPeriod: c.Spec.Dispatcher.HeartbeatPeriod,
			},
		},
	}
	// CA config: duration conversion errors are ignored (zero value used).
	swarm.Spec.CAConfig.NodeCertExpiry, _ = ptypes.Duration(c.Spec.CAConfig.NodeCertExpiry)
	// Meta
	swarm.Version.Index = c.Meta.Version.Index
	swarm.CreatedAt, _ = ptypes.Timestamp(c.Meta.CreatedAt)
	swarm.UpdatedAt, _ = ptypes.Timestamp(c.Meta.UpdatedAt)
	// Annotations
	swarm.Spec.Name = c.Spec.Annotations.Name
	swarm.Spec.Labels = c.Spec.Annotations.Labels
	// Acceptance policies: the bcrypt-hashed secret bytes (see
	// SwarmSpecUpdateAcceptancePolicy) are exposed as a string.
	for _, policy := range c.Spec.AcceptancePolicy.Policies {
		p := types.Policy{
			Role:       types.NodeRole(strings.ToLower(policy.Role.String())),
			Autoaccept: policy.Autoaccept,
		}
		if policy.Secret != nil {
			p.Secret = string(policy.Secret.Data)
		}
		swarm.Spec.AcceptancePolicy.Policies = append(swarm.Spec.AcceptancePolicy.Policies, p)
	}
	return swarm
}
// SwarmSpecToGRPC converts a Spec to a grpc ClusterSpec.
func SwarmSpecToGRPC(s types.Spec) (swarmapi.ClusterSpec, error) {
	spec := swarmapi.ClusterSpec{
		Annotations: swarmapi.Annotations{
			Name:   s.Name,
			Labels: s.Labels,
		},
		Orchestration: swarmapi.OrchestrationConfig{
			TaskHistoryRetentionLimit: s.Orchestration.TaskHistoryRetentionLimit,
		},
		Raft: swarmapi.RaftConfig{
			SnapshotInterval:           s.Raft.SnapshotInterval,
			KeepOldSnapshots:           s.Raft.KeepOldSnapshots,
			LogEntriesForSlowFollowers: s.Raft.LogEntriesForSlowFollowers,
			HeartbeatTick:              s.Raft.HeartbeatTick,
			ElectionTick:               s.Raft.ElectionTick,
		},
		Dispatcher: swarmapi.DispatcherConfig{
			HeartbeatPeriod: s.Dispatcher.HeartbeatPeriod,
		},
		CAConfig: swarmapi.CAConfig{
			NodeCertExpiry: ptypes.DurationProto(s.CAConfig.NodeCertExpiry),
		},
	}
	// Acceptance policies are validated and secret-hashed by the shared helper.
	if err := SwarmSpecUpdateAcceptancePolicy(&spec, s.AcceptancePolicy); err != nil {
		return swarmapi.ClusterSpec{}, err
	}
	return spec, nil
}
// SwarmSpecUpdateAcceptancePolicy updates a grpc ClusterSpec using AcceptancePolicy.
// It replaces any existing policies, validating each role and bcrypt-hashing
// any plaintext secret before storing it.
func SwarmSpecUpdateAcceptancePolicy(spec *swarmapi.ClusterSpec, acceptancePolicy types.AcceptancePolicy) error {
	spec.AcceptancePolicy.Policies = nil
	for _, p := range acceptancePolicy.Policies {
		role, ok := swarmapi.NodeRole_value[strings.ToUpper(string(p.Role))]
		if !ok {
			return fmt.Errorf("invalid Role: %q", p.Role)
		}
		policy := &swarmapi.AcceptancePolicy_RoleAdmissionPolicy{
			Role:       swarmapi.NodeRole(role),
			Autoaccept: p.Autoaccept,
		}
		if p.Secret != "" {
			// Use an explicit cost (0 silently mapped to DefaultCost anyway)
			// and surface hashing failures instead of storing a nil hash.
			hashPwd, err := bcrypt.GenerateFromPassword([]byte(p.Secret), bcrypt.DefaultCost)
			if err != nil {
				return fmt.Errorf("error hashing policy secret: %v", err)
			}
			policy.Secret = &swarmapi.AcceptancePolicy_RoleAdmissionPolicy_HashedSecret{
				Data: hashPwd,
				Alg:  "bcrypt",
			}
		}
		spec.AcceptancePolicy.Policies = append(spec.AcceptancePolicy.Policies, policy)
	}
	return nil
}

View file

@ -0,0 +1,53 @@
package convert
import (
"strings"
types "github.com/docker/engine-api/types/swarm"
swarmapi "github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/protobuf/ptypes"
)
// TaskFromGRPC converts a grpc Task to a Task.
func TaskFromGRPC(t swarmapi.Task) types.Task {
	// NOTE(review): panics if the task runtime is not a container runtime;
	// presumably all tasks reaching here are container tasks — confirm.
	containerConfig := t.Spec.Runtime.(*swarmapi.TaskSpec_Container).Container
	containerStatus := t.Status.GetContainer()
	task := types.Task{
		ID:        t.ID,
		ServiceID: t.ServiceID,
		Slot:      int(t.Slot),
		NodeID:    t.NodeID,
		Spec: types.TaskSpec{
			ContainerSpec: containerSpecFromGRPC(containerConfig),
			Resources:     resourcesFromGRPC(t.Spec.Resources),
			RestartPolicy: restartPolicyFromGRPC(t.Spec.Restart),
			Placement:     placementFromGRPC(t.Spec.Placement),
		},
		Status: types.TaskStatus{
			State:   types.TaskState(strings.ToLower(t.Status.State.String())),
			Message: t.Status.Message,
			Err:     t.Status.Err,
		},
		DesiredState: types.TaskState(strings.ToLower(t.DesiredState.String())),
	}
	// Meta: version index and timestamps (conversion errors ignored).
	task.Version.Index = t.Meta.Version.Index
	task.CreatedAt, _ = ptypes.Timestamp(t.Meta.CreatedAt)
	task.UpdatedAt, _ = ptypes.Timestamp(t.Meta.UpdatedAt)
	task.Status.Timestamp, _ = ptypes.Timestamp(t.Status.Timestamp)
	// Container runtime status is optional.
	if containerStatus != nil {
		task.Status.ContainerStatus.ContainerID = containerStatus.ContainerID
		task.Status.ContainerStatus.PID = int(containerStatus.PID)
		task.Status.ContainerStatus.ExitCode = int(containerStatus.ExitCode)
	}
	// NetworksAttachments
	for _, na := range t.Networks {
		task.NetworksAttachments = append(task.NetworksAttachments, networkAttachementFromGRPC(na))
	}
	return task
}

View file

@ -0,0 +1,35 @@
package executor
import (
"io"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/container"
"github.com/docker/engine-api/types/network"
"github.com/docker/libnetwork/cluster"
networktypes "github.com/docker/libnetwork/types"
"golang.org/x/net/context"
)
// Backend defines the executor component for a swarm agent.
type Backend interface {
	// Cluster-managed network lifecycle.
	CreateManagedNetwork(clustertypes.NetworkCreateRequest) error
	DeleteManagedNetwork(name string) error
	// SetupIngress configures the ingress network for this node.
	SetupIngress(req clustertypes.NetworkCreateRequest, nodeIP string) error
	// Image and container operations used while executing tasks.
	PullImage(ctx context.Context, image, tag string, metaHeaders map[string][]string, authConfig *types.AuthConfig, outStream io.Writer) error
	CreateManagedContainer(types.ContainerCreateConfig) (types.ContainerCreateResponse, error)
	ContainerStart(name string, hostConfig *container.HostConfig) error
	ContainerStop(name string, seconds int) error
	ConnectContainerToNetwork(containerName, networkName string, endpointConfig *network.EndpointSettings) error
	UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error
	ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error)
	ContainerWaitWithContext(ctx context.Context, name string) (<-chan int, error)
	ContainerRm(name string, config *types.ContainerRmConfig) error
	ContainerKill(name string, sig uint64) error
	// Node-level queries and cluster configuration hooks.
	SystemInfo() (*types.Info, error)
	VolumeCreate(name, driverName string, opts, labels map[string]string) (*types.Volume, error)
	ListContainersForNode(nodeID string) []string
	SetNetworkBootstrapKeys([]*networktypes.EncryptionKey) error
	SetClusterProvider(provider cluster.Provider)
}

View file

@ -0,0 +1,229 @@
package container
import (
"encoding/base64"
"encoding/json"
"fmt"
"io"
"strings"
"syscall"
"github.com/Sirupsen/logrus"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
"github.com/docker/engine-api/types"
"github.com/docker/libnetwork"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
)
// containerAdapter conducts remote operations for a container. All calls
// are mostly naked calls to the client API, seeded with information from
// containerConfig.
type containerAdapter struct {
	backend   executorpkg.Backend // daemon API surface used for all operations
	container *containerConfig    // task-derived configuration for this container
}
// newContainerAdapter validates the task's container configuration and
// returns an adapter bound to the given backend.
func newContainerAdapter(b executorpkg.Backend, task *api.Task) (*containerAdapter, error) {
	cfg, err := newContainerConfig(task)
	if err != nil {
		return nil, err
	}
	adapter := &containerAdapter{
		backend:   b,
		container: cfg,
	}
	return adapter, nil
}
// pullImage pulls the task's image through the backend, draining the JSON
// progress stream and returning the terminal error from the stream, if any.
func (c *containerAdapter) pullImage(ctx context.Context) error {
	// if the image needs to be pulled, the auth config will be retrieved and updated
	encodedAuthConfig := c.container.task.ServiceAnnotations.Labels[fmt.Sprintf("%v.registryauth", systemLabelPrefix)]
	authConfig := &types.AuthConfig{}
	if encodedAuthConfig != "" {
		// Auth is carried as base64url-encoded JSON in a service label; a
		// malformed value is logged and the pull proceeds unauthenticated.
		if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuthConfig))).Decode(authConfig); err != nil {
			logrus.Warnf("invalid authconfig: %v", err)
		}
	}
	pr, pw := io.Pipe()
	metaHeaders := map[string][]string{}
	go func() {
		// Run the pull concurrently; closing the pipe with the pull's error
		// terminates the decode loop below.
		err := c.backend.PullImage(ctx, c.container.image(), "", metaHeaders, authConfig, pw)
		pw.CloseWithError(err)
	}()
	dec := json.NewDecoder(pr)
	m := map[string]interface{}{}
	for {
		if err := dec.Decode(&m); err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		// TODO(stevvooe): Report this status somewhere.
		logrus.Debugln("pull progress", m)
	}
	// if the final stream object contained an error, return it
	if errMsg, ok := m["error"]; ok {
		return fmt.Errorf("%v", errMsg)
	}
	return nil
}
// createNetworks creates every network the container is attached to,
// tolerating networks that already exist.
func (c *containerAdapter) createNetworks(ctx context.Context) error {
	for _, name := range c.container.networks() {
		ncr, err := c.container.networkCreateRequest(name)
		if err != nil {
			return err
		}
		err = c.backend.CreateManagedNetwork(ncr) // todo name missing
		if err == nil {
			continue
		}
		if _, ok := err.(libnetwork.NetworkNameError); ok {
			// The network already exists; treat as success.
			continue
		}
		return err
	}
	return nil
}
// removeNetworks deletes the container's networks, skipping any that still
// have active endpoints attached.
func (c *containerAdapter) removeNetworks(ctx context.Context) error {
	for _, nid := range c.container.networks() {
		err := c.backend.DeleteManagedNetwork(nid)
		if err == nil {
			continue
		}
		if _, ok := err.(*libnetwork.ActiveEndpointsError); ok {
			// Other containers are still attached; leave the network in place.
			continue
		}
		log.G(ctx).Errorf("network %s remove failed: %v", nid, err)
		return err
	}
	return nil
}
// create creates the task's container through the backend, connects it to
// any additional networks, and applies the service configuration.
func (c *containerAdapter) create(ctx context.Context, backend executorpkg.Backend) error {
	var cr types.ContainerCreateResponse
	var err error
	if cr, err = backend.CreateManagedContainer(types.ContainerCreateConfig{
		Name:       c.container.name(),
		Config:     c.container.config(),
		HostConfig: c.container.hostConfig(),
		// Use the first network in container create
		NetworkingConfig: c.container.createNetworkingConfig(),
	}); err != nil {
		return err
	}
	// Docker daemon currently doesn't support multiple networks in container create
	// Connect to all other networks
	nc := c.container.connectNetworkingConfig()
	if nc != nil {
		for n, ep := range nc.EndpointsConfig {
			// Fixed: was logged at Error level and dereferenced ep.IPAMConfig
			// unconditionally (nil-pointer risk); %+v is nil-safe.
			logrus.Debugf("connecting container %s to network %s: %+v", cr.ID, n, ep.IPAMConfig)
			if err := backend.ConnectContainerToNetwork(cr.ID, n, ep); err != nil {
				return err
			}
		}
	}
	if err := backend.UpdateContainerServiceConfig(cr.ID, c.container.serviceConfig()); err != nil {
		return err
	}
	return nil
}
// start asks the backend to start the task's container.
func (c *containerAdapter) start(ctx context.Context) error {
	return c.backend.ContainerStart(c.container.name(), nil)
}

// inspect returns the container's current state, preferring the context's
// error when the caller has already been cancelled.
func (c *containerAdapter) inspect(ctx context.Context) (types.ContainerJSON, error) {
	cs, err := c.backend.ContainerInspectCurrent(c.container.name(), false)
	switch {
	case ctx.Err() != nil:
		return types.ContainerJSON{}, ctx.Err()
	case err != nil:
		return types.ContainerJSON{}, err
	}
	return *cs, nil
}
// events issues a call to the events API and returns a channel with all
// events. The stream of events can be shutdown by cancelling the context.
//
// A chan struct{} is returned that will be closed if the event processing
// fails and needs to be restarted.
func (c *containerAdapter) wait(ctx context.Context) (<-chan int, error) {
	return c.backend.ContainerWaitWithContext(ctx, c.container.name())
}

// shutdown stops the container, honoring the task's StopGracePeriod when set.
func (c *containerAdapter) shutdown(ctx context.Context) error {
	// Default stop grace period to 10s.
	grace := 10
	if sp := c.container.spec().StopGracePeriod; sp != nil {
		grace = int(sp.Seconds)
	}
	return c.backend.ContainerStop(c.container.name(), grace)
}

// terminate kills the container immediately with SIGKILL.
func (c *containerAdapter) terminate(ctx context.Context) error {
	return c.backend.ContainerKill(c.container.name(), uint64(syscall.SIGKILL))
}
// remove force-deletes the container along with its anonymous volumes.
func (c *containerAdapter) remove(ctx context.Context) error {
	cfg := types.ContainerRmConfig{
		RemoveVolume: true,
		ForceRemove:  true,
	}
	return c.backend.ContainerRm(c.container.name(), &cfg)
}
// createVolumes ensures every driver-backed named volume referenced by the
// task's mounts exists on the engine before the container is created.
func (c *containerAdapter) createVolumes(ctx context.Context, backend executorpkg.Backend) error {
	// Create plugin volumes that are embedded inside a Mount
	for _, mount := range c.container.task.Spec.GetContainer().Mounts {
		if mount.Type != api.MountTypeVolume {
			continue
		}
		// Fixed: the original condition was inverted ("!= nil { continue }"),
		// which skipped every mount WITH volume options and then nil-panicked
		// dereferencing VolumeOptions on the mounts without them.
		if mount.VolumeOptions == nil {
			continue
		}
		if mount.VolumeOptions.DriverConfig == nil {
			continue
		}
		req := c.container.volumeCreateRequest(&mount)
		// Check if this volume exists on the engine
		if _, err := backend.VolumeCreate(req.Name, req.Driver, req.DriverOpts, req.Labels); err != nil {
			// TODO(amitshukla): Today, volume create through the engine api does not return an error
			// when the named volume with the same parameters already exists.
			// It returns an error if the driver name is different - that is a valid error
			return err
		}
	}
	return nil
}
// todo: typed/wrapped errors
func isContainerCreateNameConflict(err error) bool {
return strings.Contains(err.Error(), "Conflict. The name")
}
func isUnknownContainer(err error) bool {
return strings.Contains(err.Error(), "No such container:")
}
func isStoppedContainer(err error) bool {
return strings.Contains(err.Error(), "is already stopped")
}

View file

@ -0,0 +1,415 @@
package container
import (
"errors"
"fmt"
"log"
"net"
"strings"
"time"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
"github.com/docker/docker/reference"
"github.com/docker/engine-api/types"
enginecontainer "github.com/docker/engine-api/types/container"
"github.com/docker/engine-api/types/network"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
)
const (
	// Explicitly use the kernel's default setting for CPU quota of 100ms.
	// https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
	cpuQuotaPeriod = 100 * time.Millisecond
	// systemLabelPrefix represents the reserved namespace for system labels.
	systemLabelPrefix = "com.docker.swarm"
)
// containerConfig converts task properties into docker container compatible
// components.
type containerConfig struct {
	// task is the swarmkit task this configuration was derived from.
	task *api.Task
	// networksAttachments indexes the task's network attachments by network name.
	networksAttachments map[string]*api.NetworkAttachment
}
// newContainerConfig returns a validated container config. No methods should
// return an error if this function returns without error.
func newContainerConfig(t *api.Task) (*containerConfig, error) {
	var c containerConfig
	// setTask both validates the task and populates c.
	return &c, c.setTask(t)
}
// setTask validates that the task is a container task with an image and
// populates the config, indexing network attachments by network name.
func (c *containerConfig) setTask(t *api.Task) error {
	ctnr := t.Spec.GetContainer()
	if ctnr == nil {
		return exec.ErrRuntimeUnsupported
	}
	if ctnr.Image == "" {
		return ErrImageRequired
	}
	// index the networks by name
	attachments := make(map[string]*api.NetworkAttachment, len(t.Networks))
	for _, na := range t.Networks {
		attachments[na.Network.Spec.Annotations.Name] = na
	}
	c.networksAttachments = attachments
	c.task = t
	return nil
}
// endpoint returns the task's endpoint, which may be nil.
func (c *containerConfig) endpoint() *api.Endpoint {
	return c.task.Endpoint
}

// spec returns the container runtime spec carried by the task.
func (c *containerConfig) spec() *api.ContainerSpec {
	return c.task.Spec.GetContainer()
}

// name returns the container name: the orchestrator-set annotation when
// present, otherwise "<service>.<slot>.<taskID>".
func (c *containerConfig) name() string {
	if n := c.task.Annotations.Name; n != "" {
		// if set, use the container Annotations.Name field, set in the orchestrator.
		return n
	}
	// fallback to service.slot.id.
	parts := []string{c.task.ServiceAnnotations.Name, fmt.Sprint(c.task.Slot), c.task.ID}
	return strings.Join(parts, ".")
}
// image returns the spec's image reference with the default tag applied when
// it parses as a named reference; unparseable values pass through untouched.
func (c *containerConfig) image() string {
	raw := c.spec().Image
	named, err := reference.ParseNamed(raw)
	if err != nil {
		return raw
	}
	return reference.WithDefaultTag(named).String()
}
// volumes returns the volume mounts as a set keyed by "target:options".
func (c *containerConfig) volumes() map[string]struct{} {
	vols := make(map[string]struct{})
	for _, m := range c.spec().Mounts {
		// pick off all the volume mounts.
		if m.Type == api.MountTypeVolume {
			vols[fmt.Sprintf("%s:%s", m.Target, getMountMask(&m))] = struct{}{}
		}
	}
	return vols
}
// config builds the engine container.Config from the task's container spec.
func (c *containerConfig) config() *enginecontainer.Config {
	config := &enginecontainer.Config{
		Labels:     c.labels(),
		User:       c.spec().User,
		Env:        c.spec().Env,
		WorkingDir: c.spec().Dir,
		Image:      c.image(),
		Volumes:    c.volumes(),
	}
	if len(c.spec().Command) > 0 {
		// If Command is provided, we replace the whole invocation with Command
		// by replacing Entrypoint and specifying Cmd. Args is ignored in this
		// case.
		config.Entrypoint = append(config.Entrypoint, c.spec().Command[0])
		config.Cmd = append(config.Cmd, c.spec().Command[1:]...)
	} else if len(c.spec().Args) > 0 {
		// In this case, we assume the image has an Entrypoint and Args
		// specifies the arguments for that entrypoint.
		config.Cmd = c.spec().Args
	}
	return config
}
// labels assembles the container labels in increasing precedence: spec
// labels, then task annotation labels, then reserved system labels.
func (c *containerConfig) labels() map[string]string {
	var (
		system = map[string]string{
			"task":         "", // mark as cluster task
			"task.id":      c.task.ID,
			"task.name":    fmt.Sprintf("%v.%v", c.task.ServiceAnnotations.Name, c.task.Slot),
			"node.id":      c.task.NodeID,
			"service.id":   c.task.ServiceID,
			"service.name": c.task.ServiceAnnotations.Name,
		}
		labels = make(map[string]string)
	)
	// base labels are those defined in the spec.
	for k, v := range c.spec().Labels {
		labels[k] = v
	}
	// we then apply the overrides from the task, which may be set via the
	// orchestrator.
	for k, v := range c.task.Annotations.Labels {
		labels[k] = v
	}
	// finally, we apply the system labels, which override all labels.
	for k, v := range system {
		labels[strings.Join([]string{systemLabelPrefix, k}, ".")] = v
	}
	return labels
}
// bindMounts renders bind-type mounts as "source:target:options" strings
// suitable for HostConfig.Binds.
func (c *containerConfig) bindMounts() []string {
	var binds []string
	for _, m := range c.spec().Mounts {
		if m.Type != api.MountTypeBind {
			continue
		}
		binds = append(binds, fmt.Sprintf("%s:%s:%s", m.Source, m.Target, getMountMask(&m)))
	}
	return binds
}
// getMountMask builds the comma-separated mount option string: rw/ro first,
// then bind propagation, then nocopy for non-populating volumes.
func getMountMask(m *api.Mount) string {
	opts := make([]string, 0, 3)
	if m.Writable {
		opts = append(opts, "rw")
	} else {
		opts = append(opts, "ro")
	}
	if bo := m.BindOptions; bo != nil {
		switch bo.Propagation {
		case api.MountPropagationPrivate:
			opts = append(opts, "private")
		case api.MountPropagationRPrivate:
			opts = append(opts, "rprivate")
		case api.MountPropagationShared:
			opts = append(opts, "shared")
		case api.MountPropagationRShared:
			opts = append(opts, "rshared")
		case api.MountPropagationSlave:
			opts = append(opts, "slave")
		case api.MountPropagationRSlave:
			opts = append(opts, "rslave")
		}
	}
	if vo := m.VolumeOptions; vo != nil && !vo.Populate {
		opts = append(opts, "nocopy")
	}
	return strings.Join(opts, ",")
}
// hostConfig assembles the engine HostConfig from task resources and bind mounts.
func (c *containerConfig) hostConfig() *enginecontainer.HostConfig {
	hc := enginecontainer.HostConfig{
		Resources: c.resources(),
		Binds:     c.bindMounts(),
	}
	return &hc
}
// This handles the case of volumes that are defined inside a service Mount.
// It returns nil when the mount carries no volume options; driver name,
// options and labels are copied only when a driver config is present.
func (c *containerConfig) volumeCreateRequest(mount *api.Mount) *types.VolumeCreateRequest {
	if mount.VolumeOptions == nil {
		return nil
	}
	req := &types.VolumeCreateRequest{Name: mount.Source}
	if dc := mount.VolumeOptions.DriverConfig; dc != nil {
		req.Driver = dc.Name
		req.DriverOpts = dc.Options
		req.Labels = mount.VolumeOptions.Labels
	}
	return req
}
// resources maps the task's resource limits onto engine container resources.
func (c *containerConfig) resources() enginecontainer.Resources {
	resources := enginecontainer.Resources{}
	// If no limits are specified let the engine use its defaults.
	//
	// TODO(aluzzardi): We might want to set some limits anyway otherwise
	// "unlimited" tasks will step over the reservation of other tasks.
	r := c.task.Spec.Resources
	if r == nil || r.Limits == nil {
		return resources
	}
	if r.Limits.MemoryBytes > 0 {
		resources.Memory = r.Limits.MemoryBytes
	}
	if r.Limits.NanoCPUs > 0 {
		// CPU Period must be set in microseconds.
		resources.CPUPeriod = int64(cpuQuotaPeriod / time.Microsecond)
		// Quota is the NanoCPU share of the period (NanoCPUs is CPUs * 1e9).
		resources.CPUQuota = r.Limits.NanoCPUs * resources.CPUPeriod / 1e9
	}
	return resources
}
// Docker daemon supports just 1 network during container create, so only the
// first network attachment is included here; the rest are connected later
// (see connectNetworkingConfig).
func (c *containerConfig) createNetworkingConfig() *network.NetworkingConfig {
	epConfig := make(map[string]*network.EndpointSettings)
	if c.task.Spec.GetContainer() != nil && len(c.task.Networks) > 0 {
		first := c.task.Networks[0]
		epConfig[first.Network.Spec.Annotations.Name] = getEndpointConfig(first)
	}
	return &network.NetworkingConfig{EndpointsConfig: epConfig}
}
// connectNetworkingConfig returns endpoint settings for every network beyond
// the first, which must be attached after create via network connect.
// TODO: Merge this function with createNetworkingConfig after daemon supports multiple networks in container create
func (c *containerConfig) connectNetworkingConfig() *network.NetworkingConfig {
	var attachments []*api.NetworkAttachment
	if c.task.Spec.GetContainer() != nil {
		attachments = c.task.Networks
	}
	// First network is used during container create. Other networks are used in "docker network connect"
	if len(attachments) < 2 {
		return nil
	}
	epConfig := make(map[string]*network.EndpointSettings, len(attachments)-1)
	for _, na := range attachments[1:] {
		epConfig[na.Network.Spec.Annotations.Name] = getEndpointConfig(na)
	}
	return &network.NetworkingConfig{EndpointsConfig: epConfig}
}
// getEndpointConfig extracts the attachment's IPv4 and IPv6 addresses (the
// last valid address of each family wins) into engine endpoint settings.
func getEndpointConfig(na *api.NetworkAttachment) *network.EndpointSettings {
	var v4, v6 string
	for _, cidr := range na.Addresses {
		ip, _, err := net.ParseCIDR(cidr)
		if err != nil {
			continue
		}
		switch {
		case ip.To4() != nil:
			v4 = ip.String()
		case ip.To16() != nil:
			v6 = ip.String()
		}
	}
	return &network.EndpointSettings{
		IPAMConfig: &network.EndpointIPAMConfig{
			IPv4Address: v4,
			IPv6Address: v6,
		},
	}
}
// virtualIP returns the task's virtual IP on the given network, or "" when
// the task has no endpoint or no VIP on that network.
func (c *containerConfig) virtualIP(networkID string) string {
	if c.task.Endpoint == nil {
		return ""
	}
	for _, eVip := range c.task.Endpoint.VirtualIPs {
		// We only support IPv4 VIPs for now.
		if eVip.NetworkID != networkID {
			continue
		}
		addr, _, err := net.ParseCIDR(eVip.Addr)
		if err != nil {
			return ""
		}
		return addr.String()
	}
	return ""
}
// serviceConfig builds the cluster service configuration (per-network virtual
// addresses and exposed ports) for this task's container; returns nil for
// tasks attached to no networks.
func (c *containerConfig) serviceConfig() *clustertypes.ServiceConfig {
	if len(c.task.Networks) == 0 {
		return nil
	}
	// NOTE(review): leftover debug logging via the stdlib log package — it
	// unconditionally prints the entire task; consider removing or moving to
	// logrus.Debugf (removing it would also orphan the "log" import).
	log.Printf("Creating service config in agent for t = %+v", c.task)
	svcCfg := &clustertypes.ServiceConfig{
		Name:             c.task.ServiceAnnotations.Name,
		ID:               c.task.ServiceID,
		VirtualAddresses: make(map[string]*clustertypes.VirtualAddress),
	}
	for _, na := range c.task.Networks {
		svcCfg.VirtualAddresses[na.Network.ID] = &clustertypes.VirtualAddress{
			// We support only IPv4 virtual IP for now.
			IPv4: c.virtualIP(na.Network.ID),
		}
	}
	// Endpoint ports are copied when the task exposes any.
	if c.task.Endpoint != nil {
		for _, ePort := range c.task.Endpoint.Ports {
			svcCfg.ExposedPorts = append(svcCfg.ExposedPorts, &clustertypes.PortConfig{
				Name:          ePort.Name,
				Protocol:      int32(ePort.Protocol),
				TargetPort:    ePort.TargetPort,
				PublishedPort: ePort.PublishedPort,
			})
		}
	}
	return svcCfg
}
// networks returns a list of network names attached to the container. The
// returned name can be used to lookup the corresponding network create
// options.
func (c *containerConfig) networks() []string {
	var names []string
	for name := range c.networksAttachments {
		names = append(names, name)
	}
	return names
}
// networkCreateRequest builds a cluster network-create request for the named
// attachment, copying driver, IPAM driver, and IPAM pool configuration.
func (c *containerConfig) networkCreateRequest(name string) (clustertypes.NetworkCreateRequest, error) {
	na, ok := c.networksAttachments[name]
	if !ok {
		return clustertypes.NetworkCreateRequest{}, errors.New("container: unknown network referenced")
	}
	options := types.NetworkCreate{
		// ID: na.Network.ID,
		Driver: na.Network.DriverState.Name,
		IPAM: network.IPAM{
			Driver: na.Network.IPAM.Driver.Name,
		},
		Options: na.Network.DriverState.Options,
		// CheckDuplicate makes the daemon reject a second network of the same name.
		CheckDuplicate: true,
	}
	for _, ic := range na.Network.IPAM.Configs {
		c := network.IPAMConfig{
			Subnet:  ic.Subnet,
			IPRange: ic.Range,
			Gateway: ic.Gateway,
		}
		options.IPAM.Config = append(options.IPAM.Config, c)
	}
	// NOTE(review): unkeyed composite literal (flagged by `go vet`); keyed
	// fields would be safer against future field reordering.
	return clustertypes.NetworkCreateRequest{na.Network.ID, types.NetworkCreateRequest{Name: name, NetworkCreate: options}}, nil
}

View file

@ -0,0 +1,305 @@
package container
import (
"errors"
"fmt"
"strings"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
"github.com/docker/engine-api/types"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"github.com/docker/swarmkit/log"
"golang.org/x/net/context"
)
// controller implements agent.Controller against docker's API.
//
// Most operations against docker's API are done through the container name,
// which is unique to the task.
type controller struct {
	backend executorpkg.Backend // daemon-side operations (create/start/inspect/…)
	task    *api.Task           // the swarmkit task this controller manages
	adapter *containerAdapter   // translates task operations into container API calls
	closed  chan struct{}       // closed exactly once by Close
	err     error               // sticky error returned after the controller is closed
}
var _ exec.Controller = &controller{}

// NewController returns a dockerexec runner for the provided task.
func newController(b executorpkg.Backend, task *api.Task) (*controller, error) {
	adapter, err := newContainerAdapter(b, task)
	if err != nil {
		return nil, err
	}

	ctlr := &controller{
		backend: b,
		task:    task,
		adapter: adapter,
		closed:  make(chan struct{}),
	}
	return ctlr, nil
}
// Task returns the task assigned to this controller.
func (r *controller) Task() (*api.Task, error) {
	return r.task, nil
}

// ContainerStatus returns the container-specific status for the task.
func (r *controller) ContainerStatus(ctx context.Context) (*api.ContainerStatus, error) {
	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		// A missing container is not an error here; report "no status yet".
		if isUnknownContainer(err) {
			return nil, nil
		}

		return nil, err
	}
	return parseContainerStatus(ctnr)
}

// Update takes a recent task update and applies it to the container.
func (r *controller) Update(ctx context.Context, t *api.Task) error {
	// Updates are currently not applied; log and report success so the
	// agent does not treat the update as a task failure.
	log.G(ctx).Warnf("task updates not yet supported")

	// TODO(stevvooe): While assignment of tasks is idempotent, we do allow
	// updates of metadata, such as labelling, as well as any other properties
	// that make sense.
	return nil
}
// Prepare creates a container and ensures the image is pulled.
//
// If the container has already been created, exec.ErrTaskPrepared is returned.
func (r *controller) Prepare(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// Make sure all the networks that the task needs are created.
	if err := r.adapter.createNetworks(ctx); err != nil {
		return err
	}

	// Make sure all the volumes that the task needs are created.
	if err := r.adapter.createVolumes(ctx, r.backend); err != nil {
		return err
	}

	// Create the container; on a missing-image error, pull the image and
	// retry the create. Any other error aborts.
	for {
		if err := r.checkClosed(); err != nil {
			return err
		}
		if err := r.adapter.create(ctx, r.backend); err != nil {
			if isContainerCreateNameConflict(err) {
				// Name conflict: inspect to confirm the container really exists.
				if _, err := r.adapter.inspect(ctx); err != nil {
					return err
				}

				// container is already created. success!
				return exec.ErrTaskPrepared
			}

			if !strings.Contains(err.Error(), "No such image") { // todo: better error detection
				return err
			}

			if err := r.adapter.pullImage(ctx); err != nil {
				return err
			}

			continue // retry to create the container
		}

		break
	}

	return nil
}
// Start the container. An error will be returned if the container is already started.
func (r *controller) Start(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		return err
	}

	// Detect whether the container has *ever* been started. If so, we don't
	// issue the start.
	//
	// TODO(stevvooe): This is very racy. While reading inspect, another could
	// start the process and we could end up starting it twice.
	if ctnr.State.Status != "created" {
		return exec.ErrTaskStarted
	}

	return r.adapter.start(ctx)
}
// Wait on the container to exit.
func (r *controller) Wait(pctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// Derive a cancellable context so the underlying wait stream is torn
	// down when this function returns.
	ctx, cancel := context.WithCancel(pctx)
	defer cancel()

	c, err := r.adapter.wait(ctx)
	if err != nil {
		return err
	}

	// Block until the container stops or the context is cancelled.
	<-c
	if ctx.Err() != nil {
		return ctx.Err()
	}
	ctnr, err := r.adapter.inspect(ctx)
	if err != nil {
		// TODO(stevvooe): Need to handle missing container here. It is likely
		// that a Wait call with a not found error should result in no waiting
		// and no error at all.
		return err
	}

	if ctnr.State.ExitCode != 0 {
		var cause error
		if ctnr.State.Error != "" {
			cause = errors.New(ctnr.State.Error)
		}
		// Best-effort status snapshot; the parse error is deliberately ignored.
		cstatus, _ := parseContainerStatus(ctnr)
		return &exitError{
			code:            ctnr.State.ExitCode,
			cause:           cause,
			containerStatus: cstatus,
		}
	}
	return nil
}
// Shutdown the container cleanly.
func (r *controller) Shutdown(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	err := r.adapter.shutdown(ctx)
	switch {
	case err == nil:
		return nil
	case isUnknownContainer(err), isStoppedContainer(err):
		// Already gone or already stopped: treat as success.
		return nil
	default:
		return err
	}
}

// Terminate the container, with force.
func (r *controller) Terminate(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// A missing container means there is nothing left to kill.
	if err := r.adapter.terminate(ctx); err != nil && !isUnknownContainer(err) {
		return err
	}
	return nil
}
// Remove the container and its resources.
func (r *controller) Remove(ctx context.Context) error {
	if err := r.checkClosed(); err != nil {
		return err
	}

	// It may be necessary to shut down the task before removing it.
	if err := r.Shutdown(ctx); err != nil {
		if isUnknownContainer(err) {
			// Nothing to remove; report success.
			return nil
		}
		// This may fail if the task was already shut down.
		log.G(ctx).WithError(err).Debug("shutdown failed on removal")
	}

	// Try removing networks referenced in this task in case this
	// task is the last one referencing it
	if err := r.adapter.removeNetworks(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}
		return err
	}

	if err := r.adapter.remove(ctx); err != nil {
		if isUnknownContainer(err) {
			return nil
		}
		return err
	}
	return nil
}
// Close the runner and clean up any ephemeral resources.
//
// NOTE(review): the check-then-close below is not mutex-protected, so two
// concurrent Close calls could race on r.err / double-close r.closed —
// confirm callers serialize Close.
func (r *controller) Close() error {
	select {
	case <-r.closed:
		// Already closed; return the sticky error.
		return r.err
	default:
		r.err = exec.ErrControllerClosed
		close(r.closed)
	}
	return nil
}

// checkClosed returns the sticky close error if Close has already been
// called, nil otherwise.
func (r *controller) checkClosed() error {
	select {
	case <-r.closed:
		return r.err
	default:
		return nil
	}
}
// parseContainerStatus converts an inspected container into the swarmkit
// ContainerStatus representation.
func parseContainerStatus(ctnr types.ContainerJSON) (*api.ContainerStatus, error) {
	status := &api.ContainerStatus{
		ContainerID: ctnr.ID,
		PID:         int32(ctnr.State.Pid),
		ExitCode:    int32(ctnr.State.ExitCode),
	}

	return status, nil
}

// exitError reports a non-zero container exit.
type exitError struct {
	code            int                  // container exit code
	cause           error                // underlying engine error, if any
	containerStatus *api.ContainerStatus // status snapshot at exit; may be nil
}

// Error formats the exit error, including the cause when present.
func (e *exitError) Error() string {
	if e.cause != nil {
		return fmt.Sprintf("task: non-zero exit (%v): %v", e.code, e.cause)
	}

	return fmt.Sprintf("task: non-zero exit (%v)", e.code)
}

// ExitCode returns the container's exit code.
func (e *exitError) ExitCode() int {
	// Use the authoritative code field instead of dereferencing
	// containerStatus, which may be nil (its parse error is ignored at the
	// construction site in Wait).
	return e.code
}

// Cause returns the underlying error reported by the engine, if any.
func (e *exitError) Cause() error {
	return e.cause
}

View file

@ -0,0 +1,12 @@
package container
import "fmt"
var (
// ErrImageRequired returned if a task is missing the image definition.
ErrImageRequired = fmt.Errorf("dockerexec: image required")
// ErrContainerDestroyed returned when a container is prematurely destroyed
// during a wait call.
ErrContainerDestroyed = fmt.Errorf("dockerexec: container destroyed")
)

View file

@ -0,0 +1,139 @@
package container
import (
"strings"
executorpkg "github.com/docker/docker/daemon/cluster/executor"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/network"
networktypes "github.com/docker/libnetwork/types"
"github.com/docker/swarmkit/agent/exec"
"github.com/docker/swarmkit/api"
"golang.org/x/net/context"
)
// executor adapts the docker daemon backend to swarmkit's exec.Executor
// interface.
type executor struct {
	backend executorpkg.Backend // daemon-side operations used by controllers
}

// NewExecutor returns an executor from the docker client.
func NewExecutor(b executorpkg.Backend) exec.Executor {
	return &executor{
		backend: b,
	}
}
// Describe returns the underlying node description from the docker client.
func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) {
	info, err := e.backend.SystemInfo()
	if err != nil {
		return nil, err
	}

	// Collect the daemon's plugins as typed plugin descriptions.
	var plugins []api.PluginDescription
	addPlugins := func(typ string, names []string) {
		for _, name := range names {
			plugins = append(plugins, api.PluginDescription{
				Type: typ,
				Name: name,
			})
		}
	}

	addPlugins("Volume", info.Plugins.Volume)
	// Add builtin driver "overlay" (the only builtin multi-host driver) to
	// the plugin list by default.
	addPlugins("Network", append([]string{"overlay"}, info.Plugins.Network...))
	addPlugins("Authorization", info.Plugins.Authorization)

	// parse []string labels into a map[string]string
	labels := map[string]string{}
	for _, l := range info.Labels {
		stringSlice := strings.SplitN(l, "=", 2)
		// this will take the last value in the list for a given key
		// ideally, one shouldn't assign multiple values to the same key
		if len(stringSlice) > 1 {
			labels[stringSlice[0]] = stringSlice[1]
		}
	}

	description := &api.NodeDescription{
		Hostname: info.Name,
		Platform: &api.Platform{
			Architecture: info.Architecture,
			OS:           info.OSType,
		},
		Engine: &api.EngineDescription{
			EngineVersion: info.ServerVersion,
			Labels:        labels,
			Plugins:       plugins,
		},
		Resources: &api.Resources{
			// NCPU is a CPU count; swarmkit expects nano-CPUs.
			NanoCPUs:    int64(info.NCPU) * 1e9,
			MemoryBytes: info.MemTotal,
		},
	}

	return description, nil
}
// Configure sets up the node-level network attachment received from the
// manager by handing it to the backend's ingress setup. A node with no
// attachment is a no-op.
func (e *executor) Configure(ctx context.Context, node *api.Node) error {
	na := node.Attachment
	if na == nil {
		return nil
	}

	options := types.NetworkCreate{
		Driver: na.Network.DriverState.Name,
		IPAM: network.IPAM{
			Driver: na.Network.IPAM.Driver.Name,
		},
		Options:        na.Network.DriverState.Options,
		CheckDuplicate: true,
	}

	for _, ic := range na.Network.IPAM.Configs {
		c := network.IPAMConfig{
			Subnet:  ic.Subnet,
			IPRange: ic.Range,
			Gateway: ic.Gateway,
		}
		options.IPAM.Config = append(options.IPAM.Config, c)
	}

	// Keyed fields keep this literal `go vet`-clean.
	return e.backend.SetupIngress(clustertypes.NetworkCreateRequest{
		ID: na.Network.ID,
		NetworkCreateRequest: types.NetworkCreateRequest{
			Name:          na.Network.Spec.Annotations.Name,
			NetworkCreate: options,
		},
	}, na.Addresses[0])
}
// Controller returns a docker container runner.
func (e *executor) Controller(t *api.Task) (exec.Controller, error) {
	return newController(e.backend, t)
}

// SetNetworkBootstrapKeys converts swarmkit encryption keys into their
// libnetwork representation and hands them to the backend.
func (e *executor) SetNetworkBootstrapKeys(keys []*api.EncryptionKey) error {
	nwKeys := make([]*networktypes.EncryptionKey, 0, len(keys))
	for _, key := range keys {
		// Copy the key material so the backend owns its own buffer.
		keyCopy := make([]byte, len(key.Key))
		copy(keyCopy, key.Key)

		nwKeys = append(nwKeys, &networktypes.EncryptionKey{
			Subsystem:   key.Subsystem,
			Algorithm:   int32(key.Algorithm),
			Key:         keyCopy,
			LamportTime: key.LamportTime,
		})
	}
	e.backend.SetNetworkBootstrapKeys(nwKeys)
	return nil
}

93
daemon/cluster/filters.go Normal file
View file

@ -0,0 +1,93 @@
package cluster
import (
"fmt"
"strings"
runconfigopts "github.com/docker/docker/runconfig/opts"
"github.com/docker/engine-api/types/filters"
swarmapi "github.com/docker/swarmkit/api"
)
// newListNodesFilters converts engine filter args into a swarmkit
// ListNodes filter, validating the accepted keys and enum values.
func newListNodesFilters(filter filters.Args) (*swarmapi.ListNodesRequest_Filters, error) {
	accepted := map[string]bool{
		"name":       true,
		"id":         true,
		"label":      true,
		"role":       true,
		"membership": true,
	}
	if err := filter.Validate(accepted); err != nil {
		return nil, err
	}

	req := &swarmapi.ListNodesRequest_Filters{
		Names:      filter.Get("name"),
		IDPrefixes: filter.Get("id"),
		Labels:     runconfigopts.ConvertKVStringsToMap(filter.Get("label")),
	}

	for _, r := range filter.Get("role") {
		if r == "" {
			continue // empty values are ignored, not rejected
		}
		role, ok := swarmapi.NodeRole_value[strings.ToUpper(r)]
		if !ok {
			return nil, fmt.Errorf("Invalid role filter: '%s'", r)
		}
		req.Roles = append(req.Roles, swarmapi.NodeRole(role))
	}

	for _, a := range filter.Get("membership") {
		if a == "" {
			continue
		}
		membership, ok := swarmapi.NodeSpec_Membership_value[strings.ToUpper(a)]
		if !ok {
			return nil, fmt.Errorf("Invalid membership filter: '%s'", a)
		}
		req.Memberships = append(req.Memberships, swarmapi.NodeSpec_Membership(membership))
	}

	return req, nil
}
// newListServicesFilters converts engine filter args into a swarmkit
// ListServices filter, validating the accepted keys first.
func newListServicesFilters(filter filters.Args) (*swarmapi.ListServicesRequest_Filters, error) {
	accepted := map[string]bool{
		"name":  true,
		"id":    true,
		"label": true,
	}
	if err := filter.Validate(accepted); err != nil {
		return nil, err
	}

	req := &swarmapi.ListServicesRequest_Filters{
		Names:      filter.Get("name"),
		IDPrefixes: filter.Get("id"),
		Labels:     runconfigopts.ConvertKVStringsToMap(filter.Get("label")),
	}
	return req, nil
}
// newListTasksFilters converts engine filter args into a swarmkit
// ListTasks filter, validating the accepted keys and desired-state values.
func newListTasksFilters(filter filters.Args) (*swarmapi.ListTasksRequest_Filters, error) {
	accepted := map[string]bool{
		"name":          true,
		"id":            true,
		"label":         true,
		"service":       true,
		"node":          true,
		"desired_state": true,
	}
	if err := filter.Validate(accepted); err != nil {
		return nil, err
	}

	req := &swarmapi.ListTasksRequest_Filters{
		Names:      filter.Get("name"),
		IDPrefixes: filter.Get("id"),
		Labels:     runconfigopts.ConvertKVStringsToMap(filter.Get("label")),
		ServiceIDs: filter.Get("service"),
		NodeIDs:    filter.Get("node"),
	}

	for _, s := range filter.Get("desired_state") {
		if s == "" {
			continue // empty values are ignored, not rejected
		}
		state, ok := swarmapi.TaskState_value[strings.ToUpper(s)]
		if !ok {
			return nil, fmt.Errorf("Invalid desired_state filter: '%s'", s)
		}
		req.DesiredStates = append(req.DesiredStates, swarmapi.TaskState(state))
	}

	return req, nil
}

108
daemon/cluster/helpers.go Normal file
View file

@ -0,0 +1,108 @@
package cluster
import (
"fmt"
swarmapi "github.com/docker/swarmkit/api"
"golang.org/x/net/context"
)
// getSwarm returns the cluster object known to the manager, or an error if
// no cluster exists.
func getSwarm(ctx context.Context, c swarmapi.ControlClient) (*swarmapi.Cluster, error) {
	resp, err := c.ListClusters(ctx, &swarmapi.ListClustersRequest{})
	if err != nil {
		return nil, err
	}

	if len(resp.Clusters) == 0 {
		return nil, fmt.Errorf("swarm not found")
	}

	// TODO: assume one cluster only
	return resp.Clusters[0], nil
}
// getNode resolves input to a node: first by exact ID, then by exact name,
// then by unambiguous ID prefix.
func getNode(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Node, error) {
	// GetNode to match via full ID.
	rg, err := c.GetNode(ctx, &swarmapi.GetNodeRequest{NodeID: input})
	if err != nil {
		// If any error (including NotFound), ListNodes to match via full name.
		rl, err := c.ListNodes(ctx, &swarmapi.ListNodesRequest{Filters: &swarmapi.ListNodesRequest_Filters{Names: []string{input}}})

		if err != nil || len(rl.Nodes) == 0 {
			// If any error or 0 result, ListNodes to match via ID prefix.
			rl, err = c.ListNodes(ctx, &swarmapi.ListNodesRequest{Filters: &swarmapi.ListNodesRequest_Filters{IDPrefixes: []string{input}}})
		}

		if err != nil {
			return nil, err
		}

		if len(rl.Nodes) == 0 {
			return nil, fmt.Errorf("node %s not found", input)
		}

		// Fixed typo: "ambigious" -> "ambiguous".
		if l := len(rl.Nodes); l > 1 {
			return nil, fmt.Errorf("node %s is ambiguous (%d matches found)", input, l)
		}

		return rl.Nodes[0], nil
	}
	return rg.Node, nil
}
// getService resolves input to a service: first by exact ID, then by exact
// name, then by unambiguous ID prefix.
func getService(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Service, error) {
	// GetService to match via full ID.
	rg, err := c.GetService(ctx, &swarmapi.GetServiceRequest{ServiceID: input})
	if err != nil {
		// If any error (including NotFound), ListServices to match via full name.
		rl, err := c.ListServices(ctx, &swarmapi.ListServicesRequest{Filters: &swarmapi.ListServicesRequest_Filters{Names: []string{input}}})
		if err != nil || len(rl.Services) == 0 {
			// If any error or 0 result, ListServices to match via ID prefix.
			rl, err = c.ListServices(ctx, &swarmapi.ListServicesRequest{Filters: &swarmapi.ListServicesRequest_Filters{IDPrefixes: []string{input}}})
		}

		if err != nil {
			return nil, err
		}

		if len(rl.Services) == 0 {
			return nil, fmt.Errorf("service %s not found", input)
		}

		// Fixed typo: "ambigious" -> "ambiguous".
		if l := len(rl.Services); l > 1 {
			return nil, fmt.Errorf("service %s is ambiguous (%d matches found)", input, l)
		}

		return rl.Services[0], nil
	}
	return rg.Service, nil
}
// getTask resolves input to a task: first by exact ID, then by exact name,
// then by unambiguous ID prefix.
func getTask(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Task, error) {
	// GetTask to match via full ID.
	rg, err := c.GetTask(ctx, &swarmapi.GetTaskRequest{TaskID: input})
	if err != nil {
		// If any error (including NotFound), ListTasks to match via full name.
		rl, err := c.ListTasks(ctx, &swarmapi.ListTasksRequest{Filters: &swarmapi.ListTasksRequest_Filters{Names: []string{input}}})
		if err != nil || len(rl.Tasks) == 0 {
			// If any error or 0 result, ListTasks to match via ID prefix.
			rl, err = c.ListTasks(ctx, &swarmapi.ListTasksRequest{Filters: &swarmapi.ListTasksRequest_Filters{IDPrefixes: []string{input}}})
		}

		if err != nil {
			return nil, err
		}

		if len(rl.Tasks) == 0 {
			return nil, fmt.Errorf("task %s not found", input)
		}

		// Fixed typo: "ambigious" -> "ambiguous".
		if l := len(rl.Tasks); l > 1 {
			return nil, fmt.Errorf("task %s is ambiguous (%d matches found)", input, l)
		}

		return rl.Tasks[0], nil
	}
	return rg.Task, nil
}

View file

@ -0,0 +1,36 @@
package provider
import "github.com/docker/engine-api/types"
// NetworkCreateRequest is a request when creating a network.
type NetworkCreateRequest struct {
	ID string // cluster-assigned network ID
	types.NetworkCreateRequest
}

// NetworkCreateResponse is a response when creating a network.
type NetworkCreateResponse struct {
	ID string `json:"Id"`
}

// VirtualAddress represents a virtual address.
type VirtualAddress struct {
	IPv4 string
	IPv6 string
}

// PortConfig represents a port configuration.
type PortConfig struct {
	Name          string
	Protocol      int32 // protocol enum value (cast from the swarmkit type)
	TargetPort    uint32
	PublishedPort uint32
}

// ServiceConfig represents a service configuration.
type ServiceConfig struct {
	ID               string
	Name             string
	VirtualAddresses map[string]*VirtualAddress // keyed by network ID
	ExposedPorts     []*PortConfig
}

View file

@ -101,7 +101,7 @@ func (daemon *Daemon) Register(c *container.Container) error {
return nil
}
func (daemon *Daemon) newContainer(name string, config *containertypes.Config, imgID image.ID) (*container.Container, error) {
func (daemon *Daemon) newContainer(name string, config *containertypes.Config, imgID image.ID, managed bool) (*container.Container, error) {
var (
id string
err error
@ -117,6 +117,7 @@ func (daemon *Daemon) newContainer(name string, config *containertypes.Config, i
base := daemon.newBaseContainer(id)
base.Created = time.Now().UTC()
base.Managed = managed
base.Path = entrypoint
base.Args = args //FIXME: de-duplicate from config
base.Config = config

View file

@ -324,6 +324,10 @@ func (daemon *Daemon) updateNetwork(container *container.Container) error {
return nil
}
// errClusterNetworkOnRun returns the error reported when a container is
// attached directly (via `docker create` or `docker run`) to a swarm-scoped
// network, which is only valid for swarm services.
func errClusterNetworkOnRun(n string) error {
	// Fixed grammar in the user-facing message ("can be only used docker
	// service" -> "can only be used by a docker service").
	return fmt.Errorf("swarm-scoped network (%s) is not compatible with `docker create` or `docker run`. This network can only be used by a docker service", n)
}
// updateContainerNetworkSettings update the network settings
func (daemon *Daemon) updateContainerNetworkSettings(container *container.Container, endpointsConfig map[string]*networktypes.EndpointSettings) error {
var (
@ -345,6 +349,9 @@ func (daemon *Daemon) updateContainerNetworkSettings(container *container.Contai
if err != nil {
return err
}
if !container.Managed && n.Info().Dynamic() {
return errClusterNetworkOnRun(networkName)
}
networkName = n.Name()
}
if container.NetworkSettings == nil {

View file

@ -19,8 +19,17 @@ import (
"github.com/opencontainers/runc/libcontainer/label"
)
// ContainerCreate creates a container.
// CreateManagedContainer creates a container that is managed by a Service
// (the managed flag is threaded through to container creation).
func (daemon *Daemon) CreateManagedContainer(params types.ContainerCreateConfig) (types.ContainerCreateResponse, error) {
	return daemon.containerCreate(params, true)
}

// ContainerCreate creates a regular container
func (daemon *Daemon) ContainerCreate(params types.ContainerCreateConfig) (types.ContainerCreateResponse, error) {
	return daemon.containerCreate(params, false)
}
func (daemon *Daemon) containerCreate(params types.ContainerCreateConfig, managed bool) (types.ContainerCreateResponse, error) {
if params.Config == nil {
return types.ContainerCreateResponse{}, fmt.Errorf("Config cannot be empty in order to create a container")
}
@ -43,7 +52,7 @@ func (daemon *Daemon) ContainerCreate(params types.ContainerCreateConfig) (types
return types.ContainerCreateResponse{Warnings: warnings}, err
}
container, err := daemon.create(params)
container, err := daemon.create(params, managed)
if err != nil {
return types.ContainerCreateResponse{Warnings: warnings}, daemon.imageNotExistToErrcode(err)
}
@ -52,7 +61,7 @@ func (daemon *Daemon) ContainerCreate(params types.ContainerCreateConfig) (types
}
// Create creates a new container from the given configuration with a given name.
func (daemon *Daemon) create(params types.ContainerCreateConfig) (retC *container.Container, retErr error) {
func (daemon *Daemon) create(params types.ContainerCreateConfig, managed bool) (retC *container.Container, retErr error) {
var (
container *container.Container
img *image.Image
@ -76,7 +85,7 @@ func (daemon *Daemon) create(params types.ContainerCreateConfig) (retC *containe
return nil, err
}
if container, err = daemon.newContainer(params.Name, params.Config, imgID); err != nil {
if container, err = daemon.newContainer(params.Name, params.Config, imgID, managed); err != nil {
return nil, err
}
defer func() {

View file

@ -28,6 +28,7 @@ import (
"github.com/docker/docker/daemon/exec"
"github.com/docker/engine-api/types"
containertypes "github.com/docker/engine-api/types/container"
"github.com/docker/libnetwork/cluster"
// register graph drivers
_ "github.com/docker/docker/daemon/graphdriver/register"
dmetadata "github.com/docker/docker/distribution/metadata"
@ -94,6 +95,7 @@ type Daemon struct {
containerd libcontainerd.Client
containerdRemote libcontainerd.Remote
defaultIsolation containertypes.Isolation // Default isolation mode on Windows
clusterProvider cluster.Provider
}
func (daemon *Daemon) restore() error {
@ -344,6 +346,12 @@ func (daemon *Daemon) registerLink(parent, child *container.Container, alias str
return nil
}
// SetClusterProvider sets a component for quering the current cluster state.
func (daemon *Daemon) SetClusterProvider(clusterProvider cluster.Provider) {
daemon.clusterProvider = clusterProvider
daemon.netController.SetClusterProvider(clusterProvider)
}
// NewDaemon sets up everything for the daemon to be able to service
// requests from the webserver.
func NewDaemon(config *Config, registryService registry.Service, containerdRemote libcontainerd.Remote) (daemon *Daemon, err error) {
@ -893,6 +901,10 @@ func (daemon *Daemon) reloadClusterDiscovery(config *Config) error {
return nil
}
if daemon.clusterProvider != nil {
return fmt.Errorf("--cluster-store and --cluster-advertise daemon configurations are incompatible with swarm mode")
}
// enable discovery for the first time if it was not previously enabled
if daemon.discoveryWatcher == nil {
discoveryWatcher, err := initDiscovery(newClusterStore, newAdvertise, config.ClusterOpts)

View file

@ -23,10 +23,12 @@ func (daemon *Daemon) ContainerInspect(name string, size bool, version string) (
case versions.Equal(version, "1.20"):
return daemon.containerInspect120(name)
}
return daemon.containerInspectCurrent(name, size)
return daemon.ContainerInspectCurrent(name, size)
}
func (daemon *Daemon) containerInspectCurrent(name string, size bool) (*types.ContainerJSON, error) {
// ContainerInspectCurrent returns low-level information about a
// container in a most recent api version.
func (daemon *Daemon) ContainerInspectCurrent(name string, size bool) (*types.ContainerJSON, error) {
container, err := daemon.GetContainer(name)
if err != nil {
return nil, err

View file

@ -28,7 +28,7 @@ func addMountPoints(container *container.Container) []types.MountPoint {
// containerInspectPre120 get containers for pre 1.20 APIs.
func (daemon *Daemon) containerInspectPre120(name string) (*types.ContainerJSON, error) {
return daemon.containerInspectCurrent(name, false)
return daemon.ContainerInspectCurrent(name, false)
}
func inspectExecProcessConfig(e *exec.Config) *backend.ExecProcessConfig {

View file

@ -91,6 +91,17 @@ func (daemon *Daemon) Containers(config *types.ContainerListOptions) ([]*types.C
return daemon.reduceContainers(config, daemon.transformContainer)
}
// ListContainersForNode returns all containerID that match the specified nodeID
func (daemon *Daemon) ListContainersForNode(nodeID string) []string {
	var ids []string
	for _, ctr := range daemon.List() {
		if ctr.Config.Labels["com.docker.swarm.node.id"] != nodeID {
			continue
		}
		ids = append(ids, ctr.ID)
	}
	return ids
}
func (daemon *Daemon) filterByNameIDMatches(ctx *listContext) []*container.Container {
idSearch := false
names := ctx.filters.Get("name")

View file

@ -5,13 +5,14 @@ import (
"net"
"strings"
netsettings "github.com/docker/docker/daemon/network"
"github.com/Sirupsen/logrus"
clustertypes "github.com/docker/docker/daemon/cluster/provider"
"github.com/docker/docker/errors"
"github.com/docker/docker/runconfig"
"github.com/docker/engine-api/types"
"github.com/docker/engine-api/types/filters"
"github.com/docker/engine-api/types/network"
"github.com/docker/libnetwork"
networktypes "github.com/docker/libnetwork/types"
)
// NetworkControllerEnabled checks if the networking stack is enabled.
@ -92,9 +93,106 @@ func (daemon *Daemon) getAllNetworks() []libnetwork.Network {
return list
}
// isIngressNetwork reports whether name refers to the reserved swarm
// ingress network.
func isIngressNetwork(name string) bool {
	const ingressName = "ingress"
	return name == ingressName
}
// ingressChan is a one-slot semaphore serializing ingress network creation.
var ingressChan = make(chan struct{}, 1)

// ingressWait acquires the ingress semaphore and returns the release
// function; intended usage is `defer ingressWait()()`.
func ingressWait() func() {
	ingressChan <- struct{}{}
	return func() { <-ingressChan }
}
// SetupIngress sets up ingress networking.
//
// The work runs asynchronously: a goroutine waits for the network agent to
// finish initializing, replaces any stale ingress network and sandbox, then
// (re)creates the ingress network, sandbox and endpoint. Errors inside the
// goroutine are logged, not returned to the caller.
func (daemon *Daemon) SetupIngress(create clustertypes.NetworkCreateRequest, nodeIP string) error {
	ip, _, err := net.ParseCIDR(nodeIP)
	if err != nil {
		return err
	}

	go func() {
		controller := daemon.netController
		controller.AgentInitWait()

		// A same-named network with a different ID is a leftover from a
		// previous cluster: tear down its sandbox and the network itself.
		if n, err := daemon.GetNetworkByName(create.Name); err == nil && n != nil && n.ID() != create.ID {
			if err := controller.SandboxDestroy("ingress-sbox"); err != nil {
				logrus.Errorf("Failed to delete stale ingress sandbox: %v", err)
				return
			}

			if err := n.Delete(); err != nil {
				logrus.Errorf("Failed to delete stale ingress network %s: %v", n.ID(), err)
				return
			}
		}

		if _, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true); err != nil {
			// If it is any other error other than already
			// exists error log error and return.
			if _, ok := err.(libnetwork.NetworkNameError); !ok {
				logrus.Errorf("Failed creating ingress network: %v", err)
				return
			}

			// Otherwise continue down the call to create or recreate sandbox.
		}

		n, err := daemon.GetNetworkByID(create.ID)
		if err != nil {
			logrus.Errorf("Failed getting ingress network by id after creating: %v", err)
			return
		}

		sb, err := controller.NewSandbox("ingress-sbox", libnetwork.OptionIngress())
		if err != nil {
			// Fixed typo in log message: "sanbox" -> "sandbox".
			logrus.Errorf("Failed creating ingress sandbox: %v", err)
			return
		}

		ep, err := n.CreateEndpoint("ingress-endpoint", libnetwork.CreateOptionIpam(ip, nil, nil, nil))
		if err != nil {
			logrus.Errorf("Failed creating ingress endpoint: %v", err)
			return
		}

		if err := ep.Join(sb, nil); err != nil {
			logrus.Errorf("Failed joining ingress sandbox to ingress endpoint: %v", err)
		}
	}()

	return nil
}
// SetNetworkBootstrapKeys sets the bootstrap keys.
func (daemon *Daemon) SetNetworkBootstrapKeys(keys []*networktypes.EncryptionKey) error {
	return daemon.netController.SetKeys(keys)
}

// CreateManagedNetwork creates an agent network. It is created with the
// agent flag set, which bypasses the restrictions applied to user-created
// networks.
func (daemon *Daemon) CreateManagedNetwork(create clustertypes.NetworkCreateRequest) error {
	_, err := daemon.createNetwork(create.NetworkCreateRequest, create.ID, true)
	return err
}
// CreateNetwork creates a network with the given name, driver and other optional parameters
func (daemon *Daemon) CreateNetwork(create types.NetworkCreateRequest) (*types.NetworkCreateResponse, error) {
if runconfig.IsPreDefinedNetwork(create.Name) {
resp, err := daemon.createNetwork(create, "", false)
if err != nil {
return nil, err
}
return resp, err
}
func (daemon *Daemon) createNetwork(create types.NetworkCreateRequest, id string, agent bool) (*types.NetworkCreateResponse, error) {
// If there is a pending ingress network creation wait here
// since ingress network creation can happen via node download
// from manager or task download.
if isIngressNetwork(create.Name) {
defer ingressWait()()
}
if runconfig.IsPreDefinedNetwork(create.Name) && !agent {
err := fmt.Errorf("%s is a pre-defined network and cannot be created", create.Name)
return nil, errors.NewRequestForbiddenError(err)
}
@ -134,7 +232,16 @@ func (daemon *Daemon) CreateNetwork(create types.NetworkCreateRequest) (*types.N
if create.Internal {
nwOptions = append(nwOptions, libnetwork.NetworkOptionInternalNetwork())
}
n, err := c.NewNetwork(driver, create.Name, "", nwOptions...)
if agent {
nwOptions = append(nwOptions, libnetwork.NetworkOptionDynamic())
nwOptions = append(nwOptions, libnetwork.NetworkOptionPersist(false))
}
if isIngressNetwork(create.Name) {
nwOptions = append(nwOptions, libnetwork.NetworkOptionIngress())
}
n, err := c.NewNetwork(driver, create.Name, id, nwOptions...)
if err != nil {
return nil, err
}
@ -168,6 +275,17 @@ func getIpamConfig(data []network.IPAMConfig) ([]*libnetwork.IpamConf, []*libnet
return ipamV4Cfg, ipamV6Cfg, nil
}
// UpdateContainerServiceConfig updates a service configuration.
// The config is attached to the container's in-memory network settings;
// nothing is persisted here.
func (daemon *Daemon) UpdateContainerServiceConfig(containerName string, serviceConfig *clustertypes.ServiceConfig) error {
	container, err := daemon.GetContainer(containerName)
	if err != nil {
		return err
	}

	container.NetworkSettings.Service = serviceConfig
	return nil
}
// ConnectContainerToNetwork connects the given container to the given
// network. If either cannot be found, an err is returned. If the
// network cannot be set up, an err is returned.
@ -207,18 +325,29 @@ func (daemon *Daemon) GetNetworkDriverList() map[string]bool {
driver := network.Type()
pluginList[driver] = true
}
// TODO : Replace this with proper libnetwork API
pluginList["overlay"] = true
return pluginList
}
// DeleteManagedNetwork deletes an agent network.
// The dynamic flag allows removal of networks whose name is otherwise
// predefined (such as "ingress").
func (daemon *Daemon) DeleteManagedNetwork(networkID string) error {
	return daemon.deleteNetwork(networkID, true)
}

// DeleteNetwork destroys a network unless it's one of docker's predefined networks.
func (daemon *Daemon) DeleteNetwork(networkID string) error {
	return daemon.deleteNetwork(networkID, false)
}
func (daemon *Daemon) deleteNetwork(networkID string, dynamic bool) error {
nw, err := daemon.FindNetwork(networkID)
if err != nil {
return err
}
if runconfig.IsPreDefinedNetwork(nw.Name()) {
if runconfig.IsPreDefinedNetwork(nw.Name()) && !dynamic {
err := fmt.Errorf("%s is a pre-defined network and cannot be removed", nw.Name())
return errors.NewRequestForbiddenError(err)
}
@ -230,14 +359,7 @@ func (daemon *Daemon) DeleteNetwork(networkID string) error {
return nil
}
// FilterNetworks returns a list of networks filtered by the given arguments.
// It returns an error if the filters are not included in the list of accepted filters.
func (daemon *Daemon) FilterNetworks(netFilters filters.Args) ([]libnetwork.Network, error) {
if netFilters.Len() != 0 {
if err := netFilters.Validate(netsettings.AcceptedFilters); err != nil {
return nil, err
}
}
nwList := daemon.getAllNetworks()
return netsettings.FilterNetworks(nwList, netFilters)
// GetNetworks returns a list of all networks known to the daemon's
// network controller.
func (daemon *Daemon) GetNetworks() []libnetwork.Network {
	return daemon.getAllNetworks()
}

View file

@ -1,6 +1,7 @@
package network
import (
clustertypes "github.com/docker/docker/daemon/cluster/provider"
networktypes "github.com/docker/engine-api/types/network"
"github.com/docker/go-connections/nat"
)
@ -14,6 +15,7 @@ type Settings struct {
LinkLocalIPv6Address string
LinkLocalIPv6PrefixLen int
Networks map[string]*networktypes.EndpointSettings
Service *clustertypes.ServiceConfig
Ports nat.PortMap
SandboxKey string
SecondaryIPAddresses []networktypes.Address

View file

@ -1,6 +1,10 @@
package daemon
import "time"
import (
"time"
"golang.org/x/net/context"
)
// ContainerWait stops processing until the given container is
// stopped. If the container is not found, an error is returned. On a
@ -15,3 +19,14 @@ func (daemon *Daemon) ContainerWait(name string, timeout time.Duration) (int, er
return container.WaitStop(timeout)
}
// ContainerWaitWithContext returns a channel where exit code is sent
// when container stops. Channel can be cancelled with a context.
func (daemon *Daemon) ContainerWaitWithContext(ctx context.Context, name string) (<-chan int, error) {
	ctr, err := daemon.GetContainer(name)
	if err != nil {
		return nil, err
	}
	return ctr.WaitWithContext(ctx), nil
}

View file

@ -5,6 +5,8 @@ import (
"net"
"regexp"
"strings"
"github.com/docker/engine-api/types/filters"
)
var (
@ -282,3 +284,38 @@ func ValidateSysctl(val string) (string, error) {
}
return "", fmt.Errorf("sysctl '%s' is not whitelisted", val)
}
// FilterOpt is a flag type for validating filters
type FilterOpt struct {
	filter filters.Args // accumulated, parsed filter arguments
}

// NewFilterOpt returns a new FilterOpt
func NewFilterOpt() FilterOpt {
	return FilterOpt{filter: filters.NewArgs()}
}

// String returns the serialized form of the filters, or "invalid filters"
// when they cannot be serialized.
func (o *FilterOpt) String() string {
	repr, err := filters.ToParam(o.filter)
	if err != nil {
		return "invalid filters"
	}
	return repr
}

// Set sets the value of the opt by parsing the command line value
func (o *FilterOpt) Set(value string) error {
	var err error
	o.filter, err = filters.ParseFlag(value, o.filter)
	return err
}

// Type returns the option type
func (o *FilterOpt) Type() string {
	return "filter"
}

// Value returns the value of this option
func (o *FilterOpt) Value() filters.Args {
	return o.filter
}

View file

@ -19,7 +19,7 @@ func DefaultDaemonNetworkMode() container.NetworkMode {
// IsPreDefinedNetwork indicates if a network is predefined by the daemon
func IsPreDefinedNetwork(network string) bool {
n := container.NetworkMode(network)
return n.IsBridge() || n.IsHost() || n.IsNone() || n.IsDefault()
return n.IsBridge() || n.IsHost() || n.IsNone() || n.IsDefault() || network == "ingress"
}
// ValidateNetMode ensures that the various combinations of requested

View file

@ -140,6 +140,17 @@ func (m *MountPoint) Path() string {
return m.Source
}
// Type returns the type of mount point
func (m *MountPoint) Type() string {
	switch {
	case m.Name != "":
		return "VOLUME"
	case m.Source != "":
		return "BIND"
	default:
		return "EPHEMERAL"
	}
}
// ParseVolumesFrom ensures that the supplied volumes-from is valid.
func ParseVolumesFrom(spec string) (string, string, error) {
if len(spec) == 0 {