From a6c2dd75b5a93a473fb72c12b91c1edaadad2655 Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Sun, 11 Oct 2015 18:51:10 -0700 Subject: [PATCH 1/2] Synchronize datastore apis Currently there are 3 distinct operations performed by datastore - Pushing the data to the store - Updating the Index of the local object - Updating the cache (in case of localscope) Without a lock racing datastore api calls can interleave in various surprising ways. Best thing is to keep these 3 above operation inseparable. Use a datastore lock to achieve this. Signed-off-by: Jana Radhakrishnan --- libnetwork/datastore/datastore.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index 3514dc6561..1dff1c5201 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -5,6 +5,7 @@ import ( "log" "reflect" "strings" + "sync" "github.com/docker/libkv" "github.com/docker/libkv/store" @@ -55,6 +56,7 @@ type datastore struct { scope string store store.Store cache *cache + sync.Mutex } // KVObject is Key/Value interface used by objects to be part of the DataStore @@ -287,6 +289,8 @@ func (ds *datastore) PutObjectAtomic(kvObject KVObject) error { pair *store.KVPair err error ) + ds.Lock() + defer ds.Unlock() if kvObject == nil { return types.BadRequestErrorf("invalid KV Object : nil") @@ -325,6 +329,9 @@ add_cache: // PutObject adds a new Record based on an object into the datastore func (ds *datastore) PutObject(kvObject KVObject) error { + ds.Lock() + defer ds.Unlock() + if kvObject == nil { return types.BadRequestErrorf("invalid KV Object : nil") } @@ -356,6 +363,9 @@ func (ds *datastore) putObjectWithKey(kvObject KVObject, key ...string) error { // GetObject returns a record matching the key func (ds *datastore) GetObject(key string, o KVObject) error { + ds.Lock() + defer ds.Unlock() + if ds.cache != nil { return ds.cache.get(key, o) } @@ -387,6 +397,9 @@ func (ds *datastore) ensureKey(key string) error { } func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) { + ds.Lock() + defer ds.Unlock() + if ds.cache != nil { return ds.cache.list(kvObject) } @@ -430,6 +443,9 @@ func (ds *datastore) List(key string, kvObject KVObject) ([]KVObject, error) { // DeleteObject unconditionally deletes a record from the store func (ds *datastore) DeleteObject(kvObject KVObject) error { + ds.Lock() + defer ds.Unlock() + // cleaup the cache first if ds.cache != nil { ds.cache.del(kvObject) @@ -444,6 +460,9 @@ func (ds *datastore) DeleteObject(kvObject KVObject) error { // DeleteObjectAtomic performs atomic delete on a record func (ds *datastore) DeleteObjectAtomic(kvObject KVObject) error { + ds.Lock() + defer ds.Unlock() + if kvObject == nil { return types.BadRequestErrorf("invalid KV Object : nil") } @@ -469,6 +488,9 @@ del_cache: // DeleteTree unconditionally deletes a record from the store func (ds *datastore) DeleteTree(kvObject KVObject) error { + ds.Lock() + defer ds.Unlock() + // cleaup the cache first if ds.cache != nil { ds.cache.del(kvObject) From ab8dfb54fe0b7c93a2c61fb89fa3baf4a427f93c Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Sun, 11 Oct 2015 22:28:26 -0700 Subject: [PATCH 2/2] Separate endpoint count data from network object Currently endpoint count is maintained as part of network object and the endpoint count gets updated frequently while the rest of network is quite stable. Because of the frequent updates to endpoint count the network object is getting marshalled and unmarshalled ferquently. This is causing a lot of churn and transient memory usage. Fix this by creating a deparate object of endpoint count so that only that gets updated. Signed-off-by: Jana Radhakrishnan --- libnetwork/controller.go | 32 +++--- libnetwork/endpoint.go | 4 +- libnetwork/endpoint_cnt.go | 147 +++++++++++++++++++++++++ libnetwork/libnetwork_internal_test.go | 3 +- libnetwork/network.go | 71 ++---------- libnetwork/store.go | 25 ++++- 6 files changed, 200 insertions(+), 82 deletions(-) create mode 100644 libnetwork/endpoint_cnt.go diff --git a/libnetwork/controller.go b/libnetwork/controller.go index 2b07705627..e21e7d05c6 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -355,24 +355,30 @@ func (c *controller) NewNetwork(networkType, name string, options ...NetworkOpti } }() - // addNetwork can be called for local scope network lazily when - // an endpoint is created after a restart and the network was - // created in previous life. Make sure you wrap around the driver - // notification of network creation in once call so that the driver - // invoked only once in case both the network and endpoint creation - // happens in the same lifetime. - network.drvOnce.Do(func() { - err = c.addNetwork(network) - }) - if err != nil { + if err := c.addNetwork(network); err != nil { return nil, err } + defer func() { + if err != nil { + if e := network.deleteNetwork(); e != nil { + log.Warnf("couldn't roll back driver network on network %s creation failure: %v", network.name, err) + } + } + }() if err = c.updateToStore(network); err != nil { - log.Warnf("couldnt create network %s: %v", network.name, err) - if e := network.Delete(); e != nil { - log.Warnf("couldnt cleanup network %s on network create failure (%v): %v", network.name, err, e) + return nil, err + } + defer func() { + if err != nil { + if e := c.deleteFromStore(network); e != nil { + log.Warnf("couldnt rollback from store, network %s on failure (%v): %v", network.name, err, e) + } } + }() + + network.epCnt = &endpointCnt{n: network} + if err = c.updateToStore(network.epCnt); err != nil { return nil, err } diff --git a/libnetwork/endpoint.go b/libnetwork/endpoint.go index 1dbfe44a31..40459369f7 100644 --- a/libnetwork/endpoint.go +++ b/libnetwork/endpoint.go @@ -484,12 +484,12 @@ func (ep *endpoint) Delete() error { } ep.Unlock() - if err = n.DecEndpointCnt(); err != nil { + if err = n.getEpCnt().DecEndpointCnt(); err != nil { return err } defer func() { if err != nil { - if e := n.IncEndpointCnt(); e != nil { + if e := n.getEpCnt().IncEndpointCnt(); e != nil { log.Warnf("failed to update network %s : %v", n.name, e) } } diff --git a/libnetwork/endpoint_cnt.go b/libnetwork/endpoint_cnt.go new file mode 100644 index 0000000000..550a2a3cfd --- /dev/null +++ b/libnetwork/endpoint_cnt.go @@ -0,0 +1,147 @@ +package libnetwork + +import ( + "encoding/json" + "fmt" + "sync" + + "github.com/docker/libnetwork/datastore" +) + +type endpointCnt struct { + n *network + Count uint64 + dbIndex uint64 + dbExists bool + sync.Mutex +} + +const epCntKeyPrefix = "endpoint_count" + +func (ec *endpointCnt) Key() []string { + ec.Lock() + defer ec.Unlock() + + return []string{epCntKeyPrefix, ec.n.id} +} + +func (ec *endpointCnt) KeyPrefix() []string { + ec.Lock() + defer ec.Unlock() + + return []string{epCntKeyPrefix, ec.n.id} +} + +func (ec *endpointCnt) Value() []byte { + ec.Lock() + defer ec.Unlock() + + b, err := json.Marshal(ec) + if err != nil { + return nil + } + return b +} + +func (ec *endpointCnt) SetValue(value []byte) error { + ec.Lock() + defer ec.Unlock() + + return json.Unmarshal(value, &ec) +} + +func (ec *endpointCnt) Index() uint64 { + ec.Lock() + defer ec.Unlock() + return ec.dbIndex +} + +func (ec *endpointCnt) SetIndex(index uint64) { + ec.Lock() + ec.dbIndex = index + ec.dbExists = true + ec.Unlock() +} + +func (ec *endpointCnt) Exists() bool { + ec.Lock() + defer ec.Unlock() + return ec.dbExists +} + +func (ec *endpointCnt) Skip() bool { + ec.Lock() + defer ec.Unlock() + return !ec.n.persist +} + +func (ec *endpointCnt) New() datastore.KVObject { + ec.Lock() + defer ec.Unlock() + + return &endpointCnt{ + n: ec.n, + } +} + +func (ec *endpointCnt) CopyTo(o datastore.KVObject) error { + ec.Lock() + defer ec.Unlock() + + dstEc := o.(*endpointCnt) + dstEc.n = ec.n + dstEc.Count = ec.Count + dstEc.dbExists = ec.dbExists + dstEc.dbIndex = ec.dbIndex + + return nil +} + +func (ec *endpointCnt) DataScope() string { + return ec.n.DataScope() +} + +func (ec *endpointCnt) EndpointCnt() uint64 { + ec.Lock() + defer ec.Unlock() + + return ec.Count +} + +func (ec *endpointCnt) atomicIncDecEpCnt(inc bool) error { +retry: + ec.Lock() + if inc { + ec.Count++ + } else { + ec.Count-- + } + ec.Unlock() + + store := ec.n.getController().getStore(ec.DataScope()) + if store == nil { + return fmt.Errorf("store not found for scope %s", ec.DataScope()) + } + + if err := ec.n.getController().updateToStore(ec); err != nil { + if err == datastore.ErrKeyModified { + if err := store.GetObject(datastore.Key(ec.Key()...), ec); err != nil { + return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err) + } + + goto retry + } + + return err + } + + return nil +} + +func (ec *endpointCnt) IncEndpointCnt() error { + return ec.atomicIncDecEpCnt(true) +} + +func (ec *endpointCnt) DecEndpointCnt() error { + return ec.atomicIncDecEpCnt(false) +} diff --git a/libnetwork/libnetwork_internal_test.go b/libnetwork/libnetwork_internal_test.go index 909b809552..2847a5b3fc 100644 --- a/libnetwork/libnetwork_internal_test.go +++ b/libnetwork/libnetwork_internal_test.go @@ -39,7 +39,6 @@ func TestNetworkMarshalling(t *testing.T) { ipamType: "default", addrSpace: "viola", networkType: "bridge", - endpointCnt: 27, enableIPv6: true, persist: true, ipamV4Config: []*IpamConf{ @@ -142,7 +141,7 @@ func TestNetworkMarshalling(t *testing.T) { } if n.name != nn.name || n.id != nn.id || n.networkType != nn.networkType || n.ipamType != nn.ipamType || - n.addrSpace != nn.addrSpace || n.endpointCnt != nn.endpointCnt || n.enableIPv6 != nn.enableIPv6 || + n.addrSpace != nn.addrSpace || n.enableIPv6 != nn.enableIPv6 || n.persist != nn.persist || !compareIpamConfList(n.ipamV4Config, nn.ipamV4Config) || !compareIpamInfoList(n.ipamV4Info, nn.ipamV4Info) || !compareIpamConfList(n.ipamV6Config, nn.ipamV6Config) || !compareIpamInfoList(n.ipamV6Info, nn.ipamV6Info) { diff --git a/libnetwork/network.go b/libnetwork/network.go index bd545c3667..6e90dde68e 100644 --- a/libnetwork/network.go +++ b/libnetwork/network.go @@ -152,7 +152,7 @@ type network struct { ipamV4Info []*IpamInfo ipamV6Info []*IpamInfo enableIPv6 bool - endpointCnt uint64 + epCnt *endpointCnt generic options.Generic dbIndex uint64 svcRecords svcMap @@ -296,7 +296,6 @@ func (n *network) CopyTo(o datastore.KVObject) error { dstN.id = n.id dstN.networkType = n.networkType dstN.ipamType = n.ipamType - dstN.endpointCnt = n.endpointCnt dstN.enableIPv6 = n.enableIPv6 dstN.persist = n.persist dstN.dbIndex = n.dbIndex @@ -339,48 +338,11 @@ func (n *network) DataScope() string { return n.driverScope() } -func (n *network) EndpointCnt() uint64 { +func (n *network) getEpCnt() *endpointCnt { n.Lock() defer n.Unlock() - return n.endpointCnt -} -func (n *network) atomicIncDecEpCnt(inc bool) error { -retry: - n.Lock() - if inc { - n.endpointCnt++ - } else { - n.endpointCnt-- - } - n.Unlock() - - store := n.getController().getStore(n.DataScope()) - if store == nil { - return fmt.Errorf("store not found for scope %s", n.DataScope()) - } - - if err := n.getController().updateToStore(n); err != nil { - if err == datastore.ErrKeyModified { - if err := store.GetObject(datastore.Key(n.Key()...), n); err != nil { - return fmt.Errorf("could not update the kvobject to latest when trying to atomic add endpoint count: %v", err) - } - - goto retry - } - - return err - } - - return nil -} - -func (n *network) IncEndpointCnt() error { - return n.atomicIncDecEpCnt(true) -} - -func (n *network) DecEndpointCnt() error { - return n.atomicIncDecEpCnt(false) + return n.epCnt } // TODO : Can be made much more generic with the help of reflection (but has some golang limitations) @@ -391,7 +353,6 @@ func (n *network) MarshalJSON() ([]byte, error) { netMap["networkType"] = n.networkType netMap["ipamType"] = n.ipamType netMap["addrSpace"] = n.addrSpace - netMap["endpointCnt"] = n.endpointCnt netMap["enableIPv6"] = n.enableIPv6 if n.generic != nil { netMap["generic"] = n.generic @@ -437,7 +398,6 @@ func (n *network) UnmarshalJSON(b []byte) (err error) { n.name = netMap["name"].(string) n.id = netMap["id"].(string) n.networkType = netMap["networkType"].(string) - n.endpointCnt = uint64(netMap["endpointCnt"].(float64)) n.enableIPv6 = netMap["enableIPv6"].(bool) if v, ok := netMap["generic"]; ok { @@ -604,7 +564,7 @@ func (n *network) Delete() error { return &UnknownNetworkError{name: name, id: id} } - numEps := n.EndpointCnt() + numEps := n.getEpCnt().EndpointCnt() if numEps != 0 { return &ActiveEndpointsError{name: n.name, id: n.id} } @@ -622,23 +582,14 @@ func (n *network) Delete() error { }() // deleteFromStore performs an atomic delete operation and the - // network.endpointCnt field will help prevent any possible + // network.epCnt will help prevent any possible // race between endpoint join and network delete - if err = n.getController().deleteFromStore(n); err != nil { - if err == datastore.ErrKeyModified { - return types.InternalErrorf("operation in progress. delete failed for network %s. Please try again.") - } - return err + if err = n.getController().deleteFromStore(n.getEpCnt()); err != nil { + return fmt.Errorf("error deleting network endpoint count from store: %v", err) + } + if err = n.getController().deleteFromStore(n); err != nil { + return fmt.Errorf("error deleting network from store: %v", err) } - - defer func() { - if err != nil { - n.dbExists = false - if e := n.getController().updateToStore(n); e != nil { - log.Warnf("failed to recreate network in store %s : %v", n.name, e) - } - } - }() n.ipamRelease() @@ -736,7 +687,7 @@ func (n *network) CreateEndpoint(name string, options ...EndpointOption) (Endpoi }() // Increment endpoint count to indicate completion of endpoint addition - if err = n.IncEndpointCnt(); err != nil { + if err = n.getEpCnt().IncEndpointCnt(); err != nil { return nil, err } diff --git a/libnetwork/store.go b/libnetwork/store.go index c308423839..d5eca874a4 100644 --- a/libnetwork/store.go +++ b/libnetwork/store.go @@ -69,6 +69,13 @@ func (c *controller) getNetworkFromStore(nid string) (*network, error) { continue } + ec := &endpointCnt{n: n} + err = store.GetObject(datastore.Key(ec.Key()...), ec) + if err != nil { + return nil, fmt.Errorf("could not find endpoint count for network %s: %v", n.Name(), err) + } + + n.epCnt = ec return n, nil } @@ -94,6 +101,14 @@ func (c *controller) getNetworksFromStore() ([]*network, error) { for _, kvo := range kvol { n := kvo.(*network) n.ctrlr = c + + ec := &endpointCnt{n: n} + err = store.GetObject(datastore.Key(ec.Key()...), ec) + if err != nil { + return nil, fmt.Errorf("could not find endpoint count key %s for network %s while listing: %v", datastore.Key(ec.Key()...), n.Name(), err) + } + + n.epCnt = ec nl = append(nl, n) } } @@ -211,15 +226,15 @@ func (c *controller) unWatchSvcRecord(ep *endpoint) { c.unWatchCh <- ep } -func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, nCh <-chan datastore.KVObject) { +func (c *controller) networkWatchLoop(nw *netWatch, ep *endpoint, ecCh <-chan datastore.KVObject) { for { select { case <-nw.stopCh: return - case o := <-nCh: - n := o.(*network) + case o := <-ecCh: + ec := o.(*endpointCnt) - epl, err := n.getEndpointsFromStore() + epl, err := ec.n.getEndpointsFromStore() if err != nil { break } @@ -300,7 +315,7 @@ func (c *controller) processEndpointCreate(nmap map[string]*netWatch, ep *endpoi return } - ch, err := store.Watch(ep.getNetwork(), nw.stopCh) + ch, err := store.Watch(ep.getNetwork().getEpCnt(), nw.stopCh) if err != nil { log.Warnf("Error creating watch for network: %v", err) return