diff --git a/libnetwork/networkdb/cluster.go b/libnetwork/networkdb/cluster.go
index 06a7aff4bd..8ff2129517 100644
--- a/libnetwork/networkdb/cluster.go
+++ b/libnetwork/networkdb/cluster.go
@@ -17,15 +17,10 @@ import (
 )
 
 const (
-	// The garbage collection logic for entries leverage the presence of the network.
-	// For this reason the expiration time of the network is put slightly higher than the entry expiration so that
-	// there is at least 5 extra cycle to make sure that all the entries are properly deleted before deleting the network.
-	reapEntryInterval   = 30 * time.Minute
-	reapNetworkInterval = reapEntryInterval + 5*reapPeriod
-	reapPeriod          = 5 * time.Second
-	retryInterval       = 1 * time.Second
-	nodeReapInterval    = 24 * time.Hour
-	nodeReapPeriod      = 2 * time.Hour
+	reapPeriod       = 5 * time.Second
+	retryInterval    = 1 * time.Second
+	nodeReapInterval = 24 * time.Hour
+	nodeReapPeriod   = 2 * time.Hour
 )
 
 type logWriter struct{}
diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go
index 6553810064..03fcfe1d9d 100644
--- a/libnetwork/networkdb/delegate.go
+++ b/libnetwork/networkdb/delegate.go
@@ -93,14 +93,14 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
 		nDB.nodes[n.Name] = n
 		nDB.Unlock()
 		if !found {
-			logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
+			logrus.Infof("%v(%v): Node join event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		}
 		return true
 	case NodeEventTypeLeave:
 		nDB.Lock()
 		nDB.leftNodes[n.Name] = n
 		nDB.Unlock()
-		logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
+		logrus.Infof("%v(%v): Node leave event for %s/%s", nDB.config.Hostname, nDB.config.NodeID, n.Name, n.Addr)
 		return true
 	}
 
@@ -140,7 +140,7 @@ func (nDB *NetworkDB) handleNetworkEvent(nEvent *NetworkEvent) bool {
 		n.ltime = nEvent.LTime
 		n.leaving = nEvent.Type == NetworkEventTypeLeave
 		if n.leaving {
-			n.reapTime = reapNetworkInterval
+			n.reapTime = nDB.config.reapNetworkInterval
 
 			// The remote node is leaving the network, but not the gossip cluster.
 			// Mark all its entries in deleted state, this will guarantee that
@@ -216,8 +216,9 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 	// This case can happen if the cluster is running different versions of the engine where the old version does not have the
 	// field. If that is not the case, this can be a BUG
 	if e.deleting && e.reapTime == 0 {
-		logrus.Warnf("handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?", tEvent)
-		e.reapTime = reapEntryInterval
+		logrus.Warnf("%v(%v) handleTableEvent object %+v has a 0 reapTime, is the cluster running the same docker engine version?",
+			nDB.config.Hostname, nDB.config.NodeID, tEvent)
+		e.reapTime = nDB.config.reapEntryInterval
 	}
 
 	nDB.Lock()
@@ -229,7 +230,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
 		// If the residual reapTime is lower or equal to 1/6 of the total reapTime don't bother broadcasting it around
 		// most likely the cluster is already aware of it, if not who will sync with this node will catch the state too.
 		// This also avoids that deletion of entries close to their garbage collection ends up circuling around forever
-		return e.reapTime > reapEntryInterval/6
+		return e.reapTime > nDB.config.reapEntryInterval/6
 	}
 
 	var op opType
diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go
index 45bd9cc931..388a82c47b 100644
--- a/libnetwork/networkdb/networkdb.go
+++ b/libnetwork/networkdb/networkdb.go
@@ -181,6 +181,13 @@ type Config struct {
 	// be able to increase this to get more content into each gossip packet.
 	PacketBufferSize int
 
+	// reapEntryInterval is the duration for which a deleted entry is kept before being garbage collected
+	reapEntryInterval time.Duration
+
+	// reapNetworkInterval is the duration for which a deleted network is kept before being garbage collected
+	// NOTE this MUST always be higher than reapEntryInterval
+	reapNetworkInterval time.Duration
+
 	// StatsPrintPeriod the period to use to print queue stats
 	// Default is 5min
 	StatsPrintPeriod time.Duration
@@ -220,12 +227,18 @@ func DefaultConfig() *Config {
 		PacketBufferSize:  1400,
 		StatsPrintPeriod:  5 * time.Minute,
 		HealthPrintPeriod: 1 * time.Minute,
+		reapEntryInterval: 30 * time.Minute,
 	}
 }
 
 // New creates a new instance of NetworkDB using the Config passed by
 // the caller.
 func New(c *Config) (*NetworkDB, error) {
+	// The garbage collection logic for entries leverages the presence of the network.
+	// For this reason the expiration time of the network is set slightly higher than the entry expiration, so that
+	// there are at least 5 extra cycles to make sure that all the entries are properly deleted before the network is deleted.
+	c.reapNetworkInterval = c.reapEntryInterval + 5*reapPeriod
+
 	nDB := &NetworkDB{
 		config:  c,
 		indexes: make(map[int]*radix.Tree),
@@ -241,7 +254,7 @@ func New(c *Config) (*NetworkDB, error) {
 	nDB.indexes[byTable] = radix.New()
 	nDB.indexes[byNetwork] = radix.New()
 
-	logrus.Debugf("New memberlist node - Node:%v will use memberlist nodeID:%v", c.Hostname, c.NodeID)
+	logrus.Infof("New memberlist node - Node:%v will use memberlist nodeID:%v with config:%+v", c.Hostname, c.NodeID, c)
 	if err := nDB.clusterInit(); err != nil {
 		return nil, err
 	}
@@ -414,7 +427,7 @@ func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
 		node:     nDB.config.NodeID,
 		value:    value,
 		deleting: true,
-		reapTime: reapEntryInterval,
+		reapTime: nDB.config.reapEntryInterval,
 	}
 
 	if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil {
@@ -487,7 +500,7 @@ func (nDB *NetworkDB) deleteNodeNetworkEntries(nid, node string) {
 			node:     oldEntry.node,
 			value:    oldEntry.value,
 			deleting: true,
-			reapTime: reapEntryInterval,
+			reapTime: nDB.config.reapEntryInterval,
 		}
 
 		// we arrived at this point in 2 cases:
@@ -635,7 +648,7 @@ func (nDB *NetworkDB) LeaveNetwork(nid string) error {
 
 	logrus.Debugf("%v(%v): leaving network %s", nDB.config.Hostname, nDB.config.NodeID, nid)
 	n.ltime = ltime
-	n.reapTime = reapNetworkInterval
+	n.reapTime = nDB.config.reapNetworkInterval
 	n.leaving = true
 	return nil
 }
diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go
index 0779618ab3..01dee06769 100644
--- a/libnetwork/networkdb/networkdb_test.go
+++ b/libnetwork/networkdb/networkdb_test.go
@@ -10,6 +10,7 @@ import (
 	"testing"
 	"time"
 
+	"github.com/docker/docker/pkg/stringid"
 	"github.com/docker/go-events"
 	"github.com/sirupsen/logrus"
 	"github.com/stretchr/testify/assert"
@@ -27,13 +28,14 @@ func TestMain(m *testing.M) {
 	os.Exit(m.Run())
 }
 
-func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
+func createNetworkDBInstances(t *testing.T, num int, namePrefix string, conf *Config) []*NetworkDB {
 	var dbs []*NetworkDB
 	for i := 0; i < num; i++ {
-		conf := DefaultConfig()
-		conf.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
-		conf.BindPort = int(atomic.AddInt32(&dbPort, 1))
-		db, err := New(conf)
+		localConfig := *conf
+		localConfig.Hostname = fmt.Sprintf("%s%d", namePrefix, i+1)
+		localConfig.NodeID = stringid.TruncateID(stringid.GenerateRandomID())
+		localConfig.BindPort = int(atomic.AddInt32(&dbPort, 1))
+		db, err := New(&localConfig)
 		require.NoError(t, err)
 
 		if i != 0 {
@@ -44,10 +46,19 @@ func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*Netwo
 		dbs = append(dbs, db)
 	}
 
+	// Check that the cluster is properly created
+	for i := 0; i < num; i++ {
+		if num != len(dbs[i].ClusterPeers()) {
+			t.Fatalf("Number of nodes for %s in the cluster does not match %d != %d",
+				dbs[i].config.Hostname, num, len(dbs[i].ClusterPeers()))
+		}
+	}
+
 	return dbs
 }
 
 func closeNetworkDBInstances(dbs []*NetworkDB) {
+	log.Print("Closing DB instances...")
 	for _, db := range dbs {
 		db.Close()
 	}
@@ -147,12 +158,12 @@ func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, k
 }
 
 func TestNetworkDBSimple(t *testing.T) {
-	dbs := createNetworkDBInstances(t, 2, "node")
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 	closeNetworkDBInstances(dbs)
 }
 
 func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
-	dbs := createNetworkDBInstances(t, 2, "node")
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 
 	err := dbs[0].JoinNetwork("network1")
 	assert.NoError(t, err)
@@ -167,7 +178,7 @@ func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
 }
 
 func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
-	dbs := createNetworkDBInstances(t, 2, "node")
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 
 	n := 10
 	for i := 1; i <= n; i++ {
@@ -210,7 +221,7 @@ func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
 }
 
 func TestNetworkDBCRUDTableEntry(t *testing.T) {
-	dbs := createNetworkDBInstances(t, 3, "node")
+	dbs := createNetworkDBInstances(t, 3, "node", DefaultConfig())
 
 	err := dbs[0].JoinNetwork("network1")
 	assert.NoError(t, err)
@@ -240,7 +251,7 @@ func TestNetworkDBCRUDTableEntry(t *testing.T) {
 }
 
 func TestNetworkDBCRUDTableEntries(t *testing.T) {
-	dbs := createNetworkDBInstances(t, 2, "node")
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 
 	err := dbs[0].JoinNetwork("network1")
 	assert.NoError(t, err)
@@ -308,7 +319,7 @@ func TestNetworkDBCRUDTableEntries(t *testing.T) {
 }
 
 func TestNetworkDBNodeLeave(t *testing.T) {
-	dbs := createNetworkDBInstances(t, 2, "node")
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 
 	err := dbs[0].JoinNetwork("network1")
 	assert.NoError(t, err)
@@ -327,7 +338,7 @@ func TestNetworkDBNodeLeave(t *testing.T) {
 }
 
 func TestNetworkDBWatch(t *testing.T) {
-	dbs := createNetworkDBInstances(t, 2, "node")
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 
 	err := dbs[0].JoinNetwork("network1")
 	assert.NoError(t, err)
@@ -356,7 +367,7 @@ func TestNetworkDBWatch(t *testing.T) {
 }
 
 func TestNetworkDBBulkSync(t *testing.T) {
-	dbs := createNetworkDBInstances(t, 2, "node")
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 
 	err := dbs[0].JoinNetwork("network1")
 	assert.NoError(t, err)
@@ -389,7 +400,7 @@ func TestNetworkDBBulkSync(t *testing.T) {
 func TestNetworkDBCRUDMediumCluster(t *testing.T) {
 	n := 5
 
-	dbs := createNetworkDBInstances(t, n, "node")
+	dbs := createNetworkDBInstances(t, n, "node", DefaultConfig())
 
 	for i := 0; i < n; i++ {
 		for j := 0; j < n; j++ {
@@ -433,13 +444,12 @@ func TestNetworkDBCRUDMediumCluster(t *testing.T) {
 		dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
 	}
 
-	log.Print("Closing DB instances...")
 	closeNetworkDBInstances(dbs)
 }
 
 func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
 	maxRetry := 5
-	dbs := createNetworkDBInstances(t, 2, "node")
+	dbs := createNetworkDBInstances(t, 2, "node", DefaultConfig())
 
 	// Single node Join/Leave
 	err := dbs[0].JoinNetwork("network1")
@@ -517,6 +527,56 @@ func TestNetworkDBNodeJoinLeaveIteration(t *testing.T) {
 		t.Fatalf("The networkNodes list has to have be 2 instead of %d - %v",
 			len(dbs[1].networkNodes["network1"]), dbs[1].networkNodes["network1"])
 	}
-	dbs[0].Close()
-	dbs[1].Close()
+	closeNetworkDBInstances(dbs)
+}
+
+func TestNetworkDBGarbageCollection(t *testing.T) {
+	keysWriteDelete := 5
+	config := DefaultConfig()
+	config.reapEntryInterval = 30 * time.Second
+	config.StatsPrintPeriod = 15 * time.Second
+
+	dbs := createNetworkDBInstances(t, 3, "node", config)
+
+	// 2 nodes join the network
+	err := dbs[0].JoinNetwork("network1")
+	assert.NoError(t, err)
+
+	err = dbs[1].JoinNetwork("network1")
+	assert.NoError(t, err)
+
+	for i := 0; i < keysWriteDelete; i++ {
+		err = dbs[i%2].CreateEntry("testTable", "network1", fmt.Sprintf("key-%d", i), []byte("value"))
+		assert.NoError(t, err)
+	}
+	time.Sleep(time.Second)
+	for i := 0; i < keysWriteDelete; i++ {
+		err = dbs[i%2].DeleteEntry("testTable", "network1", fmt.Sprintf("key-%d", i))
+		assert.NoError(t, err)
+	}
+	for i := 0; i < 2; i++ {
+		assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
+	}
+
+	// from this point the garbage collection timer has started; wait 5 seconds and then join a new node
+	time.Sleep(5 * time.Second)
+
+	err = dbs[2].JoinNetwork("network1")
+	assert.NoError(t, err)
+	for i := 0; i < 3; i++ {
+		assert.Equal(t, keysWriteDelete, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries number should match")
+	}
+	// at this point the entries should all have been deleted
+	time.Sleep(30 * time.Second)
+	for i := 0; i < 3; i++ {
+		assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should have been garbage collected")
+	}
+
+	// make sure that entries are not coming back
+	time.Sleep(15 * time.Second)
+	for i := 0; i < 3; i++ {
+		assert.Equal(t, 0, dbs[i].networks[dbs[i].config.NodeID]["network1"].entriesNumber, "entries should have been garbage collected")
+	}
+
+	closeNetworkDBInstances(dbs)
 }
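
For reference, the interval relationship that this patch moves from package-level constants into New() can be summarized in a small standalone sketch. The names and default values mirror the diff above (reapPeriod, reapEntryInterval, and the derived reapNetworkInterval); the program itself is illustrative only and is not part of the patch.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Values as they appear in the patch: reapPeriod stays a package
	// constant, reapEntryInterval becomes an unexported Config field
	// that DefaultConfig() sets to 30 minutes.
	reapPeriod := 5 * time.Second
	reapEntryInterval := 30 * time.Minute

	// New() derives the network expiration from the entry expiration,
	// keeping the network record around for at least 5 extra reaper
	// cycles so every deleted entry is collected before the network is.
	reapNetworkInterval := reapEntryInterval + 5*reapPeriod

	fmt.Println(reapNetworkInterval) // 30m25s
}

This also illustrates why the NOTE on the new Config field insists that reapNetworkInterval always stay higher than reapEntryInterval: the entry garbage collection relies on the network still being present, so the network's expiration must outlast its entries'.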