From c395cf2eb6f0aaa1644849aeca31db0797ed3df1 Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Mon, 15 Jun 2015 11:43:02 -0700 Subject: [PATCH] Datastore additions to bitmask management Signed-off-by: Madhu Venugopal --- libnetwork/bitseq/sequence.go | 23 ++++--- libnetwork/bitseq/store.go | 100 ++++++++++++++++++++++++++++++ libnetwork/idm/idm.go | 5 +- libnetwork/idm/idm_test.go | 8 +-- libnetwork/ipam/allocator.go | 8 ++- libnetwork/ipam/allocator_test.go | 14 ++--- 6 files changed, 136 insertions(+), 22 deletions(-) create mode 100644 libnetwork/bitseq/store.go diff --git a/libnetwork/bitseq/sequence.go b/libnetwork/bitseq/sequence.go index f3539c5de2..905deb4cc9 100644 --- a/libnetwork/bitseq/sequence.go +++ b/libnetwork/bitseq/sequence.go @@ -7,6 +7,7 @@ import ( "fmt" "sync" + "github.com/docker/libnetwork/datastore" "github.com/docker/libnetwork/netutils" ) @@ -21,21 +22,28 @@ const ( // Handle contains the sequece representing the bitmask and its identifier type Handle struct { - ID string - Head *Sequence + App string + ID string + Head *Sequence + store datastore.DataStore + dbIndex uint64 sync.Mutex } // NewHandle returns a thread-safe instance of the bitmask handler -func NewHandle(id string, numElements uint32) *Handle { - return &Handle{ - ID: id, +func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32) *Handle { + h := &Handle{ + App: app, + ID: id, + store: ds, Head: &Sequence{ Block: 0x0, Count: getNumBlocks(numElements), Next: nil, }, } + h.watchForChanges() + return h } // Sequence reresents a recurring sequence of 32 bits long bitmasks @@ -151,10 +159,11 @@ func (h *Handle) CheckIfAvailable(ordinal int) (int, int, error) { } // PushReservation pushes the bit reservation inside the bitmask. -func (h *Handle) PushReservation(bytePos, bitPos int, release bool) { +func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error { h.Lock() - defer h.Unlock() h.Head = PushReservation(bytePos, bitPos, h.Head, release) + h.Unlock() + return h.writeToStore() } // GetFirstAvailable looks for the first unset bit in passed mask diff --git a/libnetwork/bitseq/store.go b/libnetwork/bitseq/store.go new file mode 100644 index 0000000000..0e22dd96cd --- /dev/null +++ b/libnetwork/bitseq/store.go @@ -0,0 +1,100 @@ +package bitseq + +import ( + "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/types" +) + +// Key provides the Key to be used in KV Store +func (h *Handle) Key() []string { + h.Lock() + defer h.Unlock() + return []string{h.App, h.ID} +} + +// KeyPrefix returns the immediate parent key that can be used for tree walk +func (h *Handle) KeyPrefix() []string { + h.Lock() + defer h.Unlock() + return []string{h.App} +} + +// Value marshala the data to be stored in the KV store +func (h *Handle) Value() []byte { + h.Lock() + defer h.Unlock() + head := h.Head + if head == nil { + return []byte{} + } + b, err := head.ToByteArray() + if err != nil { + return []byte{} + } + return b +} + +// Index returns the latest DB Index as seen by this object +func (h *Handle) Index() uint64 { + h.Lock() + defer h.Unlock() + return h.dbIndex +} + +// SetIndex method allows the datastore to store the latest DB Index into this object +func (h *Handle) SetIndex(index uint64) { + h.Lock() + h.dbIndex = index + h.Unlock() +} + +func (h *Handle) watchForChanges() error { + h.Lock() + store := h.store + h.Unlock() + + if store == nil { + return nil + } + + kvpChan, err := store.KVStore().Watch(datastore.Key(h.Key()...), nil) + if err != nil { + return err + } + go func() { + for { + select { + case kvPair := <-kvpChan: + h.Lock() + h.dbIndex = kvPair.LastIndex + h.Head.FromByteArray(kvPair.Value) + h.Unlock() + } + } + }() + return nil +} + +func (h *Handle) writeToStore() error { + h.Lock() + store := h.store + h.Unlock() + if store == nil { + return nil + } + err := store.PutObjectAtomic(h) + if err == datastore.ErrKeyModified { + return types.RetryErrorf("failed to perform atomic write (%v). retry might fix the error", err) + } + return err +} + +func (h *Handle) deleteFromStore() error { + h.Lock() + store := h.store + h.Unlock() + if store == nil { + return nil + } + return store.DeleteObjectAtomic(h) +} diff --git a/libnetwork/idm/idm.go b/libnetwork/idm/idm.go index f9b1c1e1dd..784bb80f79 100644 --- a/libnetwork/idm/idm.go +++ b/libnetwork/idm/idm.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/docker/libnetwork/bitseq" + "github.com/docker/libnetwork/datastore" ) // Idm manages the reservation/release of numerical ids from a contiguos set @@ -15,14 +16,14 @@ type Idm struct { } // New returns an instance of id manager for a set of [start-end] numerical ids -func New(id string, start, end uint32) (*Idm, error) { +func New(ds datastore.DataStore, id string, start, end uint32) (*Idm, error) { if id == "" { return nil, fmt.Errorf("Invalid id") } if end <= start { return nil, fmt.Errorf("Invalid set range: [%d, %d]", start, end) } - return &Idm{start: start, end: end, handle: bitseq.NewHandle(id, 1+end-start)}, nil + return &Idm{start: start, end: end, handle: bitseq.NewHandle("idm", ds, id, uint32(1+end-start))}, nil } // GetID returns the first available id in the set diff --git a/libnetwork/idm/idm_test.go b/libnetwork/idm/idm_test.go index 9f94694531..1004cb8bb5 100644 --- a/libnetwork/idm/idm_test.go +++ b/libnetwork/idm/idm_test.go @@ -5,17 +5,17 @@ import ( ) func TestNew(t *testing.T) { - _, err := New("", 0, 1) + _, err := New(nil, "", 0, 1) if err == nil { t.Fatalf("Expected failure, but succeeded") } - _, err = New("myset", 1<<10, 0) + _, err = New(nil, "myset", 1<<10, 0) if err == nil { t.Fatalf("Expected failure, but succeeded") } - i, err := New("myset", 0, 10) + i, err := New(nil, "myset", 0, 10) if err != nil { t.Fatalf("Unexpected failure: %v", err) } @@ -31,7 +31,7 @@ func TestNew(t *testing.T) { } func TestAllocate(t *testing.T) { - i, err := New("myids", 50, 52) + i, err := New(nil, "myids", 50, 52) if err != nil { t.Fatal(err) } diff --git a/libnetwork/ipam/allocator.go b/libnetwork/ipam/allocator.go index ce1a121ce7..250053320c 100644 --- a/libnetwork/ipam/allocator.go +++ b/libnetwork/ipam/allocator.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/docker/libnetwork/bitseq" + "github.com/docker/libnetwork/datastore" ) const ( @@ -26,15 +27,18 @@ type Allocator struct { subnetsInfo map[subnetKey]*SubnetInfo // Allocated addresses in each address space's internal subnet addresses map[subnetKey]*bitmask + // Datastore + store datastore.DataStore sync.Mutex } // NewAllocator returns an instance of libnetwork ipam -func NewAllocator() *Allocator { +func NewAllocator(ds datastore.DataStore) *Allocator { a := &Allocator{} a.subnetsInfo = make(map[subnetKey]*SubnetInfo) a.addresses = make(map[subnetKey]*bitmask) a.internalHostSize = defaultInternalHostSize + a.store = ds return a } @@ -102,7 +106,7 @@ func (a *Allocator) AddSubnet(addrSpace AddressSpace, subnetInfo *SubnetInfo) er a.Lock() a.addresses[smallKey] = &bitmask{ subnet: sub, - addressMask: bitseq.NewHandle(smallKey.String(), uint32(numAddresses)), + addressMask: bitseq.NewHandle("ipam", a.store, smallKey.String(), uint32(numAddresses)), freeAddresses: numAddresses, } a.Unlock() diff --git a/libnetwork/ipam/allocator_test.go b/libnetwork/ipam/allocator_test.go index 32bc0d9e1c..453e93e22d 100644 --- a/libnetwork/ipam/allocator_test.go +++ b/libnetwork/ipam/allocator_test.go @@ -10,7 +10,7 @@ import ( ) func getAllocator(subnet *net.IPNet) *Allocator { - a := NewAllocator() + a := NewAllocator(nil) a.AddSubnet("default", &SubnetInfo{Subnet: subnet}) return a } @@ -58,7 +58,7 @@ func TestGetAddressVersion(t *testing.T) { } func TestAddSubnets(t *testing.T) { - a := NewAllocator() + a := NewAllocator(nil) _, sub0, _ := net.ParseCIDR("10.0.0.0/8") err := a.AddSubnet("default", &SubnetInfo{Subnet: sub0}) @@ -133,7 +133,7 @@ func TestAdjustAndCheckSubnet(t *testing.T) { } func TestRemoveSubnet(t *testing.T) { - a := NewAllocator() + a := NewAllocator(nil) input := []struct { addrSpace AddressSpace @@ -247,7 +247,7 @@ func TestGetAddress(t *testing.T) { } func TestGetSubnetList(t *testing.T) { - a := NewAllocator() + a := NewAllocator(nil) input := []struct { addrSpace AddressSpace subnet string @@ -295,7 +295,7 @@ func TestGetSubnetList(t *testing.T) { func TestRequestSyntaxCheck(t *testing.T) { var ( - a = NewAllocator() + a = NewAllocator(nil) subnet = "192.168.0.0/16" addSpace = AddressSpace("green") ) @@ -462,7 +462,7 @@ func assertGetAddress(t *testing.T, subnet string) { bm := &bitmask{ subnet: sub, - addressMask: bitseq.NewHandle("default/192.168.0.0/24", uint32(numAddresses)), + addressMask: bitseq.NewHandle("ipam_test", nil, "default/192.168.0.0/24", uint32(numAddresses)), freeAddresses: numAddresses, } numBlocks := bm.addressMask.Head.Count @@ -513,7 +513,7 @@ func assertNRequests(t *testing.T, subnet string, numReq int, lastExpectedIP str func benchmarkRequest(subnet *net.IPNet) { var err error - a := NewAllocator() + a := NewAllocator(nil) a.internalHostSize = 20 a.AddSubnet("default", &SubnetInfo{Subnet: subnet})