diff --git a/libnetwork/cmd/test/libnetwork.toml b/libnetwork/cmd/test/libnetwork.toml index e3bfc82099..4e22516d13 100644 --- a/libnetwork/cmd/test/libnetwork.toml +++ b/libnetwork/cmd/test/libnetwork.toml @@ -4,7 +4,7 @@ title = "LibNetwork Configuration file" debug = false [cluster] discovery = "token://22aa23948f4f6b31230687689636959e" - Address = "2.1.1.1" + Address = "1.1.1.1" [datastore] embedded = false [datastore.client] diff --git a/libnetwork/cmd/test/main.go b/libnetwork/cmd/test/main.go index fd489a2330..9b735528cc 100644 --- a/libnetwork/cmd/test/main.go +++ b/libnetwork/cmd/test/main.go @@ -35,6 +35,17 @@ func main() { } else { fmt.Println("Network Created Successfully :", netw) } + netw, _ = controller.NetworkByName(fmt.Sprintf("Gordon-%d", i)) + _, err = netw.CreateEndpoint(fmt.Sprintf("Gordon-Ep-%d", i), nil) + if err != nil { + log.Fatalf("Error creating endpoint 1 %v", err) + } + + _, err = netw.CreateEndpoint(fmt.Sprintf("Gordon-Ep2-%d", i), nil) + if err != nil { + log.Fatalf("Error creating endpoint 2 %v", err) + } + time.Sleep(10 * time.Second) } } diff --git a/libnetwork/controller.go b/libnetwork/controller.go index 6d1f26cf7d..34ec9630d5 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -45,7 +45,6 @@ create network namespaces and allocate interfaces for containers to use. package libnetwork import ( - "encoding/json" "fmt" "net" "os" @@ -161,21 +160,6 @@ func (c *controller) initConfig(configFile string) error { return nil } -func (c *controller) initDataStore() error { - if c.cfg == nil { - return fmt.Errorf("datastore initialization requires a valid configuration") - } - - store, err := datastore.NewDataStore(&c.cfg.Datastore) - if err != nil { - return err - } - c.Lock() - c.store = store - c.Unlock() - return c.watchNewNetworks() -} - func (c *controller) initDiscovery() error { if c.cfg == nil { return fmt.Errorf("discovery initialization requires a valid configuration") @@ -217,18 +201,6 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti if name == "" { return nil, ErrInvalidName(name) } - // Check if a driver for the specified network type is available - c.Lock() - d, ok := c.drivers[networkType] - c.Unlock() - if !ok { - var err error - d, err = c.loadDriver(networkType) - if err != nil { - return nil, err - } - } - // Check if a network already exists with the specified network name c.Lock() for _, n := range c.networks { @@ -245,21 +217,15 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti networkType: networkType, id: types.UUID(stringid.GenerateRandomID()), ctrlr: c, - driver: d, endpoints: endpointTable{}, } network.processOptions(options...) - // Create the network - if err := d.CreateNetwork(network.id, network.generic); err != nil { + + if err := c.addNetwork(network); err != nil { return nil, err } - // Store the network handler in controller - c.Lock() - c.networks[network.id] = network - c.Unlock() - if err := c.addNetworkToStore(network); err != nil { return nil, err } @@ -267,77 +233,31 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti return network, nil } -func (c *controller) newNetworkFromStore(n *network) { +func (c *controller) addNetwork(n *network) error { + c.Lock() - defer c.Unlock() - - if _, ok := c.drivers[n.networkType]; !ok { - log.Warnf("Network driver unavailable for type=%s. ignoring network updates for %s", n.Type(), n.Name()) - return - } - n.ctrlr = c - n.driver = c.drivers[n.networkType] - c.networks[n.id] = n - // TODO : Populate n.endpoints back from endpoint dbstore -} - -func (c *controller) addNetworkToStore(n *network) error { - if isReservedNetwork(n.Name()) { - return nil - } - c.Lock() - cs := c.store - c.Unlock() - if cs == nil { - log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name()) - return nil - } - - // Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875, - // Also Network object is Keyed with UUID & hence an Atomic put is not mandatory. - // return cs.PutObjectAtomic(n) - - return cs.PutObject(n) -} - -func (c *controller) watchNewNetworks() error { - c.Lock() - cs := c.store + // Check if a driver for the specified network type is available + d, ok := c.drivers[n.networkType] c.Unlock() - kvPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), c.stopChan) - if err != nil { + if !ok { + var err error + d, err = c.loadDriver(n.networkType) + if err != nil { + return err + } + } + + n.driver = d + + // Create the network + if err := d.CreateNetwork(n.id, n.generic); err != nil { return err } - go func() { - for { - select { - case kvs := <-kvPairs: - for _, kve := range kvs { - var n network - err := json.Unmarshal(kve.Value, &n) - if err != nil { - log.Error(err) - continue - } - n.dbIndex = kve.LastIndex - c.Lock() - existing, ok := c.networks[n.id] - c.Unlock() - if ok && existing.dbIndex == n.dbIndex { - // Skip any watch notification for a network that has not changed - continue - } else if ok { - // Received an update for an existing network object - log.Debugf("Skipping network update for %s (%s)", n.name, n.id) - continue - } + c.Lock() + c.networks[n.id] = n + c.Unlock() - c.newNetworkFromStore(&n) - } - } - } - }() return nil } diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index 9f512b07df..cc9f45d686 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -37,6 +37,8 @@ type KV interface { const ( // NetworkKeyPrefix is the prefix for network key in the kv store NetworkKeyPrefix = "network" + // EndpointKeyPrefix is the prefix for endpoint key in the kv store + EndpointKeyPrefix = "endpoint" ) //Key provides convenient method to create a Key diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 5f114483fd..cf1634d853 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -2,6 +2,7 @@ package libnetwork import ( "bytes" + "encoding/json" "io/ioutil" "os" "path" @@ -10,6 +11,7 @@ import ( log "github.com/Sirupsen/logrus" "github.com/docker/docker/pkg/ioutils" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/etchosts" "github.com/docker/libnetwork/netlabel" "github.com/docker/libnetwork/resolvconf" @@ -108,9 +110,53 @@ type endpoint struct { exposedPorts []types.TransportPort generic map[string]interface{} joinLeaveDone chan struct{} + dbIndex uint64 sync.Mutex } +func (ep *endpoint) MarshalJSON() ([]byte, error) { + epMap := make(map[string]interface{}) + epMap["name"] = ep.name + epMap["id"] = string(ep.id) + epMap["network"] = ep.network + epMap["ep_iface"] = ep.iFaces + epMap["exposed_ports"] = ep.exposedPorts + epMap["generic"] = ep.generic + return json.Marshal(epMap) +} + +func (ep *endpoint) UnmarshalJSON(b []byte) (err error) { + var epMap map[string]interface{} + if err := json.Unmarshal(b, &epMap); err != nil { + return err + } + ep.name = epMap["name"].(string) + ep.id = types.UUID(epMap["id"].(string)) + + nb, _ := json.Marshal(epMap["network"]) + var n network + json.Unmarshal(nb, &n) + ep.network = &n + + ib, _ := json.Marshal(epMap["ep_iface"]) + var ifaces []endpointInterface + json.Unmarshal(ib, &ifaces) + ep.iFaces = make([]*endpointInterface, 0) + for _, iface := range ifaces { + ep.iFaces = append(ep.iFaces, &iface) + } + + tb, _ := json.Marshal(epMap["exposed_ports"]) + var tPorts []types.TransportPort + json.Unmarshal(tb, &tPorts) + ep.exposedPorts = tPorts + + if epMap["generic"] != nil { + ep.generic = epMap["generic"].(map[string]interface{}) + } + return nil +} + const defaultPrefix = "/var/lib/docker/network/files" func (ep *endpoint) ID() string { @@ -134,6 +180,26 @@ func (ep *endpoint) Network() string { return ep.network.name } +func (ep *endpoint) Key() []string { + return []string{datastore.EndpointKeyPrefix, string(ep.network.id), string(ep.id)} +} + +func (ep *endpoint) Value() []byte { + b, err := json.Marshal(ep) + if err != nil { + return nil + } + return b +} + +func (ep *endpoint) Index() uint64 { + return ep.dbIndex +} + +func (ep *endpoint) SetIndex(index uint64) { + ep.dbIndex = index +} + func (ep *endpoint) processOptions(options ...EndpointOption) { ep.Lock() defer ep.Unlock() diff --git a/libnetwork/network.go b/libnetwork/network.go index d82f16945b..ee66ce799d 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -191,6 +191,28 @@ func (n *network) Delete() error { return nil } +func (n *network) addEndpoint(ep *endpoint) error { + var err error + n.Lock() + n.endpoints[ep.id] = ep + d := n.driver + n.Unlock() + + defer func() { + if err != nil { + n.Lock() + delete(n.endpoints, ep.id) + n.Unlock() + } + }() + + err = d.CreateEndpoint(n.id, ep.id, ep, ep.generic) + if err != nil { + return err + } + return nil +} + func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoint, error) { if name == "" { return nil, ErrInvalidName(name) @@ -205,15 +227,14 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi ep.network = n ep.processOptions(options...) - d := n.driver - err := d.CreateEndpoint(n.id, ep.id, ep, ep.generic) - if err != nil { + if err := n.addEndpoint(ep); err != nil { + return nil, err + } + + if err := n.ctrlr.addEndpointToStore(ep); err != nil { return nil, err } - n.Lock() - n.endpoints[ep.id] = ep - n.Unlock() return ep, nil } diff --git a/libnetwork/store.go b/libnetwork/store.go new file mode 100644 index 0000000000..092d9fa192 --- /dev/null +++ b/libnetwork/store.go @@ -0,0 +1,157 @@ +package libnetwork + +import ( + "encoding/json" + "fmt" + + log "github.com/Sirupsen/logrus" + "github.com/docker/libnetwork/datastore" +) + +func (c *controller) initDataStore() error { + if c.cfg == nil { + return fmt.Errorf("datastore initialization requires a valid configuration") + } + + store, err := datastore.NewDataStore(&c.cfg.Datastore) + if err != nil { + return err + } + c.Lock() + c.store = store + c.Unlock() + return c.watchStore() +} + +func (c *controller) newNetworkFromStore(n *network) { + c.Lock() + n.ctrlr = c + c.Unlock() + n.endpoints = endpointTable{} + + c.addNetwork(n) +} + +func (c *controller) addNetworkToStore(n *network) error { + if isReservedNetwork(n.Name()) { + return nil + } + c.Lock() + cs := c.store + c.Unlock() + if cs == nil { + log.Debugf("datastore not initialized. Network %s is not added to the store", n.Name()) + return nil + } + + // Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875, + // Also Network object is Keyed with UUID & hence an Atomic put is not mandatory. + // return cs.PutObjectAtomic(n) + + return cs.PutObject(n) +} + +func (c *controller) newEndpointFromStore(ep *endpoint) { + c.Lock() + defer c.Unlock() + + n, ok := c.networks[ep.network.id] + if !ok { + log.Warnf("Network (%s) unavailable for endpoint=%s. ignoring endpoint update", ep.network.id, ep.name) + // TODO : Get Network from Store and call newNetworkFromStore + return + } + ep.network = n + _, err := n.EndpointByID(string(ep.id)) + if _, ok := err.(ErrNoSuchEndpoint); ok { + n.addEndpoint(ep) + } +} + +func (c *controller) addEndpointToStore(ep *endpoint) error { + if isReservedNetwork(ep.network.name) { + return nil + } + c.Lock() + cs := c.store + c.Unlock() + if cs == nil { + log.Debugf("datastore not initialized. endpoint %s is not added to the store", ep.name) + return nil + } + + // Commenting out AtomicPut due to https://github.com/docker/swarm/issues/875, + // Also Network object is Keyed with UUID & hence an Atomic put is not mandatory. + // return cs.PutObjectAtomic(ep) + + return cs.PutObject(ep) +} + +func (c *controller) watchStore() error { + c.Lock() + cs := c.store + c.Unlock() + + nwPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.NetworkKeyPrefix), c.stopChan) + if err != nil { + return err + } + epPairs, err := cs.KVStore().WatchTree(datastore.Key(datastore.EndpointKeyPrefix), c.stopChan) + if err != nil { + return err + } + go func() { + for { + select { + case nws := <-nwPairs: + for _, kve := range nws { + var n network + err := json.Unmarshal(kve.Value, &n) + if err != nil { + log.Error(err) + continue + } + n.dbIndex = kve.LastIndex + c.Lock() + existing, ok := c.networks[n.id] + c.Unlock() + if ok { + // Skip existing network update + if existing.dbIndex != n.dbIndex { + log.Debugf("Skipping network update for %s (%s)", n.name, n.id) + } + continue + } + + c.newNetworkFromStore(&n) + } + case eps := <-epPairs: + for _, epe := range eps { + var ep endpoint + err := json.Unmarshal(epe.Value, &ep) + if err != nil { + log.Error(err) + continue + } + ep.dbIndex = epe.LastIndex + c.Lock() + n, ok := c.networks[ep.network.id] + c.Unlock() + if ok { + existing, _ := n.EndpointByID(string(ep.id)) + if existing != nil { + // Skip existing endpoint update + if existing.(*endpoint).dbIndex != ep.dbIndex { + log.Debugf("Skipping endpoint update for %s (%s)", ep.name, ep.id) + } + continue + } + } + + c.newEndpointFromStore(&ep) + } + } + } + }() + return nil +}