1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Do not rebroacast bulk sync updates

Bulksync is not meant to be rebroadcast in gossip. Stopped
rebroadcasting bulksync updates.

Signed-off-by: Jana Radhakrishnan <mrjana@docker.com>
This commit is contained in:
Jana Radhakrishnan 2016-06-12 20:19:40 -07:00
parent 888baafd60
commit 78a3cf5f6c

View file

@ -108,7 +108,7 @@ func (nDB *NetworkDB) handleTableEvent(tEvent *TableEvent) bool {
return true return true
} }
func (nDB *NetworkDB) handleCompound(buf []byte) { func (nDB *NetworkDB) handleCompound(buf []byte, isBulkSync bool) {
// Decode the parts // Decode the parts
parts, err := decodeCompoundMessage(buf) parts, err := decodeCompoundMessage(buf)
if err != nil { if err != nil {
@ -118,18 +118,19 @@ func (nDB *NetworkDB) handleCompound(buf []byte) {
// Handle each message // Handle each message
for _, part := range parts { for _, part := range parts {
nDB.handleMessage(part) nDB.handleMessage(part, isBulkSync)
} }
} }
func (nDB *NetworkDB) handleTableMessage(buf []byte) { func (nDB *NetworkDB) handleTableMessage(buf []byte, isBulkSync bool) {
var tEvent TableEvent var tEvent TableEvent
if err := proto.Unmarshal(buf, &tEvent); err != nil { if err := proto.Unmarshal(buf, &tEvent); err != nil {
logrus.Errorf("Error decoding table event message: %v", err) logrus.Errorf("Error decoding table event message: %v", err)
return return
} }
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast { // Do not rebroadcast a bulk sync
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast && !isBulkSync {
var err error var err error
buf, err = encodeRawMessage(MessageTypeTableEvent, buf) buf, err = encodeRawMessage(MessageTypeTableEvent, buf)
if err != nil { if err != nil {
@ -195,7 +196,7 @@ func (nDB *NetworkDB) handleBulkSync(buf []byte) {
nDB.tableClock.Witness(bsm.LTime) nDB.tableClock.Witness(bsm.LTime)
} }
nDB.handleMessage(bsm.Payload) nDB.handleMessage(bsm.Payload, true)
// Don't respond to a bulk sync which was not unsolicited // Don't respond to a bulk sync which was not unsolicited
if !bsm.Unsolicited { if !bsm.Unsolicited {
@ -214,7 +215,7 @@ func (nDB *NetworkDB) handleBulkSync(buf []byte) {
} }
} }
func (nDB *NetworkDB) handleMessage(buf []byte) { func (nDB *NetworkDB) handleMessage(buf []byte, isBulkSync bool) {
mType, data, err := decodeMessage(buf) mType, data, err := decodeMessage(buf)
if err != nil { if err != nil {
logrus.Errorf("Error decoding gossip message to get message type: %v", err) logrus.Errorf("Error decoding gossip message to get message type: %v", err)
@ -225,11 +226,11 @@ func (nDB *NetworkDB) handleMessage(buf []byte) {
case MessageTypeNetworkEvent: case MessageTypeNetworkEvent:
nDB.handleNetworkMessage(data) nDB.handleNetworkMessage(data)
case MessageTypeTableEvent: case MessageTypeTableEvent:
nDB.handleTableMessage(data) nDB.handleTableMessage(data, isBulkSync)
case MessageTypeBulkSync: case MessageTypeBulkSync:
nDB.handleBulkSync(data) nDB.handleBulkSync(data)
case MessageTypeCompound: case MessageTypeCompound:
nDB.handleCompound(data) nDB.handleCompound(data, isBulkSync)
default: default:
logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType) logrus.Errorf("%s: unknown message type %d", nDB.config.NodeName, mType)
} }
@ -240,7 +241,7 @@ func (d *delegate) NotifyMsg(buf []byte) {
return return
} }
d.nDB.handleMessage(buf) d.nDB.handleMessage(buf, false)
} }
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte { func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {