mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
PeerInit for the sandbox init
Move the sandbox init logic into the go routine that handles peer operations. This is to avoid deadlocks in the use of the pMap.Lock for the network Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
This commit is contained in:
parent
5c52ff49e0
commit
2e38c53def
4 changed files with 63 additions and 39 deletions
|
@ -741,11 +741,12 @@ func (n *network) addDriverWatches() {
|
|||
return
|
||||
}
|
||||
|
||||
agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte) bool {
|
||||
if nid == n.ID() {
|
||||
agent.networkDB.WalkTable(table.name, func(nid, key string, value []byte, deleted bool) bool {
|
||||
// skip the entries that are mark for deletion, this is safe because this function is
|
||||
// called at initialization time so there is no state to delete
|
||||
if nid == n.ID() && !deleted {
|
||||
d.EventNotify(driverapi.Create, nid, table.name, key, value)
|
||||
}
|
||||
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
|
|
@ -683,10 +683,12 @@ func (n *network) initSandbox(restore bool) error {
|
|||
return fmt.Errorf("could not get network sandbox (oper %t): %v", restore, err)
|
||||
}
|
||||
|
||||
// this is needed to let the peerAdd configure the sandbox
|
||||
n.setSandbox(sbox)
|
||||
|
||||
if !restore {
|
||||
n.driver.peerDbUpdateSandbox(n.id)
|
||||
// Initialize the sandbox with all the peers previously received from networkdb
|
||||
n.driver.initSandboxPeerDB(n.id)
|
||||
}
|
||||
|
||||
var nlSock *nl.NetlinkSocket
|
||||
|
|
|
@ -203,38 +203,31 @@ func (d *driver) peerDbDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPM
|
|||
return pEntry
|
||||
}
|
||||
|
||||
func (d *driver) peerDbUpdateSandbox(nid string) {
|
||||
d.peerDb.Lock()
|
||||
pMap, ok := d.peerDb.mp[nid]
|
||||
if !ok {
|
||||
d.peerDb.Unlock()
|
||||
return
|
||||
}
|
||||
d.peerDb.Unlock()
|
||||
|
||||
pMap.Lock()
|
||||
for pKeyStr, pEntry := range pMap.mp {
|
||||
var pKey peerKey
|
||||
if _, err := fmt.Sscan(pKeyStr, &pKey); err != nil {
|
||||
logrus.Errorf("peer key scan failed: %v", err)
|
||||
}
|
||||
|
||||
if pEntry.isLocal {
|
||||
continue
|
||||
}
|
||||
|
||||
// Go captures variables by reference. The pEntry could be
|
||||
// pointing to the same memory location for every iteration. Make
|
||||
// a copy of pEntry before capturing it in the following closure.
|
||||
entry := pEntry
|
||||
|
||||
d.peerAdd(nid, entry.eid, pKey.peerIP, entry.peerIPMask, pKey.peerMac, entry.vtep, false, false, false, false)
|
||||
}
|
||||
pMap.Unlock()
|
||||
// The overlay uses a lazy initialization approach, this means that when a network is created
|
||||
// and the driver registered the overlay does not allocate resources till the moment that a
|
||||
// sandbox is actually created.
|
||||
// At the moment of this call, that happens when a sandbox is initialized, is possible that
|
||||
// networkDB has already delivered some events of peers already available on remote nodes,
|
||||
// these peers are saved into the peerDB and this function is used to properly configure
|
||||
// the network sandbox with all those peers that got previously notified.
|
||||
// Note also that this method sends a single message on the channel and the go routine on the
|
||||
// other side, will atomically loop on the whole table of peers and will program their state
|
||||
// in one single atomic operation. This is fundamental to guarantee consistency, and avoid that
|
||||
// new peerAdd or peerDelete gets reordered during the sandbox init.
|
||||
func (d *driver) initSandboxPeerDB(nid string) {
|
||||
d.peerInit(nid)
|
||||
}
|
||||
|
||||
type peerOperationType int32
|
||||
|
||||
const (
|
||||
peerOperationINIT peerOperationType = iota
|
||||
peerOperationADD
|
||||
peerOperationDELETE
|
||||
)
|
||||
|
||||
type peerOperation struct {
|
||||
isAdd bool
|
||||
opType peerOperationType
|
||||
networkID string
|
||||
endpointID string
|
||||
peerIP net.IP
|
||||
|
@ -255,9 +248,12 @@ func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) {
|
|||
case <-ctx.Done():
|
||||
return
|
||||
case op := <-ch:
|
||||
if op.isAdd {
|
||||
switch op.opType {
|
||||
case peerOperationINIT:
|
||||
err = d.peerInitOp(op.networkID)
|
||||
case peerOperationADD:
|
||||
err = d.peerAddOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.updateDB, op.l2Miss, op.l3Miss, op.localPeer)
|
||||
} else {
|
||||
case peerOperationDELETE:
|
||||
err = d.peerDeleteOp(op.networkID, op.endpointID, op.peerIP, op.peerIPMask, op.peerMac, op.vtepIP, op.localPeer)
|
||||
}
|
||||
if err != nil {
|
||||
|
@ -267,11 +263,33 @@ func (d *driver) peerOpRoutine(ctx context.Context, ch chan *peerOperation) {
|
|||
}
|
||||
}
|
||||
|
||||
func (d *driver) peerInit(nid string) {
|
||||
callerName := common.CallerName(1)
|
||||
d.peerOpCh <- &peerOperation{
|
||||
opType: peerOperationINIT,
|
||||
networkID: nid,
|
||||
callerName: callerName,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *driver) peerInitOp(nid string) error {
|
||||
return d.peerDbNetworkWalk(nid, func(pKey *peerKey, pEntry *peerEntry) bool {
|
||||
// Local entries do not need to be added
|
||||
if pEntry.isLocal {
|
||||
return false
|
||||
}
|
||||
|
||||
d.peerAddOp(nid, pEntry.eid, pKey.peerIP, pEntry.peerIPMask, pKey.peerMac, pEntry.vtep, false, false, false, false)
|
||||
// return false to loop on all entries
|
||||
return false
|
||||
})
|
||||
}
|
||||
|
||||
func (d *driver) peerAdd(nid, eid string, peerIP net.IP, peerIPMask net.IPMask,
|
||||
peerMac net.HardwareAddr, vtep net.IP, updateDb, l2Miss, l3Miss, localPeer bool) {
|
||||
callerName := common.CallerName(1)
|
||||
d.peerOpCh <- &peerOperation{
|
||||
isAdd: true,
|
||||
opType: peerOperationADD,
|
||||
networkID: nid,
|
||||
endpointID: eid,
|
||||
peerIP: peerIP,
|
||||
|
@ -307,6 +325,9 @@ func (d *driver) peerAddOp(nid, eid string, peerIP net.IP, peerIPMask net.IPMask
|
|||
|
||||
sbox := n.sandbox()
|
||||
if sbox == nil {
|
||||
// We are hitting this case for all the events that are arriving before that the sandbox
|
||||
// is being created. The peer got already added into the database and the sanbox init will
|
||||
// call the peerDbUpdateSandbox that will configure all these peers from the database
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -350,7 +371,7 @@ func (d *driver) peerDelete(nid, eid string, peerIP net.IP, peerIPMask net.IPMas
|
|||
peerMac net.HardwareAddr, vtep net.IP, updateDb bool) {
|
||||
callerName := common.CallerName(1)
|
||||
d.peerOpCh <- &peerOperation{
|
||||
isAdd: false,
|
||||
opType: peerOperationDELETE,
|
||||
networkID: nid,
|
||||
endpointID: eid,
|
||||
peerIP: peerIP,
|
||||
|
|
|
@ -524,7 +524,7 @@ func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
|
|||
// WalkTable walks a single table in NetworkDB and invokes the passed
|
||||
// function for each entry in the table passing the network, key,
|
||||
// value. The walk stops if the passed function returns a true.
|
||||
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
|
||||
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte, bool) bool) error {
|
||||
nDB.RLock()
|
||||
values := make(map[string]interface{})
|
||||
nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
|
||||
|
@ -537,7 +537,7 @@ func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bo
|
|||
params := strings.Split(k[1:], "/")
|
||||
nid := params[1]
|
||||
key := params[2]
|
||||
if fn(nid, key, v.(*entry).value) {
|
||||
if fn(nid, key, v.(*entry).value, v.(*entry).deleting) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue