290 lines
7.8 KiB
Go
290 lines
7.8 KiB
Go
package memberlist
|
|
|
|
import (
|
|
"fmt"
|
|
"log"
|
|
"net"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/armon/go-metrics"
|
|
sockaddr "github.com/hashicorp/go-sockaddr"
|
|
)
|
|
|
|
// Buffer sizing used by the UDP side of the transport.
const (
	// udpPacketBufSize is the size, in bytes, of the buffer allocated for
	// each incoming packet read.
	udpPacketBufSize = 1 << 16

	// udpRecvBufSize is the receive buffer size, in bytes, that we try to
	// apply to UDP sockets so they can cope with a large volume of
	// messages.
	udpRecvBufSize = 2 << 20
)
|
|
|
|
// NetTransportConfig carries the settings needed to build a NetTransport.
type NetTransportConfig struct {
	// BindAddrs lists the addresses to listen on. A TCP listener and a
	// UDP listener are created for every entry.
	BindAddrs []string

	// BindPort is the port used for each address in BindAddrs.
	BindPort int

	// Logger receives operator-facing messages from the transport.
	Logger *log.Logger
}
|
|
|
|
// NetTransport is a Transport implementation that uses connectionless UDP for
|
|
// packet operations, and ad-hoc TCP connections for stream operations.
|
|
type NetTransport struct {
|
|
config *NetTransportConfig
|
|
packetCh chan *Packet
|
|
streamCh chan net.Conn
|
|
logger *log.Logger
|
|
wg sync.WaitGroup
|
|
tcpListeners []*net.TCPListener
|
|
udpListeners []*net.UDPConn
|
|
shutdown int32
|
|
}
|
|
|
|
// NewNetTransport returns a net transport with the given configuration. On
|
|
// success all the network listeners will be created and listening.
|
|
func NewNetTransport(config *NetTransportConfig) (*NetTransport, error) {
|
|
// If we reject the empty list outright we can assume that there's at
|
|
// least one listener of each type later during operation.
|
|
if len(config.BindAddrs) == 0 {
|
|
return nil, fmt.Errorf("At least one bind address is required")
|
|
}
|
|
|
|
// Build out the new transport.
|
|
var ok bool
|
|
t := NetTransport{
|
|
config: config,
|
|
packetCh: make(chan *Packet),
|
|
streamCh: make(chan net.Conn),
|
|
logger: config.Logger,
|
|
}
|
|
|
|
// Clean up listeners if there's an error.
|
|
defer func() {
|
|
if !ok {
|
|
t.Shutdown()
|
|
}
|
|
}()
|
|
|
|
// Build all the TCP and UDP listeners.
|
|
port := config.BindPort
|
|
for _, addr := range config.BindAddrs {
|
|
ip := net.ParseIP(addr)
|
|
|
|
tcpAddr := &net.TCPAddr{IP: ip, Port: port}
|
|
tcpLn, err := net.ListenTCP("tcp", tcpAddr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to start TCP listener on %q port %d: %v", addr, port, err)
|
|
}
|
|
t.tcpListeners = append(t.tcpListeners, tcpLn)
|
|
|
|
// If the config port given was zero, use the first TCP listener
|
|
// to pick an available port and then apply that to everything
|
|
// else.
|
|
if port == 0 {
|
|
port = tcpLn.Addr().(*net.TCPAddr).Port
|
|
}
|
|
|
|
udpAddr := &net.UDPAddr{IP: ip, Port: port}
|
|
udpLn, err := net.ListenUDP("udp", udpAddr)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Failed to start UDP listener on %q port %d: %v", addr, port, err)
|
|
}
|
|
if err := setUDPRecvBuf(udpLn); err != nil {
|
|
return nil, fmt.Errorf("Failed to resize UDP buffer: %v", err)
|
|
}
|
|
t.udpListeners = append(t.udpListeners, udpLn)
|
|
}
|
|
|
|
// Fire them up now that we've been able to create them all.
|
|
for i := 0; i < len(config.BindAddrs); i++ {
|
|
t.wg.Add(2)
|
|
go t.tcpListen(t.tcpListeners[i])
|
|
go t.udpListen(t.udpListeners[i])
|
|
}
|
|
|
|
ok = true
|
|
return &t, nil
|
|
}
|
|
|
|
// GetAutoBindPort returns the bind port that was automatically given by the
|
|
// kernel, if a bind port of 0 was given.
|
|
func (t *NetTransport) GetAutoBindPort() int {
|
|
// We made sure there's at least one TCP listener, and that one's
|
|
// port was applied to all the others for the dynamic bind case.
|
|
return t.tcpListeners[0].Addr().(*net.TCPAddr).Port
|
|
}
|
|
|
|
// See Transport.
|
|
func (t *NetTransport) FinalAdvertiseAddr(ip string, port int) (net.IP, int, error) {
|
|
var advertiseAddr net.IP
|
|
var advertisePort int
|
|
if ip != "" {
|
|
// If they've supplied an address, use that.
|
|
advertiseAddr = net.ParseIP(ip)
|
|
if advertiseAddr == nil {
|
|
return nil, 0, fmt.Errorf("Failed to parse advertise address %q", ip)
|
|
}
|
|
|
|
// Ensure IPv4 conversion if necessary.
|
|
if ip4 := advertiseAddr.To4(); ip4 != nil {
|
|
advertiseAddr = ip4
|
|
}
|
|
advertisePort = port
|
|
} else {
|
|
if t.config.BindAddrs[0] == "0.0.0.0" {
|
|
// Otherwise, if we're not bound to a specific IP, let's
|
|
// use a suitable private IP address.
|
|
var err error
|
|
ip, err = sockaddr.GetPrivateIP()
|
|
if err != nil {
|
|
return nil, 0, fmt.Errorf("Failed to get interface addresses: %v", err)
|
|
}
|
|
if ip == "" {
|
|
return nil, 0, fmt.Errorf("No private IP address found, and explicit IP not provided")
|
|
}
|
|
|
|
advertiseAddr = net.ParseIP(ip)
|
|
if advertiseAddr == nil {
|
|
return nil, 0, fmt.Errorf("Failed to parse advertise address: %q", ip)
|
|
}
|
|
} else {
|
|
// Use the IP that we're bound to, based on the first
|
|
// TCP listener, which we already ensure is there.
|
|
advertiseAddr = t.tcpListeners[0].Addr().(*net.TCPAddr).IP
|
|
}
|
|
|
|
// Use the port we are bound to.
|
|
advertisePort = t.GetAutoBindPort()
|
|
}
|
|
|
|
return advertiseAddr, advertisePort, nil
|
|
}
|
|
|
|
// See Transport.
|
|
func (t *NetTransport) WriteTo(b []byte, addr string) (time.Time, error) {
|
|
udpAddr, err := net.ResolveUDPAddr("udp", addr)
|
|
if err != nil {
|
|
return time.Time{}, err
|
|
}
|
|
|
|
// We made sure there's at least one UDP listener, so just use the
|
|
// packet sending interface on the first one. Take the time after the
|
|
// write call comes back, which will underestimate the time a little,
|
|
// but help account for any delays before the write occurs.
|
|
_, err = t.udpListeners[0].WriteTo(b, udpAddr)
|
|
return time.Now(), err
|
|
}
|
|
|
|
// See Transport.
|
|
func (t *NetTransport) PacketCh() <-chan *Packet {
|
|
return t.packetCh
|
|
}
|
|
|
|
// See Transport.
|
|
func (t *NetTransport) DialTimeout(addr string, timeout time.Duration) (net.Conn, error) {
|
|
dialer := net.Dialer{Timeout: timeout}
|
|
return dialer.Dial("tcp", addr)
|
|
}
|
|
|
|
// See Transport.
|
|
func (t *NetTransport) StreamCh() <-chan net.Conn {
|
|
return t.streamCh
|
|
}
|
|
|
|
// See Transport.
|
|
func (t *NetTransport) Shutdown() error {
|
|
// This will avoid log spam about errors when we shut down.
|
|
atomic.StoreInt32(&t.shutdown, 1)
|
|
|
|
// Rip through all the connections and shut them down.
|
|
for _, conn := range t.tcpListeners {
|
|
conn.Close()
|
|
}
|
|
for _, conn := range t.udpListeners {
|
|
conn.Close()
|
|
}
|
|
|
|
// Block until all the listener threads have died.
|
|
t.wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// tcpListen is a long running goroutine that accepts incoming TCP connections
|
|
// and hands them off to the stream channel.
|
|
func (t *NetTransport) tcpListen(tcpLn *net.TCPListener) {
|
|
defer t.wg.Done()
|
|
for {
|
|
conn, err := tcpLn.AcceptTCP()
|
|
if err != nil {
|
|
if s := atomic.LoadInt32(&t.shutdown); s == 1 {
|
|
break
|
|
}
|
|
|
|
t.logger.Printf("[ERR] memberlist: Error accepting TCP connection: %v", err)
|
|
continue
|
|
}
|
|
|
|
t.streamCh <- conn
|
|
}
|
|
}
|
|
|
|
// udpListen is a long running goroutine that accepts incoming UDP packets and
|
|
// hands them off to the packet channel.
|
|
func (t *NetTransport) udpListen(udpLn *net.UDPConn) {
|
|
defer t.wg.Done()
|
|
for {
|
|
// Do a blocking read into a fresh buffer. Grab a time stamp as
|
|
// close as possible to the I/O.
|
|
buf := make([]byte, udpPacketBufSize)
|
|
n, addr, err := udpLn.ReadFrom(buf)
|
|
ts := time.Now()
|
|
if err != nil {
|
|
if s := atomic.LoadInt32(&t.shutdown); s == 1 {
|
|
break
|
|
}
|
|
|
|
t.logger.Printf("[ERR] memberlist: Error reading UDP packet: %v", err)
|
|
continue
|
|
}
|
|
|
|
// Check the length - it needs to have at least one byte to be a
|
|
// proper message.
|
|
if n < 1 {
|
|
t.logger.Printf("[ERR] memberlist: UDP packet too short (%d bytes) %s",
|
|
len(buf), LogAddress(addr))
|
|
continue
|
|
}
|
|
|
|
// Ingest the packet.
|
|
metrics.IncrCounter([]string{"memberlist", "udp", "received"}, float32(n))
|
|
t.packetCh <- &Packet{
|
|
Buf: buf[:n],
|
|
From: addr,
|
|
Timestamp: ts,
|
|
}
|
|
}
|
|
}
|
|
|
|
// setUDPRecvBuf is used to resize the UDP receive window. The function
|
|
// attempts to set the read buffer to `udpRecvBuf` but backs off until
|
|
// the read buffer can be set.
|
|
func setUDPRecvBuf(c *net.UDPConn) error {
|
|
size := udpRecvBufSize
|
|
var err error
|
|
for size > 0 {
|
|
if err = c.SetReadBuffer(size); err == nil {
|
|
return nil
|
|
}
|
|
size = size / 2
|
|
}
|
|
return err
|
|
}
|