1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/vendor/github.com/docker/libnetwork/networkdb/broadcast.go
Chris Telfer 0e162d9923 Bump libnetwork to 3ac297bc
Bump libnetwork to 3ac297bc7fd0afec9051bbb47024c9bc1d75bf5b in order to
get fix 0c3d9f00 which addresses a flaw that the scalable load balancing
code revealed.  Attempting to print sandbox IDs where the sandbox name
was too short results in a goroutine panic.  This can occur with
sandboxes with names of 1 or 2 characters in the previous code. But due
to naming updates in the scalable load balancing code, it could now
occur for networks whose name was 3 characters and at least one of the
integration tests employed such networks (named 'foo', 'bar' and 'baz').

This update also brings in several other changes:
 * 6c7c6017 - Fix error handling about bridgeSetup
 * 5ed38221 - Optimize networkDB queue
 * cfa9afdb - ndots: produce error on negative numbers
 * 5586e226 - improve error message for invalid ndots number
 * 449672e5 - Allows to set generic knobs on the Sandbox
 * 6b4c4af7 - do not ignore user-provided "ndots:0" option
 * 843a0e42 - Adjust corner case for reconnect logic

Signed-off-by: Chris Telfer <ctelfer@docker.com>
2018-07-06 13:58:09 -04:00

172 lines
3.4 KiB
Go

package networkdb
import (
"errors"
"time"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)
// broadcastTimeout bounds how long sendNodeEvent waits for a queued node
// event to be reported as finished before it gives up with an error.
const broadcastTimeout = 5 * time.Second
// networkEventMessage is the broadcast item that sendNetworkEvent queues on
// nDB.networkBroadcasts.
type networkEventMessage struct {
// id is the ID of the network the event refers to.
id string
// node is the ID of the node that produced the event.
node string
// msg is the encoded NetworkEvent handed back by Message.
msg []byte
}
// Invalidates reports whether other is a network event for the same network
// ID and the same originating node as m, i.e. whether m replaces it.
//
// A broadcast of any other concrete type (or a nil *networkEventMessage) is
// never considered invalidated; the checked type assertion avoids a panic
// should such a value ever be compared against m.
func (m *networkEventMessage) Invalidates(other memberlist.Broadcast) bool {
	otherm, ok := other.(*networkEventMessage)
	if !ok || otherm == nil {
		return false
	}
	return m.id == otherm.id && m.node == otherm.node
}
// Message returns the encoded network event carried by this broadcast.
func (m *networkEventMessage) Message() []byte {
return m.msg
}
// Finished is intentionally empty: this message type carries no completion
// notification (compare nodeEventMessage, which closes a notify channel).
func (m *networkEventMessage) Finished() {
}
// sendNetworkEvent encodes a NetworkEvent of the given type for network nid,
// stamped with ltime and this node's ID, and queues the result on the
// network broadcast queue.
//
// The only error it can return is the one produced while encoding the event;
// queuing itself reports nothing back.
func (nDB *NetworkDB) sendNetworkEvent(nid string, event NetworkEvent_Type, ltime serf.LamportTime) error {
	self := nDB.config.NodeID

	payload, err := encodeMessage(MessageTypeNetworkEvent, &NetworkEvent{
		Type:      event,
		LTime:     ltime,
		NodeName:  self,
		NetworkID: nid,
	})
	if err != nil {
		return err
	}

	bcast := &networkEventMessage{
		id:   nid,
		node: self,
		msg:  payload,
	}
	nDB.networkBroadcasts.QueueBroadcast(bcast)

	return nil
}
// nodeEventMessage is the broadcast item that sendNodeEvent queues on
// nDB.nodeBroadcasts.
type nodeEventMessage struct {
// msg is the encoded NodeEvent handed back by Message.
msg []byte
// notify, when non-nil, is closed by Finished so the sender can learn that
// the broadcast is done.
notify chan<- struct{}
}
// Invalidates always returns false: a node event is never treated as
// replacing another broadcast.
func (m *nodeEventMessage) Invalidates(other memberlist.Broadcast) bool {
return false
}
// Message returns the encoded node event carried by this broadcast.
func (m *nodeEventMessage) Message() []byte {
return m.msg
}
// Finished signals completion by closing the notify channel. Messages that
// were created without a notify channel are left untouched.
func (m *nodeEventMessage) Finished() {
	if m.notify == nil {
		return
	}
	close(m.notify)
}
// sendNodeEvent encodes a NodeEvent of the given type, stamped with the next
// value of the network clock and this node's ID, and queues it on the node
// broadcast queue.
//
// When more than one node is known, it then blocks until the queued message
// reports completion (its notify channel is closed by Finished) or until
// broadcastTimeout elapses, in which case a timeout error is returned. With
// at most one known node (presumably just this node itself) it returns right
// after queuing, since there is nobody to send to.
//
// It returns the encoding error, the timeout error, or nil.
func (nDB *NetworkDB) sendNodeEvent(event NodeEvent_Type) error {
	nEvent := NodeEvent{
		Type:     event,
		LTime:    nDB.networkClock.Increment(),
		NodeName: nDB.config.NodeID,
	}

	raw, err := encodeMessage(MessageTypeNodeEvent, &nEvent)
	if err != nil {
		return err
	}

	notifyCh := make(chan struct{})
	nDB.nodeBroadcasts.QueueBroadcast(&nodeEventMessage{
		msg:    raw,
		notify: notifyCh,
	})

	nDB.RLock()
	noPeers := len(nDB.nodes) <= 1
	nDB.RUnlock()

	// Message enqueued, do not wait for a send if no peer is present
	if noPeers {
		return nil
	}

	// Use an explicit timer so it can be released as soon as the broadcast
	// completes, rather than staying pending for the whole timeout as a
	// time.After timer would.
	timeout := time.NewTimer(broadcastTimeout)
	defer timeout.Stop()

	// Wait for the broadcast
	select {
	case <-notifyCh:
		return nil
	case <-timeout.C:
		return errors.New("timed out broadcasting node event")
	}
}
// tableEventMessage is the broadcast item that sendTableEvent queues on a
// network's tableBroadcasts queue.
type tableEventMessage struct {
// id is the ID of the network the table entry belongs to.
id string
// tname is the name of the table holding the entry.
tname string
// key is the key of the entry within the table.
key string
// msg is the encoded TableEvent handed back by Message.
msg []byte
}
// Invalidates reports whether other is a table event for the same table
// name, network ID and key as m, i.e. whether m replaces it.
//
// A broadcast of any other concrete type (or a nil *tableEventMessage) is
// never considered invalidated; the checked type assertion avoids a panic
// should such a value ever be compared against m.
func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
	otherm, ok := other.(*tableEventMessage)
	if !ok || otherm == nil {
		return false
	}
	return m.tname == otherm.tname && m.id == otherm.id && m.key == otherm.key
}
// Message returns the encoded table event carried by this broadcast.
func (m *tableEventMessage) Message() []byte {
return m.msg
}
// Finished is intentionally empty: this message type carries no completion
// notification.
func (m *tableEventMessage) Finished() {
}
// sendTableEvent encodes a TableEvent of the given type describing entry
// (stored under key in table tname of network nid) and queues it on that
// network's table broadcast queue.
//
// If this node has no record of the network, or the network has no broadcast
// queue, the event is silently dropped and nil is returned: the network may
// have been removed in the meantime. The only error returned is the one
// produced while encoding the event.
func (nDB *NetworkDB) sendTableEvent(event TableEvent_Type, nid string, tname string, key string, entry *entry) error {
	payload, err := encodeMessage(MessageTypeTableEvent, &TableEvent{
		Type:      event,
		LTime:     entry.ltime,
		NodeName:  nDB.config.NodeID,
		NetworkID: nid,
		TableName: tname,
		Key:       key,
		Value:     entry.value,
		// Seconds() is a float; the integer conversion drops the fraction.
		ResidualReapTime: int32(entry.reapTime.Seconds()),
	})
	if err != nil {
		return err
	}

	// Resolve the per-network queue under the read lock. It stays nil when
	// this node or the network is not (or no longer) registered.
	var queue *memberlist.TransmitLimitedQueue
	nDB.RLock()
	if joined, found := nDB.networks[nDB.config.NodeID]; found {
		if n, present := joined[nid]; present {
			queue = n.tableBroadcasts
		}
	}
	nDB.RUnlock()

	// The network may have been removed
	if queue == nil {
		return nil
	}

	queue.QueueBroadcast(&tableEventMessage{
		id:    nid,
		tname: tname,
		key:   key,
		msg:   payload,
	})

	return nil
}