From 7c08aeeba4ab94f77e221bfc01741cce0866d224 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Tue, 22 Oct 2013 16:23:52 -0700 Subject: [PATCH] Use persistent connection for links database Add close method to Runtime and Server to make sure that any underlying connections are cleaned up --- docker/docker.go | 14 ++- gograph/gograph.go | 195 +++++++++++++++------------------------- gograph/gograph_test.go | 95 ++++++++++---------- runtime.go | 21 ++++- runtime_test.go | 4 +- server.go | 4 + 6 files changed, 156 insertions(+), 177 deletions(-) diff --git a/docker/docker.go b/docker/docker.go index f34c0970e6..362e899ba1 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -148,18 +148,22 @@ func daemon(config *docker.DaemonConfig) error { } defer removePidFile(config.Pidfile) + server, err := docker.NewServer(config) + if err != nil { + return err + } + defer server.Close() + c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt, os.Kill, os.Signal(syscall.SIGTERM)) go func() { sig := <-c log.Printf("Received signal '%v', exiting\n", sig) + server.Close() removePidFile(config.Pidfile) os.Exit(0) }() - server, err := docker.NewServer(config) - if err != nil { - return err - } + chErrors := make(chan error, len(config.ProtoAddresses)) for _, protoAddr := range config.ProtoAddresses { protoAddrParts := strings.SplitN(protoAddr, "://", 2) @@ -170,6 +174,8 @@ func daemon(config *docker.DaemonConfig) error { log.Println("/!\\ DON'T BIND ON ANOTHER IP ADDRESS THAN 127.0.0.1 IF YOU DON'T KNOW WHAT YOU'RE DOING /!\\") } } else { + server.Close() + removePidFile(config.Pidfile) log.Fatal("Invalid protocol format.") } go func() { diff --git a/gograph/gograph.go b/gograph/gograph.go index 38ad71082b..a8af0600cc 100644 --- a/gograph/gograph.go +++ b/gograph/gograph.go @@ -1,10 +1,8 @@ package gograph import ( - _ "code.google.com/p/gosqlite/sqlite3" "database/sql" "fmt" - "os" "path" ) @@ -25,7 +23,7 @@ const ( ` createEdgeIndices = ` - CREATE UNIQUE INDEX "name_parent_ix" ON "edge" (parent_id, name); + CREATE UNIQUE INDEX IF NOT EXISTS "name_parent_ix" ON "edge" (parent_id, name); ` ) @@ -48,74 +46,71 @@ type WalkFunc func(fullPath string, entity *Entity) error // Graph database for storing entities and their relationships type Database struct { - dbPath string + conn *sql.DB } // Create a new graph database initialized with a root entity -func NewDatabase(dbPath string) (*Database, error) { - db := &Database{dbPath} - if _, err := os.Stat(dbPath); err == nil { - return db, nil +func NewDatabase(conn *sql.DB, init bool) (*Database, error) { + if conn == nil { + return nil, fmt.Errorf("Database connection cannot be nil") } - conn, err := db.openConn() - if err != nil { - return nil, err - } - defer conn.Close() + db := &Database{conn} - if _, err := conn.Exec(createEntityTable); err != nil { - return nil, err - } - if _, err := conn.Exec(createEdgeTable); err != nil { - return nil, err - } - if _, err := conn.Exec(createEdgeIndices); err != nil { - return nil, err - } + if init { + if _, err := conn.Exec(createEntityTable); err != nil { + return nil, err + } + if _, err := conn.Exec(createEdgeTable); err != nil { + return nil, err + } + if _, err := conn.Exec(createEdgeIndices); err != nil { + return nil, err + } - rollback := func() { - conn.Exec("ROLLBACK") - } + rollback := func() { + conn.Exec("ROLLBACK") + } - // Create root entities - if _, err := conn.Exec("BEGIN"); err != nil { - return nil, err - } - if _, err := conn.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil { - rollback() - return nil, err - } + // Create root entities + if _, err := conn.Exec("BEGIN"); err != nil { + return nil, err + } + if _, err := conn.Exec("INSERT INTO entity (id) VALUES (?);", "0"); err != nil { + rollback() + return nil, err + } - if _, err := conn.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil { - rollback() - return nil, err - } + if _, err := conn.Exec("INSERT INTO edge (entity_id, name) VALUES(?,?);", "0", "/"); err != nil { + rollback() + return nil, err + } - if _, err := conn.Exec("COMMIT"); err != nil { - return nil, err + if _, err := conn.Exec("COMMIT"); err != nil { + return nil, err + } } return db, nil } +// Close the underlying connection to the database +func (db *Database) Close() error { + return db.conn.Close() +} + // Set the entity id for a given path func (db *Database) Set(fullPath, id string) (*Entity, error) { - conn, err := db.openConn() - if err != nil { - return nil, err - } - defer conn.Close() // FIXME: is rollback implicit when closing the connection? rollback := func() { - conn.Exec("ROLLBACK") + db.conn.Exec("ROLLBACK") } // FIXME: use exclusive transactions to avoid race conditions - if _, err := conn.Exec("BEGIN"); err != nil { + if _, err := db.conn.Exec("BEGIN"); err != nil { return nil, err } var entityId string - if err := conn.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityId); err != nil { + if err := db.conn.QueryRow("SELECT id FROM entity WHERE id = ?;", id).Scan(&entityId); err != nil { if err == sql.ErrNoRows { - if _, err := conn.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil { + if _, err := db.conn.Exec("INSERT INTO entity (id) VALUES(?);", id); err != nil { rollback() return nil, err } @@ -127,19 +122,19 @@ func (db *Database) Set(fullPath, id string) (*Entity, error) { e := &Entity{id} parentPath, name := splitPath(fullPath) - if err := db.setEdge(conn, parentPath, name, e); err != nil { + if err := db.setEdge(parentPath, name, e); err != nil { rollback() return nil, err } - if _, err := conn.Exec("COMMIT"); err != nil { + if _, err := db.conn.Exec("COMMIT"); err != nil { return nil, err } return e, nil } -func (db *Database) setEdge(conn *sql.DB, parentPath, name string, e *Entity) error { - parent, err := db.get(conn, parentPath) +func (db *Database) setEdge(parentPath, name string, e *Entity) error { + parent, err := db.get(parentPath) if err != nil { return err } @@ -147,7 +142,7 @@ func (db *Database) setEdge(conn *sql.DB, parentPath, name string, e *Entity) er return fmt.Errorf("Cannot set self as child") } - if _, err := conn.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil { + if _, err := db.conn.Exec("INSERT INTO edge (parent_id, name, entity_id) VALUES (?,?,?);", parent.id, name, e.id); err != nil { return err } return nil @@ -162,20 +157,14 @@ func (db *Database) RootEntity() *Entity { // Return the entity for a given path func (db *Database) Get(name string) *Entity { - conn, err := db.openConn() - if err != nil { - return nil - } - defer conn.Close() - - e, err := db.get(conn, name) + e, err := db.get(name) if err != nil { return nil } return e } -func (db *Database) get(conn *sql.DB, name string) (*Entity, error) { +func (db *Database) get(name string) (*Entity, error) { e := db.RootEntity() // We always know the root name so return it if // it is requested @@ -187,7 +176,7 @@ func (db *Database) get(conn *sql.DB, name string) (*Entity, error) { for i := 1; i < len(parts); i++ { p := parts[i] - next := db.child(conn, e, p) + next := db.child(e, p) if next == nil { return nil, fmt.Errorf("Cannot find child") } @@ -201,26 +190,14 @@ func (db *Database) get(conn *sql.DB, name string) (*Entity, error) { // The key will be the full path of the entity func (db *Database) List(name string, depth int) Entities { out := Entities{} - conn, err := db.openConn() - if err != nil { - return out - } - defer conn.Close() - - for c := range db.children(conn, name, depth) { + for c := range db.children(name, depth) { out[c.FullPath] = c.Entity } return out } func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { - conn, err := db.openConn() - if err != nil { - return err - } - defer conn.Close() - - for c := range db.children(conn, name, depth) { + for c := range db.children(name, depth) { if err := walkFunc(c.FullPath, c.Entity); err != nil { return err } @@ -230,14 +207,8 @@ func (db *Database) Walk(name string, walkFunc WalkFunc, depth int) error { // Return the refrence count for a specified id func (db *Database) Refs(id string) int { - conn, err := db.openConn() - if err != nil { - return -1 - } - defer conn.Close() - var count int - if err := conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil { + if err := db.conn.QueryRow("SELECT COUNT(*) FROM edge WHERE entity_id = ?;", id).Scan(&count); err != nil { return 0 } return count @@ -246,13 +217,8 @@ func (db *Database) Refs(id string) int { // Return all the id's path references func (db *Database) RefPaths(id string) Edges { refs := Edges{} - conn, err := db.openConn() - if err != nil { - return refs - } - defer conn.Close() - rows, err := conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id) + rows, err := db.conn.Query("SELECT name, parent_id FROM edge WHERE entity_id = ?;", id) if err != nil { return refs } @@ -278,19 +244,14 @@ func (db *Database) Delete(name string) error { if name == "/" { return fmt.Errorf("Cannot delete root entity") } - conn, err := db.openConn() - if err != nil { - return err - } - defer conn.Close() parentPath, n := splitPath(name) - parent, err := db.get(conn, parentPath) + parent, err := db.get(parentPath) if err != nil { return err } - if _, err := conn.Exec("DELETE FROM edge WHERE parent_id = ? AND name LIKE ?;", parent.id, n+"%"); err != nil { + if _, err := db.conn.Exec("DELETE FROM edge WHERE parent_id = ? AND name LIKE ?;", parent.id, n+"%"); err != nil { return err } return nil @@ -300,22 +261,16 @@ func (db *Database) Delete(name string) error { // Walk the graph to make sure all references to the entity // are removed and return the number of references removed func (db *Database) Purge(id string) (int, error) { - conn, err := db.openConn() - if err != nil { - return -1, err - } - defer conn.Close() - rollback := func() { - conn.Exec("ROLLBACK") + db.conn.Exec("ROLLBACK") } - if _, err := conn.Exec("BEGIN"); err != nil { + if _, err := db.conn.Exec("BEGIN"); err != nil { return -1, err } // Delete all edges - rows, err := conn.Exec("DELETE FROM edge WHERE entity_id = ?;", id) + rows, err := db.conn.Exec("DELETE FROM edge WHERE entity_id = ?;", id) if err != nil { rollback() return -1, err @@ -327,12 +282,12 @@ func (db *Database) Purge(id string) (int, error) { } // Delete entity - if _, err := conn.Exec("DELETE FROM entity where id = ?;", id); err != nil { + if _, err := db.conn.Exec("DELETE FROM entity where id = ?;", id); err != nil { rollback() return -1, err } - if _, err := conn.Exec("COMMIT"); err != nil { + if _, err := db.conn.Exec("COMMIT"); err != nil { return -1, err } return int(changes), nil @@ -347,18 +302,12 @@ func (db *Database) Rename(currentName, newName string) error { return fmt.Errorf("Cannot rename when root paths do not match %s != %s", parentPath, newParentPath) } - conn, err := db.openConn() - if err != nil { - return err - } - defer conn.Close() - - parent, err := db.get(conn, parentPath) + parent, err := db.get(parentPath) if err != nil { return err } - rows, err := conn.Exec("UPDATE edge SET name = ? WHERE parent_id = ? AND name LIKE ?;", newEdgeName, parent.id, name+"%") + rows, err := db.conn.Exec("UPDATE edge SET name = ? WHERE parent_id = ? AND name LIKE ?;", newEdgeName, parent.id, name+"%") if err != nil { return err } @@ -379,16 +328,16 @@ type WalkMeta struct { Edge *Edge } -func (db *Database) children(conn *sql.DB, name string, depth int) <-chan WalkMeta { +func (db *Database) children(name string, depth int) <-chan WalkMeta { out := make(chan WalkMeta) - e, err := db.get(conn, name) + e, err := db.get(name) if err != nil { close(out) return out } go func() { - rows, err := conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id) + rows, err := db.conn.Query("SELECT entity_id, name FROM edge where parent_id = ?;", e.id) if err != nil { close(out) } @@ -422,7 +371,7 @@ func (db *Database) children(conn *sql.DB, name string, depth int) <-chan WalkMe if depth != -1 { nDepth -= 1 } - sc := db.children(conn, meta.FullPath, nDepth) + sc := db.children(meta.FullPath, nDepth) for c := range sc { out <- c } @@ -433,18 +382,14 @@ func (db *Database) children(conn *sql.DB, name string, depth int) <-chan WalkMe } // Return the entity based on the parent path and name -func (db *Database) child(conn *sql.DB, parent *Entity, name string) *Entity { +func (db *Database) child(parent *Entity, name string) *Entity { var id string - if err := conn.QueryRow("SELECT entity_id FROM edge WHERE parent_id = ? AND name LIKE ?;", parent.id, name+"%").Scan(&id); err != nil { + if err := db.conn.QueryRow("SELECT entity_id FROM edge WHERE parent_id = ? AND name LIKE ?;", parent.id, name+"%").Scan(&id); err != nil { return nil } return &Entity{id} } -func (db *Database) openConn() (*sql.DB, error) { - return sql.Open("sqlite3", db.dbPath) -} - // Return the id used to reference this entity func (e *Entity) ID() string { return e.id diff --git a/gograph/gograph_test.go b/gograph/gograph_test.go index 5c12128ca8..57077e9f36 100644 --- a/gograph/gograph_test.go +++ b/gograph/gograph_test.go @@ -1,35 +1,39 @@ package gograph import ( + _ "code.google.com/p/gosqlite/sqlite3" + "database/sql" "os" "path" "strconv" "testing" ) -func newTestDb(t *testing.T) *Database { - db, err := NewDatabase(path.Join(os.TempDir(), "sqlite.db")) +func newTestDb(t *testing.T) (*Database, string) { + p := path.Join(os.TempDir(), "sqlite.db") + conn, err := sql.Open("sqlite3", p) + db, err := NewDatabase(conn, true) if err != nil { t.Fatal(err) } - return db + return db, p } -func destroyTestDb(db *Database) { - os.Remove(db.dbPath) +func destroyTestDb(dbPath string) { + os.Remove(dbPath) } func TestNewDatabase(t *testing.T) { - db := newTestDb(t) + db, dbpath := newTestDb(t) if db == nil { t.Fatal("Database should not be nil") } - defer destroyTestDb(db) + defer destroyTestDb(dbpath) } func TestCreateRootEnity(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) root := db.RootEntity() if root == nil { t.Fatal("Root entity should not be nil") @@ -37,8 +41,8 @@ func TestCreateRootEnity(t *testing.T) { } func TestGetRootEntity(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) e := db.Get("/") if e == nil { @@ -50,8 +54,8 @@ func TestGetRootEntity(t *testing.T) { } func TestSetEntityWithDifferentName(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) db.Set("/test", "1") if _, err := db.Set("/other", "1"); err != nil { @@ -60,8 +64,8 @@ func TestSetEntityWithDifferentName(t *testing.T) { } func TestSetDuplicateEntity(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) if _, err := db.Set("/foo", "42"); err != nil { t.Fatal(err) @@ -72,8 +76,8 @@ func TestSetDuplicateEntity(t *testing.T) { } func TestCreateChild(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) child, err := db.Set("/db", "1") if err != nil { @@ -88,8 +92,8 @@ func TestCreateChild(t *testing.T) { } func TestListAllRootChildren(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) for i := 1; i < 6; i++ { a := strconv.Itoa(i) @@ -104,8 +108,8 @@ func TestListAllRootChildren(t *testing.T) { } func TestListAllSubChildren(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) _, err := db.Set("/webapp", "1") if err != nil { @@ -146,8 +150,8 @@ func TestListAllSubChildren(t *testing.T) { } func TestAddSelfAsChild(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) child, err := db.Set("/test", "1") if err != nil { @@ -159,8 +163,8 @@ func TestAddSelfAsChild(t *testing.T) { } func TestAddChildToNonExistantRoot(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) if _, err := db.Set("/myapp", "1"); err != nil { t.Fatal(err) @@ -172,8 +176,8 @@ func TestAddChildToNonExistantRoot(t *testing.T) { } func TestWalkAll(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) _, err := db.Set("/webapp", "1") if err != nil { t.Fatal(err) @@ -218,8 +222,8 @@ func TestWalkAll(t *testing.T) { } func TestGetEntityByPath(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) _, err := db.Set("/webapp", "1") if err != nil { t.Fatal(err) @@ -265,8 +269,8 @@ func TestGetEntityByPath(t *testing.T) { } func TestEnitiesPaths(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) _, err := db.Set("/webapp", "1") if err != nil { t.Fatal(err) @@ -309,8 +313,8 @@ func TestEnitiesPaths(t *testing.T) { } func TestDeleteRootEntity(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) if err := db.Delete("/"); err == nil { t.Fatal("Error should not be nil") @@ -318,8 +322,8 @@ func TestDeleteRootEntity(t *testing.T) { } func TestDeleteEntity(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) _, err := db.Set("/webapp", "1") if err != nil { t.Fatal(err) @@ -365,8 +369,8 @@ func TestDeleteEntity(t *testing.T) { } func TestCountRefs(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) db.Set("/webapp", "1") @@ -382,8 +386,8 @@ func TestCountRefs(t *testing.T) { } func TestPurgeId(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) db.Set("/webapp", "1") @@ -404,8 +408,8 @@ func TestPurgeId(t *testing.T) { } func TestRename(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) db.Set("/webapp", "1") @@ -433,8 +437,8 @@ func TestRename(t *testing.T) { } func TestCreateMultipleNames(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) db.Set("/db", "1") if _, err := db.Set("/myapp", "1"); err != nil { @@ -448,8 +452,8 @@ func TestCreateMultipleNames(t *testing.T) { } func TestRefPaths(t *testing.T) { - db := newTestDb(t) - defer destroyTestDb(db) + db, dbpath := newTestDb(t) + defer destroyTestDb(dbpath) db.Set("/webapp", "1") @@ -460,5 +464,4 @@ func TestRefPaths(t *testing.T) { if len(refs) != 2 { t.Fatalf("Expected reference count to be 2, got %d", len(refs)) } - } diff --git a/runtime.go b/runtime.go index 0a492cd77e..5af7da7be8 100644 --- a/runtime.go +++ b/runtime.go @@ -1,7 +1,9 @@ package docker import ( + _ "code.google.com/p/gosqlite/sqlite3" "container/list" + "database/sql" "fmt" "github.com/dotcloud/docker/gograph" "github.com/dotcloud/docker/utils" @@ -593,7 +595,19 @@ func NewRuntimeFromDirectory(config *DaemonConfig) (*Runtime, error) { return nil, err } - graph, err := gograph.NewDatabase(path.Join(config.GraphPath, "linkgraph.db")) + gographPath := path.Join(config.GraphPath, "linkgraph.db") + initDatabase := false + if _, err := os.Stat(gographPath); err != nil { + if os.IsNotExist(err) { + initDatabase = true + } + return nil, err + } + conn, err := sql.Open("sqlite3", gographPath) + if err != nil { + return nil, err + } + graph, err := gograph.NewDatabase(conn, initDatabase) if err != nil { return nil, err } @@ -617,6 +631,11 @@ func NewRuntimeFromDirectory(config *DaemonConfig) (*Runtime, error) { return runtime, nil } +func (runtime *Runtime) Close() error { + runtime.networkManager.Close() + return runtime.containerGraph.Close() +} + // History is a convenience type for storing a list of containers, // ordered by creation date. type History []*Container diff --git a/runtime_test.go b/runtime_test.go index 0aa2d3d543..040d06851e 100644 --- a/runtime_test.go +++ b/runtime_test.go @@ -44,7 +44,9 @@ func nuke(runtime *Runtime) error { }(container) } wg.Wait() - runtime.networkManager.Close() + runtime.Close() + + os.Remove(filepath.Join(runtime.config.GraphPath, "linkgraph.db")) return os.RemoveAll(runtime.config.GraphPath) } diff --git a/server.go b/server.go index 8e7a31ce9b..31b351fcc6 100644 --- a/server.go +++ b/server.go @@ -24,6 +24,10 @@ import ( "time" ) +func (srv *Server) Close() error { + return srv.runtime.Close() +} + func (srv *Server) DockerVersion() APIVersion { return APIVersion{ Version: VERSION,