pkg/graphdb: use transactions for transactions

Signed-off-by: Cristian Staretu <cristian.staretu@gmail.com>
This commit is contained in:
unclejack 2014-12-19 13:57:21 +02:00
parent 610842f906
commit e9f37a0118
1 changed files with 41 additions and 47 deletions

View File

@ -79,46 +79,43 @@ func NewDatabase(conn *sql.DB) (*Database, error) {
} }
db := &Database{conn: conn} db := &Database{conn: conn}
if _, err := conn.Exec(createEntityTable); err != nil {
return nil, err
}
if _, err := conn.Exec(createEdgeTable); err != nil {
return nil, err
}
if _, err := conn.Exec(createEdgeIndices); err != nil {
return nil, err
}
rollback := func() {
conn.Exec("ROLLBACK")
}
// Create root entities // Create root entities
if _, err := conn.Exec("BEGIN"); err != nil { tx, err := conn.Begin()
if err != nil {
return nil, err return nil, err
} }
if _, err := conn.Exec("DELETE FROM entity where id = ?", "0"); err != nil { if _, err := tx.Exec(createEntityTable); err != nil {
rollback() return nil, err
}
if _, err := tx.Exec(createEdgeTable); err != nil {
return nil, err
}
if _, err := tx.Exec(createEdgeIndices); err != nil {
return nil, err return nil, err
} }
if _, err := conn.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil { if _, err := tx.Exec("DELETE FROM entity where id = ?", "0"); err != nil {
rollback() tx.Rollback()
return nil, err return nil, err
} }
if _, err := conn.Exec("DELETE FROM edge where entity_id=? and name=?", "0", "/"); err != nil { if _, err := tx.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil {
rollback() tx.Rollback()
return nil, err return nil, err
} }
if _, err := conn.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil { if _, err := tx.Exec("DELETE FROM edge where entity_id=? and name=?", "0", "/"); err != nil {
rollback() tx.Rollback()
return nil, err return nil, err
} }
if _, err := conn.Exec("COMMIT"); err != nil { if _, err := tx.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil {
tx.Rollback()
return nil, err
}
if err := tx.Commit(); err != nil {
return nil, err return nil, err
} }
@ -135,33 +132,32 @@ func (db *Database) Set(fullPath, id string) (*Entity, error) {
db.mux.Lock() db.mux.Lock()
defer db.mux.Unlock() defer db.mux.Unlock()
rollback := func() { tx, err := db.conn.Begin()
db.conn.Exec("ROLLBACK") if err != nil {
}
if _, err := db.conn.Exec("BEGIN EXCLUSIVE"); err != nil {
return nil, err return nil, err
} }
var entityID string var entityID string
if err := db.conn.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityID); err != nil { if err := tx.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityID); err != nil {
if err == sql.ErrNoRows { if err == sql.ErrNoRows {
if _, err := db.conn.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil { if _, err := tx.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil {
rollback() tx.Rollback()
return nil, err return nil, err
} }
} else { } else {
rollback() tx.Rollback()
return nil, err return nil, err
} }
} }
e := &Entity{id} e := &Entity{id}
parentPath, name := splitPath(fullPath) parentPath, name := splitPath(fullPath)
if err := db.setEdge(parentPath, name, e); err != nil { if err := db.setEdge(parentPath, name, e, tx); err != nil {
rollback() tx.Rollback()
return nil, err return nil, err
} }
if _, err := db.conn.Exec("COMMIT"); err != nil { if err := tx.Commit(); err != nil {
return nil, err return nil, err
} }
return e, nil return e, nil
@ -179,7 +175,7 @@ func (db *Database) Exists(name string) bool {
return e != nil return e != nil
} }
func (db *Database) setEdge(parentPath, name string, e *Entity) error { func (db *Database) setEdge(parentPath, name string, e *Entity, tx *sql.Tx) error {
parent, err := db.get(parentPath) parent, err := db.get(parentPath)
if err != nil { if err != nil {
return err return err
@ -188,7 +184,7 @@ func (db *Database) setEdge(parentPath, name string, e *Entity) error {
return fmt.Errorf("Cannot set self as child") return fmt.Errorf("Cannot set self as child")
} }
if _, err := db.conn.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil { if _, err := tx.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil {
return err return err
} }
return nil return nil
@ -371,18 +367,15 @@ func (db *Database) Purge(id string) (int, error) {
db.mux.Lock() db.mux.Lock()
defer db.mux.Unlock() defer db.mux.Unlock()
rollback := func() { tx, err := db.conn.Begin()
db.conn.Exec("ROLLBACK") if err != nil {
}
if _, err := db.conn.Exec("BEGIN"); err != nil {
return -1, err return -1, err
} }
// Delete all edges // Delete all edges
rows, err := db.conn.Exec("DELETE FROM edge WHERE entity_id = ?;", id) rows, err := tx.Exec("DELETE FROM edge WHERE entity_id = ?;", id)
if err != nil { if err != nil {
rollback() tx.Rollback()
return -1, err return -1, err
} }
@ -392,14 +385,15 @@ func (db *Database) Purge(id string) (int, error) {
} }
// Delete entity // Delete entity
if _, err := db.conn.Exec("DELETE FROM entity where id = ?;", id); err != nil { if _, err := tx.Exec("DELETE FROM entity where id = ?;", id); err != nil {
rollback() tx.Rollback()
return -1, err return -1, err
} }
if _, err := db.conn.Exec("COMMIT"); err != nil { if err := tx.Commit(); err != nil {
return -1, err return -1, err
} }
return int(changes), nil return int(changes), nil
} }