From 250e05e42773a875d2fb8248b94fa72f2934a4b6 Mon Sep 17 00:00:00 2001 From: Vincent Demeester Date: Tue, 28 Feb 2017 11:12:11 +0100 Subject: [PATCH] =?UTF-8?q?Add=20a=20lockedManagerAction=20method=20to=20C?= =?UTF-8?q?luster=E2=80=A6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … in order to remove duplication. Each time we update a cluster object, we do some common operations (lock, verify it's on a manager, get the request context, and the update). This introduce a method and refactor few update/remove method that allows to duplicate less code. Signed-off-by: Vincent Demeester --- daemon/cluster/cluster.go | 15 ++ daemon/cluster/networks.go | 70 ++++---- daemon/cluster/nodes.go | 96 +++++------ daemon/cluster/secrets.go | 113 +++++-------- daemon/cluster/services.go | 320 +++++++++++++++++-------------------- daemon/cluster/swarm.go | 118 ++++++-------- daemon/cluster/tasks.go | 23 ++- 7 files changed, 332 insertions(+), 423 deletions(-) diff --git a/daemon/cluster/cluster.go b/daemon/cluster/cluster.go index 48e4003084..fe7ac34f86 100644 --- a/daemon/cluster/cluster.go +++ b/daemon/cluster/cluster.go @@ -386,3 +386,18 @@ func detectLockedError(err error) error { } return err } + +func (c *Cluster) lockedManagerAction(fn func(ctx context.Context, state nodeState) error) error { + c.mu.RLock() + defer c.mu.RUnlock() + + state := c.currentNodeState() + if !state.IsActiveManager() { + return c.errNoManager(state) + } + + ctx, cancel := c.getRequestContext() + defer cancel() + + return fn(ctx, state) +} diff --git a/daemon/cluster/networks.go b/daemon/cluster/networks.go index 154fe4b704..a87b033f6f 100644 --- a/daemon/cluster/networks.go +++ b/daemon/cluster/networks.go @@ -48,19 +48,16 @@ func (c *Cluster) getNetworks(filters *swarmapi.ListNetworksRequest_Filters) ([] // GetNetwork returns a cluster network by an ID. func (c *Cluster) GetNetwork(input string) (apitypes.NetworkResource, error) { - c.mu.RLock() - defer c.mu.RUnlock() + var network *swarmapi.Network - state := c.currentNodeState() - if !state.IsActiveManager() { - return apitypes.NetworkResource{}, c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - network, err := getNetwork(ctx, state.controlClient, input) - if err != nil { + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + n, err := getNetwork(ctx, state.controlClient, input) + if err != nil { + return err + } + network = n + return nil + }); err != nil { return apitypes.NetworkResource{}, err } return convert.BasicNetworkFromGRPC(*network), nil @@ -224,51 +221,38 @@ func (c *Cluster) DetachNetwork(target string, containerID string) error { // CreateNetwork creates a new cluster managed network. func (c *Cluster) CreateNetwork(s apitypes.NetworkCreateRequest) (string, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - state := c.currentNodeState() - if !state.IsActiveManager() { - return "", c.errNoManager(state) - } - if runconfig.IsPreDefinedNetwork(s.Name) { err := fmt.Errorf("%s is a pre-defined network and cannot be created", s.Name) return "", apierrors.NewRequestForbiddenError(err) } - ctx, cancel := c.getRequestContext() - defer cancel() - - networkSpec := convert.BasicNetworkCreateToGRPC(s) - r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec}) - if err != nil { + var resp *swarmapi.CreateNetworkResponse + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + networkSpec := convert.BasicNetworkCreateToGRPC(s) + r, err := state.controlClient.CreateNetwork(ctx, &swarmapi.CreateNetworkRequest{Spec: &networkSpec}) + if err != nil { + return err + } + resp = r + return nil + }); err != nil { return "", err } - return r.Network.ID, nil + return resp.Network.ID, nil } // RemoveNetwork removes a cluster network. func (c *Cluster) RemoveNetwork(input string) error { - c.mu.RLock() - defer c.mu.RUnlock() + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + network, err := getNetwork(ctx, state.controlClient, input) + if err != nil { + return err + } - state := c.currentNodeState() - if !state.IsActiveManager() { - return c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - network, err := getNetwork(ctx, state.controlClient, input) - if err != nil { + _, err = state.controlClient.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}) return err - } - - _, err = state.controlClient.RemoveNetwork(ctx, &swarmapi.RemoveNetworkRequest{NetworkID: network.ID}) - return err + }) } func (c *Cluster) populateNetworkID(ctx context.Context, client swarmapi.ControlClient, s *types.ServiceSpec) error { diff --git a/daemon/cluster/nodes.go b/daemon/cluster/nodes.go index 6104e88ba3..ebd47e9b61 100644 --- a/daemon/cluster/nodes.go +++ b/daemon/cluster/nodes.go @@ -6,6 +6,7 @@ import ( types "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/daemon/cluster/convert" swarmapi "github.com/docker/swarmkit/api" + "golang.org/x/net/context" ) // GetNodes returns a list of all nodes known to a cluster. @@ -43,78 +44,61 @@ func (c *Cluster) GetNodes(options apitypes.NodeListOptions) ([]types.Node, erro // GetNode returns a node based on an ID. func (c *Cluster) GetNode(input string) (types.Node, error) { - c.mu.RLock() - defer c.mu.RUnlock() + var node *swarmapi.Node - state := c.currentNodeState() - if !state.IsActiveManager() { - return types.Node{}, c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - node, err := getNode(ctx, state.controlClient, input) - if err != nil { + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + n, err := getNode(ctx, state.controlClient, input) + if err != nil { + return err + } + node = n + return nil + }); err != nil { return types.Node{}, err } + return convert.NodeFromGRPC(*node), nil } // UpdateNode updates existing nodes properties. func (c *Cluster) UpdateNode(input string, version uint64, spec types.NodeSpec) error { - c.mu.RLock() - defer c.mu.RUnlock() + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + nodeSpec, err := convert.NodeSpecToGRPC(spec) + if err != nil { + return apierrors.NewBadRequestError(err) + } - state := c.currentNodeState() - if !state.IsActiveManager() { - return c.errNoManager(state) - } + ctx, cancel := c.getRequestContext() + defer cancel() - nodeSpec, err := convert.NodeSpecToGRPC(spec) - if err != nil { - return apierrors.NewBadRequestError(err) - } + currentNode, err := getNode(ctx, state.controlClient, input) + if err != nil { + return err + } - ctx, cancel := c.getRequestContext() - defer cancel() - - currentNode, err := getNode(ctx, state.controlClient, input) - if err != nil { - return err - } - - _, err = state.controlClient.UpdateNode( - ctx, - &swarmapi.UpdateNodeRequest{ - NodeID: currentNode.ID, - Spec: &nodeSpec, - NodeVersion: &swarmapi.Version{ - Index: version, + _, err = state.controlClient.UpdateNode( + ctx, + &swarmapi.UpdateNodeRequest{ + NodeID: currentNode.ID, + Spec: &nodeSpec, + NodeVersion: &swarmapi.Version{ + Index: version, + }, }, - }, - ) - return err + ) + return err + }) } // RemoveNode removes a node from a cluster func (c *Cluster) RemoveNode(input string, force bool) error { - c.mu.RLock() - defer c.mu.RUnlock() + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + node, err := getNode(ctx, state.controlClient, input) + if err != nil { + return err + } - state := c.currentNodeState() - if !state.IsActiveManager() { - return c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - node, err := getNode(ctx, state.controlClient, input) - if err != nil { + _, err = state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}) return err - } - - _, err = state.controlClient.RemoveNode(ctx, &swarmapi.RemoveNodeRequest{NodeID: node.ID, Force: force}) - return err + }) } diff --git a/daemon/cluster/secrets.go b/daemon/cluster/secrets.go index 240b561047..fb76e66e31 100644 --- a/daemon/cluster/secrets.go +++ b/daemon/cluster/secrets.go @@ -5,23 +5,21 @@ import ( types "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/daemon/cluster/convert" swarmapi "github.com/docker/swarmkit/api" + "golang.org/x/net/context" ) // GetSecret returns a secret from a managed swarm cluster func (c *Cluster) GetSecret(input string) (types.Secret, error) { - c.mu.RLock() - defer c.mu.RUnlock() + var secret *swarmapi.Secret - state := c.currentNodeState() - if !state.IsActiveManager() { - return types.Secret{}, c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - secret, err := getSecret(ctx, state.controlClient, input) - if err != nil { + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + s, err := getSecret(ctx, state.controlClient, input) + if err != nil { + return err + } + secret = s + return nil + }); err != nil { return types.Secret{}, err } return convert.SecretFromGRPC(secret), nil @@ -61,77 +59,54 @@ func (c *Cluster) GetSecrets(options apitypes.SecretListOptions) ([]types.Secret // CreateSecret creates a new secret in a managed swarm cluster. func (c *Cluster) CreateSecret(s types.SecretSpec) (string, error) { - c.mu.RLock() - defer c.mu.RUnlock() + var resp *swarmapi.CreateSecretResponse + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + secretSpec := convert.SecretSpecToGRPC(s) - state := c.currentNodeState() - if !state.IsActiveManager() { - return "", c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - secretSpec := convert.SecretSpecToGRPC(s) - - r, err := state.controlClient.CreateSecret(ctx, - &swarmapi.CreateSecretRequest{Spec: &secretSpec}) - if err != nil { + r, err := state.controlClient.CreateSecret(ctx, + &swarmapi.CreateSecretRequest{Spec: &secretSpec}) + if err != nil { + return err + } + resp = r + return nil + }); err != nil { return "", err } - - return r.Secret.ID, nil + return resp.Secret.ID, nil } // RemoveSecret removes a secret from a managed swarm cluster. func (c *Cluster) RemoveSecret(input string) error { - c.mu.RLock() - defer c.mu.RUnlock() + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + secret, err := getSecret(ctx, state.controlClient, input) + if err != nil { + return err + } - state := c.currentNodeState() - if !state.IsActiveManager() { - return c.errNoManager(state) - } + req := &swarmapi.RemoveSecretRequest{ + SecretID: secret.ID, + } - ctx, cancel := c.getRequestContext() - defer cancel() - - secret, err := getSecret(ctx, state.controlClient, input) - if err != nil { + _, err = state.controlClient.RemoveSecret(ctx, req) return err - } - - req := &swarmapi.RemoveSecretRequest{ - SecretID: secret.ID, - } - - _, err = state.controlClient.RemoveSecret(ctx, req) - return err + }) } // UpdateSecret updates a secret in a managed swarm cluster. // Note: this is not exposed to the CLI but is available from the API only func (c *Cluster) UpdateSecret(id string, version uint64, spec types.SecretSpec) error { - c.mu.RLock() - defer c.mu.RUnlock() + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + secretSpec := convert.SecretSpecToGRPC(spec) - state := c.currentNodeState() - if !state.IsActiveManager() { - return c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - secretSpec := convert.SecretSpecToGRPC(spec) - - _, err := state.controlClient.UpdateSecret(ctx, - &swarmapi.UpdateSecretRequest{ - SecretID: id, - SecretVersion: &swarmapi.Version{ - Index: version, - }, - Spec: &secretSpec, - }) - return err + _, err := state.controlClient.UpdateSecret(ctx, + &swarmapi.UpdateSecretRequest{ + SecretID: id, + SecretVersion: &swarmapi.Version{ + Index: version, + }, + Spec: &secretSpec, + }) + return err + }) } diff --git a/daemon/cluster/services.go b/daemon/cluster/services.go index a80730c687..e8bb9234c0 100644 --- a/daemon/cluster/services.go +++ b/daemon/cluster/services.go @@ -59,19 +59,15 @@ func (c *Cluster) GetServices(options apitypes.ServiceListOptions) ([]types.Serv // GetService returns a service based on an ID or name. func (c *Cluster) GetService(input string) (types.Service, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - state := c.currentNodeState() - if !state.IsActiveManager() { - return types.Service{}, c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - service, err := getService(ctx, state.controlClient, input) - if err != nil { + var service *swarmapi.Service + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + s, err := getService(ctx, state.controlClient, input) + if err != nil { + return err + } + service = s + return nil + }); err != nil { return types.Service{}, err } return convert.ServiceFromGRPC(*service), nil @@ -79,187 +75,165 @@ func (c *Cluster) GetService(input string) (types.Service, error) { // CreateService creates a new service in a managed swarm cluster. func (c *Cluster) CreateService(s types.ServiceSpec, encodedAuth string) (*apitypes.ServiceCreateResponse, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - state := c.currentNodeState() - if !state.IsActiveManager() { - return nil, c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - err := c.populateNetworkID(ctx, state.controlClient, &s) - if err != nil { - return nil, err - } - - serviceSpec, err := convert.ServiceSpecToGRPC(s) - if err != nil { - return nil, apierrors.NewBadRequestError(err) - } - - ctnr := serviceSpec.Task.GetContainer() - if ctnr == nil { - return nil, errors.New("service does not use container tasks") - } - - if encodedAuth != "" { - ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth} - } - - // retrieve auth config from encoded auth - authConfig := &apitypes.AuthConfig{} - if encodedAuth != "" { - if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { - logrus.Warnf("invalid authconfig: %v", err) - } - } - - resp := &apitypes.ServiceCreateResponse{} - - // pin image by digest - if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" { - digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig) + var resp *apitypes.ServiceCreateResponse + err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + err := c.populateNetworkID(ctx, state.controlClient, &s) if err != nil { - logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()) - resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())) - } else if ctnr.Image != digestImage { - logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage) - ctnr.Image = digestImage - } else { - logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image) + return err } - } - r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec}) - if err != nil { - return nil, err - } + serviceSpec, err := convert.ServiceSpecToGRPC(s) + if err != nil { + return apierrors.NewBadRequestError(err) + } - resp.ID = r.Service.ID - return resp, nil + ctnr := serviceSpec.Task.GetContainer() + if ctnr == nil { + return errors.New("service does not use container tasks") + } + + if encodedAuth != "" { + ctnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth} + } + + // retrieve auth config from encoded auth + authConfig := &apitypes.AuthConfig{} + if encodedAuth != "" { + if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { + logrus.Warnf("invalid authconfig: %v", err) + } + } + + resp = &apitypes.ServiceCreateResponse{} + + // pin image by digest + if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" { + digestImage, err := c.imageWithDigestString(ctx, ctnr.Image, authConfig) + if err != nil { + logrus.Warnf("unable to pin image %s to digest: %s", ctnr.Image, err.Error()) + resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", ctnr.Image, err.Error())) + } else if ctnr.Image != digestImage { + logrus.Debugf("pinning image %s by digest: %s", ctnr.Image, digestImage) + ctnr.Image = digestImage + } else { + logrus.Debugf("creating service using supplied digest reference %s", ctnr.Image) + } + } + + r, err := state.controlClient.CreateService(ctx, &swarmapi.CreateServiceRequest{Spec: &serviceSpec}) + if err != nil { + return err + } + + resp.ID = r.Service.ID + return nil + }) + return resp, err } // UpdateService updates existing service to match new properties. func (c *Cluster) UpdateService(serviceIDOrName string, version uint64, spec types.ServiceSpec, encodedAuth string, registryAuthFrom string) (*apitypes.ServiceUpdateResponse, error) { - c.mu.RLock() - defer c.mu.RUnlock() + var resp *apitypes.ServiceUpdateResponse - state := c.currentNodeState() - if !state.IsActiveManager() { - return nil, c.errNoManager(state) - } + err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { - ctx, cancel := c.getRequestContext() - defer cancel() - - err := c.populateNetworkID(ctx, state.controlClient, &spec) - if err != nil { - return nil, err - } - - serviceSpec, err := convert.ServiceSpecToGRPC(spec) - if err != nil { - return nil, apierrors.NewBadRequestError(err) - } - - currentService, err := getService(ctx, state.controlClient, serviceIDOrName) - if err != nil { - return nil, err - } - - newCtnr := serviceSpec.Task.GetContainer() - if newCtnr == nil { - return nil, errors.New("service does not use container tasks") - } - - if encodedAuth != "" { - newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth} - } else { - // this is needed because if the encodedAuth isn't being updated then we - // shouldn't lose it, and continue to use the one that was already present - var ctnr *swarmapi.ContainerSpec - switch registryAuthFrom { - case apitypes.RegistryAuthFromSpec, "": - ctnr = currentService.Spec.Task.GetContainer() - case apitypes.RegistryAuthFromPreviousSpec: - if currentService.PreviousSpec == nil { - return nil, errors.New("service does not have a previous spec") - } - ctnr = currentService.PreviousSpec.Task.GetContainer() - default: - return nil, errors.New("unsupported registryAuthFrom value") - } - if ctnr == nil { - return nil, errors.New("service does not use container tasks") - } - newCtnr.PullOptions = ctnr.PullOptions - // update encodedAuth so it can be used to pin image by digest - if ctnr.PullOptions != nil { - encodedAuth = ctnr.PullOptions.RegistryAuth - } - } - - // retrieve auth config from encoded auth - authConfig := &apitypes.AuthConfig{} - if encodedAuth != "" { - if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { - logrus.Warnf("invalid authconfig: %v", err) - } - } - - resp := &apitypes.ServiceUpdateResponse{} - - // pin image by digest - if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" { - digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig) + err := c.populateNetworkID(ctx, state.controlClient, &spec) if err != nil { - logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()) - resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())) - } else if newCtnr.Image != digestImage { - logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage) - newCtnr.Image = digestImage - } else { - logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image) + return err } - } - _, err = state.controlClient.UpdateService( - ctx, - &swarmapi.UpdateServiceRequest{ - ServiceID: currentService.ID, - Spec: &serviceSpec, - ServiceVersion: &swarmapi.Version{ - Index: version, + serviceSpec, err := convert.ServiceSpecToGRPC(spec) + if err != nil { + return apierrors.NewBadRequestError(err) + } + + currentService, err := getService(ctx, state.controlClient, serviceIDOrName) + if err != nil { + return err + } + + newCtnr := serviceSpec.Task.GetContainer() + if newCtnr == nil { + return errors.New("service does not use container tasks") + } + + if encodedAuth != "" { + newCtnr.PullOptions = &swarmapi.ContainerSpec_PullOptions{RegistryAuth: encodedAuth} + } else { + // this is needed because if the encodedAuth isn't being updated then we + // shouldn't lose it, and continue to use the one that was already present + var ctnr *swarmapi.ContainerSpec + switch registryAuthFrom { + case apitypes.RegistryAuthFromSpec, "": + ctnr = currentService.Spec.Task.GetContainer() + case apitypes.RegistryAuthFromPreviousSpec: + if currentService.PreviousSpec == nil { + return errors.New("service does not have a previous spec") + } + ctnr = currentService.PreviousSpec.Task.GetContainer() + default: + return errors.New("unsupported registryAuthFrom value") + } + if ctnr == nil { + return errors.New("service does not use container tasks") + } + newCtnr.PullOptions = ctnr.PullOptions + // update encodedAuth so it can be used to pin image by digest + if ctnr.PullOptions != nil { + encodedAuth = ctnr.PullOptions.RegistryAuth + } + } + + // retrieve auth config from encoded auth + authConfig := &apitypes.AuthConfig{} + if encodedAuth != "" { + if err := json.NewDecoder(base64.NewDecoder(base64.URLEncoding, strings.NewReader(encodedAuth))).Decode(authConfig); err != nil { + logrus.Warnf("invalid authconfig: %v", err) + } + } + + resp := &apitypes.ServiceUpdateResponse{} + + // pin image by digest + if os.Getenv("DOCKER_SERVICE_PREFER_OFFLINE_IMAGE") != "1" { + digestImage, err := c.imageWithDigestString(ctx, newCtnr.Image, authConfig) + if err != nil { + logrus.Warnf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error()) + resp.Warnings = append(resp.Warnings, fmt.Sprintf("unable to pin image %s to digest: %s", newCtnr.Image, err.Error())) + } else if newCtnr.Image != digestImage { + logrus.Debugf("pinning image %s by digest: %s", newCtnr.Image, digestImage) + newCtnr.Image = digestImage + } else { + logrus.Debugf("updating service using supplied digest reference %s", newCtnr.Image) + } + } + + _, err = state.controlClient.UpdateService( + ctx, + &swarmapi.UpdateServiceRequest{ + ServiceID: currentService.ID, + Spec: &serviceSpec, + ServiceVersion: &swarmapi.Version{ + Index: version, + }, }, - }, - ) - + ) + return err + }) return resp, err } // RemoveService removes a service from a managed swarm cluster. func (c *Cluster) RemoveService(input string) error { - c.mu.RLock() - defer c.mu.RUnlock() + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + service, err := getService(ctx, state.controlClient, input) + if err != nil { + return err + } - state := c.currentNodeState() - if !state.IsActiveManager() { - return c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - service, err := getService(ctx, state.controlClient, input) - if err != nil { + _, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}) return err - } - - _, err = state.controlClient.RemoveService(ctx, &swarmapi.RemoveServiceRequest{ServiceID: service.ID}) - return err + }) } // ServiceLogs collects service logs and writes them back to `config.OutStream` diff --git a/daemon/cluster/swarm.go b/daemon/cluster/swarm.go index 027e190b4d..84c8582fcd 100644 --- a/daemon/cluster/swarm.go +++ b/daemon/cluster/swarm.go @@ -187,95 +187,75 @@ func (c *Cluster) Join(req types.JoinRequest) error { // Inspect retrieves the configuration properties of a managed swarm cluster. func (c *Cluster) Inspect() (types.Swarm, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - state := c.currentNodeState() - if !state.IsActiveManager() { - return types.Swarm{}, c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - swarm, err := getSwarm(ctx, state.controlClient) - if err != nil { + var swarm *swarmapi.Cluster + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + s, err := getSwarm(ctx, state.controlClient) + if err != nil { + return err + } + swarm = s + return nil + }); err != nil { return types.Swarm{}, err } - return convert.SwarmFromGRPC(*swarm), nil } // Update updates configuration of a managed swarm cluster. func (c *Cluster) Update(version uint64, spec types.Spec, flags types.UpdateFlags) error { - c.mu.RLock() - defer c.mu.RUnlock() + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + swarm, err := getSwarm(ctx, state.controlClient) + if err != nil { + return err + } - state := c.currentNodeState() - if !state.IsActiveManager() { - return c.errNoManager(state) - } + // In update, client should provide the complete spec of the swarm, including + // Name and Labels. If a field is specified with 0 or nil, then the default value + // will be used to swarmkit. + clusterSpec, err := convert.SwarmSpecToGRPC(spec) + if err != nil { + return apierrors.NewBadRequestError(err) + } - ctx, cancel := c.getRequestContext() - defer cancel() - - swarm, err := getSwarm(ctx, state.controlClient) - if err != nil { + _, err = state.controlClient.UpdateCluster( + ctx, + &swarmapi.UpdateClusterRequest{ + ClusterID: swarm.ID, + Spec: &clusterSpec, + ClusterVersion: &swarmapi.Version{ + Index: version, + }, + Rotation: swarmapi.KeyRotation{ + WorkerJoinToken: flags.RotateWorkerToken, + ManagerJoinToken: flags.RotateManagerToken, + ManagerUnlockKey: flags.RotateManagerUnlockKey, + }, + }, + ) return err - } - - // In update, client should provide the complete spec of the swarm, including - // Name and Labels. If a field is specified with 0 or nil, then the default value - // will be used to swarmkit. - clusterSpec, err := convert.SwarmSpecToGRPC(spec) - if err != nil { - return apierrors.NewBadRequestError(err) - } - - _, err = state.controlClient.UpdateCluster( - ctx, - &swarmapi.UpdateClusterRequest{ - ClusterID: swarm.ID, - Spec: &clusterSpec, - ClusterVersion: &swarmapi.Version{ - Index: version, - }, - Rotation: swarmapi.KeyRotation{ - WorkerJoinToken: flags.RotateWorkerToken, - ManagerJoinToken: flags.RotateManagerToken, - ManagerUnlockKey: flags.RotateManagerUnlockKey, - }, - }, - ) - return err + }) } // GetUnlockKey returns the unlock key for the swarm. func (c *Cluster) GetUnlockKey() (string, error) { - c.mu.RLock() - defer c.mu.RUnlock() + var resp *swarmapi.GetUnlockKeyResponse + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + client := swarmapi.NewCAClient(state.grpcConn) - state := c.currentNodeState() - if !state.IsActiveManager() { - return "", c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - client := swarmapi.NewCAClient(state.grpcConn) - - r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{}) - if err != nil { + r, err := client.GetUnlockKey(ctx, &swarmapi.GetUnlockKeyRequest{}) + if err != nil { + return err + } + resp = r + return nil + }); err != nil { return "", err } - - if len(r.UnlockKey) == 0 { + if len(resp.UnlockKey) == 0 { // no key return "", nil } - - return encryption.HumanReadableKey(r.UnlockKey), nil + return encryption.HumanReadableKey(resp.UnlockKey), nil } // UnlockSwarm provides a key to decrypt data that is encrypted at rest. diff --git a/daemon/cluster/tasks.go b/daemon/cluster/tasks.go index a882edb851..001a345a68 100644 --- a/daemon/cluster/tasks.go +++ b/daemon/cluster/tasks.go @@ -6,6 +6,7 @@ import ( types "github.com/docker/docker/api/types/swarm" "github.com/docker/docker/daemon/cluster/convert" swarmapi "github.com/docker/swarmkit/api" + "golang.org/x/net/context" ) // GetTasks returns a list of tasks matching the filter options. @@ -71,19 +72,15 @@ func (c *Cluster) GetTasks(options apitypes.TaskListOptions) ([]types.Task, erro // GetTask returns a task by an ID. func (c *Cluster) GetTask(input string) (types.Task, error) { - c.mu.RLock() - defer c.mu.RUnlock() - - state := c.currentNodeState() - if !state.IsActiveManager() { - return types.Task{}, c.errNoManager(state) - } - - ctx, cancel := c.getRequestContext() - defer cancel() - - task, err := getTask(ctx, state.controlClient, input) - if err != nil { + var task *swarmapi.Task + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + t, err := getTask(ctx, state.controlClient, input) + if err != nil { + return err + } + task = t + return nil + }); err != nil { return types.Task{}, err } return convert.TaskFromGRPC(*task), nil