diff --git a/libnetwork/bitseq/sequence.go b/libnetwork/bitseq/sequence.go index 1120ac77c8..20c50c91ed 100644 --- a/libnetwork/bitseq/sequence.go +++ b/libnetwork/bitseq/sequence.go @@ -57,9 +57,6 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32 return h, nil } - // Register for status changes - h.watchForChanges() - // Get the initial status from the ds if present. if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound { return nil, err @@ -252,6 +249,12 @@ func (h *Handle) set(ordinal, start, end uint32, any bool, release bool) (uint32 ) for { + if h.store != nil { + if err := h.store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound { + return ret, err + } + } + h.Lock() // Get position if available if release { diff --git a/libnetwork/bitseq/store.go b/libnetwork/bitseq/store.go index 8012a413d2..ef7fe33400 100644 --- a/libnetwork/bitseq/store.go +++ b/libnetwork/bitseq/store.go @@ -70,46 +70,47 @@ func (h *Handle) Exists() bool { return h.dbExists } +// New method returns a handle based on the receiver handle +func (h *Handle) New() datastore.KVObject { + h.Lock() + defer h.Unlock() + + return &Handle{ + app: h.app, + id: h.id, + store: h.store, + } +} + +// CopyTo deep copies the handle into the passed destination object +func (h *Handle) CopyTo(o datastore.KVObject) error { + h.Lock() + defer h.Unlock() + + dstH := o.(*Handle) + dstH.bits = h.bits + dstH.unselected = h.unselected + dstH.head = h.head.getCopy() + dstH.app = h.app + dstH.id = h.id + dstH.dbIndex = h.dbIndex + dstH.dbExists = h.dbExists + dstH.store = h.store + + return nil +} + // Skip provides a way for a KV Object to avoid persisting it in the KV Store func (h *Handle) Skip() bool { return false } // DataScope method returns the storage scope of the datastore -func (h *Handle) DataScope() datastore.DataScope { - return datastore.GlobalScope -} - -func (h *Handle) watchForChanges() error { +func (h *Handle) DataScope() string { h.Lock() - store := h.store - h.Unlock() + defer h.Unlock() - if store == nil { - return nil - } - - kvpChan, err := store.KVStore().Watch(datastore.Key(h.Key()...), nil) - if err != nil { - return err - } - go func() { - for { - select { - case kvPair := <-kvpChan: - // Only process remote update - if kvPair != nil && (kvPair.LastIndex != h.Index()) { - err := h.fromDsValue(kvPair.Value) - if err != nil { - log.Warnf("Failed to reconstruct bitseq handle from ds watch: %s", err.Error()) - } else { - h.SetIndex(kvPair.LastIndex) - } - } - } - } - }() - return nil + return h.store.Scope() } func (h *Handle) fromDsValue(value []byte) error { diff --git a/libnetwork/ipam/allocator.go b/libnetwork/ipam/allocator.go index 90dfb4a462..087c5ccf90 100644 --- a/libnetwork/ipam/allocator.go +++ b/libnetwork/ipam/allocator.go @@ -6,7 +6,6 @@ import ( "sync" log "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" "github.com/docker/libnetwork/bitseq" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/ipamapi" @@ -30,13 +29,10 @@ const ( type Allocator struct { // Predefined pools for default address spaces predefined map[string][]*net.IPNet - // Static subnet information - localSubnets *PoolsConfig - globalSubnets *PoolsConfig + addrSpaces map[string]*addrSpace + // stores []datastore.Datastore // Allocated addresses in each address space's subnet addresses map[SubnetKey]*bitseq.Handle - // Datastore - addrSpace2Configs map[string]*PoolsConfig sync.Mutex } @@ -44,73 +40,86 @@ type Allocator struct { func NewAllocator(lcDs, glDs datastore.DataStore) (*Allocator, error) { a := &Allocator{} - a.localSubnets = &PoolsConfig{ - subnets: map[SubnetKey]*PoolData{}, - id: dsConfigKey + "/Pools", - scope: datastore.LocalScope, - ds: lcDs, - alloc: a, - } - - a.globalSubnets = &PoolsConfig{ - subnets: map[SubnetKey]*PoolData{}, - id: dsConfigKey + "/Pools", - scope: datastore.GlobalScope, - ds: glDs, - alloc: a, - } - + // Load predefined subnet pools a.predefined = map[string][]*net.IPNet{ localAddressSpace: initLocalPredefinedPools(), globalAddressSpace: initGlobalPredefinedPools(), } - a.addrSpace2Configs = map[string]*PoolsConfig{ - localAddressSpace: a.localSubnets, - globalAddressSpace: a.globalSubnets, - } - + // Initialize bitseq map a.addresses = make(map[SubnetKey]*bitseq.Handle) - cfgs := []struct { - cfg *PoolsConfig - dsc string + // Initialize address spaces + a.addrSpaces = make(map[string]*addrSpace) + for _, aspc := range []struct { + as string + ds datastore.DataStore }{ - {a.localSubnets, "local"}, - {a.globalSubnets, "global"}, - } - // Get the initial local/global pools configfrom the datastores - var inserterList []func() error - for _, e := range cfgs { - if e.cfg.ds == nil { + {localAddressSpace, lcDs}, + {globalAddressSpace, glDs}, + } { + if aspc.ds == nil { continue } - if err := e.cfg.watchForChanges(); err != nil { - log.Warnf("Error on registering watch for %s datastore: %v", e.dsc, err) - } - if err := e.cfg.readFromStore(); err != nil && err != store.ErrKeyNotFound { - return nil, fmt.Errorf("failed to retrieve the ipam %s pools config from datastore: %v", e.dsc, err) - } - e.cfg.Lock() - for k, v := range e.cfg.subnets { - if v.Range == nil { - inserterList = append(inserterList, func() error { return a.insertBitMask(e.cfg.ds, k, v.Pool) }) - } - } - e.cfg.Unlock() - } - // Add the bitmasks (data could come from datastore) - if inserterList != nil { - for _, f := range inserterList { - if err := f(); err != nil { - return nil, err - } + + a.addrSpaces[aspc.as] = &addrSpace{ + subnets: map[SubnetKey]*PoolData{}, + id: dsConfigKey + "/" + aspc.as, + scope: aspc.ds.Scope(), + ds: aspc.ds, + alloc: a, } } return a, nil } +func (a *Allocator) refresh(as string) error { + aSpace, err := a.getAddressSpaceFromStore(as) + if err != nil { + return fmt.Errorf("error getting pools config from store during init: %v", + err) + } + + if aSpace == nil { + return nil + } + + if err := a.updateBitMasks(aSpace); err != nil { + return fmt.Errorf("error updating bit masks during init: %v", err) + } + + a.Lock() + a.addrSpaces[as] = aSpace + a.Unlock() + + return nil +} + +func (a *Allocator) updateBitMasks(aSpace *addrSpace) error { + var inserterList []func() error + + aSpace.Lock() + for k, v := range aSpace.subnets { + if v.Range == nil { + inserterList = append(inserterList, + func() error { return a.insertBitMask(k, v.Pool) }) + } + } + aSpace.Unlock() + + // Add the bitmasks (data could come from datastore) + if inserterList != nil { + for _, f := range inserterList { + if err := f(); err != nil { + return err + } + } + } + + return nil +} + // GetDefaultAddressSpaces returns the local and global default address spaces func (a *Allocator) GetDefaultAddressSpaces() (string, string, error) { return localAddressSpace, globalAddressSpace, nil @@ -123,25 +132,29 @@ func (a *Allocator) RequestPool(addressSpace, pool, subPool string, options map[ return "", nil, nil, ipamapi.ErrInvalidPool } - cfg, err := a.getPoolsConfig(addressSpace) +retry: + if err := a.refresh(addressSpace); err != nil { + return "", nil, nil, err + } + + aSpace, err := a.getAddrSpace(addressSpace) if err != nil { return "", nil, nil, err } -retry: - insert, err := cfg.updatePoolDBOnAdd(*k, nw, ipr) + insert, err := aSpace.updatePoolDBOnAdd(*k, nw, ipr) if err != nil { return "", nil, nil, err } - if err := cfg.writeToStore(); err != nil { + + if err := a.writeToStore(aSpace); err != nil { if _, ok := err.(types.RetryError); !ok { return "", nil, nil, types.InternalErrorf("pool configuration failed because of %s", err.Error()) } - if erru := cfg.readFromStore(); erru != nil { - return "", nil, nil, fmt.Errorf("failed to get updated pool config from datastore (%v) after (%v)", erru, err) - } + goto retry } + return k.String(), aw, nil, insert() } @@ -152,23 +165,25 @@ func (a *Allocator) ReleasePool(poolID string) error { return types.BadRequestErrorf("invalid pool id: %s", poolID) } - cfg, err := a.getPoolsConfig(k.AddressSpace) +retry: + if err := a.refresh(k.AddressSpace); err != nil { + return err + } + + aSpace, err := a.getAddrSpace(k.AddressSpace) if err != nil { return err } -retry: - remove, err := cfg.updatePoolDBOnRemoval(k) + remove, err := aSpace.updatePoolDBOnRemoval(k) if err != nil { return err } - if err = cfg.writeToStore(); err != nil { + + if err = a.writeToStore(aSpace); err != nil { if _, ok := err.(types.RetryError); !ok { return types.InternalErrorf("pool (%s) removal failed because of %v", poolID, err) } - if erru := cfg.readFromStore(); erru != nil { - return fmt.Errorf("failed to get updated pool config from datastore (%v) after (%v)", erru, err) - } goto retry } @@ -177,14 +192,14 @@ retry: // Given the address space, returns the local or global PoolConfig based on the // address space is local or global. AddressSpace locality is being registered with IPAM out of band. -func (a *Allocator) getPoolsConfig(addrSpace string) (*PoolsConfig, error) { +func (a *Allocator) getAddrSpace(as string) (*addrSpace, error) { a.Lock() defer a.Unlock() - cfg, ok := a.addrSpace2Configs[addrSpace] + aSpace, ok := a.addrSpaces[as] if !ok { - return nil, types.BadRequestErrorf("cannot find locality of address space: %s", addrSpace) + return nil, types.BadRequestErrorf("cannot find locality of address space: %s", as) } - return cfg, nil + return aSpace, nil } func (a *Allocator) parsePoolRequest(addressSpace, pool, subPool string, v6 bool) (*SubnetKey, *net.IPNet, *net.IPNet, *AddressRange, error) { @@ -224,8 +239,14 @@ func (a *Allocator) parsePoolRequest(addressSpace, pool, subPool string, v6 bool return &SubnetKey{AddressSpace: addressSpace, Subnet: nw.String(), ChildSubnet: subPool}, nw, aw, ipr, nil } -func (a *Allocator) insertBitMask(store datastore.DataStore, key SubnetKey, pool *net.IPNet) error { +func (a *Allocator) insertBitMask(key SubnetKey, pool *net.IPNet) error { log.Debugf("Inserting bitmask (%s, %s)", key.String(), pool.String()) + + store := a.getStore(key.AddressSpace) + if store == nil { + return fmt.Errorf("could not find store for address space %s while inserting bit mask", key.AddressSpace) + } + ipVer := getAddressVersion(pool.IP) ones, bits := pool.Mask.Size() numAddresses := uint32(1 << uint(bits-ones)) @@ -252,13 +273,13 @@ func (a *Allocator) insertBitMask(store datastore.DataStore, key SubnetKey, pool return nil } -func (a *Allocator) retrieveBitmask(ds datastore.DataStore, k SubnetKey, n *net.IPNet) (*bitseq.Handle, error) { +func (a *Allocator) retrieveBitmask(k SubnetKey, n *net.IPNet) (*bitseq.Handle, error) { a.Lock() bm, ok := a.addresses[k] a.Unlock() if !ok { log.Debugf("Retrieving bitmask (%s, %s)", k.String(), n.String()) - if err := a.insertBitMask(ds, k, n); err != nil { + if err := a.insertBitMask(k, n); err != nil { return nil, fmt.Errorf("could not find bitmask in datastore for %s", k.String()) } a.Lock() @@ -289,7 +310,7 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error) return nil, fmt.Errorf("no default pool availbale for non-default addresss spaces") } - cfg, err := a.getPoolsConfig(as) + aSpace, err := a.getAddrSpace(as) if err != nil { return nil, err } @@ -298,14 +319,14 @@ func (a *Allocator) getPredefinedPool(as string, ipV6 bool) (*net.IPNet, error) if v != getAddressVersion(nw.IP) { continue } - cfg.Lock() - _, ok := cfg.subnets[SubnetKey{AddressSpace: as, Subnet: nw.String()}] - cfg.Unlock() + aSpace.Lock() + _, ok := aSpace.subnets[SubnetKey{AddressSpace: as, Subnet: nw.String()}] + aSpace.Unlock() if ok { continue } - if !cfg.contains(as, nw) { + if !aSpace.contains(as, nw) { if as == localAddressSpace { if err := netutils.CheckRouteOverlaps(nw); err == nil { return nw, nil @@ -326,31 +347,35 @@ func (a *Allocator) RequestAddress(poolID string, prefAddress net.IP, opts map[s return nil, nil, types.BadRequestErrorf("invalid pool id: %s", poolID) } - cfg, err := a.getPoolsConfig(k.AddressSpace) + if err := a.refresh(k.AddressSpace); err != nil { + return nil, nil, err + } + + aSpace, err := a.getAddrSpace(k.AddressSpace) if err != nil { return nil, nil, err } - cfg.Lock() - p, ok := cfg.subnets[k] + aSpace.Lock() + p, ok := aSpace.subnets[k] if !ok { - cfg.Unlock() + aSpace.Unlock() return nil, nil, types.NotFoundErrorf("cannot find address pool for poolID:%s", poolID) } if prefAddress != nil && !p.Pool.Contains(prefAddress) { - cfg.Unlock() + aSpace.Unlock() return nil, nil, ipamapi.ErrIPOutOfRange } c := p for c.Range != nil { k = c.ParentKey - c, ok = cfg.subnets[k] + c, ok = aSpace.subnets[k] } - cfg.Unlock() + aSpace.Unlock() - bm, err := a.retrieveBitmask(cfg.ds, k, c.Pool) + bm, err := a.retrieveBitmask(k, c.Pool) if err != nil { return nil, nil, fmt.Errorf("could not find bitmask in datastore for %s on address %v request from pool %s: %v", k.String(), prefAddress, poolID, err) @@ -370,29 +395,33 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error { return types.BadRequestErrorf("invalid pool id: %s", poolID) } - cfg, err := a.getPoolsConfig(k.AddressSpace) + if err := a.refresh(k.AddressSpace); err != nil { + return err + } + + aSpace, err := a.getAddrSpace(k.AddressSpace) if err != nil { return err } - cfg.Lock() - p, ok := cfg.subnets[k] + aSpace.Lock() + p, ok := aSpace.subnets[k] if !ok { - cfg.Unlock() + aSpace.Unlock() return ipamapi.ErrBadPool } if address == nil || !p.Pool.Contains(address) { - cfg.Unlock() + aSpace.Unlock() return ipamapi.ErrInvalidRequest } c := p for c.Range != nil { k = c.ParentKey - c = cfg.subnets[k] + c = aSpace.subnets[k] } - cfg.Unlock() + aSpace.Unlock() mask := p.Pool.Mask if p.Range != nil { @@ -403,7 +432,7 @@ func (a *Allocator) ReleaseAddress(poolID string, address net.IP) error { return fmt.Errorf("failed to release address %s: %v", address.String(), err) } - bm, err := cfg.alloc.retrieveBitmask(cfg.ds, k, c.Pool) + bm, err := a.retrieveBitmask(k, c.Pool) if err != nil { return fmt.Errorf("could not find bitmask in datastore for %s on address %v release from pool %s: %v", k.String(), address, poolID, err) @@ -449,23 +478,20 @@ func (a *Allocator) DumpDatabase() string { a.Lock() defer a.Unlock() - s := fmt.Sprintf("\n\nLocal Pool Config") - a.localSubnets.Lock() - for k, config := range a.localSubnets.subnets { - s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config)) + var s string + for as, aSpace := range a.addrSpaces { + s = fmt.Sprintf("\n\n%s Config", as) + aSpace.Lock() + for k, config := range aSpace.subnets { + s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config)) + } + aSpace.Unlock() } - a.localSubnets.Unlock() - - s = fmt.Sprintf("%s\n\nGlobal Pool Config", s) - a.globalSubnets.Lock() - for k, config := range a.globalSubnets.subnets { - s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n%v: %v", k, config)) - } - a.globalSubnets.Unlock() s = fmt.Sprintf("%s\n\nBitmasks", s) for k, bm := range a.addresses { s = fmt.Sprintf("%s%s", s, fmt.Sprintf("\n\t%s: %s\n\t%d", k, bm, bm.Unselected())) } + return s } diff --git a/libnetwork/ipam/allocator_test.go b/libnetwork/ipam/allocator_test.go index c6f5573df2..bf5132ee1d 100644 --- a/libnetwork/ipam/allocator_test.go +++ b/libnetwork/ipam/allocator_test.go @@ -11,7 +11,6 @@ import ( "github.com/docker/libkv/store" "github.com/docker/libnetwork/bitseq" - "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/ipamapi" "github.com/docker/libnetwork/netutils" @@ -32,9 +31,9 @@ func randomLocalStore() (datastore.DataStore, error) { if err := tmp.Close(); err != nil { return nil, fmt.Errorf("Error closing temp file: %v", err) } - return datastore.NewDataStore(&config.DatastoreCfg{ + return datastore.NewDataStore(datastore.LocalScope, &datastore.ScopeCfg{ Embedded: true, - Client: config.DatastoreClientCfg{ + Client: datastore.ScopeClientCfg{ Provider: "boltdb", Address: defaultPrefix + tmp.Name(), Config: &store.Config{ @@ -191,7 +190,11 @@ func TestSubnetsMarshal(t *testing.T) { t.Fatal(err) } - cfg := a.localSubnets + cfg, err := a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + ba := cfg.Value() if err := cfg.SetValue(ba); err != nil { t.Fatal(err) @@ -221,7 +224,7 @@ func TestAddSubnets(t *testing.T) { if err != nil { t.Fatal(err) } - a.addrSpace2Configs["abc"] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces["abc"] = a.addrSpaces[localAddressSpace] pid0, _, _, err := a.RequestPool(localAddressSpace, "10.0.0.0/8", "", nil, false) if err != nil { @@ -290,7 +293,13 @@ func TestAddReleasePoolID(t *testing.T) { if err != nil { t.Fatal(err) } - subnets := a.localSubnets.subnets + + aSpace, err := a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets := aSpace.subnets pid0, _, _, err := a.RequestPool(localAddressSpace, "10.0.0.0/8", "", nil, false) if err != nil { t.Fatalf("Unexpected failure in adding pool") @@ -298,6 +307,14 @@ func TestAddReleasePoolID(t *testing.T) { if err := k0.FromString(pid0); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets + if subnets[k0].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -309,6 +326,13 @@ func TestAddReleasePoolID(t *testing.T) { if err := k1.FromString(pid1); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k1].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k1, subnets[k1].RefCount) } @@ -323,6 +347,13 @@ func TestAddReleasePoolID(t *testing.T) { if err := k2.FromString(pid2); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k2].RefCount != 2 { t.Fatalf("Unexpected ref count for %s: %d", k2, subnets[k2].RefCount) } @@ -334,12 +365,26 @@ func TestAddReleasePoolID(t *testing.T) { if err := a.ReleasePool(pid1); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 2 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } if err := a.ReleasePool(pid0); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -351,6 +396,13 @@ func TestAddReleasePoolID(t *testing.T) { if pid00 != pid0 { t.Fatalf("main pool should still exist") } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 2 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -358,6 +410,13 @@ func TestAddReleasePoolID(t *testing.T) { if err := a.ReleasePool(pid2); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -365,6 +424,13 @@ func TestAddReleasePoolID(t *testing.T) { if err := a.ReleasePool(pid00); err != nil { t.Fatal(err) } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if bp, ok := subnets[k0]; ok { t.Fatalf("Base pool %s is still present: %v", k0, bp) } @@ -373,6 +439,13 @@ func TestAddReleasePoolID(t *testing.T) { if err != nil { t.Fatalf("Unexpected failure in adding pool") } + + aSpace, err = a.getAddrSpace(localAddressSpace) + if err != nil { + t.Fatal(err) + } + + subnets = aSpace.subnets if subnets[k0].RefCount != 1 { t.Fatalf("Unexpected ref count for %s: %d", k0, subnets[k0].RefCount) } @@ -417,18 +490,6 @@ func TestPredefinedPool(t *testing.T) { if nw != a.predefined[localAddressSpace][i] { t.Fatalf("Unexpected default network returned: %s", nw) } - - i, available, err = getFirstAvailablePool(a, globalAddressSpace, 2) - if err != nil { - t.Skip(err) - } - nw, err = a.getPredefinedPool(globalAddressSpace, false) - if err != nil { - t.Fatal(err) - } - if nw != available { - t.Fatalf("Unexpected default network returned: %s", nw) - } } func getFirstAvailablePool(a *Allocator, as string, atLeast int) (int, *net.IPNet, error) { @@ -475,7 +536,13 @@ func TestRemoveSubnet(t *testing.T) { if err != nil { t.Fatal(err) } - a.addrSpace2Configs["splane"] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces["splane"] = &addrSpace{ + id: dsConfigKey + "/" + "splane", + ds: a.addrSpaces[localAddressSpace].ds, + alloc: a.addrSpaces[localAddressSpace].alloc, + scope: a.addrSpaces[localAddressSpace].scope, + subnets: map[SubnetKey]*PoolData{}, + } input := []struct { addrSpace string @@ -512,7 +579,13 @@ func TestGetSameAddress(t *testing.T) { if err != nil { t.Fatal(err) } - a.addrSpace2Configs["giallo"] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces["giallo"] = &addrSpace{ + id: dsConfigKey + "/" + "giallo", + ds: a.addrSpaces[localAddressSpace].ds, + alloc: a.addrSpaces[localAddressSpace].alloc, + scope: a.addrSpaces[localAddressSpace].scope, + subnets: map[SubnetKey]*PoolData{}, + } pid, _, _, err := a.RequestPool("giallo", "192.168.100.0/24", "", nil, false) if err != nil { @@ -536,7 +609,13 @@ func TestRequestReleaseAddressFromSubPool(t *testing.T) { if err != nil { t.Fatal(err) } - a.addrSpace2Configs["rosso"] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces["rosso"] = &addrSpace{ + id: dsConfigKey + "/" + "rosso", + ds: a.addrSpaces[localAddressSpace].ds, + alloc: a.addrSpaces[localAddressSpace].alloc, + scope: a.addrSpaces[localAddressSpace].scope, + subnets: map[SubnetKey]*PoolData{}, + } poolID, _, _, err := a.RequestPool("rosso", "172.28.0.0/16", "172.28.30.0/24", nil, false) if err != nil { @@ -615,17 +694,23 @@ func TestGetAddress(t *testing.T) { func TestRequestSyntaxCheck(t *testing.T) { var ( - pool = "192.168.0.0/16" - subPool = "192.168.0.0/24" - addrSpace = "green" - err error + pool = "192.168.0.0/16" + subPool = "192.168.0.0/24" + as = "green" + err error ) a, err := getAllocator() if err != nil { t.Fatal(err) } - a.addrSpace2Configs[addrSpace] = a.addrSpace2Configs[localAddressSpace] + a.addrSpaces[as] = &addrSpace{ + id: dsConfigKey + "/" + as, + ds: a.addrSpaces[localAddressSpace].ds, + alloc: a.addrSpaces[localAddressSpace].alloc, + scope: a.addrSpaces[localAddressSpace].scope, + subnets: map[SubnetKey]*PoolData{}, + } _, _, _, err = a.RequestPool("", pool, "", nil, false) if err == nil { @@ -637,12 +722,12 @@ func TestRequestSyntaxCheck(t *testing.T) { t.Fatalf("Failed to detect wrong request: empty address space") } - _, _, _, err = a.RequestPool(addrSpace, "", subPool, nil, false) + _, _, _, err = a.RequestPool(as, "", subPool, nil, false) if err == nil { t.Fatalf("Failed to detect wrong request: subPool specified and no pool") } - pid, _, _, err := a.RequestPool(addrSpace, pool, subPool, nil, false) + pid, _, _, err := a.RequestPool(as, pool, subPool, nil, false) if err != nil { t.Fatalf("Unexpected failure: %v", err) } @@ -764,6 +849,7 @@ func TestRelease(t *testing.T) { for i, inp := range toRelease { ip0 := net.ParseIP(inp.address) a.ReleaseAddress(pid, ip0) + bm = a.addresses[SubnetKey{localAddressSpace, subnet, ""}] if bm.Unselected() != 1 { t.Fatalf("Failed to update free address count after release. Expected %d, Found: %d", i+1, bm.Unselected()) } diff --git a/libnetwork/ipam/store.go b/libnetwork/ipam/store.go index 7dc92d1570..f288fca05e 100644 --- a/libnetwork/ipam/store.go +++ b/libnetwork/ipam/store.go @@ -2,30 +2,30 @@ package ipam import ( "encoding/json" + "fmt" log "github.com/Sirupsen/logrus" - "github.com/docker/libkv/store" "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/types" ) // Key provides the Key to be used in KV Store -func (cfg *PoolsConfig) Key() []string { - cfg.Lock() - defer cfg.Unlock() - return []string{cfg.id} +func (aSpace *addrSpace) Key() []string { + aSpace.Lock() + defer aSpace.Unlock() + return []string{aSpace.id} } // KeyPrefix returns the immediate parent key that can be used for tree walk -func (cfg *PoolsConfig) KeyPrefix() []string { - cfg.Lock() - defer cfg.Unlock() +func (aSpace *addrSpace) KeyPrefix() []string { + aSpace.Lock() + defer aSpace.Unlock() return []string{dsConfigKey} } // Value marshals the data to be stored in the KV store -func (cfg *PoolsConfig) Value() []byte { - b, err := json.Marshal(cfg) +func (aSpace *addrSpace) Value() []byte { + b, err := json.Marshal(aSpace) if err != nil { log.Warnf("Failed to marshal ipam configured pools: %v", err) return nil @@ -34,97 +34,94 @@ func (cfg *PoolsConfig) Value() []byte { } // SetValue unmarshalls the data from the KV store. -func (cfg *PoolsConfig) SetValue(value []byte) error { - rc := &PoolsConfig{subnets: make(map[SubnetKey]*PoolData)} +func (aSpace *addrSpace) SetValue(value []byte) error { + rc := &addrSpace{subnets: make(map[SubnetKey]*PoolData)} if err := json.Unmarshal(value, rc); err != nil { return err } - cfg.subnets = rc.subnets + aSpace.subnets = rc.subnets return nil } // Index returns the latest DB Index as seen by this object -func (cfg *PoolsConfig) Index() uint64 { - cfg.Lock() - defer cfg.Unlock() - return cfg.dbIndex +func (aSpace *addrSpace) Index() uint64 { + aSpace.Lock() + defer aSpace.Unlock() + return aSpace.dbIndex } // SetIndex method allows the datastore to store the latest DB Index into this object -func (cfg *PoolsConfig) SetIndex(index uint64) { - cfg.Lock() - cfg.dbIndex = index - cfg.dbExists = true - cfg.Unlock() +func (aSpace *addrSpace) SetIndex(index uint64) { + aSpace.Lock() + aSpace.dbIndex = index + aSpace.dbExists = true + aSpace.Unlock() } // Exists method is true if this object has been stored in the DB. -func (cfg *PoolsConfig) Exists() bool { - cfg.Lock() - defer cfg.Unlock() - return cfg.dbExists +func (aSpace *addrSpace) Exists() bool { + aSpace.Lock() + defer aSpace.Unlock() + return aSpace.dbExists } // Skip provides a way for a KV Object to avoid persisting it in the KV Store -func (cfg *PoolsConfig) Skip() bool { +func (aSpace *addrSpace) Skip() bool { return false } -func (cfg *PoolsConfig) watchForChanges() error { - if cfg.ds == nil { - return nil - } - kvpChan, err := cfg.ds.KVStore().Watch(datastore.Key(cfg.Key()...), nil) - if err != nil { - return err - } - go func() { - for { - select { - case kvPair := <-kvpChan: - if kvPair != nil { - cfg.readFromKey(kvPair) - } - } - } - }() - return nil +func (a *Allocator) getStore(as string) datastore.DataStore { + a.Lock() + defer a.Unlock() + + return a.addrSpaces[as].ds } -func (cfg *PoolsConfig) writeToStore() error { - if cfg.ds == nil { - return nil +func (a *Allocator) getAddressSpaceFromStore(as string) (*addrSpace, error) { + store := a.getStore(as) + if store == nil { + return nil, fmt.Errorf("store for address space %s not found", as) } - err := cfg.ds.PutObjectAtomic(cfg) + + pc := &addrSpace{id: dsConfigKey + "/" + as, ds: store, alloc: a} + if err := store.GetObject(datastore.Key(pc.Key()...), pc); err != nil { + if err == datastore.ErrKeyNotFound { + return nil, nil + } + + return nil, fmt.Errorf("could not get pools config from store: %v", err) + } + + return pc, nil +} + +func (a *Allocator) writeToStore(aSpace *addrSpace) error { + store := aSpace.store() + if store == nil { + return fmt.Errorf("invalid store while trying to write %s address space", aSpace.DataScope()) + } + + err := store.PutObjectAtomic(aSpace) if err == datastore.ErrKeyModified { return types.RetryErrorf("failed to perform atomic write (%v). retry might fix the error", err) } + return err } -func (cfg *PoolsConfig) readFromStore() error { - if cfg.ds == nil { - return nil +func (a *Allocator) deleteFromStore(aSpace *addrSpace) error { + store := aSpace.store() + if store == nil { + return fmt.Errorf("invalid store while trying to delete %s address space", aSpace.DataScope()) } - return cfg.ds.GetObject(datastore.Key(cfg.Key()...), cfg) -} -func (cfg *PoolsConfig) readFromKey(kvPair *store.KVPair) { - if cfg.dbIndex < kvPair.LastIndex { - cfg.SetValue(kvPair.Value) - cfg.dbIndex = kvPair.LastIndex - cfg.dbExists = true - } -} - -func (cfg *PoolsConfig) deleteFromStore() error { - if cfg.ds == nil { - return nil - } - return cfg.ds.DeleteObjectAtomic(cfg) + return store.DeleteObjectAtomic(aSpace) } // DataScope method returns the storage scope of the datastore -func (cfg *PoolsConfig) DataScope() datastore.DataScope { - return cfg.scope +func (aSpace *addrSpace) DataScope() string { + aSpace.Lock() + defer aSpace.Unlock() + + return aSpace.scope } diff --git a/libnetwork/ipam/structures.go b/libnetwork/ipam/structures.go index ddbc7a10e0..71a5690aa0 100644 --- a/libnetwork/ipam/structures.go +++ b/libnetwork/ipam/structures.go @@ -27,13 +27,13 @@ type PoolData struct { RefCount int } -// PoolsConfig contains the pool configurations -type PoolsConfig struct { +// addrSpace contains the pool configurations for the address space +type addrSpace struct { subnets map[SubnetKey]*PoolData dbIndex uint64 dbExists bool id string - scope datastore.DataScope + scope string ds datastore.DataStore alloc *Allocator sync.Mutex @@ -153,18 +153,18 @@ func (p *PoolData) UnmarshalJSON(data []byte) error { return nil } -// MarshalJSON returns the JSON encoding of the PoolsConfig object -func (cfg *PoolsConfig) MarshalJSON() ([]byte, error) { - cfg.Lock() - defer cfg.Unlock() +// MarshalJSON returns the JSON encoding of the addrSpace object +func (aSpace *addrSpace) MarshalJSON() ([]byte, error) { + aSpace.Lock() + defer aSpace.Unlock() m := map[string]interface{}{ - "Scope": string(cfg.scope), + "Scope": string(aSpace.scope), } - if cfg.subnets != nil { + if aSpace.subnets != nil { s := map[string]*PoolData{} - for k, v := range cfg.subnets { + for k, v := range aSpace.subnets { s[k.String()] = v } m["Subnets"] = s @@ -173,10 +173,10 @@ func (cfg *PoolsConfig) MarshalJSON() ([]byte, error) { return json.Marshal(m) } -// UnmarshalJSON decodes data into the PoolsConfig object -func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error { - cfg.Lock() - defer cfg.Unlock() +// UnmarshalJSON decodes data into the addrSpace object +func (aSpace *addrSpace) UnmarshalJSON(data []byte) error { + aSpace.Lock() + defer aSpace.Unlock() m := map[string]interface{}{} err := json.Unmarshal(data, &m) @@ -184,10 +184,10 @@ func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error { return err } - cfg.scope = datastore.LocalScope + aSpace.scope = datastore.LocalScope s := m["Scope"].(string) if s == string(datastore.GlobalScope) { - cfg.scope = datastore.GlobalScope + aSpace.scope = datastore.GlobalScope } if v, ok := m["Subnets"]; ok { @@ -200,31 +200,81 @@ func (cfg *PoolsConfig) UnmarshalJSON(data []byte) error { for ks, v := range s { k := SubnetKey{} k.FromString(ks) - cfg.subnets[k] = v + aSpace.subnets[k] = v } } return nil } -func (cfg *PoolsConfig) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *AddressRange) (func() error, error) { - cfg.Lock() - defer cfg.Unlock() +// CopyTo deep copies the pool data to the destination pooldata +func (p *PoolData) CopyTo(dstP *PoolData) error { + dstP.ParentKey = p.ParentKey + dstP.Pool = types.GetIPNetCopy(p.Pool) + + if p.Range != nil { + dstP.Range = &AddressRange{} + dstP.Range.Sub = types.GetIPNetCopy(p.Range.Sub) + dstP.Range.Start = p.Range.Start + dstP.Range.End = p.Range.End + } + + dstP.RefCount = p.RefCount + return nil +} + +func (aSpace *addrSpace) CopyTo(o datastore.KVObject) error { + aSpace.Lock() + defer aSpace.Unlock() + + dstAspace := o.(*addrSpace) + + dstAspace.id = aSpace.id + dstAspace.ds = aSpace.ds + dstAspace.alloc = aSpace.alloc + dstAspace.scope = aSpace.scope + dstAspace.dbIndex = aSpace.dbIndex + dstAspace.dbExists = aSpace.dbExists + + dstAspace.subnets = make(map[SubnetKey]*PoolData) + for k, v := range aSpace.subnets { + dstAspace.subnets[k] = &PoolData{} + v.CopyTo(dstAspace.subnets[k]) + } + + return nil +} + +func (aSpace *addrSpace) New() datastore.KVObject { + aSpace.Lock() + defer aSpace.Unlock() + + return &addrSpace{ + id: aSpace.id, + ds: aSpace.ds, + alloc: aSpace.alloc, + scope: aSpace.scope, + } +} + +func (aSpace *addrSpace) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *AddressRange) (func() error, error) { + aSpace.Lock() + defer aSpace.Unlock() // Check if already allocated - if p, ok := cfg.subnets[k]; ok { - cfg.incRefCount(p, 1) + if p, ok := aSpace.subnets[k]; ok { + aSpace.incRefCount(p, 1) return func() error { return nil }, nil } // If master pool, check for overlap if ipr == nil { - if cfg.contains(k.AddressSpace, nw) { + if aSpace.contains(k.AddressSpace, nw) { return nil, ipamapi.ErrPoolOverlap } // This is a new master pool, add it along with corresponding bitmask - cfg.subnets[k] = &PoolData{Pool: nw, RefCount: 1} - return func() error { return cfg.alloc.insertBitMask(cfg.ds, k, nw) }, nil + aSpace.subnets[k] = &PoolData{Pool: nw, RefCount: 1} + return func() error { return aSpace.alloc.insertBitMask(k, nw) }, nil } // This is a new non-master pool @@ -234,38 +284,38 @@ func (cfg *PoolsConfig) updatePoolDBOnAdd(k SubnetKey, nw *net.IPNet, ipr *Addre Range: ipr, RefCount: 1, } - cfg.subnets[k] = p + aSpace.subnets[k] = p // Look for parent pool - pp, ok := cfg.subnets[p.ParentKey] + pp, ok := aSpace.subnets[p.ParentKey] if ok { - cfg.incRefCount(pp, 1) + aSpace.incRefCount(pp, 1) return func() error { return nil }, nil } // Parent pool does not exist, add it along with corresponding bitmask - cfg.subnets[p.ParentKey] = &PoolData{Pool: nw, RefCount: 1} - return func() error { return cfg.alloc.insertBitMask(cfg.ds, p.ParentKey, nw) }, nil + aSpace.subnets[p.ParentKey] = &PoolData{Pool: nw, RefCount: 1} + return func() error { return aSpace.alloc.insertBitMask(p.ParentKey, nw) }, nil } -func (cfg *PoolsConfig) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) { - cfg.Lock() - defer cfg.Unlock() +func (aSpace *addrSpace) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) { + aSpace.Lock() + defer aSpace.Unlock() - p, ok := cfg.subnets[k] + p, ok := aSpace.subnets[k] if !ok { return nil, ipamapi.ErrBadPool } - cfg.incRefCount(p, -1) + aSpace.incRefCount(p, -1) c := p for ok { if c.RefCount == 0 { - delete(cfg.subnets, k) + delete(aSpace.subnets, k) if c.Range == nil { return func() error { - bm, err := cfg.alloc.retrieveBitmask(cfg.ds, k, c.Pool) + bm, err := aSpace.alloc.retrieveBitmask(k, c.Pool) if err != nil { return fmt.Errorf("could not find bitmask in datastore for pool %s removal: %v", k.String(), err) } @@ -274,24 +324,24 @@ func (cfg *PoolsConfig) updatePoolDBOnRemoval(k SubnetKey) (func() error, error) } } k = c.ParentKey - c, ok = cfg.subnets[k] + c, ok = aSpace.subnets[k] } return func() error { return nil }, nil } -func (cfg *PoolsConfig) incRefCount(p *PoolData, delta int) { +func (aSpace *addrSpace) incRefCount(p *PoolData, delta int) { c := p ok := true for ok { c.RefCount += delta - c, ok = cfg.subnets[c.ParentKey] + c, ok = aSpace.subnets[c.ParentKey] } } // Checks whether the passed subnet is a superset or subset of any of the subset in this config db -func (cfg *PoolsConfig) contains(space string, nw *net.IPNet) bool { - for k, v := range cfg.subnets { +func (aSpace *addrSpace) contains(space string, nw *net.IPNet) bool { + for k, v := range aSpace.subnets { if space == k.AddressSpace && k.ChildSubnet == "" { if nw.Contains(v.Pool.IP) || v.Pool.Contains(nw.IP) { return true @@ -300,3 +350,10 @@ func (cfg *PoolsConfig) contains(space string, nw *net.IPNet) bool { } return false } + +func (aSpace *addrSpace) store() datastore.DataStore { + aSpace.Lock() + defer aSpace.Unlock() + + return aSpace.ds +}