1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/libnetwork/networkdb/networkdb_test.go
Jana Radhakrishnan 28f4561e3f Add network scoped gossip database
Network DB is a network scoped gossip database built
on top of hashicorp/memberlist providing an eventually
consistent state store.

It limits the scope of the gossip and periodic bulk syncing
for table entries to only the nodes which participate in the
network to which the gossip belongs. This designs make the
gossip layer scale better and only consumes resources for the
network state that the node participates in.

Since the complete state for a network is maintained by all nodes
participating in the network, all nodes will eventually converge
to the same state.

NetworkDB also provides facilities for the users of the package to
watch on any table (or all tables) and get notified if there are
state changes of interest that happened anywhere in the cluster when
that state change eventually finds it's way to the watcher's node.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
2016-04-08 12:58:09 -07:00

431 lines
11 KiB
Go

package networkdb
import (
"flag"
"fmt"
"log"
"os"
"sync/atomic"
"testing"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/go-events"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
var (
dbPort int32 = 10000
runningInContainer = flag.Bool("incontainer", false, "Indicates if the test is running in a container")
)
func TestMain(m *testing.M) {
logrus.SetLevel(logrus.ErrorLevel)
os.Exit(m.Run())
}
func createNetworkDBInstances(t *testing.T, num int, namePrefix string) []*NetworkDB {
var dbs []*NetworkDB
for i := 0; i < num; i++ {
db, err := New(&Config{
NodeName: fmt.Sprintf("%s%d", namePrefix, i+1),
BindPort: int(atomic.AddInt32(&dbPort, 1)),
})
require.NoError(t, err)
if i != 0 {
err = db.Join([]string{fmt.Sprintf("localhost:%d", db.config.BindPort-1)})
assert.NoError(t, err)
}
dbs = append(dbs, db)
}
return dbs
}
func closeNetworkDBInstances(dbs []*NetworkDB) {
for _, db := range dbs {
db.Close()
}
}
func (db *NetworkDB) verifyNodeExistence(t *testing.T, node string, present bool) {
for i := 0; i < 80; i++ {
_, ok := db.nodes[node]
if present && ok {
return
}
if !present && !ok {
return
}
time.Sleep(50 * time.Millisecond)
}
assert.Fail(t, fmt.Sprintf("%s: Node existence verification for node %s failed", db.config.NodeName, node))
}
func (db *NetworkDB) verifyNetworkExistence(t *testing.T, node string, id string, present bool) {
for i := 0; i < 80; i++ {
if nn, nnok := db.networks[node]; nnok {
n, ok := nn[id]
if present && ok {
return
}
if !present &&
((ok && n.leaving) ||
!ok) {
return
}
}
time.Sleep(50 * time.Millisecond)
}
assert.Fail(t, "Network existence verification failed")
}
func (db *NetworkDB) verifyEntryExistence(t *testing.T, tname, nid, key, value string, present bool) {
n := 80
for i := 0; i < n; i++ {
entry, err := db.getEntry(tname, nid, key)
if present && err == nil && string(entry.value) == value {
return
}
if !present &&
((err == nil && entry.deleting) ||
(err != nil)) {
return
}
if i == n-1 && !present && err != nil {
return
}
time.Sleep(50 * time.Millisecond)
}
assert.Fail(t, fmt.Sprintf("Entry existence verification test failed for %s", db.config.NodeName))
}
func testWatch(t *testing.T, ch chan events.Event, ev interface{}, tname, nid, key, value string) {
select {
case rcvdEv := <-ch:
assert.Equal(t, fmt.Sprintf("%T", rcvdEv), fmt.Sprintf("%T", ev))
switch rcvdEv.(type) {
case CreateEvent:
assert.Equal(t, tname, rcvdEv.(CreateEvent).Table)
assert.Equal(t, nid, rcvdEv.(CreateEvent).NetworkID)
assert.Equal(t, key, rcvdEv.(CreateEvent).Key)
assert.Equal(t, value, string(rcvdEv.(CreateEvent).Value))
case UpdateEvent:
assert.Equal(t, tname, rcvdEv.(UpdateEvent).Table)
assert.Equal(t, nid, rcvdEv.(UpdateEvent).NetworkID)
assert.Equal(t, key, rcvdEv.(UpdateEvent).Key)
assert.Equal(t, value, string(rcvdEv.(UpdateEvent).Value))
case DeleteEvent:
assert.Equal(t, tname, rcvdEv.(DeleteEvent).Table)
assert.Equal(t, nid, rcvdEv.(DeleteEvent).NetworkID)
assert.Equal(t, key, rcvdEv.(DeleteEvent).Key)
}
case <-time.After(time.Second):
t.Fail()
return
}
}
func TestNetworkDBSimple(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
closeNetworkDBInstances(dbs)
}
func TestNetworkDBJoinLeaveNetwork(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
err = dbs[0].LeaveNetwork("network1")
assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", false)
closeNetworkDBInstances(dbs)
}
func TestNetworkDBJoinLeaveNetworks(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
n := 10
for i := 1; i <= n; i++ {
err := dbs[0].JoinNetwork(fmt.Sprintf("network0%d", i))
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
err := dbs[1].JoinNetwork(fmt.Sprintf("network1%d", i))
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), true)
}
for i := 1; i <= n; i++ {
dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), true)
}
for i := 1; i <= n; i++ {
err := dbs[0].LeaveNetwork(fmt.Sprintf("network0%d", i))
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
err := dbs[1].LeaveNetwork(fmt.Sprintf("network1%d", i))
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
dbs[1].verifyNetworkExistence(t, "node1", fmt.Sprintf("network0%d", i), false)
}
for i := 1; i <= n; i++ {
dbs[0].verifyNetworkExistence(t, "node2", fmt.Sprintf("network1%d", i), false)
}
closeNetworkDBInstances(dbs)
}
func TestNetworkDBCRUDTableEntry(t *testing.T) {
dbs := createNetworkDBInstances(t, 3, "node")
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err)
err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
assert.NoError(t, err)
dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
dbs[2].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
assert.NoError(t, err)
dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
assert.NoError(t, err)
dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
closeNetworkDBInstances(dbs)
}
func TestNetworkDBCRUDTableEntries(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err)
n := 10
for i := 1; i <= n; i++ {
err = dbs[0].CreateEntry("test_table", "network1",
fmt.Sprintf("test_key0%d", i),
[]byte(fmt.Sprintf("test_value0%d", i)))
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
err = dbs[1].CreateEntry("test_table", "network1",
fmt.Sprintf("test_key1%d", i),
[]byte(fmt.Sprintf("test_value1%d", i)))
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
dbs[0].verifyEntryExistence(t, "test_table", "network1",
fmt.Sprintf("test_key1%d", i),
fmt.Sprintf("test_value1%d", i), true)
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
dbs[1].verifyEntryExistence(t, "test_table", "network1",
fmt.Sprintf("test_key0%d", i),
fmt.Sprintf("test_value0%d", i), true)
assert.NoError(t, err)
}
// Verify deletes
for i := 1; i <= n; i++ {
err = dbs[0].DeleteEntry("test_table", "network1",
fmt.Sprintf("test_key0%d", i))
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
err = dbs[1].DeleteEntry("test_table", "network1",
fmt.Sprintf("test_key1%d", i))
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
dbs[0].verifyEntryExistence(t, "test_table", "network1",
fmt.Sprintf("test_key1%d", i), "", false)
assert.NoError(t, err)
}
for i := 1; i <= n; i++ {
dbs[1].verifyEntryExistence(t, "test_table", "network1",
fmt.Sprintf("test_key0%d", i), "", false)
assert.NoError(t, err)
}
closeNetworkDBInstances(dbs)
}
func TestNetworkDBNodeLeave(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err)
err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
assert.NoError(t, err)
dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
dbs[0].Close()
dbs[1].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", false)
dbs[1].Close()
}
func TestNetworkDBWatch(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err)
ch, cancel := dbs[1].Watch("", "", "")
err = dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
assert.NoError(t, err)
testWatch(t, ch, CreateEvent{}, "test_table", "network1", "test_key", "test_value")
err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
assert.NoError(t, err)
testWatch(t, ch, UpdateEvent{}, "test_table", "network1", "test_key", "test_updated_value")
err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
assert.NoError(t, err)
testWatch(t, ch, DeleteEvent{}, "test_table", "network1", "test_key", "")
cancel()
closeNetworkDBInstances(dbs)
}
func TestNetworkDBBulkSync(t *testing.T) {
dbs := createNetworkDBInstances(t, 2, "node")
err := dbs[0].JoinNetwork("network1")
assert.NoError(t, err)
dbs[1].verifyNetworkExistence(t, "node1", "network1", true)
n := 1000
for i := 1; i <= n; i++ {
err = dbs[0].CreateEntry("test_table", "network1",
fmt.Sprintf("test_key0%d", i),
[]byte(fmt.Sprintf("test_value0%d", i)))
assert.NoError(t, err)
}
err = dbs[1].JoinNetwork("network1")
assert.NoError(t, err)
dbs[0].verifyNetworkExistence(t, "node2", "network1", true)
for i := 1; i <= n; i++ {
dbs[1].verifyEntryExistence(t, "test_table", "network1",
fmt.Sprintf("test_key0%d", i),
fmt.Sprintf("test_value0%d", i), true)
assert.NoError(t, err)
}
closeNetworkDBInstances(dbs)
}
func TestNetworkDBCRUDMediumCluster(t *testing.T) {
n := 5
dbs := createNetworkDBInstances(t, n, "node")
for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
if i == j {
continue
}
dbs[i].verifyNodeExistence(t, fmt.Sprintf("node%d", j+1), true)
}
}
for i := 0; i < n; i++ {
err := dbs[i].JoinNetwork("network1")
assert.NoError(t, err)
}
for i := 0; i < n; i++ {
for j := 0; j < n; j++ {
dbs[i].verifyNetworkExistence(t, fmt.Sprintf("node%d", j+1), "network1", true)
}
}
err := dbs[0].CreateEntry("test_table", "network1", "test_key", []byte("test_value"))
assert.NoError(t, err)
for i := 1; i < n; i++ {
dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_value", true)
}
err = dbs[0].UpdateEntry("test_table", "network1", "test_key", []byte("test_updated_value"))
assert.NoError(t, err)
for i := 1; i < n; i++ {
dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "test_updated_value", true)
}
err = dbs[0].DeleteEntry("test_table", "network1", "test_key")
assert.NoError(t, err)
for i := 1; i < n; i++ {
dbs[i].verifyEntryExistence(t, "test_table", "network1", "test_key", "", false)
}
log.Printf("Closing DB instances...")
closeNetworkDBInstances(dbs)
}