diff --git a/api/server/router/volume/backend.go b/api/server/router/volume/backend.go index a2f445ebf4..8de9a84d4b 100644 --- a/api/server/router/volume/backend.go +++ b/api/server/router/volume/backend.go @@ -19,3 +19,16 @@ type Backend interface { Remove(ctx context.Context, name string, opts ...opts.RemoveOption) error Prune(ctx context.Context, pruneFilters filters.Args) (*types.VolumesPruneReport, error) } + +// ClusterBackend is the backend used for Swarm Cluster Volumes. Regular +// volumes go through the volume service, but to avoid across-dependency +// between the cluster package and the volume package, we simply provide two +// backends here. +type ClusterBackend interface { + GetVolume(nameOrID string) (volume.Volume, error) + GetVolumes(options volume.ListOptions) ([]*volume.Volume, error) + CreateVolume(volume volume.CreateOptions) (*volume.Volume, error) + RemoveVolume(nameOrID string, force bool) error + UpdateVolume(nameOrID string, version uint64, volume volume.UpdateOptions) error + IsManager() bool +} diff --git a/api/server/router/volume/volume.go b/api/server/router/volume/volume.go index 875497b3eb..cb6ee65b5f 100644 --- a/api/server/router/volume/volume.go +++ b/api/server/router/volume/volume.go @@ -5,13 +5,15 @@ import "github.com/docker/docker/api/server/router" // volumeRouter is a router to talk with the volumes controller type volumeRouter struct { backend Backend + cluster ClusterBackend routes []router.Route } // NewRouter initializes a new volume router -func NewRouter(b Backend) router.Router { +func NewRouter(b Backend, cb ClusterBackend) router.Router { r := &volumeRouter{ backend: b, + cluster: cb, } r.initRoutes() return r @@ -30,6 +32,8 @@ func (r *volumeRouter) initRoutes() { // POST router.NewPostRoute("/volumes/create", r.postVolumesCreate), router.NewPostRoute("/volumes/prune", r.postVolumesPrune), + // PUT + router.NewPutRoute("/volumes/{name:.*}", r.putVolumesUpdate), // DELETE router.NewDeleteRoute("/volumes/{name:.*}", r.deleteVolumes), } diff --git a/api/server/router/volume/volume_routes.go b/api/server/router/volume/volume_routes.go index 6e79b04228..7f1adfaa78 100644 --- a/api/server/router/volume/volume_routes.go +++ b/api/server/router/volume/volume_routes.go @@ -2,13 +2,24 @@ package volume // import "github.com/docker/docker/api/server/router/volume" import ( "context" + "fmt" "net/http" + "strconv" "github.com/docker/docker/api/server/httputils" "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/versions" "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/errdefs" "github.com/docker/docker/volume/service/opts" "github.com/pkg/errors" + "github.com/sirupsen/logrus" +) + +const ( + // clusterVolumesVersion defines the API version that swarm cluster volume + // functionality was introduced. avoids the use of magic numbers. + clusterVolumesVersion = "1.42" ) func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { @@ -24,6 +35,21 @@ func (v *volumeRouter) getVolumesList(ctx context.Context, w http.ResponseWriter if err != nil { return err } + + version := httputils.VersionFromContext(ctx) + if versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() { + clusterVolumes, swarmErr := v.cluster.GetVolumes(volume.ListOptions{Filters: filters}) + if swarmErr != nil { + // if there is a swarm error, we may not want to error out right + // away. the local list probably worked. 
instead, let's do what we + // do if there's a bad driver while trying to list: add the error + // to the warnings. don't do this if swarm is not initialized. + warnings = append(warnings, swarmErr.Error()) + } + // add the cluster volumes to the return + volumes = append(volumes, clusterVolumes...) + } + return httputils.WriteJSON(w, http.StatusOK, &volume.ListResponse{Volumes: volumes, Warnings: warnings}) } @@ -31,11 +57,33 @@ func (v *volumeRouter) getVolumeByName(ctx context.Context, w http.ResponseWrite if err := httputils.ParseForm(r); err != nil { return err } + version := httputils.VersionFromContext(ctx) + // re: volume name duplication + // + // we prefer to get volumes locally before attempting to get them from the + // cluster. Local volumes can only be looked up by name, but cluster + // volumes can also be looked up by ID. vol, err := v.backend.Get(ctx, vars["name"], opts.WithGetResolveStatus) - if err != nil { + + // if the volume is not found in the regular volume backend, and the client + // is using an API version greater than 1.42 (when cluster volumes were + // introduced), then check if Swarm has the volume. + if errdefs.IsNotFound(err) && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() { + swarmVol, err := v.cluster.GetVolume(vars["name"]) + // if swarm returns an error and that error indicates that swarm is not + // initialized, return original NotFound error. Otherwise, we'd return + // a weird swarm unavailable error on non-swarm engines. + if err != nil { + return err + } + vol = &swarmVol + } else if err != nil { + // otherwise, if this isn't NotFound, or this isn't a high enough version, + // just return the error by itself. return err } + return httputils.WriteJSON(w, http.StatusOK, vol) } @@ -49,21 +97,82 @@ func (v *volumeRouter) postVolumesCreate(ctx context.Context, w http.ResponseWri return err } - vol, err := v.backend.Create(ctx, req.Name, req.Driver, opts.WithCreateOptions(req.DriverOpts), opts.WithCreateLabels(req.Labels)) + var ( + vol *volume.Volume + err error + version = httputils.VersionFromContext(ctx) + ) + + // if the ClusterVolumeSpec is filled in, then this is a cluster volume + // and is created through the swarm cluster volume backend. + // + // re: volume name duplication + // + // As it happens, there is no good way to prevent duplication of a volume + // name between local and cluster volumes. This is because Swarm volumes + // can be created from any manager node, bypassing most of the protections + // we could put into the engine side. + // + // Instead, we will allow creating a volume with a duplicate name, which + // should not break anything. 
+ if req.ClusterVolumeSpec != nil && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) { + logrus.Debug("using cluster volume") + vol, err = v.cluster.CreateVolume(req) + } else { + logrus.Debug("using regular volume") + vol, err = v.backend.Create(ctx, req.Name, req.Driver, opts.WithCreateOptions(req.DriverOpts), opts.WithCreateLabels(req.Labels)) + } + if err != nil { return err } return httputils.WriteJSON(w, http.StatusCreated, vol) } +func (v *volumeRouter) putVolumesUpdate(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { + if !v.cluster.IsManager() { + return errdefs.Unavailable(errors.New("volume update only valid for cluster volumes, but swarm is unavailable")) + } + + if err := httputils.ParseForm(r); err != nil { + return err + } + + rawVersion := r.URL.Query().Get("version") + version, err := strconv.ParseUint(rawVersion, 10, 64) + if err != nil { + err = fmt.Errorf("invalid swarm object version '%s': %v", rawVersion, err) + return errdefs.InvalidParameter(err) + } + + var req volume.UpdateOptions + if err := httputils.ReadJSON(r, &req); err != nil { + return err + } + + return v.cluster.UpdateVolume(vars["name"], version, req) +} + func (v *volumeRouter) deleteVolumes(ctx context.Context, w http.ResponseWriter, r *http.Request, vars map[string]string) error { if err := httputils.ParseForm(r); err != nil { return err } force := httputils.BoolValue(r, "force") - if err := v.backend.Remove(ctx, vars["name"], opts.WithPurgeOnError(force)); err != nil { - return err + + version := httputils.VersionFromContext(ctx) + + err := v.backend.Remove(ctx, vars["name"], opts.WithPurgeOnError(force)) + if err != nil { + if errdefs.IsNotFound(err) && versions.GreaterThanOrEqualTo(version, clusterVolumesVersion) && v.cluster.IsManager() { + err := v.cluster.RemoveVolume(vars["name"], force) + if err != nil { + return err + } + } else { + return err + } } + w.WriteHeader(http.StatusNoContent) return nil } diff --git a/api/server/router/volume/volume_routes_test.go b/api/server/router/volume/volume_routes_test.go new file mode 100644 index 0000000000..34e6a2090a --- /dev/null +++ b/api/server/router/volume/volume_routes_test.go @@ -0,0 +1,752 @@ +package volume + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http/httptest" + "testing" + + "gotest.tools/v3/assert" + + "github.com/docker/docker/api/server/httputils" + "github.com/docker/docker/api/types" + "github.com/docker/docker/api/types/filters" + "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/errdefs" + "github.com/docker/docker/volume/service/opts" +) + +func callGetVolume(v *volumeRouter, name string) (*httptest.ResponseRecorder, error) { + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + vars := map[string]string{"name": name} + req := httptest.NewRequest("GET", fmt.Sprintf("/volumes/%s", name), nil) + resp := httptest.NewRecorder() + + err := v.getVolumeByName(ctx, resp, req, vars) + return resp, err +} + +func callListVolumes(v *volumeRouter) (*httptest.ResponseRecorder, error) { + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + vars := map[string]string{} + req := httptest.NewRequest("GET", "/volumes", nil) + resp := httptest.NewRecorder() + + err := v.getVolumesList(ctx, resp, req, vars) + return resp, err +} + +func TestGetVolumeByNameNotFoundNoSwarm(t *testing.T) { + v := &volumeRouter{ + backend: &fakeVolumeBackend{}, + cluster: 
&fakeClusterBackend{}, + } + + _, err := callGetVolume(v, "notReal") + + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsNotFound(err)) +} + +func TestGetVolumeByNameNotFoundNotManager(t *testing.T) { + v := &volumeRouter{ + backend: &fakeVolumeBackend{}, + cluster: &fakeClusterBackend{swarm: true}, + } + + _, err := callGetVolume(v, "notReal") + + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsNotFound(err)) +} + +func TestGetVolumeByNameNotFound(t *testing.T) { + v := &volumeRouter{ + backend: &fakeVolumeBackend{}, + cluster: &fakeClusterBackend{swarm: true, manager: true}, + } + + _, err := callGetVolume(v, "notReal") + + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsNotFound(err)) +} + +func TestGetVolumeByNameFoundRegular(t *testing.T) { + v := &volumeRouter{ + backend: &fakeVolumeBackend{ + volumes: map[string]*volume.Volume{ + + "volume1": &volume.Volume{ + Name: "volume1", + }, + }, + }, + cluster: &fakeClusterBackend{swarm: true, manager: true}, + } + + _, err := callGetVolume(v, "volume1") + assert.NilError(t, err) +} + +func TestGetVolumeByNameFoundSwarm(t *testing.T) { + v := &volumeRouter{ + backend: &fakeVolumeBackend{}, + cluster: &fakeClusterBackend{ + swarm: true, + manager: true, + volumes: map[string]*volume.Volume{ + "volume1": &volume.Volume{ + Name: "volume1", + }, + }, + }, + } + + _, err := callGetVolume(v, "volume1") + assert.NilError(t, err) +} +func TestListVolumes(t *testing.T) { + v := &volumeRouter{ + backend: &fakeVolumeBackend{ + volumes: map[string]*volume.Volume{ + "v1": &volume.Volume{Name: "v1"}, + "v2": &volume.Volume{Name: "v2"}, + }, + }, + cluster: &fakeClusterBackend{ + swarm: true, + manager: true, + volumes: map[string]*volume.Volume{ + "v3": &volume.Volume{Name: "v3"}, + "v4": &volume.Volume{Name: "v4"}, + }, + }, + } + + resp, err := callListVolumes(v) + assert.NilError(t, err) + d := json.NewDecoder(resp.Result().Body) + respVols := volume.ListResponse{} + assert.NilError(t, d.Decode(&respVols)) + + assert.Assert(t, respVols.Volumes != nil) + assert.Equal(t, len(respVols.Volumes), 4, "volumes %v", respVols.Volumes) +} + +func TestListVolumesNoSwarm(t *testing.T) { + v := &volumeRouter{ + backend: &fakeVolumeBackend{ + volumes: map[string]*volume.Volume{ + "v1": &volume.Volume{Name: "v1"}, + "v2": &volume.Volume{Name: "v2"}, + }, + }, + cluster: &fakeClusterBackend{}, + } + + _, err := callListVolumes(v) + assert.NilError(t, err) +} + +func TestListVolumesNoManager(t *testing.T) { + v := &volumeRouter{ + backend: &fakeVolumeBackend{ + volumes: map[string]*volume.Volume{ + "v1": &volume.Volume{Name: "v1"}, + "v2": &volume.Volume{Name: "v2"}, + }, + }, + cluster: &fakeClusterBackend{swarm: true}, + } + + resp, err := callListVolumes(v) + assert.NilError(t, err) + + d := json.NewDecoder(resp.Result().Body) + respVols := volume.ListResponse{} + assert.NilError(t, d.Decode(&respVols)) + + assert.Equal(t, len(respVols.Volumes), 2) + assert.Equal(t, len(respVols.Warnings), 0) +} + +func TestCreateRegularVolume(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{ + swarm: true, + manager: true, + } + v := &volumeRouter{ + backend: b, + cluster: c, + } + + volumeCreate := volume.CreateOptions{ + Name: "vol1", + Driver: "foodriver", + } + + buf := bytes.Buffer{} + e := json.NewEncoder(&buf) + e.Encode(volumeCreate) + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("POST", "/volumes/create", &buf) + 
req.Header.Add("Content-Type", "application/json") + + resp := httptest.NewRecorder() + err := v.postVolumesCreate(ctx, resp, req, nil) + + assert.NilError(t, err) + + respVolume := volume.Volume{} + + assert.NilError(t, json.NewDecoder(resp.Result().Body).Decode(&respVolume)) + + assert.Equal(t, respVolume.Name, "vol1") + assert.Equal(t, respVolume.Driver, "foodriver") + + assert.Equal(t, 1, len(b.volumes)) + assert.Equal(t, 0, len(c.volumes)) +} + +func TestCreateSwarmVolumeNoSwarm(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{} + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + volumeCreate := volume.CreateOptions{ + ClusterVolumeSpec: &volume.ClusterVolumeSpec{}, + Name: "volCluster", + Driver: "someCSI", + } + + buf := bytes.Buffer{} + json.NewEncoder(&buf).Encode(volumeCreate) + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("POST", "/volumes/create", &buf) + req.Header.Add("Content-Type", "application/json") + + resp := httptest.NewRecorder() + err := v.postVolumesCreate(ctx, resp, req, nil) + + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsUnavailable(err)) +} + +func TestCreateSwarmVolumeNotManager(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{swarm: true} + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + volumeCreate := volume.CreateOptions{ + ClusterVolumeSpec: &volume.ClusterVolumeSpec{}, + Name: "volCluster", + Driver: "someCSI", + } + + buf := bytes.Buffer{} + json.NewEncoder(&buf).Encode(volumeCreate) + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("POST", "/volumes/create", &buf) + req.Header.Add("Content-Type", "application/json") + + resp := httptest.NewRecorder() + err := v.postVolumesCreate(ctx, resp, req, nil) + + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsUnavailable(err)) +} + +func TestCreateVolumeCluster(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{ + swarm: true, + manager: true, + } + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + volumeCreate := volume.CreateOptions{ + ClusterVolumeSpec: &volume.ClusterVolumeSpec{}, + Name: "volCluster", + Driver: "someCSI", + } + + buf := bytes.Buffer{} + json.NewEncoder(&buf).Encode(volumeCreate) + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("POST", "/volumes/create", &buf) + req.Header.Add("Content-Type", "application/json") + + resp := httptest.NewRecorder() + err := v.postVolumesCreate(ctx, resp, req, nil) + + assert.NilError(t, err) + + respVolume := volume.Volume{} + + assert.NilError(t, json.NewDecoder(resp.Result().Body).Decode(&respVolume)) + + assert.Equal(t, respVolume.Name, "volCluster") + assert.Equal(t, respVolume.Driver, "someCSI") + + assert.Equal(t, 0, len(b.volumes)) + assert.Equal(t, 1, len(c.volumes)) +} + +func TestUpdateVolume(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{ + swarm: true, + manager: true, + volumes: map[string]*volume.Volume{ + "vol1": &volume.Volume{ + Name: "vo1", + ClusterVolume: &volume.ClusterVolume{ + ID: "vol1", + }, + }, + }, + } + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + volumeUpdate := volume.UpdateOptions{ + Spec: &volume.ClusterVolumeSpec{}, + } + + buf := bytes.Buffer{} + json.NewEncoder(&buf).Encode(volumeUpdate) + ctx := context.WithValue(context.Background(), 
httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("POST", "/volumes/vol1/update?version=0", &buf) + req.Header.Add("Content-Type", "application/json") + + resp := httptest.NewRecorder() + + err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"}) + assert.NilError(t, err) + + assert.Equal(t, c.volumes["vol1"].ClusterVolume.Meta.Version.Index, uint64(1)) +} + +func TestUpdateVolumeNoSwarm(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{} + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + volumeUpdate := volume.UpdateOptions{ + Spec: &volume.ClusterVolumeSpec{}, + } + + buf := bytes.Buffer{} + json.NewEncoder(&buf).Encode(volumeUpdate) + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("POST", "/volumes/vol1/update?version=0", &buf) + req.Header.Add("Content-Type", "application/json") + + resp := httptest.NewRecorder() + + err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"}) + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsUnavailable(err)) +} + +func TestUpdateVolumeNotFound(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{ + swarm: true, + manager: true, + volumes: map[string]*volume.Volume{}, + } + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + volumeUpdate := volume.UpdateOptions{ + Spec: &volume.ClusterVolumeSpec{}, + } + + buf := bytes.Buffer{} + json.NewEncoder(&buf).Encode(volumeUpdate) + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("POST", "/volumes/vol1/update?version=0", &buf) + req.Header.Add("Content-Type", "application/json") + + resp := httptest.NewRecorder() + + err := v.putVolumesUpdate(ctx, resp, req, map[string]string{"name": "vol1"}) + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsNotFound(err)) +} + +func TestVolumeRemove(t *testing.T) { + b := &fakeVolumeBackend{ + volumes: map[string]*volume.Volume{ + "vol1": &volume.Volume{ + Name: "vol1", + }, + }, + } + c := &fakeClusterBackend{swarm: true, manager: true} + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("DELETE", "/volumes/vol1", nil) + resp := httptest.NewRecorder() + + err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"}) + assert.NilError(t, err) + assert.Equal(t, len(b.volumes), 0) +} + +func TestVolumeRemoveSwarm(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{ + swarm: true, + manager: true, + volumes: map[string]*volume.Volume{ + "vol1": &volume.Volume{ + Name: "vol1", + ClusterVolume: &volume.ClusterVolume{}, + }, + }, + } + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("DELETE", "/volumes/vol1", nil) + resp := httptest.NewRecorder() + + err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"}) + assert.NilError(t, err) + assert.Equal(t, len(c.volumes), 0) +} + +func TestVolumeRemoveNotFoundNoSwarm(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{} + v := &volumeRouter{ + backend: b, + cluster: c, + } + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("DELETE", 
"/volumes/vol1", nil) + resp := httptest.NewRecorder() + + err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"}) + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsNotFound(err), err.Error()) +} + +func TestVolumeRemoveNotFoundNoManager(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{swarm: true} + v := &volumeRouter{ + backend: b, + cluster: c, + } + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("DELETE", "/volumes/vol1", nil) + resp := httptest.NewRecorder() + + err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"}) + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsNotFound(err)) +} + +func TestVolumeRemoveFoundNoSwarm(t *testing.T) { + b := &fakeVolumeBackend{ + volumes: map[string]*volume.Volume{ + "vol1": &volume.Volume{ + Name: "vol1", + }, + }, + } + c := &fakeClusterBackend{} + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("DELETE", "/volumes/vol1", nil) + resp := httptest.NewRecorder() + + err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"}) + assert.NilError(t, err) + assert.Equal(t, len(b.volumes), 0) +} + +func TestVolumeRemoveNoSwarmInUse(t *testing.T) { + b := &fakeVolumeBackend{ + volumes: map[string]*volume.Volume{ + "inuse": &volume.Volume{ + Name: "inuse", + }, + }, + } + c := &fakeClusterBackend{} + v := &volumeRouter{ + backend: b, + cluster: c, + } + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("DELETE", "/volumes/inuse", nil) + resp := httptest.NewRecorder() + + err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "inuse"}) + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsConflict(err)) +} + +func TestVolumeRemoveSwarmForce(t *testing.T) { + b := &fakeVolumeBackend{} + c := &fakeClusterBackend{ + swarm: true, + manager: true, + volumes: map[string]*volume.Volume{ + "vol1": &volume.Volume{ + Name: "vol1", + ClusterVolume: &volume.ClusterVolume{}, + Options: map[string]string{"mustforce": "yes"}, + }, + }, + } + + v := &volumeRouter{ + backend: b, + cluster: c, + } + + ctx := context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req := httptest.NewRequest("DELETE", "/volumes/vol1", nil) + resp := httptest.NewRecorder() + + err := v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"}) + + assert.Assert(t, err != nil) + assert.Assert(t, errdefs.IsConflict(err)) + + ctx = context.WithValue(context.Background(), httputils.APIVersionKey{}, clusterVolumesVersion) + req = httptest.NewRequest("DELETE", "/volumes/vol1?force=1", nil) + resp = httptest.NewRecorder() + + err = v.deleteVolumes(ctx, resp, req, map[string]string{"name": "vol1"}) + + assert.NilError(t, err) + assert.Equal(t, len(b.volumes), 0) +} + +type fakeVolumeBackend struct { + volumes map[string]*volume.Volume +} + +func (b *fakeVolumeBackend) List(_ context.Context, _ filters.Args) ([]*volume.Volume, []string, error) { + volumes := []*volume.Volume{} + for _, v := range b.volumes { + volumes = append(volumes, v) + } + return volumes, nil, nil +} + +func (b *fakeVolumeBackend) Get(_ context.Context, name string, _ ...opts.GetOption) (*volume.Volume, error) { + if v, ok := b.volumes[name]; ok { + return v, nil + } + return nil, 
errdefs.NotFound(fmt.Errorf("volume %s not found", name)) +} + +func (b *fakeVolumeBackend) Create(_ context.Context, name, driverName string, _ ...opts.CreateOption) (*volume.Volume, error) { + if _, ok := b.volumes[name]; ok { + // TODO(dperny): return appropriate error type + return nil, fmt.Errorf("already exists") + } + + v := &volume.Volume{ + Name: name, + Driver: driverName, + } + if b.volumes == nil { + b.volumes = map[string]*volume.Volume{ + name: v, + } + } else { + b.volumes[name] = v + } + + return v, nil +} + +func (b *fakeVolumeBackend) Remove(_ context.Context, name string, _ ...opts.RemoveOption) error { + if v, ok := b.volumes[name]; !ok { + return errdefs.NotFound(fmt.Errorf("volume %s not found", name)) + } else if v.Name == "inuse" { + return errdefs.Conflict(fmt.Errorf("volume in use")) + } + + delete(b.volumes, name) + + return nil +} + +func (b *fakeVolumeBackend) Prune(_ context.Context, _ filters.Args) (*types.VolumesPruneReport, error) { + return nil, nil +} + +type fakeClusterBackend struct { + swarm bool + manager bool + idCount int + volumes map[string]*volume.Volume +} + +func (c *fakeClusterBackend) checkSwarm() error { + if !c.swarm { + return errdefs.Unavailable(fmt.Errorf("this node is not a swarm manager. Use \"docker swarm init\" or \"docker swarm join\" to connect this node to swarm and try again")) + } else if !c.manager { + return errdefs.Unavailable(fmt.Errorf("this node is not a swarm manager. Worker nodes can't be used to view or modify cluster state. Please run this command on a manager node or promote the current node to a manager")) + } + + return nil +} + +func (c *fakeClusterBackend) IsManager() bool { + return c.swarm && c.manager +} + +func (c *fakeClusterBackend) GetVolume(nameOrID string) (volume.Volume, error) { + if err := c.checkSwarm(); err != nil { + return volume.Volume{}, err + } + + if v, ok := c.volumes[nameOrID]; ok { + return *v, nil + } + return volume.Volume{}, errdefs.NotFound(fmt.Errorf("volume %s not found", nameOrID)) +} + +func (c *fakeClusterBackend) GetVolumes(options volume.ListOptions) ([]*volume.Volume, error) { + if err := c.checkSwarm(); err != nil { + return nil, err + } + + volumes := []*volume.Volume{} + + for _, v := range c.volumes { + volumes = append(volumes, v) + } + return volumes, nil +} + +func (c *fakeClusterBackend) CreateVolume(volumeCreate volume.CreateOptions) (*volume.Volume, error) { + if err := c.checkSwarm(); err != nil { + return nil, err + } + + if _, ok := c.volumes[volumeCreate.Name]; ok { + // TODO(dperny): return appropriate already exists error + return nil, fmt.Errorf("already exists") + } + + v := &volume.Volume{ + Name: volumeCreate.Name, + Driver: volumeCreate.Driver, + Labels: volumeCreate.Labels, + Options: volumeCreate.DriverOpts, + Scope: "global", + } + + v.ClusterVolume = &volume.ClusterVolume{ + ID: fmt.Sprintf("cluster_%d", c.idCount), + Spec: *volumeCreate.ClusterVolumeSpec, + } + + c.idCount = c.idCount + 1 + if c.volumes == nil { + c.volumes = map[string]*volume.Volume{ + v.Name: v, + } + } else { + c.volumes[v.Name] = v + } + + return v, nil +} + +func (c *fakeClusterBackend) RemoveVolume(nameOrID string, force bool) error { + if err := c.checkSwarm(); err != nil { + return err + } + + v, ok := c.volumes[nameOrID] + if !ok { + return errdefs.NotFound(fmt.Errorf("volume %s not found", nameOrID)) + } + + if _, mustforce := v.Options["mustforce"]; mustforce && !force { + return errdefs.Conflict(fmt.Errorf("volume %s must be force removed", nameOrID)) + } + + 
delete(c.volumes, nameOrID) + + return nil +} + +func (c *fakeClusterBackend) UpdateVolume(nameOrID string, version uint64, _ volume.UpdateOptions) error { + if err := c.checkSwarm(); err != nil { + return err + } + + if v, ok := c.volumes[nameOrID]; ok { + if v.ClusterVolume.Meta.Version.Index != version { + return fmt.Errorf("wrong version") + } + v.ClusterVolume.Meta.Version.Index = v.ClusterVolume.Meta.Version.Index + 1 + // for testing, we don't actually need to change anything about the + // volume object. let's just increment the version so we can see the + // call happened. + } else { + return errdefs.NotFound(fmt.Errorf("volume %q not found", nameOrID)) + } + + return nil +} diff --git a/api/swagger.yaml b/api/swagger.yaml index be07e44c42..8959df6f7b 100644 --- a/api/swagger.yaml +++ b/api/swagger.yaml @@ -1996,6 +1996,8 @@ definitions: x-nullable: false enum: ["local", "global"] example: "local" + ClusterVolume: + $ref: "#/definitions/ClusterVolume" Options: type: "object" description: | @@ -2069,6 +2071,8 @@ definitions: example: com.example.some-label: "some-value" com.example.some-other-label: "some-other-value" + ClusterVolumeSpec: + $ref: "#/definitions/ClusterVolumeSpec" VolumeListResponse: type: "object" @@ -5740,6 +5744,242 @@ definitions: items: $ref: "#/definitions/OCIPlatform" + ClusterVolume: + type: "object" + description: | + Options and information specific to, and only present on, Swarm CSI + cluster volumes. + properties: + ID: + type: "string" + description: | + The Swarm ID of this volume. Because cluster volumes are Swarm + objects, they have an ID, unlike non-cluster volumes. This ID can + be used to refer to the Volume instead of the name. + Version: + $ref: "#/definitions/ObjectVersion" + CreatedAt: + type: "string" + format: "dateTime" + UpdatedAt: + type: "string" + format: "dateTime" + Spec: + $ref: "#/definitions/ClusterVolumeSpec" + Info: + type: "object" + description: | + Information about the global status of the volume. + properties: + CapacityBytes: + type: "integer" + format: "int64" + description: | + The capacity of the volume in bytes. A value of 0 indicates that + the capacity is unknown. + VolumeContext: + type: "object" + description: | + A map of strings to strings returned from the storage plugin when + the volume is created. + additionalProperties: + type: "string" + VolumeID: + type: "string" + description: | + The ID of the volume as returned by the CSI storage plugin. This + is distinct from the volume's ID as provided by Docker. This ID + is never used by the user when communicating with Docker to refer + to this volume. If the ID is blank, then the Volume has not been + successfully created in the plugin yet. + AccessibleTopology: + type: "array" + description: | + The topology this volume is actually accessible from. + items: + $ref: "#/definitions/Topology" + PublishStatus: + type: "array" + description: | + The status of the volume as it pertains to its publishing and use on + specific nodes + items: + type: "object" + properties: + NodeID: + type: "string" + description: | + The ID of the Swarm node the volume is published on. + State: + type: "string" + description: | + The published state of the volume. + * `pending-publish` The volume should be published to this node, but the call to the controller plugin to do so has not yet been successfully completed. + * `published` The volume is published successfully to the node. 
+ * `pending-node-unpublish` The volume should be unpublished from the node, and the manager is awaiting confirmation from the worker that it has done so. + * `pending-controller-unpublish` The volume is successfully unpublished from the node, but has not yet been successfully unpublished on the controller. + enum: + - "pending-publish" + - "published" + - "pending-node-unpublish" + - "pending-controller-unpublish" + PublishContext: + type: "object" + description: | + A map of strings to strings returned by the CSI controller + plugin when a volume is published. + additionalProperties: + type: "string" + + ClusterVolumeSpec: + type: "object" + description: | + Cluster-specific options used to create the volume. + properties: + Group: + type: "string" + description: | + Group defines the volume group of this volume. Volumes belonging to + the same group can be referred to by group name when creating + Services. Referring to a volume by group instructs Swarm to treat + volumes in that group interchangeably for the purpose of scheduling. + Volumes with an empty string for a group technically all belong to + the same, emptystring group. + AccessMode: + type: "object" + description: | + Defines how the volume is used by tasks. + properties: + Scope: + type: "string" + description: | + The set of nodes this volume can be used on at one time. + - `single` The volume may only be scheduled to one node at a time. + - `multi` the volume may be scheduled to any supported number of nodes at a time. + default: "single" + enum: ["single", "multi"] + x-nullable: false + Sharing: + type: "string" + description: | + The number and way that different tasks can use this volume + at one time. + - `none` The volume may only be used by one task at a time. + - `readonly` The volume may be used by any number of tasks, but they all must mount the volume as readonly + - `onewriter` The volume may be used by any number of tasks, but only one may mount it as read/write. + - `all` The volume may have any number of readers and writers. + default: "none" + enum: ["none", "readonly", "onewriter", "all"] + x-nullable: false + MountVolume: + type: "object" + description: | + Options for using this volume as a Mount-type volume. + + Either MountVolume or BlockVolume, but not both, must be + present. + properties: + FsType: + type: "string" + description: | + Specifies the filesystem type for the mount volume. + Optional. + MountFlags: + type: "array" + description: | + Flags to pass when mounting the volume. Optional. + items: + type: "string" + BlockVolume: + type: "object" + description: | + Options for using this volume as a Block-type volume. + Intentionally empty. + Secrets: + type: "array" + description: | + Swarm Secrets that are passed to the CSI storage plugin when + operating on this volume. + items: + type: "object" + description: | + One cluster volume secret entry. Defines a key-value pair that + is passed to the plugin. + properties: + Key: + type: "string" + description: | + Key is the name of the key of the key-value pair passed to + the plugin. + Secret: + type: "string" + description: | + Secret is the swarm Secret object from which to read data. + This can be a Secret name or ID. The Secret data is + retrieved by swarm and used as the value of the key-value + pair passed to the plugin. + AccessibilityRequirements: + type: "object" + description: | + Requirements for the accessible topology of the volume. These + fields are optional. 
For an in-depth description of what these + fields mean, see the CSI specification. + properties: + Requisite: + type: "array" + description: | + A list of required topologies, at least one of which the + volume must be accessible from. + items: + $ref: "#/definitions/Topology" + Preferred: + type: "array" + description: | + A list of topologies that the volume should attempt to be + provisioned in. + items: + $ref: "#/definitions/Topology" + CapacityRange: + type: "object" + description: | + The desired capacity that the volume should be created with. If + empty, the plugin will decide the capacity. + properties: + RequiredBytes: + type: "integer" + format: "int64" + description: | + The volume must be at least this big. The value of 0 + indicates an unspecified minimum + LimitBytes: + type: "integer" + format: "int64" + description: | + The volume must not be bigger than this. The value of 0 + indicates an unspecified maximum. + Availability: + type: "string" + description: | + The availability of the volume for use in tasks. + - `active` The volume is fully available for scheduling on the cluster + - `pause` No new workloads should use the volume, but existing workloads are not stopped. + - `drain` All workloads using this volume should be stopped and rescheduled, and no new ones should be started. + default: "active" + x-nullable: false + enum: + - "active" + - "pause" + - "drain" + + Topology: + description: | + A map of topological domains to topological segments. For in depth + details, see documentation for the Topology object in the CSI + specification. + type: "object" + additionalProperties: + type: "string" + paths: /containers/json: get: @@ -9247,6 +9487,64 @@ paths: type: "string" tags: ["Volume"] + put: + summary: | + "Update a volume. Valid only for Swarm cluster volumes" + operationId: "VolumeUpdate" + consumes: ["application/json"] + produces: ["application/json"] + responses: + 200: + description: "no error" + 400: + description: "bad parameter" + schema: + $ref: "#/definitions/ErrorResponse" + 404: + description: "no such volume" + schema: + $ref: "#/definitions/ErrorResponse" + 500: + description: "server error" + schema: + $ref: "#/definitions/ErrorResponse" + 503: + description: "node is not part of a swarm" + schema: + $ref: "#/definitions/ErrorResponse" + parameters: + - name: "name" + in: "path" + description: "The name or ID of the volume" + type: "string" + required: true + - name: "body" + in: "body" + schema: + # though the schema for is an object that contains only a + # ClusterVolumeSpec, wrapping the ClusterVolumeSpec in this object + # means that if, later on, we support things like changing the + # labels, we can do so without duplicating that information to the + # ClusterVolumeSpec. + type: "object" + description: "Volume configuration" + properties: + Spec: + $ref: "#/definitions/ClusterVolumeSpec" + description: | + The spec of the volume to update. Currently, only Availability may + change. All other fields must remain unchanged. + - name: "version" + in: "query" + description: | + The version number of the volume being updated. This is required to + avoid conflicting writes. Found in the volume's `ClusterVolume` + field. + type: "integer" + format: "int64" + required: true + tags: ["Volume"] + delete: summary: "Remove a volume" description: "Instruct the driver to remove the volume." 
@@ -9278,6 +9576,7 @@ paths: type: "boolean" default: false tags: ["Volume"] + /volumes/prune: post: summary: "Delete unused volumes" diff --git a/api/types/mount/mount.go b/api/types/mount/mount.go index 443b8d07a9..6fc77f7c81 100644 --- a/api/types/mount/mount.go +++ b/api/types/mount/mount.go @@ -17,6 +17,8 @@ const ( TypeTmpfs Type = "tmpfs" // TypeNamedPipe is the type for mounting Windows named pipes TypeNamedPipe Type = "npipe" + // TypeCluster is the type for Swarm Cluster Volumes. + TypeCluster = "csi" ) // Mount represents a mount (volume). @@ -30,9 +32,10 @@ type Mount struct { ReadOnly bool `json:",omitempty"` Consistency Consistency `json:",omitempty"` - BindOptions *BindOptions `json:",omitempty"` - VolumeOptions *VolumeOptions `json:",omitempty"` - TmpfsOptions *TmpfsOptions `json:",omitempty"` + BindOptions *BindOptions `json:",omitempty"` + VolumeOptions *VolumeOptions `json:",omitempty"` + TmpfsOptions *TmpfsOptions `json:",omitempty"` + ClusterOptions *ClusterOptions `json:",omitempty"` } // Propagation represents the propagation of a mount. @@ -129,3 +132,8 @@ type TmpfsOptions struct { // Some of these may be straightforward to add, but others, such as // uid/gid have implications in a clustered system. } + +// ClusterOptions specifies options for a Cluster volume. +type ClusterOptions struct { + // intentionally empty +} diff --git a/api/types/swarm/node.go b/api/types/swarm/node.go index 1e30f5fa10..bb98d5eedc 100644 --- a/api/types/swarm/node.go +++ b/api/types/swarm/node.go @@ -53,6 +53,7 @@ type NodeDescription struct { Resources Resources `json:",omitempty"` Engine EngineDescription `json:",omitempty"` TLSInfo TLSInfo `json:",omitempty"` + CSIInfo []NodeCSIInfo `json:",omitempty"` } // Platform represents the platform (Arch/OS). @@ -68,6 +69,21 @@ type EngineDescription struct { Plugins []PluginDescription `json:",omitempty"` } +// NodeCSIInfo represents information about a CSI plugin available on the node +type NodeCSIInfo struct { + // PluginName is the name of the CSI plugin. + PluginName string `json:",omitempty"` + // NodeID is the ID of the node as reported by the CSI plugin. This is + // different from the swarm node ID. + NodeID string `json:",omitempty"` + // MaxVolumesPerNode is the maximum number of volumes that may be published + // to this node + MaxVolumesPerNode int64 `json:",omitempty"` + // AccessibleTopology indicates the location of this node in the CSI + // plugin's topology + AccessibleTopology *Topology `json:",omitempty"` +} + // PluginDescription represents the description of an engine plugin. type PluginDescription struct { Type string `json:",omitempty"` @@ -113,3 +129,11 @@ const ( // NodeStateDisconnected DISCONNECTED NodeStateDisconnected NodeState = "disconnected" ) + +// Topology defines the CSI topology of this node. This type is a duplicate of +// github.com/docker/docker/api/types.Topology. Because the type definition +// is so simple and to avoid complicated structure or circular imports, we just +// duplicate it here. See that type for full documentation +type Topology struct { + Segments map[string]string `json:",omitempty"` +} diff --git a/api/types/swarm/task.go b/api/types/swarm/task.go index a6f7ab7b5c..ad3eeca0b7 100644 --- a/api/types/swarm/task.go +++ b/api/types/swarm/task.go @@ -62,6 +62,11 @@ type Task struct { // used to determine which Tasks belong to which run of the job. This field // is absent if the Service mode is Replicated or Global. 
JobIteration *Version `json:",omitempty"` + + // Volumes is the list of VolumeAttachments for this task. It specifies + // which particular volumes are to be used by this particular task, and + // fulfilling what mounts in the spec. + Volumes []VolumeAttachment } // TaskSpec represents the spec of a task. @@ -204,3 +209,17 @@ type ContainerStatus struct { type PortStatus struct { Ports []PortConfig `json:",omitempty"` } + +// VolumeAttachment contains the associating a Volume to a Task. +type VolumeAttachment struct { + // ID is the Swarmkit ID of the Volume. This is not the CSI VolumeId. + ID string `json:",omitempty"` + + // Source, together with Target, indicates the Mount, as specified in the + // ContainerSpec, that this volume fulfills. + Source string `json:",omitempty"` + + // Target, together with Source, indicates the Mount, as specified + // in the ContainerSpec, that this volume fulfills. + Target string `json:",omitempty"` +} diff --git a/api/types/volume/cluster_volume.go b/api/types/volume/cluster_volume.go new file mode 100644 index 0000000000..124fb8ca14 --- /dev/null +++ b/api/types/volume/cluster_volume.go @@ -0,0 +1,420 @@ +package volume + +import ( + "github.com/docker/docker/api/types/swarm" +) + +// ClusterVolume contains options and information specific to, and only present +// on, Swarm CSI cluster volumes. +type ClusterVolume struct { + // ID is the Swarm ID of the volume. Because cluster volumes are Swarm + // objects, they have an ID, unlike non-cluster volumes, which only have a + // Name. This ID can be used to refer to the cluster volume. + ID string + + // Meta is the swarm metadata about this volume. + swarm.Meta + + // Spec is the cluster-specific options from which this volume is derived. + Spec ClusterVolumeSpec + + // PublishStatus contains the status of the volume as it pertains to its + // publishing on Nodes. + PublishStatus []*PublishStatus `json:",omitempty"` + + // Info is information about the global status of the volume. + Info *Info `json:",omitempty"` +} + +// ClusterVolumeSpec contains the spec used to create this volume. +type ClusterVolumeSpec struct { + // Group defines the volume group of this volume. Volumes belonging to the + // same group can be referred to by group name when creating Services. + // Referring to a volume by group instructs swarm to treat volumes in that + // group interchangeably for the purpose of scheduling. Volumes with an + // empty string for a group technically all belong to the same, emptystring + // group. + Group string `json:",omitempty"` + + // AccessMode defines how the volume is used by tasks. + AccessMode *AccessMode `json:",omitempty"` + + // AccessibilityRequirements specifies where in the cluster a volume must + // be accessible from. + // + // This field must be empty if the plugin does not support + // VOLUME_ACCESSIBILITY_CONSTRAINTS capabilities. If it is present but the + // plugin does not support it, volume will not be created. + // + // If AccessibilityRequirements is empty, but the plugin does support + // VOLUME_ACCESSIBILITY_CONSTRAINTS, then Swarmkit will assume the entire + // cluster is a valid target for the volume. + AccessibilityRequirements *TopologyRequirement `json:",omitempty"` + + // CapacityRange defines the desired capacity that the volume should be + // created with. If nil, the plugin will decide the capacity. + CapacityRange *CapacityRange `json:",omitempty"` + + // Secrets defines Swarm Secrets that are passed to the CSI storage plugin + // when operating on this volume. 
+ Secrets []Secret `json:",omitempty"` + + // Availability is the Volume's desired availability. Analogous to Node + // Availability, this allows the user to take volumes offline in order to + // update or delete them. + Availability Availability `json:",omitempty"` +} + +// Availability specifies the availability of the volume. +type Availability string + +const ( + // AvailabilityActive indicates that the volume is active and fully + // schedulable on the cluster. + AvailabilityActive Availability = "active" + + // AvailabilityPause indicates that no new workloads should use the + // volume, but existing workloads can continue to use it. + AvailabilityPause Availability = "pause" + + // AvailabilityDrain indicates that all workloads using this volume + // should be rescheduled, and the volume unpublished from all nodes. + AvailabilityDrain Availability = "drain" +) + +// AccessMode defines the access mode of a volume. +type AccessMode struct { + // Scope defines the set of nodes this volume can be used on at one time. + Scope Scope `json:",omitempty"` + + // Sharing defines the number and way that different tasks can use this + // volume at one time. + Sharing SharingMode `json:",omitempty"` + + // MountVolume defines options for using this volume as a Mount-type + // volume. + // + // Either BlockVolume or MountVolume, but not both, must be present. + MountVolume *TypeMount `json:",omitempty"` + + // BlockVolume defines options for using this volume as a Block-type + // volume. + // + // Either BlockVolume or MountVolume, but not both, must be present. + BlockVolume *TypeBlock `json:",omitempty"` +} + +// Scope defines the Scope of a CSI Volume. This is how many nodes a +// Volume can be accessed simultaneously on. +type Scope string + +const ( + // ScopeSingleNode indicates the volume can be used on one node at a + // time. + ScopeSingleNode Scope = "single" + + // ScopeMultiNode indicates the volume can be used on many nodes at + // the same time. + ScopeMultiNode Scope = "multi" +) + +// SharingMode defines the Sharing of a CSI Volume. This is how Tasks using a +// Volume at the same time can use it. +type SharingMode string + +const ( + // SharingNone indicates that only one Task may use the Volume at a + // time. + SharingNone SharingMode = "none" + + // SharingReadOnly indicates that the Volume may be shared by any + // number of Tasks, but they must be read-only. + SharingReadOnly SharingMode = "readonly" + + // SharingOneWriter indicates that the Volume may be shared by any + // number of Tasks, but all after the first must be read-only. + SharingOneWriter SharingMode = "onewriter" + + // SharingAll means that the Volume may be shared by any number of + // Tasks, as readers or writers. + SharingAll SharingMode = "all" +) + +// TypeBlock defines options for using a volume as a block-type volume. +// +// Intentionally empty. +type TypeBlock struct{} + +// TypeMount contains options for using a volume as a Mount-type +// volume. +type TypeMount struct { + // FsType specifies the filesystem type for the mount volume. Optional. + FsType string `json:",omitempty"` + + // MountFlags defines flags to pass when mounting the volume. Optional. + MountFlags []string `json:",omitempty"` +} + +// TopologyRequirement expresses the user's requirements for a volume's +// accessible topology. +type TopologyRequirement struct { + // Requisite specifies a list of Topologies, at least one of which the + // volume must be accessible from. 
+ // + // Taken verbatim from the CSI Spec: + // + // Specifies the list of topologies the provisioned volume MUST be + // accessible from. + // This field is OPTIONAL. If TopologyRequirement is specified either + // requisite or preferred or both MUST be specified. + // + // If requisite is specified, the provisioned volume MUST be + // accessible from at least one of the requisite topologies. + // + // Given + // x = number of topologies provisioned volume is accessible from + // n = number of requisite topologies + // The CO MUST ensure n >= 1. The SP MUST ensure x >= 1 + // If x==n, then the SP MUST make the provisioned volume available to + // all topologies from the list of requisite topologies. If it is + // unable to do so, the SP MUST fail the CreateVolume call. + // For example, if a volume should be accessible from a single zone, + // and requisite = + // {"region": "R1", "zone": "Z2"} + // then the provisioned volume MUST be accessible from the "region" + // "R1" and the "zone" "Z2". + // Similarly, if a volume should be accessible from two zones, and + // requisite = + // {"region": "R1", "zone": "Z2"}, + // {"region": "R1", "zone": "Z3"} + // then the provisioned volume MUST be accessible from the "region" + // "R1" and both "zone" "Z2" and "zone" "Z3". + // + // If xn, then the SP MUST make the provisioned volume available from + // all topologies from the list of requisite topologies and MAY choose + // the remaining x-n unique topologies from the list of all possible + // topologies. If it is unable to do so, the SP MUST fail the + // CreateVolume call. + // For example, if a volume should be accessible from two zones, and + // requisite = + // {"region": "R1", "zone": "Z2"} + // then the provisioned volume MUST be accessible from the "region" + // "R1" and the "zone" "Z2" and the SP may select the second zone + // independently, e.g. "R1/Z4". + Requisite []Topology `json:",omitempty"` + + // Preferred is a list of Topologies that the volume should attempt to be + // provisioned in. + // + // Taken from the CSI spec: + // + // Specifies the list of topologies the CO would prefer the volume to + // be provisioned in. + // + // This field is OPTIONAL. If TopologyRequirement is specified either + // requisite or preferred or both MUST be specified. + // + // An SP MUST attempt to make the provisioned volume available using + // the preferred topologies in order from first to last. + // + // If requisite is specified, all topologies in preferred list MUST + // also be present in the list of requisite topologies. + // + // If the SP is unable to to make the provisioned volume available + // from any of the preferred topologies, the SP MAY choose a topology + // from the list of requisite topologies. + // If the list of requisite topologies is not specified, then the SP + // MAY choose from the list of all possible topologies. + // If the list of requisite topologies is specified and the SP is + // unable to to make the provisioned volume available from any of the + // requisite topologies it MUST fail the CreateVolume call. + // + // Example 1: + // Given a volume should be accessible from a single zone, and + // requisite = + // {"region": "R1", "zone": "Z2"}, + // {"region": "R1", "zone": "Z3"} + // preferred = + // {"region": "R1", "zone": "Z3"} + // then the the SP SHOULD first attempt to make the provisioned volume + // available from "zone" "Z3" in the "region" "R1" and fall back to + // "zone" "Z2" in the "region" "R1" if that is not possible. 
+ // + // Example 2: + // Given a volume should be accessible from a single zone, and + // requisite = + // {"region": "R1", "zone": "Z2"}, + // {"region": "R1", "zone": "Z3"}, + // {"region": "R1", "zone": "Z4"}, + // {"region": "R1", "zone": "Z5"} + // preferred = + // {"region": "R1", "zone": "Z4"}, + // {"region": "R1", "zone": "Z2"} + // then the the SP SHOULD first attempt to make the provisioned volume + // accessible from "zone" "Z4" in the "region" "R1" and fall back to + // "zone" "Z2" in the "region" "R1" if that is not possible. If that + // is not possible, the SP may choose between either the "zone" + // "Z3" or "Z5" in the "region" "R1". + // + // Example 3: + // Given a volume should be accessible from TWO zones (because an + // opaque parameter in CreateVolumeRequest, for example, specifies + // the volume is accessible from two zones, aka synchronously + // replicated), and + // requisite = + // {"region": "R1", "zone": "Z2"}, + // {"region": "R1", "zone": "Z3"}, + // {"region": "R1", "zone": "Z4"}, + // {"region": "R1", "zone": "Z5"} + // preferred = + // {"region": "R1", "zone": "Z5"}, + // {"region": "R1", "zone": "Z3"} + // then the the SP SHOULD first attempt to make the provisioned volume + // accessible from the combination of the two "zones" "Z5" and "Z3" in + // the "region" "R1". If that's not possible, it should fall back to + // a combination of "Z5" and other possibilities from the list of + // requisite. If that's not possible, it should fall back to a + // combination of "Z3" and other possibilities from the list of + // requisite. If that's not possible, it should fall back to a + // combination of other possibilities from the list of requisite. + Preferred []Topology `json:",omitempty"` +} + +// Topology is a map of topological domains to topological segments. +// +// This description is taken verbatim from the CSI Spec: +// +// A topological domain is a sub-division of a cluster, like "region", +// "zone", "rack", etc. +// A topological segment is a specific instance of a topological domain, +// like "zone3", "rack3", etc. +// For example {"com.company/zone": "Z1", "com.company/rack": "R3"} +// Valid keys have two segments: an OPTIONAL prefix and name, separated +// by a slash (/), for example: "com.company.example/zone". +// The key name segment is REQUIRED. The prefix is OPTIONAL. +// The key name MUST be 63 characters or less, begin and end with an +// alphanumeric character ([a-z0-9A-Z]), and contain only dashes (-), +// underscores (_), dots (.), or alphanumerics in between, for example +// "zone". +// The key prefix MUST be 63 characters or less, begin and end with a +// lower-case alphanumeric character ([a-z0-9]), contain only +// dashes (-), dots (.), or lower-case alphanumerics in between, and +// follow domain name notation format +// (https://tools.ietf.org/html/rfc1035#section-2.3.1). +// The key prefix SHOULD include the plugin's host company name and/or +// the plugin name, to minimize the possibility of collisions with keys +// from other plugins. +// If a key prefix is specified, it MUST be identical across all +// topology keys returned by the SP (across all RPCs). +// Keys MUST be case-insensitive. Meaning the keys "Zone" and "zone" +// MUST not both exist. +// Each value (topological segment) MUST contain 1 or more strings. +// Each string MUST be 63 characters or less and begin and end with an +// alphanumeric character with '-', '_', '.', or alphanumerics in +// between. 
+type Topology struct { + Segments map[string]string `json:",omitempty"` +} + +// CapacityRange describes the minimum and maximum capacity a volume should be +// created with +type CapacityRange struct { + // RequiredBytes specifies that a volume must be at least this big. The + // value of 0 indicates an unspecified minimum. + RequiredBytes int64 + + // LimitBytes specifies that a volume must not be bigger than this. The + // value of 0 indicates an unspecified maximum + LimitBytes int64 +} + +// Secret represents a Swarm Secret value that must be passed to the CSI +// storage plugin when operating on this Volume. It represents one key-value +// pair of possibly many. +type Secret struct { + // Key is the name of the key of the key-value pair passed to the plugin. + Key string + + // Secret is the swarm Secret object from which to read data. This can be a + // Secret name or ID. The Secret data is retrieved by Swarm and used as the + // value of the key-value pair passed to the plugin. + Secret string +} + +// PublishState represents the state of a Volume as it pertains to its +// use on a particular Node. +type PublishState string + +const ( + // StatePending indicates that the volume should be published on + // this node, but the call to ControllerPublishVolume has not been + // successfully completed yet and the result recorded by swarmkit. + StatePending PublishState = "pending-publish" + + // StatePublished means the volume is published successfully to the node. + StatePublished PublishState = "published" + + // StatePendingNodeUnpublish indicates that the Volume should be + // unpublished on the Node, and we're waiting for confirmation that it has + // done so. After the Node has confirmed that the Volume has been + // unpublished, the state will move to StatePendingUnpublish. + StatePendingNodeUnpublish PublishState = "pending-node-unpublish" + + // StatePendingUnpublish means the volume is still published to the node + // by the controller, awaiting the operation to unpublish it. + StatePendingUnpublish PublishState = "pending-controller-unpublish" +) + +// PublishStatus represents the status of the volume as published to an +// individual node +type PublishStatus struct { + // NodeID is the ID of the swarm node this Volume is published to. + NodeID string `json:",omitempty"` + + // State is the publish state of the volume. + State PublishState `json:",omitempty"` + + // PublishContext is the PublishContext returned by the CSI plugin when + // a volume is published. + PublishContext map[string]string `json:",omitempty"` +} + +// Info contains information about the Volume as a whole as provided by +// the CSI storage plugin. +type Info struct { + // CapacityBytes is the capacity of the volume in bytes. A value of 0 + // indicates that the capacity is unknown. + CapacityBytes int64 `json:",omitempty"` + + // VolumeContext is the context originating from the CSI storage plugin + // when the Volume is created. + VolumeContext map[string]string `json:",omitempty"` + + // VolumeID is the ID of the Volume as seen by the CSI storage plugin. This + // is distinct from the Volume's Swarm ID, which is the ID used by all of + // the Docker Engine to refer to the Volume. If this field is blank, then + // the Volume has not been successfully created yet. + VolumeID string `json:",omitempty"` + + // AccessibleTopolgoy is the topology this volume is actually accessible + // from. 
+ AccessibleTopology []Topology `json:",omitempty"` +} diff --git a/api/types/volume/create_options.go b/api/types/volume/create_options.go index df7a252cf9..37c41a6096 100644 --- a/api/types/volume/create_options.go +++ b/api/types/volume/create_options.go @@ -9,6 +9,9 @@ package volume // swagger:model CreateOptions type CreateOptions struct { + // cluster volume spec + ClusterVolumeSpec *ClusterVolumeSpec `json:"ClusterVolumeSpec,omitempty"` + // Name of the volume driver to use. Driver string `json:"Driver,omitempty"` diff --git a/api/types/volume/options.go b/api/types/volume/options.go new file mode 100644 index 0000000000..8b0dd13899 --- /dev/null +++ b/api/types/volume/options.go @@ -0,0 +1,8 @@ +package volume // import "github.com/docker/docker/api/types/volume" + +import "github.com/docker/docker/api/types/filters" + +// ListOptions holds parameters to list volumes. +type ListOptions struct { + Filters filters.Args +} diff --git a/api/types/volume/volume.go b/api/types/volume/volume.go index 2e40247505..ea7d555e5b 100644 --- a/api/types/volume/volume.go +++ b/api/types/volume/volume.go @@ -7,6 +7,9 @@ package volume // swagger:model Volume type Volume struct { + // cluster volume + ClusterVolume *ClusterVolume `json:"ClusterVolume,omitempty"` + // Date/Time the volume was created. CreatedAt string `json:"CreatedAt,omitempty"` diff --git a/api/types/volume/volume_update.go b/api/types/volume/volume_update.go new file mode 100644 index 0000000000..f958f80a66 --- /dev/null +++ b/api/types/volume/volume_update.go @@ -0,0 +1,7 @@ +package volume // import "github.com/docker/docker/api/types/volume" + +// UpdateOptions is configuration to update a Volume with. +type UpdateOptions struct { + // Spec is the ClusterVolumeSpec to update the volume to. + Spec *ClusterVolumeSpec `json:"Spec,omitempty"` +} diff --git a/client/interface.go b/client/interface.go index f8dc4a6d28..e9c1ed722e 100644 --- a/client/interface.go +++ b/client/interface.go @@ -179,6 +179,7 @@ type VolumeAPIClient interface { VolumeList(ctx context.Context, filter filters.Args) (volume.ListResponse, error) VolumeRemove(ctx context.Context, volumeID string, force bool) error VolumesPrune(ctx context.Context, pruneFilter filters.Args) (types.VolumesPruneReport, error) + VolumeUpdate(ctx context.Context, volumeID string, version swarm.Version, options volume.UpdateOptions) error } // SecretAPIClient defines API client methods for secrets diff --git a/client/request.go b/client/request.go index 3e4911e57f..086db59d01 100644 --- a/client/request.go +++ b/client/request.go @@ -49,6 +49,14 @@ func (cli *Client) postRaw(ctx context.Context, path string, query url.Values, b return cli.sendRequest(ctx, http.MethodPost, path, query, body, headers) } +func (cli *Client) put(ctx context.Context, path string, query url.Values, obj interface{}, headers map[string][]string) (serverResponse, error) { + body, headers, err := encodeBody(obj, headers) + if err != nil { + return serverResponse{}, err + } + return cli.sendRequest(ctx, http.MethodPut, path, query, body, headers) +} + // putRaw sends an http request to the docker API using the method PUT. 
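Editorial aside: the pieces above and below (the `ClusterVolumeSpec` field on `CreateOptions`, the new `UpdateOptions` type, and the `VolumeUpdate` client method added further down) fit together roughly as in the sketch below. This is illustrative only, not part of the change: it assumes a reachable Swarm manager, the existing `VolumeCreate`/`VolumeInspect` client calls, and hypothetical names (`example-vol`, `my-csi-plugin`). Note that the engine currently only honors the availability portion of an update.

```go
package main

import (
	"context"
	"log"

	volumetypes "github.com/docker/docker/api/types/volume"
	"github.com/docker/docker/client"
)

func main() {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	// Filling in ClusterVolumeSpec makes this a cluster volume; leaving it
	// nil would create a regular local volume as before.
	vol, err := cli.VolumeCreate(ctx, volumetypes.CreateOptions{
		Name:   "example-vol",   // hypothetical name
		Driver: "my-csi-plugin", // hypothetical CSI plugin name
		ClusterVolumeSpec: &volumetypes.ClusterVolumeSpec{
			AccessMode: &volumetypes.AccessMode{
				Scope:       volumetypes.ScopeSingleNode,
				Sharing:     volumetypes.SharingNone,
				MountVolume: &volumetypes.TypeMount{},
			},
			CapacityRange: &volumetypes.CapacityRange{RequiredBytes: 1 << 30},
		},
	})
	if err != nil {
		log.Fatal(err)
	}

	// Updates are gated on the cluster volume's current version (optimistic
	// concurrency), so re-read the volume before sending the update.
	current, err := cli.VolumeInspect(ctx, vol.Name)
	if err != nil || current.ClusterVolume == nil {
		log.Fatalf("not a cluster volume: %v", err)
	}

	spec := current.ClusterVolume.Spec
	spec.Availability = volumetypes.AvailabilityPause
	if err := cli.VolumeUpdate(ctx, current.ClusterVolume.ID, current.ClusterVolume.Version,
		volumetypes.UpdateOptions{Spec: &spec}); err != nil {
		log.Fatal(err)
	}
}
```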
func (cli *Client) putRaw(ctx context.Context, path string, query url.Values, body io.Reader, headers map[string][]string) (serverResponse, error) { return cli.sendRequest(ctx, http.MethodPut, path, query, body, headers) diff --git a/client/volume_update.go b/client/volume_update.go new file mode 100644 index 0000000000..9753e05ef1 --- /dev/null +++ b/client/volume_update.go @@ -0,0 +1,25 @@ +package client // import "github.com/docker/docker/client" + +import ( + "context" + "net/url" + "strconv" + + "github.com/docker/docker/api/types/swarm" + "github.com/docker/docker/api/types/volume" +) + +// VolumeUpdate updates a volume. This only works for Cluster Volumes, and +// only some fields can be updated. +func (cli *Client) VolumeUpdate(ctx context.Context, volumeID string, version swarm.Version, options volume.UpdateOptions) error { + if err := cli.NewVersionError("1.42", "volume update"); err != nil { + return err + } + + query := url.Values{} + query.Set("version", strconv.FormatUint(version.Index, 10)) + + resp, err := cli.put(ctx, "/volumes/"+volumeID, query, options, nil) + ensureReaderClosed(resp) + return err +} diff --git a/client/volume_update_test.go b/client/volume_update_test.go new file mode 100644 index 0000000000..cac9711f92 --- /dev/null +++ b/client/volume_update_test.go @@ -0,0 +1,55 @@ +package client // import "github.com/docker/docker/client" + +import ( + "bytes" + "context" + "fmt" + "io" + "net/http" + "strings" + "testing" + + "github.com/docker/docker/api/types/swarm" + volumetypes "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/errdefs" +) + +func TestVolumeUpdateError(t *testing.T) { + client := &Client{ + client: newMockClient(errorMock(http.StatusInternalServerError, "Server error")), + } + + err := client.VolumeUpdate(context.Background(), "", swarm.Version{}, volumetypes.UpdateOptions{}) + + if !errdefs.IsSystem(err) { + t.Fatalf("expected a Server Error, got %[1]T: %[1]v", err) + } +} + +func TestVolumeUpdate(t *testing.T) { + expectedURL := "/volumes/test1" + expectedVersion := "version=10" + + client := &Client{ + client: newMockClient(func(req *http.Request) (*http.Response, error) { + if !strings.HasPrefix(req.URL.Path, expectedURL) { + return nil, fmt.Errorf("Expected URL '%s', got '%s'", expectedURL, req.URL) + } + if req.Method != http.MethodPut { + return nil, fmt.Errorf("expected PUT method, got %s", req.Method) + } + if !strings.Contains(req.URL.RawQuery, expectedVersion) { + return nil, fmt.Errorf("expected query to contain '%s', got '%s'", expectedVersion, req.URL.RawQuery) + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewReader([]byte("body"))), + }, nil + }), + } + + err := client.VolumeUpdate(context.Background(), "test1", swarm.Version{Index: uint64(10)}, volumetypes.UpdateOptions{}) + if err != nil { + t.Fatal(err) + } +} diff --git a/cmd/dockerd/daemon.go b/cmd/dockerd/daemon.go index 384807a61b..2fdcb5b8ab 100644 --- a/cmd/dockerd/daemon.go +++ b/cmd/dockerd/daemon.go @@ -531,7 +531,7 @@ func initRouter(opts routerOptions) { container.NewRouter(opts.daemon, decoder, opts.daemon.RawSysInfo().CgroupUnified), image.NewRouter(opts.daemon.ImageService()), systemrouter.NewRouter(opts.daemon, opts.cluster, opts.buildkit, opts.features), - volume.NewRouter(opts.daemon.VolumesService()), + volume.NewRouter(opts.daemon.VolumesService(), opts.cluster), build.NewRouter(opts.buildBackend, opts.daemon, opts.features), sessionrouter.NewRouter(opts.sessionManager), 
swarmrouter.NewRouter(opts.cluster), diff --git a/daemon/cluster/convert/node.go b/daemon/cluster/convert/node.go index ee502f269c..4ba9c62609 100644 --- a/daemon/cluster/convert/node.go +++ b/daemon/cluster/convert/node.go @@ -56,6 +56,18 @@ func NodeFromGRPC(n swarmapi.Node) types.Node { node.Description.TLSInfo.CertIssuerPublicKey = n.Description.TLSInfo.CertIssuerPublicKey node.Description.TLSInfo.CertIssuerSubject = n.Description.TLSInfo.CertIssuerSubject } + for _, csi := range n.Description.CSIInfo { + if csi != nil { + node.Description.CSIInfo = append( + node.Description.CSIInfo, + types.NodeCSIInfo{ + PluginName: csi.PluginName, + NodeID: csi.NodeID, + MaxVolumesPerNode: csi.MaxVolumesPerNode, + }, + ) + } + } } // Manager diff --git a/daemon/cluster/convert/task.go b/daemon/cluster/convert/task.go index 235316cf66..c34938da82 100644 --- a/daemon/cluster/convert/task.go +++ b/daemon/cluster/convert/task.go @@ -57,6 +57,17 @@ func TaskFromGRPC(t swarmapi.Task) (types.Task, error) { } } + // appending to a nil slice is valid. if there are no items in t.Volumes, + // then the task.Volumes will remain nil; otherwise, it will contain + // converted entries. + for _, v := range t.Volumes { + task.Volumes = append(task.Volumes, types.VolumeAttachment{ + ID: v.ID, + Source: v.Source, + Target: v.Target, + }) + } + if t.Status.PortStatus == nil { return task, nil } diff --git a/daemon/cluster/convert/volume.go b/daemon/cluster/convert/volume.go new file mode 100644 index 0000000000..4cd66755aa --- /dev/null +++ b/daemon/cluster/convert/volume.go @@ -0,0 +1,311 @@ +package convert // import "github.com/docker/docker/daemon/cluster/convert" + +import ( + volumetypes "github.com/docker/docker/api/types/volume" + gogotypes "github.com/gogo/protobuf/types" + swarmapi "github.com/moby/swarmkit/v2/api" +) + +// VolumeFromGRPC converts a swarmkit api Volume object to a docker api Volume +// object +func VolumeFromGRPC(v *swarmapi.Volume) volumetypes.Volume { + clusterVolumeSpec := volumetypes.ClusterVolumeSpec{ + Group: v.Spec.Group, + AccessMode: accessModeFromGRPC(v.Spec.AccessMode), + AccessibilityRequirements: topologyRequirementFromGRPC(v.Spec.AccessibilityRequirements), + CapacityRange: capacityRangeFromGRPC(v.Spec.CapacityRange), + Secrets: volumeSecretsFromGRPC(v.Spec.Secrets), + Availability: volumeAvailabilityFromGRPC(v.Spec.Availability), + } + + clusterVolume := &volumetypes.ClusterVolume{ + ID: v.ID, + Spec: clusterVolumeSpec, + PublishStatus: volumePublishStatusFromGRPC(v.PublishStatus), + Info: volumeInfoFromGRPC(v.VolumeInfo), + } + + clusterVolume.Version.Index = v.Meta.Version.Index + clusterVolume.CreatedAt, _ = gogotypes.TimestampFromProto(v.Meta.CreatedAt) + clusterVolume.UpdatedAt, _ = gogotypes.TimestampFromProto(v.Meta.UpdatedAt) + + return volumetypes.Volume{ + ClusterVolume: clusterVolume, + CreatedAt: clusterVolume.CreatedAt.String(), + Driver: v.Spec.Driver.Name, + Labels: v.Spec.Annotations.Labels, + Name: v.Spec.Annotations.Name, + Options: v.Spec.Driver.Options, + Scope: "global", + } +} + +func volumeSpecToGRPC(spec volumetypes.ClusterVolumeSpec) *swarmapi.VolumeSpec { + swarmSpec := &swarmapi.VolumeSpec{ + Group: spec.Group, + } + + if spec.AccessMode != nil { + swarmSpec.AccessMode = &swarmapi.VolumeAccessMode{} + + switch spec.AccessMode.Scope { + case volumetypes.ScopeSingleNode: + swarmSpec.AccessMode.Scope = swarmapi.VolumeScopeSingleNode + case volumetypes.ScopeMultiNode: + swarmSpec.AccessMode.Scope = swarmapi.VolumeScopeMultiNode + } + + switch 
spec.AccessMode.Sharing { + case volumetypes.SharingNone: + swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingNone + case volumetypes.SharingReadOnly: + swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingReadOnly + case volumetypes.SharingOneWriter: + swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingOneWriter + case volumetypes.SharingAll: + swarmSpec.AccessMode.Sharing = swarmapi.VolumeSharingAll + } + + if spec.AccessMode.BlockVolume != nil { + swarmSpec.AccessMode.AccessType = &swarmapi.VolumeAccessMode_Block{ + Block: &swarmapi.VolumeAccessMode_BlockVolume{}, + } + } + if spec.AccessMode.MountVolume != nil { + swarmSpec.AccessMode.AccessType = &swarmapi.VolumeAccessMode_Mount{ + Mount: &swarmapi.VolumeAccessMode_MountVolume{ + FsType: spec.AccessMode.MountVolume.FsType, + MountFlags: spec.AccessMode.MountVolume.MountFlags, + }, + } + } + } + + for _, secret := range spec.Secrets { + swarmSpec.Secrets = append(swarmSpec.Secrets, &swarmapi.VolumeSecret{ + Key: secret.Key, + Secret: secret.Secret, + }) + } + + if spec.AccessibilityRequirements != nil { + swarmSpec.AccessibilityRequirements = &swarmapi.TopologyRequirement{} + + for _, top := range spec.AccessibilityRequirements.Requisite { + swarmSpec.AccessibilityRequirements.Requisite = append( + swarmSpec.AccessibilityRequirements.Requisite, + &swarmapi.Topology{ + Segments: top.Segments, + }, + ) + } + + for _, top := range spec.AccessibilityRequirements.Preferred { + swarmSpec.AccessibilityRequirements.Preferred = append( + swarmSpec.AccessibilityRequirements.Preferred, + &swarmapi.Topology{ + Segments: top.Segments, + }, + ) + } + } + + if spec.CapacityRange != nil { + swarmSpec.CapacityRange = &swarmapi.CapacityRange{ + RequiredBytes: spec.CapacityRange.RequiredBytes, + LimitBytes: spec.CapacityRange.LimitBytes, + } + } + + // availability is not a pointer, it is a value. if the user does not + // specify an availability, it will be inferred as the 0-value, which is + // "active". + switch spec.Availability { + case volumetypes.AvailabilityActive: + swarmSpec.Availability = swarmapi.VolumeAvailabilityActive + case volumetypes.AvailabilityPause: + swarmSpec.Availability = swarmapi.VolumeAvailabilityPause + case volumetypes.AvailabilityDrain: + swarmSpec.Availability = swarmapi.VolumeAvailabilityDrain + } + + return swarmSpec +} + +// VolumeCreateToGRPC takes a VolumeCreateBody and outputs the matching +// swarmapi VolumeSpec. 
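Editorial aside on the availability mapping above: `Availability` on the API side is a string type, so an unset value is the empty string rather than the `"active"` constant; none of the switch cases fire, and the swarmkit field is left at its protobuf zero value, which (as the comment implies) is the active state. A small sketch of that behavior, with the zero-value assumption stated explicitly:

```go
package main

import (
	"fmt"

	volumetypes "github.com/docker/docker/api/types/volume"
	swarmapi "github.com/moby/swarmkit/v2/api"
)

func main() {
	// A ClusterVolumeSpec with no explicit availability carries the empty
	// string, not the "active" constant...
	var spec volumetypes.ClusterVolumeSpec
	fmt.Println(spec.Availability == "")                              // true
	fmt.Println(spec.Availability == volumetypes.AvailabilityActive) // false

	// ...so the swarmkit field keeps its zero value. Assumption: the zero
	// value of the swarmkit enum is the "active" availability.
	var swarmAvailability swarmapi.VolumeSpec_VolumeAvailability
	fmt.Println(swarmAvailability == swarmapi.VolumeAvailabilityActive)
}
```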
+func VolumeCreateToGRPC(volume *volumetypes.CreateOptions) *swarmapi.VolumeSpec { + var swarmSpec *swarmapi.VolumeSpec + if volume != nil && volume.ClusterVolumeSpec != nil { + swarmSpec = volumeSpecToGRPC(*volume.ClusterVolumeSpec) + } else { + swarmSpec = &swarmapi.VolumeSpec{} + } + + swarmSpec.Annotations = swarmapi.Annotations{ + Name: volume.Name, + Labels: volume.Labels, + } + + swarmSpec.Driver = &swarmapi.Driver{ + Name: volume.Driver, + Options: volume.DriverOpts, + } + + return swarmSpec +} + +func volumeInfoFromGRPC(info *swarmapi.VolumeInfo) *volumetypes.Info { + if info == nil { + return nil + } + + var accessibleTopology []volumetypes.Topology + if info.AccessibleTopology != nil { + accessibleTopology = make([]volumetypes.Topology, len(info.AccessibleTopology)) + for i, top := range info.AccessibleTopology { + accessibleTopology[i] = topologyFromGRPC(top) + } + } + + return &volumetypes.Info{ + CapacityBytes: info.CapacityBytes, + VolumeContext: info.VolumeContext, + VolumeID: info.VolumeID, + AccessibleTopology: accessibleTopology, + } +} + +func volumePublishStatusFromGRPC(publishStatus []*swarmapi.VolumePublishStatus) []*volumetypes.PublishStatus { + if publishStatus == nil { + return nil + } + + vps := make([]*volumetypes.PublishStatus, len(publishStatus)) + for i, status := range publishStatus { + var state volumetypes.PublishState + switch status.State { + case swarmapi.VolumePublishStatus_PENDING_PUBLISH: + state = volumetypes.StatePending + case swarmapi.VolumePublishStatus_PUBLISHED: + state = volumetypes.StatePublished + case swarmapi.VolumePublishStatus_PENDING_NODE_UNPUBLISH: + state = volumetypes.StatePendingNodeUnpublish + case swarmapi.VolumePublishStatus_PENDING_UNPUBLISH: + state = volumetypes.StatePendingUnpublish + } + + vps[i] = &volumetypes.PublishStatus{ + NodeID: status.NodeID, + State: state, + PublishContext: status.PublishContext, + } + } + + return vps +} + +func accessModeFromGRPC(accessMode *swarmapi.VolumeAccessMode) *volumetypes.AccessMode { + if accessMode == nil { + return nil + } + + convertedAccessMode := &volumetypes.AccessMode{} + + switch accessMode.Scope { + case swarmapi.VolumeScopeSingleNode: + convertedAccessMode.Scope = volumetypes.ScopeSingleNode + case swarmapi.VolumeScopeMultiNode: + convertedAccessMode.Scope = volumetypes.ScopeMultiNode + } + + switch accessMode.Sharing { + case swarmapi.VolumeSharingNone: + convertedAccessMode.Sharing = volumetypes.SharingNone + case swarmapi.VolumeSharingReadOnly: + convertedAccessMode.Sharing = volumetypes.SharingReadOnly + case swarmapi.VolumeSharingOneWriter: + convertedAccessMode.Sharing = volumetypes.SharingOneWriter + case swarmapi.VolumeSharingAll: + convertedAccessMode.Sharing = volumetypes.SharingAll + } + + if block := accessMode.GetBlock(); block != nil { + convertedAccessMode.BlockVolume = &volumetypes.TypeBlock{} + } + if mount := accessMode.GetMount(); mount != nil { + convertedAccessMode.MountVolume = &volumetypes.TypeMount{ + FsType: mount.FsType, + MountFlags: mount.MountFlags, + } + } + + return convertedAccessMode +} + +func volumeSecretsFromGRPC(secrets []*swarmapi.VolumeSecret) []volumetypes.Secret { + if secrets == nil { + return nil + } + convertedSecrets := make([]volumetypes.Secret, len(secrets)) + for i, secret := range secrets { + convertedSecrets[i] = volumetypes.Secret{ + Key: secret.Key, + Secret: secret.Secret, + } + } + return convertedSecrets +} + +func topologyRequirementFromGRPC(top *swarmapi.TopologyRequirement) *volumetypes.TopologyRequirement { + if top 
== nil { + return nil + } + + convertedTop := &volumetypes.TopologyRequirement{} + if top.Requisite != nil { + convertedTop.Requisite = make([]volumetypes.Topology, len(top.Requisite)) + for i, req := range top.Requisite { + convertedTop.Requisite[i] = topologyFromGRPC(req) + } + } + + if top.Preferred != nil { + convertedTop.Preferred = make([]volumetypes.Topology, len(top.Preferred)) + for i, pref := range top.Preferred { + convertedTop.Preferred[i] = topologyFromGRPC(pref) + } + } + + return convertedTop +} + +func topologyFromGRPC(top *swarmapi.Topology) volumetypes.Topology { + if top == nil { + return volumetypes.Topology{} + } + return volumetypes.Topology{ + Segments: top.Segments, + } +} + +func capacityRangeFromGRPC(capacity *swarmapi.CapacityRange) *volumetypes.CapacityRange { + if capacity == nil { + return nil + } + + return &volumetypes.CapacityRange{ + RequiredBytes: capacity.RequiredBytes, + LimitBytes: capacity.LimitBytes, + } +} + +func volumeAvailabilityFromGRPC(availability swarmapi.VolumeSpec_VolumeAvailability) volumetypes.Availability { + switch availability { + case swarmapi.VolumeAvailabilityActive: + return volumetypes.AvailabilityActive + case swarmapi.VolumeAvailabilityPause: + return volumetypes.AvailabilityPause + } + return volumetypes.AvailabilityDrain +} diff --git a/daemon/cluster/convert/volume_test.go b/daemon/cluster/convert/volume_test.go new file mode 100644 index 0000000000..b4ebab5177 --- /dev/null +++ b/daemon/cluster/convert/volume_test.go @@ -0,0 +1,210 @@ +package convert + +import ( + "testing" + + volumetypes "github.com/docker/docker/api/types/volume" + swarmapi "github.com/moby/swarmkit/v2/api" + + "gotest.tools/v3/assert" +) + +func TestTopologyFromGRPC(t *testing.T) { + nilTopology := topologyFromGRPC(nil) + assert.DeepEqual(t, nilTopology, volumetypes.Topology{}) + + swarmTop := &swarmapi.Topology{ + Segments: map[string]string{"foo": "bar"}, + } + + top := topologyFromGRPC(swarmTop) + assert.DeepEqual(t, top.Segments, swarmTop.Segments) +} + +func TestCapacityRangeFromGRPC(t *testing.T) { + nilCapacity := capacityRangeFromGRPC(nil) + assert.Assert(t, nilCapacity == nil) + + swarmZeroCapacity := &swarmapi.CapacityRange{} + zeroCapacity := capacityRangeFromGRPC(swarmZeroCapacity) + assert.Assert(t, zeroCapacity != nil) + assert.Equal(t, zeroCapacity.RequiredBytes, int64(0)) + assert.Equal(t, zeroCapacity.LimitBytes, int64(0)) + + swarmNonZeroCapacity := &swarmapi.CapacityRange{ + RequiredBytes: 1024, + LimitBytes: 2048, + } + nonZeroCapacity := capacityRangeFromGRPC(swarmNonZeroCapacity) + assert.Assert(t, nonZeroCapacity != nil) + assert.Equal(t, nonZeroCapacity.RequiredBytes, int64(1024)) + assert.Equal(t, nonZeroCapacity.LimitBytes, int64(2048)) +} + +func TestVolumeAvailabilityFromGRPC(t *testing.T) { + for _, tc := range []struct { + name string + in swarmapi.VolumeSpec_VolumeAvailability + expected volumetypes.Availability + }{ + { + name: "Active", + in: swarmapi.VolumeAvailabilityActive, + expected: volumetypes.AvailabilityActive, + }, { + name: "Pause", + in: swarmapi.VolumeAvailabilityPause, + expected: volumetypes.AvailabilityPause, + }, { + name: "Drain", + in: swarmapi.VolumeAvailabilityDrain, + expected: volumetypes.AvailabilityDrain, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + actual := volumeAvailabilityFromGRPC(tc.in) + assert.Equal(t, actual, tc.expected) + }) + } +} + +// TestAccessModeFromGRPC tests that the AccessMode type is correctly converted +func TestAccessModeFromGRPC(t *testing.T) { + for _, tc 
:= range []struct { + name string + in *swarmapi.VolumeAccessMode + expected *volumetypes.AccessMode + }{ + { + name: "MountVolume", + in: &swarmapi.VolumeAccessMode{ + Scope: swarmapi.VolumeScopeSingleNode, + Sharing: swarmapi.VolumeSharingNone, + AccessType: &swarmapi.VolumeAccessMode_Mount{ + Mount: &swarmapi.VolumeAccessMode_MountVolume{ + FsType: "foo", + // TODO(dperny): maybe don't convert this? + MountFlags: []string{"one", "two"}, + }, + }, + }, + expected: &volumetypes.AccessMode{ + Scope: volumetypes.ScopeSingleNode, + Sharing: volumetypes.SharingNone, + MountVolume: &volumetypes.TypeMount{ + FsType: "foo", + MountFlags: []string{"one", "two"}, + }, + }, + }, { + name: "BlockVolume", + in: &swarmapi.VolumeAccessMode{ + Scope: swarmapi.VolumeScopeSingleNode, + Sharing: swarmapi.VolumeSharingNone, + AccessType: &swarmapi.VolumeAccessMode_Block{ + Block: &swarmapi.VolumeAccessMode_BlockVolume{}, + }, + }, + expected: &volumetypes.AccessMode{ + Scope: volumetypes.ScopeSingleNode, + Sharing: volumetypes.SharingNone, + BlockVolume: &volumetypes.TypeBlock{}, + }, + }, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + out := accessModeFromGRPC(tc.in) + assert.DeepEqual(t, tc.expected, out) + }) + } +} + +// TestVolumeCreateToGRPC tests that a docker-typed VolumeCreateBody is +// correctly converted to a swarm-typed VolumeSpec. +func TestVolumeCreateToGRPC(t *testing.T) { + volume := &volumetypes.CreateOptions{ + Driver: "plug1", + DriverOpts: map[string]string{"options": "yeah"}, + Labels: map[string]string{"labeled": "yeah"}, + Name: "volume1", + } + + spec := &volumetypes.ClusterVolumeSpec{ + Group: "gronp", + AccessMode: &volumetypes.AccessMode{ + Scope: volumetypes.ScopeMultiNode, + Sharing: volumetypes.SharingAll, + MountVolume: &volumetypes.TypeMount{ + FsType: "foo", + MountFlags: []string{"one", "two"}, + }, + }, + Secrets: []volumetypes.Secret{ + {Key: "key1", Secret: "secret1"}, + {Key: "key2", Secret: "secret2"}, + }, + AccessibilityRequirements: &volumetypes.TopologyRequirement{ + Requisite: []volumetypes.Topology{ + {Segments: map[string]string{"top1": "yup"}}, + {Segments: map[string]string{"top2": "def"}}, + {Segments: map[string]string{"top3": "nah"}}, + }, + Preferred: []volumetypes.Topology{}, + }, + CapacityRange: &volumetypes.CapacityRange{ + RequiredBytes: 1, + LimitBytes: 0, + }, + } + + volume.ClusterVolumeSpec = spec + + swarmSpec := VolumeCreateToGRPC(volume) + + assert.Assert(t, swarmSpec != nil) + expectedSwarmSpec := &swarmapi.VolumeSpec{ + Annotations: swarmapi.Annotations{ + Name: "volume1", + Labels: map[string]string{ + "labeled": "yeah", + }, + }, + Group: "gronp", + Driver: &swarmapi.Driver{ + Name: "plug1", + Options: map[string]string{ + "options": "yeah", + }, + }, + AccessMode: &swarmapi.VolumeAccessMode{ + Scope: swarmapi.VolumeScopeMultiNode, + Sharing: swarmapi.VolumeSharingAll, + AccessType: &swarmapi.VolumeAccessMode_Mount{ + Mount: &swarmapi.VolumeAccessMode_MountVolume{ + FsType: "foo", + MountFlags: []string{"one", "two"}, + }, + }, + }, + Secrets: []*swarmapi.VolumeSecret{ + {Key: "key1", Secret: "secret1"}, + {Key: "key2", Secret: "secret2"}, + }, + AccessibilityRequirements: &swarmapi.TopologyRequirement{ + Requisite: []*swarmapi.Topology{ + {Segments: map[string]string{"top1": "yup"}}, + {Segments: map[string]string{"top2": "def"}}, + {Segments: map[string]string{"top3": "nah"}}, + }, + Preferred: nil, + }, + CapacityRange: &swarmapi.CapacityRange{ + RequiredBytes: 1, + LimitBytes: 0, + }, + } + + assert.DeepEqual(t, swarmSpec, 
expectedSwarmSpec) +} diff --git a/daemon/cluster/executor/container/adapter.go b/daemon/cluster/executor/container/adapter.go index 590648dc4b..4ed45526d2 100644 --- a/daemon/cluster/executor/container/adapter.go +++ b/daemon/cluster/executor/container/adapter.go @@ -288,7 +288,7 @@ func (c *containerAdapter) create(ctx context.Context) error { if cr, err = c.backend.CreateManagedContainer(types.ContainerCreateConfig{ Name: c.container.name(), Config: c.container.config(), - HostConfig: c.container.hostConfig(), + HostConfig: c.container.hostConfig(c.dependencies.Volumes()), // Use the first network in container create NetworkingConfig: c.container.createNetworkingConfig(c.backend), }); err != nil { @@ -461,6 +461,30 @@ func (c *containerAdapter) createVolumes(ctx context.Context) error { return nil } +// waitClusterVolumes blocks until the VolumeGetter returns a path for each +// cluster volume in use by this task +func (c *containerAdapter) waitClusterVolumes(ctx context.Context) error { + for _, attached := range c.container.task.Volumes { + // for every attachment, try until we succeed or until the context + // is canceled. + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + // continue through the code. + } + path, err := c.dependencies.Volumes().Get(attached.ID) + if err == nil && path != "" { + // break out of the inner-most loop + break + } + } + } + log.G(ctx).Debug("volumes ready") + return nil +} + func (c *containerAdapter) activateServiceBinding() error { return c.backend.ActivateContainerServiceBinding(c.container.name()) } diff --git a/daemon/cluster/executor/container/container.go b/daemon/cluster/executor/container/container.go index c83d8fb482..77a38a714d 100644 --- a/daemon/cluster/executor/container/container.go +++ b/daemon/cluster/executor/container/container.go @@ -254,14 +254,44 @@ func (c *containerConfig) labels() map[string]string { return labels } -func (c *containerConfig) mounts() []enginemount.Mount { +func (c *containerConfig) mounts(deps exec.VolumeGetter) []enginemount.Mount { var r []enginemount.Mount for _, mount := range c.spec().Mounts { - r = append(r, convertMount(mount)) + if mount.Type == api.MountTypeCSI { + r = append(r, c.convertCSIMount(mount, deps)) + } else { + r = append(r, convertMount(mount)) + } } return r } +// convertCSIMount matches the CSI mount with the path of the CSI volume. +// +// technically quadratic with respect to the number of CSI mounts, but that +// number shouldn't ever be large enough for quadratic to matter. +// +// TODO(dperny): figure out a scheme for errors? or maybe add code to +// checkMounts? 
+func (c *containerConfig) convertCSIMount(m api.Mount, deps exec.VolumeGetter) enginemount.Mount { + var mount enginemount.Mount + + // these are actually bind mounts + mount.Type = enginemount.TypeBind + + for _, attach := range c.task.Volumes { + if attach.Source == m.Source && attach.Target == m.Target { + // we should not get an error here, because we should have checked + // already that the volume is ready + path, _ := deps.Get(attach.ID) + mount.Source = path + mount.Target = m.Target + } + } + + return mount +} + func convertMount(m api.Mount) enginemount.Mount { mount := enginemount.Mount{ Source: m.Source, @@ -278,6 +308,8 @@ func convertMount(m api.Mount) enginemount.Mount { mount.Type = enginemount.TypeTmpfs case api.MountTypeNamedPipe: mount.Type = enginemount.TypeNamedPipe + case api.MountTypeCSI: + mount.Type = enginemount.TypeCluster } if m.BindOptions != nil { @@ -350,12 +382,12 @@ func (c *containerConfig) healthcheck() *enginecontainer.HealthConfig { } } -func (c *containerConfig) hostConfig() *enginecontainer.HostConfig { +func (c *containerConfig) hostConfig(deps exec.VolumeGetter) *enginecontainer.HostConfig { hc := &enginecontainer.HostConfig{ Resources: c.resources(), GroupAdd: c.spec().Groups, PortBindings: c.portBindings(), - Mounts: c.mounts(), + Mounts: c.mounts(deps), ReadonlyRootfs: c.spec().ReadOnly, Isolation: c.isolation(), Init: c.init(), diff --git a/daemon/cluster/executor/container/container_test.go b/daemon/cluster/executor/container/container_test.go index 725c361bc8..8055e111fe 100644 --- a/daemon/cluster/executor/container/container_test.go +++ b/daemon/cluster/executor/container/container_test.go @@ -31,7 +31,10 @@ func TestIsolationConversion(t *testing.T) { }, } config := containerConfig{task: &task} - assert.Equal(t, c.to, config.hostConfig().Isolation) + // NOTE(dperny): you shouldn't ever pass nil outside of testing, + // because if there are CSI volumes, the code will panic. However, + // in testing. this is acceptable. + assert.Equal(t, c.to, config.hostConfig(nil).Isolation) }) } } @@ -129,7 +132,10 @@ func TestCredentialSpecConversion(t *testing.T) { }, } config := containerConfig{task: &task} - assert.DeepEqual(t, c.to, config.hostConfig().SecurityOpt) + // NOTE(dperny): you shouldn't ever pass nil outside of testing, + // because if there are CSI volumes, the code will panic. However, + // in testing. this is acceptable. + assert.DeepEqual(t, c.to, config.hostConfig(nil).SecurityOpt) }) } } diff --git a/daemon/cluster/executor/container/controller.go b/daemon/cluster/executor/container/controller.go index e2c037bc34..486342775f 100644 --- a/daemon/cluster/executor/container/controller.go +++ b/daemon/cluster/executor/container/controller.go @@ -121,6 +121,15 @@ func (r *controller) Prepare(ctx context.Context) error { return err } + // could take a while for the cluster volumes to become available. set for + // 5 minutes, I guess? + // TODO(dperny): do this more intelligently. return a better error. + waitClusterVolumesCtx, wcvcancel := context.WithTimeout(ctx, 5*time.Minute) + defer wcvcancel() + if err := r.adapter.waitClusterVolumes(waitClusterVolumesCtx); err != nil { + return err + } + // Make sure all the networks that the task needs are created. 
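Editorial aside: the `Prepare` change above bounds the whole wait with a five-minute context, while `waitClusterVolumes` (earlier in this diff) retries `Volumes().Get` until a path is available. The same pattern can be expressed with a short pause between attempts so the loop does not spin; this is only an illustrative sketch, not the implementation, and `getPath` stands in for the executor's volume dependency getter.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForVolumePath polls getPath until it returns a non-empty path or the
// context is done, sleeping briefly between attempts.
func waitForVolumePath(ctx context.Context, getPath func() (string, error)) (string, error) {
	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()
	for {
		if path, err := getPath(); err == nil && path != "" {
			return path, nil
		}
		select {
		case <-ctx.Done():
			return "", ctx.Err()
		case <-ticker.C:
			// try again on the next tick
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
	defer cancel()

	// Placeholder getter: a real caller would close over the task's volume
	// attachment ID and the executor's VolumeGetter.
	path, err := waitForVolumePath(ctx, func() (string, error) {
		return "/path/where/the/csi/volume/is/staged", nil // hypothetical path
	})
	fmt.Println(path, err)
}
```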
if err := r.adapter.createNetworks(ctx); err != nil { return err diff --git a/daemon/cluster/executor/container/executor.go b/daemon/cluster/executor/container/executor.go index 7a3e38876f..fd5b31a686 100644 --- a/daemon/cluster/executor/container/executor.go +++ b/daemon/cluster/executor/container/executor.go @@ -122,6 +122,9 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) { } } + // TODO(dperny): don't ignore the error here + csiInfo, _ := e.Volumes().Plugins().NodeInfo(ctx) + description := &api.NodeDescription{ Hostname: info.Name, Platform: &api.Platform{ @@ -138,6 +141,7 @@ func (e *executor) Describe(ctx context.Context) (*api.NodeDescription, error) { MemoryBytes: info.MemTotal, Generic: convert.GenericResourcesToGRPC(info.GenericResources), }, + CSIInfo: csiInfo, } // Save the node information in the executor field @@ -356,6 +360,10 @@ func (e *executor) Configs() exec.ConfigsManager { return e.dependencies.Configs() } +func (e *executor) Volumes() exec.VolumesManager { + return e.dependencies.Volumes() +} + type sortedPlugins []api.PluginDescription func (sp sortedPlugins) Len() int { return len(sp) } diff --git a/daemon/cluster/executor/container/validate.go b/daemon/cluster/executor/container/validate.go index 2fb2492a5c..03cfded8b5 100644 --- a/daemon/cluster/executor/container/validate.go +++ b/daemon/cluster/executor/container/validate.go @@ -37,6 +37,8 @@ func validateMounts(mounts []api.Mount) error { if mount.Source == "" { return errors.New("invalid npipe source, source must not be empty") } + case api.MountTypeCSI: + // nothing to do here. default: return fmt.Errorf("invalid mount type: %s", mount.Type) } diff --git a/daemon/cluster/helpers.go b/daemon/cluster/helpers.go index fc96ccc1a6..d25d6c1065 100644 --- a/daemon/cluster/helpers.go +++ b/daemon/cluster/helpers.go @@ -244,3 +244,39 @@ func getNetwork(ctx context.Context, c swarmapi.ControlClient, input string) (*s return rl.Networks[0], nil } + +func getVolume(ctx context.Context, c swarmapi.ControlClient, input string) (*swarmapi.Volume, error) { + // GetVolume to match via full ID + if v, err := c.GetVolume(ctx, &swarmapi.GetVolumeRequest{VolumeID: input}); err == nil { + return v.Volume, nil + } + + // If any error (including NotFound), list volumes to match via ID prefix + // and full name + resp, err := c.ListVolumes(ctx, &swarmapi.ListVolumesRequest{ + Filters: &swarmapi.ListVolumesRequest_Filters{ + Names: []string{input}, + }, + }) + + if err != nil || len(resp.Volumes) == 0 { + resp, err = c.ListVolumes(ctx, &swarmapi.ListVolumesRequest{ + Filters: &swarmapi.ListVolumesRequest_Filters{ + IDPrefixes: []string{input}, + }, + }) + } + if err != nil { + return nil, err + } + + if len(resp.Volumes) == 0 { + return nil, errdefs.NotFound(fmt.Errorf("volume %s not found", input)) + } + + if l := len(resp.Volumes); l > 1 { + return nil, errdefs.InvalidParameter(fmt.Errorf("volume %s is ambiguous (%d matches found)", input, l)) + } + + return resp.Volumes[0], nil +} diff --git a/daemon/cluster/volumes.go b/daemon/cluster/volumes.go new file mode 100644 index 0000000000..6f7b085697 --- /dev/null +++ b/daemon/cluster/volumes.go @@ -0,0 +1,140 @@ +package cluster // import "github.com/docker/docker/daemon/cluster" + +import ( + "context" + "fmt" + + volumetypes "github.com/docker/docker/api/types/volume" + "github.com/docker/docker/daemon/cluster/convert" + "github.com/docker/docker/errdefs" + swarmapi "github.com/moby/swarmkit/v2/api" + "google.golang.org/grpc" +) + +// GetVolume 
returns a volume from the swarm cluster. +func (c *Cluster) GetVolume(nameOrID string) (volumetypes.Volume, error) { + var volume *swarmapi.Volume + + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + v, err := getVolume(ctx, state.controlClient, nameOrID) + if err != nil { + return err + } + volume = v + return nil + }); err != nil { + return volumetypes.Volume{}, err + } + return convert.VolumeFromGRPC(volume), nil +} + +// GetVolumes returns all of the volumes matching the given options from a swarm cluster. +func (c *Cluster) GetVolumes(options volumetypes.ListOptions) ([]*volumetypes.Volume, error) { + var volumes []*volumetypes.Volume + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + r, err := state.controlClient.ListVolumes( + ctx, &swarmapi.ListVolumesRequest{}, + grpc.MaxCallRecvMsgSize(defaultRecvSizeForListResponse), + ) + if err != nil { + return err + } + + volumes = make([]*volumetypes.Volume, 0, len(r.Volumes)) + for _, volume := range r.Volumes { + v := convert.VolumeFromGRPC(volume) + volumes = append(volumes, &v) + } + + return nil + }); err != nil { + return nil, err + } + + return volumes, nil +} + +// CreateVolume creates a new cluster volume in the swarm cluster. +// +// Returns the volume ID if creation is successful, or an error if not. +func (c *Cluster) CreateVolume(v volumetypes.CreateOptions) (*volumetypes.Volume, error) { + var resp *swarmapi.CreateVolumeResponse + if err := c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + volumeSpec := convert.VolumeCreateToGRPC(&v) + + r, err := state.controlClient.CreateVolume( + ctx, &swarmapi.CreateVolumeRequest{Spec: volumeSpec}, + ) + if err != nil { + return err + } + resp = r + return nil + }); err != nil { + return nil, err + } + createdVol, err := c.GetVolume(resp.Volume.ID) + if err != nil { + // If there's a failure of some sort in this operation the user would + // get a very unhelpful "not found" error on a create, which is not + // very helpful at all. Instead, before returning the error, add some + // context, and change this to a system-type error, because it's + // nothing the user did wrong. + return nil, errdefs.System(fmt.Errorf("unable to retrieve created volume: %w", err)) + } + return &createdVol, nil +} + +// RemoveVolume removes a volume from the swarm cluster. +func (c *Cluster) RemoveVolume(nameOrID string, force bool) error { + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + volume, err := getVolume(ctx, state.controlClient, nameOrID) + if err != nil { + return err + } + + req := &swarmapi.RemoveVolumeRequest{ + VolumeID: volume.ID, + Force: force, + } + _, err = state.controlClient.RemoveVolume(ctx, req) + return err + }) +} + +// UpdateVolume updates a volume in the swarm cluster. +func (c *Cluster) UpdateVolume(nameOrID string, version uint64, volume volumetypes.UpdateOptions) error { + return c.lockedManagerAction(func(ctx context.Context, state nodeState) error { + v, err := getVolume(ctx, state.controlClient, nameOrID) + if err != nil { + return err + } + + // For now, the only thing we can update is availability. Instead of + // converting the whole spec, just pluck out the availability if it has + // been set. 
+
+		if volume.Spec != nil {
+			switch volume.Spec.Availability {
+			case volumetypes.AvailabilityActive:
+				v.Spec.Availability = swarmapi.VolumeAvailabilityActive
+			case volumetypes.AvailabilityPause:
+				v.Spec.Availability = swarmapi.VolumeAvailabilityPause
+			case volumetypes.AvailabilityDrain:
+				v.Spec.Availability = swarmapi.VolumeAvailabilityDrain
+			}
+			// if availability is the empty (default) value, change nothing.
+		}
+
+		_, err = state.controlClient.UpdateVolume(
+			ctx, &swarmapi.UpdateVolumeRequest{
+				VolumeID: nameOrID,
+				VolumeVersion: &swarmapi.Version{
+					Index: version,
+				},
+				Spec: &v.Spec,
+			},
+		)
+		return err
+	})
+}
diff --git a/docs/api/version-history.md b/docs/api/version-history.md
index cba9957925..dea3a82fc4 100644
--- a/docs/api/version-history.md
+++ b/docs/api/version-history.md
@@ -77,6 +77,16 @@ keywords: "API, Docker, rcli, REST, documentation"
   `GET /services/{id}/logs` and `GET /tasks/{id}/logs` now set Content-Type header
   to `application/vnd.docker.multiplexed-stream` when a multiplexed stdout/stderr
   stream is sent to client, `application/vnd.docker.raw-stream` otherwise.
+* `POST /volumes/create` now accepts a new `ClusterVolumeSpec` to create a cluster
+  volume (CSI). This option can only be used if the daemon is a Swarm manager.
+  The Volume response on creation can now also contain a `ClusterVolume` field
+  with information about the created volume.
+* Volume information returned by `GET /volumes/{name}`, `GET /volumes` and
+  `GET /system/df` can now contain a `ClusterVolume` if the volume is a cluster
+  volume (requires the daemon to be a Swarm manager).
+* The `Volume` type gained a new `ClusterVolume` field with cluster-specific information.
+* Added a new `PUT /volumes/{name}` endpoint to update cluster volumes (CSI).
+  Cluster volumes are only supported if the daemon is a Swarm manager.
 
 ## v1.41 API changes
diff --git a/docs/cluster_volumes.md b/docs/cluster_volumes.md
new file mode 100644
index 0000000000..c19ac456a5
--- /dev/null
+++ b/docs/cluster_volumes.md
@@ -0,0 +1,210 @@
+Cluster Volumes
+===============
+
+Docker Cluster Volumes is a new feature which allows using CSI plugins to
+create cluster-aware volumes.
+
+## Installing a CSI plugin
+
+CSI, the Container Storage Interface, defines an API for storage providers to
+write storage plugins which are cross-compatible between various container
+orchestrators. However, most CSI plugins are shipped with configuration
+specific to Kubernetes. Docker CSI Plugins use the same binaries as those for
+Kubernetes, but in a different environment and sometimes with different
+configuration.
+
+If a plugin is already adapted for Docker and available, it can be installed
+through the `docker plugin install` command. Though such plugins may require
+configuration specific to the user's environment, they will ultimately be
+detected by and work automatically with Docker once enabled.
+
+Currently, there is no way to automatically deploy a Docker Plugin across all
+nodes in a cluster. Therefore, users must ensure the Docker Plugin is installed
+on all nodes in the cluster where it is needed.
+
+Docker Swarm worker nodes report their active plugins to the Docker Swarm
+managers, and so it is not necessary to install a plugin on every worker node
+if this is not desired. However, the plugin must be installed on every manager
+node, or a leadership change could result in Docker Swarm no longer having the
+ability to call the plugin.
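Editorial aside: because nodes report their active CSI plugins to the managers (see the `NodeCSIInfo` conversion earlier in this diff), one way to check plugin coverage across the cluster is to list the nodes and inspect their reported CSI plugins. A hedged Go sketch, assuming the `CSIInfo` node description field added in this release and a client pointed at a manager:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/client"
)

func main() {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		log.Fatal(err)
	}

	nodes, err := cli.NodeList(context.Background(), types.NodeListOptions{})
	if err != nil {
		log.Fatal(err)
	}

	// Print which CSI plugins each node reports, so gaps in plugin
	// installation across the cluster are easy to spot.
	for _, node := range nodes {
		fmt.Printf("%s (%s):\n", node.Description.Hostname, node.Spec.Role)
		for _, csi := range node.Description.CSIInfo {
			fmt.Printf("  csi plugin %q (node ID %s)\n", csi.PluginName, csi.NodeID)
		}
	}
}
```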
+
+### Creating a Docker CSI Plugin
+
+Before following this section, readers should ensure they are acquainted with
+the
+[Docker Engine managed plugin system](https://docs.docker.com/engine/extend/).
+Docker CSI plugins use this system to run.
+
+Docker CSI plugins are identified with a special interface type. There are two
+related interfaces that CSI plugins can expose. In the `config.json`, this
+should be set as follows:
+
+```json
+  "interface": {
+    "types": ["docker.csicontroller/1.0","docker.csinode/1.0"]
+  }
+```
+
+Additionally, the CSI specification states that CSI plugins should have
+`CAP_SYS_ADMIN` privileges, so this should be set in the `config.json` as
+well:
+
+```json
+  "linux" : {
+    "capabilities": ["CAP_SYS_ADMIN"]
+  }
+```
+
+Other configuration is largely specific to the CSI plugin.
+
+#### Split-Component Plugins
+
+For split-component plugins, users can specify either the
+`docker.csicontroller/1.0` or `docker.csinode/1.0` plugin interfaces. Manager
+nodes should run plugin instances with the `docker.csicontroller/1.0`
+interface, and worker nodes the `docker.csinode/1.0` interface.
+
+Docker does not support running two plugins with the same name, nor does it
+support specifying different drivers for the node and controller plugins. This
+means that, in a fully split plugin, Swarm will be unable to schedule volumes
+to manager nodes.
+
+If it is desired to run a split-component plugin such that the Volumes managed
+by that plugin are accessible to Tasks on the manager node, the user will need
+to build the plugin such that some proxy or multiplexer provides the illusion
+of combined components to the manager through one socket, and ensure the plugin
+reports both interface types.
+
+## Using Cluster Volumes
+
+### Create a Cluster Volume
+
+Creating a Cluster Volume is done with the same `docker volume` commands as any
+other Volume. To create a Cluster Volume, one needs to do two things:
+
+* Specify a CSI-capable driver with the `--driver` or `-d` option.
+* Use any one of the cluster-specific `docker volume create` flags.
+
+For example, to create a Cluster Volume called `my-volume` with the
+`democratic-csi` Volume Driver, one might use this command:
+
+```bash
+docker volume create \
+  --driver democratic-csi \
+  --type mount \
+  --sharing all \
+  --scope multi \
+  --limit-bytes 10G \
+  --required-bytes 1G \
+  my-volume
+```
+
+### List Cluster Volumes
+
+Cluster Volumes will be listed along with other volumes when doing
+`docker volume ls`. However, if users want to see only Cluster Volumes, with
+cluster-specific information, the `--cluster` flag can be specified:
+
+```
+$ docker volume ls --cluster
+VOLUME NAME   GROUP     DRIVER    AVAILABILITY   STATUS
+volume1       group1    driver1   active         pending creation
+volume2       group1    driver1   pause          created
+volume3       group2    driver2   active         in use (1 node)
+volume4       group2    driver2   active         in use (2 nodes)
+```
+
+### Deploying a Service
+
+Cluster Volumes are only compatible with Docker Services, not plain Docker
+Containers.
+
+In Docker Services, a Cluster Volume is used the same way any other volume
+would be used. The `type` should be set to `csi`. For example, to create a
+Service that uses `my-volume` created above, one would execute a command like:
+
+```bash
+docker service create \
+  --name my-service \
+  --mount type=csi,src=my-volume,dst=/srv/www \
+  nginx:alpine
+```
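Editorial aside: the CLI invocation above maps onto the Engine API roughly as in the sketch below. This is illustrative only: it assumes that cluster volumes surface as the `mount.TypeCluster` mount type referenced elsewhere in this diff, and uses the standard `ServiceCreate` client call with the hypothetical names from the example.

```go
package main

import (
	"context"
	"log"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/client"
)

func main() {
	cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation())
	if err != nil {
		log.Fatal(err)
	}

	spec := swarm.ServiceSpec{
		Annotations: swarm.Annotations{Name: "my-service"},
		TaskTemplate: swarm.TaskSpec{
			ContainerSpec: &swarm.ContainerSpec{
				Image: "nginx:alpine",
				Mounts: []mount.Mount{
					{
						// assumption: type=csi on the CLI corresponds to the
						// "cluster" mount type on the Engine API side
						Type:   mount.TypeCluster,
						Source: "my-volume",
						Target: "/srv/www",
					},
				},
			},
		},
	}

	resp, err := cli.ServiceCreate(context.Background(), spec, types.ServiceCreateOptions{})
	if err != nil {
		log.Fatal(err)
	}
	log.Println("created service", resp.ID)
}
```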
+
+When scheduling Services which use Cluster Volumes, Docker Swarm uses the
+volume's information and state to make decisions about Task placement.
+
+For example, the Service will be constrained to run only on nodes on which the
+volume is available. If the volume is configured with `scope=single`, meaning
+it can only be used on one node in the cluster at a time, then all Tasks for
+that Service will be scheduled to that same node. If that node changes for some
+reason, like a node failure, then the Tasks will be rescheduled to the new
+node automatically, without user input.
+
+If the Cluster Volume is accessible only from some set of nodes, and not from
+the whole cluster, then Docker Swarm will only schedule the Service to those
+nodes, as reported by the plugin.
+
+### Using Volume Groups
+
+It is frequently desirable that a Service use any available volume out of an
+interchangeable set. To accomplish this in the simplest and most
+straightforward manner possible, Cluster Volumes use the concept of a volume
+"Group".
+
+The Volume Group is a field, somewhat like a special label, which is used to
+instruct Swarm that a given volume is interchangeable with every other volume
+of the same Group. When creating a Cluster Volume, the Group can be specified
+by using the `--group` flag.
+
+To use a Cluster Volume by Group instead of by Name, the mount `src` option is
+prefixed with `group:`, followed by the group name. For example:
+
+```
+--mount type=csi,src=group:my-group,dst=/srv/www
+```
+
+This instructs Docker Swarm that any Volume with the Group `my-group` can be
+used to satisfy the mounts.
+
+Volumes in a Group do not need to be identical, but they must be
+interchangeable. These caveats should be kept in mind when using Groups:
+
+* No Service ever gets a monopoly on a Cluster Volume. If several Services
+  use the same Group, then the Cluster Volumes in that Group can be used with
+  any of those Services at any time. Just because a particular Volume was used
+  by a particular Service at one point does not mean it won't be used by a
+  different Service later.
+* Volumes in a group can have different configurations, but all of those
+  configurations must be compatible with the Service. For example, if some of
+  the Volumes in a group have `sharing=readonly`, then the Service must be
+  capable of using the volume in read-only mode.
+* Volumes in a Group are created statically ahead of time, not dynamically
+  as-needed. This means that the user must ensure a sufficient number of
+  Volumes belong to the desired Group to support the needs of the Service.
+
+### Taking Cluster Volumes Offline
+
+For various reasons, users may wish to take a particular Cluster Volume
+offline, so that it is not actively used by Services. To facilitate this,
+Cluster Volumes have an `availability` option similar to Docker Swarm nodes.
+
+Cluster Volume availability can be one of three states:
+
+* `active` - Default. Volume can be used as normal.
+* `pause` - The volume will not be used for new Services, but existing Tasks
+  using the volume will not be stopped.
+* `drain` - The volume will not be used for new Services, and any running Tasks
+  using the volume will be stopped and rescheduled.
+
+A Volume can only be removed from the cluster entirely if its availability is
+set to `drain`, and it has been fully unpublished from all nodes.
+
+#### Force-Removing Volumes
+
+There are cases where a Volume can get caught in a state where Swarm cannot
+verify its removal. In these cases, the Volume can be removed by passing the
+`--force` (`-f`) flag to `docker volume rm`. Force removal only removes the
+Volume from the Swarm state; it may leave the volume's resources allocated on
+the storage backend, so it should be used as a last resort.
+
+## Unsupported Features
+
+The CSI Spec allows for a large number of features which Cluster Volumes in
+this initial implementation do not support.
Most notably, Cluster Volumes do +not support snapshots, cloning, or volume expansion. diff --git a/plugin/manager.go b/plugin/manager.go index 0504362f3f..12c120ec44 100644 --- a/plugin/manager.go +++ b/plugin/manager.go @@ -211,7 +211,7 @@ func (pm *Manager) reload() error { // todo: restore // We should only enable rootfs propagation for certain plugin types that need it. for _, typ := range p.PluginObj.Config.Interface.Types { - if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") { + if (typ.Capability == "volumedriver" || typ.Capability == "graphdriver" || typ.Capability == "csinode" || typ.Capability == "csicontroller") && typ.Prefix == "docker" && strings.HasPrefix(typ.Version, "1.") { if p.PluginObj.Config.PropagatedMount != "" { propRoot := filepath.Join(filepath.Dir(p.Rootfs), "propagated-mount")