1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #33463 from mavenugo/ln-vin-1706

Vendoring libnetwork 2e99f06621c23a5f4038968f1af1e28c84e4104e
This commit is contained in:
Tibor Vass 2017-06-06 20:24:17 -07:00 committed by GitHub
commit 64bb445463
10 changed files with 442 additions and 13 deletions

View file

@ -26,7 +26,7 @@ github.com/imdario/mergo 0.2.1
golang.org/x/sync de49d9dcd27d4f764488181bea099dfe6179bcf0
#get libnetwork packages
github.com/docker/libnetwork 83e1e49475b88a9f1f8ba89a690a7d5de42e24b9
github.com/docker/libnetwork 2e99f06621c23a5f4038968f1af1e28c84e4104e
github.com/docker/go-events 18b43f1bc85d9cdd42c05a6cd2d444c7a200a894
github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View file

@ -738,7 +738,6 @@ func (c *controller) NewNetwork(networkType, name string, id string, options ...
if network.configOnly {
network.scope = datastore.LocalScope
network.networkType = "null"
network.ipamType = ""
goto addToStore
}

View file

@ -85,6 +85,23 @@ const (
ipvsDestAttrInactiveConnections
ipvsDestAttrPersistentConnections
ipvsDestAttrStats
ipvsDestAttrAddressFamily
)
// IPVS Svc Statistics constancs
const (
ipvsSvcStatsUnspec int = iota
ipvsSvcStatsConns
ipvsSvcStatsPktsIn
ipvsSvcStatsPktsOut
ipvsSvcStatsBytesIn
ipvsSvcStatsBytesOut
ipvsSvcStatsCPS
ipvsSvcStatsPPSIn
ipvsSvcStatsPPSOut
ipvsSvcStatsBPSIn
ipvsSvcStatsBPSOut
)
// Destination forwarding methods

View file

@ -6,6 +6,7 @@ import (
"net"
"syscall"
"fmt"
"github.com/vishvananda/netlink/nl"
"github.com/vishvananda/netns"
)
@ -25,6 +26,21 @@ type Service struct {
Netmask uint32
AddressFamily uint16
PEName string
Stats SvcStats
}
// SvcStats defines an IPVS service statistics
type SvcStats struct {
Connections uint32
PacketsIn uint32
PacketsOut uint32
BytesIn uint64
BytesOut uint64
CPS uint32
BPSOut uint32
PPSIn uint32
PPSOut uint32
BPSIn uint32
}
// Destination defines an IPVS destination (real server) in its
@ -117,3 +133,29 @@ func (i *Handle) UpdateDestination(s *Service, d *Destination) error {
func (i *Handle) DelDestination(s *Service, d *Destination) error {
return i.doCmd(s, d, ipvsCmdDelDest)
}
// GetServices returns an array of services configured on the Node
func (i *Handle) GetServices() ([]*Service, error) {
return i.doGetServicesCmd(nil)
}
// GetDestinations returns an array of Destinations configured for this Service
func (i *Handle) GetDestinations(s *Service) ([]*Destination, error) {
return i.doGetDestinationsCmd(s, nil)
}
// GetService gets details of a specific IPVS services, useful in updating statisics etc.,
func (i *Handle) GetService(s *Service) (*Service, error) {
res, err := i.doGetServicesCmd(s)
if err != nil {
return nil, err
}
// We are looking for exactly one service otherwise error out
if len(res) != 1 {
return nil, fmt.Errorf("Expected only one service obtained=%d", len(res))
}
return res[0], nil
}

View file

@ -19,6 +19,7 @@ import (
"github.com/vishvananda/netns"
)
// For Quick Reference IPVS related netlink message is described at the end of this file.
var (
native = nl.NativeEndian()
ipvsFamily int
@ -89,7 +90,6 @@ func fillService(s *Service) nl.NetlinkRequestData {
if s.PEName != "" {
nl.NewRtAttrChild(cmdAttr, ipvsSvcAttrPEName, nl.ZeroTerminated(s.PEName))
}
f := &ipvsFlags{
flags: s.Flags,
mask: 0xFFFFFFFF,
@ -117,20 +117,38 @@ func fillDestinaton(d *Destination) nl.NetlinkRequestData {
return cmdAttr
}
func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error {
func (i *Handle) doCmdwithResponse(s *Service, d *Destination, cmd uint8) ([][]byte, error) {
req := newIPVSRequest(cmd)
req.Seq = atomic.AddUint32(&i.seq, 1)
req.AddData(fillService(s))
if d != nil {
if s == nil {
req.Flags |= syscall.NLM_F_DUMP //Flag to dump all messages
req.AddData(nl.NewRtAttr(ipvsCmdAttrService, nil)) //Add a dummy attribute
} else {
req.AddData(fillService(s))
}
if d == nil {
if cmd == ipvsCmdGetDest {
req.Flags |= syscall.NLM_F_DUMP
}
} else {
req.AddData(fillDestinaton(d))
}
if _, err := execute(i.sock, req, 0); err != nil {
return err
res, err := execute(i.sock, req, 0)
if err != nil {
return [][]byte{}, err
}
return nil
return res, nil
}
func (i *Handle) doCmd(s *Service, d *Destination, cmd uint8) error {
_, err := i.doCmdwithResponse(s, d, cmd)
return err
}
func getIPVSFamily() (int, error) {
@ -171,7 +189,6 @@ func rawIPData(ip net.IP) []byte {
if family == nl.FAMILY_V4 {
return ip.To4()
}
return ip
}
@ -235,3 +252,295 @@ done:
}
return res, nil
}
func parseIP(ip []byte, family uint16) (net.IP, error) {
var resIP net.IP
switch family {
case syscall.AF_INET:
resIP = (net.IP)(ip[:4])
case syscall.AF_INET6:
resIP = (net.IP)(ip[:16])
default:
return nil, fmt.Errorf("parseIP Error ip=%v", ip)
}
return resIP, nil
}
// parseStats
func assembleStats(msg []byte) (SvcStats, error) {
var s SvcStats
attrs, err := nl.ParseRouteAttr(msg)
if err != nil {
return s, err
}
for _, attr := range attrs {
attrType := int(attr.Attr.Type)
switch attrType {
case ipvsSvcStatsConns:
s.Connections = native.Uint32(attr.Value)
case ipvsSvcStatsPktsIn:
s.PacketsIn = native.Uint32(attr.Value)
case ipvsSvcStatsPktsOut:
s.PacketsOut = native.Uint32(attr.Value)
case ipvsSvcStatsBytesIn:
s.BytesIn = native.Uint64(attr.Value)
case ipvsSvcStatsBytesOut:
s.BytesOut = native.Uint64(attr.Value)
case ipvsSvcStatsCPS:
s.CPS = native.Uint32(attr.Value)
case ipvsSvcStatsPPSIn:
s.PPSIn = native.Uint32(attr.Value)
case ipvsSvcStatsPPSOut:
s.PPSOut = native.Uint32(attr.Value)
case ipvsSvcStatsBPSIn:
s.BPSIn = native.Uint32(attr.Value)
case ipvsSvcStatsBPSOut:
s.BPSOut = native.Uint32(attr.Value)
}
}
return s, nil
}
// assembleService assembles a services back from a hain of netlink attributes
func assembleService(attrs []syscall.NetlinkRouteAttr) (*Service, error) {
var s Service
for _, attr := range attrs {
attrType := int(attr.Attr.Type)
switch attrType {
case ipvsSvcAttrAddressFamily:
s.AddressFamily = native.Uint16(attr.Value)
case ipvsSvcAttrProtocol:
s.Protocol = native.Uint16(attr.Value)
case ipvsSvcAttrAddress:
ip, err := parseIP(attr.Value, s.AddressFamily)
if err != nil {
return nil, err
}
s.Address = ip
case ipvsSvcAttrPort:
s.Port = binary.BigEndian.Uint16(attr.Value)
case ipvsSvcAttrFWMark:
s.FWMark = native.Uint32(attr.Value)
case ipvsSvcAttrSchedName:
s.SchedName = nl.BytesToString(attr.Value)
case ipvsSvcAttrFlags:
s.Flags = native.Uint32(attr.Value)
case ipvsSvcAttrTimeout:
s.Timeout = native.Uint32(attr.Value)
case ipvsSvcAttrNetmask:
s.Netmask = native.Uint32(attr.Value)
case ipvsSvcAttrStats:
stats, err := assembleStats(attr.Value)
if err != nil {
return nil, err
}
s.Stats = stats
}
}
return &s, nil
}
// parseService given a ipvs netlink response this function will respond with a valid service entry, an error otherwise
func (i *Handle) parseService(msg []byte) (*Service, error) {
var s *Service
//Remove General header for this message and parse the NetLink message
hdr := deserializeGenlMsg(msg)
NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():])
if err != nil {
return nil, err
}
if len(NetLinkAttrs) == 0 {
return nil, fmt.Errorf("error no valid netlink message found while parsing service record")
}
//Now Parse and get IPVS related attributes messages packed in this message.
ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value)
if err != nil {
return nil, err
}
//Assemble all the IPVS related attribute messages and create a service record
s, err = assembleService(ipvsAttrs)
if err != nil {
return nil, err
}
return s, nil
}
// doGetServicesCmd a wrapper which could be used commonly for both GetServices() and GetService(*Service)
func (i *Handle) doGetServicesCmd(svc *Service) ([]*Service, error) {
var res []*Service
msgs, err := i.doCmdwithResponse(svc, nil, ipvsCmdGetService)
if err != nil {
return nil, err
}
for _, msg := range msgs {
srv, err := i.parseService(msg)
if err != nil {
return nil, err
}
res = append(res, srv)
}
return res, nil
}
func assembleDestination(attrs []syscall.NetlinkRouteAttr) (*Destination, error) {
var d Destination
for _, attr := range attrs {
attrType := int(attr.Attr.Type)
switch attrType {
case ipvsDestAttrAddress:
ip, err := parseIP(attr.Value, syscall.AF_INET)
if err != nil {
return nil, err
}
d.Address = ip
case ipvsDestAttrPort:
d.Port = binary.BigEndian.Uint16(attr.Value)
case ipvsDestAttrForwardingMethod:
d.ConnectionFlags = native.Uint32(attr.Value)
case ipvsDestAttrWeight:
d.Weight = int(native.Uint16(attr.Value))
case ipvsDestAttrUpperThreshold:
d.UpperThreshold = native.Uint32(attr.Value)
case ipvsDestAttrLowerThreshold:
d.LowerThreshold = native.Uint32(attr.Value)
case ipvsDestAttrAddressFamily:
d.AddressFamily = native.Uint16(attr.Value)
}
}
return &d, nil
}
// parseDestination given a ipvs netlink response this function will respond with a valid destination entry, an error otherwise
func (i *Handle) parseDestination(msg []byte) (*Destination, error) {
var dst *Destination
//Remove General header for this message
hdr := deserializeGenlMsg(msg)
NetLinkAttrs, err := nl.ParseRouteAttr(msg[hdr.Len():])
if err != nil {
return nil, err
}
if len(NetLinkAttrs) == 0 {
return nil, fmt.Errorf("error no valid netlink message found while parsing destination record")
}
//Now Parse and get IPVS related attributes messages packed in this message.
ipvsAttrs, err := nl.ParseRouteAttr(NetLinkAttrs[0].Value)
if err != nil {
return nil, err
}
//Assemble netlink attributes and create a Destination record
dst, err = assembleDestination(ipvsAttrs)
if err != nil {
return nil, err
}
return dst, nil
}
// doGetDestinationsCmd a wrapper function to be used by GetDestinations and GetDestination(d) apis
func (i *Handle) doGetDestinationsCmd(s *Service, d *Destination) ([]*Destination, error) {
var res []*Destination
msgs, err := i.doCmdwithResponse(s, d, ipvsCmdGetDest)
if err != nil {
return nil, err
}
for _, msg := range msgs {
dest, err := i.parseDestination(msg)
if err != nil {
return res, err
}
res = append(res, dest)
}
return res, nil
}
// IPVS related netlink message format explained
/* EACH NETLINK MSG is of the below format, this is what we will receive from execute() api.
If we have multiple netlink objects to process like GetServices() etc., execute() will
supply an array of this below object
NETLINK MSG
|-----------------------------------|
0 1 2 3
|--------|--------|--------|--------| -
| CMD ID | VER | RESERVED | |==> General Message Header represented by genlMsgHdr
|-----------------------------------| -
| ATTR LEN | ATTR TYPE | |
|-----------------------------------| |
| | |
| VALUE | |
| []byte Array of IPVS MSG | |==> Attribute Message represented by syscall.NetlinkRouteAttr
| PADDED BY 4 BYTES | |
| | |
|-----------------------------------| -
Once We strip genlMsgHdr from above NETLINK MSG, we should parse the VALUE.
VALUE will have an array of netlink attributes (syscall.NetlinkRouteAttr) such that each attribute will
represent a "Service" or "Destination" object's field. If we assemble these attributes we can construct
Service or Destination.
IPVS MSG
|-----------------------------------|
0 1 2 3
|--------|--------|--------|--------|
| ATTR LEN | ATTR TYPE |
|-----------------------------------|
| |
| |
| []byte IPVS ATTRIBUTE BY 4 BYTES |
| |
| |
|-----------------------------------|
NEXT ATTRIBUTE
|-----------------------------------|
| ATTR LEN | ATTR TYPE |
|-----------------------------------|
| |
| |
| []byte IPVS ATTRIBUTE BY 4 BYTES |
| |
| |
|-----------------------------------|
NEXT ATTRIBUTE
|-----------------------------------|
| ATTR LEN | ATTR TYPE |
|-----------------------------------|
| |
| |
| []byte IPVS ATTRIBUTE BY 4 BYTES |
| |
| |
|-----------------------------------|
*/

View file

@ -412,6 +412,9 @@ func (n *network) applyConfigurationTo(to *network) error {
}
}
}
if len(n.ipamType) != 0 {
to.ipamType = n.ipamType
}
if len(n.ipamOptions) > 0 {
to.ipamOptions = make(map[string]string, len(n.ipamOptions))
for k, v := range n.ipamOptions {

View file

@ -284,7 +284,6 @@ func (nDB *NetworkDB) reconnectNode() {
}
if err := nDB.sendNodeEvent(NodeEventTypeJoin); err != nil {
logrus.Errorf("failed to send node join during reconnect: %v", err)
return
}

View file

@ -17,6 +17,25 @@ func (d *delegate) NodeMeta(limit int) []byte {
return []byte{}
}
func (nDB *NetworkDB) getNode(nEvent *NodeEvent) *node {
nDB.Lock()
defer nDB.Unlock()
for _, nodes := range []map[string]*node{
nDB.failedNodes,
nDB.leftNodes,
nDB.nodes,
} {
if n, ok := nodes[nEvent.NodeName]; ok {
if n.ltime >= nEvent.LTime {
return nil
}
return n
}
}
return nil
}
func (nDB *NetworkDB) checkAndGetNode(nEvent *NodeEvent) *node {
nDB.Lock()
defer nDB.Unlock()
@ -63,10 +82,28 @@ func (nDB *NetworkDB) purgeSameNode(n *node) {
}
func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
n := nDB.checkAndGetNode(nEvent)
// Update our local clock if the received messages has newer
// time.
nDB.networkClock.Witness(nEvent.LTime)
n := nDB.getNode(nEvent)
if n == nil {
return false
}
// If its a node leave event for a manager and this is the only manager we
// know of we want the reconnect logic to kick in. In a single manager
// cluster manager's gossip can't be bootstrapped unless some other node
// connects to it.
if len(nDB.bootStrapIP) == 1 && nEvent.Type == NodeEventTypeLeave {
for _, ip := range nDB.bootStrapIP {
if ip.Equal(n.Addr) {
n.ltime = nEvent.LTime
return true
}
}
}
n = nDB.checkAndGetNode(nEvent)
nDB.purgeSameNode(n)
n.ltime = nEvent.LTime
@ -76,11 +113,13 @@ func (nDB *NetworkDB) handleNodeEvent(nEvent *NodeEvent) bool {
nDB.Lock()
nDB.nodes[n.Name] = n
nDB.Unlock()
logrus.Infof("Node join event for %s/%s", n.Name, n.Addr)
return true
case NodeEventTypeLeave:
nDB.Lock()
nDB.leftNodes[n.Name] = n
nDB.Unlock()
logrus.Infof("Node leave event for %s/%s", n.Name, n.Addr)
return true
}

View file

@ -22,6 +22,7 @@ func (e *eventDelegate) broadcastNodeEvent(addr net.IP, op opType) {
}
func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
logrus.Infof("Node %s/%s, joined gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opCreate)
e.nDB.Lock()
// In case the node is rejoining after a failure or leave,
@ -37,9 +38,12 @@ func (e *eventDelegate) NotifyJoin(mn *memberlist.Node) {
e.nDB.nodes[mn.Name] = &node{Node: *mn}
e.nDB.Unlock()
logrus.Infof("Node %s/%s, added to nodes list", mn.Name, mn.Addr)
}
func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
var failed bool
logrus.Infof("Node %s/%s, left gossip cluster", mn.Name, mn.Addr)
e.broadcastNodeEvent(mn.Addr, opDelete)
e.nDB.deleteNodeTableEntries(mn.Name)
e.nDB.deleteNetworkEntriesForNode(mn.Name)
@ -47,10 +51,17 @@ func (e *eventDelegate) NotifyLeave(mn *memberlist.Node) {
if n, ok := e.nDB.nodes[mn.Name]; ok {
delete(e.nDB.nodes, mn.Name)
n.reapTime = reapInterval
// In case of node failure, keep retrying to reconnect every retryInterval (1sec) for nodeReapInterval (24h)
// Explicit leave will have already removed the node from the list of nodes (nDB.nodes) and put it into the leftNodes map
n.reapTime = nodeReapInterval
e.nDB.failedNodes[mn.Name] = n
failed = true
}
e.nDB.Unlock()
if failed {
logrus.Infof("Node %s/%s, added to failed nodes list", mn.Name, mn.Addr)
}
}
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {

View file

@ -4,6 +4,7 @@ package networkdb
import (
"fmt"
"net"
"strings"
"sync"
"time"
@ -88,6 +89,10 @@ type NetworkDB struct {
// Reference to the memberlist's keyring to add & remove keys
keyring *memberlist.Keyring
// bootStrapIP is the list of IPs that can be used to bootstrap
// the gossip.
bootStrapIP []net.IP
}
// PeerInfo represents the peer (gossip cluster) nodes of a network
@ -194,6 +199,11 @@ func New(c *Config) (*NetworkDB, error) {
// Join joins this NetworkDB instance with a list of peer NetworkDB
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
nDB.Lock()
for _, m := range members {
nDB.bootStrapIP = append(nDB.bootStrapIP, net.ParseIP(m))
}
nDB.Unlock()
return nDB.clusterJoin(members)
}