diff --git a/container/container.go b/container/container.go index f1c8a10cd7..4b42924301 100644 --- a/container/container.go +++ b/container/container.go @@ -189,6 +189,21 @@ func (container *Container) ToDiskLocking() error { return err } +// CheckpointTo makes the Container's current state visible to queries. +// Callers must hold a Container lock. +func (container *Container) CheckpointTo(store ViewDB) error { + return store.Save(container.snapshot()) +} + +// CheckpointAndSaveToDisk is equivalent to calling CheckpointTo and ToDisk. +// Callers must hold a Container lock. +func (container *Container) CheckpointAndSaveToDisk(store ViewDB) error { + if err := container.CheckpointTo(store); err != nil { + return err + } + return container.ToDisk() +} + // readHostConfig reads the host configuration from disk for the container. func (container *Container) readHostConfig() error { container.HostConfig = &containertypes.HostConfig{} diff --git a/container/snapshot.go b/container/snapshot.go index a56cabcb96..1e81039bbc 100644 --- a/container/snapshot.go +++ b/container/snapshot.go @@ -41,7 +41,7 @@ type Snapshot struct { } // Snapshot provides a read only view of a Container. Callers must hold a Lock on the container object. -func (container *Container) Snapshot() *Snapshot { +func (container *Container) snapshot() *Snapshot { snapshot := &Snapshot{ ID: container.ID, Name: container.Name, diff --git a/container/view.go b/container/view.go index aa122fe2c7..dd4708c313 100644 --- a/container/view.go +++ b/container/view.go @@ -8,6 +8,19 @@ const ( memdbIDIndex = "id" ) +// ViewDB provides an in-memory transactional (ACID) container Store +type ViewDB interface { + Snapshot() View + Save(snapshot *Snapshot) error + Delete(id string) error +} + +// View can be used by readers to avoid locking +type View interface { + All() ([]Snapshot, error) + Get(id string) (*Snapshot, error) +} + var schema = &memdb.DBSchema{ Tables: map[string]*memdb.TableSchema{ memdbTable: { @@ -23,46 +36,44 @@ var schema = &memdb.DBSchema{ }, } -// MemDB provides an in-memory transactional (ACID) container Store -type MemDB struct { +type memDB struct { store *memdb.MemDB } -// NewMemDB provides the default implementation, with the default schema -func NewMemDB() (*MemDB, error) { +// NewViewDB provides the default implementation, with the default schema +func NewViewDB() (ViewDB, error) { store, err := memdb.NewMemDB(schema) if err != nil { return nil, err } - return &MemDB{store: store}, nil + return &memDB{store: store}, nil } // Snapshot provides a consistent read-only View of the database -func (db *MemDB) Snapshot() *View { - return &View{db.store.Txn(false)} +func (db *memDB) Snapshot() View { + return &memdbView{db.store.Txn(false)} } // Save atomically updates the in-memory store -func (db *MemDB) Save(snapshot *Snapshot) error { +func (db *memDB) Save(snapshot *Snapshot) error { txn := db.store.Txn(true) defer txn.Commit() return txn.Insert(memdbTable, snapshot) } // Delete removes an item by ID -func (db *MemDB) Delete(id string) error { +func (db *memDB) Delete(id string) error { txn := db.store.Txn(true) defer txn.Commit() return txn.Delete(memdbTable, &Snapshot{ID: id}) } -// View can be used by readers to avoid locking -type View struct { +type memdbView struct { txn *memdb.Txn } // All returns a all items in this snapshot -func (v *View) All() ([]Snapshot, error) { +func (v *memdbView) All() ([]Snapshot, error) { var all []Snapshot iter, err := v.txn.Get(memdbTable, memdbIDIndex) if err != nil { @@ -80,7 +91,7 @@ func (v *View) All() ([]Snapshot, error) { } //Get returns an item by id -func (v *View) Get(id string) (*Snapshot, error) { +func (v *memdbView) Get(id string) (*Snapshot, error) { s, err := v.txn.First(memdbTable, memdbIDIndex, id) if err != nil { return nil, err diff --git a/container/view_test.go b/container/view_test.go index 32bdf4957c..20424ffd98 100644 --- a/container/view_test.go +++ b/container/view_test.go @@ -3,27 +3,26 @@ package container import "testing" func TestViewSave(t *testing.T) { - db, err := NewMemDB() + db, err := NewViewDB() if err != nil { t.Fatal(err) } - snapshot := NewBaseContainer("id", "root").Snapshot() - if err := db.Save(snapshot); err != nil { + c := NewBaseContainer("id", "root") + if err := c.CheckpointTo(db); err != nil { t.Fatal(err) - } } func TestViewAll(t *testing.T) { var ( - db, _ = NewMemDB() - one = NewBaseContainer("id1", "root1").Snapshot() - two = NewBaseContainer("id2", "root2").Snapshot() + db, _ = NewViewDB() + one = NewBaseContainer("id1", "root1") + two = NewBaseContainer("id2", "root2") ) one.Pid = 10 two.Pid = 20 - db.Save(one) - db.Save(two) + one.CheckpointTo(db) + two.CheckpointTo(db) all, err := db.Snapshot().All() if err != nil { t.Fatal(err) @@ -44,10 +43,10 @@ func TestViewAll(t *testing.T) { } func TestViewGet(t *testing.T) { - db, _ := NewMemDB() + db, _ := NewViewDB() one := NewBaseContainer("id", "root") one.ImageID = "some-image-123" - db.Save(one.Snapshot()) + one.CheckpointTo(db) s, err := db.Snapshot().Get("id") if err != nil { t.Fatal(err) diff --git a/daemon/container.go b/daemon/container.go index 5d0d0438e4..72689fbbaa 100644 --- a/daemon/container.go +++ b/daemon/container.go @@ -108,13 +108,13 @@ func (daemon *Daemon) Register(c *container.Container) error { } // once in the memory store it is visible to other goroutines - // grab a Lock until it has been replicated to avoid races + // grab a Lock until it has been checkpointed to avoid races c.Lock() defer c.Unlock() daemon.containers.Add(c.ID, c) daemon.idIndex.Add(c.ID) - return daemon.containersReplica.Save(c.Snapshot()) + return c.CheckpointTo(daemon.containersReplica) } func (daemon *Daemon) newContainer(name string, platform string, config *containertypes.Config, hostConfig *containertypes.HostConfig, imgID image.ID, managed bool) (*container.Container, error) { @@ -218,10 +218,7 @@ func (daemon *Daemon) setHostConfig(container *container.Container, hostConfig * runconfig.SetDefaultNetModeIfBlank(hostConfig) container.HostConfig = hostConfig - if err := daemon.containersReplica.Save(container.Snapshot()); err != nil { - return err - } - return container.ToDisk() + return container.CheckpointAndSaveToDisk(daemon.containersReplica) } // verifyContainerSettings performs validation of the hostconfig and config diff --git a/daemon/container_operations.go b/daemon/container_operations.go index fe83928472..9e5a02c3f8 100644 --- a/daemon/container_operations.go +++ b/daemon/container_operations.go @@ -45,14 +45,11 @@ func (daemon *Daemon) getDNSSearchSettings(container *container.Container) []str return nil } -func (daemon *Daemon) saveAndReplicate(container *container.Container) error { +func (daemon *Daemon) checkpointAndSave(container *container.Container) error { container.Lock() defer container.Unlock() - if err := daemon.containersReplica.Save(container.Snapshot()); err != nil { - return fmt.Errorf("Error replicating container state: %v", err) - } - if err := container.ToDisk(); err != nil { - return fmt.Errorf("Error saving container to disk: %v", err) + if err := container.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil { + return fmt.Errorf("Error saving container state: %v", err) } return nil } @@ -1018,10 +1015,8 @@ func (daemon *Daemon) ConnectToNetwork(container *container.Container, idOrName return err } } - if err := daemon.saveAndReplicate(container); err != nil { - return fmt.Errorf("Error saving container to disk: %v", err) - } - return nil + + return daemon.checkpointAndSave(container) } // DisconnectFromNetwork disconnects container from network n. @@ -1057,8 +1052,8 @@ func (daemon *Daemon) DisconnectFromNetwork(container *container.Container, netw return err } - if err := daemon.saveAndReplicate(container); err != nil { - return fmt.Errorf("Error saving container to disk: %v", err) + if err := daemon.checkpointAndSave(container); err != nil { + return err } if n != nil { diff --git a/daemon/daemon.go b/daemon/daemon.go index f2a83cc4d0..ab46ad3341 100644 --- a/daemon/daemon.go +++ b/daemon/daemon.go @@ -83,7 +83,7 @@ type Daemon struct { ID string repository string containers container.Store - containersReplica *container.MemDB + containersReplica container.ViewDB execCommands *exec.Store downloadManager *xfer.LayerDownloadManager uploadManager *xfer.LayerUploadManager @@ -762,7 +762,7 @@ func NewDaemon(config *config.Config, registryService registry.Service, containe d.ID = trustKey.PublicKey().KeyID() d.repository = daemonRepo d.containers = container.NewMemoryStore() - if d.containersReplica, err = container.NewMemDB(); err != nil { + if d.containersReplica, err = container.NewViewDB(); err != nil { return nil, err } d.execCommands = exec.NewStore() diff --git a/daemon/delete.go b/daemon/delete.go index ad5eacc61c..3609a88e9f 100644 --- a/daemon/delete.go +++ b/daemon/delete.go @@ -105,7 +105,7 @@ func (daemon *Daemon) cleanupContainer(container *container.Container, forceRemo // Mark container dead. We don't want anybody to be restarting it. container.Lock() container.Dead = true - if err = daemon.containersReplica.Save(container.Snapshot()); err != nil { + if err = container.CheckpointTo(daemon.containersReplica); err != nil { container.Unlock() return err } diff --git a/daemon/health.go b/daemon/health.go index 3419511d36..00528db2ae 100644 --- a/daemon/health.go +++ b/daemon/health.go @@ -168,9 +168,9 @@ func handleProbeResult(d *Daemon, c *container.Container, result *types.Healthch } // replicate Health status changes - if err := d.containersReplica.Save(c.Snapshot()); err != nil { + if err := c.CheckpointTo(d.containersReplica); err != nil { // queries will be inconsistent until the next probe runs or other state mutations - // trigger a replication + // checkpoint the container logrus.Errorf("Error replicating health state for container %s: %v", c.ID, err) } diff --git a/daemon/health_test.go b/daemon/health_test.go index 0944e3fa33..4fd89140d3 100644 --- a/daemon/health_test.go +++ b/daemon/health_test.go @@ -29,7 +29,7 @@ func TestNoneHealthcheck(t *testing.T) { }, State: &container.State{}, } - store, err := container.NewMemDB() + store, err := container.NewViewDB() if err != nil { t.Fatal(err) } @@ -69,7 +69,7 @@ func TestHealthStates(t *testing.T) { }, } - store, err := container.NewMemDB() + store, err := container.NewViewDB() if err != nil { t.Fatal(err) } diff --git a/daemon/list.go b/daemon/list.go index b810164f28..2d10e3d6de 100644 --- a/daemon/list.go +++ b/daemon/list.go @@ -114,7 +114,7 @@ func (daemon *Daemon) Containers(config *types.ContainerListOptions) ([]*types.C return daemon.reduceContainers(config, daemon.transformContainer) } -func (daemon *Daemon) filterByNameIDMatches(view *container.View, ctx *listContext) ([]container.Snapshot, error) { +func (daemon *Daemon) filterByNameIDMatches(view container.View, ctx *listContext) ([]container.Snapshot, error) { idSearch := false names := ctx.filters.Get("name") ids := ctx.filters.Get("id") @@ -240,7 +240,7 @@ func (daemon *Daemon) reducePsContainer(container *container.Snapshot, ctx *list } // foldFilter generates the container filter based on the user's filtering options. -func (daemon *Daemon) foldFilter(view *container.View, config *types.ContainerListOptions) (*listContext, error) { +func (daemon *Daemon) foldFilter(view container.View, config *types.ContainerListOptions) (*listContext, error) { psFilters := config.Filters if err := psFilters.Validate(acceptedPsFilterTags); err != nil { diff --git a/daemon/monitor.go b/daemon/monitor.go index a581230ce5..be8a500020 100644 --- a/daemon/monitor.go +++ b/daemon/monitor.go @@ -90,10 +90,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { daemon.setStateCounter(c) defer c.Unlock() - if err := daemon.containersReplica.Save(c.Snapshot()); err != nil { - return err - } - if err := c.ToDisk(); err != nil { + if err := c.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil { return err } return daemon.postRunProcessing(c, e) @@ -122,11 +119,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { c.HasBeenStartedBefore = true daemon.setStateCounter(c) - if err := daemon.containersReplica.Save(c.Snapshot()); err != nil { - c.Reset(false) - return err - } - if err := c.ToDisk(); err != nil { + if err := c.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil { c.Reset(false) return err } @@ -137,10 +130,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { // Container is already locked in this case c.Paused = true daemon.setStateCounter(c) - if err := daemon.containersReplica.Save(c.Snapshot()); err != nil { - return err - } - if err := c.ToDisk(); err != nil { + if err := c.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil { return err } daemon.updateHealthMonitor(c) @@ -149,10 +139,7 @@ func (daemon *Daemon) StateChanged(id string, e libcontainerd.StateInfo) error { // Container is already locked in this case c.Paused = false daemon.setStateCounter(c) - if err := daemon.containersReplica.Save(c.Snapshot()); err != nil { - return err - } - if err := c.ToDisk(); err != nil { + if err := c.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil { return err } daemon.updateHealthMonitor(c) diff --git a/daemon/rename.go b/daemon/rename.go index 0a3730ff16..1a5e985cfc 100644 --- a/daemon/rename.go +++ b/daemon/rename.go @@ -82,10 +82,7 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error { daemon.nameIndex.Release(oldName + k) } daemon.releaseName(oldName) - if err = daemon.containersReplica.Save(container.Snapshot()); err != nil { - return err - } - if err = container.ToDisk(); err != nil { + if err = container.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil { return err } @@ -102,10 +99,7 @@ func (daemon *Daemon) ContainerRename(oldName, newName string) error { if err != nil { container.Name = oldName container.NetworkSettings.IsAnonymousEndpoint = oldIsAnonymousEndpoint - if e := daemon.containersReplica.Save(container.Snapshot()); err != nil { - logrus.Errorf("%s: Failed in replicating state on rename failure: %v", container.ID, e) - } - if e := container.ToDisk(); e != nil { + if e := container.CheckpointAndSaveToDisk(daemon.containersReplica); e != nil { logrus.Errorf("%s: Failed in writing to Disk on rename failure: %v", container.ID, e) } } diff --git a/daemon/start.go b/daemon/start.go index 5d8a1286aa..c2e9b7069f 100644 --- a/daemon/start.go +++ b/daemon/start.go @@ -117,11 +117,8 @@ func (daemon *Daemon) containerStart(container *container.Container, checkpoint if container.ExitCode() == 0 { container.SetExitCode(128) } - if err := daemon.containersReplica.Save(container.Snapshot()); err != nil { - logrus.Errorf("%s: failed replicating state on start failure: %v", container.ID, err) - } - if err := container.ToDisk(); err != nil { - logrus.Errorf("%s: failed writing to disk on start failure: %v", container.ID, err) + if err := container.CheckpointAndSaveToDisk(daemon.containersReplica); err != nil { + logrus.Errorf("%s: failed saving state on start failure: %v", container.ID, err) } container.Reset(false) diff --git a/daemon/update.go b/daemon/update.go index bdcfa2d067..af9bf7b991 100644 --- a/daemon/update.go +++ b/daemon/update.go @@ -38,8 +38,7 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro if restoreConfig { container.Lock() container.HostConfig = &backupHostConfig - daemon.containersReplica.Save(container.Snapshot()) - container.ToDisk() + container.CheckpointAndSaveToDisk(daemon.containersReplica) container.Unlock() } }() @@ -54,7 +53,7 @@ func (daemon *Daemon) update(name string, hostConfig *container.HostConfig) erro container.Unlock() return errCannotUpdate(container.ID, err) } - if err := daemon.containersReplica.Save(container.Snapshot()); err != nil { + if err := container.CheckpointTo(daemon.containersReplica); err != nil { restoreConfig = true container.Unlock() return errCannotUpdate(container.ID, err)