1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/libnetwork/store.go
Madhu Venugopal 9f3d1ce3ff Moved the TOML based Configuration to dnet
The configuration format for docker runtime is based on daemon flags and
hence adjusting the libnetwork configuration to accomodate it by moving
the TOML based configuration to the dnet tool.

Also changed the controller configuration via options

Signed-off-by: Madhu Venugopal <madhu@docker.com>
2015-06-12 12:46:12 -07:00

299 lines
6.1 KiB
Go

package libnetwork
import (
"encoding/json"
"fmt"
log "github.com/Sirupsen/logrus"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/types"
)
func (c *controller) validateDatastoreConfig() bool {
if c.cfg == nil || c.cfg.Datastore.Client.Provider == "" || c.cfg.Datastore.Client.Address == "" {
return false
}
return true
}
func (c *controller) initDataStore() error {
c.Lock()
cfg := c.cfg
c.Unlock()
if !c.validateDatastoreConfig() {
return fmt.Errorf("datastore initialization requires a valid configuration")
}
store, err := datastore.NewDataStore(&cfg.Datastore)
if err != nil {
return err
}
c.Lock()
c.store = store
c.Unlock()
return c.watchStore()
}
func (c *controller) newNetworkFromStore(n *network) error {
n.Lock()
n.ctrlr = c
n.endpoints = endpointTable{}
n.Unlock()
return c.addNetwork(n)
}
func (c *controller) updateNetworkToStore(n *network) error {
global, err := n.isGlobalScoped()
if err != nil || !global {
return err
}
c.Lock()
cs := c.store
c.Unlock()
if cs == nil {
log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
return nil
}
return cs.PutObjectAtomic(n)
}
func (c *controller) deleteNetworkFromStore(n *network) error {
global, err := n.isGlobalScoped()
if err != nil || !global {
return err
}
c.Lock()
cs := c.store
c.Unlock()
if cs == nil {
log.Debugf("datastore not initialized. Network %s is not deleted from datastore", n.Name())
return nil
}
if err := cs.DeleteObjectAtomic(n); err != nil {
return err
}
return nil
}
func (c *controller) getNetworkFromStore(nid types.UUID) (*network, error) {
n := network{id: nid}
if err := c.store.GetObject(datastore.Key(n.Key()...), &n); err != nil {
return nil, err
}
return &n, nil
}
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
ep.Lock()
n := ep.network
id := ep.id
ep.Unlock()
if n == nil {
// Possibly the watch event for the network has not shown up yet
// Try to get network from the store
nid, err := networkIDFromEndpointKey(key, ep)
if err != nil {
return err
}
n, err = c.getNetworkFromStore(nid)
if err != nil {
return err
}
if err := c.newNetworkFromStore(n); err != nil {
return err
}
n = c.networks[nid]
}
_, err := n.EndpointByID(string(id))
if err != nil {
if _, ok := err.(ErrNoSuchEndpoint); ok {
return n.addEndpoint(ep)
}
}
return err
}
func (c *controller) updateEndpointToStore(ep *endpoint) error {
ep.Lock()
n := ep.network
name := ep.name
ep.Unlock()
global, err := n.isGlobalScoped()
if err != nil || !global {
return err
}
c.Lock()
cs := c.store
c.Unlock()
if cs == nil {
log.Debugf("datastore not initialized. endpoint %s is not added to the store", name)
return nil
}
return cs.PutObjectAtomic(ep)
}
func (c *controller) getEndpointFromStore(eid types.UUID) (*endpoint, error) {
ep := endpoint{id: eid}
if err := c.store.GetObject(datastore.Key(ep.Key()...), &ep); err != nil {
return nil, err
}
return &ep, nil
}
func (c *controller) deleteEndpointFromStore(ep *endpoint) error {
ep.Lock()
n := ep.network
ep.Unlock()
global, err := n.isGlobalScoped()
if err != nil || !global {
return err
}
c.Lock()
cs := c.store
c.Unlock()
if cs == nil {
log.Debugf("datastore not initialized. endpoint %s is not deleted from datastore", ep.Name())
return nil
}
if err := cs.DeleteObjectAtomic(ep); err != nil {
return err
}
return nil
}
func (c *controller) watchStore() error {
c.Lock()
cs := c.store
c.Unlock()
nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), nil)
if err != nil {
return err
}
epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), nil)
if err != nil {
return err
}
go func() {
for {
select {
case nws := <-nwPairs:
for _, kve := range nws {
var n network
err := json.Unmarshal(kve.Value, &n)
if err != nil {
log.Error(err)
continue
}
n.dbIndex = kve.LastIndex
c.Lock()
existing, ok := c.networks[n.id]
c.Unlock()
if ok {
existing.Lock()
// Skip existing network update
if existing.dbIndex != n.dbIndex {
existing.dbIndex = n.dbIndex
existing.endpointCnt = n.endpointCnt
}
existing.Unlock()
continue
}
if err = c.newNetworkFromStore(&n); err != nil {
log.Error(err)
}
}
case eps := <-epPairs:
for _, epe := range eps {
var ep endpoint
err := json.Unmarshal(epe.Value, &ep)
if err != nil {
log.Error(err)
continue
}
ep.dbIndex = epe.LastIndex
n, err := c.networkFromEndpointKey(epe.Key, &ep)
if err != nil {
if _, ok := err.(ErrNoSuchNetwork); !ok {
log.Error(err)
continue
}
}
if n != nil {
ep.network = n.(*network)
}
if c.processEndpointUpdate(&ep) {
err = c.newEndpointFromStore(epe.Key, &ep)
if err != nil {
log.Error(err)
}
}
}
}
}
}()
return nil
}
func (c *controller) networkFromEndpointKey(key string, ep *endpoint) (Network, error) {
nid, err := networkIDFromEndpointKey(key, ep)
if err != nil {
return nil, err
}
return c.NetworkByID(string(nid))
}
func networkIDFromEndpointKey(key string, ep *endpoint) (types.UUID, error) {
eKey, err := datastore.ParseKey(key)
if err != nil {
return types.UUID(""), err
}
return ep.networkIDFromKey(eKey)
}
func (c *controller) processEndpointUpdate(ep *endpoint) bool {
nw := ep.network
if nw == nil {
return true
}
nw.Lock()
id := nw.id
nw.Unlock()
c.Lock()
n, ok := c.networks[id]
c.Unlock()
if !ok {
return true
}
existing, _ := n.EndpointByID(string(ep.id))
if existing == nil {
return true
}
ee := existing.(*endpoint)
ee.Lock()
if ee.dbIndex != ep.dbIndex {
ee.dbIndex = ep.dbIndex
if ee.container != nil && ep.container != nil {
// we care only about the container id
ee.container.id = ep.container.id
} else {
// we still care only about the container id, but this is a short-cut to communicate join or leave operation
ee.container = ep.container
}
}
ee.Unlock()
return false
}