1
0
Fork 0
mirror of https://github.com/moby/moby.git synced 2022-11-09 12:21:53 -05:00

Merge pull request #494 from mrjana/bugs

Fix bridge driver panic in CreateNetwork
This commit is contained in:
Madhu Venugopal 2015-09-07 13:17:48 -07:00
commit 82181e0da9
15 changed files with 217 additions and 121 deletions

View file

@ -11,8 +11,8 @@ sudo: false
before_install:
# Symlink below is needed for Travis CI to work correctly on personal forks of libkv
- ln -s $HOME/gopath/src/github.com/${TRAVIS_REPO_SLUG///libkv/} $HOME/gopath/src/github.com/docker
- go get code.google.com/p/go.tools/cmd/vet
- go get code.google.com/p/go.tools/cmd/cover
- go get golang.org/x/tools/cmd/vet
- go get golang.org/x/tools/cmd/cover
- go get github.com/mattn/goveralls
- go get github.com/golang/lint/golint
- go get github.com/GeertJohan/fgt

View file

@ -1,11 +1,9 @@
package libkv
import (
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/mock"
"github.com/docker/libkv/store/zookeeper"
)
@ -15,17 +13,15 @@ type Initialize func(addrs []string, options *store.Config) (store.Store, error)
var (
// Backend initializers
initializers = map[store.Backend]Initialize{
store.MOCK: mock.InitializeMock,
store.CONSUL: consul.InitializeConsul,
store.ETCD: etcd.InitializeEtcd,
store.ZK: zookeeper.InitializeZookeeper,
store.CONSUL: consul.New,
store.ETCD: etcd.New,
store.ZK: zookeeper.New,
}
)
// NewStore creates a an instance of store
func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) {
if init, exists := initializers[backend]; exists {
log.WithFields(log.Fields{"backend": backend}).Debug("Initializing store service")
return init(addrs, options)
}

View file

@ -7,7 +7,6 @@ import (
"github.com/docker/libkv/store"
"github.com/docker/libkv/store/consul"
"github.com/docker/libkv/store/etcd"
"github.com/docker/libkv/store/mock"
"github.com/docker/libkv/store/zookeeper"
"github.com/stretchr/testify/assert"
)
@ -66,16 +65,6 @@ func TestNewStoreZookeeper(t *testing.T) {
}
}
func TestNewStoreMock(t *testing.T) {
kv, err := NewStore(store.MOCK, []string{}, &store.Config{})
assert.NoError(t, err)
assert.NotNil(t, kv)
if _, ok := kv.(*mock.Mock); !ok {
t.Fatal("Error while initializing mock store")
}
}
func TestNewStoreUnsupported(t *testing.T) {
client := "localhost:9999"

View file

@ -12,7 +12,7 @@ unzip "${CONSUL_VERSION}_linux_amd64.zip"
# make config for minimum ttl
touch config.json
echo "{\"session_ttl_min\": \"2s\"}" >> config.json
echo "{\"session_ttl_min\": \"1s\"}" >> config.json
# check
./consul --version

View file

@ -7,7 +7,6 @@ import (
"sync"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
api "github.com/hashicorp/consul/api"
)
@ -32,9 +31,9 @@ type consulLock struct {
lock *api.Lock
}
// InitializeConsul creates a new Consul client given
// a list of endpoints and optional tls config
func InitializeConsul(endpoints []string, options *store.Config) (store.Store, error) {
// New creates a new Consul client given a list
// of endpoints and optional tls config
func New(endpoints []string, options *store.Config) (store.Store, error) {
s := &Consul{}
// Create Consul client
@ -60,7 +59,6 @@ func InitializeConsul(endpoints []string, options *store.Config) (store.Store, e
// Creates a new client
client, err := api.NewClient(config)
if err != nil {
log.Errorf("Couldn't initialize consul client..")
return nil, err
}
s.client = client
@ -101,8 +99,9 @@ func (s *Consul) refreshSession(pair *api.KVPair) error {
if session == "" {
entry := &api.SessionEntry{
Behavior: api.SessionBehaviorDelete,
TTL: s.ephemeralTTL.String(),
Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires
TTL: ((s.ephemeralTTL) / 2).String(), // Consul multiplies the TTL by 2x
LockDelay: 1 * time.Millisecond, // Virtually disable lock delay
}
// Create the key session
@ -110,19 +109,19 @@ func (s *Consul) refreshSession(pair *api.KVPair) error {
if err != nil {
return err
}
}
lockOpts := &api.LockOptions{
Key: pair.Key,
Session: session,
}
lockOpts := &api.LockOptions{
Key: pair.Key,
Session: session,
}
// Lock and ignore if lock is held
// It's just a placeholder for the
// ephemeral behavior
lock, _ := s.client.LockOpts(lockOpts)
if lock != nil {
lock.Lock(nil)
// Lock and ignore if lock is held
// It's just a placeholder for the
// ephemeral behavior
lock, _ := s.client.LockOpts(lockOpts)
if lock != nil {
lock.Lock(nil)
}
}
_, _, err = s.client.Session().Renew(session, nil)
@ -312,7 +311,6 @@ func (s *Consul) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*
// Get all the childrens
pairs, meta, err := kv.List(directory, opts)
if err != nil {
log.Errorf("consul: %v", err)
return
}
@ -324,18 +322,18 @@ func (s *Consul) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*
opts.WaitIndex = meta.LastIndex
// Return children KV pairs to the channel
kv := []*store.KVPair{}
kvpairs := []*store.KVPair{}
for _, pair := range pairs {
if pair.Key == directory {
continue
}
kv = append(kv, &store.KVPair{
kvpairs = append(kvpairs, &store.KVPair{
Key: pair.Key,
Value: pair.Value,
LastIndex: pair.ModifyIndex,
})
}
watchCh <- kv
watchCh <- kvpairs
}
}()
@ -377,11 +375,16 @@ func (l *consulLock) Unlock() error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
p := &api.KVPair{Key: s.normalize(key), Value: value}
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
// Consul interprets ModifyIndex = 0 as new key.
p.ModifyIndex = 0
} else {
p.ModifyIndex = previous.LastIndex
}
p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex}
if work, _, err := s.client.KV().CAS(p, nil); err != nil {
return false, nil, err
} else if !work {

View file

@ -5,13 +5,14 @@ import (
"time"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
"github.com/stretchr/testify/assert"
)
func makeConsulClient(t *testing.T) store.Store {
client := "localhost:8500"
kv, err := InitializeConsul(
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
@ -30,7 +31,7 @@ func TestConsulStore(t *testing.T) {
kv := makeConsulClient(t)
backup := makeConsulClient(t)
store.TestStore(t, kv, backup)
testutils.RunTestStore(t, kv, backup)
}
func TestGetActiveSession(t *testing.T) {

View file

@ -33,9 +33,9 @@ const (
defaultUpdateTime = 5 * time.Second
)
// InitializeEtcd creates a new Etcd client given
// a list of endpoints and an optional tls config
func InitializeEtcd(addrs []string, options *store.Config) (store.Store, error) {
// New creates a new Etcd client given a list
// of endpoints and an optional tls config
func New(addrs []string, options *store.Config) (store.Store, error) {
s := &Etcd{}
entries := store.CreateEndpoints(addrs, "http")
@ -278,11 +278,32 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) {
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
}
meta, err := s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex)
var meta *etcd.Response
var err error
if previous != nil {
meta, err = s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex)
} else {
// Interpret previous == nil as Atomic Create
meta, err = s.client.Create(store.Normalize(key), string(value), 0)
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Directory doesn't exist.
if etcdError.ErrorCode == 104 {
// Remove the last element (the actual key)
// and create the full directory path
err = s.createDirectory(store.GetDirectory(key))
if err != nil {
return false, nil, err
}
// Now that the directory is created, create the key
if _, err := s.client.Create(key, string(value), 0); err != nil {
return false, nil, err
}
}
}
}
if err != nil {
if etcdError, ok := err.(*etcd.EtcdError); ok {
// Compare Failed

View file

@ -5,12 +5,13 @@ import (
"time"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
)
func makeEtcdClient(t *testing.T) store.Store {
client := "localhost:4001"
kv, err := InitializeEtcd(
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
@ -29,5 +30,5 @@ func TestEtcdStore(t *testing.T) {
kv := makeEtcdClient(t)
backup := makeEtcdClient(t)
store.TestStore(t, kv, backup)
testutils.RunTestStore(t, kv, backup)
}

View file

@ -16,8 +16,8 @@ type Mock struct {
Options *store.Config
}
// InitializeMock creates a Mock store.
func InitializeMock(endpoints []string, options *store.Config) (store.Store, error) {
// New creates a Mock store
func New(endpoints []string, options *store.Config) (store.Store, error) {
s := &Mock{}
s.Endpoints = endpoints
s.Options = options

View file

@ -10,14 +10,12 @@ import (
type Backend string
const (
// MOCK backend
MOCK Backend = "mock"
// CONSUL backend
CONSUL = "consul"
CONSUL Backend = "consul"
// ETCD backend
ETCD = "etcd"
ETCD Backend = "etcd"
// ZK backend
ZK = "zk"
ZK Backend = "zk"
)
var (
@ -79,7 +77,8 @@ type Store interface {
// DeleteTree deletes a range of keys under a given directory
DeleteTree(directory string) error
// Atomic operation on a single value
// Atomic CAS operation on a single value.
// Pass previous = nil to create a new key.
AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error)
// Atomic delete of a single value

View file

@ -4,7 +4,6 @@ import (
"strings"
"time"
log "github.com/Sirupsen/logrus"
"github.com/docker/libkv/store"
zk "github.com/samuel/go-zookeeper/zk"
)
@ -25,9 +24,9 @@ type zookeeperLock struct {
value []byte
}
// InitializeZookeeper creates a new Zookeeper client
// given a list of endpoints and an optional tls config
func InitializeZookeeper(endpoints []string, options *store.Config) (store.Store, error) {
// New creates a new Zookeeper client given a
// list of endpoints and an optional tls config
func New(endpoints []string, options *store.Config) (store.Store, error) {
s := &Zookeeper{}
s.timeout = defaultTimeout
@ -41,7 +40,6 @@ func InitializeZookeeper(endpoints []string, options *store.Config) (store.Store
// Connect to Zookeeper
conn, _, err := zk.Connect(endpoints, s.timeout)
if err != nil {
log.Error(err)
return nil, err
}
s.client = conn
@ -267,23 +265,47 @@ func (s *Zookeeper) DeleteTree(directory string) error {
// AtomicPut put a value at "key" if the key has not been
// modified in the meantime, throws an error if this is the case
func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) {
if previous == nil {
return false, nil, store.ErrPreviousNotSpecified
}
meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex))
if err != nil {
// Compare Failed
if err == zk.ErrBadVersion {
return false, nil, store.ErrKeyModified
var lastIndex uint64
if previous != nil {
meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex))
if err != nil {
// Compare Failed
if err == zk.ErrBadVersion {
return false, nil, store.ErrKeyModified
}
return false, nil, err
}
return false, nil, err
lastIndex = uint64(meta.Version)
} else {
// Interpret previous == nil as create operation.
_, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll))
if err != nil {
// Zookeeper will complain if the directory doesn't exist.
if err == zk.ErrNoNode {
// Create the directory
parts := store.SplitKey(key)
parts = parts[:len(parts)-1]
if err = s.createFullPath(parts, false); err != nil {
// Failed to create the directory.
return false, nil, err
}
if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil {
return false, nil, err
}
} else {
// Unhandled error
return false, nil, err
}
}
lastIndex = 0 // Newly created nodes have version 0.
}
pair := &store.KVPair{
Key: key,
Value: value,
LastIndex: uint64(meta.Version),
LastIndex: lastIndex,
}
return true, pair, nil

View file

@ -5,12 +5,13 @@ import (
"time"
"github.com/docker/libkv/store"
"github.com/docker/libkv/testutils"
)
func makeZkClient(t *testing.T) store.Store {
client := "localhost:2181"
kv, err := InitializeZookeeper(
kv, err := New(
[]string{client},
&store.Config{
ConnectionTimeout: 3 * time.Second,
@ -29,5 +30,5 @@ func TestZkStore(t *testing.T) {
kv := makeZkClient(t)
backup := makeZkClient(t)
store.TestStore(t, kv, backup)
testutils.RunTestStore(t, kv, backup)
}

View file

@ -1,19 +1,21 @@
package store
package testutils
import (
"testing"
"time"
"github.com/docker/libkv/store"
"github.com/stretchr/testify/assert"
)
// TestStore is an helper testing method that is
// RunTestStore is an helper testing method that is
// called by each K/V backend sub-package testing
func TestStore(t *testing.T, kv Store, backup Store) {
func RunTestStore(t *testing.T, kv store.Store, backup store.Store) {
testPutGetDelete(t, kv)
testWatch(t, kv)
testWatchTree(t, kv)
testAtomicPut(t, kv)
testAtomicPutCreate(t, kv)
testAtomicDelete(t, kv)
testLockUnlock(t, kv)
testPutEphemeral(t, kv, backup)
@ -21,7 +23,7 @@ func TestStore(t *testing.T, kv Store, backup Store) {
testDeleteTree(t, kv)
}
func testPutGetDelete(t *testing.T, kv Store) {
func testPutGetDelete(t *testing.T, kv store.Store) {
key := "foo"
value := []byte("bar")
@ -48,7 +50,7 @@ func testPutGetDelete(t *testing.T, kv Store) {
assert.Nil(t, pair)
}
func testWatch(t *testing.T, kv Store) {
func testWatch(t *testing.T, kv store.Store) {
key := "hello"
value := []byte("world")
newValue := []byte("world!")
@ -105,7 +107,7 @@ func testWatch(t *testing.T, kv Store) {
}
}
func testWatchTree(t *testing.T, kv Store) {
func testWatchTree(t *testing.T, kv store.Store) {
dir := "tree"
node1 := "tree/node1"
@ -159,7 +161,7 @@ func testWatchTree(t *testing.T, kv Store) {
}
}
func testAtomicPut(t *testing.T, kv Store) {
func testAtomicPut(t *testing.T, kv store.Store) {
key := "hello"
value := []byte("world")
@ -176,7 +178,7 @@ func testAtomicPut(t *testing.T, kv Store) {
assert.Equal(t, pair.Value, value)
assert.NotEqual(t, pair.LastIndex, 0)
// This CAS should fail: no previous
// This CAS should fail: previous exists.
success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil)
assert.Error(t, err)
assert.False(t, success)
@ -186,14 +188,48 @@ func testAtomicPut(t *testing.T, kv Store) {
assert.NoError(t, err)
assert.True(t, success)
// This CAS should fail
// This CAS should fail, key exists.
pair.LastIndex = 0
success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil)
assert.Error(t, err)
assert.False(t, success)
}
func testAtomicDelete(t *testing.T, kv Store) {
func testAtomicPutCreate(t *testing.T, kv store.Store) {
// Use a key in a new directory to ensure Stores will create directories
// that don't yet exist.
key := "put/create"
value := []byte("putcreate")
// AtomicPut the key, previous = nil indicates create.
success, _, err := kv.AtomicPut(key, value, nil, nil)
assert.NoError(t, err)
assert.True(t, success)
// Get should return the value and an incremented index
pair, err := kv.Get(key)
assert.NoError(t, err)
if assert.NotNil(t, pair) {
assert.NotNil(t, pair.Value)
}
assert.Equal(t, pair.Value, value)
// Attempting to create again should fail.
success, _, err = kv.AtomicPut(key, value, nil, nil)
assert.Error(t, err)
assert.False(t, success)
// This CAS should succeed, since it has the value from Get()
success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil)
assert.NoError(t, err)
assert.True(t, success)
// Delete the key, ensures runs of the test don't interfere with each other.
err = kv.DeleteTree("put")
assert.NoError(t, err)
}
func testAtomicDelete(t *testing.T, kv store.Store) {
key := "atomic"
value := []byte("world")
@ -225,12 +261,12 @@ func testAtomicDelete(t *testing.T, kv Store) {
assert.True(t, success)
}
func testLockUnlock(t *testing.T, kv Store) {
func testLockUnlock(t *testing.T, kv store.Store) {
key := "foo"
value := []byte("bar")
// We should be able to create a new lock on key
lock, err := kv.NewLock(key, &LockOptions{Value: value})
lock, err := kv.NewLock(key, &store.LockOptions{Value: value})
assert.NoError(t, err)
assert.NotNil(t, lock)
@ -262,7 +298,7 @@ func testLockUnlock(t *testing.T, kv Store) {
assert.NotEqual(t, pair.LastIndex, 0)
}
func testPutEphemeral(t *testing.T, kv Store, otherConn Store) {
func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) {
firstKey := "first"
firstValue := []byte("foo")
@ -270,11 +306,11 @@ func testPutEphemeral(t *testing.T, kv Store, otherConn Store) {
secondValue := []byte("bar")
// Put the first key with the Ephemeral flag
err := otherConn.Put(firstKey, firstValue, &WriteOptions{Ephemeral: true})
err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{Ephemeral: true})
assert.NoError(t, err)
// Put a second key with the Ephemeral flag
err = otherConn.Put(secondKey, secondValue, &WriteOptions{Ephemeral: true})
err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{Ephemeral: true})
assert.NoError(t, err)
// Get on firstKey should work
@ -291,7 +327,7 @@ func testPutEphemeral(t *testing.T, kv Store, otherConn Store) {
otherConn.Close()
// Let the session expire
time.Sleep(5 * time.Second)
time.Sleep(3 * time.Second)
// Get on firstKey shouldn't work
pair, err = kv.Get(firstKey)
@ -304,7 +340,7 @@ func testPutEphemeral(t *testing.T, kv Store, otherConn Store) {
assert.Nil(t, pair)
}
func testList(t *testing.T, kv Store) {
func testList(t *testing.T, kv store.Store) {
prefix := "nodes"
firstKey := "nodes/first"
@ -344,7 +380,7 @@ func testList(t *testing.T, kv Store) {
assert.Nil(t, pairs)
}
func testDeleteTree(t *testing.T, kv Store) {
func testDeleteTree(t *testing.T, kv store.Store) {
prefix := "nodes"
firstKey := "nodes/first"

View file

@ -98,6 +98,7 @@ type bridgeNetwork struct {
type driver struct {
config *configuration
configured bool
network *bridgeNetwork
natChain *iptables.ChainInfo
filterChain *iptables.ChainInfo
@ -108,7 +109,7 @@ type driver struct {
// New constructs a new bridge driver
func newDriver() driverapi.Driver {
ipAllocator = ipallocator.New()
return &driver{networks: map[string]*bridgeNetwork{}}
return &driver{networks: map[string]*bridgeNetwork{}, config: &configuration{}}
}
// Init registers a new instance of bridge driver
@ -433,29 +434,26 @@ func (d *driver) Config(option map[string]interface{}) error {
d.Lock()
defer d.Unlock()
if d.config != nil {
if d.configured {
return &ErrConfigExists{}
}
genericData, ok := option[netlabel.GenericData]
if ok && genericData != nil {
switch opt := genericData.(type) {
case options.Generic:
opaqueConfig, err := options.GenerateFromModel(opt, &configuration{})
if err != nil {
return err
}
config = opaqueConfig.(*configuration)
case *configuration:
config = opt
default:
return &ErrInvalidDriverConfig{}
}
if !ok || genericData == nil {
return nil
}
d.config = config
} else {
config = &configuration{}
d.config = config
switch opt := genericData.(type) {
case options.Generic:
opaqueConfig, err := options.GenerateFromModel(opt, &configuration{})
if err != nil {
return err
}
config = opaqueConfig.(*configuration)
case *configuration:
config = opt
default:
return &ErrInvalidDriverConfig{}
}
if config.EnableIPForwarding {
@ -467,9 +465,13 @@ func (d *driver) Config(option map[string]interface{}) error {
if config.EnableIPTables {
d.natChain, d.filterChain, err = setupIPChains(config)
return err
if err != nil {
return err
}
}
d.configured = true
d.config = config
return nil
}
@ -566,12 +568,20 @@ func (d *driver) getNetworks() []*bridgeNetwork {
// Create a new network using bridge plugin
func (d *driver) CreateNetwork(id string, option map[string]interface{}) error {
var err error
var (
err error
configLocked bool
)
defer osl.InitOSContext()()
// Sanity checks
d.Lock()
if !d.configured {
configLocked = true
d.configured = true
}
if _, ok := d.networks[id]; ok {
d.Unlock()
return types.ForbiddenErrorf("network %s exists", id)
@ -610,6 +620,10 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}) error {
defer func() {
if err != nil {
d.Lock()
if configLocked {
d.configured = false
}
delete(d.networks, id)
d.Unlock()
}
@ -651,7 +665,7 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}) error {
bridgeSetup.queueStep(setupBridgeIPv4)
enableIPv6Forwarding := false
if d.config != nil && d.config.EnableIPForwarding && config.FixedCIDRv6 != nil {
if d.config.EnableIPForwarding && config.FixedCIDRv6 != nil {
enableIPv6Forwarding = true
}

View file

@ -68,6 +68,19 @@ func TestCreateFullOptions(t *testing.T) {
}
}
func TestCreateNoConfig(t *testing.T) {
defer osl.SetupTestOSContext(t)()
d := newDriver()
netconfig := &networkConfiguration{BridgeName: DefaultBridgeName}
genericOption := make(map[string]interface{})
genericOption[netlabel.GenericData] = netconfig
if err := d.CreateNetwork("dummy", genericOption); err != nil {
t.Fatalf("Failed to create bridge: %v", err)
}
}
func TestCreate(t *testing.T) {
defer testutils.SetupTestOSContext(t)()
d := newDriver()