mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
Merge pull request #2551 from dotcloud/concurrent-db-access
Add lock around write operations in graph
This commit is contained in:
commit
b380866f44
2 changed files with 52 additions and 2 deletions
|
@ -4,6 +4,7 @@ import (
|
||||||
"database/sql"
|
"database/sql"
|
||||||
"fmt"
|
"fmt"
|
||||||
"path"
|
"path"
|
||||||
|
"sync"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -47,6 +48,7 @@ type WalkFunc func(fullPath string, entity *Entity) error
|
||||||
// Graph database for storing entities and their relationships
|
// Graph database for storing entities and their relationships
|
||||||
type Database struct {
|
type Database struct {
|
||||||
conn *sql.DB
|
conn *sql.DB
|
||||||
|
mux sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new graph database initialized with a root entity
|
// Create a new graph database initialized with a root entity
|
||||||
|
@ -54,7 +56,7 @@ func NewDatabase(conn *sql.DB, init bool) (*Database, error) {
|
||||||
if conn == nil {
|
if conn == nil {
|
||||||
return nil, fmt.Errorf("Database connection cannot be nil")
|
return nil, fmt.Errorf("Database connection cannot be nil")
|
||||||
}
|
}
|
||||||
db := &Database{conn}
|
db := &Database{conn: conn}
|
||||||
|
|
||||||
if init {
|
if init {
|
||||||
if _, err := conn.Exec(createEntityTable); err != nil {
|
if _, err := conn.Exec(createEntityTable); err != nil {
|
||||||
|
@ -99,7 +101,9 @@ func (db *Database) Close() error {
|
||||||
|
|
||||||
// Set the entity id for a given path
|
// Set the entity id for a given path
|
||||||
func (db *Database) Set(fullPath, id string) (*Entity, error) {
|
func (db *Database) Set(fullPath, id string) (*Entity, error) {
|
||||||
// FIXME: is rollback implicit when closing the connection?
|
db.mux.Lock()
|
||||||
|
defer db.mux.Unlock()
|
||||||
|
|
||||||
rollback := func() {
|
rollback := func() {
|
||||||
db.conn.Exec("ROLLBACK")
|
db.conn.Exec("ROLLBACK")
|
||||||
}
|
}
|
||||||
|
@ -256,6 +260,9 @@ func (db *Database) RefPaths(id string) Edges {
|
||||||
|
|
||||||
// Delete the reference to an entity at a given path
|
// Delete the reference to an entity at a given path
|
||||||
func (db *Database) Delete(name string) error {
|
func (db *Database) Delete(name string) error {
|
||||||
|
db.mux.Lock()
|
||||||
|
defer db.mux.Unlock()
|
||||||
|
|
||||||
if name == "/" {
|
if name == "/" {
|
||||||
return fmt.Errorf("Cannot delete root entity")
|
return fmt.Errorf("Cannot delete root entity")
|
||||||
}
|
}
|
||||||
|
@ -276,6 +283,9 @@ func (db *Database) Delete(name string) error {
|
||||||
// Walk the graph to make sure all references to the entity
|
// Walk the graph to make sure all references to the entity
|
||||||
// are removed and return the number of references removed
|
// are removed and return the number of references removed
|
||||||
func (db *Database) Purge(id string) (int, error) {
|
func (db *Database) Purge(id string) (int, error) {
|
||||||
|
db.mux.Lock()
|
||||||
|
defer db.mux.Unlock()
|
||||||
|
|
||||||
rollback := func() {
|
rollback := func() {
|
||||||
db.conn.Exec("ROLLBACK")
|
db.conn.Exec("ROLLBACK")
|
||||||
}
|
}
|
||||||
|
@ -310,6 +320,9 @@ func (db *Database) Purge(id string) (int, error) {
|
||||||
|
|
||||||
// Rename an edge for a given path
|
// Rename an edge for a given path
|
||||||
func (db *Database) Rename(currentName, newName string) error {
|
func (db *Database) Rename(currentName, newName string) error {
|
||||||
|
db.mux.Lock()
|
||||||
|
defer db.mux.Unlock()
|
||||||
|
|
||||||
parentPath, name := splitPath(currentName)
|
parentPath, name := splitPath(currentName)
|
||||||
newParentPath, newEdgeName := splitPath(newName)
|
newParentPath, newEdgeName := splitPath(newName)
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ package gograph
|
||||||
import (
|
import (
|
||||||
_ "code.google.com/p/gosqlite/sqlite3"
|
_ "code.google.com/p/gosqlite/sqlite3"
|
||||||
"database/sql"
|
"database/sql"
|
||||||
|
"fmt"
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
@ -501,3 +502,39 @@ func TestGetNameWithTrailingSlash(t *testing.T) {
|
||||||
t.Fatalf("Entity should not be nil")
|
t.Fatalf("Entity should not be nil")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestConcurrentWrites(t *testing.T) {
|
||||||
|
db, dbpath := newTestDb(t)
|
||||||
|
defer destroyTestDb(dbpath)
|
||||||
|
|
||||||
|
errs := make(chan error, 2)
|
||||||
|
|
||||||
|
save := func(name string, id string) {
|
||||||
|
if _, err := db.Set(fmt.Sprintf("/%s", name), id); err != nil {
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
errs <- nil
|
||||||
|
}
|
||||||
|
purge := func(id string) {
|
||||||
|
if _, err := db.Purge(id); err != nil {
|
||||||
|
errs <- err
|
||||||
|
}
|
||||||
|
errs <- nil
|
||||||
|
}
|
||||||
|
|
||||||
|
save("/1", "1")
|
||||||
|
|
||||||
|
go purge("1")
|
||||||
|
go save("/2", "2")
|
||||||
|
|
||||||
|
any := false
|
||||||
|
for i := 0; i < 2; i++ {
|
||||||
|
if err := <-errs; err != nil {
|
||||||
|
any = true
|
||||||
|
t.Log(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if any {
|
||||||
|
t.Fatal()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue