From ebbca4814e63216e30a80b484f9734defabede96 Mon Sep 17 00:00:00 2001 From: Chun Chen Date: Wed, 16 Sep 2015 18:15:56 +0800 Subject: [PATCH] Update libkv to latest commit Signed-off-by: Chun Chen --- libnetwork/Godeps/Godeps.json | 2 +- .../src/github.com/docker/libkv/README.md | 20 +- .../src/github.com/docker/libkv/libkv.go | 90 +++++++- .../src/github.com/docker/libkv/libkv_test.go | 58 +---- .../docker/libkv/store/consul/consul.go | 55 +++-- .../docker/libkv/store/consul/consul_test.go | 28 ++- .../docker/libkv/store/etcd/etcd.go | 96 +++++--- .../docker/libkv/store/etcd/etcd_test.go | 28 ++- .../github.com/docker/libkv/store/store.go | 17 +- .../docker/libkv/store/zookeeper/zookeeper.go | 77 ++++--- .../libkv/store/zookeeper/zookeeper_test.go | 28 ++- .../docker/libkv/testutils/utils.go | 210 ++++++++++++------ libnetwork/datastore/datastore.go | 9 + 13 files changed, 468 insertions(+), 250 deletions(-) diff --git a/libnetwork/Godeps/Godeps.json b/libnetwork/Godeps/Godeps.json index fd6ee0e641..7bb699da8b 100644 --- a/libnetwork/Godeps/Godeps.json +++ b/libnetwork/Godeps/Godeps.json @@ -76,7 +76,7 @@ }, { "ImportPath": "github.com/docker/libkv", - "Rev": "60c7c881345b3c67defc7f93a8297debf041d43c" + "Rev": "a0a57ed3755665e9a402a3df315402134eb6625f" }, { "ImportPath": "github.com/godbus/dbus", diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md index d09b34c1ea..941c96925f 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/README.md @@ -12,7 +12,7 @@ For example, you can use it to store your metadata or for service discovery to r You can also easily implement a generic *Leader Election* on top of it (see the [swarm/leadership](https://github.com/docker/swarm/tree/master/leadership) package). -As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`. +As of now, `libkv` offers support for `Consul`, `Etcd`, `Zookeeper` and `BoltDB`. ## Example of usage @@ -24,17 +24,23 @@ package main import ( "fmt" "time" - + "github.com/docker/libkv" "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" log "github.com/Sirupsen/logrus" ) +func init() { + // Register consul store to libkv + consul.Register() +} + func main() { client := "localhost:8500" // Initialize a new store with consul - kv, err = libkv.NewStore( + kv, err := libkv.NewStore( store.CONSUL, // or "consul" []string{client}, &store.Config{ @@ -62,11 +68,13 @@ func main() { You can find other usage examples for `libkv` under the `docker/swarm` or `docker/libnetwork` repositories. -## Details +## Warning -You should expect the same experience for basic operations like `Get`/`Put`, etc. +There are a few consistency issues with *etcd*, on the notion of *directory* and *key*. If you want to use the three KV backends in an interchangeable way, you should only put data on leaves (see [Issue 20](https://github.com/docker/libkv/issues/20) for more details). This will be fixed when *etcd* API v3 will be made available (API v3 drops the *directory/key* distinction). An official release for *libkv* with a tag is likely to come after this issue being marked as **solved**. -However calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). +Other than that, you should expect the same experience for basic operations like `Get`/`Put`, etc. + +Calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). Although you should be able to use it successfully to watch on events in an interchangeable way (see the **swarm/leadership** or **swarm/discovery** packages in **docker/swarm**). ## Create a new storage backend diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go index 28df703afc..7b3d0584ba 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv.go @@ -1,10 +1,72 @@ +// Package libkv provides a Go native library to store metadata. +// +// The goal of libkv is to abstract common store operations for multiple +// Key/Value backends and offer the same experience no matter which one of the +// backend you want to use. +// +// For example, you can use it to store your metadata or for service discovery to +// register machines and endpoints inside your cluster. +// +// As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`. +// +// ## Example of usage +// +// ### Create a new store and use Put/Get +// +// +// package main +// +// import ( +// "fmt" +// "time" +// +// "github.com/docker/libkv" +// "github.com/docker/libkv/store" +// log "github.com/Sirupsen/logrus" +// ) +// +// func main() { +// client := "localhost:8500" +// +// // Initialize a new store with consul +// kv, err := libkv.NewStore( +// store.CONSUL, // or "consul" +// []string{client}, +// &store.Config{ +// ConnectionTimeout: 10*time.Second, +// }, +// ) +// if err != nil { +// log.Fatal("Cannot create store consul") +// } +// +// key := "foo" +// err = kv.Put(key, []byte("bar"), nil) +// if err != nil { +// log.Error("Error trying to put value at key `", key, "`") +// } +// +// pair, err := kv.Get(key) +// if err != nil { +// log.Error("Error trying accessing value at key `", key, "`") +// } +// +// log.Info("value: ", string(pair.Value)) +// } +// +// ##Copyright and license +// +// Code and documentation copyright 2015 Docker, inc. Code released under the +// Apache 2.0 license. Docs released under Creative commons. +// package libkv import ( + "fmt" + "sort" + "strings" + "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" - "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/zookeeper" ) // Initialize creates a new Store object, initializing the client @@ -12,11 +74,16 @@ type Initialize func(addrs []string, options *store.Config) (store.Store, error) var ( // Backend initializers - initializers = map[store.Backend]Initialize{ - store.CONSUL: consul.New, - store.ETCD: etcd.New, - store.ZK: zookeeper.New, - } + initializers = make(map[store.Backend]Initialize) + + supportedBackend = func() string { + keys := make([]string, 0, len(initializers)) + for k := range initializers { + keys = append(keys, string(k)) + } + sort.Strings(keys) + return strings.Join(keys, ", ") + }() ) // NewStore creates a an instance of store @@ -25,5 +92,10 @@ func NewStore(backend store.Backend, addrs []string, options *store.Config) (sto return init(addrs, options) } - return nil, store.ErrNotSupported + return nil, fmt.Errorf("%s %s", store.ErrNotSupported.Error(), supportedBackend) +} + +// AddStore adds a new store backend to libkv +func AddStore(store store.Backend, init Initialize) { + initializers[store] = init } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go index 8b2ae0e6cc..fe7af6b06c 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/libkv_test.go @@ -5,66 +5,9 @@ import ( "time" "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" - "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/zookeeper" "github.com/stretchr/testify/assert" ) -func TestNewStoreConsul(t *testing.T) { - client := "localhost:8500" - - kv, err := NewStore( - store.CONSUL, - []string{client}, - &store.Config{ - ConnectionTimeout: 10 * time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*consul.Consul); !ok { - t.Fatal("Error while initializing store consul") - } -} - -func TestNewStoreEtcd(t *testing.T) { - client := "localhost:4001" - - kv, err := NewStore( - store.ETCD, - []string{client}, - &store.Config{ - ConnectionTimeout: 10 * time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*etcd.Etcd); !ok { - t.Fatal("Error while initializing store etcd") - } -} - -func TestNewStoreZookeeper(t *testing.T) { - client := "localhost:2181" - - kv, err := NewStore( - store.ZK, - []string{client}, - &store.Config{ - ConnectionTimeout: 10 * time.Second, - }, - ) - assert.NoError(t, err) - assert.NotNil(t, kv) - - if _, ok := kv.(*zookeeper.Zookeeper); !ok { - t.Fatal("Error while initializing store zookeeper") - } -} - func TestNewStoreUnsupported(t *testing.T) { client := "localhost:9999" @@ -77,4 +20,5 @@ func TestNewStoreUnsupported(t *testing.T) { ) assert.Error(t, err) assert.Nil(t, kv) + assert.Equal(t, "Backend storage not supported yet, please choose one of ", err.Error()) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go index ebabde5ffa..76762ce54a 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul.go @@ -2,11 +2,13 @@ package consul import ( "crypto/tls" + "errors" "net/http" "strings" "sync" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" api "github.com/hashicorp/consul/api" ) @@ -18,22 +20,36 @@ const ( DefaultWatchWaitTime = 15 * time.Second ) +var ( + // ErrMultipleEndpointsUnsupported is thrown when there are + // multiple endpoints specified for Consul + ErrMultipleEndpointsUnsupported = errors.New("consul does not support multiple endpoints") +) + // Consul is the receiver type for the // Store interface type Consul struct { sync.Mutex - config *api.Config - client *api.Client - ephemeralTTL time.Duration + config *api.Config + client *api.Client } type consulLock struct { lock *api.Lock } +// Register registers consul to libkv +func Register() { + libkv.AddStore(store.CONSUL, New) +} + // New creates a new Consul client given a list // of endpoints and optional tls config func New(endpoints []string, options *store.Config) (store.Store, error) { + if len(endpoints) > 1 { + return nil, ErrMultipleEndpointsUnsupported + } + s := &Consul{} // Create Consul client @@ -51,9 +67,6 @@ func New(endpoints []string, options *store.Config) (store.Store, error) { if options.ConnectionTimeout != 0 { s.setTimeout(options.ConnectionTimeout) } - if options.EphemeralTTL != 0 { - s.setEphemeralTTL(options.EphemeralTTL) - } } // Creates a new client @@ -79,18 +92,13 @@ func (s *Consul) setTimeout(time time.Duration) { s.config.WaitTime = time } -// SetEphemeralTTL sets the ttl for ephemeral nodes -func (s *Consul) setEphemeralTTL(ttl time.Duration) { - s.ephemeralTTL = ttl -} - // Normalize the key for usage in Consul func (s *Consul) normalize(key string) string { key = store.Normalize(key) return strings.TrimPrefix(key, "/") } -func (s *Consul) refreshSession(pair *api.KVPair) error { +func (s *Consul) refreshSession(pair *api.KVPair, ttl time.Duration) error { // Check if there is any previous session with an active TTL session, err := s.getActiveSession(pair.Key) if err != nil { @@ -99,9 +107,9 @@ func (s *Consul) refreshSession(pair *api.KVPair) error { if session == "" { entry := &api.SessionEntry{ - Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires - TTL: ((s.ephemeralTTL) / 2).String(), // Consul multiplies the TTL by 2x - LockDelay: 1 * time.Millisecond, // Virtually disable lock delay + Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires + TTL: (ttl / 2).String(), // Consul multiplies the TTL by 2x + LockDelay: 1 * time.Millisecond, // Virtually disable lock delay } // Create the key session @@ -126,7 +134,7 @@ func (s *Consul) refreshSession(pair *api.KVPair) error { _, _, err = s.client.Session().Renew(session, nil) if err != nil { - return s.refreshSession(pair) + return s.refreshSession(pair, ttl) } return nil } @@ -174,9 +182,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error { Value: value, } - if opts != nil && opts.Ephemeral { + if opts != nil && opts.TTL > 0 { // Create or refresh the session - err := s.refreshSession(p) + err := s.refreshSession(p, opts.TTL) if err != nil { return err } @@ -188,6 +196,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error { // Delete a value at "key" func (s *Consul) Delete(key string) error { + if _, err := s.Get(key); err != nil { + return err + } _, err := s.client.KV().Delete(s.normalize(key), nil) return err } @@ -195,7 +206,10 @@ func (s *Consul) Delete(key string) error { // Exists checks that the key exists inside the store func (s *Consul) Exists(key string) (bool, error) { _, err := s.Get(key) - if err != nil && err == store.ErrKeyNotFound { + if err != nil { + if err == store.ErrKeyNotFound { + return false, nil + } return false, err } return true, nil @@ -229,6 +243,9 @@ func (s *Consul) List(directory string) ([]*store.KVPair, error) { // DeleteTree deletes a range of keys under a given directory func (s *Consul) DeleteTree(directory string) error { + if _, err := s.List(directory); err != nil { + return err + } _, err := s.client.KV().DeleteTree(s.normalize(directory), nil) return err } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go index 26e20bb471..a15bf9e91d 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/consul/consul_test.go @@ -4,19 +4,22 @@ import ( "testing" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" "github.com/docker/libkv/testutils" "github.com/stretchr/testify/assert" ) +var ( + client = "localhost:8500" +) + func makeConsulClient(t *testing.T) store.Store { - client := "localhost:8500" kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, }, ) @@ -27,11 +30,28 @@ func makeConsulClient(t *testing.T) store.Store { return kv } +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.CONSUL, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Consul); !ok { + t.Fatal("Error registering and initializing consul") + } +} + func TestConsulStore(t *testing.T) { kv := makeConsulClient(t) backup := makeConsulClient(t) - testutils.RunTestStore(t, kv, backup) + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestTTL(t, kv, backup) + testutils.RunCleanup(t, kv) } func TestGetActiveSession(t *testing.T) { @@ -43,7 +63,7 @@ func TestGetActiveSession(t *testing.T) { value := []byte("bar") // Put the first key with the Ephemeral flag - err := kv.Put(key, value, &store.WriteOptions{Ephemeral: true}) + err := kv.Put(key, value, &store.WriteOptions{TTL: 2 * time.Second}) assert.NoError(t, err) // Session should not be empty diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go index 30d2e08314..92c239e460 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd.go @@ -8,14 +8,14 @@ import ( "time" etcd "github.com/coreos/go-etcd/etcd" + "github.com/docker/libkv" "github.com/docker/libkv/store" ) // Etcd is the receiver type for the // Store interface type Etcd struct { - client *etcd.Client - ephemeralTTL time.Duration + client *etcd.Client } type etcdLock struct { @@ -33,6 +33,11 @@ const ( defaultUpdateTime = 5 * time.Second ) +// Register registers etcd to libkv +func Register() { + libkv.AddStore(store.ETCD, New) +} + // New creates a new Etcd client given a list // of endpoints and an optional tls config func New(addrs []string, options *store.Config) (store.Store, error) { @@ -49,9 +54,6 @@ func New(addrs []string, options *store.Config) (store.Store, error) { if options.ConnectionTimeout != 0 { s.setTimeout(options.ConnectionTimeout) } - if options.EphemeralTTL != 0 { - s.setEphemeralTTL(options.EphemeralTTL) - } } // Periodic SyncCluster @@ -93,12 +95,6 @@ func (s *Etcd) setTimeout(time time.Duration) { s.client.SetDialTimeout(time) } -// setEphemeralHeartbeat sets the heartbeat value to notify -// that a node is alive -func (s *Etcd) setEphemeralTTL(time time.Duration) { - s.ephemeralTTL = time -} - // createDirectory creates the entire path for a directory // that does not exist func (s *Etcd) createDirectory(path string) error { @@ -120,11 +116,8 @@ func (s *Etcd) createDirectory(path string) error { func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { result, err := s.client.Get(store.Normalize(key), false, false) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Not a Directory or Not a file - if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { - return nil, store.ErrKeyNotFound - } + if isKeyNotFoundError(err) { + return nil, store.ErrKeyNotFound } return nil, err } @@ -143,8 +136,8 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { // Default TTL = 0 means no expiration var ttl uint64 - if opts != nil && opts.Ephemeral { - ttl = uint64(s.ephemeralTTL.Seconds()) + if opts != nil && opts.TTL > 0 { + ttl = uint64(opts.TTL.Seconds()) } if _, err := s.client.Set(key, string(value), ttl); err != nil { @@ -173,14 +166,17 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { // Delete a value at "key" func (s *Etcd) Delete(key string) error { _, err := s.client.Delete(store.Normalize(key), false) + if isKeyNotFoundError(err) { + return store.ErrKeyNotFound + } return err } // Exists checks if the key exists inside the store func (s *Etcd) Exists(key string) (bool, error) { - entry, err := s.Get(key) - if err != nil && entry != nil { - if err == store.ErrKeyNotFound || entry.Value == nil { + _, err := s.Get(key) + if err != nil { + if err == store.ErrKeyNotFound { return false, nil } return false, err @@ -194,12 +190,6 @@ func (s *Etcd) Exists(key string) (bool, error) { // be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - // Get the current value - current, err := s.Get(key) - if err != nil { - return nil, err - } - // Start an etcd watch. // Note: etcd will send the current value through the channel. etcdWatchCh := make(chan *etcd.Response) @@ -212,12 +202,23 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, go func() { defer close(watchCh) + // Get the current value + current, err := s.Get(key) + if err != nil { + return + } + // Push the current value through the channel. watchCh <- current for { select { case result := <-etcdWatchCh: + if result == nil || result.Node == nil { + // Something went wrong, exit + // No need to stop the chan as the watch already ended + return + } watchCh <- &store.KVPair{ Key: key, Value: []byte(result.Node.Value), @@ -238,12 +239,6 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, // will be sent to the channel .Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - // Get child values - current, err := s.List(directory) - if err != nil { - return nil, err - } - // Start the watch etcdWatchCh := make(chan *etcd.Response) etcdStopCh := make(chan bool) @@ -255,12 +250,23 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st go func() { defer close(watchCh) + // Get child values + current, err := s.List(directory) + if err != nil { + return + } + // Push the current value through the channel. watchCh <- current for { select { - case <-etcdWatchCh: + case event := <-etcdWatchCh: + if event == nil { + // Something went wrong, exit + // No need to stop the chan as the watch already ended + return + } // FIXME: We should probably use the value pushed by the channel. // However, Node.Nodes seems to be empty. if list, err := s.List(directory); err == nil { @@ -349,6 +355,9 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { func (s *Etcd) List(directory string) ([]*store.KVPair, error) { resp, err := s.client.Get(store.Normalize(directory), true, true) if err != nil { + if isKeyNotFoundError(err) { + return nil, store.ErrKeyNotFound + } return nil, err } kv := []*store.KVPair{} @@ -366,6 +375,9 @@ func (s *Etcd) List(directory string) ([]*store.KVPair, error) { // DeleteTree deletes a range of keys under a given directory func (s *Etcd) DeleteTree(directory string) error { _, err := s.client.Delete(store.Normalize(directory), true) + if isKeyNotFoundError(err) { + return store.ErrKeyNotFound + } return err } @@ -422,7 +434,7 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) { lastIndex = resp.Node.ModifiedIndex } - _, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) + l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) if err == nil { // Leader section @@ -457,7 +469,7 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan for { select { case <-update.C: - l.last, err = l.client.Update(key, l.value, l.ttl) + l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", l.last.Node.ModifiedIndex) if err != nil { return } @@ -497,3 +509,15 @@ func (l *etcdLock) Unlock() error { func (s *Etcd) Close() { return } + +func isKeyNotFoundError(err error) bool { + if err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + // Not a Directory or Not a file + if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { + return true + } + } + } + return false +} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go index 73d1b06f97..3f79ce09f7 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/etcd/etcd_test.go @@ -4,18 +4,21 @@ import ( "testing" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" "github.com/docker/libkv/testutils" + "github.com/stretchr/testify/assert" +) + +var ( + client = "localhost:4001" ) func makeEtcdClient(t *testing.T) store.Store { - client := "localhost:4001" - kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, }, ) @@ -26,9 +29,26 @@ func makeEtcdClient(t *testing.T) store.Store { return kv } +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.ETCD, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Etcd); !ok { + t.Fatal("Error registering and initializing etcd") + } +} + func TestEtcdStore(t *testing.T) { kv := makeEtcdClient(t) backup := makeEtcdClient(t) - testutils.RunTestStore(t, kv, backup) + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestTTL(t, kv, backup) + testutils.RunCleanup(t, kv) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go index 49e2eb9dcd..352edaa56a 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/store.go @@ -16,11 +16,13 @@ const ( ETCD Backend = "etcd" // ZK backend ZK Backend = "zk" + // BOLTDB backend + BOLTDB Backend = "boltdb" ) var ( // ErrNotSupported is thrown when the backend k/v store is not supported by libkv - ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one") + ErrNotSupported = errors.New("Backend storage not supported yet, please choose one of") // ErrNotImplemented is thrown when a method is not implemented by the current backend ErrNotImplemented = errors.New("Call not implemented in current backend") // ErrNotReachable is thrown when the API cannot be reached for issuing common store operations @@ -39,7 +41,7 @@ var ( type Config struct { TLS *tls.Config ConnectionTimeout time.Duration - EphemeralTTL time.Duration + Bucket string } // Store represents the backend K/V storage @@ -63,10 +65,10 @@ type Store interface { Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) // WatchTree watches for changes on child nodes under - // a given a directory + // a given directory WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error) - // CreateLock for a given key. + // NewLock creates a lock for a given key. // The returned Locker is not held and must be acquired // with `.Lock`. The Value is optional. NewLock(key string, options *LockOptions) (Locker, error) @@ -97,8 +99,7 @@ type KVPair struct { // WriteOptions contains optional request parameters type WriteOptions struct { - Heartbeat time.Duration - Ephemeral bool + TTL time.Duration } // LockOptions contains optional request parameters @@ -107,10 +108,6 @@ type LockOptions struct { TTL time.Duration // Optional, expiration ttl associated with the lock } -// WatchCallback is used for watch methods on keys -// and is triggered on key change -type WatchCallback func(entries ...*KVPair) - // Locker provides locking mechanism on top of the store. // Similar to `sync.Lock` except it may return errors. type Locker interface { diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go index d12edf6792..ff6b481947 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper.go @@ -4,11 +4,14 @@ import ( "strings" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" zk "github.com/samuel/go-zookeeper/zk" ) -const defaultTimeout = 10 * time.Second +const ( + defaultTimeout = 10 * time.Second +) // Zookeeper is the receiver type for // the Store interface @@ -24,6 +27,11 @@ type zookeeperLock struct { value []byte } +// Register registers zookeeper to libkv +func Register() { + libkv.AddStore(store.ZK, New) +} + // New creates a new Zookeeper client given a // list of endpoints and an optional tls config func New(endpoints []string, options *store.Config) (store.Store, error) { @@ -55,14 +63,13 @@ func (s *Zookeeper) setTimeout(time time.Duration) { // Get the value at "key", returns the last modified index // to use in conjunction to Atomic calls func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { - resp, meta, err := s.client.Get(store.Normalize(key)) - if err != nil { - return nil, err - } + resp, meta, err := s.client.Get(s.normalize(key)) - // If resp is nil, the key does not exist - if resp == nil { - return nil, store.ErrKeyNotFound + if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } + return nil, err } pair = &store.KVPair{ @@ -80,10 +87,10 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { for i := 1; i <= len(path); i++ { newpath := "/" + strings.Join(path[:i], "/") if i == len(path) && ephemeral { - _, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) return err } - _, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Skip if node already exists if err != zk.ErrNodeExists { @@ -96,7 +103,7 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { // Put a value at "key" func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error { - fkey := store.Normalize(key) + fkey := s.normalize(key) exists, err := s.Exists(key) if err != nil { @@ -104,10 +111,10 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro } if !exists { - if opts != nil && opts.Ephemeral { - s.createFullPath(store.SplitKey(key), opts.Ephemeral) + if opts != nil && opts.TTL > 0 { + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) } else { - s.createFullPath(store.SplitKey(key), false) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) } } @@ -117,13 +124,16 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro // Delete a value at "key" func (s *Zookeeper) Delete(key string) error { - err := s.client.Delete(store.Normalize(key), -1) + err := s.client.Delete(s.normalize(key), -1) + if err == zk.ErrNoNode { + return store.ErrKeyNotFound + } return err } // Exists checks if the key exists inside the store func (s *Zookeeper) Exists(key string) (bool, error) { - exists, _, err := s.client.Exists(store.Normalize(key)) + exists, _, err := s.client.Exists(s.normalize(key)) if err != nil { return false, err } @@ -151,7 +161,7 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVP // to listening to any event that may occur on that key watchCh <- pair for { - _, _, eventCh, err := s.client.GetW(store.Normalize(key)) + _, _, eventCh, err := s.client.GetW(s.normalize(key)) if err != nil { return } @@ -195,7 +205,7 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan watchCh <- entries for { - _, _, eventCh, err := s.client.ChildrenW(store.Normalize(directory)) + _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) if err != nil { return } @@ -218,8 +228,11 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan // List child nodes of a given directory func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { - keys, stat, err := s.client.Children(store.Normalize(directory)) + keys, stat, err := s.client.Children(s.normalize(directory)) if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } return nil, err } @@ -227,8 +240,12 @@ func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { // FIXME Costly Get request for each child key.. for _, key := range keys { - pair, err := s.Get(directory + store.Normalize(key)) + pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) if err != nil { + // If node is not found: List is out of date, retry + if err == zk.ErrNoNode { + return s.List(directory) + } return nil, err } @@ -253,7 +270,7 @@ func (s *Zookeeper) DeleteTree(directory string) error { for _, pair := range pairs { reqs = append(reqs, &zk.DeleteRequest{ - Path: store.Normalize(directory + "/" + pair.Key), + Path: s.normalize(directory + "/" + pair.Key), Version: -1, }) } @@ -268,7 +285,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, var lastIndex uint64 if previous != nil { - meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex)) + meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex)) if err != nil { // Compare Failed if err == zk.ErrBadVersion { @@ -279,7 +296,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, lastIndex = uint64(meta.Version) } else { // Interpret previous == nil as create operation. - _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Zookeeper will complain if the directory doesn't exist. if err == zk.ErrNoNode { @@ -290,7 +307,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, // Failed to create the directory. return false, nil, err } - if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { + if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { return false, nil, err } @@ -319,7 +336,7 @@ func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, erro return false, store.ErrPreviousNotSpecified } - err := s.client.Delete(store.Normalize(key), int32(previous.LastIndex)) + err := s.client.Delete(s.normalize(key), int32(previous.LastIndex)) if err != nil { if err == zk.ErrBadVersion { return false, store.ErrKeyModified @@ -343,9 +360,9 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store. lock = &zookeeperLock{ client: s.client, - key: store.Normalize(key), + key: s.normalize(key), value: value, - lock: zk.NewLock(s.client, store.Normalize(key), zk.WorldACL(zk.PermAll)), + lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)), } return lock, err @@ -377,3 +394,9 @@ func (l *zookeeperLock) Unlock() error { func (s *Zookeeper) Close() { s.client.Close() } + +// Normalize the key for usage in Zookeeper +func (s *Zookeeper) normalize(key string) string { + key = store.Normalize(key) + return strings.TrimSuffix(key, "/") +} diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go index 759297e542..739c2ba619 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/store/zookeeper/zookeeper_test.go @@ -4,18 +4,21 @@ import ( "testing" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" "github.com/docker/libkv/testutils" + "github.com/stretchr/testify/assert" +) + +var ( + client = "localhost:2181" ) func makeZkClient(t *testing.T) store.Store { - client := "localhost:2181" - kv, err := New( []string{client}, &store.Config{ ConnectionTimeout: 3 * time.Second, - EphemeralTTL: 2 * time.Second, }, ) @@ -26,9 +29,26 @@ func makeZkClient(t *testing.T) store.Store { return kv } +func TestRegister(t *testing.T) { + Register() + + kv, err := libkv.NewStore(store.ZK, []string{client}, nil) + assert.NoError(t, err) + assert.NotNil(t, kv) + + if _, ok := kv.(*Zookeeper); !ok { + t.Fatal("Error registering and initializing zookeeper") + } +} + func TestZkStore(t *testing.T) { kv := makeZkClient(t) backup := makeZkClient(t) - testutils.RunTestStore(t, kv, backup) + testutils.RunTestCommon(t, kv) + testutils.RunTestAtomic(t, kv) + testutils.RunTestWatch(t, kv) + testutils.RunTestLock(t, kv) + testutils.RunTestTTL(t, kv, backup) + testutils.RunCleanup(t, kv) } diff --git a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go index 717d9ecdc2..bb8f26336d 100644 --- a/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go +++ b/libnetwork/Godeps/_workspace/src/github.com/docker/libkv/testutils/utils.go @@ -1,6 +1,7 @@ package testutils import ( + "fmt" "testing" "time" @@ -8,50 +9,89 @@ import ( "github.com/stretchr/testify/assert" ) -// RunTestStore is an helper testing method that is -// called by each K/V backend sub-package testing -func RunTestStore(t *testing.T, kv store.Store, backup store.Store) { - testPutGetDelete(t, kv) - testWatch(t, kv) - testWatchTree(t, kv) - testAtomicPut(t, kv) - testAtomicPutCreate(t, kv) - testAtomicDelete(t, kv) - testLockUnlock(t, kv) - testPutEphemeral(t, kv, backup) +// RunTestCommon tests the minimal required APIs which +// should be supported by all K/V backends +func RunTestCommon(t *testing.T, kv store.Store) { + testPutGetDeleteExists(t, kv) testList(t, kv) testDeleteTree(t, kv) } -func testPutGetDelete(t *testing.T, kv store.Store) { - key := "foo" +// RunTestAtomic tests the Atomic operations by the K/V +// backends +func RunTestAtomic(t *testing.T, kv store.Store) { + testAtomicPut(t, kv) + testAtomicPutCreate(t, kv) + testAtomicDelete(t, kv) +} + +// RunTestWatch tests the watch/monitor APIs supported +// by the K/V backends. +func RunTestWatch(t *testing.T, kv store.Store) { + testWatch(t, kv) + testWatchTree(t, kv) +} + +// RunTestLock tests the KV pair Lock/Unlock APIs supported +// by the K/V backends. +func RunTestLock(t *testing.T, kv store.Store) { + testLockUnlock(t, kv) +} + +// RunTestTTL tests the TTL funtionality of the K/V backend. +func RunTestTTL(t *testing.T, kv store.Store, backup store.Store) { + testPutTTL(t, kv, backup) +} + +func testPutGetDeleteExists(t *testing.T, kv store.Store) { + // Get a not exist key should return ErrKeyNotFound + pair, err := kv.Get("/testPutGetDelete_not_exist_key") + assert.Equal(t, store.ErrKeyNotFound, err) + value := []byte("bar") + for _, key := range []string{ + "testPutGetDeleteExists", + "testPutGetDeleteExists/", + "testPutGetDeleteExists/testbar/", + "testPutGetDeleteExists/testbar/testfoobar", + } { + failMsg := fmt.Sprintf("Fail key %s", key) + // Put the key + err = kv.Put(key, value, nil) + assert.NoError(t, err, failMsg) - // Put the key - err := kv.Put(key, value, nil) - assert.NoError(t, err) + // Get should return the value and an incremented index + pair, err = kv.Get(key) + assert.NoError(t, err, failMsg) + if assert.NotNil(t, pair, failMsg) { + assert.NotNil(t, pair.Value, failMsg) + } + assert.Equal(t, pair.Value, value, failMsg) + assert.NotEqual(t, pair.LastIndex, 0, failMsg) - // Get should return the value and an incremented index - pair, err := kv.Get(key) - assert.NoError(t, err) - if assert.NotNil(t, pair) { - assert.NotNil(t, pair.Value) + // Exists should return true + exists, err := kv.Exists(key) + assert.NoError(t, err, failMsg) + assert.True(t, exists, failMsg) + + // Delete the key + err = kv.Delete(key) + assert.NoError(t, err, failMsg) + + // Get should fail + pair, err = kv.Get(key) + assert.Error(t, err, failMsg) + assert.Nil(t, pair, failMsg) + + // Exists should return false + exists, err = kv.Exists(key) + assert.NoError(t, err, failMsg) + assert.False(t, exists, failMsg) } - assert.Equal(t, pair.Value, value) - assert.NotEqual(t, pair.LastIndex, 0) - - // Delete the key - err = kv.Delete(key) - assert.NoError(t, err) - - // Get should fail - pair, err = kv.Get(key) - assert.Error(t, err) - assert.Nil(t, pair) } func testWatch(t *testing.T, kv store.Store) { - key := "hello" + key := "testWatch" value := []byte("world") newValue := []byte("world!") @@ -108,15 +148,15 @@ func testWatch(t *testing.T, kv store.Store) { } func testWatchTree(t *testing.T, kv store.Store) { - dir := "tree" + dir := "testWatchTree" - node1 := "tree/node1" + node1 := "testWatchTree/node1" value1 := []byte("node1") - node2 := "tree/node2" + node2 := "testWatchTree/node2" value2 := []byte("node2") - node3 := "tree/node3" + node3 := "testWatchTree/node3" value3 := []byte("node3") err := kv.Put(node1, value1, nil) @@ -162,7 +202,7 @@ func testWatchTree(t *testing.T, kv store.Store) { } func testAtomicPut(t *testing.T, kv store.Store) { - key := "hello" + key := "testAtomicPut" value := []byte("world") // Put the key @@ -179,18 +219,18 @@ func testAtomicPut(t *testing.T, kv store.Store) { assert.NotEqual(t, pair.LastIndex, 0) // This CAS should fail: previous exists. - success, _, err := kv.AtomicPut("hello", []byte("WORLD"), nil, nil) + success, _, err := kv.AtomicPut(key, []byte("WORLD"), nil, nil) assert.Error(t, err) assert.False(t, success) // This CAS should succeed - success, _, err = kv.AtomicPut("hello", []byte("WORLD"), pair, nil) + success, _, err = kv.AtomicPut(key, []byte("WORLD"), pair, nil) assert.NoError(t, err) assert.True(t, success) // This CAS should fail, key exists. pair.LastIndex = 0 - success, _, err = kv.AtomicPut("hello", []byte("WORLDWORLD"), pair, nil) + success, _, err = kv.AtomicPut(key, []byte("WORLDWORLD"), pair, nil) assert.Error(t, err) assert.False(t, success) } @@ -198,7 +238,7 @@ func testAtomicPut(t *testing.T, kv store.Store) { func testAtomicPutCreate(t *testing.T, kv store.Store) { // Use a key in a new directory to ensure Stores will create directories // that don't yet exist. - key := "put/create" + key := "testAtomicPutCreate/create" value := []byte("putcreate") // AtomicPut the key, previous = nil indicates create. @@ -223,14 +263,10 @@ func testAtomicPutCreate(t *testing.T, kv store.Store) { success, _, err = kv.AtomicPut(key, []byte("PUTCREATE"), pair, nil) assert.NoError(t, err) assert.True(t, success) - - // Delete the key, ensures runs of the test don't interfere with each other. - err = kv.DeleteTree("put") - assert.NoError(t, err) } func testAtomicDelete(t *testing.T, kv store.Store) { - key := "atomic" + key := "testAtomicDelete" value := []byte("world") // Put the key @@ -262,11 +298,11 @@ func testAtomicDelete(t *testing.T, kv store.Store) { } func testLockUnlock(t *testing.T, kv store.Store) { - key := "foo" + key := "testLockUnlock" value := []byte("bar") // We should be able to create a new lock on key - lock, err := kv.NewLock(key, &store.LockOptions{Value: value}) + lock, err := kv.NewLock(key, &store.LockOptions{Value: value, TTL: 2 * time.Second}) assert.NoError(t, err) assert.NotNil(t, lock) @@ -288,6 +324,11 @@ func testLockUnlock(t *testing.T, kv store.Store) { err = lock.Unlock() assert.NoError(t, err) + // Lock should succeed again + lockChan, err = lock.Lock() + assert.NoError(t, err) + assert.NotNil(t, lockChan) + // Get should work pair, err = kv.Get(key) assert.NoError(t, err) @@ -298,19 +339,19 @@ func testLockUnlock(t *testing.T, kv store.Store) { assert.NotEqual(t, pair.LastIndex, 0) } -func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) { - firstKey := "first" +func testPutTTL(t *testing.T, kv store.Store, otherConn store.Store) { + firstKey := "testPutTTL" firstValue := []byte("foo") secondKey := "second" secondValue := []byte("bar") // Put the first key with the Ephemeral flag - err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{Ephemeral: true}) + err := otherConn.Put(firstKey, firstValue, &store.WriteOptions{TTL: 2 * time.Second}) assert.NoError(t, err) // Put a second key with the Ephemeral flag - err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{Ephemeral: true}) + err = otherConn.Put(secondKey, secondValue, &store.WriteOptions{TTL: 2 * time.Second}) assert.NoError(t, err) // Get on firstKey should work @@ -341,12 +382,12 @@ func testPutEphemeral(t *testing.T, kv store.Store, otherConn store.Store) { } func testList(t *testing.T, kv store.Store) { - prefix := "nodes" + prefix := "testList" - firstKey := "nodes/first" + firstKey := "testList/first" firstValue := []byte("first") - secondKey := "nodes/second" + secondKey := "testList/second" secondValue := []byte("second") // Put the first key @@ -358,35 +399,37 @@ func testList(t *testing.T, kv store.Store) { assert.NoError(t, err) // List should work and return the two correct values - pairs, err := kv.List(prefix) - assert.NoError(t, err) - if assert.NotNil(t, pairs) { - assert.Equal(t, len(pairs), 2) - } - - // Check pairs, those are not necessarily in Put order - for _, pair := range pairs { - if pair.Key == firstKey { - assert.Equal(t, pair.Value, firstValue) + for _, parent := range []string{prefix, prefix + "/"} { + pairs, err := kv.List(parent) + assert.NoError(t, err) + if assert.NotNil(t, pairs) { + assert.Equal(t, len(pairs), 2) } - if pair.Key == secondKey { - assert.Equal(t, pair.Value, secondValue) + + // Check pairs, those are not necessarily in Put order + for _, pair := range pairs { + if pair.Key == firstKey { + assert.Equal(t, pair.Value, firstValue) + } + if pair.Key == secondKey { + assert.Equal(t, pair.Value, secondValue) + } } } // List should fail: the key does not exist - pairs, err = kv.List("idontexist") - assert.Error(t, err) + pairs, err := kv.List("idontexist") + assert.Equal(t, store.ErrKeyNotFound, err) assert.Nil(t, pairs) } func testDeleteTree(t *testing.T, kv store.Store) { - prefix := "nodes" + prefix := "testDeleteTree" - firstKey := "nodes/first" + firstKey := "testDeleteTree/first" firstValue := []byte("first") - secondKey := "nodes/second" + secondKey := "testDeleteTree/second" secondValue := []byte("second") // Put the first key @@ -428,3 +471,24 @@ func testDeleteTree(t *testing.T, kv store.Store) { assert.Error(t, err) assert.Nil(t, pair) } + +// RunCleanup cleans up keys introduced by the tests +func RunCleanup(t *testing.T, kv store.Store) { + for _, key := range []string{ + "testPutGetDeleteExists", + "testWatch", + "testWatchTree", + "testAtomicPut", + "testAtomicPutCreate", + "testAtomicDelete", + "testLockUnlock", + "testPutTTL", + "testList", + "testDeleteTree", + } { + err := kv.DeleteTree(key) + assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete tree key %s: %v", key, err)) + err = kv.Delete(key) + assert.True(t, err == nil || err == store.ErrKeyNotFound, fmt.Sprintf("failed to delete key %s: %v", key, err)) + } +} diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index 8a195aa902..f504860f0a 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -6,6 +6,9 @@ import ( "github.com/docker/libkv" "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" + "github.com/docker/libkv/store/etcd" + "github.com/docker/libkv/store/zookeeper" "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/types" ) @@ -66,6 +69,12 @@ const ( var rootChain = []string{"docker", "libnetwork"} +func init() { + consul.Register() + zookeeper.Register() + etcd.Register() +} + //Key provides convenient method to create a Key func Key(key ...string) string { keychain := append(rootChain, key...)