mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Add lock around write operations in graph
The graph uses a persistent database connection so a lock is required because our current sqlite3 driver does not implement retry logic when the ErrBusy is received.
This commit is contained in:
parent
35690e76b4
commit
957db15ef4
2 changed files with 52 additions and 2 deletions
|
@ -4,6 +4,7 @@ import (
|
|||
"database/sql"
|
||||
"fmt"
|
||||
"path"
|
||||
"sync"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -47,6 +48,7 @@ type WalkFunc func(fullPath string, entity *Entity) error
|
|||
// Graph database for storing entities and their relationships
|
||||
type Database struct {
|
||||
conn *sql.DB
|
||||
mux sync.Mutex
|
||||
}
|
||||
|
||||
// Create a new graph database initialized with a root entity
|
||||
|
@ -54,7 +56,7 @@ func NewDatabase(conn *sql.DB, init bool) (*Database, error) {
|
|||
if conn == nil {
|
||||
return nil, fmt.Errorf("Database connection cannot be nil")
|
||||
}
|
||||
db := &Database{conn}
|
||||
db := &Database{conn: conn}
|
||||
|
||||
if init {
|
||||
if _, err := conn.Exec(createEntityTable); err != nil {
|
||||
|
@ -99,7 +101,9 @@ func (db *Database) Close() error {
|
|||
|
||||
// Set the entity id for a given path
|
||||
func (db *Database) Set(fullPath, id string) (*Entity, error) {
|
||||
// FIXME: is rollback implicit when closing the connection?
|
||||
db.mux.Lock()
|
||||
defer db.mux.Unlock()
|
||||
|
||||
rollback := func() {
|
||||
db.conn.Exec("ROLLBACK")
|
||||
}
|
||||
|
@ -256,6 +260,9 @@ func (db *Database) RefPaths(id string) Edges {
|
|||
|
||||
// Delete the reference to an entity at a given path
|
||||
func (db *Database) Delete(name string) error {
|
||||
db.mux.Lock()
|
||||
defer db.mux.Unlock()
|
||||
|
||||
if name == "/" {
|
||||
return fmt.Errorf("Cannot delete root entity")
|
||||
}
|
||||
|
@ -276,6 +283,9 @@ func (db *Database) Delete(name string) error {
|
|||
// Walk the graph to make sure all references to the entity
|
||||
// are removed and return the number of references removed
|
||||
func (db *Database) Purge(id string) (int, error) {
|
||||
db.mux.Lock()
|
||||
defer db.mux.Unlock()
|
||||
|
||||
rollback := func() {
|
||||
db.conn.Exec("ROLLBACK")
|
||||
}
|
||||
|
@ -310,6 +320,9 @@ func (db *Database) Purge(id string) (int, error) {
|
|||
|
||||
// Rename an edge for a given path
|
||||
func (db *Database) Rename(currentName, newName string) error {
|
||||
db.mux.Lock()
|
||||
defer db.mux.Unlock()
|
||||
|
||||
parentPath, name := splitPath(currentName)
|
||||
newParentPath, newEdgeName := splitPath(newName)
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ package gograph
|
|||
import (
|
||||
_ "code.google.com/p/gosqlite/sqlite3"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
"os"
|
||||
"path"
|
||||
"strconv"
|
||||
|
@ -501,3 +502,39 @@ func TestGetNameWithTrailingSlash(t *testing.T) {
|
|||
t.Fatalf("Entity should not be nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConcurrentWrites(t *testing.T) {
|
||||
db, dbpath := newTestDb(t)
|
||||
defer destroyTestDb(dbpath)
|
||||
|
||||
errs := make(chan error, 2)
|
||||
|
||||
save := func(name string, id string) {
|
||||
if _, err := db.Set(fmt.Sprintf("/%s", name), id); err != nil {
|
||||
errs <- err
|
||||
}
|
||||
errs <- nil
|
||||
}
|
||||
purge := func(id string) {
|
||||
if _, err := db.Purge(id); err != nil {
|
||||
errs <- err
|
||||
}
|
||||
errs <- nil
|
||||
}
|
||||
|
||||
save("/1", "1")
|
||||
|
||||
go purge("1")
|
||||
go save("/2", "2")
|
||||
|
||||
any := false
|
||||
for i := 0; i < 2; i++ {
|
||||
if err := <-errs; err != nil {
|
||||
any = true
|
||||
t.Log(err)
|
||||
}
|
||||
}
|
||||
if any {
|
||||
t.Fatal()
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue