From ffdceda2551577c206f00ea2fb4adb5e86eaf25e Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Wed, 13 Apr 2016 17:53:41 -0700 Subject: [PATCH] Add service support Add a notion of service in libnetwork so that a group of endpoints which form a service can be treated as such so that service level features can be added on top. Initially as part of this PR the support to assign a name to the said service is added which results in DNS queries to the service name to return all the IPs of the backing endpoints so that DNS RR behavior on the service name can be achieved. Signed-off-by: Jana Radhakrishnan --- libnetwork/agent.go | 46 ++++++++--- libnetwork/controller.go | 38 ++++----- libnetwork/endpoint.go | 32 +++++++- libnetwork/network.go | 8 +- libnetwork/sandbox.go | 4 +- libnetwork/service.go | 80 +++++++++++++++++++ libnetwork/store.go | 2 +- libnetwork/test/integration/dnet/helpers.bash | 2 +- 8 files changed, 174 insertions(+), 38 deletions(-) create mode 100644 libnetwork/service.go diff --git a/libnetwork/agent.go b/libnetwork/agent.go index 825971077f..ca54d8c923 100644 --- a/libnetwork/agent.go +++ b/libnetwork/agent.go @@ -165,7 +165,12 @@ func (ep *endpoint) addToCluster() error { c := n.getController() if !ep.isAnonymous() && ep.Iface().Address() != nil { - if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s=%s", ep.Name(), ep.Iface().Address().IP))); err != nil { + if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil { + return err + } + + if err := c.agent.networkDB.CreateEntry("endpoint_table", n.ID(), ep.ID(), []byte(fmt.Sprintf("%s,%s,%s,%s", ep.Name(), ep.svcName, + ep.svcID, ep.Iface().Address().IP))); err != nil { return err } } @@ -187,6 +192,12 @@ func (ep *endpoint) deleteFromCluster() error { c := n.getController() if !ep.isAnonymous() { + if ep.Iface().Address() != nil { + if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Iface().Address().IP); err != nil { + return err + } + } + if err := c.agent.networkDB.DeleteEntry("endpoint_table", n.ID(), ep.ID()); err != nil { return err } @@ -297,38 +308,43 @@ func (n *network) handleDriverTableEvent(ev events.Event) { func (c *controller) handleEpTableEvent(ev events.Event) { var ( - id string + nid string + eid string value string isAdd bool ) switch event := ev.(type) { case networkdb.CreateEvent: - id = event.NetworkID + nid = event.NetworkID + eid = event.Key value = string(event.Value) isAdd = true case networkdb.DeleteEvent: - id = event.NetworkID + nid = event.NetworkID + eid = event.Key value = string(event.Value) case networkdb.UpdateEvent: logrus.Errorf("Unexpected update service table event = %#v", event) } - nw, err := c.NetworkByID(id) + nw, err := c.NetworkByID(nid) if err != nil { - logrus.Errorf("Could not find network %s while handling service table event: %v", id, err) + logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err) return } n := nw.(*network) - pair := strings.Split(value, "=") - if len(pair) < 2 { + vals := strings.Split(value, ",") + if len(vals) < 4 { logrus.Errorf("Incorrect service table value = %s", value) return } - name := pair[0] - ip := net.ParseIP(pair[1]) + name := vals[0] + svcName := vals[1] + svcID := vals[2] + ip := net.ParseIP(vals[3]) if name == "" || ip == nil { logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value) @@ -336,8 +352,18 @@ func (c *controller) handleEpTableEvent(ev events.Event) { } if isAdd { + if err := c.addServiceBinding(svcName, svcID, nid, eid, ip); err != nil { + logrus.Errorf("Failed adding service binding for value %s: %v", value, err) + return + } + n.addSvcRecords(name, ip, nil, true) } else { + if err := c.rmServiceBinding(svcName, svcID, nid, eid, ip); err != nil { + logrus.Errorf("Failed adding service binding for value %s: %v", value, err) + return + } + n.deleteSvcRecords(name, ip, nil, true) } } diff --git a/libnetwork/controller.go b/libnetwork/controller.go index 07cb024f9d..fa14b1cf1f 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -123,20 +123,21 @@ type SandboxWalker func(sb Sandbox) bool type sandboxTable map[string]*sandbox type controller struct { - id string - drvRegistry *drvregistry.DrvRegistry - sandboxes sandboxTable - cfg *config.Config - stores []datastore.DataStore - discovery hostdiscovery.HostDiscovery - extKeyListener net.Listener - watchCh chan *endpoint - unWatchCh chan *endpoint - svcDb map[string]svcInfo - nmap map[string]*netWatch - defOsSbox osl.Sandbox - sboxOnce sync.Once - agent *agent + id string + drvRegistry *drvregistry.DrvRegistry + sandboxes sandboxTable + cfg *config.Config + stores []datastore.DataStore + discovery hostdiscovery.HostDiscovery + extKeyListener net.Listener + watchCh chan *endpoint + unWatchCh chan *endpoint + svcRecords map[string]svcInfo + nmap map[string]*netWatch + serviceBindings map[string]*service + defOsSbox osl.Sandbox + sboxOnce sync.Once + agent *agent sync.Mutex } @@ -148,10 +149,11 @@ type initializer struct { // New creates a new instance of network controller. func New(cfgOptions ...config.Option) (NetworkController, error) { c := &controller{ - id: stringid.GenerateRandomID(), - cfg: config.ParseConfigOptions(cfgOptions...), - sandboxes: sandboxTable{}, - svcDb: make(map[string]svcInfo), + id: stringid.GenerateRandomID(), + cfg: config.ParseConfigOptions(cfgOptions...), + sandboxes: sandboxTable{}, + svcRecords: make(map[string]svcInfo), + serviceBindings: make(map[string]*service), } if err := c.agentInit(c.cfg.Daemon.Bind); err != nil { diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 45516db8f7..5335945690 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -67,6 +67,8 @@ type endpoint struct { ipamOptions map[string]string aliases map[string]string myAliases []string + svcID string + svcName string dbIndex uint64 dbExists bool sync.Mutex @@ -89,6 +91,9 @@ func (ep *endpoint) MarshalJSON() ([]byte, error) { epMap["anonymous"] = ep.anonymous epMap["disableResolution"] = ep.disableResolution epMap["myAliases"] = ep.myAliases + epMap["svcName"] = ep.svcName + epMap["svcID"] = ep.svcID + return json.Marshal(epMap) } @@ -172,6 +177,15 @@ func (ep *endpoint) UnmarshalJSON(b []byte) (err error) { if l, ok := epMap["locator"]; ok { ep.locator = l.(string) } + + if sn, ok := epMap["svcName"]; ok { + ep.svcName = sn.(string) + } + + if si, ok := epMap["svcID"]; ok { + ep.svcID = si.(string) + } + ma, _ := json.Marshal(epMap["myAliases"]) var myAliases []string json.Unmarshal(ma, &myAliases) @@ -196,6 +210,8 @@ func (ep *endpoint) CopyTo(o datastore.KVObject) error { dstEp.dbExists = ep.dbExists dstEp.anonymous = ep.anonymous dstEp.disableResolution = ep.disableResolution + dstEp.svcName = ep.svcName + dstEp.svcID = ep.svcID if ep.iface != nil { dstEp.iface = &endpointInterface{} @@ -413,7 +429,9 @@ func (ep *endpoint) sbJoin(sb *sandbox, options ...EndpointOption) error { }() // Watch for service records - n.getController().watchSvcRecord(ep) + if !n.getController().cfg.Daemon.IsAgent { + n.getController().watchSvcRecord(ep) + } address := "" if ip := ep.getFirstInterfaceAddress(); ip != nil { @@ -738,7 +756,9 @@ func (ep *endpoint) Delete(force bool) error { }() // unwatch for service records - n.getController().unWatchSvcRecord(ep) + if !n.getController().cfg.Daemon.IsAgent { + n.getController().unWatchSvcRecord(ep) + } if err = ep.deleteEndpoint(force); err != nil && !force { return err @@ -871,6 +891,14 @@ func CreateOptionAlias(name string, alias string) EndpointOption { } } +// CreateOptionService function returns an option setter for setting service binding configuration +func CreateOptionService(name, id string) EndpointOption { + return func(ep *endpoint) { + ep.svcName = name + ep.svcID = id + } +} + //CreateOptionMyAlias function returns an option setter for setting endpoint's self alias func CreateOptionMyAlias(alias string) EndpointOption { return func(ep *endpoint) { diff --git a/libnetwork/network.go b/libnetwork/network.go index e9d672b1c2..aaa79fd356 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -1002,14 +1002,14 @@ func (n *network) addSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMapUp c := n.getController() c.Lock() defer c.Unlock() - sr, ok := c.svcDb[n.ID()] + sr, ok := c.svcRecords[n.ID()] if !ok { sr = svcInfo{ svcMap: make(map[string][]net.IP), svcIPv6Map: make(map[string][]net.IP), ipMap: make(map[string]string), } - c.svcDb[n.ID()] = sr + c.svcRecords[n.ID()] = sr } if ipMapUpdate { @@ -1029,7 +1029,7 @@ func (n *network) deleteSvcRecords(name string, epIP net.IP, epIPv6 net.IP, ipMa c := n.getController() c.Lock() defer c.Unlock() - sr, ok := c.svcDb[n.ID()] + sr, ok := c.svcRecords[n.ID()] if !ok { return } @@ -1054,7 +1054,7 @@ func (n *network) getSvcRecords(ep *endpoint) []etchosts.Record { defer n.Unlock() var recs []etchosts.Record - sr, _ := n.ctrlr.svcDb[n.id] + sr, _ := n.ctrlr.svcRecords[n.id] for h, ip := range sr.svcMap { if ep != nil && strings.Split(h, ".")[0] == ep.Name() { diff --git a/libnetwork/sandbox.go b/libnetwork/sandbox.go index cca3fd32d5..4cdb017fb3 100644 --- a/libnetwork/sandbox.go +++ b/libnetwork/sandbox.go @@ -405,7 +405,7 @@ func (sb *sandbox) ResolveIP(ip string) string { for _, ep := range sb.getConnectedEndpoints() { n := ep.getNetwork() - sr, ok := n.getController().svcDb[n.ID()] + sr, ok := n.getController().svcRecords[n.ID()] if !ok { continue } @@ -512,7 +512,7 @@ func (sb *sandbox) resolveName(req string, networkName string, epList []*endpoin ep.Unlock() } - sr, ok := n.getController().svcDb[n.ID()] + sr, ok := n.getController().svcRecords[n.ID()] if !ok { continue } diff --git a/libnetwork/service.go b/libnetwork/service.go new file mode 100644 index 0000000000..9caed0ae22 --- /dev/null +++ b/libnetwork/service.go @@ -0,0 +1,80 @@ +package libnetwork + +import "net" + +type service struct { + name string + id string + backEnds map[string]map[string]net.IP +} + +func newService(name string, id string) *service { + return &service{ + name: name, + id: id, + backEnds: make(map[string]map[string]net.IP), + } +} + +func (c *controller) addServiceBinding(name, sid, nid, eid string, ip net.IP) error { + var s *service + + n, err := c.NetworkByID(nid) + if err != nil { + return err + } + + c.Lock() + s, ok := c.serviceBindings[sid] + if !ok { + s = newService(name, sid) + } + + netBackEnds, ok := s.backEnds[nid] + if !ok { + netBackEnds = make(map[string]net.IP) + s.backEnds[nid] = netBackEnds + } + + netBackEnds[eid] = ip + c.serviceBindings[sid] = s + c.Unlock() + + n.(*network).addSvcRecords(name, ip, nil, false) + return nil +} + +func (c *controller) rmServiceBinding(name, sid, nid, eid string, ip net.IP) error { + n, err := c.NetworkByID(nid) + if err != nil { + return err + } + + c.Lock() + s, ok := c.serviceBindings[sid] + if !ok { + c.Unlock() + return nil + } + + netBackEnds, ok := s.backEnds[nid] + if !ok { + c.Unlock() + return nil + } + + delete(netBackEnds, eid) + + if len(netBackEnds) == 0 { + delete(s.backEnds, nid) + } + + if len(s.backEnds) == 0 { + delete(c.serviceBindings, sid) + } + c.Unlock() + + n.(*network).deleteSvcRecords(name, ip, nil, false) + + return err +} diff --git a/libnetwork/store.go b/libnetwork/store.go index 0e7ec1921b..4a40272a3e 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -420,7 +420,7 @@ func (c *controller) processEndpointDelete(nmap map[string]*netWatch, ep *endpoi // This is the last container going away for the network. Destroy // this network's svc db entry - delete(c.svcDb, ep.getNetwork().ID()) + delete(c.svcRecords, ep.getNetwork().ID()) delete(nmap, ep.getNetwork().ID()) } diff --git a/libnetwork/test/integration/dnet/helpers.bash b/libnetwork/test/integration/dnet/helpers.bash index 67e4fcc8a3..d7f66a60e7 100644 --- a/libnetwork/test/integration/dnet/helpers.bash +++ b/libnetwork/test/integration/dnet/helpers.bash @@ -199,7 +199,7 @@ EOF cat ${tomlfile} docker run \ -d \ - --hostname=${name} \ + --hostname=$(echo ${name} | sed s/_/-/g) \ --name=${name} \ --privileged \ -p ${hport}:${cport} \