mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
9f3d1ce3ff
The configuration format for the docker runtime is based on daemon flags; hence, this change adjusts the libnetwork configuration to accommodate it by moving the TOML-based configuration to the dnet tool. It also changes the controller configuration to be set via options. Signed-off-by: Madhu Venugopal <madhu@docker.com>
299 lines
6.1 KiB
Go
299 lines
6.1 KiB
Go
package libnetwork
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
"github.com/docker/libnetwork/datastore"
|
|
"github.com/docker/libnetwork/types"
|
|
)
|
|
|
|
func (c *controller) validateDatastoreConfig() bool {
|
|
if c.cfg == nil || c.cfg.Datastore.Client.Provider == "" || c.cfg.Datastore.Client.Address == "" {
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (c *controller) initDataStore() error {
|
|
c.Lock()
|
|
cfg := c.cfg
|
|
c.Unlock()
|
|
if !c.validateDatastoreConfig() {
|
|
return fmt.Errorf("datastore initialization requires a valid configuration")
|
|
}
|
|
|
|
store, err := datastore.NewDataStore(&cfg.Datastore)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.Lock()
|
|
c.store = store
|
|
c.Unlock()
|
|
return c.watchStore()
|
|
}
|
|
|
|
func (c *controller) newNetworkFromStore(n *network) error {
|
|
n.Lock()
|
|
n.ctrlr = c
|
|
n.endpoints = endpointTable{}
|
|
n.Unlock()
|
|
|
|
return c.addNetwork(n)
|
|
}
|
|
|
|
func (c *controller) updateNetworkToStore(n *network) error {
|
|
global, err := n.isGlobalScoped()
|
|
if err != nil || !global {
|
|
return err
|
|
}
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
if cs == nil {
|
|
log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
|
|
return nil
|
|
}
|
|
|
|
return cs.PutObjectAtomic(n)
|
|
}
|
|
|
|
func (c *controller) deleteNetworkFromStore(n *network) error {
|
|
global, err := n.isGlobalScoped()
|
|
if err != nil || !global {
|
|
return err
|
|
}
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
if cs == nil {
|
|
log.Debugf("datastore not initialized. Network %s is not deleted from datastore", n.Name())
|
|
return nil
|
|
}
|
|
|
|
if err := cs.DeleteObjectAtomic(n); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *controller) getNetworkFromStore(nid types.UUID) (*network, error) {
|
|
n := network{id: nid}
|
|
if err := c.store.GetObject(datastore.Key(n.Key()...), &n); err != nil {
|
|
return nil, err
|
|
}
|
|
return &n, nil
|
|
}
|
|
|
|
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
|
|
ep.Lock()
|
|
n := ep.network
|
|
id := ep.id
|
|
ep.Unlock()
|
|
if n == nil {
|
|
// Possibly the watch event for the network has not shown up yet
|
|
// Try to get network from the store
|
|
nid, err := networkIDFromEndpointKey(key, ep)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
n, err = c.getNetworkFromStore(nid)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := c.newNetworkFromStore(n); err != nil {
|
|
return err
|
|
}
|
|
n = c.networks[nid]
|
|
}
|
|
|
|
_, err := n.EndpointByID(string(id))
|
|
if err != nil {
|
|
if _, ok := err.(ErrNoSuchEndpoint); ok {
|
|
return n.addEndpoint(ep)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (c *controller) updateEndpointToStore(ep *endpoint) error {
|
|
ep.Lock()
|
|
n := ep.network
|
|
name := ep.name
|
|
ep.Unlock()
|
|
global, err := n.isGlobalScoped()
|
|
if err != nil || !global {
|
|
return err
|
|
}
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
if cs == nil {
|
|
log.Debugf("datastore not initialized. endpoint %s is not added to the store", name)
|
|
return nil
|
|
}
|
|
|
|
return cs.PutObjectAtomic(ep)
|
|
}
|
|
|
|
func (c *controller) getEndpointFromStore(eid types.UUID) (*endpoint, error) {
|
|
ep := endpoint{id: eid}
|
|
if err := c.store.GetObject(datastore.Key(ep.Key()...), &ep); err != nil {
|
|
return nil, err
|
|
}
|
|
return &ep, nil
|
|
}
|
|
|
|
func (c *controller) deleteEndpointFromStore(ep *endpoint) error {
|
|
ep.Lock()
|
|
n := ep.network
|
|
ep.Unlock()
|
|
global, err := n.isGlobalScoped()
|
|
if err != nil || !global {
|
|
return err
|
|
}
|
|
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
if cs == nil {
|
|
log.Debugf("datastore not initialized. endpoint %s is not deleted from datastore", ep.Name())
|
|
return nil
|
|
}
|
|
|
|
if err := cs.DeleteObjectAtomic(ep); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *controller) watchStore() error {
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
|
|
nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go func() {
|
|
for {
|
|
select {
|
|
case nws := <-nwPairs:
|
|
for _, kve := range nws {
|
|
var n network
|
|
err := json.Unmarshal(kve.Value, &n)
|
|
if err != nil {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
n.dbIndex = kve.LastIndex
|
|
c.Lock()
|
|
existing, ok := c.networks[n.id]
|
|
c.Unlock()
|
|
if ok {
|
|
existing.Lock()
|
|
// Skip existing network update
|
|
if existing.dbIndex != n.dbIndex {
|
|
existing.dbIndex = n.dbIndex
|
|
existing.endpointCnt = n.endpointCnt
|
|
}
|
|
existing.Unlock()
|
|
continue
|
|
}
|
|
|
|
if err = c.newNetworkFromStore(&n); err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
case eps := <-epPairs:
|
|
for _, epe := range eps {
|
|
var ep endpoint
|
|
err := json.Unmarshal(epe.Value, &ep)
|
|
if err != nil {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
ep.dbIndex = epe.LastIndex
|
|
n, err := c.networkFromEndpointKey(epe.Key, &ep)
|
|
if err != nil {
|
|
if _, ok := err.(ErrNoSuchNetwork); !ok {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
}
|
|
if n != nil {
|
|
ep.network = n.(*network)
|
|
}
|
|
if c.processEndpointUpdate(&ep) {
|
|
err = c.newEndpointFromStore(epe.Key, &ep)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (c *controller) networkFromEndpointKey(key string, ep *endpoint) (Network, error) {
|
|
nid, err := networkIDFromEndpointKey(key, ep)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return c.NetworkByID(string(nid))
|
|
}
|
|
|
|
func networkIDFromEndpointKey(key string, ep *endpoint) (types.UUID, error) {
|
|
eKey, err := datastore.ParseKey(key)
|
|
if err != nil {
|
|
return types.UUID(""), err
|
|
}
|
|
return ep.networkIDFromKey(eKey)
|
|
}
|
|
|
|
func (c *controller) processEndpointUpdate(ep *endpoint) bool {
|
|
nw := ep.network
|
|
if nw == nil {
|
|
return true
|
|
}
|
|
nw.Lock()
|
|
id := nw.id
|
|
nw.Unlock()
|
|
|
|
c.Lock()
|
|
n, ok := c.networks[id]
|
|
c.Unlock()
|
|
if !ok {
|
|
return true
|
|
}
|
|
existing, _ := n.EndpointByID(string(ep.id))
|
|
if existing == nil {
|
|
return true
|
|
}
|
|
|
|
ee := existing.(*endpoint)
|
|
ee.Lock()
|
|
if ee.dbIndex != ep.dbIndex {
|
|
ee.dbIndex = ep.dbIndex
|
|
if ee.container != nil && ep.container != nil {
|
|
// we care only about the container id
|
|
ee.container.id = ep.container.id
|
|
} else {
|
|
// we still care only about the container id, but this is a short-cut to communicate join or leave operation
|
|
ee.container = ep.container
|
|
}
|
|
}
|
|
ee.Unlock()
|
|
|
|
return false
|
|
}
|