diff --git a/libnetwork/bitseq/sequence.go b/libnetwork/bitseq/sequence.go index 905deb4cc9..5828d717b2 100644 --- a/libnetwork/bitseq/sequence.go +++ b/libnetwork/bitseq/sequence.go @@ -39,10 +39,26 @@ func NewHandle(app string, ds datastore.DataStore, id string, numElements uint32 Head: &Sequence{ Block: 0x0, Count: getNumBlocks(numElements), - Next: nil, }, } + + if h.store == nil { + return h + } + + // Register for status changes h.watchForChanges() + + // Get the initial status from the ds if present. + // We will be getting an instance without a dbIndex + // (GetObject() does not set it): It is ok for now, + // it will only cause the first allocation on this + // node to go through a retry. + var bs []byte + if err := h.store.GetObject(datastore.Key(h.Key()...), bs); err == nil { + h.Head.FromByteArray(bs) + } + return h } @@ -83,6 +99,19 @@ func (s *Sequence) GetAvailableBit() (bytePos, bitPos int) { return bits / 8, bits % 8 } +// GetCopy returns a copy of the linked list rooted at this node +func (s *Sequence) GetCopy() *Sequence { + n := &Sequence{Block: s.Block, Count: s.Count} + pn := n + ps := s.Next + for ps != nil { + pn.Next = &Sequence{Block: ps.Block, Count: ps.Count} + pn = pn.Next + ps = ps.Next + } + return n +} + // Equal checks if this sequence is equal to the passed one func (s *Sequence) Equal(o *Sequence) bool { this := s @@ -160,10 +189,22 @@ func (h *Handle) CheckIfAvailable(ordinal int) (int, int, error) { // PushReservation pushes the bit reservation inside the bitmask. func (h *Handle) PushReservation(bytePos, bitPos int, release bool) error { + // Create a copy of the current handler h.Lock() - h.Head = PushReservation(bytePos, bitPos, h.Head, release) + nh := &Handle{App: h.App, ID: h.ID, store: h.store, dbIndex: h.dbIndex, Head: h.Head.GetCopy()} h.Unlock() - return h.writeToStore() + + nh.Head = PushReservation(bytePos, bitPos, nh.Head, release) + + err := nh.writeToStore() + if err == nil { + // Commit went through, save locally + h.Lock() + h.Head = nh.Head + h.Unlock() + } + + return err } // GetFirstAvailable looks for the first unset bit in passed mask diff --git a/libnetwork/bitseq/sequence_test.go b/libnetwork/bitseq/sequence_test.go index b0936bf61c..54e505f77b 100644 --- a/libnetwork/bitseq/sequence_test.go +++ b/libnetwork/bitseq/sequence_test.go @@ -87,6 +87,58 @@ func TestSequenceEqual(t *testing.T) { } } +func TestSequenceCopy(t *testing.T) { + s := &Sequence{ + Block: 0x0, + Count: 8, + Next: &Sequence{ + Block: 0x0, + Count: 8, + Next: &Sequence{ + Block: 0x0, + Count: 0, + Next: &Sequence{ + Block: 0x0, + Count: 0, + Next: &Sequence{ + Block: 0x0, + Count: 2, + Next: &Sequence{ + Block: 0x0, + Count: 1, + Next: &Sequence{ + Block: 0x0, + Count: 1, + Next: &Sequence{ + Block: 0x0, + Count: 2, + Next: &Sequence{ + Block: 0x1, + Count: 1, + Next: &Sequence{ + Block: 0x0, + Count: 2, + Next: nil, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } + + n := s.GetCopy() + if !s.Equal(n) { + t.Fatalf("copy of s failed") + } + if n == s { + t.Fatalf("not true copy of s") + } +} + func TestGetFirstAvailable(t *testing.T) { input := []struct { mask *Sequence diff --git a/libnetwork/bitseq/store.go b/libnetwork/bitseq/store.go index 0e22dd96cd..91c5c19f0c 100644 --- a/libnetwork/bitseq/store.go +++ b/libnetwork/bitseq/store.go @@ -19,7 +19,7 @@ func (h *Handle) KeyPrefix() []string { return []string{h.App} } -// Value marshala the data to be stored in the KV store +// Value marshals the data to be stored in the KV store func (h *Handle) Value() []byte { h.Lock() defer h.Unlock() diff --git a/libnetwork/ipam/allocator.go b/libnetwork/ipam/allocator.go index 0e1329f894..0edaab98b7 100644 --- a/libnetwork/ipam/allocator.go +++ b/libnetwork/ipam/allocator.go @@ -6,8 +6,10 @@ import ( "strings" "sync" + log "github.com/Sirupsen/logrus" "github.com/docker/libnetwork/bitseq" "github.com/docker/libnetwork/datastore" + "github.com/docker/libnetwork/types" ) const ( @@ -323,10 +325,22 @@ func (a *Allocator) Release(addrSpace AddressSpace, address net.IP) { // Retrieve correspondent ordinal in the subnet ordinal := ipToInt(getHostPortionIP(address, sub)) // Release it - space.addressMask.PushReservation(ordinal/8, ordinal%8, true) + for { + var err error + if err = space.addressMask.PushReservation(ordinal/8, ordinal%8, true); err == nil { + break + } + if _, ok := err.(types.RetryError); ok { + // bitmask must have changed, retry delete + continue + } + log.Warnf("Failed to release address %s because of internal error: %s", address.String(), err.Error()) + return + } space.freeAddresses++ return } + } } @@ -385,6 +399,7 @@ func (a *Allocator) getSubnetList(addrSpace AddressSpace, ver ipVersion) []subne func (a *Allocator) getAddress(smallSubnet *bitmask, prefAddress net.IP, ver ipVersion) (net.IP, error) { var ( bytePos, bitPos int + ordinal int err error ) // Look for free IP, skip .0 and .255, they will be automatically reserved @@ -395,19 +410,32 @@ again: if prefAddress == nil { bytePos, bitPos, err = smallSubnet.addressMask.GetFirstAvailable() } else { - ordinal := ipToInt(getHostPortionIP(prefAddress, smallSubnet.subnet)) + ordinal = ipToInt(getHostPortionIP(prefAddress, smallSubnet.subnet)) bytePos, bitPos, err = smallSubnet.addressMask.CheckIfAvailable(ordinal) } if err != nil { return nil, ErrNoAvailableIPs } +pushsame: // Lock it - smallSubnet.addressMask.PushReservation(bytePos, bitPos, false) + if err = smallSubnet.addressMask.PushReservation(bytePos, bitPos, false); err != nil { + if _, ok := err.(types.RetryError); !ok { + return nil, fmt.Errorf("internal failure while reserving the address: %s", err.Error()) + } + // bitmask view must have changed. Selected address may or may no longer be available + if prefAddress != nil { + if _, _, err = smallSubnet.addressMask.CheckIfAvailable(ordinal); err == nil { + //still available + goto pushsame + } + goto again + } + } smallSubnet.freeAddresses-- // Build IP ordinal - ordinal := bitPos + bytePos*8 + ordinal = bitPos + bytePos*8 // For v4, let reservation of .0 and .255 happen automatically if ver == v4 && !isValidIP(ordinal) {