711d033757
In case of local IP reuse there was a race condition that was leaving the overlay namespace with a wrong configuration, causing connectivity issues. This commit introduces the use of setMatrix to handle the transient state and to make sure that the proper configuration is maintained.
Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
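The setMatrix mentioned in the commit message is not defined in this file. Purely as an illustrative sketch of the idea, assuming it behaves like a map from key to a set of values (the names setMatrix, Insert and Remove below are hypothetical, not libnetwork's actual API), tracking every transient owner of an address means a late delete for a reused IP removes only its own entry instead of wiping out the newer one:

	package main

	import "fmt"

	// setMatrix is a hypothetical sketch: one key can hold several values at
	// once, so transient, overlapping add/remove operations do not clobber
	// each other.
	type setMatrix struct {
		m map[string]map[string]struct{}
	}

	func newSetMatrix() *setMatrix {
		return &setMatrix{m: make(map[string]map[string]struct{})}
	}

	// Insert records value under key and returns how many values key now holds.
	func (s *setMatrix) Insert(key, value string) int {
		if s.m[key] == nil {
			s.m[key] = make(map[string]struct{})
		}
		s.m[key][value] = struct{}{}
		return len(s.m[key])
	}

	// Remove drops value from key and returns how many values remain; only a
	// drop to zero should trigger removal of the real configuration.
	func (s *setMatrix) Remove(key, value string) int {
		delete(s.m[key], value)
		if len(s.m[key]) == 0 {
			delete(s.m, key)
		}
		return len(s.m[key])
	}

	func main() {
		sm := newSetMatrix()
		sm.Insert("10.0.0.2", "ep-old")
		sm.Insert("10.0.0.2", "ep-new")              // IP reused before the old endpoint is fully gone
		fmt.Println(sm.Remove("10.0.0.2", "ep-old")) // prints 1: the newer entry survives
	}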
package overlay

import (
	"fmt"
	"net"
	"strings"
	"time"

	"github.com/hashicorp/serf/serf"
	"github.com/sirupsen/logrus"
)
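// ovNotify describes an endpoint join/leave notification to be broadcast
// to the serf cluster.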
type ovNotify struct {
	action string
	ep     *endpoint
	nw     *network
}
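// logWriter forwards serf and memberlist log output to logrus at the level
// indicated by the message prefix.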
type logWriter struct{}

func (l *logWriter) Write(p []byte) (int, error) {
	str := string(p)

	switch {
	case strings.Contains(str, "[WARN]"):
		logrus.Warn(str)
	case strings.Contains(str, "[DEBUG]"):
		logrus.Debug(str)
	case strings.Contains(str, "[INFO]"):
		logrus.Info(str)
	case strings.Contains(str, "[ERR]"):
		logrus.Error(str)
	}

	return len(p), nil
}
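// serfInit creates the local serf instance, points its event channel at the
// driver and starts the event loop in a separate goroutine.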
func (d *driver) serfInit() error {
	var err error

	config := serf.DefaultConfig()
	config.Init()
	config.MemberlistConfig.BindAddr = d.advertiseAddress

	d.eventCh = make(chan serf.Event, 4)
	config.EventCh = d.eventCh
	config.UserCoalescePeriod = 1 * time.Second
	config.UserQuiescentPeriod = 50 * time.Millisecond

	config.LogOutput = &logWriter{}
	config.MemberlistConfig.LogOutput = config.LogOutput

	s, err := serf.Create(config)
	if err != nil {
		return fmt.Errorf("failed to create cluster node: %v", err)
	}
	defer func() {
		if err != nil {
			s.Shutdown()
		}
	}()

	d.serfInstance = s

	d.notifyCh = make(chan ovNotify)
	d.exitCh = make(chan chan struct{})

	go d.startSerfLoop(d.eventCh, d.notifyCh, d.exitCh)
	return nil
}
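// serfJoin joins the serf cluster through the given neighbor IP.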
func (d *driver) serfJoin(neighIP string) error {
	if neighIP == "" {
		return fmt.Errorf("no neighbor to join")
	}
	if _, err := d.serfInstance.Join([]string{neighIP}, true); err != nil {
		return fmt.Errorf("Failed to join the cluster at neigh IP %s: %v",
			neighIP, err)
	}
	return nil
}
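// notifyEvent broadcasts an endpoint join/leave as a serf user event; the
// event name carries the local VTEP address, network ID and endpoint ID, and
// the payload carries the action, IP, mask and MAC.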
func (d *driver) notifyEvent(event ovNotify) {
	ep := event.ep

	ePayload := fmt.Sprintf("%s %s %s %s", event.action, ep.addr.IP.String(),
		net.IP(ep.addr.Mask).String(), ep.mac.String())
	eName := fmt.Sprintf("jl %s %s %s", d.serfInstance.LocalMember().Addr.String(),
		event.nw.id, ep.id)

	if err := d.serfInstance.UserEvent(eName, []byte(ePayload), true); err != nil {
		logrus.Errorf("Sending user event failed: %v\n", err)
	}
}
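// processEvent parses a join/leave user event coming from another node and
// updates the local peer database accordingly; events that originated from
// this node are ignored.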
func (d *driver) processEvent(u serf.UserEvent) {
	logrus.Debugf("Received user event name:%s, payload:%s LTime:%d \n", u.Name,
		string(u.Payload), uint64(u.LTime))

	var dummy, action, vtepStr, nid, eid, ipStr, maskStr, macStr string
	if _, err := fmt.Sscan(u.Name, &dummy, &vtepStr, &nid, &eid); err != nil {
		fmt.Printf("Failed to scan name string: %v\n", err)
	}

	if _, err := fmt.Sscan(string(u.Payload), &action,
		&ipStr, &maskStr, &macStr); err != nil {
		fmt.Printf("Failed to scan value string: %v\n", err)
	}

	logrus.Debugf("Parsed data = %s/%s/%s/%s/%s/%s\n", nid, eid, vtepStr, ipStr, maskStr, macStr)

	mac, err := net.ParseMAC(macStr)
	if err != nil {
		logrus.Errorf("Failed to parse mac: %v\n", err)
	}

	if d.serfInstance.LocalMember().Addr.String() == vtepStr {
		return
	}

	switch action {
	case "join":
		d.peerAdd(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), false, false, false)
	case "leave":
		d.peerDelete(nid, eid, net.ParseIP(ipStr), net.IPMask(net.ParseIP(maskStr).To4()), mac, net.ParseIP(vtepStr), false)
	}
}
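// processQuery answers a "peerlookup" query with the MAC, IP mask and VTEP of
// the requested peer, if it is present in the local peer database.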
func (d *driver) processQuery(q *serf.Query) {
	logrus.Debugf("Received query name:%s, payload:%s\n", q.Name,
		string(q.Payload))

	var nid, ipStr string
	if _, err := fmt.Sscan(string(q.Payload), &nid, &ipStr); err != nil {
		fmt.Printf("Failed to scan query payload string: %v\n", err)
	}

	pKey, pEntry, err := d.peerDbSearch(nid, net.ParseIP(ipStr))
	if err != nil {
		return
	}

	logrus.Debugf("Sending peer query resp mac %v, mask %s, vtep %s", pKey.peerMac, net.IP(pEntry.peerIPMask).String(), pEntry.vtep)
	q.Respond([]byte(fmt.Sprintf("%s %s %s", pKey.peerMac.String(), net.IP(pEntry.peerIPMask).String(), pEntry.vtep.String())))
}
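// resolvePeer looks up a peer's MAC, IP mask and VTEP by querying the serf
// cluster, giving up after one second.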
func (d *driver) resolvePeer(nid string, peerIP net.IP) (net.HardwareAddr, net.IPMask, net.IP, error) {
	if d.serfInstance == nil {
		return nil, nil, nil, fmt.Errorf("could not resolve peer: serf instance not initialized")
	}

	qPayload := fmt.Sprintf("%s %s", string(nid), peerIP.String())
	resp, err := d.serfInstance.Query("peerlookup", []byte(qPayload), nil)
	if err != nil {
		return nil, nil, nil, fmt.Errorf("resolving peer by querying the cluster failed: %v", err)
	}

	respCh := resp.ResponseCh()
	select {
	case r := <-respCh:
		var macStr, maskStr, vtepStr string
		if _, err := fmt.Sscan(string(r.Payload), &macStr, &maskStr, &vtepStr); err != nil {
			return nil, nil, nil, fmt.Errorf("bad response %q for the resolve query: %v", string(r.Payload), err)
		}

		mac, err := net.ParseMAC(macStr)
		if err != nil {
			return nil, nil, nil, fmt.Errorf("failed to parse mac: %v", err)
		}

		logrus.Debugf("Received peer query response, mac %s, vtep %s, mask %s", macStr, vtepStr, maskStr)
		return mac, net.IPMask(net.ParseIP(maskStr).To4()), net.ParseIP(vtepStr), nil

	case <-time.After(time.Second):
		return nil, nil, nil, fmt.Errorf("timed out resolving peer by querying the cluster")
	}
}
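// startSerfLoop dispatches outgoing notifications, incoming serf events and
// the exit request until the driver is asked to leave the cluster.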
func (d *driver) startSerfLoop(eventCh chan serf.Event, notifyCh chan ovNotify,
	exitCh chan chan struct{}) {

	for {
		select {
		case notify, ok := <-notifyCh:
			if !ok {
				break
			}

			d.notifyEvent(notify)
		case ch, ok := <-exitCh:
			if !ok {
				break
			}

			if err := d.serfInstance.Leave(); err != nil {
				logrus.Errorf("failed leaving the cluster: %v\n", err)
			}

			d.serfInstance.Shutdown()
			close(ch)
			return
		case e, ok := <-eventCh:
			if !ok {
				break
			}

			if e.EventType() == serf.EventQuery {
				d.processQuery(e.(*serf.Query))
				break
			}

			u, ok := e.(serf.UserEvent)
			if !ok {
				break
			}
			d.processEvent(u)
		}
	}
}
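// isSerfAlive reports whether the local serf instance exists and is still in
// the alive state.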
func (d *driver) isSerfAlive() bool {
	d.Lock()
	serfInstance := d.serfInstance
	d.Unlock()
	if serfInstance == nil || serfInstance.State() != serf.SerfAlive {
		return false
	}
	return true
}