From 005d8f1f5240c2f3e40ab8850c1d3c33e9e2be5a Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Thu, 3 Sep 2015 22:11:45 -0700 Subject: [PATCH 1/2] Fix bridge driver panic in CreateNetwork Bridge driver panics in `CreateNetwork` if called without a prior `Config` call. This causes issues in dnet which tries to create network using default driver configuration. It should be valid to call `CreateNetwork` without a prior `Config` call in which case we need to assume default driver config. Fixed this by properly initializing the driver config pointer. Also introduced a `configured` bool to make sure that still `Config` is called exactly once for the instance of the bridge driver. Signed-off-by: Jana Radhakrishnan --- libnetwork/drivers/bridge/bridge.go | 58 +++++++++++++++--------- libnetwork/drivers/bridge/bridge_test.go | 13 ++++++ 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/libnetwork/drivers/bridge/bridge.go b/libnetwork/drivers/bridge/bridge.go index 1f340892f0..10a72d684c 100644 --- a/libnetwork/drivers/bridge/bridge.go +++ b/libnetwork/drivers/bridge/bridge.go @@ -98,6 +98,7 @@ type bridgeNetwork struct { type driver struct { config *configuration + configured bool network *bridgeNetwork natChain *iptables.ChainInfo filterChain *iptables.ChainInfo @@ -108,7 +109,7 @@ type driver struct { // New constructs a new bridge driver func newDriver() driverapi.Driver { ipAllocator = ipallocator.New() - return &driver{networks: map[string]*bridgeNetwork{}} + return &driver{networks: map[string]*bridgeNetwork{}, config: &configuration{}} } // Init registers a new instance of bridge driver @@ -433,29 +434,26 @@ func (d *driver) Config(option map[string]interface{}) error { d.Lock() defer d.Unlock() - if d.config != nil { + if d.configured { return &ErrConfigExists{} } genericData, ok := option[netlabel.GenericData] - if ok && genericData != nil { - switch opt := genericData.(type) { - case options.Generic: - opaqueConfig, err := options.GenerateFromModel(opt, &configuration{}) - if err != nil { - return err - } - config = opaqueConfig.(*configuration) - case *configuration: - config = opt - default: - return &ErrInvalidDriverConfig{} - } + if !ok || genericData == nil { + return nil + } - d.config = config - } else { - config = &configuration{} - d.config = config + switch opt := genericData.(type) { + case options.Generic: + opaqueConfig, err := options.GenerateFromModel(opt, &configuration{}) + if err != nil { + return err + } + config = opaqueConfig.(*configuration) + case *configuration: + config = opt + default: + return &ErrInvalidDriverConfig{} } if config.EnableIPForwarding { @@ -467,9 +465,13 @@ func (d *driver) Config(option map[string]interface{}) error { if config.EnableIPTables { d.natChain, d.filterChain, err = setupIPChains(config) - return err + if err != nil { + return err + } } + d.configured = true + d.config = config return nil } @@ -566,12 +568,20 @@ func (d *driver) getNetworks() []*bridgeNetwork { // Create a new network using bridge plugin func (d *driver) CreateNetwork(id string, option map[string]interface{}) error { - var err error + var ( + err error + configLocked bool + ) defer osl.InitOSContext()() // Sanity checks d.Lock() + if !d.configured { + configLocked = true + d.configured = true + } + if _, ok := d.networks[id]; ok { d.Unlock() return types.ForbiddenErrorf("network %s exists", id) @@ -610,6 +620,10 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}) error { defer func() { if err != nil { d.Lock() + if configLocked { + d.configured = false + } + delete(d.networks, id) d.Unlock() } @@ -651,7 +665,7 @@ func (d *driver) CreateNetwork(id string, option map[string]interface{}) error { bridgeSetup.queueStep(setupBridgeIPv4) enableIPv6Forwarding := false - if d.config != nil && d.config.EnableIPForwarding && config.FixedCIDRv6 != nil { + if d.config.EnableIPForwarding && config.FixedCIDRv6 != nil { enableIPv6Forwarding = true } diff --git a/libnetwork/drivers/bridge/bridge_test.go b/libnetwork/drivers/bridge/bridge_test.go index 5efc7e3da3..7ee9250bd9 100644 --- a/libnetwork/drivers/bridge/bridge_test.go +++ b/libnetwork/drivers/bridge/bridge_test.go @@ -68,6 +68,19 @@ func TestCreateFullOptions(t *testing.T) { } } +func TestCreateNoConfig(t *testing.T) { + defer osl.SetupTestOSContext(t)() + d := newDriver() + + netconfig := &networkConfiguration{BridgeName: DefaultBridgeName} + genericOption := make(map[string]interface{}) + genericOption[netlabel.GenericData] = netconfig + + if err := d.CreateNetwork("dummy", genericOption); err != nil { + t.Fatalf("Failed to create bridge: %v", err) + } +} + func TestCreate(t *testing.T) { defer osl.SetupTestOSContext(t)() d := newDriver() From 46a773fba60f9c8c6cc70fe7c3e4edadad1c78aa Mon Sep 17 00:00:00 2001 From: Jana Radhakrishnan Date: Sat, 5 Sep 2015 20:49:20 -0700 Subject: [PATCH 2/2] Fix libkv source out-of-sync with Godeps Looks like the libkv version vendored in really not in sync with the git hash value in Godeps.json. The commit https://github.com/docker/libnetwork/commit/04bd8f67ad8bcd96e6c9ddf49745d9eea1eae7aa has just updated the Godeps.json without update the source. Dnet in multi-host testing is broken due to this, while docker mult-host functionality works because the correct version of libkv has been vendored in docker/docker. Signed-off-by: Jana Radhakrishnan --- .../src/github.com/docker/libkv/.travis.yml | 4 +- .../src/github.com/docker/libkv/libkv.go | 10 +-- .../src/github.com/docker/libkv/libkv_test.go | 11 --- .../docker/libkv/script/travis_consul.sh | 2 +- .../docker/libkv/store/consul/consul.go | 51 ++++++------- .../docker/libkv/store/consul/consul_test.go | 5 +- .../docker/libkv/store/etcd/etcd.go | 35 +++++++-- .../docker/libkv/store/etcd/etcd_test.go | 5 +- .../docker/libkv/store/mock/mock.go | 4 +- .../github.com/docker/libkv/store/store.go | 11 ++- .../docker/libkv/store/zookeeper/zookeeper.go | 52 ++++++++++---- .../libkv/store/zookeeper/zookeeper_test.go | 5 +- .../store-test.go => testutils/utils.go} | 72 ++++++++++++++----- 13 files changed, 168 insertions(+), 99 deletions(-) rename libnetwork/Godeps/_workspace/src/github.com/docker/libkv/{store/store-test.go => testutils/utils.go} (79%) diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/.travis.yml b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/.travis.yml index d704b7cf4b..ae27049309 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/.travis.yml +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/.travis.yml @@ -11,8 +11,8 @@ sudo: false before_install: # Symlink below is needed for Travis CI to work correctly on personal forks of libkv - ln -s $HOME/gopath/src/github.com/${TRAVIS_REPO_SLUG///libkv/} $HOME/gopath/src/github.com/docker - - go get code.google.com/p/go.tools/cmd/vet - - go get code.google.com/p/go.tools/cmd/cover + - go get golang.org/x/tools/cmd/vet + - go get golang.org/x/tools/cmd/cover - go get github.com/mattn/goveralls - go get github.com/golang/lint/golint - go get github.com/GeertJohan/fgt diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go index d91c41950b..28df703afc 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go @@ -1,11 +1,9 @@ package libkv import ( - log "github.com/Sirupsen/logrus" "github.com/docker/libkv/store" "github.com/docker/libkv/store/consul" "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/mock" "github.com/docker/libkv/store/zookeeper" ) @@ -15,17 +13,15 @@ type Initialize func(addrs []string, options *store.Config) (store.Store, error) var ( // Backend initializers initializers = map[store.Backend]Initialize{ - store.MOCK: mock.InitializeMock, - store.CONSUL: consul.InitializeConsul, - store.ETCD: etcd.InitializeEtcd, - store.ZK: zookeeper.InitializeZookeeper, + store.CONSUL: consul.New, + store.ETCD: etcd.New, + store.ZK: zookeeper.New, } ) // NewStore creates a an instance of store func NewStore(backend store.Backend, addrs []string, options *store.Config) (store.Store, error) { if init, exists := initializers[backend]; exists { - log.WithFields(log.Fields{"backend": backend}).Debug("Initializing store service") return init(addrs, options) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go index 17b9287582..8b2ae0e6cc 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go @@ -7,7 +7,6 @@ import ( "github.com/docker/libkv/store" "github.com/docker/libkv/store/consul" "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/mock" "github.com/docker/libkv/store/zookeeper" "github.com/stretchr/testify/assert" ) @@ -66,16 +65,6 @@ func TestNewStoreZookeeper(t *testing.T) { } } -func TestNewStoreMock(t *testing.T) { - kv, err := NewStore(store.MOCK, []string{}, &store.Config{}) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*mock.Mock); !ok { - t.Fatal("Error while initializing mock store") - } -} - func TestNewStoreUnsupported(t *testing.T) { client := "localhost:9999" diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_consul.sh b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_consul.sh index 5268c4326f..3899711637 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_consul.sh +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/script/travis_consul.sh @@ -12,7 +12,7 @@ unzip "${CONSUL_VERSION}_linux_amd64.zip" # make config for minimum ttl touch config.json -echo "{\"session_ttl_min\": \"2s\"}" >> config.json +echo "{\"session_ttl_min\": \"1s\"}" >> config.json # check ./consul --version diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go index 1b4fac5cb9..ebabde5ffa 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go @@ -7,7 +7,6 @@ import ( "sync" "time" - log "github.com/Sirupsen/logrus" "github.com/docker/libkv/store" api "github.com/hashicorp/consul/api" ) @@ -32,9 +31,9 @@ type consulLock struct { lock *api.Lock } -// InitializeConsul creates a new Consul client given -// a list of endpoints and optional tls config -func InitializeConsul(endpoints []string, options *store.Config) (store.Store, error) { +// New creates a new Consul client given a list +// of endpoints and optional tls config +func New(endpoints []string, options *store.Config) (store.Store, error) { s := &Consul{} // Create Consul client @@ -60,7 +59,6 @@ func InitializeConsul(endpoints []string, options *store.Config) (store.Store, e // Creates a new client client, err := api.NewClient(config) if err != nil { - log.Errorf("Couldn't initialize consul client..") return nil, err } s.client = client @@ -101,8 +99,9 @@ func (s *Consul) refreshSession(pair *api.KVPair) error { if session == "" { entry := &api.SessionEntry{ - Behavior: api.SessionBehaviorDelete, - TTL: s.ephemeralTTL.String(), + Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires + TTL: ((s.ephemeralTTL) / 2).String(), // Consul multiplies the TTL by 2x + LockDelay: 1 * time.Millisecond, // Virtually disable lock delay } // Create the key session @@ -110,19 +109,19 @@ func (s *Consul) refreshSession(pair *api.KVPair) error { if err != nil { return err } - } - lockOpts := &api.LockOptions{ - Key: pair.Key, - Session: session, - } + lockOpts := &api.LockOptions{ + Key: pair.Key, + Session: session, + } - // Lock and ignore if lock is held - // It's just a placeholder for the - // ephemeral behavior - lock, _ := s.client.LockOpts(lockOpts) - if lock != nil { - lock.Lock(nil) + // Lock and ignore if lock is held + // It's just a placeholder for the + // ephemeral behavior + lock, _ := s.client.LockOpts(lockOpts) + if lock != nil { + lock.Lock(nil) + } } _, _, err = s.client.Session().Renew(session, nil) @@ -312,7 +311,6 @@ func (s *Consul) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []* // Get all the childrens pairs, meta, err := kv.List(directory, opts) if err != nil { - log.Errorf("consul: %v", err) return } @@ -324,18 +322,18 @@ func (s *Consul) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []* opts.WaitIndex = meta.LastIndex // Return children KV pairs to the channel - kv := []*store.KVPair{} + kvpairs := []*store.KVPair{} for _, pair := range pairs { if pair.Key == directory { continue } - kv = append(kv, &store.KVPair{ + kvpairs = append(kvpairs, &store.KVPair{ Key: pair.Key, Value: pair.Value, LastIndex: pair.ModifyIndex, }) } - watchCh <- kv + watchCh <- kvpairs } }() @@ -377,11 +375,16 @@ func (l *consulLock) Unlock() error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Consul) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { + + p := &api.KVPair{Key: s.normalize(key), Value: value} + if previous == nil { - return false, nil, store.ErrPreviousNotSpecified + // Consul interprets ModifyIndex = 0 as new key. + p.ModifyIndex = 0 + } else { + p.ModifyIndex = previous.LastIndex } - p := &api.KVPair{Key: s.normalize(key), Value: value, ModifyIndex: previous.LastIndex} if work, _, err := s.client.KV().CAS(p, nil); err != nil { return false, nil, err } else if !work { diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go index 38888e4ba4..26e20bb471 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go @@ -5,13 +5,14 @@ import ( "time" "github.com/docker/libkv/store" + "github.com/docker/libkv/testutils" "github.com/stretchr/testify/assert" ) func makeConsulClient(t *testing.T) store.Store { client := "localhost:8500" - kv, err := InitializeConsul( + kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, @@ -30,7 +31,7 @@ func TestConsulStore(t *testing.T) { kv := makeConsulClient(t) backup := makeConsulClient(t) - store.TestStore(t, kv, backup) + testutils.RunTestStore(t, kv, backup) } func TestGetActiveSession(t *testing.T) { diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go index f1f32e7b96..30d2e08314 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go @@ -33,9 +33,9 @@ const ( defaultUpdateTime = 5 * time.Second ) -// InitializeEtcd creates a new Etcd client given -// a list of endpoints and an optional tls config -func InitializeEtcd(addrs []string, options *store.Config) (store.Store, error) { +// New creates a new Etcd client given a list +// of endpoints and an optional tls config +func New(addrs []string, options *store.Config) (store.Store, error) { s := &Etcd{} entries := store.CreateEndpoints(addrs, "http") @@ -278,11 +278,32 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Etcd) AtomicPut(key string, value []byte, previous *store.KVPair, options *store.WriteOptions) (bool, *store.KVPair, error) { - if previous == nil { - return false, nil, store.ErrPreviousNotSpecified - } - meta, err := s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex) + var meta *etcd.Response + var err error + if previous != nil { + meta, err = s.client.CompareAndSwap(store.Normalize(key), string(value), 0, "", previous.LastIndex) + } else { + // Interpret previous == nil as Atomic Create + meta, err = s.client.Create(store.Normalize(key), string(value), 0) + if etcdError, ok := err.(*etcd.EtcdError); ok { + + // Directory doesn't exist. + if etcdError.ErrorCode == 104 { + // Remove the last element (the actual key) + // and create the full directory path + err = s.createDirectory(store.GetDirectory(key)) + if err != nil { + return false, nil, err + } + + // Now that the directory is created, create the key + if _, err := s.client.Create(key, string(value), 0); err != nil { + return false, nil, err + } + } + } + } if err != nil { if etcdError, ok := err.(*etcd.EtcdError); ok { // Compare Failed diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go index 7e9db93a0f..73d1b06f97 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go @@ -5,12 +5,13 @@ import ( "time" "github.com/docker/libkv/store" + "github.com/docker/libkv/testutils" ) func makeEtcdClient(t *testing.T) store.Store { client := "localhost:4001" - kv, err := InitializeEtcd( + kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, @@ -29,5 +30,5 @@ func TestEtcdStore(t *testing.T) { kv := makeEtcdClient(t) backup := makeEtcdClient(t) - store.TestStore(t, kv, backup) + testutils.RunTestStore(t, kv, backup) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/mock/mock.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/mock/mock.go index bd6283b73b..9f63e50ae0 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/mock/mock.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/mock/mock.go @@ -16,8 +16,8 @@ type Mock struct { Options *store.Config } -// InitializeMock creates a Mock store. -func InitializeMock(endpoints []string, options *store.Config) (store.Store, error) { +// New creates a Mock store +func New(endpoints []string, options *store.Config) (store.Store, error) { s := &Mock{} s.Endpoints = endpoints s.Options = options diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go index 0c8d0aa06d..49e2eb9dcd 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go @@ -10,14 +10,12 @@ import ( type Backend string const ( - // MOCK backend - MOCK Backend = "mock" // CONSUL backend - CONSUL = "consul" + CONSUL Backend = "consul" // ETCD backend - ETCD = "etcd" + ETCD Backend = "etcd" // ZK backend - ZK = "zk" + ZK Backend = "zk" ) var ( @@ -79,7 +77,8 @@ type Store interface { // DeleteTree deletes a range of keys under a given directory DeleteTree(directory string) error - // Atomic operation on a single value + // Atomic CAS operation on a single value. + // Pass previous = nil to create a new key. AtomicPut(key string, value []byte, previous *KVPair, options *WriteOptions) (bool, *KVPair, error) // Atomic delete of a single value diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go index 316a2ed8d7..d12edf6792 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go @@ -4,7 +4,6 @@ import ( "strings" "time" - log "github.com/Sirupsen/logrus" "github.com/docker/libkv/store" zk "github.com/samuel/go-zookeeper/zk" ) @@ -25,9 +24,9 @@ type zookeeperLock struct { value []byte } -// InitializeZookeeper creates a new Zookeeper client -// given a list of endpoints and an optional tls config -func InitializeZookeeper(endpoints []string, options *store.Config) (store.Store, error) { +// New creates a new Zookeeper client given a +// list of endpoints and an optional tls config +func New(endpoints []string, options *store.Config) (store.Store, error) { s := &Zookeeper{} s.timeout = defaultTimeout @@ -41,7 +40,6 @@ func InitializeZookeeper(endpoints []string, options *store.Config) (store.Store // Connect to Zookeeper conn, _, err := zk.Connect(endpoints, s.timeout) if err != nil { - log.Error(err) return nil, err } s.client = conn @@ -267,23 +265,47 @@ func (s *Zookeeper) DeleteTree(directory string) error { // AtomicPut put a value at "key" if the key has not been // modified in the meantime, throws an error if this is the case func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, _ *store.WriteOptions) (bool, *store.KVPair, error) { - if previous == nil { - return false, nil, store.ErrPreviousNotSpecified - } - meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex)) - if err != nil { - // Compare Failed - if err == zk.ErrBadVersion { - return false, nil, store.ErrKeyModified + var lastIndex uint64 + if previous != nil { + meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex)) + if err != nil { + // Compare Failed + if err == zk.ErrBadVersion { + return false, nil, store.ErrKeyModified + } + return false, nil, err } - return false, nil, err + lastIndex = uint64(meta.Version) + } else { + // Interpret previous == nil as create operation. + _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + // Zookeeper will complain if the directory doesn't exist. + if err == zk.ErrNoNode { + // Create the directory + parts := store.SplitKey(key) + parts = parts[:len(parts)-1] + if err = s.createFullPath(parts, false); err != nil { + // Failed to create the directory. + return false, nil, err + } + if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { + return false, nil, err + } + + } else { + // Unhandled error + return false, nil, err + } + } + lastIndex = 0 // Newly created nodes have version 0. } pair := &store.KVPair{ Key: key, Value: value, - LastIndex: uint64(meta.Version), + LastIndex: lastIndex, } return true, pair, nil diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go index cb31832adb..759297e542 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go @@ -5,12 +5,13 @@ import ( "time" "github.com/docker/libkv/store" + "github.com/docker/libkv/testutils" ) func makeZkClient(t *testing.T) store.Store { client := "localhost:2181" - kv, err := InitializeZookeeper( + kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, @@ -29,5 +30,5 @@ func TestZkStore(t *testing.T) { kv := makeZkClient(t) backup := makeZkClient(t) - store.TestStore(t, kv, backup) + testutils.RunTestStore(t, kv, backup) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store-test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go similarity index 79% rename from libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store-test.go rename to libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go index 3de108e529..717d9ecdc2 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store-test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go @@ -1,19 +1,21 @@ -package store +package testutils import ( "testing" "time" + "github.com/docker/libkv/store" "github.com/stretchr/testify/assert" ) -// TestStore is an helper testing method that is +// RunTestStore is an helper testing method that is // called by each K/V backend sub-package testing -func TestStore(t *testing.T, kv Store, backup Store) { +func RunTestStore(t *testing.T, kv store.Store, backup store.Store) { testPutGetDelete(t, kv) testWatch(t, kv) testWatchTree(t, kv) testAtomicPut(t, kv) + testAtomicPutCreate(t, kv) testAtomicDelete(t, kv) testLockUnlock(t, kv) testPutEphemeral(t, kv, backup) @@ -21,7 +23,7 @@ func TestStore(t *testing.T, kv Store, backup Store) { testDeleteTree(t, kv) } -func testPutGetDelete(t *testing.T, kv Store) { +func testPutGetDelete(t *testing.T, kv store.Store) { key := "foo" value := []byte("bar") @@ -48,7 +50,7 @@ func testPutGetDelete(t *testing.T, kv Store) { assert.Nil(t, pair) } -func testWatch(t *testing.T, kv Store) { +func testWatch(t *testing.T, kv store.Store) { key := "hello" value := []byte("world") newValue := []byte("world!") @@ -105,7 +107,7 @@ func testWatch(t *testing.T, kv Store) { } } -func testWatchTree(t *testing.T, kv Store) { +func testWatchTree(t *testing.T, kv store.Store) { dir := "tree" node1 := "tree/node1" @@ -159,7 +161,7 @@ func testWatchTree(t *testing.T, kv Store) { } } -func testAtomicPut(t *testing.T, kv Store) { +func testAtomicPut(t *testing.T, kv store.Store) { key := "hello" value := []byte("world") @@ -176,7 +178,7 @@ func testAtomicPut(t *testing.T, kv Store) { assert.Equal(t, pair.Value, value) assert.NotEqual(t, pair.LastIndex, 0) - // This CAS should fail: no previous + // This CAS should fail: previous exists. success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil) assert.Error(t, err) assert.False(t, success) @@ -186,14 +188,48 @@ func testAtomicPut(t *testing.T, kv Store) { assert.NoError(t, err) assert.True(t, success) - // This CAS should fail + // This CAS should fail, key exists. pair.LastIndex = 0 success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil) assert.Error(t, err) assert.False(t, success) } -func testAtomicDelete(t *testing.T, kv Store) { +func testAtomicPutCreate(t *testing.T, kv store.Store) { + // Use a key in a new directory to ensure Stores will create directories + // that don't yet exist. + key := "put/create" + value := []byte("putcreate") + + // AtomicPut the key, previous = nil indicates create. + success, _, err := kv.AtomicPut(key, value, nil, nil) + assert.NoError(t, err) + assert.True(t, success) + + // Get should return the value and an incremented index + pair, err := kv.Get(key) + assert.NoError(t, err) + if assert.NotNil(t, pair) { + assert.NotNil(t, pair.Value) + } + assert.Equal(t, pair.Value, value) + + // Attempting to create again should fail. + success, _, err = kv.AtomicPut(key, value, nil, nil) + assert.Error(t, err) + assert.False(t, success) + + // This CAS should succeed, since it has the value from Get() + success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil) + assert.NoError(t, err) + assert.True(t, success) + + // Delete the key, ensures runs of the test don't interfere with each other. + err = kv.DeleteTree("put") + assert.NoError(t, err) +} + +func testAtomicDelete(t *testing.T, kv store.Store) { key := "atomic" value := []byte("world") @@ -225,12 +261,12 @@ func testAtomicDelete(t *testing.T, kv Store) { assert.True(t, success) } -func testLockUnlock(t *testing.T, kv Store) { +func testLockUnlock(t *testing.T, kv store.Store) { key := "foo" value := []byte("bar") // We should be able to create a new lock on key - lock, err := kv.NewLock(key, &LockOptions{Value: value}) + lock, err := kv.NewLock(key, &store.LockOptions{Value: value}) assert.NoError(t, err) assert.NotNil(t, lock) @@ -262,7 +298,7 @@ func testLockUnlock(t *testing.T, kv Store) { assert.NotEqual(t, pair.LastIndex, 0) } -func testPutEphemeral(t *testing.T, kv Store, otherConn Store) { +func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) { firstKey := "first" firstValue := []byte("foo") @@ -270,11 +306,11 @@ func testPutEphemeral(t *testing.T, kv Store, otherConn Store) { secondValue := []byte("bar") // Put the first key with the Ephemeral flag - err := otherConn.Put(firstKey, firstValue, &WriteOptions{Ephemeral: true}) + err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{Ephemeral: true}) assert.NoError(t, err) // Put a second key with the Ephemeral flag - err = otherConn.Put(secondKey, secondValue, &WriteOptions{Ephemeral: true}) + err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{Ephemeral: true}) assert.NoError(t, err) // Get on firstKey should work @@ -291,7 +327,7 @@ func testPutEphemeral(t *testing.T, kv Store, otherConn Store) { otherConn.Close() // Let the session expire - time.Sleep(5 * time.Second) + time.Sleep(3 * time.Second) // Get on firstKey shouldn't work pair, err = kv.Get(firstKey) @@ -304,7 +340,7 @@ func testPutEphemeral(t *testing.T, kv Store, otherConn Store) { assert.Nil(t, pair) } -func testList(t *testing.T, kv Store) { +func testList(t *testing.T, kv store.Store) { prefix := "nodes" firstKey := "nodes/first" @@ -344,7 +380,7 @@ func testList(t *testing.T, kv Store) { assert.Nil(t, pairs) } -func testDeleteTree(t *testing.T, kv Store) { +func testDeleteTree(t *testing.T, kv store.Store) { prefix := "nodes" firstKey := "nodes/first"