1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00
moby--moby/libnetwork/bitseq/sequence.go
Abhinandan Prativadi 6a8a15dd9d Fixing Duplicate IP issue in IPAM library
This commit contains fixes for duplicate IP with 3 issues addressed:
1) Race condition when datastore is not present in cases like swarmkit
2) Byte Offset calculation depending on where the start of the bit
   in the bitsequence is, the offset was adding more bytes to the offset
   when the start of the bit is in the middle of one of the instances in
   a block
3) Finding the available bit was returning the last bit in the current instance in
   a block if the block is not full and the current bit is after the last
   available bit.

Signed-off-by: Abhinandan Prativadi <abhi@docker.com>
2018-03-08 11:19:37 -08:00

737 lines
20 KiB
Go

// Package bitseq provides a structure and utilities for representing a long bitmask
// as a sequence of run-length encoded blocks. It operates directly on the encoded
// representation; it does not decode/encode.
package bitseq
import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"sync"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/types"
"github.com/sirupsen/logrus"
)
// Block sequence constants.
// If needed we can think of making these configurable.
const (
// blockLen is the number of bits held by a single block.
blockLen = uint32(32)
// blockBytes is the number of bytes held by a single block.
blockBytes = uint64(blockLen / 8)
// blockMAX is the value of a block with every bit set.
blockMAX = uint32(1<<blockLen - 1)
// blockFirstBit is the mask selecting the most significant bit of a block,
// which the code in this file treats as bit position 0 of the block.
blockFirstBit = uint32(1) << (blockLen - 1)
// invalidPos is the sentinel returned in place of a position or ordinal
// when no valid one can be provided.
invalidPos = uint64(0xFFFFFFFFFFFFFFFF)
)
// Errors returned by the bit allocation routines.
var (
// ErrNoBitAvailable is returned when no more bits are available to set.
ErrNoBitAvailable = errors.New("no bit available")
// ErrBitAllocated is returned when the specific bit requested is already set.
// checkIfAvailable also returns it when the ordinal lies outside the sequence list.
ErrBitAllocated = errors.New("requested bit is already allocated")
)
// Handle contains the sequence representing the bitmask and its identifier.
// The embedded mutex is what the methods of Handle lock around accesses to its fields.
type Handle struct {
// bits is the length of the bit sequence.
bits uint64
// unselected is the number of bits which are not selected.
unselected uint64
// head is the first node of the run-length encoded block list.
head *sequence
// app and id are set from the NewHandle arguments of the same name.
app string
id string
// dbIndex and dbExists are presumably datastore bookkeeping maintained by
// code outside this view — verify against the datastore integration.
dbIndex uint64
dbExists bool
// curr is the ordinal following the last bit handed out by a "set any"
// request; it is the search starting point when serial allocation is requested.
curr uint64
// store is the backing datastore. It may be nil, in which case the
// datastore reads and writes are skipped.
store datastore.DataStore
sync.Mutex
}
// NewHandle returns a thread-safe instance of the bitmask handler.
//
// The handle tracks numElements bits, all of them initially unset. When ds is
// nil the handle is returned right away. Otherwise the status already present
// in the datastore (if any) is fetched into the handle, and the handle is
// written to the datastore when it does not exist there yet.
func NewHandle(app string, ds datastore.DataStore, id string, numElements uint64) (*Handle, error) {
	initial := &sequence{
		block: 0x0,
		count: getNumBlocks(numElements),
	}
	h := &Handle{
		app:        app,
		id:         id,
		store:      ds,
		bits:       numElements,
		unselected: numElements,
		head:       initial,
	}

	// Without a datastore there is nothing to load or persist.
	if ds == nil {
		return h, nil
	}

	// Get the initial status from the ds if present; a missing key is fine.
	err := h.store.GetObject(datastore.Key(h.Key()...), h)
	if err != nil && err != datastore.ErrKeyNotFound {
		return nil, err
	}

	// Already in the store: nothing left to do.
	if h.Exists() {
		return h, nil
	}

	if err := h.writeToStore(); err != nil {
		return nil, fmt.Errorf("failed to write bitsequence to store: %v", err)
	}
	return h, nil
}
// sequence represents a recurring sequence of 32 bits long bitmasks.
// Sequences are chained in a singly linked list; each node stands for count
// consecutive blocks sharing the same bit pattern.
type sequence struct {
block uint32 // block is a symbol representing 4 byte long allocation bitmask
count uint64 // number of consecutive blocks (symbols)
next *sequence // next sequence
}
// toString returns a string representation of the block sequence starting
// from this node, formatted as "(0x<block>, <count>)->" per node and closed
// by "end".
func (s *sequence) toString() string {
	out := fmt.Sprintf("(0x%x, %d)->", s.block, s.count)
	for node := s.next; node != nil; node = node.next {
		out += fmt.Sprintf("(0x%x, %d)->", node.block, node.count)
	}
	return out + "end"
}
// getAvailableBit returns the position, as (byte offset, bit offset within
// that byte), of the first unset bit in this sequence's block at or after bit
// index from. It returns invalidPos and ErrNoBitAvailable when the block is
// full, the sequence is empty, or no unset bit exists from "from" onwards.
func (s *sequence) getAvailableBit(from uint64) (uint64, uint64, error) {
	if s.count == 0 || s.block == blockMAX {
		return invalidPos, invalidPos, ErrNoBitAvailable
	}

	// Walk the mask from the starting bit towards the least significant one;
	// the mask becomes zero once it has been shifted past the last bit.
	pos := from
	for mask := blockFirstBit >> from; mask != 0; mask >>= 1 {
		if s.block&mask == 0 {
			return pos / 8, pos % 8, nil
		}
		pos++
	}

	// No available bit in the block starting from "from".
	return invalidPos, invalidPos, ErrNoBitAvailable
}
// getCopy returns a copy of the linked list rooted at this node. Every node
// is duplicated, so the copy shares no node with the original list.
func (s *sequence) getCopy() *sequence {
	head := &sequence{block: s.block, count: s.count}
	tail := head
	for src := s.next; src != nil; src = src.next {
		tail.next = &sequence{block: src.block, count: src.count}
		tail = tail.next
	}
	return head
}
// equal checks if this sequence list is equal to the passed one: both lists
// must have the same length and pairwise identical block and count values.
func (s *sequence) equal(o *sequence) bool {
	a, b := s, o
	for a != nil && b != nil {
		if a.block != b.block || a.count != b.count {
			return false
		}
		a, b = a.next, b.next
	}
	// Equal only if both lists ran out at the same time.
	return a == nil && b == nil
}
// toByteArray converts the sequence list into a byte array. Each node is
// emitted as 12 bytes: the block as a big-endian uint32 followed by the count
// as a big-endian uint64. The returned error is always nil.
func (s *sequence) toByteArray() ([]byte, error) {
	var out []byte
	for node := s; node != nil; node = node.next {
		var entry [12]byte
		binary.BigEndian.PutUint32(entry[:4], node.block)
		binary.BigEndian.PutUint64(entry[4:], node.count)
		out = append(out, entry[:]...)
	}
	return out, nil
}
// fromByteArray constructs the sequence list from the byte array, storing the
// first entry in s and linking a new node after it for each further entry.
//
// Each entry is 12 bytes long: the block as a big-endian uint32 followed by
// the count as a big-endian uint64. An error is returned when data is empty
// or when its length is not a multiple of the entry size; in that case s is
// left untouched.
func (s *sequence) fromByteArray(data []byte) error {
	const entryLen = 12

	l := len(data)
	// An empty slice passes the modulo test, so it must be rejected
	// explicitly: there is no entry to decode into s.
	if l == 0 || l%entryLen != 0 {
		return fmt.Errorf("cannot deserialize byte sequence of length %d (%v)", l, data)
	}

	node := s
	for off := 0; off < l; off += entryLen {
		if off > 0 {
			node.next = &sequence{}
			node = node.next
		}
		node.block = binary.BigEndian.Uint32(data[off : off+4])
		node.count = binary.BigEndian.Uint64(data[off+4 : off+entryLen])
	}
	return nil
}
// getCopy returns a new Handle carrying the same field values as h together
// with a private copy of its sequence list. The new handle gets its own
// zero-value mutex. The caller is expected to hold h's lock if needed.
func (h *Handle) getCopy() *Handle {
	dup := &Handle{}
	dup.bits, dup.unselected = h.bits, h.unselected
	dup.head = h.head.getCopy()
	dup.app, dup.id = h.app, h.id
	dup.dbIndex, dup.dbExists = h.dbIndex, h.dbExists
	dup.store = h.store
	dup.curr = h.curr
	return dup
}
// SetAnyInRange atomically sets the first unset bit in the specified range
// [start, end] of the sequence and returns the corresponding ordinal.
// It fails if the range is reversed or reaches past the last bit, or with
// ErrNoBitAvailable if the handle has no unselected bit left.
func (h *Handle) SetAnyInRange(start, end uint64, serial bool) (uint64, error) {
	validRange := start <= end && end < h.bits
	if !validRange {
		return invalidPos, fmt.Errorf("invalid bit range [%d, %d]", start, end)
	}
	if free := h.Unselected(); free == 0 {
		return invalidPos, ErrNoBitAvailable
	}
	return h.set(0, start, end, true, false, serial)
}
// SetAny atomically sets the first unset bit in the sequence and returns the
// corresponding ordinal. ErrNoBitAvailable is returned when the handle has no
// unselected bit left.
func (h *Handle) SetAny(serial bool) (uint64, error) {
	if free := h.Unselected(); free == 0 {
		return invalidPos, ErrNoBitAvailable
	}
	// Search the whole sequence: from bit 0 to the last valid ordinal.
	last := h.bits - 1
	return h.set(0, 0, last, true, false, serial)
}
// Set atomically sets the bit at the given ordinal in the sequence.
// It returns an error if the ordinal is outside the sequence or if the
// underlying set operation fails.
func (h *Handle) Set(ordinal uint64) error {
	err := h.validateOrdinal(ordinal)
	if err == nil {
		_, err = h.set(ordinal, 0, 0, false, false, false)
	}
	return err
}
// Unset atomically unsets the bit at the given ordinal in the sequence.
// It returns an error if the ordinal is outside the sequence or if the
// underlying release operation fails.
func (h *Handle) Unset(ordinal uint64) error {
	err := h.validateOrdinal(ordinal)
	if err == nil {
		_, err = h.set(ordinal, 0, 0, false, true, false)
	}
	return err
}
// IsSet atomically checks if the ordinal bit is set. In case ordinal
// is outside of the bit sequence limits, false is returned.
func (h *Handle) IsSet(ordinal uint64) bool {
	if h.validateOrdinal(ordinal) != nil {
		return false
	}

	h.Lock()
	_, _, availErr := checkIfAvailable(h.head, ordinal)
	h.Unlock()

	// The bit is set exactly when it is reported as not available.
	taken := availErr != nil
	return taken
}
// runConsistencyCheck unlinks from the sequence list every node following the
// head whose count is zero. It reports whether at least one node was removed.
func (h *Handle) runConsistencyCheck() bool {
	removed := false
	prev := h.head
	for node := prev.next; node != nil; node = node.next {
		if node.count != 0 {
			prev = node
			continue
		}
		// Empty node: bypass it and keep the same predecessor.
		prev.next = node.next
		removed = true
	}
	return removed
}
// CheckConsistency checks if the bit sequence is in an inconsistent state and attempts to fix it.
// It looks for a corruption signature that may happen in docker 1.9.0 and 1.9.1.
//
// The repair is attempted on a private copy of the handle. The copy is handed
// to writeToStore and, on success, its sequence list replaces the one in h.
// When the write fails with a types.RetryError the whole procedure restarts
// from the beginning; any other write failure is returned to the caller.
func (h *Handle) CheckConsistency() error {
for {
// Read the store reference under the lock, then release the lock
// before calling into the store.
h.Lock()
store := h.store
h.Unlock()
if store != nil {
// Fetch the latest state into h; a missing key is not treated as an error.
if err := store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
return err
}
}
h.Lock()
nh := h.getCopy()
h.Unlock()
// Nothing to fix if the check did not remove any node from the copy.
if !nh.runConsistencyCheck() {
return nil
}
if err := nh.writeToStore(); err != nil {
if _, ok := err.(types.RetryError); !ok {
return fmt.Errorf("internal failure while fixing inconsistent bitsequence: %v", err)
}
// Retry error: start over.
continue
}
logrus.Infof("Fixed inconsistent bit sequence in datastore:\n%s\n%s", h, nh)
// Adopt the repaired list.
h.Lock()
h.head = nh.head
h.Unlock()
return nil
}
}
// set sets or resets a bit in the sequence.
//
// Parameters:
//   - ordinal: the bit to operate on when any is false (or when release is true).
//   - start, end: the inclusive range searched for an unset bit when any is true.
//   - any: when true (and release is false) the first available bit in the range is chosen.
//   - release: when true the bit is unset instead of set.
//   - serial: when true the search for an available bit starts from h.curr.
//
// It returns the ordinal that was set (left at zero for a release) and an
// error if no suitable bit is available or the store interaction fails.
// The change is applied to a private copy first; when a store is configured,
// the copy is written to it and the whole operation is retried from the
// start if the write fails with a types.RetryError.
func (h *Handle) set(ordinal, start, end uint64, any bool, release bool, serial bool) (uint64, error) {
var (
bitPos uint64
bytePos uint64
ret uint64
err error
)
for {
var store datastore.DataStore
curr := uint64(0)
h.Lock()
store = h.store
if store != nil {
h.Unlock() // The lock is acquired in the GetObject
if err := store.GetObject(datastore.Key(h.Key()...), h); err != nil && err != datastore.ErrKeyNotFound {
return ret, err
}
h.Lock() // Acquire the lock back
}
logrus.Debugf("Received set for ordinal %v, start %v, end %v, any %t, release %t, serial:%v curr:%d \n", ordinal, start, end, any, release, serial, h.curr)
// In serial mode the search resumes from the position saved by the
// previous "set any" request instead of from the start of the range.
if serial {
curr = h.curr
}
// Get position if available
if release {
bytePos, bitPos = ordinalToPos(ordinal)
} else {
if any {
bytePos, bitPos, err = getAvailableFromCurrent(h.head, start, curr, end)
ret = posToOrdinal(bytePos, bitPos)
if err == nil {
// Remember where the next serial search should start from.
h.curr = ret + 1
}
} else {
bytePos, bitPos, err = checkIfAvailable(h.head, ordinal)
ret = ordinal
}
}
if err != nil {
h.Unlock()
return ret, err
}
// Create a private copy of h and work on it
nh := h.getCopy()
nh.head = pushReservation(bytePos, bitPos, nh.head, release)
// NOTE(review): the counter is adjusted even when pushReservation found
// the request redundant (e.g. releasing a bit that is already unset) —
// confirm whether callers guarantee that this never happens.
if release {
nh.unselected++
} else {
nh.unselected--
}
if h.store != nil {
h.Unlock()
// Attempt to write private copy to store
if err := nh.writeToStore(); err != nil {
if _, ok := err.(types.RetryError); !ok {
return ret, fmt.Errorf("internal failure while setting the bit: %v", err)
}
// Retry
continue
}
h.Lock()
}
// Previous atomic push was successful. Save private copy to local copy
h.unselected = nh.unselected
h.head = nh.head
h.dbExists = nh.dbExists
h.dbIndex = nh.dbIndex
h.Unlock()
return ret, nil
}
}
// validateOrdinal reports an error when ordinal does not belong to the
// sequence. The check is needed to cover the case where the number of bits is
// not a multiple of blockLen.
func (h *Handle) validateOrdinal(ordinal uint64) error {
	h.Lock()
	limit := h.bits
	h.Unlock()

	if ordinal < limit {
		return nil
	}
	return errors.New("bit does not belong to the sequence")
}
// Destroy removes from the datastore the data belonging to this handle.
// When the delete fails with a types.RetryError, the latest state is fetched
// from the store and the delete is attempted again; the handle is considered
// destroyed if the store reports that the key is no longer there.
func (h *Handle) Destroy() error {
	for {
		delErr := h.deleteFromStore()
		if delErr == nil {
			return nil
		}
		if _, retry := delErr.(types.RetryError); !retry {
			return fmt.Errorf("internal failure while destroying the sequence: %v", delErr)
		}

		// Fetch latest before trying again.
		fetchErr := h.store.GetObject(datastore.Key(h.Key()...), h)
		if fetchErr == datastore.ErrKeyNotFound { // already removed
			return nil
		}
		if fetchErr != nil {
			return fmt.Errorf("failed to fetch from store when destroying the sequence: %v", fetchErr)
		}
	}
}
// ToByteArray converts this handle's data into a byte array: 8 bytes for the
// number of bits, 8 bytes for the number of unselected bits (both big-endian),
// followed by the serialized sequence list.
func (h *Handle) ToByteArray() ([]byte, error) {
	h.Lock()
	defer h.Unlock()

	seqBytes, err := h.head.toByteArray()
	if err != nil {
		return nil, fmt.Errorf("failed to serialize head: %s", err.Error())
	}

	out := make([]byte, 16, 16+len(seqBytes))
	binary.BigEndian.PutUint64(out[:8], h.bits)
	binary.BigEndian.PutUint64(out[8:16], h.unselected)
	return append(out, seqBytes...), nil
}
// FromByteArray reads this handle's data from a byte array.
//
// The expected layout is the one produced by ToByteArray: 8 bytes for the
// number of bits, 8 bytes for the number of unselected bits (both big-endian),
// followed by one or more 12-byte sequence entries.
//
// An error is returned, and the handle is left untouched, when ba is nil, too
// short to hold the header plus one sequence entry, or when the sequence part
// cannot be deserialized.
func (h *Handle) FromByteArray(ba []byte) error {
	const (
		headerLen = 16 // bits (8 bytes) + unselected (8 bytes)
		entryLen  = 12 // block (4 bytes) + count (8 bytes)
	)

	if ba == nil {
		return errors.New("nil byte array")
	}
	// Guard the slicing below: a short buffer would otherwise panic instead
	// of being reported as a deserialization error.
	if len(ba) < headerLen+entryLen {
		return fmt.Errorf("byte array too short to hold a bit sequence: %d bytes", len(ba))
	}

	nh := &sequence{}
	if err := nh.fromByteArray(ba[headerLen:]); err != nil {
		return fmt.Errorf("failed to deserialize head: %s", err.Error())
	}

	h.Lock()
	h.head = nh
	h.bits = binary.BigEndian.Uint64(ba[0:8])
	h.unselected = binary.BigEndian.Uint64(ba[8:16])
	h.Unlock()
	return nil
}
// Bits returns the length of the bit sequence.
// NOTE(review): unlike Unselected, this reads h.bits without taking the
// handle's lock, while FromByteArray writes it under the lock — confirm
// whether that is intentional.
func (h *Handle) Bits() uint64 {
return h.bits
}
// Unselected returns the number of bits which are not selected.
// The value is read while holding the handle's lock.
func (h *Handle) Unselected() uint64 {
	h.Lock()
	free := h.unselected
	h.Unlock()
	return free
}
// String returns a human readable description of the handle: its app, id,
// datastore index, bit counters and the sequence list.
func (h *Handle) String() string {
	h.Lock()
	defer h.Unlock()

	seq := h.head.toString()
	const layout = "App: %s, ID: %s, DBIndex: 0x%x, bits: %d, unselected: %d, sequence: %s"
	return fmt.Sprintf(layout, h.app, h.id, h.dbIndex, h.bits, h.unselected, seq)
}
// MarshalJSON encodes Handle into json message. The message is an object
// with two keys: "id" holding the handle id and "sequence" holding the output
// of ToByteArray.
func (h *Handle) MarshalJSON() ([]byte, error) {
	seq, err := h.ToByteArray()
	if err != nil {
		return nil, err
	}
	return json.Marshal(map[string]interface{}{
		"id":       h.id,
		"sequence": seq,
	})
}
// UnmarshalJSON decodes json message into Handle.
//
// The message is expected to be an object with a string "id" and a
// "sequence" value in the form produced by MarshalJSON. An error is returned
// when the message is not valid JSON, when "id" is missing or is not a
// string, or when the sequence cannot be decoded. The handle's id is only
// updated once both values have been decoded.
func (h *Handle) UnmarshalJSON(data []byte) error {
	var m map[string]interface{}
	if err := json.Unmarshal(data, &m); err != nil {
		return err
	}

	// Checked assertion: a missing or non-string id must be reported as an
	// error rather than trigger a panic.
	id, ok := m["id"].(string)
	if !ok {
		return errors.New("invalid or missing id in bit sequence json")
	}

	// The generic decode left "sequence" as an interface value; re-encode it
	// so that it can be decoded again directly into a byte slice.
	bi, err := json.Marshal(m["sequence"])
	if err != nil {
		return err
	}
	var b []byte
	if err := json.Unmarshal(bi, &b); err != nil {
		return err
	}

	h.id = id
	return h.FromByteArray(b)
}
// getFirstAvailable looks for the first unset bit in passed mask starting from start.
// It returns the position of that bit as (byte position, bit position within
// the byte), or invalidPos and ErrNoBitAvailable if no unset bit is found.
func getFirstAvailable(head *sequence, start uint64) (uint64, uint64, error) {
// Find sequence which contains the start bit
byteStart, bitStart := ordinalToPos(start)
current, _, precBlocks, inBlockBytePos := findSequence(head, byteStart)
// Derive this sequence's offsets: byteOffset is the byte position at which
// the block instance containing the start bit begins, bitOffset is the index
// of the start bit inside that block instance.
byteOffset := byteStart - inBlockBytePos
bitOffset := inBlockBytePos*8 + bitStart
for current != nil {
if current.block != blockMAX {
// If the current block is not full, check if there is any bit
// from the current bit in the current block. If not, before proceeding to the
// next block node, make sure we check for available bit in the next
// instance of the same block. Due to RLE same block signature will be
// compressed.
retry:
bytePos, bitPos, err := current.getAvailableBit(bitOffset)
if err != nil && precBlocks == current.count-1 {
// This is the last instance in the same block node,
// so move to the next block.
goto next
}
if err != nil {
// There are some more instances of the same block, so add the offset
// and be optimistic that you will find the available bit in the next
// instance of the same block.
bitOffset = 0
byteOffset += blockBytes
precBlocks++
goto retry
}
return byteOffset + bytePos, bitPos, err
}
// Moving to next block: Reset bit offset.
next:
bitOffset = 0
// Skip the block instances of the current node that have not been visited yet.
byteOffset += (current.count * blockBytes) - (precBlocks * blockBytes)
precBlocks = 0
current = current.next
}
return invalidPos, invalidPos, ErrNoBitAvailable
}
// getAvailableFromCurrent looks for an available bit starting from the
// current ordinal curr, provided curr lies beyond start. If that search finds
// nothing at or before end, it falls back to searching from start.
// ErrNoBitAvailable is returned when no bit is available at or before end.
// This can be further optimized to check from start till curr in case of a rollover.
func getAvailableFromCurrent(head *sequence, start, curr, end uint64) (uint64, uint64, error) {
	if curr != 0 && curr > start {
		bytePos, bitPos, err := getFirstAvailable(head, curr)
		if err == nil && posToOrdinal(bytePos, bitPos) <= end {
			return bytePos, bitPos, nil
		}
		// Nothing usable from curr onwards: fall back to the range start.
	}

	bytePos, bitPos, err := getFirstAvailable(head, start)
	if err != nil || end < posToOrdinal(bytePos, bitPos) {
		return invalidPos, invalidPos, ErrNoBitAvailable
	}
	return bytePos, bitPos, nil
}
// checkIfAvailable checks if the bit corresponding to the specified ordinal is
// unset. On success it returns the byte and bit position of the ordinal.
// If the bit is set, or the ordinal is beyond the sequence limits, it returns
// invalidPos and ErrBitAllocated.
func checkIfAvailable(head *sequence, ordinal uint64) (uint64, uint64, error) {
	bytePos, bitPos := ordinalToPos(ordinal)

	// Find the sequence containing this byte.
	seq, _, _, inBlockBytePos := findSequence(head, bytePos)
	if seq == nil {
		return invalidPos, invalidPos, ErrBitAllocated
	}

	// Select the bit corresponding to the ordinal inside the block.
	mask := blockFirstBit >> (inBlockBytePos*8 + bitPos)
	if seq.block&mask != 0 {
		return invalidPos, invalidPos, ErrBitAllocated
	}
	return bytePos, bitPos, nil
}
// findSequence locates the sequence containing the byte at bytePos.
// Given the byte position and the sequences list head, it returns the pointer
// to the sequence containing the byte (current), the pointer to the previous
// sequence (head itself when current is the head), the number of blocks
// preceding the block containing the byte inside the current sequence, and
// the position of the byte inside its block.
// If bytePos is outside of the list, it returns (nil, nil, 0, invalidPos).
func findSequence(head *sequence, bytePos uint64) (*sequence, *sequence, uint64, uint64) {
	prev, curr := head, head
	remaining := bytePos

	// Skip every sequence that ends before the byte. The last node is never
	// skipped here (nil check for masks of less than 32 addresses).
	for curr.next != nil {
		span := curr.count * blockBytes
		if remaining < span {
			break
		}
		remaining -= span
		prev, curr = curr, curr.next
	}

	// If byte is outside of the list, let caller know.
	if remaining >= curr.count*blockBytes {
		return nil, nil, 0, invalidPos
	}

	return curr, prev, remaining / blockBytes, bytePos % blockBytes
}
// pushReservation pushes the bit reservation inside the bitmask.
// Given byte and bit positions, identify the sequence (current) which holds the block containing the affected bit.
// Create a new block with the modified bit according to the operation (allocate/release).
// Create a new sequence containing the new block and insert it in the proper position.
// Remove current sequence if empty.
// Check if new sequence can be merged with neighbour (previous/next) sequences.
//
// It returns the head of the resulting list. The list is returned unchanged
// when the byte position is outside of the list or when the bit already has
// the requested value.
//
// Identify "current" sequence containing block:
// [prev seq] [current seq] [next seq]
//
// Based on block position, resulting list of sequences can be any of three forms:
//
// block position Resulting list of sequences
// A) block is first in current: [prev seq] [new] [modified current seq] [next seq]
// B) block is last in current: [prev seq] [modified current seq] [new] [next seq]
// C) block is in the middle of current: [prev seq] [curr pre] [new] [curr post] [next seq]
func pushReservation(bytePos, bitPos uint64, head *sequence, release bool) *sequence {
// Store list's head
newHead := head
// Find the sequence containing this byte
current, previous, precBlocks, inBlockBytePos := findSequence(head, bytePos)
if current == nil {
return newHead
}
// Construct updated block
bitSel := blockFirstBit >> (inBlockBytePos*8 + bitPos)
newBlock := current.block
if release {
newBlock &^= bitSel
} else {
newBlock |= bitSel
}
// Quit if it was a redundant request
if current.block == newBlock {
return newHead
}
// Current sequence inevitably loses one block, update count
current.count--
// Create new sequence
newSequence := &sequence{block: newBlock, count: 1}
// Insert the new sequence in the list based on block position
if precBlocks == 0 { // First in sequence (A)
newSequence.next = current
if current == head {
newHead = newSequence
previous = newHead
} else {
previous.next = newSequence
}
removeCurrentIfEmpty(&newHead, newSequence, current)
mergeSequences(previous)
} else if precBlocks == current.count { // Last in sequence (B)
// current.count was already decremented above, so equality means the
// affected block was the last one of the sequence.
newSequence.next = current.next
current.next = newSequence
mergeSequences(current)
} else { // In between the sequence (C)
currPre := &sequence{block: current.block, count: precBlocks, next: newSequence}
currPost := current
currPost.count -= precBlocks
newSequence.next = currPost
if currPost == head {
newHead = currPre
} else {
previous.next = currPre
}
// No merging or empty current possible here
}
return newHead
}
// removeCurrentIfEmpty removes the current sequence from the list if its count
// is zero, adjusting the head pointer if current is the head; otherwise
// previous is linked straight to the node following current.
func removeCurrentIfEmpty(head **sequence, previous, current *sequence) {
	if current.count != 0 {
		return
	}
	if *head == current {
		*head = current.next
		return
	}
	previous.next = current.next
}
// mergeSequences walks the list starting at seq and folds into each node all
// the immediately following nodes carrying the same block value, adding their
// counts. It runs until the end of the list is reached.
// TODO: Optimization: only attempt merge from start to end sequence, no need to scan till the end of the list
func mergeSequences(seq *sequence) {
	for node := seq; node != nil; node = node.next {
		// Absorb every successor that has the same block signature.
		for node.next != nil && node.block == node.next.block {
			node.count += node.next.count
			node.next = node.next.next
		}
	}
}
// getNumBlocks returns the number of blockLen-bit blocks needed to hold
// numBits bits, rounding up when numBits is not a multiple of blockLen.
func getNumBlocks(numBits uint64) uint64 {
	width := uint64(blockLen)
	full, rest := numBits/width, numBits%width
	if rest == 0 {
		return full
	}
	// A partially used block is still needed for the remaining bits.
	return full + 1
}
// ordinalToPos converts a bit ordinal into its (byte position, bit position
// within that byte) pair.
func ordinalToPos(ordinal uint64) (uint64, uint64) {
	bytePos := ordinal >> 3 // ordinal / 8
	bitPos := ordinal & 7   // ordinal % 8
	return bytePos, bitPos
}
// posToOrdinal converts a (byte position, bit position) pair into the
// corresponding bit ordinal. The arithmetic wraps around on overflow.
func posToOrdinal(bytePos, bitPos uint64) uint64 {
	base := bytePos << 3 // bytePos * 8
	return base + bitPos
}