1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Vendoring libnetwork b66c0385f30c6aa27b2957ed1072682c19a0b0b4

Signed-off-by: Alessandro Boch <aboch@docker.com>
This commit is contained in:
Alessandro Boch 2016-05-08 00:32:51 -07:00
parent ebcf785f2f
commit 1f9e3815aa
68 changed files with 3327 additions and 549 deletions

View file

@ -29,7 +29,7 @@ clone git github.com/RackSec/srslog 259aed10dfa74ea2961eddd1d9847619f6e98837
clone git github.com/imdario/mergo 0.2.1
#get libnetwork packages
clone git github.com/docker/libnetwork v0.8.0-dev.1
clone git github.com/docker/libnetwork b66c0385f30c6aa27b2957ed1072682c19a0b0b4
clone git github.com/docker/go-events 2e7d352816128aa84f4d29b2a21d400133701a0d
clone git github.com/armon/go-radix e39d623f12e8e41c7b5529e9a9dd67a1e2261f80
clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec

View file

@ -1,5 +1,17 @@
# Changelog
## 0.8.0-dev.2 (2016-05-07)
- Fix an issue which may arise during sandbox cleanup (https://github.com/docker/libnetwork/pull/1157)
- Fix cleanup logic in case of ipv6 allocation failure
- Don't add /etc/hosts record if container's ip is empty (--net=none)
- Fix default gw logic for internal networks
- Error when updating IPv6 gateway (https://github.com/docker/libnetwork/issues/1142)
- Fixes https://github.com/docker/libnetwork/issues/1113
- Fixes https://github.com/docker/libnetwork/issues/1069
- Fxies https://github.com/docker/libnetwork/issues/1117
- Increase the concurrent query rate-limit count
- Changes to build libnetwork in Solaris
## 0.8.0-dev.1 (2016-04-16)
- Fixes docker/docker#16964
- Added maximum egress bandwidth qos for Windows

View file

@ -5,8 +5,8 @@ dockerargs = --privileged -v $(shell pwd):/go/src/github.com/docker/libnetwork -
container_env = -e "INSIDECONTAINER=-incontainer=true"
docker = docker run --rm -it ${dockerargs} $$EXTRA_ARGS ${container_env} ${build_image}
ciargs = -e CIRCLECI -e "COVERALLS_TOKEN=$$COVERALLS_TOKEN" -e "INSIDECONTAINER=-incontainer=true"
cidocker = docker run ${dockerargs} ${ciargs} ${container_env} ${build_image}
CROSS_PLATFORMS = linux/amd64 linux/386 linux/arm windows/amd64 windows/386
cidocker = docker run ${dockerargs} ${ciargs} $$EXTRA_ARGS ${container_env} ${build_image}
CROSS_PLATFORMS = linux/amd64 linux/386 linux/arm windows/amd64
all: ${build_image}.created build check integration-tests clean
@ -102,4 +102,4 @@ circle-ci-check: ${build_image}.created
circle-ci-build: ${build_image}.created
@${cidocker} make build-local
circle-ci: circle-ci-check circle-ci-build integration-tests
circle-ci: circle-ci-check circle-ci-cross circle-ci-build integration-tests

View file

@ -34,7 +34,7 @@ func main() {
// Create a network for containers to join.
// NewNetwork accepts Variadic optional arguments that libnetwork and Drivers can use.
network, err := controller.NewNetwork(networkType, "network1")
network, err := controller.NewNetwork(networkType, "network1", "")
if err != nil {
log.Fatalf("controller.NewNetwork: %s", err)
}

View file

@ -0,0 +1,369 @@
package libnetwork
import (
"fmt"
"net"
"os"
"strings"
"github.com/Sirupsen/logrus"
"github.com/docker/go-events"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/networkdb"
)
type agent struct {
networkDB *networkdb.NetworkDB
bindAddr string
epTblCancel func()
driverCancelFuncs map[string][]func()
}
func getBindAddr(ifaceName string) (string, error) {
iface, err := net.InterfaceByName(ifaceName)
if err != nil {
return "", fmt.Errorf("failed to find interface %s: %v", ifaceName, err)
}
addrs, err := iface.Addrs()
if err != nil {
return "", fmt.Errorf("failed to get interface addresses: %v", err)
}
for _, a := range addrs {
addr, ok := a.(*net.IPNet)
if !ok {
continue
}
addrIP := addr.IP
if addrIP.IsLinkLocalUnicast() {
continue
}
return addrIP.String(), nil
}
return "", fmt.Errorf("failed to get bind address")
}
func resolveAddr(addrOrInterface string) (string, error) {
// Try and see if this is a valid IP address
if net.ParseIP(addrOrInterface) != nil {
return addrOrInterface, nil
}
// If not a valid IP address, it should be a valid interface
return getBindAddr(addrOrInterface)
}
func (c *controller) agentInit(bindAddrOrInterface string) error {
if !c.cfg.Daemon.IsAgent {
return nil
}
bindAddr, err := resolveAddr(bindAddrOrInterface)
if err != nil {
return err
}
hostname, _ := os.Hostname()
nDB, err := networkdb.New(&networkdb.Config{
BindAddr: bindAddr,
NodeName: hostname,
})
if err != nil {
return err
}
ch, cancel := nDB.Watch("endpoint_table", "", "")
c.agent = &agent{
networkDB: nDB,
bindAddr: bindAddr,
epTblCancel: cancel,
driverCancelFuncs: make(map[string][]func()),
}
go c.handleTableEvents(ch, c.handleEpTableEvent)
return nil
}
func (c *controller) agentJoin(remotes []string) error {
if c.agent == nil {
return nil
}
return c.agent.networkDB.Join(remotes)
}
func (c *controller) agentDriverNotify(d driverapi.Driver) {
if c.agent == nil {
return
}
d.DiscoverNew(discoverapi.NodeDiscovery, discoverapi.NodeDiscoveryData{
Address: c.agent.bindAddr,
Self: true,
})
}
func (c *controller) agentClose() {
if c.agent == nil {
return
}
for _, cancelFuncs := range c.agent.driverCancelFuncs {
for _, cancel := range cancelFuncs {
cancel()
}
}
c.agent.epTblCancel()
c.agent.networkDB.Close()
}
func (n *network) isClusterEligible() bool {
if n.driverScope() != datastore.GlobalScope {
return false
}
c := n.getController()
if c.agent == nil {
return false
}
return true
}
func (n *network) joinCluster() error {
if !n.isClusterEligible() {
return nil
}
c := n.getController()
return c.agent.networkDB.JoinNetwork(n.ID())
}
func (n *network) leaveCluster() error {
if !n.isClusterEligible() {
return nil
}
c := n.getController()
return c.agent.networkDB.LeaveNetwork(n.ID())
}
func (ep *endpoint) addToCluster() error {
n := ep.getNetwork()
if !n.isClusterEligible() {
return nil
}
c := n.getController()
if !ep.isAnonymous() && ep.Iface().Address() != nil {
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
return err
}
if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s,%s,%s,%s", ep.Name(), ep.svcName,
ep.svcID, ep.Iface().Address().IP))); err != nil {
return err
}
}
for _, te := range ep.joinInfo.driverTableEntries {
if err := c.agent.networkDB.CreateEntry(te.tableName, n.ID(), te.key, te.value); err != nil {
return err
}
}
return nil
}
func (ep *endpoint) deleteFromCluster() error {
n := ep.getNetwork()
if !n.isClusterEligible() {
return nil
}
c := n.getController()
if !ep.isAnonymous() {
if ep.Iface().Address() != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil {
return err
}
}
if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil {
return err
}
}
if ep.joinInfo == nil {
return nil
}
for _, te := range ep.joinInfo.driverTableEntries {
if err := c.agent.networkDB.DeleteEntry(te.tableName, n.ID(), te.key); err != nil {
return err
}
}
return nil
}
func (n *network) addDriverWatches() {
if !n.isClusterEligible() {
return
}
c := n.getController()
for _, tableName := range n.driverTables {
ch, cancel := c.agent.networkDB.Watch(tableName, n.ID(), "")
c.Lock()
c.agent.driverCancelFuncs[n.ID()] = append(c.agent.driverCancelFuncs[n.ID()], cancel)
c.Unlock()
go c.handleTableEvents(ch, n.handleDriverTableEvent)
d, err := n.driver(false)
if err != nil {
logrus.Errorf("Could not resolve driver %s while walking driver tabl: %v", n.networkType, err)
return
}
c.agent.networkDB.WalkTable(tableName, func(nid, key string, value []byte) bool {
d.EventNotify(driverapi.Create, n.ID(), tableName, key, value)
return false
})
}
}
func (n *network) cancelDriverWatches() {
if !n.isClusterEligible() {
return
}
c := n.getController()
c.Lock()
cancelFuncs := c.agent.driverCancelFuncs[n.ID()]
delete(c.agent.driverCancelFuncs, n.ID())
c.Unlock()
for _, cancel := range cancelFuncs {
cancel()
}
}
func (c *controller) handleTableEvents(ch chan events.Event, fn func(events.Event)) {
for {
select {
case ev, ok := <-ch:
if !ok {
return
}
fn(ev)
}
}
}
func (n *network) handleDriverTableEvent(ev events.Event) {
d, err := n.driver(false)
if err != nil {
logrus.Errorf("Could not resolve driver %s while handling driver table event: %v", n.networkType, err)
return
}
var (
etype driverapi.EventType
tname string
key string
value []byte
)
switch event := ev.(type) {
case networkdb.CreateEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Create
case networkdb.DeleteEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Delete
case networkdb.UpdateEvent:
tname = event.Table
key = event.Key
value = event.Value
etype = driverapi.Delete
}
d.EventNotify(etype, n.ID(), tname, key, value)
}
func (c *controller) handleEpTableEvent(ev events.Event) {
var (
nid string
eid string
value string
isAdd bool
)
switch event := ev.(type) {
case networkdb.CreateEvent:
nid = event.NetworkID
eid = event.Key
value = string(event.Value)
isAdd = true
case networkdb.DeleteEvent:
nid = event.NetworkID
eid = event.Key
value = string(event.Value)
case networkdb.UpdateEvent:
logrus.Errorf("Unexpected update service table event = %#v", event)
}
nw, err := c.NetworkByID(nid)
if err != nil {
logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
return
}
n := nw.(*network)
vals := strings.Split(value, ",")
if len(vals) < 4 {
logrus.Errorf("Incorrect service table value = %s", value)
return
}
name := vals[0]
svcName := vals[1]
svcID := vals[2]
ip := net.ParseIP(vals[3])
if name == "" || ip == nil {
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}
if isAdd {
if err := c.addServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
return
}
n.addSvcRecords(name, ip, nil, true)
} else {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
return
}
n.deleteSvcRecords(name, ip, nil, true)
}
}

View file

@ -370,6 +370,8 @@ func (h *Handle) set(ordinal, start, end uint64, any bool, release bool) (uint64
// checks is needed because to cover the case where the number of bits is not a multiple of blockLen
func (h *Handle) validateOrdinal(ordinal uint64) error {
h.Lock()
defer h.Unlock()
if ordinal >= h.bits {
return fmt.Errorf("bit does not belong to the sequence")
}

View file

@ -75,6 +75,10 @@ func (h *Handle) CopyTo(o datastore.KVObject) error {
defer h.Unlock()
dstH := o.(*Handle)
if h == dstH {
return nil
}
dstH.Lock()
dstH.bits = h.bits
dstH.unselected = h.unselected
dstH.head = h.head.getCopy()
@ -83,6 +87,7 @@ func (h *Handle) CopyTo(o datastore.KVObject) error {
dstH.dbIndex = h.dbIndex
dstH.dbExists = h.dbExists
dstH.store = h.store
dstH.Unlock()
return nil
}

View file

@ -22,9 +22,12 @@ type Config struct {
// DaemonCfg represents libnetwork core configuration
type DaemonCfg struct {
Debug bool
IsAgent bool
DataDir string
DefaultNetwork string
DefaultDriver string
Bind string
Neighbors []string
Labels []string
DriverCfg map[string]interface{}
}
@ -81,6 +84,27 @@ func ParseConfigOptions(cfgOptions ...Option) *Config {
// to the controller
type Option func(c *Config)
// OptionBind function returns an option setter for setting a bind interface or address
func OptionBind(bind string) Option {
return func(c *Config) {
c.Daemon.Bind = bind
}
}
// OptionAgent function returns an option setter for setting agent mode
func OptionAgent() Option {
return func(c *Config) {
c.Daemon.IsAgent = true
}
}
// OptionNeighbors function returns an option setter for setting a list of neighbors to join.
func OptionNeighbors(neighbors []string) Option {
return func(c *Config) {
c.Daemon.Neighbors = neighbors
}
}
// OptionDefaultNetwork function returns an option setter for a default network
func OptionDefaultNetwork(dn string) Option {
return func(c *Config) {

View file

@ -15,7 +15,7 @@ create network namespaces and allocate interfaces for containers to use.
// Create a network for containers to join.
// NewNetwork accepts Variadic optional arguments that libnetwork and Drivers can make use of
network, err := controller.NewNetwork(networkType, "network1")
network, err := controller.NewNetwork(networkType, "network1", "")
if err != nil {
return
}
@ -58,6 +58,7 @@ import (
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/drvregistry"
"github.com/docker/libnetwork/hostdiscovery"
"github.com/docker/libnetwork/ipamapi"
"github.com/docker/libnetwork/netlabel"
@ -75,7 +76,7 @@ type NetworkController interface {
Config() config.Config
// Create a new network. The options parameter carries network specific options.
NewNetwork(networkType, name string, options ...NetworkOption) (Network, error)
NewNetwork(networkType, name string, id string, options ...NetworkOption) (Network, error)
// Networks returns the list of Network(s) managed by this controller.
Networks() []Network
@ -119,55 +120,74 @@ type NetworkWalker func(nw Network) bool
// When the function returns true, the walk will stop.
type SandboxWalker func(sb Sandbox) bool
type driverData struct {
driver driverapi.Driver
capability driverapi.Capability
}
type ipamData struct {
driver ipamapi.Ipam
capability *ipamapi.Capability
// default address spaces are provided by ipam driver at registration time
defaultLocalAddressSpace, defaultGlobalAddressSpace string
}
type driverTable map[string]*driverData
type ipamTable map[string]*ipamData
type sandboxTable map[string]*sandbox
type controller struct {
id string
drivers driverTable
ipamDrivers ipamTable
sandboxes sandboxTable
cfg *config.Config
stores []datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
watchCh chan *endpoint
unWatchCh chan *endpoint
svcDb map[string]svcInfo
nmap map[string]*netWatch
defOsSbox osl.Sandbox
sboxOnce sync.Once
id string
drvRegistry *drvregistry.DrvRegistry
sandboxes sandboxTable
cfg *config.Config
stores []datastore.DataStore
discovery hostdiscovery.HostDiscovery
extKeyListener net.Listener
watchCh chan *endpoint
unWatchCh chan *endpoint
svcRecords map[string]svcInfo
nmap map[string]*netWatch
serviceBindings map[string]*service
defOsSbox osl.Sandbox
sboxOnce sync.Once
agent *agent
sync.Mutex
}
type initializer struct {
fn drvregistry.InitFunc
ntype string
}
// New creates a new instance of network controller.
func New(cfgOptions ...config.Option) (NetworkController, error) {
c := &controller{
id: stringid.GenerateRandomID(),
cfg: config.ParseConfigOptions(cfgOptions...),
sandboxes: sandboxTable{},
drivers: driverTable{},
ipamDrivers: ipamTable{},
svcDb: make(map[string]svcInfo),
id: stringid.GenerateRandomID(),
cfg: config.ParseConfigOptions(cfgOptions...),
sandboxes: sandboxTable{},
svcRecords: make(map[string]svcInfo),
serviceBindings: make(map[string]*service),
}
if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
return nil, err
}
if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil {
return nil, err
}
if err := c.initStores(); err != nil {
return nil, err
}
drvRegistry, err := drvregistry.New(c.getStore(datastore.LocalScope), c.getStore(datastore.GlobalScope), c.RegisterDriver, nil)
if err != nil {
return nil, err
}
for _, i := range getInitializers() {
var dcfg map[string]interface{}
// External plugins don't need config passed through daemon. They can
// bootstrap themselves
if i.ntype != "remote" {
dcfg = c.makeDriverConfig(i.ntype)
}
if err := drvRegistry.AddDriver(i.ntype, i.fn, dcfg); err != nil {
return nil, err
}
}
c.drvRegistry = drvRegistry
if c.cfg != nil && c.cfg.Cluster.Watcher != nil {
if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
// Failing to initialize discovery is a bad situation to be in.
@ -176,15 +196,6 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
}
}
if err := initDrivers(c); err != nil {
return nil, err
}
if err := initIpams(c, c.getStore(datastore.LocalScope),
c.getStore(datastore.GlobalScope)); err != nil {
return nil, err
}
c.sandboxCleanup()
c.cleanupLocalEndpoints()
c.networkCleanup()
@ -196,8 +207,67 @@ func New(cfgOptions ...config.Option) (NetworkController, error) {
return c, nil
}
func (c *controller) makeDriverConfig(ntype string) map[string]interface{} {
if c.cfg == nil {
return nil
}
config := make(map[string]interface{})
for _, label := range c.cfg.Daemon.Labels {
if !strings.HasPrefix(netlabel.Key(label), netlabel.DriverPrefix+"."+ntype) {
continue
}
config[netlabel.Key(label)] = netlabel.Value(label)
}
drvCfg, ok := c.cfg.Daemon.DriverCfg[ntype]
if ok {
for k, v := range drvCfg.(map[string]interface{}) {
config[k] = v
}
}
for k, v := range c.cfg.Scopes {
if !v.IsValid() {
continue
}
config[netlabel.MakeKVClient(k)] = discoverapi.DatastoreConfigData{
Scope: k,
Provider: v.Client.Provider,
Address: v.Client.Address,
Config: v.Client.Config,
}
}
return config
}
var procReloadConfig = make(chan (bool), 1)
func (c *controller) processAgentConfig(cfg *config.Config) (bool, error) {
if c.cfg.Daemon.IsAgent == cfg.Daemon.IsAgent {
// Agent configuration not changed
return false, nil
}
c.Lock()
c.cfg = cfg
c.Unlock()
if err := c.agentInit(c.cfg.Daemon.Bind); err != nil {
return false, err
}
if err := c.agentJoin(c.cfg.Daemon.Neighbors); err != nil {
c.agentClose()
return false, err
}
return true, nil
}
func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
procReloadConfig <- true
defer func() { <-procReloadConfig }()
@ -206,6 +276,16 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
// Refuse the configuration if it alters an existing datastore client configuration.
update := false
cfg := config.ParseConfigOptions(cfgOptions...)
isAgentConfig, err := c.processAgentConfig(cfg)
if err != nil {
return err
}
if isAgentConfig {
return nil
}
for s := range c.cfg.Scopes {
if _, ok := cfg.Scopes[s]; !ok {
return types.ForbiddenErrorf("cannot accept new configuration because it removes an existing datastore client")
@ -228,16 +308,6 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
return nil
}
c.Lock()
c.cfg = cfg
c.Unlock()
if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
log.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
}
}
var dsConfig *discoverapi.DatastoreConfigData
for scope, sCfg := range cfg.Scopes {
if scope == datastore.LocalScope || !sCfg.IsValid() {
@ -255,17 +325,25 @@ func (c *controller) ReloadConfiguration(cfgOptions ...config.Option) error {
return nil
}
for nm, id := range c.getIpamDrivers() {
err := id.driver.DiscoverNew(discoverapi.DatastoreConfig, *dsConfig)
c.drvRegistry.WalkIPAMs(func(name string, driver ipamapi.Ipam, cap *ipamapi.Capability) bool {
err := driver.DiscoverNew(discoverapi.DatastoreConfig, *dsConfig)
if err != nil {
log.Errorf("Failed to set datastore in driver %s: %v", nm, err)
log.Errorf("Failed to set datastore in driver %s: %v", name, err)
}
}
return false
})
for nm, id := range c.getNetDrivers() {
err := id.driver.DiscoverNew(discoverapi.DatastoreConfig, *dsConfig)
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
err := driver.DiscoverNew(discoverapi.DatastoreConfig, *dsConfig)
if err != nil {
log.Errorf("Failed to set datastore in driver %s: %v", nm, err)
log.Errorf("Failed to set datastore in driver %s: %v", name, err)
}
return false
})
if c.discovery == nil && c.cfg.Cluster.Watcher != nil {
if err := c.initDiscovery(c.cfg.Cluster.Watcher); err != nil {
log.Errorf("Failed to Initialize Discovery after configuration update: %v", err)
}
}
@ -333,34 +411,30 @@ func (c *controller) hostLeaveCallback(nodes []net.IP) {
}
func (c *controller) processNodeDiscovery(nodes []net.IP, add bool) {
c.Lock()
drivers := []*driverData{}
for _, d := range c.drivers {
drivers = append(drivers, d)
}
c.Unlock()
for _, d := range drivers {
c.pushNodeDiscovery(d, nodes, add)
}
c.drvRegistry.WalkDrivers(func(name string, driver driverapi.Driver, capability driverapi.Capability) bool {
c.pushNodeDiscovery(driver, capability, nodes, add)
return false
})
}
func (c *controller) pushNodeDiscovery(d *driverData, nodes []net.IP, add bool) {
func (c *controller) pushNodeDiscovery(d driverapi.Driver, cap driverapi.Capability, nodes []net.IP, add bool) {
var self net.IP
if c.cfg != nil {
addr := strings.Split(c.cfg.Cluster.Address, ":")
self = net.ParseIP(addr[0])
}
if d == nil || d.capability.DataScope != datastore.GlobalScope || nodes == nil {
if d == nil || cap.DataScope != datastore.GlobalScope || nodes == nil {
return
}
for _, node := range nodes {
nodeData := discoverapi.NodeDiscoveryData{Address: node.String(), Self: node.Equal(self)}
var err error
if add {
err = d.driver.DiscoverNew(discoverapi.NodeDiscovery, nodeData)
err = d.DiscoverNew(discoverapi.NodeDiscovery, nodeData)
} else {
err = d.driver.DiscoverDelete(discoverapi.NodeDiscovery, nodeData)
err = d.DiscoverDelete(discoverapi.NodeDiscovery, nodeData)
}
if err != nil {
log.Debugf("discovery notification error : %v", err)
@ -378,73 +452,36 @@ func (c *controller) Config() config.Config {
}
func (c *controller) RegisterDriver(networkType string, driver driverapi.Driver, capability driverapi.Capability) error {
if !config.IsValidName(networkType) {
return ErrInvalidName(networkType)
}
c.Lock()
if _, ok := c.drivers[networkType]; ok {
c.Unlock()
return driverapi.ErrActiveRegistration(networkType)
}
dData := &driverData{driver, capability}
c.drivers[networkType] = dData
hd := c.discovery
c.Unlock()
if hd != nil {
c.pushNodeDiscovery(dData, hd.Fetch(), true)
c.pushNodeDiscovery(driver, capability, hd.Fetch(), true)
}
c.agentDriverNotify(driver)
return nil
}
func (c *controller) registerIpamDriver(name string, driver ipamapi.Ipam, caps *ipamapi.Capability) error {
if !config.IsValidName(name) {
return ErrInvalidName(name)
}
c.Lock()
_, ok := c.ipamDrivers[name]
c.Unlock()
if ok {
return types.ForbiddenErrorf("ipam driver %q already registered", name)
}
locAS, glbAS, err := driver.GetDefaultAddressSpaces()
if err != nil {
return types.InternalErrorf("ipam driver %q failed to return default address spaces: %v", name, err)
}
c.Lock()
c.ipamDrivers[name] = &ipamData{driver: driver, defaultLocalAddressSpace: locAS, defaultGlobalAddressSpace: glbAS, capability: caps}
c.Unlock()
log.Debugf("Registering ipam driver: %q", name)
return nil
}
func (c *controller) RegisterIpamDriver(name string, driver ipamapi.Ipam) error {
return c.registerIpamDriver(name, driver, &ipamapi.Capability{})
}
func (c *controller) RegisterIpamDriverWithCapabilities(name string, driver ipamapi.Ipam, caps *ipamapi.Capability) error {
return c.registerIpamDriver(name, driver, caps)
}
// NewNetwork creates a new network of the specified network type. The options
// are network specific and modeled in a generic way.
func (c *controller) NewNetwork(networkType, name string, options ...NetworkOption) (Network, error) {
func (c *controller) NewNetwork(networkType, name string, id string, options ...NetworkOption) (Network, error) {
if !config.IsValidName(name) {
return nil, ErrInvalidName(name)
}
if id == "" {
id = stringid.GenerateRandomID()
}
// Construct the network object
network := &network{
name: name,
networkType: networkType,
generic: map[string]interface{}{netlabel.GenericData: make(map[string]string)},
ipamType: ipamapi.DefaultIPAM,
id: stringid.GenerateRandomID(),
id: id,
ctrlr: c,
persist: true,
drvOnce: &sync.Once{},
@ -468,7 +505,8 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
}
}()
if err = c.addNetwork(network); err != nil {
err = c.addNetwork(network)
if err != nil {
return nil, err
}
defer func() {
@ -499,6 +537,12 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti
return nil, err
}
if err = network.joinCluster(); err != nil {
log.Errorf("Failed to join network %s into agent cluster: %v", name, err)
}
network.addDriverWatches()
return network, nil
}
@ -509,7 +553,7 @@ func (c *controller) addNetwork(n *network) error {
}
// Create the network
if err := d.CreateNetwork(n.id, n.generic, n.getIPData(4), n.getIPData(6)); err != nil {
if err := d.CreateNetwork(n.id, n.generic, n, n.getIPData(4), n.getIPData(6)); err != nil {
return err
}
@ -745,78 +789,47 @@ func SandboxKeyWalker(out *Sandbox, key string) SandboxWalker {
}
}
func (c *controller) loadDriver(networkType string) (*driverData, error) {
func (c *controller) loadDriver(networkType string) error {
// Plugins pkg performs lazy loading of plugins that acts as remote drivers.
// As per the design, this Get call will result in remote driver discovery if there is a corresponding plugin available.
_, err := plugins.Get(networkType, driverapi.NetworkPluginEndpointType)
if err != nil {
if err == plugins.ErrNotFound {
return nil, types.NotFoundErrorf(err.Error())
return types.NotFoundErrorf(err.Error())
}
return nil, err
return err
}
c.Lock()
defer c.Unlock()
dd, ok := c.drivers[networkType]
if !ok {
return nil, ErrInvalidNetworkDriver(networkType)
}
return dd, nil
return nil
}
func (c *controller) loadIpamDriver(name string) (*ipamData, error) {
func (c *controller) loadIPAMDriver(name string) error {
if _, err := plugins.Get(name, ipamapi.PluginEndpointType); err != nil {
if err == plugins.ErrNotFound {
return nil, types.NotFoundErrorf(err.Error())
return types.NotFoundErrorf(err.Error())
}
return nil, err
return err
}
c.Lock()
id, ok := c.ipamDrivers[name]
c.Unlock()
if !ok {
return nil, types.BadRequestErrorf("invalid ipam driver: %q", name)
}
return id, nil
return nil
}
func (c *controller) getIPAM(name string) (id *ipamData, err error) {
var ok bool
c.Lock()
id, ok = c.ipamDrivers[name]
c.Unlock()
if !ok {
id, err = c.loadIpamDriver(name)
}
return id, err
}
func (c *controller) getIPAMDriver(name string) (ipamapi.Ipam, *ipamapi.Capability, error) {
id, cap := c.drvRegistry.IPAM(name)
if id == nil {
// Might be a plugin name. Try loading it
if err := c.loadIPAMDriver(name); err != nil {
return nil, nil, err
}
func (c *controller) getIpamDriver(name string) (ipamapi.Ipam, error) {
id, err := c.getIPAM(name)
if err != nil {
return nil, err
// Now that we resolved the plugin, try again looking up the registry
id, cap = c.drvRegistry.IPAM(name)
if id == nil {
return nil, nil, types.BadRequestErrorf("invalid ipam driver: %q", name)
}
}
return id.driver, nil
}
func (c *controller) getIpamDrivers() ipamTable {
c.Lock()
defer c.Unlock()
table := ipamTable{}
for i, d := range c.ipamDrivers {
table[i] = d
}
return table
}
func (c *controller) getNetDrivers() driverTable {
c.Lock()
defer c.Unlock()
table := driverTable{}
for i, d := range c.drivers {
table[i] = d
}
return table
return id, cap, nil
}
func (c *controller) Stop() {

View file

@ -5,7 +5,6 @@ import (
"sync"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/boltdb"
)
type kvMap map[string]KVObject
@ -42,9 +41,7 @@ func (c *cache) kmap(kvObject KVObject) (kvMap, error) {
kvList, err := c.ds.store.List(keyPrefix)
if err != nil {
// In case of BoltDB it may return ErrBoltBucketNotFound when no writes
// have ever happened on the db bucket. So check for both err codes
if err == store.ErrKeyNotFound || err == boltdb.ErrBoltBucketNotFound {
if err == store.ErrKeyNotFound {
// If the store doesn't have anything then there is nothing to
// populate in the cache. Just bail out.
goto out

View file

@ -9,10 +9,6 @@ import (
"github.com/docker/libkv"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/boltdb"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/types"
)
@ -148,13 +144,6 @@ func makeDefaultScopes() map[string]*ScopeCfg {
var defaultRootChain = []string{"docker", "network", "v1.0"}
var rootChain = defaultRootChain
func init() {
consul.Register()
zookeeper.Register()
etcd.Register()
boltdb.Register()
}
// DefaultScopes returns a map of default scopes and it's config for clients to use.
func DefaultScopes(dataDir string) map[string]*ScopeCfg {
if dataDir != "" {
@ -411,6 +400,9 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error {
_, pair, err = ds.store.AtomicPut(Key(kvObject.Key()...), kvObjValue, previous, nil)
if err != nil {
if err == store.ErrKeyExists {
return ErrKeyModified
}
return err
}
@ -571,6 +563,9 @@ func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error {
}
if _, err := ds.store.AtomicDelete(Key(kvObject.Key()...), previous); err != nil {
if err == store.ErrKeyExists {
return ErrKeyModified
}
return err
}

View file

@ -14,7 +14,7 @@ func (c *controller) createGWNetwork() (Network, error) {
bridge.EnableIPMasquerade: strconv.FormatBool(true),
}
n, err := c.NewNetwork("bridge", libnGWNetwork,
n, err := c.NewNetwork("bridge", libnGWNetwork, "",
NetworkOptionDriverOpts(netOption),
NetworkOptionEnableIPv6(false),
)

View file

@ -0,0 +1,7 @@
package libnetwork
import "github.com/docker/libnetwork/types"
func (c *controller) createGWNetwork() (Network, error) {
return nil, types.NotImplementedErrorf("default gateway functionality is not implemented in solaris")
}

View file

@ -13,10 +13,25 @@ const NetworkPluginEndpointType = "NetworkDriver"
type Driver interface {
discoverapi.Discover
// CreateNetwork invokes the driver method to create a network passing
// the network id and network specific config. The config mechanism will
// eventually be replaced with labels which are yet to be introduced.
CreateNetwork(nid string, options map[string]interface{}, ipV4Data, ipV6Data []IPAMData) error
// NetworkAllocate invokes the driver method to allocate network
// specific resources passing network id and network specific config.
// It returns a key,value pair of network specific driver allocations
// to the caller.
NetworkAllocate(nid string, options map[string]string, ipV4Data, ipV6Data []IPAMData) (map[string]string, error)
// NetworkFree invokes the driver method to free network specific resources
// associated with a given network id.
NetworkFree(nid string) error
// CreateNetwork invokes the driver method to create a network
// passing the network id and network specific config. The
// config mechanism will eventually be replaced with labels
// which are yet to be introduced. The driver can return a
// list of table names for which it is interested in receiving
// notification when a CRUD operation is performed on any
// entry in that table. This will be ignored for local scope
// drivers.
CreateNetwork(nid string, options map[string]interface{}, nInfo NetworkInfo, ipV4Data, ipV6Data []IPAMData) error
// DeleteNetwork invokes the driver method to delete network passing
// the network id.
@ -50,10 +65,24 @@ type Driver interface {
// programming that was done so far
RevokeExternalConnectivity(nid, eid string) error
// EventNotify notifies the driver when a CRUD operation has
// happened on a table of its interest as soon as this node
// receives such an event in the gossip layer. This method is
// only invoked for the global scope driver.
EventNotify(event EventType, nid string, tableName string, key string, value []byte)
// Type returns the the type of this driver, the network type this driver manages
Type() string
}
// NetworkInfo provides a go interface for drivers to provide network
// specific information to libnetwork.
type NetworkInfo interface {
// TableEventRegister registers driver interest in a given
// table name.
TableEventRegister(tableName string) error
}
// InterfaceInfo provides a go interface for drivers to retrive
// network information to interface resources.
type InterfaceInfo interface {
@ -102,6 +131,10 @@ type JoinInfo interface {
// DisableGatewayService tells libnetwork not to provide Default GW for the container
DisableGatewayService()
// AddTableEntry adds a table entry to the gossip layer
// passing the table name, key and an opaque value.
AddTableEntry(tableName string, key string, value []byte) error
}
// DriverCallback provides a Callback interface for Drivers into LibNetwork
@ -124,3 +157,15 @@ type IPAMData struct {
Gateway *net.IPNet
AuxAddresses map[string]*net.IPNet
}
// EventType defines a type for the CRUD event
type EventType uint8
const (
// Create event is generated when a table entry is created,
Create EventType = 1 + iota
// Update event is generated when a table entry is updated.
Update
// Delete event is generated when a table entry is deleted.
Delete
)

View file

@ -1,84 +0,0 @@
package libnetwork
import (
"strings"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/ipamapi"
"github.com/docker/libnetwork/netlabel"
builtinIpam "github.com/docker/libnetwork/ipams/builtin"
nullIpam "github.com/docker/libnetwork/ipams/null"
remoteIpam "github.com/docker/libnetwork/ipams/remote"
)
type initializer struct {
fn func(driverapi.DriverCallback, map[string]interface{}) error
ntype string
}
func initDrivers(c *controller) error {
for _, i := range getInitializers() {
if err := i.fn(c, makeDriverConfig(c, i.ntype)); err != nil {
return err
}
}
return nil
}
func makeDriverConfig(c *controller, ntype string) map[string]interface{} {
if c.cfg == nil {
return nil
}
config := make(map[string]interface{})
for _, label := range c.cfg.Daemon.Labels {
if !strings.HasPrefix(netlabel.Key(label), netlabel.DriverPrefix+"."+ntype) {
continue
}
config[netlabel.Key(label)] = netlabel.Value(label)
}
drvCfg, ok := c.cfg.Daemon.DriverCfg[ntype]
if ok {
for k, v := range drvCfg.(map[string]interface{}) {
config[k] = v
}
}
// We don't send datastore configs to external plugins
if ntype == "remote" {
return config
}
for k, v := range c.cfg.Scopes {
if !v.IsValid() {
continue
}
config[netlabel.MakeKVClient(k)] = discoverapi.DatastoreConfigData{
Scope: k,
Provider: v.Client.Provider,
Address: v.Client.Address,
Config: v.Client.Config,
}
}
return config
}
func initIpams(ic ipamapi.Callback, lDs, gDs interface{}) error {
for _, fn := range [](func(ipamapi.Callback, interface{}, interface{}) error){
builtinIpam.Init,
remoteIpam.Init,
nullIpam.Init,
} {
if err := fn(ic, lDs, gDs); err != nil {
return err
}
}
return nil
}

View file

@ -535,8 +535,19 @@ func (d *driver) getNetworks() []*bridgeNetwork {
return ls
}
func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) {
return nil, types.NotImplementedErrorf("not implemented")
}
func (d *driver) NetworkFree(id string) error {
return types.NotImplementedErrorf("not implemented")
}
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
}
// Create a new network using bridge plugin
func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
if len(ipV4Data) == 0 || ipV4Data[0].Pool.String() == "0.0.0.0/0" {
return types.BadRequestErrorf("ipv4 pool is empty")
}

View file

@ -6,7 +6,6 @@ import (
"net"
"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store/boltdb"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/netlabel"
@ -35,7 +34,7 @@ func (d *driver) initStore(option map[string]interface{}) error {
func (d *driver) populateNetworks() error {
kvol, err := d.store.List(datastore.Key(bridgePrefix), &networkConfiguration{})
if err != nil && err != datastore.ErrKeyNotFound && err != boltdb.ErrBoltBucketNotFound {
if err != nil && err != datastore.ErrKeyNotFound {
return fmt.Errorf("failed to get bridge network configurations from store: %v", err)
}

View file

@ -24,7 +24,18 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
return dc.RegisterDriver(networkType, &driver{}, c)
}
func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) {
return nil, types.NotImplementedErrorf("not implemented")
}
func (d *driver) NetworkFree(id string) error {
return types.NotImplementedErrorf("not implemented")
}
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
}
func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
d.Lock()
defer d.Unlock()

View file

@ -8,6 +8,7 @@ import (
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/osl"
"github.com/docker/libnetwork/types"
)
const (
@ -64,6 +65,14 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
return dc.RegisterDriver(ipvlanType, d, c)
}
func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) {
return nil, types.NotImplementedErrorf("not implemented")
}
func (d *driver) NetworkFree(id string) error {
return types.NotImplementedErrorf("not implemented")
}
func (d *driver) EndpointOperInfo(nid, eid string) (map[string]interface{}, error) {
return make(map[string]interface{}, 0), nil
}
@ -89,3 +98,6 @@ func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{})
func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
return nil
}
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
}

View file

@ -14,7 +14,7 @@ import (
)
// CreateNetwork the network for the specified driver type
func (d *driver) CreateNetwork(nid string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
func (d *driver) CreateNetwork(nid string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
defer osl.InitOSContext()()
kv, err := kernel.GetKernelVersion()
if err != nil {

View file

@ -5,7 +5,6 @@ import (
"fmt"
"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store/boltdb"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/netlabel"
@ -60,7 +59,7 @@ func (d *driver) initStore(option map[string]interface{}) error {
// populateNetworks is invoked at driver init to recreate persistently stored networks
func (d *driver) populateNetworks() error {
kvol, err := d.store.List(datastore.Key(ipvlanPrefix), &configuration{})
if err != nil && err != datastore.ErrKeyNotFound && err != boltdb.ErrBoltBucketNotFound {
if err != nil && err != datastore.ErrKeyNotFound {
return fmt.Errorf("failed to get ipvlan network configurations from store: %v", err)
}
// If empty it simply means no ipvlan networks have been created yet

View file

@ -8,6 +8,7 @@ import (
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/osl"
"github.com/docker/libnetwork/types"
)
const (
@ -66,6 +67,14 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
return dc.RegisterDriver(macvlanType, d, c)
}
func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) {
return nil, types.NotImplementedErrorf("not implemented")
}
func (d *driver) NetworkFree(id string) error {
return types.NotImplementedErrorf("not implemented")
}
func (d *driver) EndpointOperInfo(nid, eid string) (map[string]interface{}, error) {
return make(map[string]interface{}, 0), nil
}
@ -91,3 +100,6 @@ func (d *driver) DiscoverNew(dType discoverapi.DiscoveryType, data interface{})
func (d *driver) DiscoverDelete(dType discoverapi.DiscoveryType, data interface{}) error {
return nil
}
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
}

View file

@ -14,7 +14,7 @@ import (
)
// CreateNetwork the network for the specified driver type
func (d *driver) CreateNetwork(nid string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
func (d *driver) CreateNetwork(nid string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
defer osl.InitOSContext()()
kv, err := kernel.GetKernelVersion()
if err != nil {

View file

@ -5,7 +5,6 @@ import (
"fmt"
"github.com/Sirupsen/logrus"
"github.com/docker/libkv/store/boltdb"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/netlabel"
@ -60,7 +59,7 @@ func (d *driver) initStore(option map[string]interface{}) error {
// populateNetworks is invoked at driver init to recreate persistently stored networks
func (d *driver) populateNetworks() error {
kvol, err := d.store.List(datastore.Key(macvlanPrefix), &configuration{})
if err != nil && err != datastore.ErrKeyNotFound && err != boltdb.ErrBoltBucketNotFound {
if err != nil && err != datastore.ErrKeyNotFound {
return fmt.Errorf("failed to get macvlan network configurations from store: %v", err)
}
// If empty it simply means no macvlan networks have been created yet

View file

@ -24,7 +24,18 @@ func Init(dc driverapi.DriverCallback, config map[string]interface{}) error {
return dc.RegisterDriver(networkType, &driver{}, c)
}
func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) {
return nil, types.NotImplementedErrorf("not implemented")
}
func (d *driver) NetworkFree(id string) error {
return types.NotImplementedErrorf("not implemented")
}
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
}
func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
d.Lock()
defer d.Unlock()

View file

@ -21,14 +21,18 @@ func chainExists(cname string) bool {
}
func setupGlobalChain() {
if err := iptables.RawCombinedOutput("-N", globalChain); err != nil {
logrus.Errorf("could not create global overlay chain: %v", err)
return
// Because of an ungraceful shutdown, chain could already be present
if !chainExists(globalChain) {
if err := iptables.RawCombinedOutput("-N", globalChain); err != nil {
logrus.Errorf("could not create global overlay chain: %v", err)
return
}
}
if err := iptables.RawCombinedOutput("-A", globalChain, "-j", "RETURN"); err != nil {
logrus.Errorf("could not install default return chain in the overlay global chain: %v", err)
return
if !iptables.Exists(iptables.Filter, globalChain, "-j", "RETURN") {
if err := iptables.RawCombinedOutput("-A", globalChain, "-j", "RETURN"); err != nil {
logrus.Errorf("could not install default return chain in the overlay global chain: %v", err)
}
}
}

View file

@ -3,6 +3,7 @@ package overlay
import (
"fmt"
"net"
"strings"
log "github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/driverapi"
@ -104,11 +105,55 @@ func (d *driver) Join(nid, eid string, sboxKey string, jinfo driverapi.JoinInfo,
d.peerDbAdd(nid, eid, ep.addr.IP, ep.addr.Mask, ep.mac,
net.ParseIP(d.bindAddress), true)
if err := jinfo.AddTableEntry(ovPeerTable, eid, []byte(fmt.Sprintf("%s,%s,%s", ep.addr, ep.mac, d.bindAddress))); err != nil {
log.Errorf("overlay: Failed adding table entry to joininfo: %v", err)
}
d.pushLocalEndpointEvent("join", nid, eid)
return nil
}
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
if tableName != ovPeerTable {
log.Errorf("Unexpected table notification for table %s received", tableName)
return
}
eid := key
values := strings.Split(string(value), ",")
if len(values) < 3 {
log.Errorf("Invalid value %s received through event notify", string(value))
return
}
addr, err := types.ParseCIDR(values[0])
if err != nil {
log.Errorf("Invalid peer IP %s received in event notify", values[0])
return
}
mac, err := net.ParseMAC(values[1])
if err != nil {
log.Errorf("Invalid mac %s received in event notify", values[1])
return
}
vtep := net.ParseIP(values[2])
if vtep == nil {
log.Errorf("Invalid VTEP %s received in event notify", values[2])
return
}
if etype == driverapi.Delete {
d.peerDelete(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
return
}
d.peerAdd(nid, eid, addr.IP, addr.Mask, mac, vtep, true)
}
// Leave method is invoked when a Sandbox detaches from an endpoint.
func (d *driver) Leave(nid, eid string) error {
if err := validateID(nid, eid); err != nil {

View file

@ -6,6 +6,7 @@ import (
"net"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"syscall"
@ -13,6 +14,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/osl"
"github.com/docker/libnetwork/resolvconf"
@ -59,7 +61,15 @@ type network struct {
sync.Mutex
}
func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) {
return nil, types.NotImplementedErrorf("not implemented")
}
func (d *driver) NetworkFree(id string) error {
return types.NotImplementedErrorf("not implemented")
}
func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
if id == "" {
return fmt.Errorf("invalid network id")
}
@ -81,12 +91,40 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Dat
subnets: []*subnet{},
}
for _, ipd := range ipV4Data {
vnis := make([]uint32, 0, len(ipV4Data))
if gval, ok := option[netlabel.GenericData]; ok {
optMap := gval.(map[string]string)
if val, ok := optMap[netlabel.OverlayVxlanIDList]; ok {
logrus.Debugf("overlay: Received vxlan IDs: %s", val)
vniStrings := strings.Split(val, ",")
for _, vniStr := range vniStrings {
vni, err := strconv.Atoi(vniStr)
if err != nil {
return fmt.Errorf("invalid vxlan id value %q passed", vniStr)
}
vnis = append(vnis, uint32(vni))
}
}
}
// If we are getting vnis from libnetwork, either we get for
// all subnets or none.
if len(vnis) != 0 && len(vnis) < len(ipV4Data) {
return fmt.Errorf("insufficient vnis(%d) passed to overlay", len(vnis))
}
for i, ipd := range ipV4Data {
s := &subnet{
subnetIP: ipd.Pool,
gwIP: ipd.Gateway,
once: &sync.Once{},
}
if len(vnis) != 0 {
s.vni = vnis[i]
}
n.subnets = append(n.subnets, s)
}
@ -94,8 +132,13 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Dat
return fmt.Errorf("failed to update data store for network %v: %v", n.id, err)
}
d.addNetwork(n)
if nInfo != nil {
if err := nInfo.TableEventRegister(ovPeerTable); err != nil {
return err
}
}
d.addNetwork(n)
return nil
}
@ -244,11 +287,21 @@ func setHostMode() {
}
func (n *network) generateVxlanName(s *subnet) string {
return "vx-" + fmt.Sprintf("%06x", n.vxlanID(s)) + "-" + n.id[:5]
id := n.id
if len(n.id) > 5 {
id = n.id[:5]
}
return "vx-" + fmt.Sprintf("%06x", n.vxlanID(s)) + "-" + id
}
func (n *network) generateBridgeName(s *subnet) string {
return "ov-" + fmt.Sprintf("%06x", n.vxlanID(s)) + "-" + n.id[:5]
id := n.id
if len(n.id) > 5 {
id = n.id[:5]
}
return "ov-" + fmt.Sprintf("%06x", n.vxlanID(s)) + "-" + id
}
func isOverlap(nw *net.IPNet) bool {
@ -395,9 +448,10 @@ func (n *network) watchMiss(nlSock *nl.NetlinkSocket) {
continue
}
if neigh.IP.To16() != nil {
if neigh.IP.To4() == nil {
continue
}
logrus.Debugf("miss notification for dest IP, %v", neigh.IP.String())
if neigh.State&(netlink.NUD_STALE|netlink.NUD_INCOMPLETE) == 0 {
continue
@ -575,32 +629,38 @@ func (n *network) DataScope() string {
}
func (n *network) writeToStore() error {
if n.driver.store == nil {
return nil
}
return n.driver.store.PutObjectAtomic(n)
}
func (n *network) releaseVxlanID() error {
if n.driver.store == nil {
return fmt.Errorf("no datastore configured. cannot release vxlan id")
}
if len(n.subnets) == 0 {
return nil
}
if err := n.driver.store.DeleteObjectAtomic(n); err != nil {
if err == datastore.ErrKeyModified || err == datastore.ErrKeyNotFound {
// In both the above cases we can safely assume that the key has been removed by some other
// instance and so simply get out of here
return nil
}
if n.driver.store != nil {
if err := n.driver.store.DeleteObjectAtomic(n); err != nil {
if err == datastore.ErrKeyModified || err == datastore.ErrKeyNotFound {
// In both the above cases we can safely assume that the key has been removed by some other
// instance and so simply get out of here
return nil
}
return fmt.Errorf("failed to delete network to vxlan id map: %v", err)
return fmt.Errorf("failed to delete network to vxlan id map: %v", err)
}
}
for _, s := range n.subnets {
n.driver.vxlanIdm.Release(uint64(n.vxlanID(s)))
if n.driver.vxlanIdm != nil {
n.driver.vxlanIdm.Release(uint64(n.vxlanID(s)))
}
n.setVxlanID(s, 0)
}
return nil
}
@ -611,7 +671,7 @@ func (n *network) obtainVxlanID(s *subnet) error {
}
if n.driver.store == nil {
return fmt.Errorf("no datastore configured. cannot obtain vxlan id")
return fmt.Errorf("no valid vxlan id and no datastore configured, cannot obtain vxlan id")
}
for {

View file

@ -88,7 +88,7 @@ func Fini(drv driverapi.Driver) {
func (d *driver) configure() error {
if d.store == nil {
return types.NoServiceErrorf("datastore is not available")
return nil
}
if d.vxlanIdm == nil {
@ -147,10 +147,14 @@ func (d *driver) nodeJoin(node string, self bool) {
d.Lock()
d.bindAddress = node
d.Unlock()
err := d.serfInit()
if err != nil {
logrus.Errorf("initializing serf instance failed: %v", err)
return
// If there is no cluster store there is no need to start serf.
if d.store != nil {
err := d.serfInit()
if err != nil {
logrus.Errorf("initializing serf instance failed: %v", err)
return
}
}
}

View file

@ -7,6 +7,8 @@ import (
"syscall"
)
const ovPeerTable = "overlay_peer_table"
type peerKey struct {
peerIP net.IP
peerMac net.HardwareAddr

View file

@ -83,7 +83,18 @@ func (d *driver) call(methodName string, arg interface{}, retVal maybeError) err
return nil
}
func (d *driver) CreateNetwork(id string, options map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) {
return nil, types.NotImplementedErrorf("not implemented")
}
func (d *driver) NetworkFree(id string) error {
return types.NotImplementedErrorf("not implemented")
}
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
}
func (d *driver) CreateNetwork(id string, options map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
create := &api.CreateNetworkRequest{
NetworkID: id,
Options: options,

View file

@ -149,8 +149,11 @@ func (c *networkConfiguration) processIPAM(id string, ipamV4Data, ipamV6Data []d
return nil
}
func (d *driver) EventNotify(etype driverapi.EventType, nid, tableName, key string, value []byte) {
}
// Create a new network
func (d *driver) CreateNetwork(id string, option map[string]interface{}, ipV4Data, ipV6Data []driverapi.IPAMData) error {
func (d *driver) CreateNetwork(id string, option map[string]interface{}, nInfo driverapi.NetworkInfo, ipV4Data, ipV6Data []driverapi.IPAMData) error {
if _, err := d.getNetwork(id); err == nil {
return types.ForbiddenErrorf("network %s exists", id)
}
@ -414,6 +417,10 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
}
endpointStruct.Policies = append(endpointStruct.Policies, qosPolicies...)
if ifInfo.Address() != nil {
endpointStruct.IPAddress = ifInfo.Address().IP
}
configurationb, err := json.Marshal(endpointStruct)
if err != nil {
return err
@ -449,8 +456,13 @@ func (d *driver) CreateEndpoint(nid, eid string, ifInfo driverapi.InterfaceInfo,
n.endpoints[eid] = endpoint
n.Unlock()
ifInfo.SetIPAddress(endpoint.addr)
ifInfo.SetMacAddress(endpoint.macAddress)
if ifInfo.Address() == nil {
ifInfo.SetIPAddress(endpoint.addr)
}
if macAddress == nil {
ifInfo.SetMacAddress(endpoint.macAddress)
}
return nil
}
@ -560,6 +572,14 @@ func (d *driver) RevokeExternalConnectivity(nid, eid string) error {
return nil
}
func (d *driver) NetworkAllocate(id string, option map[string]string, ipV4Data, ipV6Data []driverapi.IPAMData) (map[string]string, error) {
return nil, types.NotImplementedErrorf("not implemented")
}
func (d *driver) NetworkFree(id string) error {
return types.NotImplementedErrorf("not implemented")
}
func (d *driver) Type() string {
return d.name
}

View file

@ -0,0 +1,5 @@
package libnetwork
func getInitializers() []initializer {
return []initializer{}
}

View file

@ -0,0 +1,241 @@
package drvregistry
import (
"fmt"
"strings"
"sync"
"github.com/docker/libnetwork/driverapi"
"github.com/docker/libnetwork/ipamapi"
"github.com/docker/libnetwork/types"
builtinIpam "github.com/docker/libnetwork/ipams/builtin"
nullIpam "github.com/docker/libnetwork/ipams/null"
remoteIpam "github.com/docker/libnetwork/ipams/remote"
)
type driverData struct {
driver driverapi.Driver
capability driverapi.Capability
}
type ipamData struct {
driver ipamapi.Ipam
capability *ipamapi.Capability
// default address spaces are provided by ipam driver at registration time
defaultLocalAddressSpace, defaultGlobalAddressSpace string
}
type driverTable map[string]*driverData
type ipamTable map[string]*ipamData
// DrvRegistry holds the registry of all network drivers and IPAM drivers that it knows about.
type DrvRegistry struct {
sync.Mutex
drivers driverTable
ipamDrivers ipamTable
dfn DriverNotifyFunc
ifn IPAMNotifyFunc
}
// Functors definition
// InitFunc defines the driver initialization function signature.
type InitFunc func(driverapi.DriverCallback, map[string]interface{}) error
// IPAMWalkFunc defines the IPAM driver table walker function signature.
type IPAMWalkFunc func(name string, driver ipamapi.Ipam, cap *ipamapi.Capability) bool
// DriverWalkFunc defines the network driver table walker function signature.
type DriverWalkFunc func(name string, driver driverapi.Driver, capability driverapi.Capability) bool
// IPAMNotifyFunc defines the notify function signature when a new IPAM driver gets registered.
type IPAMNotifyFunc func(name string, driver ipamapi.Ipam, cap *ipamapi.Capability) error
// DriverNotifyFunc defines the notify function signature when a new network driver gets registered.
type DriverNotifyFunc func(name string, driver driverapi.Driver, capability driverapi.Capability) error
// New retruns a new driver registry handle.
func New(lDs, gDs interface{}, dfn DriverNotifyFunc, ifn IPAMNotifyFunc) (*DrvRegistry, error) {
r := &DrvRegistry{
drivers: make(driverTable),
ipamDrivers: make(ipamTable),
dfn: dfn,
ifn: ifn,
}
if err := r.initIPAMs(lDs, gDs); err != nil {
return nil, err
}
return r, nil
}
// AddDriver adds a network driver to the registry.
func (r *DrvRegistry) AddDriver(ntype string, fn InitFunc, config map[string]interface{}) error {
return fn(r, config)
}
// WalkIPAMs walks the IPAM drivers registered in the registry and invokes the passed walk function and each one of them.
func (r *DrvRegistry) WalkIPAMs(ifn IPAMWalkFunc) {
type ipamVal struct {
name string
data *ipamData
}
r.Lock()
ivl := make([]ipamVal, 0, len(r.ipamDrivers))
for k, v := range r.ipamDrivers {
ivl = append(ivl, ipamVal{name: k, data: v})
}
r.Unlock()
for _, iv := range ivl {
if ifn(iv.name, iv.data.driver, iv.data.capability) {
break
}
}
}
// WalkDrivers walks the network drivers registered in the registry and invokes the passed walk function and each one of them.
func (r *DrvRegistry) WalkDrivers(dfn DriverWalkFunc) {
type driverVal struct {
name string
data *driverData
}
r.Lock()
dvl := make([]driverVal, 0, len(r.drivers))
for k, v := range r.drivers {
dvl = append(dvl, driverVal{name: k, data: v})
}
r.Unlock()
for _, dv := range dvl {
if dfn(dv.name, dv.data.driver, dv.data.capability) {
break
}
}
}
// Driver returns the actual network driver instance and its capability which registered with the passed name.
func (r *DrvRegistry) Driver(name string) (driverapi.Driver, *driverapi.Capability) {
r.Lock()
defer r.Unlock()
d, ok := r.drivers[name]
if !ok {
return nil, nil
}
return d.driver, &d.capability
}
// IPAM returns the actual IPAM driver instance and its capability which registered with the passed name.
func (r *DrvRegistry) IPAM(name string) (ipamapi.Ipam, *ipamapi.Capability) {
r.Lock()
defer r.Unlock()
i, ok := r.ipamDrivers[name]
if !ok {
return nil, nil
}
return i.driver, i.capability
}
// IPAMDefaultAddressSpaces returns the default address space strings for the passed IPAM driver name.
func (r *DrvRegistry) IPAMDefaultAddressSpaces(name string) (string, string, error) {
r.Lock()
defer r.Unlock()
i, ok := r.ipamDrivers[name]
if !ok {
return "", "", fmt.Errorf("ipam %s not found", name)
}
return i.defaultLocalAddressSpace, i.defaultGlobalAddressSpace, nil
}
func (r *DrvRegistry) initIPAMs(lDs, gDs interface{}) error {
for _, fn := range [](func(ipamapi.Callback, interface{}, interface{}) error){
builtinIpam.Init,
remoteIpam.Init,
nullIpam.Init,
} {
if err := fn(r, lDs, gDs); err != nil {
return err
}
}
return nil
}
// RegisterDriver registers the network driver when it gets discovered.
func (r *DrvRegistry) RegisterDriver(ntype string, driver driverapi.Driver, capability driverapi.Capability) error {
if strings.TrimSpace(ntype) == "" {
return fmt.Errorf("network type string cannot be empty")
}
r.Lock()
_, ok := r.drivers[ntype]
r.Unlock()
if ok {
return driverapi.ErrActiveRegistration(ntype)
}
if r.dfn != nil {
if err := r.dfn(ntype, driver, capability); err != nil {
return err
}
}
dData := &driverData{driver, capability}
r.Lock()
r.drivers[ntype] = dData
r.Unlock()
return nil
}
func (r *DrvRegistry) registerIpamDriver(name string, driver ipamapi.Ipam, caps *ipamapi.Capability) error {
if strings.TrimSpace(name) == "" {
return fmt.Errorf("ipam driver name string cannot be empty")
}
r.Lock()
_, ok := r.ipamDrivers[name]
r.Unlock()
if ok {
return types.ForbiddenErrorf("ipam driver %q already registered", name)
}
locAS, glbAS, err := driver.GetDefaultAddressSpaces()
if err != nil {
return types.InternalErrorf("ipam driver %q failed to return default address spaces: %v", name, err)
}
if r.ifn != nil {
if err := r.ifn(name, driver, caps); err != nil {
return err
}
}
r.Lock()
r.ipamDrivers[name] = &ipamData{driver: driver, defaultLocalAddressSpace: locAS, defaultGlobalAddressSpace: glbAS, capability: caps}
r.Unlock()
return nil
}
// RegisterIpamDriver registers the IPAM driver discovered with default capabilities.
func (r *DrvRegistry) RegisterIpamDriver(name string, driver ipamapi.Ipam) error {
return r.registerIpamDriver(name, driver, &ipamapi.Capability{})
}
// RegisterIpamDriverWithCapabilities registers the IPAM driver discovered with specified capabilities.
func (r *DrvRegistry) RegisterIpamDriverWithCapabilities(name string, driver ipamapi.Ipam, caps *ipamapi.Capability) error {
return r.registerIpamDriver(name, driver, caps)
}

View file

@ -67,6 +67,8 @@ type endpoint struct {
ipamOptions map[string]string
aliases map[string]string
myAliases []string
svcID string
svcName string
dbIndex uint64
dbExists bool
sync.Mutex
@ -89,6 +91,9 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) {
epMap["anonymous"] = ep.anonymous
epMap["disableResolution"] = ep.disableResolution
epMap["myAliases"] = ep.myAliases
epMap["svcName"] = ep.svcName
epMap["svcID"] = ep.svcID
return json.Marshal(epMap)
}
@ -172,6 +177,15 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) {
if l, ok := epMap["locator"]; ok {
ep.locator = l.(string)
}
if sn, ok := epMap["svcName"]; ok {
ep.svcName = sn.(string)
}
if si, ok := epMap["svcID"]; ok {
ep.svcID = si.(string)
}
ma, _ := json.Marshal(epMap["myAliases"])
var myAliases []string
json.Unmarshal(ma, &myAliases)
@ -196,6 +210,8 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error {
dstEp.dbExists = ep.dbExists
dstEp.anonymous = ep.anonymous
dstEp.disableResolution = ep.disableResolution
dstEp.svcName = ep.svcName
dstEp.svcID = ep.svcID
if ep.iface != nil {
dstEp.iface = &endpointInterface{}
@ -413,7 +429,9 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
}()
// Watch for service records
n.getController().watchSvcRecord(ep)
if !n.getController().cfg.Daemon.IsAgent {
n.getController().watchSvcRecord(ep)
}
address := ""
if ip := ep.getFirstInterfaceAddress(); ip != nil {
@ -446,6 +464,10 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error {
return err
}
if e := ep.addToCluster(); e != nil {
log.Errorf("Could not update state for endpoint %s into cluster: %v", ep.Name(), e)
}
if sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
return sb.setupDefaultGW()
}
@ -632,6 +654,10 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
return err
}
if e := ep.deleteFromCluster(); e != nil {
log.Errorf("Could not delete state for endpoint %s from cluster: %v", ep.Name(), e)
}
sb.deleteHostsEntries(n.getSvcRecords(ep))
if !sb.inDelete && sb.needDefaultGW() && sb.getEndpointInGWNetwork() == nil {
return sb.setupDefaultGW()
@ -730,7 +756,9 @@ func (ep *endpoint) Delete(force bool) error {
}()
// unwatch for service records
n.getController().unWatchSvcRecord(ep)
if !n.getController().cfg.Daemon.IsAgent {
n.getController().unWatchSvcRecord(ep)
}
if err = ep.deleteEndpoint(force); err != nil && !force {
return err
@ -863,6 +891,14 @@ func CreateOptionAlias(name string, alias string) EndpointOption {
}
}
// CreateOptionService function returns an option setter for setting service binding configuration
func CreateOptionService(name, id string) EndpointOption {
return func(ep *endpoint) {
ep.svcName = name
ep.svcID = id
}
}
//CreateOptionMyAlias function returns an option setter for setting endpoint's self alias
func CreateOptionMyAlias(alias string) EndpointOption {
return func(ep *endpoint) {
@ -981,7 +1017,7 @@ func (ep *endpoint) releaseAddress() {
log.Debugf("Releasing addresses for endpoint %s's interface on network %s", ep.Name(), n.Name())
ipam, err := n.getController().getIpamDriver(n.ipamType)
ipam, _, err := n.getController().getIPAMDriver(n.ipamType)
if err != nil {
log.Warnf("Failed to retrieve ipam driver to release interface address on delete of endpoint %s (%s): %v", ep.Name(), ep.ID(), err)
return

View file

@ -143,9 +143,16 @@ type endpointJoinInfo struct {
gw net.IP
gw6 net.IP
StaticRoutes []*types.StaticRoute
driverTableEntries []*tableEntry
disableGatewayService bool
}
type tableEntry struct {
tableName string
key string
value []byte
}
func (ep *endpoint) Info() EndpointInfo {
n, err := ep.getNetworkFromStore()
if err != nil {
@ -292,6 +299,19 @@ func (ep *endpoint) AddStaticRoute(destination *net.IPNet, routeType int, nextHo
return nil
}
func (ep *endpoint) AddTableEntry(tableName, key string, value []byte) error {
ep.Lock()
defer ep.Unlock()
ep.joinInfo.driverTableEntries = append(ep.joinInfo.driverTableEntries, &tableEntry{
tableName: tableName,
key: key,
value: value,
})
return nil
}
func (ep *endpoint) Sandbox() Sandbox {
cnt, ok := ep.getSandbox()
if !ok {

View file

@ -3,6 +3,7 @@ package ipam
import (
"fmt"
"net"
"sort"
"sync"
log "github.com/Sirupsen/logrus"
@ -58,9 +59,6 @@ func NewAllocator(lcDs, glDs datastore.DataStore) (*Allocator, error) {
{localAddressSpace, lcDs},
{globalAddressSpace, glDs},
} {
if aspc.ds == nil {
continue
}
a.initializeAddressSpace(aspc.as, aspc.ds)
}
@ -143,15 +141,22 @@ func (a *Allocator) checkConsistency(as string) {
}
func (a *Allocator) initializeAddressSpace(as string, ds datastore.DataStore) error {
scope := ""
if ds != nil {
scope = ds.Scope()
}
a.Lock()
if _, ok := a.addrSpaces[as]; ok {
a.Unlock()
return types.ForbiddenErrorf("tried to add an axisting address space: %s", as)
if currAS, ok := a.addrSpaces[as]; ok {
if currAS.ds != nil {
a.Unlock()
return types.ForbiddenErrorf("a datastore is already configured for the address space %s", as)
}
}
a.addrSpaces[as] = &addrSpace{
subnets: map[SubnetKey]*PoolData{},
id: dsConfigKey + "/" + as,
scope: ds.Scope(),
scope: scope,
ds: ds,
alloc: a,
}
@ -313,10 +318,6 @@ func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error {
//log.Debugf("Inserting bitmask (%s, %s)", key.String(), pool.String())
store := a.getStore(key.AddressSpace)
if store == nil {
return types.InternalErrorf("could not find store for address space %s while inserting bit mask", key.AddressSpace)
}
ipVer := getAddressVersion(pool.IP)
ones, bits := pool.Mask.Size()
numAddresses := uint64(1 << uint(bits-ones))
@ -401,13 +402,6 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error)
}
if !aSpace.contains(as, nw) {
if as == localAddressSpace {
// Check if nw overlap with system routes, name servers
if _, err := ipamutils.FindAvailableNetwork([]*net.IPNet{nw}); err == nil {
return nw, nil
}
continue
}
return nw, nil
}
}
@ -563,13 +557,18 @@ func (a *Allocator) getAddress(nw *net.IPNet, bitmask *bitseq.Handle, prefAddres
func (a *Allocator) DumpDatabase() string {
a.Lock()
aspaces := make(map[string]*addrSpace, len(a.addrSpaces))
orderedAS := make([]string, 0, len(a.addrSpaces))
for as, aSpace := range a.addrSpaces {
orderedAS = append(orderedAS, as)
aspaces[as] = aSpace
}
a.Unlock()
sort.Strings(orderedAS)
var s string
for as, aSpace := range aspaces {
for _, as := range orderedAS {
aSpace := aspaces[as]
s = fmt.Sprintf("\n\n%s Config", as)
aSpace.Lock()
for k, config := range aSpace.subnets {

View file

@ -82,8 +82,10 @@ func (a *Allocator) getStore(as string) datastore.DataStore {
func (a *Allocator) getAddressSpaceFromStore(as string) (*addrSpace, error) {
store := a.getStore(as)
// IPAM may not have a valid store. In such cases it is just in-memory state.
if store == nil {
return nil, types.InternalErrorf("store for address space %s not found", as)
return nil, nil
}
pc := &addrSpace{id: dsConfigKey + "/" + as, ds: store, alloc: a}
@ -100,8 +102,10 @@ func (a *Allocator) getAddressSpaceFromStore(as string) (*addrSpace, error) {
func (a *Allocator) writeToStore(aSpace *addrSpace) error {
store := aSpace.store()
// IPAM may not have a valid store. In such cases it is just in-memory state.
if store == nil {
return types.InternalErrorf("invalid store while trying to write %s address space", aSpace.DataScope())
return nil
}
err := store.PutObjectAtomic(aSpace)
@ -114,8 +118,10 @@ func (a *Allocator) writeToStore(aSpace *addrSpace) error {
func (a *Allocator) deleteFromStore(aSpace *addrSpace) error {
store := aSpace.store()
// IPAM may not have a valid store. In such cases it is just in-memory state.
if store == nil {
return types.InternalErrorf("invalid store while trying to delete %s address space", aSpace.DataScope())
return nil
}
return store.DeleteObjectAtomic(aSpace)

View file

@ -1,4 +1,4 @@
// +build linux freebsd
// +build linux freebsd solaris darwin
package builtin

View file

@ -1,76 +0,0 @@
// Package ipamutils provides utililty functions for ipam management
package ipamutils
import (
"fmt"
"net"
"github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/osl"
"github.com/docker/libnetwork/resolvconf"
"github.com/vishvananda/netlink"
)
// ElectInterfaceAddresses looks for an interface on the OS with the specified name
// and returns its IPv4 and IPv6 addresses in CIDR form. If the interface does not exist,
// it chooses from a predifined list the first IPv4 address which does not conflict
// with other interfaces on the system.
func ElectInterfaceAddresses(name string) (*net.IPNet, []*net.IPNet, error) {
var (
v4Net *net.IPNet
v6Nets []*net.IPNet
err error
)
InitNetworks()
defer osl.InitOSContext()()
link, _ := netlink.LinkByName(name)
if link != nil {
v4addr, err := netlink.AddrList(link, netlink.FAMILY_V4)
if err != nil {
return nil, nil, err
}
v6addr, err := netlink.AddrList(link, netlink.FAMILY_V6)
if err != nil {
return nil, nil, err
}
if len(v4addr) > 0 {
v4Net = v4addr[0].IPNet
}
for _, nlAddr := range v6addr {
v6Nets = append(v6Nets, nlAddr.IPNet)
}
}
if link == nil || v4Net == nil {
// Choose from predifined broad networks
v4Net, err = FindAvailableNetwork(PredefinedBroadNetworks)
if err != nil {
return nil, nil, err
}
}
return v4Net, v6Nets, nil
}
// FindAvailableNetwork returns a network from the passed list which does not
// overlap with existing interfaces in the system
func FindAvailableNetwork(list []*net.IPNet) (*net.IPNet, error) {
// We don't check for an error here, because we don't really care if we
// can't read /etc/resolv.conf. So instead we skip the append if resolvConf
// is nil. It either doesn't exist, or we can't read it for some reason.
var nameservers []string
if rc, err := resolvconf.Get(); err == nil {
nameservers = resolvconf.GetNameserversAsCIDR(rc.Content)
}
for _, nw := range list {
if err := netutils.CheckNameserverOverlaps(nameservers, nw); err == nil {
if err := netutils.CheckRouteOverlaps(nw); err == nil {
return nw, nil
}
}
}
return nil, fmt.Errorf("no available network")
}

View file

@ -39,6 +39,9 @@ const (
// OverlayNeighborIP constant represents overlay driver neighbor IP
OverlayNeighborIP = DriverPrefix + ".overlay.neighbor_ip"
// OverlayVxlanIDList constant represents a list of VXLAN Ids as csv
OverlayVxlanIDList = DriverPrefix + ".overlay.vxlanid_list"
// Gateway represents the gateway for the network
Gateway = Prefix + ".gateway"

View file

@ -14,13 +14,6 @@ import (
"github.com/docker/libnetwork/types"
)
// constants for the IP address type
const (
IP = iota // IPv4 and IPv6
IPv4
IPv6
)
var (
// ErrNetworkOverlapsWithNameservers preformatted error
ErrNetworkOverlapsWithNameservers = errors.New("requested network overlaps with nameserver")

View file

@ -1,5 +1,4 @@
// Package ipamutils provides utililty functions for ipam management
package ipamutils
package netutils
import (
"net"

View file

@ -4,9 +4,13 @@
package netutils
import (
"fmt"
"net"
"strings"
"github.com/docker/libnetwork/ipamutils"
"github.com/docker/libnetwork/osl"
"github.com/docker/libnetwork/resolvconf"
"github.com/docker/libnetwork/types"
"github.com/vishvananda/netlink"
)
@ -48,3 +52,66 @@ func GenerateIfaceName(prefix string, len int) (string, error) {
}
return "", types.InternalErrorf("could not generate interface name")
}
// ElectInterfaceAddresses looks for an interface on the OS with the
// specified name and returns its IPv4 and IPv6 addresses in CIDR
// form. If the interface does not exist, it chooses from a predifined
// list the first IPv4 address which does not conflict with other
// interfaces on the system.
func ElectInterfaceAddresses(name string) (*net.IPNet, []*net.IPNet, error) {
var (
v4Net *net.IPNet
v6Nets []*net.IPNet
err error
)
defer osl.InitOSContext()()
link, _ := netlink.LinkByName(name)
if link != nil {
v4addr, err := netlink.AddrList(link, netlink.FAMILY_V4)
if err != nil {
return nil, nil, err
}
v6addr, err := netlink.AddrList(link, netlink.FAMILY_V6)
if err != nil {
return nil, nil, err
}
if len(v4addr) > 0 {
v4Net = v4addr[0].IPNet
}
for _, nlAddr := range v6addr {
v6Nets = append(v6Nets, nlAddr.IPNet)
}
}
if link == nil || v4Net == nil {
// Choose from predifined broad networks
v4Net, err = FindAvailableNetwork(ipamutils.PredefinedBroadNetworks)
if err != nil {
return nil, nil, err
}
}
return v4Net, v6Nets, nil
}
// FindAvailableNetwork returns a network from the passed list which does not
// overlap with existing interfaces in the system
func FindAvailableNetwork(list []*net.IPNet) (*net.IPNet, error) {
// We don't check for an error here, because we don't really care if we
// can't read /etc/resolv.conf. So instead we skip the append if resolvConf
// is nil. It either doesn't exist, or we can't read it for some reason.
var nameservers []string
if rc, err := resolvconf.Get(); err == nil {
nameservers = resolvconf.GetNameserversAsCIDR(rc.Content)
}
for _, nw := range list {
if err := CheckNameserverOverlaps(nameservers, nw); err == nil {
if err := CheckRouteOverlaps(nw); err == nil {
return nw, nil
}
}
}
return nil, fmt.Errorf("no available network")
}

View file

@ -0,0 +1,32 @@
package netutils
// Solaris: TODO
import (
"net"
"github.com/docker/libnetwork/ipamutils"
)
// ElectInterfaceAddresses looks for an interface on the OS with the specified name
// and returns its IPv4 and IPv6 addresses in CIDR form. If the interface does not exist,
// it chooses from a predifined list the first IPv4 address which does not conflict
// with other interfaces on the system.
func ElectInterfaceAddresses(name string) (*net.IPNet, []*net.IPNet, error) {
var (
v4Net *net.IPNet
err error
)
v4Net, err = FindAvailableNetwork(ipamutils.PredefinedBroadNetworks)
if err != nil {
return nil, nil, err
}
return v4Net, nil, nil
}
// FindAvailableNetwork returns a network from the passed list which does not
// overlap with existing interfaces in the system
func FindAvailableNetwork(list []*net.IPNet) (*net.IPNet, error) {
return list[0], nil
}

View file

@ -1,5 +1,4 @@
// Package ipamutils provides utililty functions for ipam management
package ipamutils
package netutils
import (
"net"

View file

@ -171,6 +171,7 @@ type network struct {
drvOnce *sync.Once
internal bool
inDelete bool
driverTables []string
sync.Mutex
}
@ -620,49 +621,62 @@ func (n *network) processOptions(options ...NetworkOption) {
}
}
func (n *network) driverScope() string {
func (n *network) resolveDriver(name string, load bool) (driverapi.Driver, *driverapi.Capability, error) {
c := n.getController()
c.Lock()
// Check if a driver for the specified network type is available
dd, ok := c.drivers[n.networkType]
c.Unlock()
d, cap := c.drvRegistry.Driver(name)
if d == nil {
if load {
var err error
err = c.loadDriver(name)
if err != nil {
return nil, nil, err
}
if !ok {
var err error
dd, err = c.loadDriver(n.networkType)
if err != nil {
// If driver could not be resolved simply return an empty string
return ""
d, cap = c.drvRegistry.Driver(name)
if d == nil {
return nil, nil, fmt.Errorf("could not resolve driver %s in registry", name)
}
} else {
// don't fail if driver loading is not required
return nil, nil, nil
}
}
return dd.capability.DataScope
return d, cap, nil
}
func (n *network) driverScope() string {
_, cap, err := n.resolveDriver(n.networkType, true)
if err != nil {
// If driver could not be resolved simply return an empty string
return ""
}
return cap.DataScope
}
func (n *network) driver(load bool) (driverapi.Driver, error) {
c := n.getController()
c.Lock()
// Check if a driver for the specified network type is available
dd, ok := c.drivers[n.networkType]
c.Unlock()
if !ok && load {
var err error
dd, err = c.loadDriver(n.networkType)
if err != nil {
return nil, err
}
} else if !ok {
// don't fail if driver loading is not required
return nil, nil
d, cap, err := n.resolveDriver(n.networkType, load)
if err != nil {
return nil, err
}
c := n.getController()
n.Lock()
n.scope = dd.capability.DataScope
// If load is not required, driver, cap and err may all be nil
if cap != nil {
n.scope = cap.DataScope
}
if c.cfg.Daemon.IsAgent {
// If we are running in agent mode then all networks
// in libnetwork are local scope regardless of the
// backing driver.
n.scope = datastore.LocalScope
}
n.Unlock()
return dd.driver, nil
return d, nil
}
func (n *network) Delete() error {
@ -717,6 +731,12 @@ func (n *network) delete(force bool) error {
return fmt.Errorf("error deleting network from store: %v", err)
}
n.cancelDriverWatches()
if err = n.leaveCluster(); err != nil {
log.Errorf("Failed leaving network %s from the agent cluster: %v", n.Name(), err)
}
return nil
}
@ -786,12 +806,12 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
}
}
ipam, err := n.getController().getIPAM(n.ipamType)
ipam, cap, err := n.getController().getIPAMDriver(n.ipamType)
if err != nil {
return nil, err
}
if ipam.capability.RequiresMACAddress {
if cap.RequiresMACAddress {
if ep.iface.mac == nil {
ep.iface.mac = netutils.GenerateRandomMAC()
}
@ -801,7 +821,7 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
ep.ipamOptions[netlabel.MacAddress] = ep.iface.mac.String()
}
if err = ep.assignAddress(ipam.driver, true, n.enableIPv6 && !n.postIPv6); err != nil {
if err = ep.assignAddress(ipam, true, n.enableIPv6 && !n.postIPv6); err != nil {
return nil, err
}
defer func() {
@ -821,7 +841,7 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi
}
}()
if err = ep.assignAddress(ipam.driver, false, n.enableIPv6 && n.postIPv6); err != nil {
if err = ep.assignAddress(ipam, false, n.enableIPv6 && n.postIPv6); err != nil {
return nil, err
}
@ -985,14 +1005,14 @@ func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUp
c := n.getController()
c.Lock()
defer c.Unlock()
sr, ok := c.svcDb[n.ID()]
sr, ok := c.svcRecords[n.ID()]
if !ok {
sr = svcInfo{
svcMap: make(map[string][]net.IP),
svcIPv6Map: make(map[string][]net.IP),
ipMap: make(map[string]string),
}
c.svcDb[n.ID()] = sr
c.svcRecords[n.ID()] = sr
}
if ipMapUpdate {
@ -1012,7 +1032,7 @@ func (n *network) deleteSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMa
c := n.getController()
c.Lock()
defer c.Unlock()
sr, ok := c.svcDb[n.ID()]
sr, ok := c.svcRecords[n.ID()]
if !ok {
return
}
@ -1037,7 +1057,7 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record {
defer n.Unlock()
var recs []etchosts.Record
sr, _ := n.ctrlr.svcDb[n.id]
sr, _ := n.ctrlr.svcRecords[n.id]
for h, ip := range sr.svcMap {
if ep != nil && strings.Split(h, ".")[0] == ep.Name() {
@ -1065,7 +1085,7 @@ func (n *network) ipamAllocate() error {
return nil
}
ipam, err := n.getController().getIpamDriver(n.ipamType)
ipam, _, err := n.getController().getIPAMDriver(n.ipamType)
if err != nil {
return err
}
@ -1091,7 +1111,53 @@ func (n *network) ipamAllocate() error {
return nil
}
return n.ipamAllocateVersion(6, ipam)
err = n.ipamAllocateVersion(6, ipam)
if err != nil {
return err
}
return nil
}
func (n *network) requestPoolHelper(ipam ipamapi.Ipam, addressSpace, preferredPool, subPool string, options map[string]string, v6 bool) (string, *net.IPNet, map[string]string, error) {
for {
poolID, pool, meta, err := ipam.RequestPool(addressSpace, preferredPool, subPool, options, v6)
if err != nil {
return "", nil, nil, err
}
// If the network belongs to global scope or the pool was
// explicitely chosen or it is invalid, do not perform the overlap check.
if n.Scope() == datastore.GlobalScope || preferredPool != "" || !types.IsIPNetValid(pool) {
return poolID, pool, meta, nil
}
// Check for overlap and if none found, we have found the right pool.
if _, err := netutils.FindAvailableNetwork([]*net.IPNet{pool}); err == nil {
return poolID, pool, meta, nil
}
// Pool obtained in this iteration is
// overlapping. Hold onto the pool and don't release
// it yet, because we don't want ipam to give us back
// the same pool over again. But make sure we still do
// a deferred release when we have either obtained a
// non-overlapping pool or ran out of pre-defined
// pools.
defer func() {
if err := ipam.ReleasePool(poolID); err != nil {
log.Warnf("Failed to release overlapping pool %s while returning from pool request helper for network %s", pool, n.Name())
}
}()
// If this is a preferred pool request and the network
// is local scope and there is a overlap, we fail the
// network creation right here. The pool will be
// released in the defer.
if preferredPool != "" {
return "", nil, nil, fmt.Errorf("requested subnet %s overlaps in the host", preferredPool)
}
}
}
func (n *network) ipamAllocateVersion(ipVer int, ipam ipamapi.Ipam) error {
@ -1130,7 +1196,7 @@ func (n *network) ipamAllocateVersion(ipVer int, ipam ipamapi.Ipam) error {
d := &IpamInfo{}
(*infoList)[i] = d
d.PoolID, d.Pool, d.Meta, err = ipam.RequestPool(n.addrSpace, cfg.PreferredPool, cfg.SubPool, n.ipamOptions, ipVer == 6)
d.PoolID, d.Pool, d.Meta, err = n.requestPoolHelper(ipam, n.addrSpace, cfg.PreferredPool, cfg.SubPool, n.ipamOptions, ipVer == 6)
if err != nil {
return err
}
@ -1189,7 +1255,7 @@ func (n *network) ipamRelease() {
if n.Type() == "host" || n.Type() == "null" {
return
}
ipam, err := n.getController().getIpamDriver(n.ipamType)
ipam, _, err := n.getController().getIPAMDriver(n.ipamType)
if err != nil {
log.Warnf("Failed to retrieve ipam driver to release address pool(s) on delete of network %s (%s): %v", n.Name(), n.ID(), err)
return
@ -1279,17 +1345,14 @@ func (n *network) getIPData(ipVer int) []driverapi.IPAMData {
}
func (n *network) deriveAddressSpace() (string, error) {
c := n.getController()
c.Lock()
ipd, ok := c.ipamDrivers[n.ipamType]
c.Unlock()
if !ok {
return "", types.NotFoundErrorf("could not find ipam driver %s to get default address space", n.ipamType)
local, global, err := n.getController().drvRegistry.IPAMDefaultAddressSpaces(n.ipamType)
if err != nil {
return "", types.NotFoundErrorf("failed to get default address space: %v", err)
}
if n.DataScope() == datastore.GlobalScope {
return ipd.defaultGlobalAddressSpace, nil
return global, nil
}
return ipd.defaultLocalAddressSpace, nil
return local, nil
}
func (n *network) Info() NetworkInfo {
@ -1382,3 +1445,11 @@ func (n *network) Labels() map[string]string {
return lbls
}
func (n *network) TableEventRegister(tableName string) error {
n.Lock()
defer n.Unlock()
n.driverTables = append(n.driverTables, tableName)
return nil
}

View file

@ -0,0 +1,127 @@
package networkdb
import (
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)
type networkEventType uint8
const (
networkJoin networkEventType = 1 + iota
networkLeave
)
type networkEventData struct {
Event networkEventType
LTime serf.LamportTime
NodeName string
NetworkID string
}
type networkEventMessage struct {
id string
node string
msg []byte
}
func (m *networkEventMessage) Invalidates(other memberlist.Broadcast) bool {
otherm := other.(*networkEventMessage)
return m.id == otherm.id && m.node == otherm.node
}
func (m *networkEventMessage) Message() []byte {
return m.msg
}
func (m *networkEventMessage) Finished() {
}
func (nDB *NetworkDB) sendNetworkEvent(nid string, event networkEventType, ltime serf.LamportTime) error {
nEvent := networkEventData{
Event: event,
LTime: ltime,
NodeName: nDB.config.NodeName,
NetworkID: nid,
}
raw, err := encodeMessage(networkEventMsg, &nEvent)
if err != nil {
return err
}
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
msg: raw,
id: nid,
node: nDB.config.NodeName,
})
return nil
}
type tableEventType uint8
const (
tableEntryCreate tableEventType = 1 + iota
tableEntryUpdate
tableEntryDelete
)
type tableEventData struct {
Event tableEventType
LTime serf.LamportTime
NetworkID string
TableName string
NodeName string
Value []byte
Key string
}
type tableEventMessage struct {
id string
tname string
key string
msg []byte
node string
}
func (m *tableEventMessage) Invalidates(other memberlist.Broadcast) bool {
otherm := other.(*tableEventMessage)
return m.id == otherm.id && m.tname == otherm.tname && m.key == otherm.key
}
func (m *tableEventMessage) Message() []byte {
return m.msg
}
func (m *tableEventMessage) Finished() {
}
func (nDB *NetworkDB) sendTableEvent(event tableEventType, nid string, tname string, key string, entry *entry) error {
tEvent := tableEventData{
Event: event,
LTime: entry.ltime,
NodeName: nDB.config.NodeName,
NetworkID: nid,
TableName: tname,
Key: key,
Value: entry.value,
}
raw, err := encodeMessage(tableEventMsg, &tEvent)
if err != nil {
return err
}
nDB.RLock()
broadcastQ := nDB.networks[nDB.config.NodeName][nid].tableBroadcasts
nDB.RUnlock()
broadcastQ.QueueBroadcast(&tableEventMessage{
msg: raw,
id: nid,
tname: tname,
key: key,
node: nDB.config.NodeName,
})
return nil
}

View file

@ -0,0 +1,446 @@
package networkdb
import (
"crypto/rand"
"fmt"
"math/big"
rnd "math/rand"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)
const reapInterval = 2 * time.Second
type logWriter struct{}
func (l *logWriter) Write(p []byte) (int, error) {
str := string(p)
switch {
case strings.Contains(str, "[WARN]"):
logrus.Warn(str)
case strings.Contains(str, "[DEBUG]"):
logrus.Debug(str)
case strings.Contains(str, "[INFO]"):
logrus.Info(str)
case strings.Contains(str, "[ERR]"):
logrus.Warn(str)
}
return len(p), nil
}
func (nDB *NetworkDB) clusterInit() error {
config := memberlist.DefaultLANConfig()
config.Name = nDB.config.NodeName
config.BindAddr = nDB.config.BindAddr
if nDB.config.BindPort != 0 {
config.BindPort = nDB.config.BindPort
}
config.ProtocolVersion = memberlist.ProtocolVersionMax
config.Delegate = &delegate{nDB: nDB}
config.Events = &eventDelegate{nDB: nDB}
config.LogOutput = &logWriter{}
nDB.networkBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(nDB.nodes)
},
RetransmitMult: config.RetransmitMult,
}
mlist, err := memberlist.Create(config)
if err != nil {
return fmt.Errorf("failed to create memberlist: %v", err)
}
nDB.stopCh = make(chan struct{})
nDB.memberlist = mlist
nDB.mConfig = config
for _, trigger := range []struct {
interval time.Duration
fn func()
}{
{reapInterval, nDB.reapState},
{config.GossipInterval, nDB.gossip},
{config.PushPullInterval, nDB.bulkSyncTables},
} {
t := time.NewTicker(trigger.interval)
go nDB.triggerFunc(trigger.interval, t.C, nDB.stopCh, trigger.fn)
nDB.tickers = append(nDB.tickers, t)
}
return nil
}
func (nDB *NetworkDB) clusterJoin(members []string) error {
mlist := nDB.memberlist
if _, err := mlist.Join(members); err != nil {
return fmt.Errorf("could not join node to memberlist: %v", err)
}
return nil
}
func (nDB *NetworkDB) clusterLeave() error {
mlist := nDB.memberlist
if err := mlist.Leave(time.Second); err != nil {
return err
}
close(nDB.stopCh)
for _, t := range nDB.tickers {
t.Stop()
}
return mlist.Shutdown()
}
func (nDB *NetworkDB) triggerFunc(stagger time.Duration, C <-chan time.Time, stop <-chan struct{}, f func()) {
// Use a random stagger to avoid syncronizing
randStagger := time.Duration(uint64(rnd.Int63()) % uint64(stagger))
select {
case <-time.After(randStagger):
case <-stop:
return
}
for {
select {
case <-C:
f()
case <-stop:
return
}
}
}
func (nDB *NetworkDB) reapState() {
nDB.reapNetworks()
nDB.reapTableEntries()
}
func (nDB *NetworkDB) reapNetworks() {
now := time.Now()
nDB.Lock()
for name, nn := range nDB.networks {
for id, n := range nn {
if n.leaving && now.Sub(n.leaveTime) > reapInterval {
delete(nn, id)
nDB.deleteNetworkNode(id, name)
}
}
}
nDB.Unlock()
}
func (nDB *NetworkDB) reapTableEntries() {
var paths []string
now := time.Now()
nDB.RLock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}
if !entry.deleting || now.Sub(entry.deleteTime) <= reapInterval {
return false
}
paths = append(paths, path)
return false
})
nDB.RUnlock()
nDB.Lock()
for _, path := range paths {
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]
if _, ok := nDB.indexes[byTable].Delete(fmt.Sprintf("/%s/%s/%s", tname, nid, key)); !ok {
logrus.Errorf("Could not delete entry in table %s with network id %s and key %s as it does not exist", tname, nid, key)
}
if _, ok := nDB.indexes[byNetwork].Delete(fmt.Sprintf("/%s/%s/%s", nid, tname, key)); !ok {
logrus.Errorf("Could not delete entry in network %s with table name %s and key %s as it does not exist", nid, tname, key)
}
}
nDB.Unlock()
}
func (nDB *NetworkDB) gossip() {
networkNodes := make(map[string][]string)
nDB.RLock()
for nid := range nDB.networks[nDB.config.NodeName] {
networkNodes[nid] = nDB.networkNodes[nid]
}
nDB.RUnlock()
for nid, nodes := range networkNodes {
mNodes := nDB.mRandomNodes(3, nodes)
bytesAvail := udpSendBuf - compoundHeaderOverhead
nDB.RLock()
broadcastQ := nDB.networks[nDB.config.NodeName][nid].tableBroadcasts
nDB.RUnlock()
if broadcastQ == nil {
logrus.Errorf("Invalid broadcastQ encountered while gossiping for network %s", nid)
continue
}
msgs := broadcastQ.GetBroadcasts(compoundOverhead, bytesAvail)
if len(msgs) == 0 {
continue
}
// Create a compound message
compound := makeCompoundMessage(msgs)
for _, node := range mNodes {
nDB.RLock()
mnode := nDB.nodes[node]
nDB.RUnlock()
if mnode == nil {
break
}
// Send the compound message
if err := nDB.memberlist.SendToUDP(mnode, compound.Bytes()); err != nil {
logrus.Errorf("Failed to send gossip to %s: %s", mnode.Addr, err)
}
}
}
}
type bulkSyncMessage struct {
LTime serf.LamportTime
Unsolicited bool
NodeName string
Networks []string
Payload []byte
}
func (nDB *NetworkDB) bulkSyncTables() {
var networks []string
nDB.RLock()
for nid := range nDB.networks[nDB.config.NodeName] {
networks = append(networks, nid)
}
nDB.RUnlock()
for {
if len(networks) == 0 {
break
}
nid := networks[0]
networks = networks[1:]
completed, err := nDB.bulkSync(nid, false)
if err != nil {
logrus.Errorf("periodic bulk sync failure for network %s: %v", nid, err)
continue
}
// Remove all the networks for which we have
// successfully completed bulk sync in this iteration.
updatedNetworks := make([]string, 0, len(networks))
for _, nid := range networks {
for _, completedNid := range completed {
if nid == completedNid {
continue
}
updatedNetworks = append(updatedNetworks, nid)
}
}
networks = updatedNetworks
}
}
func (nDB *NetworkDB) bulkSync(nid string, all bool) ([]string, error) {
nDB.RLock()
nodes := nDB.networkNodes[nid]
nDB.RUnlock()
if !all {
// If not all, then just pick one.
nodes = nDB.mRandomNodes(1, nodes)
}
logrus.Debugf("%s: Initiating bulk sync with nodes %v", nDB.config.NodeName, nodes)
var err error
var networks []string
for _, node := range nodes {
if node == nDB.config.NodeName {
continue
}
networks = nDB.findCommonNetworks(node)
err = nDB.bulkSyncNode(networks, node, true)
if err != nil {
err = fmt.Errorf("bulk sync failed on node %s: %v", node, err)
}
}
if err != nil {
return nil, err
}
return networks, nil
}
// Bulk sync all the table entries belonging to a set of networks to a
// single peer node. It can be unsolicited or can be in response to an
// unsolicited bulk sync
func (nDB *NetworkDB) bulkSyncNode(networks []string, node string, unsolicited bool) error {
var msgs [][]byte
logrus.Debugf("%s: Initiating bulk sync for networks %v with node %s", nDB.config.NodeName, networks, node)
nDB.RLock()
mnode := nDB.nodes[node]
if mnode == nil {
nDB.RUnlock()
return nil
}
for _, nid := range networks {
nDB.indexes[byNetwork].WalkPrefix(fmt.Sprintf("/%s", nid), func(path string, v interface{}) bool {
entry, ok := v.(*entry)
if !ok {
return false
}
params := strings.Split(path[1:], "/")
tEvent := tableEventData{
Event: tableEntryCreate,
LTime: entry.ltime,
NodeName: entry.node,
NetworkID: nid,
TableName: params[1],
Key: params[2],
Value: entry.value,
}
msg, err := encodeMessage(tableEventMsg, &tEvent)
if err != nil {
logrus.Errorf("Encode failure during bulk sync: %#v", tEvent)
return false
}
msgs = append(msgs, msg)
return false
})
}
nDB.RUnlock()
// Create a compound message
compound := makeCompoundMessage(msgs)
bsm := bulkSyncMessage{
LTime: nDB.tableClock.Time(),
Unsolicited: unsolicited,
NodeName: nDB.config.NodeName,
Networks: networks,
Payload: compound.Bytes(),
}
buf, err := encodeMessage(bulkSyncMsg, &bsm)
if err != nil {
return fmt.Errorf("failed to encode bulk sync message: %v", err)
}
nDB.Lock()
ch := make(chan struct{})
nDB.bulkSyncAckTbl[node] = ch
nDB.Unlock()
err = nDB.memberlist.SendToTCP(mnode, buf)
if err != nil {
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
nDB.Unlock()
return fmt.Errorf("failed to send a TCP message during bulk sync: %v", err)
}
startTime := time.Now()
select {
case <-time.After(30 * time.Second):
logrus.Errorf("Bulk sync to node %s timed out", node)
case <-ch:
nDB.Lock()
delete(nDB.bulkSyncAckTbl, node)
nDB.Unlock()
logrus.Debugf("%s: Bulk sync to node %s took %s", nDB.config.NodeName, node, time.Now().Sub(startTime))
}
return nil
}
// Returns a random offset between 0 and n
func randomOffset(n int) int {
if n == 0 {
return 0
}
val, err := rand.Int(rand.Reader, big.NewInt(int64(n)))
if err != nil {
logrus.Errorf("Failed to get a random offset: %v", err)
return 0
}
return int(val.Int64())
}
// mRandomNodes is used to select up to m random nodes. It is possible
// that less than m nodes are returned.
func (nDB *NetworkDB) mRandomNodes(m int, nodes []string) []string {
n := len(nodes)
mNodes := make([]string, 0, m)
OUTER:
// Probe up to 3*n times, with large n this is not necessary
// since k << n, but with small n we want search to be
// exhaustive
for i := 0; i < 3*n && len(mNodes) < m; i++ {
// Get random node
idx := randomOffset(n)
node := nodes[idx]
if node == nDB.config.NodeName {
continue
}
// Check if we have this node already
for j := 0; j < len(mNodes); j++ {
if node == mNodes[j] {
continue OUTER
}
}
// Append the node
mNodes = append(mNodes, node)
}
return mNodes
}

View file

@ -0,0 +1,315 @@
package networkdb
import (
"fmt"
"time"
"github.com/Sirupsen/logrus"
"github.com/hashicorp/serf/serf"
)
type networkData struct {
LTime serf.LamportTime
ID string
NodeName string
Leaving bool
}
type networkPushPull struct {
LTime serf.LamportTime
Networks []networkData
}
type delegate struct {
nDB *NetworkDB
}
func (d *delegate) NodeMeta(limit int) []byte {
return []byte{}
}
func (nDB *NetworkDB) handleNetworkEvent(nEvent *networkEventData) bool {
// Update our local clock if the received messages has newer
// time.
nDB.networkClock.Witness(nEvent.LTime)
nDB.Lock()
defer nDB.Unlock()
nodeNetworks, ok := nDB.networks[nEvent.NodeName]
if !ok {
// We haven't heard about this node at all. Ignore the leave
if nEvent.Event == networkLeave {
return false
}
nodeNetworks = make(map[string]*network)
nDB.networks[nEvent.NodeName] = nodeNetworks
}
if n, ok := nodeNetworks[nEvent.NetworkID]; ok {
// We have the latest state. Ignore the event
// since it is stale.
if n.ltime >= nEvent.LTime {
return false
}
n.ltime = nEvent.LTime
n.leaving = nEvent.Event == networkLeave
if n.leaving {
n.leaveTime = time.Now()
}
return true
}
if nEvent.Event == networkLeave {
return false
}
// This remote network join is being seen the first time.
nodeNetworks[nEvent.NetworkID] = &network{
id: nEvent.NetworkID,
ltime: nEvent.LTime,
}
nDB.networkNodes[nEvent.NetworkID] = append(nDB.networkNodes[nEvent.NetworkID], nEvent.NodeName)
return true
}
func (nDB *NetworkDB) handleTableEvent(tEvent *tableEventData) bool {
// Update our local clock if the received messages has newer
// time.
nDB.tableClock.Witness(tEvent.LTime)
if entry, err := nDB.getEntry(tEvent.TableName, tEvent.NetworkID, tEvent.Key); err == nil {
// We have the latest state. Ignore the event
// since it is stale.
if entry.ltime >= tEvent.LTime {
return false
}
}
entry := &entry{
ltime: tEvent.LTime,
node: tEvent.NodeName,
value: tEvent.Value,
deleting: tEvent.Event == tableEntryDelete,
}
if entry.deleting {
entry.deleteTime = time.Now()
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.TableName, tEvent.NetworkID, tEvent.Key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", tEvent.NetworkID, tEvent.TableName, tEvent.Key), entry)
nDB.Unlock()
var op opType
switch tEvent.Event {
case tableEntryCreate:
op = opCreate
case tableEntryUpdate:
op = opUpdate
case tableEntryDelete:
op = opDelete
}
nDB.broadcaster.Write(makeEvent(op, tEvent.TableName, tEvent.NetworkID, tEvent.Key, tEvent.Value))
return true
}
func (nDB *NetworkDB) handleCompound(buf []byte) {
// Decode the parts
trunc, parts, err := decodeCompoundMessage(buf[1:])
if err != nil {
logrus.Errorf("Failed to decode compound request: %v", err)
return
}
// Log any truncation
if trunc > 0 {
logrus.Warnf("Compound request had %d truncated messages", trunc)
}
// Handle each message
for _, part := range parts {
nDB.handleMessage(part)
}
}
func (nDB *NetworkDB) handleTableMessage(buf []byte) {
var tEvent tableEventData
if err := decodeMessage(buf[1:], &tEvent); err != nil {
logrus.Errorf("Error decoding table event message: %v", err)
return
}
if rebroadcast := nDB.handleTableEvent(&tEvent); rebroadcast {
// Copy the buffer since we cannot rely on the slice not changing
newBuf := make([]byte, len(buf))
copy(newBuf, buf)
nDB.RLock()
n, ok := nDB.networks[nDB.config.NodeName][tEvent.NetworkID]
nDB.RUnlock()
if !ok {
return
}
broadcastQ := n.tableBroadcasts
broadcastQ.QueueBroadcast(&tableEventMessage{
msg: newBuf,
id: tEvent.NetworkID,
tname: tEvent.TableName,
key: tEvent.Key,
node: nDB.config.NodeName,
})
}
}
func (nDB *NetworkDB) handleNetworkMessage(buf []byte) {
var nEvent networkEventData
if err := decodeMessage(buf[1:], &nEvent); err != nil {
logrus.Errorf("Error decoding network event message: %v", err)
return
}
if rebroadcast := nDB.handleNetworkEvent(&nEvent); rebroadcast {
// Copy the buffer since it we cannot rely on the slice not changing
newBuf := make([]byte, len(buf))
copy(newBuf, buf)
nDB.networkBroadcasts.QueueBroadcast(&networkEventMessage{
msg: newBuf,
id: nEvent.NetworkID,
node: nEvent.NodeName,
})
}
}
func (nDB *NetworkDB) handleBulkSync(buf []byte) {
var bsm bulkSyncMessage
if err := decodeMessage(buf[1:], &bsm); err != nil {
logrus.Errorf("Error decoding bulk sync message: %v", err)
return
}
if bsm.LTime > 0 {
nDB.tableClock.Witness(bsm.LTime)
}
nDB.handleMessage(bsm.Payload)
// Don't respond to a bulk sync which was not unsolicited
if !bsm.Unsolicited {
nDB.RLock()
ch, ok := nDB.bulkSyncAckTbl[bsm.NodeName]
nDB.RUnlock()
if ok {
close(ch)
}
return
}
if err := nDB.bulkSyncNode(bsm.Networks, bsm.NodeName, false); err != nil {
logrus.Errorf("Error in responding to bulk sync from node %s: %v", nDB.nodes[bsm.NodeName].Addr, err)
}
}
func (nDB *NetworkDB) handleMessage(buf []byte) {
msgType := messageType(buf[0])
switch msgType {
case networkEventMsg:
nDB.handleNetworkMessage(buf)
case tableEventMsg:
nDB.handleTableMessage(buf)
case compoundMsg:
nDB.handleCompound(buf)
case bulkSyncMsg:
nDB.handleBulkSync(buf)
default:
logrus.Errorf("%s: unknown message type %d payload = %v", nDB.config.NodeName, msgType, buf[:8])
}
}
func (d *delegate) NotifyMsg(buf []byte) {
if len(buf) == 0 {
return
}
d.nDB.handleMessage(buf)
}
func (d *delegate) GetBroadcasts(overhead, limit int) [][]byte {
return d.nDB.networkBroadcasts.GetBroadcasts(overhead, limit)
}
func (d *delegate) LocalState(join bool) []byte {
d.nDB.RLock()
defer d.nDB.RUnlock()
pp := networkPushPull{
LTime: d.nDB.networkClock.Time(),
}
for name, nn := range d.nDB.networks {
for _, n := range nn {
pp.Networks = append(pp.Networks, networkData{
LTime: n.ltime,
ID: n.id,
NodeName: name,
Leaving: n.leaving,
})
}
}
buf, err := encodeMessage(networkPushPullMsg, &pp)
if err != nil {
logrus.Errorf("Failed to encode local network state: %v", err)
return nil
}
return buf
}
func (d *delegate) MergeRemoteState(buf []byte, isJoin bool) {
if len(buf) == 0 {
logrus.Error("zero byte remote network state received")
return
}
if messageType(buf[0]) != networkPushPullMsg {
logrus.Errorf("Invalid message type %v received from remote", buf[0])
}
pp := networkPushPull{}
if err := decodeMessage(buf[1:], &pp); err != nil {
logrus.Errorf("Failed to decode remote network state: %v", err)
return
}
if pp.LTime > 0 {
d.nDB.networkClock.Witness(pp.LTime)
}
for _, n := range pp.Networks {
nEvent := &networkEventData{
LTime: n.LTime,
NodeName: n.NodeName,
NetworkID: n.ID,
Event: networkJoin,
}
if n.Leaving {
nEvent.Event = networkLeave
}
d.nDB.handleNetworkEvent(nEvent)
}
}

View file

@ -0,0 +1,23 @@
package networkdb
import "github.com/hashicorp/memberlist"
type eventDelegate struct {
nDB *NetworkDB
}
func (e *eventDelegate) NotifyJoin(n *memberlist.Node) {
e.nDB.Lock()
e.nDB.nodes[n.Name] = n
e.nDB.Unlock()
}
func (e *eventDelegate) NotifyLeave(n *memberlist.Node) {
e.nDB.deleteNodeTableEntries(n.Name)
e.nDB.Lock()
delete(e.nDB.nodes, n.Name)
e.nDB.Unlock()
}
func (e *eventDelegate) NotifyUpdate(n *memberlist.Node) {
}

View file

@ -0,0 +1,122 @@
package networkdb
import (
"bytes"
"encoding/binary"
"fmt"
"github.com/hashicorp/go-msgpack/codec"
)
type messageType uint8
const (
// For network join/leave event message
networkEventMsg messageType = 1 + iota
// For pushing/pulling network/node association state
networkPushPullMsg
// For table entry CRUD event message
tableEventMsg
// For building a compound message which packs many different
// message types together
compoundMsg
// For syncing table entries in bulk b/w nodes.
bulkSyncMsg
)
const (
// Max udp message size chosen to avoid network packet
// fragmentation.
udpSendBuf = 1400
// Compound message header overhead 1 byte(message type) + 4
// bytes (num messages)
compoundHeaderOverhead = 5
// Overhead for each embedded message in a compound message 2
// bytes (len of embedded message)
compoundOverhead = 2
)
func decodeMessage(buf []byte, out interface{}) error {
var handle codec.MsgpackHandle
return codec.NewDecoder(bytes.NewReader(buf), &handle).Decode(out)
}
func encodeMessage(t messageType, msg interface{}) ([]byte, error) {
buf := bytes.NewBuffer(nil)
buf.WriteByte(uint8(t))
handle := codec.MsgpackHandle{}
encoder := codec.NewEncoder(buf, &handle)
err := encoder.Encode(msg)
return buf.Bytes(), err
}
// makeCompoundMessage takes a list of messages and generates
// a single compound message containing all of them
func makeCompoundMessage(msgs [][]byte) *bytes.Buffer {
// Create a local buffer
buf := bytes.NewBuffer(nil)
// Write out the type
buf.WriteByte(uint8(compoundMsg))
// Write out the number of message
binary.Write(buf, binary.BigEndian, uint32(len(msgs)))
// Add the message lengths
for _, m := range msgs {
binary.Write(buf, binary.BigEndian, uint16(len(m)))
}
// Append the messages
for _, m := range msgs {
buf.Write(m)
}
return buf
}
// decodeCompoundMessage splits a compound message and returns
// the slices of individual messages. Also returns the number
// of truncated messages and any potential error
func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
if len(buf) < 1 {
err = fmt.Errorf("missing compound length byte")
return
}
numParts := binary.BigEndian.Uint32(buf[0:4])
buf = buf[4:]
// Check we have enough bytes
if len(buf) < int(numParts*2) {
err = fmt.Errorf("truncated len slice")
return
}
// Decode the lengths
lengths := make([]uint16, numParts)
for i := 0; i < int(numParts); i++ {
lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
}
buf = buf[numParts*2:]
// Split each message
for idx, msgLen := range lengths {
if len(buf) < int(msgLen) {
trunc = int(numParts) - idx
return
}
// Extract the slice, seek past on the buffer
slice := buf[:msgLen]
buf = buf[msgLen:]
parts = append(parts, slice)
}
return
}

View file

@ -0,0 +1,424 @@
package networkdb
import (
"fmt"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/armon/go-radix"
"github.com/docker/go-events"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/serf/serf"
)
const (
byTable int = 1 + iota
byNetwork
)
// NetworkDB instance drives the networkdb cluster and acts the broker
// for cluster-scoped and network-scoped gossip and watches.
type NetworkDB struct {
sync.RWMutex
// NetworkDB configuration.
config *Config
// local copy of memberlist config that we use to driver
// network scoped gossip and bulk sync.
mConfig *memberlist.Config
// All the tree index (byTable, byNetwork) that we maintain
// the db.
indexes map[int]*radix.Tree
// Memberlist we use to drive the cluster.
memberlist *memberlist.Memberlist
// List of all peer nodes in the cluster not-limited to any
// network.
nodes map[string]*memberlist.Node
// A multi-dimensional map of network/node attachmemts. The
// first key is a node name and the second key is a network ID
// for the network that node is participating in.
networks map[string]map[string]*network
// A map of nodes which are participating in a given
// network. The key is a network ID.
networkNodes map[string][]string
// A table of ack channels for every node from which we are
// waiting for an ack.
bulkSyncAckTbl map[string]chan struct{}
// Global lamport clock for node network attach events.
networkClock serf.LamportClock
// Global lamport clock for table events.
tableClock serf.LamportClock
// Broadcast queue for network event gossip.
networkBroadcasts *memberlist.TransmitLimitedQueue
// A central stop channel to stop all go routines running on
// behalf of the NetworkDB instance.
stopCh chan struct{}
// A central broadcaster for all local watchers watching table
// events.
broadcaster *events.Broadcaster
// List of all tickers which needed to be stopped when
// cleaning up.
tickers []*time.Ticker
}
// network describes the node/network attachment.
type network struct {
// Network ID
id string
// Lamport time for the latest state of the entry.
ltime serf.LamportTime
// Node leave is in progress.
leaving bool
// The time this node knew about the node's network leave.
leaveTime time.Time
// The broadcast queue for table event gossip. This is only
// initialized for this node's network attachment entries.
tableBroadcasts *memberlist.TransmitLimitedQueue
}
// Config represents the configuration of the networdb instance and
// can be passed by the caller.
type Config struct {
// NodeName is the cluster wide unique name for this node.
NodeName string
// BindAddr is the local node's IP address that we bind to for
// cluster communication.
BindAddr string
// BindPort is the local node's port to which we bind to for
// cluster communication.
BindPort int
}
// entry defines a table entry
type entry struct {
// node from which this entry was learned.
node string
// Lamport time for the most recent update to the entry
ltime serf.LamportTime
// Opaque value store in the entry
value []byte
// Deleting the entry is in progress. All entries linger in
// the cluster for certain amount of time after deletion.
deleting bool
// The wall clock time when this node learned about this deletion.
deleteTime time.Time
}
// New creates a new instance of NetworkDB using the Config passed by
// the caller.
func New(c *Config) (*NetworkDB, error) {
nDB := &NetworkDB{
config: c,
indexes: make(map[int]*radix.Tree),
networks: make(map[string]map[string]*network),
nodes: make(map[string]*memberlist.Node),
networkNodes: make(map[string][]string),
bulkSyncAckTbl: make(map[string]chan struct{}),
broadcaster: events.NewBroadcaster(),
}
nDB.indexes[byTable] = radix.New()
nDB.indexes[byNetwork] = radix.New()
if err := nDB.clusterInit(); err != nil {
return nil, err
}
return nDB, nil
}
// Join joins this NetworkDB instance with a list of peer NetworkDB
// instances passed by the caller in the form of addr:port
func (nDB *NetworkDB) Join(members []string) error {
return nDB.clusterJoin(members)
}
// Close destroys this NetworkDB instance by leave the cluster,
// stopping timers, canceling goroutines etc.
func (nDB *NetworkDB) Close() {
if err := nDB.clusterLeave(); err != nil {
logrus.Errorf("Could not close DB %s: %v", nDB.config.NodeName, err)
}
}
// GetEntry retrieves the value of a table entry in a given (network,
// table, key) tuple
func (nDB *NetworkDB) GetEntry(tname, nid, key string) ([]byte, error) {
entry, err := nDB.getEntry(tname, nid, key)
if err != nil {
return nil, err
}
return entry.value, nil
}
func (nDB *NetworkDB) getEntry(tname, nid, key string) (*entry, error) {
nDB.RLock()
defer nDB.RUnlock()
e, ok := nDB.indexes[byTable].Get(fmt.Sprintf("/%s/%s/%s", tname, nid, key))
if !ok {
return nil, fmt.Errorf("could not get entry in table %s with network id %s and key %s", tname, nid, key)
}
return e.(*entry), nil
}
// CreateEntry creates a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propogates this event to the cluster. It is an error to create an
// entry for the same tuple for which there is already an existing
// entry.
func (nDB *NetworkDB) CreateEntry(tname, nid, key string, value []byte) error {
if _, err := nDB.GetEntry(tname, nid, key); err == nil {
return fmt.Errorf("cannot create entry as the entry in table %s with network id %s and key %s already exists", tname, nid, key)
}
entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
value: value,
}
if err := nDB.sendTableEvent(tableEntryCreate, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table create event: %v", err)
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opCreate, tname, nid, key, value))
return nil
}
// UpdateEntry updates a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propogates this event to the cluster. It is an error to update a
// non-existent entry.
func (nDB *NetworkDB) UpdateEntry(tname, nid, key string, value []byte) error {
if _, err := nDB.GetEntry(tname, nid, key); err != nil {
return fmt.Errorf("cannot update entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
}
entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
value: value,
}
if err := nDB.sendTableEvent(tableEntryUpdate, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table update event: %v", err)
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opUpdate, tname, nid, key, value))
return nil
}
// DeleteEntry deletes a table entry in NetworkDB for given (network,
// table, key) tuple and if the NetworkDB is part of the cluster
// propogates this event to the cluster.
func (nDB *NetworkDB) DeleteEntry(tname, nid, key string) error {
value, err := nDB.GetEntry(tname, nid, key)
if err != nil {
return fmt.Errorf("cannot delete entry as the entry in table %s with network id %s and key %s does not exist", tname, nid, key)
}
entry := &entry{
ltime: nDB.tableClock.Increment(),
node: nDB.config.NodeName,
value: value,
deleting: true,
deleteTime: time.Now(),
}
if err := nDB.sendTableEvent(tableEntryDelete, nid, tname, key, entry); err != nil {
return fmt.Errorf("cannot send table delete event: %v", err)
}
nDB.Lock()
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
nDB.Unlock()
nDB.broadcaster.Write(makeEvent(opDelete, tname, nid, key, value))
return nil
}
func (nDB *NetworkDB) deleteNodeTableEntries(node string) {
nDB.Lock()
nDB.indexes[byTable].Walk(func(path string, v interface{}) bool {
oldEntry := v.(*entry)
if oldEntry.node != node {
return false
}
params := strings.Split(path[1:], "/")
tname := params[0]
nid := params[1]
key := params[2]
entry := &entry{
ltime: oldEntry.ltime,
node: node,
value: oldEntry.value,
deleting: true,
deleteTime: time.Now(),
}
nDB.indexes[byTable].Insert(fmt.Sprintf("/%s/%s/%s", tname, nid, key), entry)
nDB.indexes[byNetwork].Insert(fmt.Sprintf("/%s/%s/%s", nid, tname, key), entry)
return false
})
nDB.Unlock()
}
// WalkTable walks a single table in NetworkDB and invokes the passed
// function for each entry in the table passing the network, key,
// value. The walk stops if the passed function returns a true.
func (nDB *NetworkDB) WalkTable(tname string, fn func(string, string, []byte) bool) error {
nDB.RLock()
values := make(map[string]interface{})
nDB.indexes[byTable].WalkPrefix(fmt.Sprintf("/%s", tname), func(path string, v interface{}) bool {
values[path] = v
return false
})
nDB.RUnlock()
for k, v := range values {
params := strings.Split(k[1:], "/")
nid := params[1]
key := params[2]
if fn(nid, key, v.(*entry).value) {
return nil
}
}
return nil
}
// JoinNetwork joins this node to a given network and propogates this
// event across the cluster. This triggers this node joining the
// sub-cluster of this network and participates in the network-scoped
// gossip and bulk sync for this network.
func (nDB *NetworkDB) JoinNetwork(nid string) error {
ltime := nDB.networkClock.Increment()
nDB.Lock()
nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
if !ok {
nodeNetworks = make(map[string]*network)
nDB.networks[nDB.config.NodeName] = nodeNetworks
}
nodeNetworks[nid] = &network{id: nid, ltime: ltime}
nodeNetworks[nid].tableBroadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: func() int {
return len(nDB.networkNodes[nid])
},
RetransmitMult: 4,
}
nDB.networkNodes[nid] = append(nDB.networkNodes[nid], nDB.config.NodeName)
nDB.Unlock()
if err := nDB.sendNetworkEvent(nid, networkJoin, ltime); err != nil {
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
}
logrus.Debugf("%s: joined network %s", nDB.config.NodeName, nid)
if _, err := nDB.bulkSync(nid, true); err != nil {
logrus.Errorf("Error bulk syncing while joining network %s: %v", nid, err)
}
return nil
}
// LeaveNetwork leaves this node from a given network and propogates
// this event across the cluster. This triggers this node leaving the
// sub-cluster of this network and as a result will no longer
// participate in the network-scoped gossip and bulk sync for this
// network.
func (nDB *NetworkDB) LeaveNetwork(nid string) error {
ltime := nDB.networkClock.Increment()
if err := nDB.sendNetworkEvent(nid, networkLeave, ltime); err != nil {
return fmt.Errorf("failed to send leave network event for %s: %v", nid, err)
}
nDB.Lock()
defer nDB.Unlock()
nodeNetworks, ok := nDB.networks[nDB.config.NodeName]
if !ok {
return fmt.Errorf("could not find self node for network %s while trying to leave", nid)
}
n, ok := nodeNetworks[nid]
if !ok {
return fmt.Errorf("could not find network %s while trying to leave", nid)
}
n.ltime = ltime
n.leaving = true
return nil
}
// Deletes the node from the list of nodes which participate in the
// passed network. Caller should hold the NetworkDB lock while calling
// this
func (nDB *NetworkDB) deleteNetworkNode(nid string, nodeName string) {
nodes := nDB.networkNodes[nid]
for i, name := range nodes {
if name == nodeName {
nodes[i] = nodes[len(nodes)-1]
nodes = nodes[:len(nodes)-1]
break
}
}
nDB.networkNodes[nid] = nodes
}
// findCommonnetworks find the networks that both this node and the
// passed node have joined.
func (nDB *NetworkDB) findCommonNetworks(nodeName string) []string {
nDB.RLock()
defer nDB.RUnlock()
var networks []string
for nid := range nDB.networks[nDB.config.NodeName] {
if _, ok := nDB.networks[nodeName][nid]; ok {
networks = append(networks, nid)
}
}
return networks
}

View file

@ -0,0 +1,98 @@
package networkdb
import "github.com/docker/go-events"
type opType uint8
const (
opCreate opType = 1 + iota
opUpdate
opDelete
)
type event struct {
Table string
NetworkID string
Key string
Value []byte
}
// CreateEvent generates a table entry create event to the watchers
type CreateEvent event
// UpdateEvent generates a table entry update event to the watchers
type UpdateEvent event
// DeleteEvent generates a table entry delete event to the watchers
type DeleteEvent event
// Watch creates a watcher with filters for a particular table or
// network or key or any combination of the tuple. If any of the
// filter is an empty string it acts as a wildcard for that
// field. Watch returns a channel of events, where the events will be
// sent.
func (nDB *NetworkDB) Watch(tname, nid, key string) (chan events.Event, func()) {
var matcher events.Matcher
if tname != "" || nid != "" || key != "" {
matcher = events.MatcherFunc(func(ev events.Event) bool {
var evt event
switch ev := ev.(type) {
case CreateEvent:
evt = event(ev)
case UpdateEvent:
evt = event(ev)
case DeleteEvent:
evt = event(ev)
}
if tname != "" && evt.Table != tname {
return false
}
if nid != "" && evt.NetworkID != nid {
return false
}
if key != "" && evt.Key != key {
return false
}
return true
})
}
ch := events.NewChannel(0)
sink := events.Sink(events.NewQueue(ch))
if matcher != nil {
sink = events.NewFilter(sink, matcher)
}
nDB.broadcaster.Add(sink)
return ch.C, func() {
nDB.broadcaster.Remove(sink)
ch.Close()
sink.Close()
}
}
func makeEvent(op opType, tname, nid, key string, value []byte) events.Event {
ev := event{
Table: tname,
NetworkID: nid,
Key: key,
Value: value,
}
switch op {
case opCreate:
return CreateEvent(ev)
case opUpdate:
return UpdateEvent(ev)
case opDelete:
return DeleteEvent(ev)
}
return nil
}

View file

@ -0,0 +1,4 @@
package osl
// IfaceOption is a function option type to set interface options
type IfaceOption func()

View file

@ -0,0 +1,4 @@
package osl
// NeighOption is a function option type to set interface options
type NeighOption func()

View file

@ -142,7 +142,7 @@ func (n *networkNamespace) SetGatewayIPv6(gwv6 net.IP) error {
err := programGateway(n.nsPath(), gwv6, true)
if err == nil {
n.SetGatewayIPv6(gwv6)
n.setGatewayIPv6(gwv6)
}
return err

View file

@ -10,8 +10,8 @@ import (
"github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/ioutils"
"github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/resolvconf/dns"
"github.com/docker/libnetwork/types"
)
var (
@ -122,7 +122,7 @@ func FilterResolvDNS(resolvConf []byte, ipv6Enabled bool) (*File, error) {
}
// if the resulting resolvConf has no more nameservers defined, add appropriate
// default DNS servers for IPv4 and (optionally) IPv6
if len(GetNameservers(cleanedResolvConf, netutils.IP)) == 0 {
if len(GetNameservers(cleanedResolvConf, types.IP)) == 0 {
logrus.Infof("No non-localhost DNS nameservers are left in resolv.conf. Using default external servers : %v", defaultIPv4Dns)
dns := defaultIPv4Dns
if ipv6Enabled {
@ -158,11 +158,11 @@ func GetNameservers(resolvConf []byte, kind int) []string {
nameservers := []string{}
for _, line := range getLines(resolvConf, []byte("#")) {
var ns [][]byte
if kind == netutils.IP {
if kind == types.IP {
ns = nsRegexp.FindSubmatch(line)
} else if kind == netutils.IPv4 {
} else if kind == types.IPv4 {
ns = nsIPv4Regexpmatch.FindSubmatch(line)
} else if kind == netutils.IPv6 {
} else if kind == types.IPv6 {
ns = nsIPv6Regexpmatch.FindSubmatch(line)
}
if len(ns) > 0 {
@ -177,7 +177,7 @@ func GetNameservers(resolvConf []byte, kind int) []string {
// This function's output is intended for net.ParseCIDR
func GetNameserversAsCIDR(resolvConf []byte) []string {
nameservers := []string{}
for _, nameserver := range GetNameservers(resolvConf, netutils.IP) {
for _, nameserver := range GetNameservers(resolvConf, types.IP) {
nameservers = append(nameservers, nameserver+"/32")
}
return nameservers

View file

@ -9,8 +9,7 @@ import (
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/iptables"
"github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/types"
"github.com/miekg/dns"
)
@ -47,7 +46,7 @@ const (
maxExtDNS = 3 //max number of external servers to try
extIOTimeout = 4 * time.Second
defaultRespSize = 512
maxConcurrent = 50
maxConcurrent = 100
logInterval = 2 * time.Second
maxDNSID = 65536
)
@ -105,8 +104,6 @@ func (r *resolver) SetupFunc() func() {
r.err = fmt.Errorf("error in opening name server socket %v", err)
return
}
laddr := r.conn.LocalAddr()
_, ipPort, _ := net.SplitHostPort(laddr.String())
// Listen on a TCP as well
tcpaddr := &net.TCPAddr{
@ -118,21 +115,6 @@ func (r *resolver) SetupFunc() func() {
r.err = fmt.Errorf("error in opening name TCP server socket %v", err)
return
}
ltcpaddr := r.tcpListen.Addr()
_, tcpPort, _ := net.SplitHostPort(ltcpaddr.String())
rules := [][]string{
{"-t", "nat", "-A", "OUTPUT", "-d", resolverIP, "-p", "udp", "--dport", dnsPort, "-j", "DNAT", "--to-destination", laddr.String()},
{"-t", "nat", "-A", "POSTROUTING", "-s", resolverIP, "-p", "udp", "--sport", ipPort, "-j", "SNAT", "--to-source", ":" + dnsPort},
{"-t", "nat", "-A", "OUTPUT", "-d", resolverIP, "-p", "tcp", "--dport", dnsPort, "-j", "DNAT", "--to-destination", ltcpaddr.String()},
{"-t", "nat", "-A", "POSTROUTING", "-s", resolverIP, "-p", "tcp", "--sport", tcpPort, "-j", "SNAT", "--to-source", ":" + dnsPort},
}
for _, rule := range rules {
r.err = iptables.RawCombinedOutputNative(rule...)
if r.err != nil {
return
}
}
r.err = nil
})
}
@ -142,6 +124,11 @@ func (r *resolver) Start() error {
if r.err != nil {
return r.err
}
if err := r.setupIPTable(); err != nil {
return fmt.Errorf("setting up IP table rules failed: %v", err)
}
s := &dns.Server{Handler: r, PacketConn: r.conn}
r.server = s
go func() {
@ -240,7 +227,7 @@ func (r *resolver) handleIPQuery(name string, query *dns.Msg, ipType int) (*dns.
if len(addr) > 1 {
addr = shuffleAddr(addr)
}
if ipType == netutils.IPv4 {
if ipType == types.IPv4 {
for _, ip := range addr {
rr := new(dns.A)
rr.Hdr = dns.RR_Header{Name: name, Rrtype: dns.TypeA, Class: dns.ClassINET, Ttl: respTTL}
@ -305,6 +292,7 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
extConn net.Conn
resp *dns.Msg
err error
writer dns.ResponseWriter
)
if query == nil || len(query.Question) == 0 {
@ -312,9 +300,9 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
}
name := query.Question[0].Name
if query.Question[0].Qtype == dns.TypeA {
resp, err = r.handleIPQuery(name, query, netutils.IPv4)
resp, err = r.handleIPQuery(name, query, types.IPv4)
} else if query.Question[0].Qtype == dns.TypeAAAA {
resp, err = r.handleIPQuery(name, query, netutils.IPv6)
resp, err = r.handleIPQuery(name, query, types.IPv6)
} else if query.Question[0].Qtype == dns.TypePTR {
resp, err = r.handlePTRQuery(name, query)
}
@ -342,7 +330,9 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
if resp.Len() > maxSize {
truncateResp(resp, maxSize, proto == "tcp")
}
writer = w
} else {
queryID := query.Id
for i := 0; i < maxExtDNS; i++ {
extDNS := &r.extDNSList[i]
if extDNS.ipStr == "" {
@ -388,11 +378,11 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
// forwardQueryStart stores required context to mux multiple client queries over
// one connection; and limits the number of outstanding concurrent queries.
if r.forwardQueryStart(w, query) == false {
if r.forwardQueryStart(w, query, queryID) == false {
old := r.tStamp
r.tStamp = time.Now()
if r.tStamp.Sub(old) > logInterval {
log.Errorf("More than %v concurrent queries from %s", maxConcurrent, w.LocalAddr().String())
log.Errorf("More than %v concurrent queries from %s", maxConcurrent, extConn.LocalAddr().String())
}
continue
}
@ -418,32 +408,33 @@ func (r *resolver) ServeDNS(w dns.ResponseWriter, query *dns.Msg) {
// Retrieves the context for the forwarded query and returns the client connection
// to send the reply to
w = r.forwardQueryEnd(w, resp)
if w == nil {
writer = r.forwardQueryEnd(w, resp)
if writer == nil {
continue
}
resp.Compress = true
break
}
if resp == nil || w == nil {
if resp == nil || writer == nil {
return
}
}
err = w.WriteMsg(resp)
if err != nil {
if writer == nil {
return
}
if err = writer.WriteMsg(resp); err != nil {
log.Errorf("error writing resolver resp, %s", err)
}
}
func (r *resolver) forwardQueryStart(w dns.ResponseWriter, msg *dns.Msg) bool {
func (r *resolver) forwardQueryStart(w dns.ResponseWriter, msg *dns.Msg, queryID uint16) bool {
proto := w.LocalAddr().Network()
dnsID := uint16(rand.Intn(maxDNSID))
cc := clientConn{
dnsID: msg.Id,
dnsID: queryID,
respWriter: w,
}
@ -462,7 +453,7 @@ func (r *resolver) forwardQueryStart(w dns.ResponseWriter, msg *dns.Msg) bool {
for ok := true; ok == true; dnsID = uint16(rand.Intn(maxDNSID)) {
_, ok = r.client[dnsID]
}
log.Debugf("client dns id %v, changed id %v", msg.Id, dnsID)
log.Debugf("client dns id %v, changed id %v", queryID, dnsID)
r.client[dnsID] = cc
msg.Id = dnsID
default:
@ -497,6 +488,7 @@ func (r *resolver) forwardQueryEnd(w dns.ResponseWriter, msg *dns.Msg) dns.Respo
log.Debugf("Can't retrieve client context for dns id %v", msg.Id)
return nil
}
log.Debugf("dns msg id %v, client id %v", msg.Id, cc.dnsID)
delete(r.client, msg.Id)
msg.Id = cc.dnsID
w = cc.respWriter

View file

@ -0,0 +1,77 @@
// +build !windows
package libnetwork
import (
"fmt"
"net"
"os"
"os/exec"
"runtime"
log "github.com/Sirupsen/logrus"
"github.com/docker/docker/pkg/reexec"
"github.com/docker/libnetwork/iptables"
"github.com/vishvananda/netns"
)
func init() {
reexec.Register("setup-resolver", reexecSetupResolver)
}
func reexecSetupResolver() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
if len(os.Args) < 4 {
log.Error("invalid number of arguments..")
os.Exit(1)
}
_, ipPort, _ := net.SplitHostPort(os.Args[2])
_, tcpPort, _ := net.SplitHostPort(os.Args[3])
rules := [][]string{
{"-t", "nat", "-A", "OUTPUT", "-d", resolverIP, "-p", "udp", "--dport", dnsPort, "-j", "DNAT", "--to-destination", os.Args[2]},
{"-t", "nat", "-A", "POSTROUTING", "-s", resolverIP, "-p", "udp", "--sport", ipPort, "-j", "SNAT", "--to-source", ":" + dnsPort},
{"-t", "nat", "-A", "OUTPUT", "-d", resolverIP, "-p", "tcp", "--dport", dnsPort, "-j", "DNAT", "--to-destination", os.Args[3]},
{"-t", "nat", "-A", "POSTROUTING", "-s", resolverIP, "-p", "tcp", "--sport", tcpPort, "-j", "SNAT", "--to-source", ":" + dnsPort},
}
f, err := os.OpenFile(os.Args[1], os.O_RDONLY, 0)
if err != nil {
log.Errorf("failed get network namespace %q: %v", os.Args[1], err)
os.Exit(2)
}
defer f.Close()
nsFD := f.Fd()
if err = netns.Set(netns.NsHandle(nsFD)); err != nil {
log.Errorf("setting into container net ns %v failed, %v", os.Args[1], err)
os.Exit(3)
}
for _, rule := range rules {
if iptables.RawCombinedOutputNative(rule...) != nil {
log.Errorf("setting up rule failed, %v", rule)
}
}
}
func (r *resolver) setupIPTable() error {
if r.err != nil {
return r.err
}
laddr := r.conn.LocalAddr().String()
ltcpaddr := r.tcpListen.Addr().String()
cmd := &exec.Cmd{
Path: reexec.Self(),
Args: append([]string{"setup-resolver"}, r.sb.Key(), laddr, ltcpaddr),
Stdout: os.Stdout,
Stderr: os.Stderr,
}
if err := cmd.Run(); err != nil {
return fmt.Errorf("reexec failed: %v", err)
}
return nil
}

View file

@ -0,0 +1,7 @@
// +build windows
package libnetwork
func (r *resolver) setupIPTable() error {
return nil
}

View file

@ -12,7 +12,6 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/etchosts"
"github.com/docker/libnetwork/netlabel"
"github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/osl"
"github.com/docker/libnetwork/types"
)
@ -406,7 +405,7 @@ func (sb *sandbox) ResolveIP(ip string) string {
for _, ep := range sb.getConnectedEndpoints() {
n := ep.getNetwork()
sr, ok := n.getController().svcDb[n.ID()]
sr, ok := n.getController().svcRecords[n.ID()]
if !ok {
continue
}
@ -436,6 +435,7 @@ func (sb *sandbox) ResolveName(name string, ipType int) ([]net.IP, bool) {
// {a.b in network c.d},
// {a in network b.c.d},
log.Debugf("Name To resolve: %v", name)
name = strings.TrimSuffix(name, ".")
reqName := []string{name}
networkName := []string{""}
@ -456,7 +456,6 @@ func (sb *sandbox) ResolveName(name string, ipType int) ([]net.IP, bool) {
epList := sb.getConnectedEndpoints()
for i := 0; i < len(reqName); i++ {
log.Debugf("To resolve: %v in %v", reqName[i], networkName[i])
// First check for local container alias
ip, ipv6Miss := sb.resolveName(reqName[i], networkName[i], epList, true, ipType)
@ -513,7 +512,7 @@ func (sb *sandbox) resolveName(req string, networkName string, epList []*endpoin
ep.Unlock()
}
sr, ok := n.getController().svcDb[n.ID()]
sr, ok := n.getController().svcRecords[n.ID()]
if !ok {
continue
}
@ -522,7 +521,7 @@ func (sb *sandbox) resolveName(req string, networkName string, epList []*endpoin
n.Lock()
ip, ok = sr.svcMap[name]
if ipType == netutils.IPv6 {
if ipType == types.IPv6 {
// If the name resolved to v4 address then its a valid name in
// the docker network domain. If the network is not v6 enabled
// set ipv6Miss to filter the DNS query from going to external
@ -972,6 +971,14 @@ func (eh epHeap) Less(i, j int) bool {
return true
}
if epi.getNetwork().Internal() {
return false
}
if epj.getNetwork().Internal() {
return true
}
if ci != nil {
cip, ok = ci.epPriority[eh[i].ID()]
if !ok {

View file

@ -11,7 +11,6 @@ import (
log "github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/etchosts"
"github.com/docker/libnetwork/netutils"
"github.com/docker/libnetwork/resolvconf"
"github.com/docker/libnetwork/types"
)
@ -91,6 +90,10 @@ func (sb *sandbox) buildHostsFile() error {
func (sb *sandbox) updateHostsFile(ifaceIP string) error {
var mhost string
if ifaceIP == "" {
return nil
}
if sb.config.originHostsPath != "" {
return nil
}
@ -166,7 +169,7 @@ func (sb *sandbox) setupDNS() error {
if len(sb.config.dnsList) > 0 || len(sb.config.dnsSearchList) > 0 || len(sb.config.dnsOptionsList) > 0 {
var (
err error
dnsList = resolvconf.GetNameservers(currRC.Content, netutils.IP)
dnsList = resolvconf.GetNameservers(currRC.Content, types.IP)
dnsSearchList = resolvconf.GetSearchDomains(currRC.Content)
dnsOptionsList = resolvconf.GetOptions(currRC.Content)
)
@ -275,7 +278,7 @@ func (sb *sandbox) rebuildDNS() error {
// localhost entries have already been filtered out from the list
// retain only the v4 servers in sb for forwarding the DNS queries
sb.extDNS = resolvconf.GetNameservers(currRC.Content, netutils.IPv4)
sb.extDNS = resolvconf.GetNameservers(currRC.Content, types.IPv4)
var (
dnsList = []string{sb.resolver.NameServer()}
@ -284,7 +287,7 @@ func (sb *sandbox) rebuildDNS() error {
)
// external v6 DNS servers has to be listed in resolv.conf
dnsList = append(dnsList, resolvconf.GetNameservers(currRC.Content, netutils.IPv6)...)
dnsList = append(dnsList, resolvconf.GetNameservers(currRC.Content, types.IPv6)...)
// Resolver returns the options in the format resolv.conf expects
dnsOptionsList = append(dnsOptionsList, sb.resolver.ResolverOptions()...)

View file

@ -0,0 +1,45 @@
// +build solaris
package libnetwork
import (
"io"
"net"
"github.com/docker/libnetwork/types"
)
// processSetKeyReexec is a private function that must be called only on an reexec path
// It expects 3 args { [0] = "libnetwork-setkey", [1] = <container-id>, [2] = <controller-id> }
// It also expects libcontainer.State as a json string in <stdin>
// Refer to https://github.com/opencontainers/runc/pull/160/ for more information
func processSetKeyReexec() {
}
// SetExternalKey provides a convenient way to set an External key to a sandbox
func SetExternalKey(controllerID string, containerID string, key string) error {
return types.NotImplementedErrorf("SetExternalKey isn't supported on non linux systems")
}
func sendKey(c net.Conn, data setKeyData) error {
return types.NotImplementedErrorf("sendKey isn't supported on non linux systems")
}
func processReturn(r io.Reader) error {
return types.NotImplementedErrorf("processReturn isn't supported on non linux systems")
}
// no-op on non linux systems
func (c *controller) startExternalKeyListener() error {
return nil
}
func (c *controller) acceptClientConnections(sock string, l net.Listener) {
}
func (c *controller) processExternalKey(conn net.Conn) error {
return types.NotImplementedErrorf("processExternalKey isn't supported on non linux systems")
}
func (c *controller) stopExternalKeyListener() {
}

View file

@ -213,7 +213,7 @@ func (c *controller) sandboxCleanup() {
var ep *endpoint
if err != nil {
logrus.Errorf("getNetworkFromStore for nid %s failed while trying to build sandbox for cleanup: %v", eps.Nid, err)
n = &network{id: eps.Nid, ctrlr: c, drvOnce: &sync.Once{}}
n = &network{id: eps.Nid, ctrlr: c, drvOnce: &sync.Once{}, persist: true}
ep = &endpoint{id: eps.Eid, network: n, sandboxID: sbs.ID}
} else {
ep, err = n.getEndpointFromStore(eps.Eid)

View file

@ -0,0 +1,80 @@
package libnetwork
import "net"
type service struct {
name string
id string
backEnds map[string]map[string]net.IP
}
func newService(name string, id string) *service {
return &service{
name: name,
id: id,
backEnds: make(map[string]map[string]net.IP),
}
}
func (c *controller) addServiceBinding(name, sid, nid, eid string, ip net.IP) error {
var s *service
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
s = newService(name, sid)
}
netBackEnds, ok := s.backEnds[nid]
if !ok {
netBackEnds = make(map[string]net.IP)
s.backEnds[nid] = netBackEnds
}
netBackEnds[eid] = ip
c.serviceBindings[sid] = s
c.Unlock()
n.(*network).addSvcRecords(name, ip, nil, false)
return nil
}
func (c *controller) rmServiceBinding(name, sid, nid, eid string, ip net.IP) error {
n, err := c.NetworkByID(nid)
if err != nil {
return err
}
c.Lock()
s, ok := c.serviceBindings[sid]
if !ok {
c.Unlock()
return nil
}
netBackEnds, ok := s.backEnds[nid]
if !ok {
c.Unlock()
return nil
}
delete(netBackEnds, eid)
if len(netBackEnds) == 0 {
delete(s.backEnds, nid)
}
if len(s.backEnds) == 0 {
delete(c.serviceBindings, sid)
}
c.Unlock()
n.(*network).deleteSvcRecords(name, ip, nil, false)
return err
}

View file

@ -4,9 +4,20 @@ import (
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store/boltdb"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/zookeeper"
"github.com/docker/libnetwork/datastore"
)
func registerKVStores() {
consul.Register()
zookeeper.Register()
etcd.Register()
boltdb.Register()
}
func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) error {
store, err := datastore.NewDataStore(scope, scfg)
if err != nil {
@ -20,6 +31,8 @@ func (c *controller) initScopedStore(scope string, scfg *datastore.ScopeCfg) err
}
func (c *controller) initStores() error {
registerKVStores()
c.Lock()
if c.cfg == nil {
c.Unlock()
@ -208,8 +221,7 @@ func (n *network) getEndpointsFromStore() ([]*endpoint, error) {
func (c *controller) updateToStore(kvObject datastore.KVObject) error {
cs := c.getStore(kvObject.DataScope())
if cs == nil {
log.Warnf("datastore for scope %s not initialized. kv object %s is not added to the store", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
return nil
return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope())
}
if err := cs.PutObjectAtomic(kvObject); err != nil {
@ -225,8 +237,7 @@ func (c *controller) updateToStore(kvObject datastore.KVObject) error {
func (c *controller) deleteFromStore(kvObject datastore.KVObject) error {
cs := c.getStore(kvObject.DataScope())
if cs == nil {
log.Debugf("datastore for scope %s not initialized. kv object %s is not deleted from datastore", kvObject.DataScope(), datastore.Key(kvObject.Key()...))
return nil
return fmt.Errorf("datastore for scope %q is not initialized ", kvObject.DataScope())
}
retry:
@ -407,7 +418,7 @@ func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoi
// This is the last container going away for the network. Destroy
// this network's svc db entry
delete(c.svcDb, ep.getNetwork().ID())
delete(c.svcRecords, ep.getNetwork().ID())
delete(nmap, ep.getNetwork().ID())
}

View file

@ -9,6 +9,13 @@ import (
"strings"
)
// constants for the IP address type
const (
IP = iota // IPv4 and IPv6
IPv4
IPv6
)
// UUID represents a globally unique ID of various resources like network and endpoint
type UUID string
@ -323,6 +330,12 @@ func GetMinimalIPNet(nw *net.IPNet) *net.IPNet {
return nw
}
// IsIPNetValid returns true if the ipnet is a valid network/mask
// combination. Otherwise returns false.
func IsIPNetValid(nw *net.IPNet) bool {
return nw.String() != "0.0.0.0/0"
}
var v4inV6MaskPrefix = []byte{0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff}
// compareIPMask checks if the passed ip and mask are semantically compatible.