mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #42028 from thaJeztah/fix_duplicate_volume_event
volumes: only send "create" event when actually creating volume
This commit is contained in:
commit
ef4d473401
4 changed files with 118 additions and 27 deletions
|
@ -182,15 +182,14 @@ func (s *DockerSuite) TestVolumeEvents(c *testing.T) {
|
||||||
until := daemonUnixTime(c)
|
until := daemonUnixTime(c)
|
||||||
out, _ := dockerCmd(c, "events", "--since", since, "--until", until)
|
out, _ := dockerCmd(c, "events", "--since", since, "--until", until)
|
||||||
events := strings.Split(strings.TrimSpace(out), "\n")
|
events := strings.Split(strings.TrimSpace(out), "\n")
|
||||||
assert.Assert(c, len(events) > 4)
|
assert.Assert(c, len(events) > 3)
|
||||||
|
|
||||||
volumeEvents := eventActionsByIDAndType(c, events, "test-event-volume-local", "volume")
|
volumeEvents := eventActionsByIDAndType(c, events, "test-event-volume-local", "volume")
|
||||||
assert.Equal(c, len(volumeEvents), 5)
|
assert.Equal(c, len(volumeEvents), 4)
|
||||||
assert.Equal(c, volumeEvents[0], "create")
|
assert.Equal(c, volumeEvents[0], "create")
|
||||||
assert.Equal(c, volumeEvents[1], "create")
|
assert.Equal(c, volumeEvents[1], "mount")
|
||||||
assert.Equal(c, volumeEvents[2], "mount")
|
assert.Equal(c, volumeEvents[2], "unmount")
|
||||||
assert.Equal(c, volumeEvents[3], "unmount")
|
assert.Equal(c, volumeEvents[3], "destroy")
|
||||||
assert.Equal(c, volumeEvents[4], "destroy")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *DockerSuite) TestNetworkEvents(c *testing.T) {
|
func (s *DockerSuite) TestNetworkEvents(c *testing.T) {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package system // import "github.com/docker/docker/integration/system"
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
|
@ -11,9 +12,12 @@ import (
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/api/types"
|
"github.com/docker/docker/api/types"
|
||||||
|
"github.com/docker/docker/api/types/events"
|
||||||
"github.com/docker/docker/api/types/filters"
|
"github.com/docker/docker/api/types/filters"
|
||||||
|
"github.com/docker/docker/api/types/mount"
|
||||||
"github.com/docker/docker/api/types/strslice"
|
"github.com/docker/docker/api/types/strslice"
|
||||||
"github.com/docker/docker/api/types/versions"
|
"github.com/docker/docker/api/types/versions"
|
||||||
|
"github.com/docker/docker/api/types/volume"
|
||||||
"github.com/docker/docker/integration/internal/container"
|
"github.com/docker/docker/integration/internal/container"
|
||||||
"github.com/docker/docker/pkg/jsonmessage"
|
"github.com/docker/docker/pkg/jsonmessage"
|
||||||
"github.com/docker/docker/testutil/request"
|
"github.com/docker/docker/testutil/request"
|
||||||
|
@ -122,3 +126,69 @@ func TestEventsBackwardsCompatible(t *testing.T) {
|
||||||
assert.Check(t, is.Equal(cID, containerCreateEvent.ID))
|
assert.Check(t, is.Equal(cID, containerCreateEvent.ID))
|
||||||
assert.Check(t, is.Equal("busybox", containerCreateEvent.From))
|
assert.Check(t, is.Equal("busybox", containerCreateEvent.From))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TestEventsVolumeCreate verifies that volume create events are only fired
|
||||||
|
// once: when creating the volume, and not when attaching to a container.
|
||||||
|
func TestEventsVolumeCreate(t *testing.T) {
|
||||||
|
skip.If(t, testEnv.OSType == "windows", "FIXME: Windows doesn't trigger the events? Could be a race")
|
||||||
|
|
||||||
|
defer setupTest(t)()
|
||||||
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
client := testEnv.APIClient()
|
||||||
|
|
||||||
|
since := request.DaemonUnixTime(ctx, t, client, testEnv)
|
||||||
|
volName := t.Name()
|
||||||
|
getEvents := func(messages <-chan events.Message, errs <-chan error) ([]events.Message, error) {
|
||||||
|
var evts []events.Message
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case m := <-messages:
|
||||||
|
evts = append(evts, m)
|
||||||
|
case err := <-errs:
|
||||||
|
if err == io.EOF {
|
||||||
|
return evts, nil
|
||||||
|
}
|
||||||
|
return nil, err
|
||||||
|
case <-time.After(time.Second * 3):
|
||||||
|
return nil, errors.New("timeout hit")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_, err := client.VolumeCreate(ctx, volume.VolumeCreateBody{Name: volName})
|
||||||
|
assert.NilError(t, err)
|
||||||
|
|
||||||
|
filter := filters.NewArgs(
|
||||||
|
filters.Arg("type", "volume"),
|
||||||
|
filters.Arg("event", "create"),
|
||||||
|
filters.Arg("volume", volName),
|
||||||
|
)
|
||||||
|
messages, errs := client.Events(ctx, types.EventsOptions{
|
||||||
|
Since: since,
|
||||||
|
Until: request.DaemonUnixTime(ctx, t, client, testEnv),
|
||||||
|
Filters: filter,
|
||||||
|
})
|
||||||
|
|
||||||
|
volEvents, err := getEvents(messages, errs)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
assert.Equal(t, len(volEvents), 1, "expected volume create event when creating a volume")
|
||||||
|
|
||||||
|
container.Create(ctx, t, client, container.WithMount(mount.Mount{
|
||||||
|
Type: mount.TypeVolume,
|
||||||
|
Source: volName,
|
||||||
|
Target: "/tmp/foo",
|
||||||
|
}))
|
||||||
|
|
||||||
|
messages, errs = client.Events(ctx, types.EventsOptions{
|
||||||
|
Since: since,
|
||||||
|
Until: request.DaemonUnixTime(ctx, t, client, testEnv),
|
||||||
|
Filters: filter,
|
||||||
|
})
|
||||||
|
|
||||||
|
volEvents, err = getEvents(messages, errs)
|
||||||
|
assert.NilError(t, err)
|
||||||
|
assert.Equal(t, len(volEvents), 1, "expected volume create event to be fired only once")
|
||||||
|
}
|
||||||
|
|
|
@ -23,7 +23,9 @@ type ds interface {
|
||||||
GetDriverList() []string
|
GetDriverList() []string
|
||||||
}
|
}
|
||||||
|
|
||||||
type volumeEventLogger interface {
|
// VolumeEventLogger interface provides methods to log volume-related events
|
||||||
|
type VolumeEventLogger interface {
|
||||||
|
// LogVolumeEvent generates an event related to a volume.
|
||||||
LogVolumeEvent(volumeID, action string, attributes map[string]string)
|
LogVolumeEvent(volumeID, action string, attributes map[string]string)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -33,17 +35,17 @@ type VolumesService struct {
|
||||||
vs *VolumeStore
|
vs *VolumeStore
|
||||||
ds ds
|
ds ds
|
||||||
pruneRunning int32
|
pruneRunning int32
|
||||||
eventLogger volumeEventLogger
|
eventLogger VolumeEventLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewVolumeService creates a new volume service
|
// NewVolumeService creates a new volume service
|
||||||
func NewVolumeService(root string, pg plugingetter.PluginGetter, rootIDs idtools.Identity, logger volumeEventLogger) (*VolumesService, error) {
|
func NewVolumeService(root string, pg plugingetter.PluginGetter, rootIDs idtools.Identity, logger VolumeEventLogger) (*VolumesService, error) {
|
||||||
ds := drivers.NewStore(pg)
|
ds := drivers.NewStore(pg)
|
||||||
if err := setupDefaultDriver(ds, root, rootIDs); err != nil {
|
if err := setupDefaultDriver(ds, root, rootIDs); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
vs, err := NewStore(root, ds)
|
vs, err := NewStore(root, ds, WithEventLogger(logger))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -71,7 +73,6 @@ func (s *VolumesService) Create(ctx context.Context, name, driverName string, op
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
s.eventLogger.LogVolumeEvent(v.Name(), "create", map[string]string{"driver": v.DriverName()})
|
|
||||||
apiV := volumeToAPIType(v)
|
apiV := volumeToAPIType(v)
|
||||||
return &apiV, nil
|
return &apiV, nil
|
||||||
}
|
}
|
||||||
|
@ -161,10 +162,6 @@ func (s *VolumesService) Remove(ctx context.Context, name string, rmOpts ...opts
|
||||||
} else if IsNotExist(err) && cfg.PurgeOnError {
|
} else if IsNotExist(err) && cfg.PurgeOnError {
|
||||||
err = nil
|
err = nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err == nil {
|
|
||||||
s.eventLogger.LogVolumeEvent(v.Name(), "destroy", map[string]string{"driver": v.DriverName()})
|
|
||||||
}
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -68,8 +68,11 @@ func (v volumeWrapper) CachedPath() string {
|
||||||
return v.Volume.Path()
|
return v.Volume.Path()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StoreOpt sets options for a VolumeStore
|
||||||
|
type StoreOpt func(store *VolumeStore) error
|
||||||
|
|
||||||
// NewStore creates a new volume store at the given path
|
// NewStore creates a new volume store at the given path
|
||||||
func NewStore(rootPath string, drivers *drivers.Store) (*VolumeStore, error) {
|
func NewStore(rootPath string, drivers *drivers.Store, opts ...StoreOpt) (*VolumeStore, error) {
|
||||||
vs := &VolumeStore{
|
vs := &VolumeStore{
|
||||||
locks: &locker.Locker{},
|
locks: &locker.Locker{},
|
||||||
names: make(map[string]volume.Volume),
|
names: make(map[string]volume.Volume),
|
||||||
|
@ -79,6 +82,12 @@ func NewStore(rootPath string, drivers *drivers.Store) (*VolumeStore, error) {
|
||||||
drivers: drivers,
|
drivers: drivers,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for _, o := range opts {
|
||||||
|
if err := o(vs); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if rootPath != "" {
|
if rootPath != "" {
|
||||||
// initialize metadata store
|
// initialize metadata store
|
||||||
volPath := filepath.Join(rootPath, volumeDataDir)
|
volPath := filepath.Join(rootPath, volumeDataDir)
|
||||||
|
@ -108,6 +117,14 @@ func NewStore(rootPath string, drivers *drivers.Store) (*VolumeStore, error) {
|
||||||
return vs, nil
|
return vs, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// WithEventLogger configures the VolumeStore with the given VolumeEventLogger
|
||||||
|
func WithEventLogger(logger VolumeEventLogger) StoreOpt {
|
||||||
|
return func(store *VolumeStore) error {
|
||||||
|
store.eventLogger = logger
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (s *VolumeStore) getNamed(name string) (volume.Volume, bool) {
|
func (s *VolumeStore) getNamed(name string) (volume.Volume, bool) {
|
||||||
s.globalLock.RLock()
|
s.globalLock.RLock()
|
||||||
v, exists := s.names[name]
|
v, exists := s.names[name]
|
||||||
|
@ -198,7 +215,9 @@ type VolumeStore struct {
|
||||||
labels map[string]map[string]string
|
labels map[string]map[string]string
|
||||||
// options stores volume options for each volume
|
// options stores volume options for each volume
|
||||||
options map[string]map[string]string
|
options map[string]map[string]string
|
||||||
|
|
||||||
db *bolt.DB
|
db *bolt.DB
|
||||||
|
eventLogger VolumeEventLogger
|
||||||
}
|
}
|
||||||
|
|
||||||
func filterByDriver(names []string) filterFunc {
|
func filterByDriver(names []string) filterFunc {
|
||||||
|
@ -464,7 +483,7 @@ func (s *VolumeStore) Create(ctx context.Context, name, driverName string, creat
|
||||||
default:
|
default:
|
||||||
}
|
}
|
||||||
|
|
||||||
v, err := s.create(ctx, name, driverName, cfg.Options, cfg.Labels)
|
v, created, err := s.create(ctx, name, driverName, cfg.Options, cfg.Labels)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if _, ok := err.(*OpErr); ok {
|
if _, ok := err.(*OpErr); ok {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
@ -472,6 +491,9 @@ func (s *VolumeStore) Create(ctx context.Context, name, driverName string, creat
|
||||||
return nil, &OpErr{Err: err, Name: name, Op: "create"}
|
return nil, &OpErr{Err: err, Name: name, Op: "create"}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if created && s.eventLogger != nil {
|
||||||
|
s.eventLogger.LogVolumeEvent(v.Name(), "create", map[string]string{"driver": v.DriverName()})
|
||||||
|
}
|
||||||
s.setNamed(v, cfg.Reference)
|
s.setNamed(v, cfg.Reference)
|
||||||
return v, nil
|
return v, nil
|
||||||
}
|
}
|
||||||
|
@ -552,7 +574,7 @@ func volumeExists(ctx context.Context, store *drivers.Store, v volume.Volume) (b
|
||||||
// for the given volume name, an error is returned after checking if the reference is stale.
|
// for the given volume name, an error is returned after checking if the reference is stale.
|
||||||
// If the reference is stale, it will be purged and this create can continue.
|
// If the reference is stale, it will be purged and this create can continue.
|
||||||
// It is expected that callers of this function hold any necessary locks.
|
// It is expected that callers of this function hold any necessary locks.
|
||||||
func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts, labels map[string]string) (volume.Volume, error) {
|
func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts, labels map[string]string) (volume.Volume, bool, error) {
|
||||||
// Validate the name in a platform-specific manner
|
// Validate the name in a platform-specific manner
|
||||||
|
|
||||||
// volume name validation is specific to the host os and not on container image
|
// volume name validation is specific to the host os and not on container image
|
||||||
|
@ -560,19 +582,19 @@ func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts,
|
||||||
parser := volumemounts.NewParser(runtime.GOOS)
|
parser := volumemounts.NewParser(runtime.GOOS)
|
||||||
err := parser.ValidateVolumeName(name)
|
err := parser.ValidateVolumeName(name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
v, err := s.checkConflict(ctx, name, driverName)
|
v, err := s.checkConflict(ctx, name, driverName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if v != nil {
|
if v != nil {
|
||||||
// there is an existing volume, if we already have this stored locally, return it.
|
// there is an existing volume, if we already have this stored locally, return it.
|
||||||
// TODO: there could be some inconsistent details such as labels here
|
// TODO: there could be some inconsistent details such as labels here
|
||||||
if vv, _ := s.getNamed(v.Name()); vv != nil {
|
if vv, _ := s.getNamed(v.Name()); vv != nil {
|
||||||
return vv, nil
|
return vv, false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -580,7 +602,7 @@ func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts,
|
||||||
if driverName == "" {
|
if driverName == "" {
|
||||||
v, _ = s.getVolume(ctx, name, "")
|
v, _ = s.getVolume(ctx, name, "")
|
||||||
if v != nil {
|
if v != nil {
|
||||||
return v, nil
|
return v, false, nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -589,7 +611,7 @@ func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts,
|
||||||
}
|
}
|
||||||
vd, err := s.drivers.CreateDriver(driverName)
|
vd, err := s.drivers.CreateDriver(driverName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, &OpErr{Op: "create", Name: name, Err: err}
|
return nil, false, &OpErr{Op: "create", Name: name, Err: err}
|
||||||
}
|
}
|
||||||
|
|
||||||
logrus.Debugf("Registering new volume reference: driver %q, name %q", vd.Name(), name)
|
logrus.Debugf("Registering new volume reference: driver %q, name %q", vd.Name(), name)
|
||||||
|
@ -599,7 +621,7 @@ func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts,
|
||||||
if _, err := s.drivers.ReleaseDriver(driverName); err != nil {
|
if _, err := s.drivers.ReleaseDriver(driverName); err != nil {
|
||||||
logrus.WithError(err).WithField("driver", driverName).Error("Error releasing reference to volume driver")
|
logrus.WithError(err).WithField("driver", driverName).Error("Error releasing reference to volume driver")
|
||||||
}
|
}
|
||||||
return nil, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -617,9 +639,9 @@ func (s *VolumeStore) create(ctx context.Context, name, driverName string, opts,
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.setMeta(name, metadata); err != nil {
|
if err := s.setMeta(name, metadata); err != nil {
|
||||||
return nil, err
|
return nil, true, err
|
||||||
}
|
}
|
||||||
return volumeWrapper{v, labels, vd.Scope(), opts}, nil
|
return volumeWrapper{v, labels, vd.Scope(), opts}, true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get looks if a volume with the given name exists and returns it if so
|
// Get looks if a volume with the given name exists and returns it if so
|
||||||
|
@ -802,6 +824,9 @@ func (s *VolumeStore) Remove(ctx context.Context, v volume.Volume, rmOpts ...opt
|
||||||
err = e
|
err = e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if err == nil && s.eventLogger != nil {
|
||||||
|
s.eventLogger.LogVolumeEvent(v.Name(), "destroy", map[string]string{"driver": v.DriverName()})
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue