mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
d04c177a10
Signed-off-by: Chun Chen <ramichen@tencent.com>
383 lines
7.8 KiB
Go
383 lines
7.8 KiB
Go
package libnetwork
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
|
|
log "github.com/Sirupsen/logrus"
|
|
"github.com/docker/libkv/store"
|
|
"github.com/docker/libnetwork/datastore"
|
|
)
|
|
|
|
func (c *controller) validateDatastoreConfig() bool {
|
|
return c.cfg != nil && c.cfg.Datastore.Client.Provider != "" && c.cfg.Datastore.Client.Address != ""
|
|
}
|
|
|
|
func (c *controller) initDataStore() error {
|
|
c.Lock()
|
|
cfg := c.cfg
|
|
c.Unlock()
|
|
if !c.validateDatastoreConfig() {
|
|
return fmt.Errorf("datastore initialization requires a valid configuration")
|
|
}
|
|
|
|
store, err := datastore.NewDataStore(&cfg.Datastore)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.Lock()
|
|
c.store = store
|
|
c.Unlock()
|
|
|
|
nws, err := c.getNetworksFromStore()
|
|
if err == nil {
|
|
c.processNetworkUpdate(nws, nil)
|
|
} else if err != datastore.ErrKeyNotFound {
|
|
log.Warnf("failed to read networks from datastore during init : %v", err)
|
|
}
|
|
return c.watchNetworks()
|
|
}
|
|
|
|
func (c *controller) getNetworksFromStore() ([]*store.KVPair, error) {
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
return cs.KVStore().List(datastore.Key(datastore.NetworkKeyPrefix))
|
|
}
|
|
|
|
func (c *controller) newNetworkFromStore(n *network) error {
|
|
n.Lock()
|
|
n.ctrlr = c
|
|
n.endpoints = endpointTable{}
|
|
n.Unlock()
|
|
|
|
return c.addNetwork(n)
|
|
}
|
|
|
|
func (c *controller) updateNetworkToStore(n *network) error {
|
|
global, err := n.isGlobalScoped()
|
|
if err != nil || !global {
|
|
return err
|
|
}
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
if cs == nil {
|
|
log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name())
|
|
return nil
|
|
}
|
|
|
|
return cs.PutObjectAtomic(n)
|
|
}
|
|
|
|
func (c *controller) deleteNetworkFromStore(n *network) error {
|
|
global, err := n.isGlobalScoped()
|
|
if err != nil || !global {
|
|
return err
|
|
}
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
if cs == nil {
|
|
log.Debugf("datastore not initialized. Network %s is not deleted from datastore", n.Name())
|
|
return nil
|
|
}
|
|
|
|
if err := cs.DeleteObjectAtomic(n); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *controller) getNetworkFromStore(nid string) (*network, error) {
|
|
n := network{id: nid}
|
|
if err := c.store.GetObject(datastore.Key(n.Key()...), &n); err != nil {
|
|
return nil, err
|
|
}
|
|
return &n, nil
|
|
}
|
|
|
|
func (c *controller) newEndpointFromStore(key string, ep *endpoint) error {
|
|
ep.Lock()
|
|
n := ep.network
|
|
id := ep.id
|
|
ep.Unlock()
|
|
|
|
_, err := n.EndpointByID(id)
|
|
if err != nil {
|
|
if _, ok := err.(ErrNoSuchEndpoint); ok {
|
|
return n.addEndpoint(ep)
|
|
}
|
|
}
|
|
return err
|
|
}
|
|
|
|
func (c *controller) updateEndpointToStore(ep *endpoint) error {
|
|
ep.Lock()
|
|
n := ep.network
|
|
name := ep.name
|
|
ep.Unlock()
|
|
global, err := n.isGlobalScoped()
|
|
if err != nil || !global {
|
|
return err
|
|
}
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
if cs == nil {
|
|
log.Debugf("datastore not initialized. endpoint %s is not added to the store", name)
|
|
return nil
|
|
}
|
|
|
|
return cs.PutObjectAtomic(ep)
|
|
}
|
|
|
|
func (c *controller) getEndpointFromStore(eid string) (*endpoint, error) {
|
|
ep := endpoint{id: eid}
|
|
if err := c.store.GetObject(datastore.Key(ep.Key()...), &ep); err != nil {
|
|
return nil, err
|
|
}
|
|
return &ep, nil
|
|
}
|
|
|
|
func (c *controller) deleteEndpointFromStore(ep *endpoint) error {
|
|
ep.Lock()
|
|
n := ep.network
|
|
ep.Unlock()
|
|
global, err := n.isGlobalScoped()
|
|
if err != nil || !global {
|
|
return err
|
|
}
|
|
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
if cs == nil {
|
|
log.Debugf("datastore not initialized. endpoint %s is not deleted from datastore", ep.Name())
|
|
return nil
|
|
}
|
|
|
|
if err := cs.DeleteObjectAtomic(ep); err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *controller) watchNetworks() error {
|
|
if !c.validateDatastoreConfig() {
|
|
return nil
|
|
}
|
|
|
|
c.Lock()
|
|
cs := c.store
|
|
c.Unlock()
|
|
|
|
networkKey := datastore.Key(datastore.NetworkKeyPrefix)
|
|
if err := ensureKeys(networkKey, cs); err != nil {
|
|
return fmt.Errorf("failed to ensure if the network keys are valid and present in store: %v", err)
|
|
}
|
|
nwPairs, err := cs.KVStore().WatchTree(networkKey, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go func() {
|
|
for {
|
|
select {
|
|
case nws := <-nwPairs:
|
|
c.Lock()
|
|
tmpview := networkTable{}
|
|
lview := c.networks
|
|
c.Unlock()
|
|
for k, v := range lview {
|
|
global, _ := v.isGlobalScoped()
|
|
if global {
|
|
tmpview[k] = v
|
|
}
|
|
}
|
|
c.processNetworkUpdate(nws, &tmpview)
|
|
|
|
// Delete processing
|
|
for k := range tmpview {
|
|
c.Lock()
|
|
existing, ok := c.networks[k]
|
|
c.Unlock()
|
|
if !ok {
|
|
continue
|
|
}
|
|
tmp := network{}
|
|
if err := c.store.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
|
|
continue
|
|
}
|
|
if err := existing.deleteNetwork(); err != nil {
|
|
log.Debugf("Delete failed %s: %s", existing.name, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (n *network) watchEndpoints() error {
|
|
if !n.ctrlr.validateDatastoreConfig() {
|
|
return nil
|
|
}
|
|
|
|
n.Lock()
|
|
cs := n.ctrlr.store
|
|
tmp := endpoint{network: n}
|
|
n.stopWatchCh = make(chan struct{})
|
|
stopCh := n.stopWatchCh
|
|
n.Unlock()
|
|
|
|
endpointKey := datastore.Key(tmp.KeyPrefix()...)
|
|
if err := ensureKeys(endpointKey, cs); err != nil {
|
|
return fmt.Errorf("failed to ensure if the endpoint keys are valid and present in store: %v", err)
|
|
}
|
|
epPairs, err := cs.KVStore().WatchTree(endpointKey, stopCh)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-stopCh:
|
|
return
|
|
case eps := <-epPairs:
|
|
n.Lock()
|
|
tmpview := endpointTable{}
|
|
lview := n.endpoints
|
|
n.Unlock()
|
|
for k, v := range lview {
|
|
global, _ := v.network.isGlobalScoped()
|
|
if global {
|
|
tmpview[k] = v
|
|
}
|
|
}
|
|
for _, epe := range eps {
|
|
var ep endpoint
|
|
err := json.Unmarshal(epe.Value, &ep)
|
|
if err != nil {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
delete(tmpview, ep.id)
|
|
ep.SetIndex(epe.LastIndex)
|
|
ep.network = n
|
|
if n.ctrlr.processEndpointUpdate(&ep) {
|
|
err = n.ctrlr.newEndpointFromStore(epe.Key, &ep)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
}
|
|
// Delete processing
|
|
for k := range tmpview {
|
|
n.Lock()
|
|
existing, ok := n.endpoints[k]
|
|
n.Unlock()
|
|
if !ok {
|
|
continue
|
|
}
|
|
tmp := endpoint{}
|
|
if err := cs.GetObject(datastore.Key(existing.Key()...), &tmp); err != datastore.ErrKeyNotFound {
|
|
continue
|
|
}
|
|
if err := existing.deleteEndpoint(); err != nil {
|
|
log.Debugf("Delete failed %s: %s", existing.name, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (n *network) stopWatch() {
|
|
n.Lock()
|
|
if n.stopWatchCh != nil {
|
|
close(n.stopWatchCh)
|
|
n.stopWatchCh = nil
|
|
}
|
|
n.Unlock()
|
|
}
|
|
|
|
func (c *controller) processNetworkUpdate(nws []*store.KVPair, prune *networkTable) {
|
|
for _, kve := range nws {
|
|
var n network
|
|
err := json.Unmarshal(kve.Value, &n)
|
|
if err != nil {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
if prune != nil {
|
|
delete(*prune, n.id)
|
|
}
|
|
n.SetIndex(kve.LastIndex)
|
|
c.Lock()
|
|
existing, ok := c.networks[n.id]
|
|
c.Unlock()
|
|
if ok {
|
|
existing.Lock()
|
|
// Skip existing network update
|
|
if existing.dbIndex != n.Index() {
|
|
// Can't use SetIndex() since existing is locked.
|
|
existing.dbIndex = n.Index()
|
|
existing.dbExists = true
|
|
existing.endpointCnt = n.endpointCnt
|
|
}
|
|
existing.Unlock()
|
|
continue
|
|
}
|
|
|
|
if err = c.newNetworkFromStore(&n); err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *controller) processEndpointUpdate(ep *endpoint) bool {
|
|
nw := ep.network
|
|
if nw == nil {
|
|
return true
|
|
}
|
|
nw.Lock()
|
|
id := nw.id
|
|
nw.Unlock()
|
|
|
|
c.Lock()
|
|
n, ok := c.networks[id]
|
|
c.Unlock()
|
|
if !ok {
|
|
return true
|
|
}
|
|
existing, _ := n.EndpointByID(ep.id)
|
|
if existing == nil {
|
|
return true
|
|
}
|
|
|
|
ee := existing.(*endpoint)
|
|
ee.Lock()
|
|
if ee.dbIndex != ep.Index() {
|
|
// Can't use SetIndex() because ee is locked.
|
|
ee.dbIndex = ep.Index()
|
|
ee.dbExists = true
|
|
ee.sandboxID = ep.sandboxID
|
|
}
|
|
ee.Unlock()
|
|
|
|
return false
|
|
}
|
|
|
|
func ensureKeys(key string, cs datastore.DataStore) error {
|
|
exists, err := cs.KVStore().Exists(key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if exists {
|
|
return nil
|
|
}
|
|
return cs.KVStore().Put(key, []byte{}, nil)
|
|
}
|