2015-06-21 00:08:36 +00:00
|
|
|
package memberlist
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bufio"
|
|
|
|
"bytes"
|
|
|
|
"encoding/binary"
|
|
|
|
"fmt"
|
2017-04-27 20:20:11 +00:00
|
|
|
"hash/crc32"
|
2015-06-21 00:08:36 +00:00
|
|
|
"io"
|
|
|
|
"net"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/armon/go-metrics"
|
|
|
|
"github.com/hashicorp/go-msgpack/codec"
|
|
|
|
)
|
|
|
|
|
|
|
|
// This is the minimum and maximum protocol version that we can
|
|
|
|
// _understand_. We're allowed to speak at any version within this
|
|
|
|
// range. This range is inclusive.
|
|
|
|
const (
|
|
|
|
ProtocolVersionMin uint8 = 1
|
2016-05-08 07:31:30 +00:00
|
|
|
|
|
|
|
// Version 3 added support for TCP pings but we kept the default
|
|
|
|
// protocol version at 2 to ease transition to this new feature.
|
|
|
|
// A memberlist speaking version 2 of the protocol will attempt
|
|
|
|
// to TCP ping another memberlist who understands version 3 or
|
|
|
|
// greater.
|
2017-04-27 20:20:11 +00:00
|
|
|
//
|
|
|
|
// Version 4 added support for nacks as part of indirect probes.
|
|
|
|
// A memberlist speaking version 2 of the protocol will expect
|
|
|
|
// nacks from another memberlist who understands version 4 or
|
|
|
|
// greater, and likewise nacks will be sent to memberlists who
|
|
|
|
// understand version 4 or greater.
|
2016-05-08 07:31:30 +00:00
|
|
|
ProtocolVersion2Compatible = 2
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
ProtocolVersionMax = 5
|
2015-06-21 00:08:36 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// messageType is an integer ID of a type of message that can be received
|
|
|
|
// on network channels from other members.
|
|
|
|
type messageType uint8
|
|
|
|
|
|
|
|
// The list of available message types.
|
|
|
|
const (
|
|
|
|
pingMsg messageType = iota
|
|
|
|
indirectPingMsg
|
|
|
|
ackRespMsg
|
|
|
|
suspectMsg
|
|
|
|
aliveMsg
|
|
|
|
deadMsg
|
|
|
|
pushPullMsg
|
|
|
|
compoundMsg
|
|
|
|
userMsg // User mesg, not handled by us
|
|
|
|
compressMsg
|
|
|
|
encryptMsg
|
2017-04-27 20:20:11 +00:00
|
|
|
nackRespMsg
|
|
|
|
hasCrcMsg
|
2018-01-29 19:19:37 +00:00
|
|
|
errMsg
|
2015-06-21 00:08:36 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
// compressionType is used to specify the compression algorithm
|
|
|
|
type compressionType uint8
|
|
|
|
|
|
|
|
const (
|
|
|
|
lzwAlgo compressionType = iota
|
|
|
|
)
|
|
|
|
|
|
|
|
const (
|
|
|
|
MetaMaxSize = 512 // Maximum size for node meta data
|
|
|
|
compoundHeaderOverhead = 2 // Assumed header overhead
|
|
|
|
compoundOverhead = 2 // Assumed overhead per entry in compoundHeader
|
|
|
|
userMsgOverhead = 1
|
|
|
|
blockingWarning = 10 * time.Millisecond // Warn if a UDP packet takes this long to process
|
|
|
|
maxPushStateBytes = 10 * 1024 * 1024
|
|
|
|
)
|
|
|
|
|
|
|
|
// ping request sent directly to node
|
|
|
|
type ping struct {
|
|
|
|
SeqNo uint32
|
|
|
|
|
|
|
|
// Node is sent so the target can verify they are
|
|
|
|
// the intended recipient. This is to protect again an agent
|
|
|
|
// restart with a new name.
|
|
|
|
Node string
|
|
|
|
}
|
|
|
|
|
|
|
|
// indirect ping sent to an indirect ndoe
|
|
|
|
type indirectPingReq struct {
|
|
|
|
SeqNo uint32
|
|
|
|
Target []byte
|
|
|
|
Port uint16
|
|
|
|
Node string
|
2017-04-27 20:20:11 +00:00
|
|
|
Nack bool // true if we'd like a nack back
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// ack response is sent for a ping
|
|
|
|
type ackResp struct {
|
2016-05-08 07:31:30 +00:00
|
|
|
SeqNo uint32
|
|
|
|
Payload []byte
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// nack response is sent for an indirect ping when the pinger doesn't hear from
|
|
|
|
// the ping-ee within the configured timeout. This lets the original node know
|
|
|
|
// that the indirect ping attempt happened but didn't succeed.
|
|
|
|
type nackResp struct {
|
|
|
|
SeqNo uint32
|
|
|
|
}
|
|
|
|
|
2018-01-29 19:19:37 +00:00
|
|
|
// err response is sent to relay the error from the remote end
|
|
|
|
type errResp struct {
|
|
|
|
Error string
|
|
|
|
}
|
|
|
|
|
2015-06-21 00:08:36 +00:00
|
|
|
// suspect is broadcast when we suspect a node is dead
|
|
|
|
type suspect struct {
|
|
|
|
Incarnation uint32
|
|
|
|
Node string
|
|
|
|
From string // Include who is suspecting
|
|
|
|
}
|
|
|
|
|
|
|
|
// alive is broadcast when we know a node is alive.
|
|
|
|
// Overloaded for nodes joining
|
|
|
|
type alive struct {
|
|
|
|
Incarnation uint32
|
|
|
|
Node string
|
|
|
|
Addr []byte
|
|
|
|
Port uint16
|
|
|
|
Meta []byte
|
|
|
|
|
|
|
|
// The versions of the protocol/delegate that are being spoken, order:
|
|
|
|
// pmin, pmax, pcur, dmin, dmax, dcur
|
|
|
|
Vsn []uint8
|
|
|
|
}
|
|
|
|
|
|
|
|
// dead is broadcast when we confirm a node is dead
|
|
|
|
// Overloaded for nodes leaving
|
|
|
|
type dead struct {
|
|
|
|
Incarnation uint32
|
|
|
|
Node string
|
|
|
|
From string // Include who is suspecting
|
|
|
|
}
|
|
|
|
|
|
|
|
// pushPullHeader is used to inform the
|
2017-04-27 20:20:11 +00:00
|
|
|
// otherside how many states we are transferring
|
2015-06-21 00:08:36 +00:00
|
|
|
type pushPullHeader struct {
|
|
|
|
Nodes int
|
|
|
|
UserStateLen int // Encodes the byte lengh of user state
|
|
|
|
Join bool // Is this a join request or a anti-entropy run
|
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
// userMsgHeader is used to encapsulate a userMsg
|
|
|
|
type userMsgHeader struct {
|
|
|
|
UserMsgLen int // Encodes the byte lengh of user state
|
|
|
|
}
|
|
|
|
|
2015-06-21 00:08:36 +00:00
|
|
|
// pushNodeState is used for pushPullReq when we are
|
2017-04-27 20:20:11 +00:00
|
|
|
// transferring out node states
|
2015-06-21 00:08:36 +00:00
|
|
|
type pushNodeState struct {
|
|
|
|
Name string
|
|
|
|
Addr []byte
|
|
|
|
Port uint16
|
|
|
|
Meta []byte
|
|
|
|
Incarnation uint32
|
|
|
|
State nodeStateType
|
|
|
|
Vsn []uint8 // Protocol versions
|
|
|
|
}
|
|
|
|
|
|
|
|
// compress is used to wrap an underlying payload
|
|
|
|
// using a specified compression algorithm
|
|
|
|
type compress struct {
|
|
|
|
Algo compressionType
|
|
|
|
Buf []byte
|
|
|
|
}
|
|
|
|
|
|
|
|
// msgHandoff is used to transfer a message between goroutines
|
|
|
|
type msgHandoff struct {
|
|
|
|
msgType messageType
|
|
|
|
buf []byte
|
|
|
|
from net.Addr
|
|
|
|
}
|
|
|
|
|
|
|
|
// encryptionVersion returns the encryption version to use
|
|
|
|
func (m *Memberlist) encryptionVersion() encryptionVersion {
|
|
|
|
switch m.ProtocolVersion() {
|
|
|
|
case 1:
|
|
|
|
return 0
|
|
|
|
default:
|
|
|
|
return 1
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// streamListen is a long running goroutine that pulls incoming streams from the
|
|
|
|
// transport and hands them off for processing.
|
|
|
|
func (m *Memberlist) streamListen() {
|
2015-06-21 00:08:36 +00:00
|
|
|
for {
|
2017-04-27 20:20:11 +00:00
|
|
|
select {
|
|
|
|
case conn := <-m.transport.StreamCh():
|
|
|
|
go m.handleConn(conn)
|
2015-06-21 00:08:36 +00:00
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
case <-m.shutdownCh:
|
|
|
|
return
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// handleConn handles a single incoming stream connection from the transport.
|
|
|
|
func (m *Memberlist) handleConn(conn net.Conn) {
|
|
|
|
m.logger.Printf("[DEBUG] memberlist: Stream connection %s", LogConn(conn))
|
2016-05-08 07:31:30 +00:00
|
|
|
|
2015-06-21 00:08:36 +00:00
|
|
|
defer conn.Close()
|
|
|
|
metrics.IncrCounter([]string{"memberlist", "tcp", "accept"}, 1)
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
2017-04-27 20:20:11 +00:00
|
|
|
msgType, bufConn, dec, err := m.readStream(conn)
|
2015-06-21 00:08:36 +00:00
|
|
|
if err != nil {
|
2017-04-27 20:20:11 +00:00
|
|
|
if err != io.EOF {
|
|
|
|
m.logger.Printf("[ERR] memberlist: failed to receive: %s %s", err, LogConn(conn))
|
2018-01-29 19:19:37 +00:00
|
|
|
|
|
|
|
resp := errResp{err.Error()}
|
|
|
|
out, err := encode(errMsg, &resp)
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to encode error response: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
err = m.rawSendMsgStream(conn, out.Bytes())
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to send error: %s %s", err, LogConn(conn))
|
|
|
|
return
|
|
|
|
}
|
2017-04-27 20:20:11 +00:00
|
|
|
}
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
switch msgType {
|
|
|
|
case userMsg:
|
|
|
|
if err := m.readUserMsg(bufConn, dec); err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to receive user message: %s %s", err, LogConn(conn))
|
|
|
|
}
|
|
|
|
case pushPullMsg:
|
|
|
|
join, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to read remote state: %s %s", err, LogConn(conn))
|
|
|
|
return
|
|
|
|
}
|
2015-06-21 00:08:36 +00:00
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
if err := m.sendLocalState(conn, join); err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to push local state: %s %s", err, LogConn(conn))
|
|
|
|
return
|
|
|
|
}
|
2015-06-21 00:08:36 +00:00
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
if err := m.mergeRemoteState(join, remoteNodes, userState); err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed push/pull merge: %s %s", err, LogConn(conn))
|
|
|
|
return
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
2016-05-08 07:31:30 +00:00
|
|
|
case pingMsg:
|
|
|
|
var p ping
|
|
|
|
if err := dec.Decode(&p); err != nil {
|
2017-04-27 20:20:11 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode ping: %s %s", err, LogConn(conn))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
if p.Node != "" && p.Node != m.config.Name {
|
|
|
|
m.logger.Printf("[WARN] memberlist: Got ping for unexpected node %s %s", p.Node, LogConn(conn))
|
|
|
|
return
|
|
|
|
}
|
2015-06-21 00:08:36 +00:00
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
ack := ackResp{p.SeqNo, nil}
|
|
|
|
out, err := encode(ackRespMsg, &ack)
|
|
|
|
if err != nil {
|
2017-04-27 20:20:11 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to encode ack: %s", err)
|
2016-05-08 07:31:30 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
err = m.rawSendMsgStream(conn, out.Bytes())
|
2016-05-08 07:31:30 +00:00
|
|
|
if err != nil {
|
2017-04-27 20:20:11 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogConn(conn))
|
2016-05-08 07:31:30 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
default:
|
|
|
|
m.logger.Printf("[ERR] memberlist: Received invalid msgType (%d) %s", msgType, LogConn(conn))
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// packetListen is a long running goroutine that pulls packets out of the
|
|
|
|
// transport and hands them off for processing.
|
|
|
|
func (m *Memberlist) packetListen() {
|
2015-06-21 00:08:36 +00:00
|
|
|
for {
|
2017-04-27 20:20:11 +00:00
|
|
|
select {
|
|
|
|
case packet := <-m.transport.PacketCh():
|
|
|
|
m.ingestPacket(packet.Buf, packet.From, packet.Timestamp)
|
2016-05-08 07:31:30 +00:00
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
case <-m.shutdownCh:
|
|
|
|
return
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time) {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Check if encryption is enabled
|
|
|
|
if m.config.EncryptionEnabled() {
|
|
|
|
// Decrypt the payload
|
|
|
|
plain, err := decryptPayload(m.config.Keyring.GetKeys(), buf, nil)
|
|
|
|
if err != nil {
|
2018-01-29 19:19:37 +00:00
|
|
|
if !m.config.GossipVerifyIncoming {
|
|
|
|
// Treat the message as plaintext
|
|
|
|
plain = buf
|
|
|
|
} else {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Decrypt packet failed: %v %s", err, LogAddress(from))
|
|
|
|
return
|
|
|
|
}
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Continue processing the plaintext buffer
|
|
|
|
buf = plain
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// See if there's a checksum included to verify the contents of the message
|
|
|
|
if len(buf) >= 5 && messageType(buf[0]) == hasCrcMsg {
|
|
|
|
crc := crc32.ChecksumIEEE(buf[5:])
|
|
|
|
expected := binary.BigEndian.Uint32(buf[1:5])
|
|
|
|
if crc != expected {
|
|
|
|
m.logger.Printf("[WARN] memberlist: Got invalid checksum for UDP packet: %x, %x", crc, expected)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
m.handleCommand(buf[5:], from, timestamp)
|
|
|
|
} else {
|
|
|
|
m.handleCommand(buf, from, timestamp)
|
|
|
|
}
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Decode the message type
|
|
|
|
msgType := messageType(buf[0])
|
|
|
|
buf = buf[1:]
|
|
|
|
|
|
|
|
// Switch on the msgType
|
|
|
|
switch msgType {
|
|
|
|
case compoundMsg:
|
2016-05-08 07:31:30 +00:00
|
|
|
m.handleCompound(buf, from, timestamp)
|
2015-06-21 00:08:36 +00:00
|
|
|
case compressMsg:
|
2016-05-08 07:31:30 +00:00
|
|
|
m.handleCompressed(buf, from, timestamp)
|
2015-06-21 00:08:36 +00:00
|
|
|
|
|
|
|
case pingMsg:
|
|
|
|
m.handlePing(buf, from)
|
|
|
|
case indirectPingMsg:
|
|
|
|
m.handleIndirectPing(buf, from)
|
|
|
|
case ackRespMsg:
|
2016-05-08 07:31:30 +00:00
|
|
|
m.handleAck(buf, from, timestamp)
|
2017-04-27 20:20:11 +00:00
|
|
|
case nackRespMsg:
|
|
|
|
m.handleNack(buf, from)
|
2015-06-21 00:08:36 +00:00
|
|
|
|
|
|
|
case suspectMsg:
|
|
|
|
fallthrough
|
|
|
|
case aliveMsg:
|
|
|
|
fallthrough
|
|
|
|
case deadMsg:
|
|
|
|
fallthrough
|
|
|
|
case userMsg:
|
|
|
|
select {
|
|
|
|
case m.handoff <- msgHandoff{msgType, buf, from}:
|
|
|
|
default:
|
2017-04-27 20:20:11 +00:00
|
|
|
m.logger.Printf("[WARN] memberlist: handler queue full, dropping message (%d) %s", msgType, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
default:
|
2017-04-27 20:20:11 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: msg type (%d) not supported %s", msgType, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// packetHandler is a long running goroutine that processes messages received
|
|
|
|
// over the packet interface, but is decoupled from the listener to avoid
|
|
|
|
// blocking the listener which may cause ping/ack messages to be delayed.
|
|
|
|
func (m *Memberlist) packetHandler() {
|
2015-06-21 00:08:36 +00:00
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case msg := <-m.handoff:
|
|
|
|
msgType := msg.msgType
|
|
|
|
buf := msg.buf
|
|
|
|
from := msg.from
|
|
|
|
|
|
|
|
switch msgType {
|
|
|
|
case suspectMsg:
|
|
|
|
m.handleSuspect(buf, from)
|
|
|
|
case aliveMsg:
|
|
|
|
m.handleAlive(buf, from)
|
|
|
|
case deadMsg:
|
|
|
|
m.handleDead(buf, from)
|
|
|
|
case userMsg:
|
|
|
|
m.handleUser(buf, from)
|
|
|
|
default:
|
2017-04-27 20:20:11 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Message type (%d) not supported %s (packet handler)", msgType, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
case <-m.shutdownCh:
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
func (m *Memberlist) handleCompound(buf []byte, from net.Addr, timestamp time.Time) {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Decode the parts
|
|
|
|
trunc, parts, err := decodeCompoundMessage(buf)
|
|
|
|
if err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode compound request: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Log any truncation
|
|
|
|
if trunc > 0 {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[WARN] memberlist: Compound request had %d truncated messages %s", trunc, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Handle each message
|
|
|
|
for _, part := range parts {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.handleCommand(part, from, timestamp)
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Memberlist) handlePing(buf []byte, from net.Addr) {
|
|
|
|
var p ping
|
|
|
|
if err := decode(buf, &p); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode ping request: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
// If node is provided, verify that it is for us
|
|
|
|
if p.Node != "" && p.Node != m.config.Name {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[WARN] memberlist: Got ping for unexpected node '%s' %s", p.Node, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
2016-05-08 07:31:30 +00:00
|
|
|
var ack ackResp
|
|
|
|
ack.SeqNo = p.SeqNo
|
|
|
|
if m.config.Ping != nil {
|
|
|
|
ack.Payload = m.config.Ping.AckPayload()
|
|
|
|
}
|
2017-04-27 20:20:11 +00:00
|
|
|
if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to send ack: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Memberlist) handleIndirectPing(buf []byte, from net.Addr) {
|
|
|
|
var ind indirectPingReq
|
|
|
|
if err := decode(buf, &ind); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode indirect ping request: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// For proto versions < 2, there is no port provided. Mask old
|
2017-04-27 20:20:11 +00:00
|
|
|
// behavior by using the configured port.
|
2015-06-21 00:08:36 +00:00
|
|
|
if m.ProtocolVersion() < 2 || ind.Port == 0 {
|
|
|
|
ind.Port = uint16(m.config.BindPort)
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// Send a ping to the correct host.
|
2015-06-21 00:08:36 +00:00
|
|
|
localSeqNo := m.nextSeqNo()
|
|
|
|
ping := ping{SeqNo: localSeqNo, Node: ind.Node}
|
|
|
|
|
|
|
|
// Setup a response handler to relay the ack
|
2017-04-27 20:20:11 +00:00
|
|
|
cancelCh := make(chan struct{})
|
2016-05-08 07:31:30 +00:00
|
|
|
respHandler := func(payload []byte, timestamp time.Time) {
|
2017-04-27 20:20:11 +00:00
|
|
|
// Try to prevent the nack if we've caught it in time.
|
|
|
|
close(cancelCh)
|
|
|
|
|
|
|
|
// Forward the ack back to the requestor.
|
2016-05-08 07:31:30 +00:00
|
|
|
ack := ackResp{ind.SeqNo, nil}
|
2017-04-27 20:20:11 +00:00
|
|
|
if err := m.encodeAndSendMsg(from.String(), ackRespMsg, &ack); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to forward ack: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
m.setAckHandler(localSeqNo, respHandler, m.config.ProbeTimeout)
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// Send the ping.
|
|
|
|
addr := joinHostPort(net.IP(ind.Target).String(), ind.Port)
|
|
|
|
if err := m.encodeAndSendMsg(addr, pingMsg, &ping); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to send ping: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
2017-04-27 20:20:11 +00:00
|
|
|
|
|
|
|
// Setup a timer to fire off a nack if no ack is seen in time.
|
|
|
|
if ind.Nack {
|
|
|
|
go func() {
|
|
|
|
select {
|
|
|
|
case <-cancelCh:
|
|
|
|
return
|
|
|
|
case <-time.After(m.config.ProbeTimeout):
|
|
|
|
nack := nackResp{ind.SeqNo}
|
|
|
|
if err := m.encodeAndSendMsg(from.String(), nackRespMsg, &nack); err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to send nack: %s %s", err, LogAddress(from))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
}
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
func (m *Memberlist) handleAck(buf []byte, from net.Addr, timestamp time.Time) {
|
2015-06-21 00:08:36 +00:00
|
|
|
var ack ackResp
|
|
|
|
if err := decode(buf, &ack); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode ack response: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
2016-05-08 07:31:30 +00:00
|
|
|
m.invokeAckHandler(ack, timestamp)
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
func (m *Memberlist) handleNack(buf []byte, from net.Addr) {
|
|
|
|
var nack nackResp
|
|
|
|
if err := decode(buf, &nack); err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode nack response: %s %s", err, LogAddress(from))
|
|
|
|
return
|
|
|
|
}
|
|
|
|
m.invokeNackHandler(nack)
|
|
|
|
}
|
|
|
|
|
2015-06-21 00:08:36 +00:00
|
|
|
func (m *Memberlist) handleSuspect(buf []byte, from net.Addr) {
|
|
|
|
var sus suspect
|
|
|
|
if err := decode(buf, &sus); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode suspect message: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
m.suspectNode(&sus)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Memberlist) handleAlive(buf []byte, from net.Addr) {
|
|
|
|
var live alive
|
|
|
|
if err := decode(buf, &live); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode alive message: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// For proto versions < 2, there is no port provided. Mask old
|
|
|
|
// behavior by using the configured port
|
|
|
|
if m.ProtocolVersion() < 2 || live.Port == 0 {
|
|
|
|
live.Port = uint16(m.config.BindPort)
|
|
|
|
}
|
|
|
|
|
|
|
|
m.aliveNode(&live, nil, false)
|
|
|
|
}
|
|
|
|
|
|
|
|
func (m *Memberlist) handleDead(buf []byte, from net.Addr) {
|
|
|
|
var d dead
|
|
|
|
if err := decode(buf, &d); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decode dead message: %s %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
m.deadNode(&d)
|
|
|
|
}
|
|
|
|
|
|
|
|
// handleUser is used to notify channels of incoming user data
|
|
|
|
func (m *Memberlist) handleUser(buf []byte, from net.Addr) {
|
|
|
|
d := m.config.Delegate
|
|
|
|
if d != nil {
|
|
|
|
d.NotifyMsg(buf)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// handleCompressed is used to unpack a compressed message
|
2016-05-08 07:31:30 +00:00
|
|
|
func (m *Memberlist) handleCompressed(buf []byte, from net.Addr, timestamp time.Time) {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Try to decode the payload
|
|
|
|
payload, err := decompressPayload(buf)
|
|
|
|
if err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to decompress payload: %v %s", err, LogAddress(from))
|
2015-06-21 00:08:36 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// Recursively handle the payload
|
2016-05-08 07:31:30 +00:00
|
|
|
m.handleCommand(payload, from, timestamp)
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// encodeAndSendMsg is used to combine the encoding and sending steps
|
2017-04-27 20:20:11 +00:00
|
|
|
func (m *Memberlist) encodeAndSendMsg(addr string, msgType messageType, msg interface{}) error {
|
2015-06-21 00:08:36 +00:00
|
|
|
out, err := encode(msgType, msg)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-04-27 20:20:11 +00:00
|
|
|
if err := m.sendMsg(addr, out.Bytes()); err != nil {
|
2015-06-21 00:08:36 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// sendMsg is used to send a message via packet to another host. It will
|
|
|
|
// opportunistically create a compoundMsg and piggy back other broadcasts.
|
|
|
|
func (m *Memberlist) sendMsg(addr string, msg []byte) error {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Check if we can piggy back any messages
|
2017-04-27 20:20:11 +00:00
|
|
|
bytesAvail := m.config.UDPBufferSize - len(msg) - compoundHeaderOverhead
|
2018-01-29 19:19:37 +00:00
|
|
|
if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
|
2015-06-21 00:08:36 +00:00
|
|
|
bytesAvail -= encryptOverhead(m.encryptionVersion())
|
|
|
|
}
|
|
|
|
extra := m.getBroadcasts(compoundOverhead, bytesAvail)
|
|
|
|
|
|
|
|
// Fast path if nothing to piggypack
|
|
|
|
if len(extra) == 0 {
|
2017-04-27 20:20:11 +00:00
|
|
|
return m.rawSendMsgPacket(addr, nil, msg)
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Join all the messages
|
|
|
|
msgs := make([][]byte, 0, 1+len(extra))
|
|
|
|
msgs = append(msgs, msg)
|
|
|
|
msgs = append(msgs, extra...)
|
|
|
|
|
|
|
|
// Create a compound message
|
|
|
|
compound := makeCompoundMessage(msgs)
|
|
|
|
|
|
|
|
// Send the message
|
2017-04-27 20:20:11 +00:00
|
|
|
return m.rawSendMsgPacket(addr, nil, compound.Bytes())
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// rawSendMsgPacket is used to send message via packet to another host without
|
|
|
|
// modification, other than compression or encryption if enabled.
|
|
|
|
func (m *Memberlist) rawSendMsgPacket(addr string, node *Node, msg []byte) error {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Check if we have compression enabled
|
|
|
|
if m.config.EnableCompression {
|
|
|
|
buf, err := compressPayload(msg)
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Printf("[WARN] memberlist: Failed to compress payload: %v", err)
|
|
|
|
} else {
|
|
|
|
// Only use compression if it reduced the size
|
|
|
|
if buf.Len() < len(msg) {
|
|
|
|
msg = buf.Bytes()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// Try to look up the destination node
|
|
|
|
if node == nil {
|
|
|
|
toAddr, _, err := net.SplitHostPort(addr)
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Failed to parse address %q: %v", addr, err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
m.nodeLock.RLock()
|
|
|
|
nodeState, ok := m.nodeMap[toAddr]
|
|
|
|
m.nodeLock.RUnlock()
|
|
|
|
if ok {
|
|
|
|
node = &nodeState.Node
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Add a CRC to the end of the payload if the recipient understands
|
|
|
|
// ProtocolVersion >= 5
|
|
|
|
if node != nil && node.PMax >= 5 {
|
|
|
|
crc := crc32.ChecksumIEEE(msg)
|
|
|
|
header := make([]byte, 5, 5+len(msg))
|
|
|
|
header[0] = byte(hasCrcMsg)
|
|
|
|
binary.BigEndian.PutUint32(header[1:], crc)
|
|
|
|
msg = append(header, msg...)
|
|
|
|
}
|
|
|
|
|
2015-06-21 00:08:36 +00:00
|
|
|
// Check if we have encryption enabled
|
2018-01-29 19:19:37 +00:00
|
|
|
if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Encrypt the payload
|
|
|
|
var buf bytes.Buffer
|
|
|
|
primaryKey := m.config.Keyring.GetPrimaryKey()
|
|
|
|
err := encryptPayload(m.encryptionVersion(), primaryKey, msg, nil, &buf)
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Printf("[ERR] memberlist: Encryption of message failed: %v", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
msg = buf.Bytes()
|
|
|
|
}
|
|
|
|
|
|
|
|
metrics.IncrCounter([]string{"memberlist", "udp", "sent"}, float32(len(msg)))
|
2017-04-27 20:20:11 +00:00
|
|
|
_, err := m.transport.WriteTo(msg, addr)
|
2015-06-21 00:08:36 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// rawSendMsgStream is used to stream a message to another host without
|
|
|
|
// modification, other than applying compression and encryption if enabled.
|
|
|
|
func (m *Memberlist) rawSendMsgStream(conn net.Conn, sendBuf []byte) error {
|
2016-05-08 07:31:30 +00:00
|
|
|
// Check if compresion is enabled
|
|
|
|
if m.config.EnableCompression {
|
|
|
|
compBuf, err := compressPayload(sendBuf)
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Printf("[ERROR] memberlist: Failed to compress payload: %v", err)
|
|
|
|
} else {
|
|
|
|
sendBuf = compBuf.Bytes()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Check if encryption is enabled
|
2018-01-29 19:19:37 +00:00
|
|
|
if m.config.EncryptionEnabled() && m.config.GossipVerifyOutgoing {
|
2016-05-08 07:31:30 +00:00
|
|
|
crypt, err := m.encryptLocalState(sendBuf)
|
|
|
|
if err != nil {
|
|
|
|
m.logger.Printf("[ERROR] memberlist: Failed to encrypt local state: %v", err)
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
sendBuf = crypt
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write out the entire send buffer
|
|
|
|
metrics.IncrCounter([]string{"memberlist", "tcp", "sent"}, float32(len(sendBuf)))
|
|
|
|
|
|
|
|
if n, err := conn.Write(sendBuf); err != nil {
|
|
|
|
return err
|
|
|
|
} else if n != len(sendBuf) {
|
|
|
|
return fmt.Errorf("only %d of %d bytes written", n, len(sendBuf))
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// sendUserMsg is used to stream a user message to another host.
|
|
|
|
func (m *Memberlist) sendUserMsg(addr string, sendBuf []byte) error {
|
|
|
|
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
|
2016-05-08 07:31:30 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
|
|
|
|
bufConn := bytes.NewBuffer(nil)
|
|
|
|
if err := bufConn.WriteByte(byte(userMsg)); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
header := userMsgHeader{UserMsgLen: len(sendBuf)}
|
|
|
|
hd := codec.MsgpackHandle{}
|
|
|
|
enc := codec.NewEncoder(bufConn, &hd)
|
|
|
|
if err := enc.Encode(&header); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if _, err := bufConn.Write(sendBuf); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-04-27 20:20:11 +00:00
|
|
|
return m.rawSendMsgStream(conn, bufConn.Bytes())
|
2016-05-08 07:31:30 +00:00
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// sendAndReceiveState is used to initiate a push/pull over a stream with a
|
|
|
|
// remote host.
|
|
|
|
func (m *Memberlist) sendAndReceiveState(addr string, join bool) ([]pushNodeState, []byte, error) {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Attempt to connect
|
2017-04-27 20:20:11 +00:00
|
|
|
conn, err := m.transport.DialTimeout(addr, m.config.TCPTimeout)
|
2015-06-21 00:08:36 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
m.logger.Printf("[DEBUG] memberlist: Initiating push/pull sync with: %s", conn.RemoteAddr())
|
|
|
|
metrics.IncrCounter([]string{"memberlist", "tcp", "connect"}, 1)
|
|
|
|
|
|
|
|
// Send our state
|
|
|
|
if err := m.sendLocalState(conn, join); err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
2017-04-27 20:20:11 +00:00
|
|
|
msgType, bufConn, dec, err := m.readStream(conn)
|
2015-06-21 00:08:36 +00:00
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
2018-01-29 19:19:37 +00:00
|
|
|
if msgType == errMsg {
|
|
|
|
var resp errResp
|
|
|
|
if err := dec.Decode(&resp); err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
return nil, nil, fmt.Errorf("remote error: %v", resp.Error)
|
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
// Quit if not push/pull
|
|
|
|
if msgType != pushPullMsg {
|
|
|
|
err := fmt.Errorf("received invalid msgType (%d), expected pushPullMsg (%d) %s", msgType, pushPullMsg, LogConn(conn))
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read remote state
|
|
|
|
_, remoteNodes, userState, err := m.readRemoteState(bufConn, dec)
|
|
|
|
return remoteNodes, userState, err
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// sendLocalState is invoked to send our local state over a stream connection.
|
2015-06-21 00:08:36 +00:00
|
|
|
func (m *Memberlist) sendLocalState(conn net.Conn, join bool) error {
|
|
|
|
// Setup a deadline
|
|
|
|
conn.SetDeadline(time.Now().Add(m.config.TCPTimeout))
|
|
|
|
|
|
|
|
// Prepare the local node state
|
|
|
|
m.nodeLock.RLock()
|
|
|
|
localNodes := make([]pushNodeState, len(m.nodes))
|
|
|
|
for idx, n := range m.nodes {
|
|
|
|
localNodes[idx].Name = n.Name
|
|
|
|
localNodes[idx].Addr = n.Addr
|
|
|
|
localNodes[idx].Port = n.Port
|
|
|
|
localNodes[idx].Incarnation = n.Incarnation
|
|
|
|
localNodes[idx].State = n.State
|
|
|
|
localNodes[idx].Meta = n.Meta
|
|
|
|
localNodes[idx].Vsn = []uint8{
|
|
|
|
n.PMin, n.PMax, n.PCur,
|
|
|
|
n.DMin, n.DMax, n.DCur,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
m.nodeLock.RUnlock()
|
|
|
|
|
|
|
|
// Get the delegate state
|
|
|
|
var userData []byte
|
|
|
|
if m.config.Delegate != nil {
|
|
|
|
userData = m.config.Delegate.LocalState(join)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Create a bytes buffer writer
|
|
|
|
bufConn := bytes.NewBuffer(nil)
|
|
|
|
|
|
|
|
// Send our node state
|
|
|
|
header := pushPullHeader{Nodes: len(localNodes), UserStateLen: len(userData), Join: join}
|
|
|
|
hd := codec.MsgpackHandle{}
|
|
|
|
enc := codec.NewEncoder(bufConn, &hd)
|
|
|
|
|
|
|
|
// Begin state push
|
|
|
|
if _, err := bufConn.Write([]byte{byte(pushPullMsg)}); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := enc.Encode(&header); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
for i := 0; i < header.Nodes; i++ {
|
|
|
|
if err := enc.Encode(&localNodes[i]); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Write the user state as well
|
|
|
|
if userData != nil {
|
|
|
|
if _, err := bufConn.Write(userData); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the send buffer
|
2017-04-27 20:20:11 +00:00
|
|
|
return m.rawSendMsgStream(conn, bufConn.Bytes())
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// encryptLocalState is used to help encrypt local state before sending
|
|
|
|
func (m *Memberlist) encryptLocalState(sendBuf []byte) ([]byte, error) {
|
|
|
|
var buf bytes.Buffer
|
|
|
|
|
|
|
|
// Write the encryptMsg byte
|
|
|
|
buf.WriteByte(byte(encryptMsg))
|
|
|
|
|
|
|
|
// Write the size of the message
|
|
|
|
sizeBuf := make([]byte, 4)
|
|
|
|
encVsn := m.encryptionVersion()
|
|
|
|
encLen := encryptedLength(encVsn, len(sendBuf))
|
|
|
|
binary.BigEndian.PutUint32(sizeBuf, uint32(encLen))
|
|
|
|
buf.Write(sizeBuf)
|
|
|
|
|
|
|
|
// Write the encrypted cipher text to the buffer
|
|
|
|
key := m.config.Keyring.GetPrimaryKey()
|
|
|
|
err := encryptPayload(encVsn, key, sendBuf, buf.Bytes()[:5], &buf)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
return buf.Bytes(), nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// decryptRemoteState is used to help decrypt the remote state
|
|
|
|
func (m *Memberlist) decryptRemoteState(bufConn io.Reader) ([]byte, error) {
|
|
|
|
// Read in enough to determine message length
|
|
|
|
cipherText := bytes.NewBuffer(nil)
|
|
|
|
cipherText.WriteByte(byte(encryptMsg))
|
|
|
|
_, err := io.CopyN(cipherText, bufConn, 4)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Ensure we aren't asked to download too much. This is to guard against
|
|
|
|
// an attack vector where a huge amount of state is sent
|
|
|
|
moreBytes := binary.BigEndian.Uint32(cipherText.Bytes()[1:5])
|
|
|
|
if moreBytes > maxPushStateBytes {
|
|
|
|
return nil, fmt.Errorf("Remote node state is larger than limit (%d)", moreBytes)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read in the rest of the payload
|
|
|
|
_, err = io.CopyN(cipherText, bufConn, int64(moreBytes))
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Decrypt the cipherText
|
|
|
|
dataBytes := cipherText.Bytes()[:5]
|
|
|
|
cipherBytes := cipherText.Bytes()[5:]
|
|
|
|
|
|
|
|
// Decrypt the payload
|
|
|
|
keys := m.config.Keyring.GetKeys()
|
|
|
|
return decryptPayload(keys, cipherBytes, dataBytes)
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// readStream is used to read from a stream connection, decrypting and
|
|
|
|
// decompressing the stream if necessary.
|
|
|
|
func (m *Memberlist) readStream(conn net.Conn) (messageType, io.Reader, *codec.Decoder, error) {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Created a buffered reader
|
|
|
|
var bufConn io.Reader = bufio.NewReader(conn)
|
|
|
|
|
|
|
|
// Read the message type
|
|
|
|
buf := [1]byte{0}
|
|
|
|
if _, err := bufConn.Read(buf[:]); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
return 0, nil, nil, err
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
msgType := messageType(buf[0])
|
|
|
|
|
|
|
|
// Check if the message is encrypted
|
|
|
|
if msgType == encryptMsg {
|
|
|
|
if !m.config.EncryptionEnabled() {
|
2016-05-08 07:31:30 +00:00
|
|
|
return 0, nil, nil,
|
2015-06-21 00:08:36 +00:00
|
|
|
fmt.Errorf("Remote state is encrypted and encryption is not configured")
|
|
|
|
}
|
|
|
|
|
|
|
|
plain, err := m.decryptRemoteState(bufConn)
|
|
|
|
if err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
return 0, nil, nil, err
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Reset message type and bufConn
|
|
|
|
msgType = messageType(plain[0])
|
|
|
|
bufConn = bytes.NewReader(plain[1:])
|
2018-01-29 19:19:37 +00:00
|
|
|
} else if m.config.EncryptionEnabled() && m.config.GossipVerifyIncoming {
|
2016-05-08 07:31:30 +00:00
|
|
|
return 0, nil, nil,
|
2015-06-21 00:08:36 +00:00
|
|
|
fmt.Errorf("Encryption is configured but remote state is not encrypted")
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get the msgPack decoders
|
|
|
|
hd := codec.MsgpackHandle{}
|
|
|
|
dec := codec.NewDecoder(bufConn, &hd)
|
|
|
|
|
|
|
|
// Check if we have a compressed message
|
|
|
|
if msgType == compressMsg {
|
|
|
|
var c compress
|
|
|
|
if err := dec.Decode(&c); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
return 0, nil, nil, err
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
decomp, err := decompressBuffer(&c)
|
|
|
|
if err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
return 0, nil, nil, err
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Reset the message type
|
|
|
|
msgType = messageType(decomp[0])
|
|
|
|
|
|
|
|
// Create a new bufConn
|
|
|
|
bufConn = bytes.NewReader(decomp[1:])
|
|
|
|
|
|
|
|
// Create a new decoder
|
|
|
|
dec = codec.NewDecoder(bufConn, &hd)
|
|
|
|
}
|
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
return msgType, bufConn, dec, nil
|
|
|
|
}
|
2015-06-21 00:08:36 +00:00
|
|
|
|
2016-05-08 07:31:30 +00:00
|
|
|
// readRemoteState is used to read the remote state from a connection
|
|
|
|
func (m *Memberlist) readRemoteState(bufConn io.Reader, dec *codec.Decoder) (bool, []pushNodeState, []byte, error) {
|
2015-06-21 00:08:36 +00:00
|
|
|
// Read the push/pull header
|
|
|
|
var header pushPullHeader
|
|
|
|
if err := dec.Decode(&header); err != nil {
|
|
|
|
return false, nil, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Allocate space for the transfer
|
|
|
|
remoteNodes := make([]pushNodeState, header.Nodes)
|
|
|
|
|
|
|
|
// Try to decode all the states
|
|
|
|
for i := 0; i < header.Nodes; i++ {
|
|
|
|
if err := dec.Decode(&remoteNodes[i]); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
return false, nil, nil, err
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read the remote user state into a buffer
|
|
|
|
var userBuf []byte
|
|
|
|
if header.UserStateLen > 0 {
|
|
|
|
userBuf = make([]byte, header.UserStateLen)
|
|
|
|
bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserStateLen)
|
|
|
|
if err == nil && bytes != header.UserStateLen {
|
|
|
|
err = fmt.Errorf(
|
|
|
|
"Failed to read full user state (%d / %d)",
|
|
|
|
bytes, header.UserStateLen)
|
|
|
|
}
|
|
|
|
if err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
return false, nil, nil, err
|
2015-06-21 00:08:36 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// For proto versions < 2, there is no port provided. Mask old
|
|
|
|
// behavior by using the configured port
|
|
|
|
for idx := range remoteNodes {
|
|
|
|
if m.ProtocolVersion() < 2 || remoteNodes[idx].Port == 0 {
|
|
|
|
remoteNodes[idx].Port = uint16(m.config.BindPort)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return header.Join, remoteNodes, userBuf, nil
|
|
|
|
}
|
2016-05-08 07:31:30 +00:00
|
|
|
|
|
|
|
// mergeRemoteState is used to merge the remote state with our local state
|
|
|
|
func (m *Memberlist) mergeRemoteState(join bool, remoteNodes []pushNodeState, userBuf []byte) error {
|
|
|
|
if err := m.verifyProtocol(remoteNodes); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Invoke the merge delegate if any
|
|
|
|
if join && m.config.Merge != nil {
|
|
|
|
nodes := make([]*Node, len(remoteNodes))
|
|
|
|
for idx, n := range remoteNodes {
|
|
|
|
nodes[idx] = &Node{
|
|
|
|
Name: n.Name,
|
|
|
|
Addr: n.Addr,
|
|
|
|
Port: n.Port,
|
|
|
|
Meta: n.Meta,
|
|
|
|
PMin: n.Vsn[0],
|
|
|
|
PMax: n.Vsn[1],
|
|
|
|
PCur: n.Vsn[2],
|
|
|
|
DMin: n.Vsn[3],
|
|
|
|
DMax: n.Vsn[4],
|
|
|
|
DCur: n.Vsn[5],
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if err := m.config.Merge.NotifyMerge(nodes); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// Merge the membership state
|
|
|
|
m.mergeState(remoteNodes)
|
|
|
|
|
|
|
|
// Invoke the delegate for user state
|
|
|
|
if userBuf != nil && m.config.Delegate != nil {
|
|
|
|
m.config.Delegate.MergeRemoteState(userBuf, join)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// readUserMsg is used to decode a userMsg from a stream.
|
2016-05-08 07:31:30 +00:00
|
|
|
func (m *Memberlist) readUserMsg(bufConn io.Reader, dec *codec.Decoder) error {
|
|
|
|
// Read the user message header
|
|
|
|
var header userMsgHeader
|
|
|
|
if err := dec.Decode(&header); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Read the user message into a buffer
|
|
|
|
var userBuf []byte
|
|
|
|
if header.UserMsgLen > 0 {
|
|
|
|
userBuf = make([]byte, header.UserMsgLen)
|
|
|
|
bytes, err := io.ReadAtLeast(bufConn, userBuf, header.UserMsgLen)
|
|
|
|
if err == nil && bytes != header.UserMsgLen {
|
|
|
|
err = fmt.Errorf(
|
|
|
|
"Failed to read full user message (%d / %d)",
|
|
|
|
bytes, header.UserMsgLen)
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
d := m.config.Delegate
|
|
|
|
if d != nil {
|
|
|
|
d.NotifyMsg(userBuf)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
// sendPingAndWaitForAck makes a stream connection to the given address, sends
|
2016-05-08 07:31:30 +00:00
|
|
|
// a ping, and waits for an ack. All of this is done as a series of blocking
|
|
|
|
// operations, given the deadline. The bool return parameter is true if we
|
|
|
|
// we able to round trip a ping to the other node.
|
2017-04-27 20:20:11 +00:00
|
|
|
func (m *Memberlist) sendPingAndWaitForAck(addr string, ping ping, deadline time.Time) (bool, error) {
|
2018-01-29 19:19:37 +00:00
|
|
|
conn, err := m.transport.DialTimeout(addr, deadline.Sub(time.Now()))
|
2016-05-08 07:31:30 +00:00
|
|
|
if err != nil {
|
|
|
|
// If the node is actually dead we expect this to fail, so we
|
|
|
|
// shouldn't spam the logs with it. After this point, errors
|
|
|
|
// with the connection are real, unexpected errors and should
|
|
|
|
// get propagated up.
|
|
|
|
return false, nil
|
|
|
|
}
|
|
|
|
defer conn.Close()
|
|
|
|
conn.SetDeadline(deadline)
|
|
|
|
|
|
|
|
out, err := encode(pingMsg, &ping)
|
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
if err = m.rawSendMsgStream(conn, out.Bytes()); err != nil {
|
2016-05-08 07:31:30 +00:00
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
|
2017-04-27 20:20:11 +00:00
|
|
|
msgType, _, dec, err := m.readStream(conn)
|
2016-05-08 07:31:30 +00:00
|
|
|
if err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if msgType != ackRespMsg {
|
2017-04-27 20:20:11 +00:00
|
|
|
return false, fmt.Errorf("Unexpected msgType (%d) from ping %s", msgType, LogConn(conn))
|
2016-05-08 07:31:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
var ack ackResp
|
|
|
|
if err = dec.Decode(&ack); err != nil {
|
|
|
|
return false, err
|
|
|
|
}
|
|
|
|
|
|
|
|
if ack.SeqNo != ping.SeqNo {
|
2017-04-27 20:20:11 +00:00
|
|
|
return false, fmt.Errorf("Sequence number from ack (%d) doesn't match ping (%d)", ack.SeqNo, ping.SeqNo, LogConn(conn))
|
2016-05-08 07:31:30 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
return true, nil
|
|
|
|
}
|