diff --git a/libnetwork/networkdb/delegate.go b/libnetwork/networkdb/delegate.go index 072c6221e5..c308fde795 100644 --- a/libnetwork/networkdb/delegate.go +++ b/libnetwork/networkdb/delegate.go @@ -165,16 +165,19 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { } } nDB.RUnlock() + if !ok || network.leaving || !nodePresent { // I'm out of the network OR the event owner is not anymore part of the network so do not propagate return false } + nDB.Lock() e, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key) if err == nil { // We have the latest state. Ignore the event // since it is stale. if e.ltime >= tEvent.LTime { + nDB.Unlock() return false } } @@ -195,8 +198,6 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool { nDB.config.Hostname, nDB.config.NodeID, tEvent) e.reapTime = nDB.config.reapEntryInterval } - - nDB.Lock() nDB.createOrUpdateEntry(tEvent.NetworkID, tEvent.TableName, tEvent.Key, e) nDB.Unlock() diff --git a/libnetwork/networkdb/event_delegate.go b/libnetwork/networkdb/event_delegate.go index 89aa7c470c..78ebe0fd9e 100644 --- a/libnetwork/networkdb/event_delegate.go +++ b/libnetwork/networkdb/event_delegate.go @@ -26,13 +26,10 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) { e.broadcastNodeEvent(mn.Addr, opCreate) e.nDB.Lock() defer e.nDB.Unlock() + // In case the node is rejoining after a failure or leave, - // wait until an explicit join message arrives before adding - // it to the nodes just to make sure this is not a stale - // join. If you don't know about this node add it immediately. - _, fOk := e.nDB.failedNodes[mn.Name] - _, lOk := e.nDB.leftNodes[mn.Name] - if fOk || lOk { + // just add the node back to active + if moved, _ := e.nDB.changeNodeState(mn.Name, nodeActiveState); moved { return } diff --git a/libnetwork/networkdb/networkdb.go b/libnetwork/networkdb/networkdb.go index 9ec6beca35..77cb386540 100644 --- a/libnetwork/networkdb/networkdb.go +++ b/libnetwork/networkdb/networkdb.go @@ -322,6 +322,8 @@ func (nDB *NetworkDB) Peers(nid string) []PeerInfo { // GetEntry retrieves the value of a table entry in a given (network, // table, key) tuple func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) { + nDB.RLock() + defer nDB.RUnlock() entry, err := nDB.getEntry(tname, nid, key) if err != nil { return nil, err @@ -331,9 +333,6 @@ func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) { } func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) { - nDB.RLock() - defer nDB.RUnlock() - e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key)) if !ok { return nil, types.NotFoundErrorf("could not get entry in table %s with network id %s and key %s", tname, nid, key) @@ -348,13 +347,10 @@ func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) { // entry for the same tuple for which there is already an existing // entry unless the current entry is deleting state. func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { + nDB.Lock() oldEntry, err := nDB.getEntry(tname, nid, key) - if err != nil { - if _, ok := err.(types.NotFoundError); !ok { - return fmt.Errorf("cannot create entry in table %s with network id %s and key %s: %v", tname, nid, key, err) - } - } - if oldEntry != nil && !oldEntry.deleting { + if err == nil || (oldEntry != nil && !oldEntry.deleting) { + nDB.Unlock() return fmt.Errorf("cannot create entry in table %s with network id %s and key %s, already exists", tname, nid, key) } @@ -364,14 +360,13 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { value: value, } + nDB.createOrUpdateEntry(nid, tname, key, entry) + nDB.Unlock() + if err := nDB.sendTableEvent(TableEventTypeCreate, nid, tname, key, entry); err != nil { return fmt.Errorf("cannot send create event for table %s, %v", tname, err) } - nDB.Lock() - nDB.createOrUpdateEntry(nid, tname, key, entry) - nDB.Unlock() - return nil } @@ -380,7 +375,9 @@ func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error { // propagates this event to the cluster. It is an error to update a // non-existent entry. func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error { - if _, err := nDB.GetEntry(tname, nid, key); err != nil { + nDB.Lock() + if _, err := nDB.getEntry(tname, nid, key); err != nil { + nDB.Unlock() return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key) } @@ -390,14 +387,13 @@ func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error { value: value, } + nDB.createOrUpdateEntry(nid, tname, key, entry) + nDB.Unlock() + if err := nDB.sendTableEvent(TableEventTypeUpdate, nid, tname, key, entry); err != nil { return fmt.Errorf("cannot send table update event: %v", err) } - nDB.Lock() - nDB.createOrUpdateEntry(nid, tname, key, entry) - nDB.Unlock() - return nil } @@ -427,27 +423,29 @@ func (nDB *NetworkDB) GetTableByNetwork(tname, nid string) map[string]*TableElem // table, key) tuple and if the NetworkDB is part of the cluster // propagates this event to the cluster. func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error { - value, err := nDB.GetEntry(tname, nid, key) - if err != nil { - return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key) + nDB.Lock() + oldEntry, err := nDB.getEntry(tname, nid, key) + if err != nil || oldEntry == nil || oldEntry.deleting { + nDB.Unlock() + return fmt.Errorf("cannot delete entry %s with network id %s and key %s "+ + "does not exist or is already being deleted", tname, nid, key) } entry := &entry{ ltime: nDB.tableClock.Increment(), node: nDB.config.NodeID, - value: value, + value: oldEntry.value, deleting: true, reapTime: nDB.config.reapEntryInterval, } + nDB.createOrUpdateEntry(nid, tname, key, entry) + nDB.Unlock() + if err := nDB.sendTableEvent(TableEventTypeDelete, nid, tname, key, entry); err != nil { return fmt.Errorf("cannot send table delete event: %v", err) } - nDB.Lock() - nDB.createOrUpdateEntry(nid, tname, key, entry) - nDB.Unlock() - return nil } diff --git a/libnetwork/networkdb/networkdb_test.go b/libnetwork/networkdb/networkdb_test.go index 6fea56193e..3dab060952 100644 --- a/libnetwork/networkdb/networkdb_test.go +++ b/libnetwork/networkdb/networkdb_test.go @@ -735,3 +735,64 @@ func TestNodeReincarnation(t *testing.T) { closeNetworkDBInstances(dbs) } + +func TestParallelCreate(t *testing.T) { + dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig()) + + startCh := make(chan int) + doneCh := make(chan error) + var success int32 + for i := 0; i < 20; i++ { + go func() { + <-startCh + err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value")) + if err == nil { + atomic.AddInt32(&success, 1) + } + doneCh <- err + }() + } + + close(startCh) + + for i := 0; i < 20; i++ { + <-doneCh + } + close(doneCh) + // Only 1 write should have succeeded + assert.Equal(t, int32(1), success) + + closeNetworkDBInstances(dbs) +} + +func TestParallelDelete(t *testing.T) { + dbs := createNetworkDBInstances(t, 1, "node", DefaultConfig()) + + err := dbs[0].CreateEntry("testTable", "testNetwork", "key", []byte("value")) + assert.NoError(t, err) + + startCh := make(chan int) + doneCh := make(chan error) + var success int32 + for i := 0; i < 20; i++ { + go func() { + <-startCh + err := dbs[0].DeleteEntry("testTable", "testNetwork", "key") + if err == nil { + atomic.AddInt32(&success, 1) + } + doneCh <- err + }() + } + + close(startCh) + + for i := 0; i < 20; i++ { + <-doneCh + } + close(doneCh) + // Only 1 write should have succeeded + assert.Equal(t, int32(1), success) + + closeNetworkDBInstances(dbs) +}