1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/libnetwork/networkdb/networkdb.go
Jana Radhakrishnan 28f4561e3f Add network scoped gossip database
Network DB is a network scoped gossip database built
on top of hashicorp/memberlist providing an eventually
consistent state store.

It limits the scope of the gossip and periodic bulk syncing
for table entries to only the nodes which participate in the
network to which the gossip belongs. This designs make the
gossip layer scale better and only consumes resources for the
network state that the node participates in.

Since the complete state for a network is maintained by all nodes
participating in the network, all nodes will eventually converge
to the same state.

NetworkDB also provides facilities for the users of the package to
watch on any table (or all tables) and get notified if there are
state changes of interest that happened anywhere in the cluster when
that state change eventually finds it's way to the watcher's node.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
2016-04-08 12:58:09 -07:00

424 lines
12 KiB
Go

package networkdb
import (
"fmt"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/armon/go-radix"
"github.com/docker/go-events"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)
const (
byTable int = 1 + iota
byNetwork
)
// NetworkDB instance drives the networkdb cluster and acts the broker
// for cluster-scoped and network-scoped gossip and watches.
type NetworkDB struct {
sync.RWMutex
// NetworkDB configuration.
config *Config
// local copy of memberlist config that we use to driver
// network scoped gossip and bulk sync.
mConfig *memberlist.Config
// All the tree index (byTable, byNetwork) that we maintain
// the db.
indexes map[int]*radix.Tree
// Memberlist we use to drive the cluster.
memberlist *memberlist.Memberlist
// List of all peer nodes in the cluster not-limited to any
// network.
nodes map[string]*memberlist.Node
// A multi-dimensional map of network/node attachmemts. The
// first key is a node name and the second key is a network ID
// for the network that node is participating in.
networks map[string]map[string]*network
// A map of nodes which are participating in a given
// network. The key is a network ID.
networkNodes map[string][]string
// A table of ack channels for every node from which we are
// waiting for an ack.
bulkSyncAckTbl map[string]chan struct{}
// Global lamport clock for node network attach events.
networkClock serf.LamportClock
// Global lamport clock for table events.
tableClock serf.LamportClock
// Broadcast queue for network event gossip.
networkBroadcasts *memberlist.TransmitLimitedQueue
// A central stop channel to stop all go routines running on
// behalf of the NetworkDB instance.
stopCh chan struct{}
// A central broadcaster for all local watchers watching table
// events.
broadcaster *events.Broadcaster
// List of all tickers which needed to be stopped when
// cleaning up.
tickers []*time.Ticker
}
// network describes the node/network attachment.
type network struct {
// Network ID
id string
// Lamport time for the latest state of the entry.
ltime serf.LamportTime
// Node leave is in progress.
leaving bool
// The time this node knew about the node's network leave.
leaveTime time.Time
// The broadcast queue for table event gossip. This is only
// initialized for this node's network attachment entries.
tableBroadcasts *memberlist.TransmitLimitedQueue
}
// Config represents the configuration of the networdb instance and
// can be passed by the caller.
type Config struct {
// NodeName is the cluster wide unique name for this node.
NodeName string
// BindAddr is the local node's IP address that we bind to for
// cluster communication.
BindAddr string
// BindPort is the local node's port to which we bind to for
// cluster communication.
BindPort int
}
// entry defines a table entry
type entry struct {
// node from which this entry was learned.
node string
// Lamport time for the most recent update to the entry
ltime serf.LamportTime
// Opaque value store in the entry
value []byte
// Deleting the entry is in progress. All entries linger in
// the cluster for certain amount of time after deletion.
deleting bool
// The wall clock time when this node learned about this deletion.
deleteTime time.Time
}
// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
nDB := &NetworkDB{
config: c,
indexes: make(map[int]*radix.Tree),
networks: make(map[string]map[string]*network),
nodes: make(map[string]*memberlist.Node),
networkNodes: make(map[string][]string),
bulkSyncAckTbl: make(map[string]chan struct{}),
broadcaster: events.NewBroadcaster(),
}
nDB.indexes[byTable] = radix.New()
nDB.indexes[byNetwork] = radix.New()
if err := nDB.clusterInit(); err != nil {
return nil, err
}
return nDB, nil
}
// Join joins this NetworkDB instance with a list of peer NetworkDB
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
return nDB.clusterJoin(members)
}
// Close destroys this NetworkDB instance by leave the cluster,
// stopping timers, canceling goroutines etc.
func (nDB *NetworkDB) Close() {
if err := nDB.clusterLeave(); err != nil {
logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
}
}
// GetEntry retrieves the value of a table entry in a given (network,
// table, key) tuple
func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
entry, err := nDB.getEntry(tname, nid, key)
if err != nil {
return nil, err
}
return entry.value, nil
}
func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
nDB.RLock()
defer nDB.RUnlock()
e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
if !ok {
return nil, fmt.Errorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
}
return e.(*entry), nil
}
// CreateEntry creates a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propogates this event to the cluster. It is an error to create an
// entry for the same tuple for which there is already an existing
// entry.
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
if _, err := nDB.GetEntry(tname, nid, key); err == nil {
return fmt.Errorf("cannot create entry as the entry in table %s with network id %s and key %s already exists", tname, nid, key)
}
entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
value: value,
}
if err := nDB.sendTableEvent(tableEntryCreate, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table create event: %v", err)
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value))
return nil
}
// UpdateEntry updates a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propogates this event to the cluster. It is an error to update a
// non-existent entry.
func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
if _, err := nDB.GetEntry(tname, nid, key); err != nil {
return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
}
entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
value: value,
}
if err := nDB.sendTableEvent(tableEntryUpdate, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table update event: %v", err)
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value))
return nil
}
// DeleteEntry deletes a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propogates this event to the cluster.
func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
value, err := nDB.GetEntry(tname, nid, key)
if err != nil {
return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
}
entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
value: value,
deleting: true,
deleteTime: time.Now(),
}
if err := nDB.sendTableEvent(tableEntryDelete, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table delete event: %v", err)
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value))
return nil
}
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nDB.Lock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
oldEntry := v.(*entry)
if oldEntry.node != node {
return false
}
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]
entry := &entry{
ltime: oldEntry.ltime,
node: node,
value: oldEntry.value,
deleting: true,
deleteTime: time.Now(),
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
return false
})
nDB.Unlock()
}
// WalkTable walks a single table in NetworkDB and invokes the passed
// function for each entry in the table passing the network, key,
// value. The walk stops if the passed function returns a true.
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
nDB.RLock()
values := make(map[string]interface{})
nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
values[path] = v
return false
})
nDB.RUnlock()
for k, v := range values {
params := strings.Split(k[1:], "/")
nid := params[1]
key := params[2]
if fn(nid, key, v.(*entry).value) {
return nil
}
}
return nil
}
// JoinNetwork joins this node to a given network and propogates this
// event across the cluster. This triggers this node joining the
// sub-cluster of this network and participates in the network-scoped
// gossip and bulk sync for this network.
func (nDB *NetworkDB) JoinNetwork(nid string) error {
ltime := nDB.networkClock.Increment()
if err := nDB.sendNetworkEvent(nid, networkJoin, ltime); err != nil {
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
}
nDB.Lock()
nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
if !ok {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(nDB.networkNodes[nid])
},
RetransmitMult: 4,
}
nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
nDB.Unlock()
logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
if _, err := nDB.bulkSync(nid, true); err != nil {
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
}
return nil
}
// LeaveNetwork leaves this node from a given network and propogates
// this event across the cluster. This triggers this node leaving the
// sub-cluster of this network and as a result will no longer
// participate in the network-scoped gossip and bulk sync for this
// network.
func (nDB *NetworkDB) LeaveNetwork(nid string) error {
ltime := nDB.networkClock.Increment()
if err := nDB.sendNetworkEvent(nid, networkLeave, ltime); err != nil {
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
}
nDB.Lock()
defer nDB.Unlock()
nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
if !ok {
return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
}
n, ok := nodeNetworks[nid]
if !ok {
return fmt.Errorf("could not find network %s while trying to leave", nid)
}
n.ltime = ltime
n.leaving = true
return nil
}
// Deletes the node from the list of nodes which participate in the
// passed network. Caller should hold the NetworkDB lock while calling
// this
func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
nodes := nDB.networkNodes[nid]
for i, name := range nodes {
if name == nodeName {
nodes[i] = nodes[len(nodes)-1]
nodes = nodes[:len(nodes)-1]
break
}
}
nDB.networkNodes[nid] = nodes
}
// findCommonnetworks find the networks that both this node and the
// passed node have joined.
func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
nDB.RLock()
defer nDB.RUnlock()
var networks []string
for nid := range nDB.networks[nDB.config.NodeName] {
if _, ok := nDB.networks[nodeName][nid]; ok {
networks = append(networks, nid)
}
}
return networks
}