From 814c19591d875a747433f87ae84dffbf58c4d21a Mon Sep 17 00:00:00 2001 From: Madhu Venugopal Date: Wed, 16 Sep 2015 11:54:28 -0700 Subject: [PATCH] Vendoring in libnetwork and libkv * libkv upgrade is required for the docker discovery PR * vendor-in libnetwork contains an update to network plugin api (Thanks @WeiZhang555 : https://github.com/docker/libnetwork/pull/516) Signed-off-by: Madhu Venugopal --- hack/vendor.sh | 4 +- vendor/src/github.com/docker/libkv/README.md | 20 ++-- vendor/src/github.com/docker/libkv/libkv.go | 90 +++++++++++++++-- .../docker/libkv/store/consul/consul.go | 55 +++++++---- .../docker/libkv/store/etcd/etcd.go | 96 ++++++++++++------- .../github.com/docker/libkv/store/store.go | 17 ++-- .../docker/libkv/store/zookeeper/zookeeper.go | 77 +++++++++------ .../docker/libnetwork/datastore/datastore.go | 9 ++ .../libnetwork/drivers/remote/api/api.go | 6 ++ .../libnetwork/drivers/remote/driver.go | 30 +++++- 10 files changed, 292 insertions(+), 112 deletions(-) diff --git a/hack/vendor.sh b/hack/vendor.sh index 0f49839385..c152455f70 100755 --- a/hack/vendor.sh +++ b/hack/vendor.sh @@ -20,12 +20,12 @@ clone git github.com/tchap/go-patricia v2.1.0 clone git golang.org/x/net 3cffabab72adf04f8e3b01c5baf775361837b5fe https://github.com/golang/net.git #get libnetwork packages -clone git github.com/docker/libnetwork 2baa2ddc78b42f011f55633282ac63a72e1b09c1 +clone git github.com/docker/libnetwork 927d2765909ddc3a5dd49b649ae9ef121c0fbef0 clone git github.com/armon/go-metrics eb0af217e5e9747e41dd5303755356b62d28e3ec clone git github.com/hashicorp/go-msgpack 71c2886f5a673a35f909803f38ece5810165097b clone git github.com/hashicorp/memberlist 9a1e242e454d2443df330bdd51a436d5a9058fc4 clone git github.com/hashicorp/serf 7151adcef72687bf95f451a2e0ba15cb19412bf2 -clone git github.com/docker/libkv 60c7c881345b3c67defc7f93a8297debf041d43c +clone git github.com/docker/libkv a0a57ed3755665e9a402a3df315402134eb6625f clone git github.com/vishvananda/netns 604eaf189ee867d8c147fafc28def2394e878d25 clone git github.com/vishvananda/netlink 4b5dce31de6d42af5bb9811c6d265472199e0fec clone git github.com/BurntSushi/toml f706d00e3de6abe700c994cdd545a1a4915af060 diff --git a/vendor/src/github.com/docker/libkv/README.md b/vendor/src/github.com/docker/libkv/README.md index d09b34c1ea..941c96925f 100644 --- a/vendor/src/github.com/docker/libkv/README.md +++ b/vendor/src/github.com/docker/libkv/README.md @@ -12,7 +12,7 @@ For example, you can use it to store your metadata or for service discovery to r You can also easily implement a generic *Leader Election* on top of it (see the [swarm/leadership](https://github.com/docker/swarm/tree/master/leadership) package). -As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`. +As of now, `libkv` offers support for `Consul`, `Etcd`, `Zookeeper` and `BoltDB`. ## Example of usage @@ -24,17 +24,23 @@ package main import ( "fmt" "time" - + "github.com/docker/libkv" "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" log "github.com/Sirupsen/logrus" ) +func init() { + // Register consul store to libkv + consul.Register() +} + func main() { client := "localhost:8500" // Initialize a new store with consul - kv, err = libkv.NewStore( + kv, err := libkv.NewStore( store.CONSUL, // or "consul" []string{client}, &store.Config{ @@ -62,11 +68,13 @@ func main() { You can find other usage examples for `libkv` under the `docker/swarm` or `docker/libnetwork` repositories. -## Details +## Warning -You should expect the same experience for basic operations like `Get`/`Put`, etc. +There are a few consistency issues with *etcd*, on the notion of *directory* and *key*. If you want to use the three KV backends in an interchangeable way, you should only put data on leaves (see [Issue 20](https://github.com/docker/libkv/issues/20) for more details). This will be fixed when *etcd* API v3 will be made available (API v3 drops the *directory/key* distinction). An official release for *libkv* with a tag is likely to come after this issue being marked as **solved**. -However calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). +Other than that, you should expect the same experience for basic operations like `Get`/`Put`, etc. + +Calls like `WatchTree` may return different events (or number of events) depending on the backend (for now, `Etcd` and `Consul` will likely return more events than `Zookeeper` that you should triage properly). Although you should be able to use it successfully to watch on events in an interchangeable way (see the **swarm/leadership** or **swarm/discovery** packages in **docker/swarm**). ## Create a new storage backend diff --git a/vendor/src/github.com/docker/libkv/libkv.go b/vendor/src/github.com/docker/libkv/libkv.go index 28df703afc..7b3d0584ba 100644 --- a/vendor/src/github.com/docker/libkv/libkv.go +++ b/vendor/src/github.com/docker/libkv/libkv.go @@ -1,10 +1,72 @@ +// Package libkv provides a Go native library to store metadata. +// +// The goal of libkv is to abstract common store operations for multiple +// Key/Value backends and offer the same experience no matter which one of the +// backend you want to use. +// +// For example, you can use it to store your metadata or for service discovery to +// register machines and endpoints inside your cluster. +// +// As of now, `libkv` offers support for `Consul`, `Etcd` and `Zookeeper`. +// +// ## Example of usage +// +// ### Create a new store and use Put/Get +// +// +// package main +// +// import ( +// "fmt" +// "time" +// +// "github.com/docker/libkv" +// "github.com/docker/libkv/store" +// log "github.com/Sirupsen/logrus" +// ) +// +// func main() { +// client := "localhost:8500" +// +// // Initialize a new store with consul +// kv, err := libkv.NewStore( +// store.CONSUL, // or "consul" +// []string{client}, +// &store.Config{ +// ConnectionTimeout: 10*time.Second, +// }, +// ) +// if err != nil { +// log.Fatal("Cannot create store consul") +// } +// +// key := "foo" +// err = kv.Put(key, []byte("bar"), nil) +// if err != nil { +// log.Error("Error trying to put value at key `", key, "`") +// } +// +// pair, err := kv.Get(key) +// if err != nil { +// log.Error("Error trying accessing value at key `", key, "`") +// } +// +// log.Info("value: ", string(pair.Value)) +// } +// +// ##Copyright and license +// +// Code and documentation copyright 2015 Docker, inc. Code released under the +// Apache 2.0 license. Docs released under Creative commons. +// package libkv import ( + "fmt" + "sort" + "strings" + "github.com/docker/libkv/store" - "github.com/docker/libkv/store/consul" - "github.com/docker/libkv/store/etcd" - "github.com/docker/libkv/store/zookeeper" ) // Initialize creates a new Store object, initializing the client @@ -12,11 +74,16 @@ type Initialize func(addrs []string, options *store.Config) (store.Store, error) var ( // Backend initializers - initializers = map[store.Backend]Initialize{ - store.CONSUL: consul.New, - store.ETCD: etcd.New, - store.ZK: zookeeper.New, - } + initializers = make(map[store.Backend]Initialize) + + supportedBackend = func() string { + keys := make([]string, 0, len(initializers)) + for k := range initializers { + keys = append(keys, string(k)) + } + sort.Strings(keys) + return strings.Join(keys, ", ") + }() ) // NewStore creates a an instance of store @@ -25,5 +92,10 @@ func NewStore(backend store.Backend, addrs []string, options *store.Config) (sto return init(addrs, options) } - return nil, store.ErrNotSupported + return nil, fmt.Errorf("%s %s", store.ErrNotSupported.Error(), supportedBackend) +} + +// AddStore adds a new store backend to libkv +func AddStore(store store.Backend, init Initialize) { + initializers[store] = init } diff --git a/vendor/src/github.com/docker/libkv/store/consul/consul.go b/vendor/src/github.com/docker/libkv/store/consul/consul.go index ebabde5ffa..76762ce54a 100644 --- a/vendor/src/github.com/docker/libkv/store/consul/consul.go +++ b/vendor/src/github.com/docker/libkv/store/consul/consul.go @@ -2,11 +2,13 @@ package consul import ( "crypto/tls" + "errors" "net/http" "strings" "sync" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" api "github.com/hashicorp/consul/api" ) @@ -18,22 +20,36 @@ const ( DefaultWatchWaitTime = 15 * time.Second ) +var ( + // ErrMultipleEndpointsUnsupported is thrown when there are + // multiple endpoints specified for Consul + ErrMultipleEndpointsUnsupported = errors.New("consul does not support multiple endpoints") +) + // Consul is the receiver type for the // Store interface type Consul struct { sync.Mutex - config *api.Config - client *api.Client - ephemeralTTL time.Duration + config *api.Config + client *api.Client } type consulLock struct { lock *api.Lock } +// Register registers consul to libkv +func Register() { + libkv.AddStore(store.CONSUL, New) +} + // New creates a new Consul client given a list // of endpoints and optional tls config func New(endpoints []string, options *store.Config) (store.Store, error) { + if len(endpoints) > 1 { + return nil, ErrMultipleEndpointsUnsupported + } + s := &Consul{} // Create Consul client @@ -51,9 +67,6 @@ func New(endpoints []string, options *store.Config) (store.Store, error) { if options.ConnectionTimeout != 0 { s.setTimeout(options.ConnectionTimeout) } - if options.EphemeralTTL != 0 { - s.setEphemeralTTL(options.EphemeralTTL) - } } // Creates a new client @@ -79,18 +92,13 @@ func (s *Consul) setTimeout(time time.Duration) { s.config.WaitTime = time } -// SetEphemeralTTL sets the ttl for ephemeral nodes -func (s *Consul) setEphemeralTTL(ttl time.Duration) { - s.ephemeralTTL = ttl -} - // Normalize the key for usage in Consul func (s *Consul) normalize(key string) string { key = store.Normalize(key) return strings.TrimPrefix(key, "/") } -func (s *Consul) refreshSession(pair *api.KVPair) error { +func (s *Consul) refreshSession(pair *api.KVPair, ttl time.Duration) error { // Check if there is any previous session with an active TTL session, err := s.getActiveSession(pair.Key) if err != nil { @@ -99,9 +107,9 @@ func (s *Consul) refreshSession(pair *api.KVPair) error { if session == "" { entry := &api.SessionEntry{ - Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires - TTL: ((s.ephemeralTTL) / 2).String(), // Consul multiplies the TTL by 2x - LockDelay: 1 * time.Millisecond, // Virtually disable lock delay + Behavior: api.SessionBehaviorDelete, // Delete the key when the session expires + TTL: (ttl / 2).String(), // Consul multiplies the TTL by 2x + LockDelay: 1 * time.Millisecond, // Virtually disable lock delay } // Create the key session @@ -126,7 +134,7 @@ func (s *Consul) refreshSession(pair *api.KVPair) error { _, _, err = s.client.Session().Renew(session, nil) if err != nil { - return s.refreshSession(pair) + return s.refreshSession(pair, ttl) } return nil } @@ -174,9 +182,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error { Value: value, } - if opts != nil && opts.Ephemeral { + if opts != nil && opts.TTL > 0 { // Create or refresh the session - err := s.refreshSession(p) + err := s.refreshSession(p, opts.TTL) if err != nil { return err } @@ -188,6 +196,9 @@ func (s *Consul) Put(key string, value []byte, opts *store.WriteOptions) error { // Delete a value at "key" func (s *Consul) Delete(key string) error { + if _, err := s.Get(key); err != nil { + return err + } _, err := s.client.KV().Delete(s.normalize(key), nil) return err } @@ -195,7 +206,10 @@ func (s *Consul) Delete(key string) error { // Exists checks that the key exists inside the store func (s *Consul) Exists(key string) (bool, error) { _, err := s.Get(key) - if err != nil && err == store.ErrKeyNotFound { + if err != nil { + if err == store.ErrKeyNotFound { + return false, nil + } return false, err } return true, nil @@ -229,6 +243,9 @@ func (s *Consul) List(directory string) ([]*store.KVPair, error) { // DeleteTree deletes a range of keys under a given directory func (s *Consul) DeleteTree(directory string) error { + if _, err := s.List(directory); err != nil { + return err + } _, err := s.client.KV().DeleteTree(s.normalize(directory), nil) return err } diff --git a/vendor/src/github.com/docker/libkv/store/etcd/etcd.go b/vendor/src/github.com/docker/libkv/store/etcd/etcd.go index 30d2e08314..92c239e460 100644 --- a/vendor/src/github.com/docker/libkv/store/etcd/etcd.go +++ b/vendor/src/github.com/docker/libkv/store/etcd/etcd.go @@ -8,14 +8,14 @@ import ( "time" etcd "github.com/coreos/go-etcd/etcd" + "github.com/docker/libkv" "github.com/docker/libkv/store" ) // Etcd is the receiver type for the // Store interface type Etcd struct { - client *etcd.Client - ephemeralTTL time.Duration + client *etcd.Client } type etcdLock struct { @@ -33,6 +33,11 @@ const ( defaultUpdateTime = 5 * time.Second ) +// Register registers etcd to libkv +func Register() { + libkv.AddStore(store.ETCD, New) +} + // New creates a new Etcd client given a list // of endpoints and an optional tls config func New(addrs []string, options *store.Config) (store.Store, error) { @@ -49,9 +54,6 @@ func New(addrs []string, options *store.Config) (store.Store, error) { if options.ConnectionTimeout != 0 { s.setTimeout(options.ConnectionTimeout) } - if options.EphemeralTTL != 0 { - s.setEphemeralTTL(options.EphemeralTTL) - } } // Periodic SyncCluster @@ -93,12 +95,6 @@ func (s *Etcd) setTimeout(time time.Duration) { s.client.SetDialTimeout(time) } -// setEphemeralHeartbeat sets the heartbeat value to notify -// that a node is alive -func (s *Etcd) setEphemeralTTL(time time.Duration) { - s.ephemeralTTL = time -} - // createDirectory creates the entire path for a directory // that does not exist func (s *Etcd) createDirectory(path string) error { @@ -120,11 +116,8 @@ func (s *Etcd) createDirectory(path string) error { func (s *Etcd) Get(key string) (pair *store.KVPair, err error) { result, err := s.client.Get(store.Normalize(key), false, false) if err != nil { - if etcdError, ok := err.(*etcd.EtcdError); ok { - // Not a Directory or Not a file - if etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { - return nil, store.ErrKeyNotFound - } + if isKeyNotFoundError(err) { + return nil, store.ErrKeyNotFound } return nil, err } @@ -143,8 +136,8 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { // Default TTL = 0 means no expiration var ttl uint64 - if opts != nil && opts.Ephemeral { - ttl = uint64(s.ephemeralTTL.Seconds()) + if opts != nil && opts.TTL > 0 { + ttl = uint64(opts.TTL.Seconds()) } if _, err := s.client.Set(key, string(value), ttl); err != nil { @@ -173,14 +166,17 @@ func (s *Etcd) Put(key string, value []byte, opts *store.WriteOptions) error { // Delete a value at "key" func (s *Etcd) Delete(key string) error { _, err := s.client.Delete(store.Normalize(key), false) + if isKeyNotFoundError(err) { + return store.ErrKeyNotFound + } return err } // Exists checks if the key exists inside the store func (s *Etcd) Exists(key string) (bool, error) { - entry, err := s.Get(key) - if err != nil && entry != nil { - if err == store.ErrKeyNotFound || entry.Value == nil { + _, err := s.Get(key) + if err != nil { + if err == store.ErrKeyNotFound { return false, nil } return false, err @@ -194,12 +190,6 @@ func (s *Etcd) Exists(key string) (bool, error) { // be sent to the channel. Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, error) { - // Get the current value - current, err := s.Get(key) - if err != nil { - return nil, err - } - // Start an etcd watch. // Note: etcd will send the current value through the channel. etcdWatchCh := make(chan *etcd.Response) @@ -212,12 +202,23 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, go func() { defer close(watchCh) + // Get the current value + current, err := s.Get(key) + if err != nil { + return + } + // Push the current value through the channel. watchCh <- current for { select { case result := <-etcdWatchCh: + if result == nil || result.Node == nil { + // Something went wrong, exit + // No need to stop the chan as the watch already ended + return + } watchCh <- &store.KVPair{ Key: key, Value: []byte(result.Node.Value), @@ -238,12 +239,6 @@ func (s *Etcd) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVPair, // will be sent to the channel .Providing a non-nil stopCh can // be used to stop watching. func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*store.KVPair, error) { - // Get child values - current, err := s.List(directory) - if err != nil { - return nil, err - } - // Start the watch etcdWatchCh := make(chan *etcd.Response) etcdStopCh := make(chan bool) @@ -255,12 +250,23 @@ func (s *Etcd) WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*st go func() { defer close(watchCh) + // Get child values + current, err := s.List(directory) + if err != nil { + return + } + // Push the current value through the channel. watchCh <- current for { select { - case <-etcdWatchCh: + case event := <-etcdWatchCh: + if event == nil { + // Something went wrong, exit + // No need to stop the chan as the watch already ended + return + } // FIXME: We should probably use the value pushed by the channel. // However, Node.Nodes seems to be empty. if list, err := s.List(directory); err == nil { @@ -349,6 +355,9 @@ func (s *Etcd) AtomicDelete(key string, previous *store.KVPair) (bool, error) { func (s *Etcd) List(directory string) ([]*store.KVPair, error) { resp, err := s.client.Get(store.Normalize(directory), true, true) if err != nil { + if isKeyNotFoundError(err) { + return nil, store.ErrKeyNotFound + } return nil, err } kv := []*store.KVPair{} @@ -366,6 +375,9 @@ func (s *Etcd) List(directory string) ([]*store.KVPair, error) { // DeleteTree deletes a range of keys under a given directory func (s *Etcd) DeleteTree(directory string) error { _, err := s.client.Delete(store.Normalize(directory), true) + if isKeyNotFoundError(err) { + return store.ErrKeyNotFound + } return err } @@ -422,7 +434,7 @@ func (l *etcdLock) Lock() (<-chan struct{}, error) { lastIndex = resp.Node.ModifiedIndex } - _, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) + l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", lastIndex) if err == nil { // Leader section @@ -457,7 +469,7 @@ func (l *etcdLock) holdLock(key string, lockHeld chan struct{}, stopLocking chan for { select { case <-update.C: - l.last, err = l.client.Update(key, l.value, l.ttl) + l.last, err = l.client.CompareAndSwap(key, l.value, l.ttl, "", l.last.Node.ModifiedIndex) if err != nil { return } @@ -497,3 +509,15 @@ func (l *etcdLock) Unlock() error { func (s *Etcd) Close() { return } + +func isKeyNotFoundError(err error) bool { + if err != nil { + if etcdError, ok := err.(*etcd.EtcdError); ok { + // Not a Directory or Not a file + if etcdError.ErrorCode == 100 || etcdError.ErrorCode == 102 || etcdError.ErrorCode == 104 { + return true + } + } + } + return false +} diff --git a/vendor/src/github.com/docker/libkv/store/store.go b/vendor/src/github.com/docker/libkv/store/store.go index 49e2eb9dcd..352edaa56a 100644 --- a/vendor/src/github.com/docker/libkv/store/store.go +++ b/vendor/src/github.com/docker/libkv/store/store.go @@ -16,11 +16,13 @@ const ( ETCD Backend = "etcd" // ZK backend ZK Backend = "zk" + // BOLTDB backend + BOLTDB Backend = "boltdb" ) var ( // ErrNotSupported is thrown when the backend k/v store is not supported by libkv - ErrNotSupported = errors.New("Backend storage not supported yet, please choose another one") + ErrNotSupported = errors.New("Backend storage not supported yet, please choose one of") // ErrNotImplemented is thrown when a method is not implemented by the current backend ErrNotImplemented = errors.New("Call not implemented in current backend") // ErrNotReachable is thrown when the API cannot be reached for issuing common store operations @@ -39,7 +41,7 @@ var ( type Config struct { TLS *tls.Config ConnectionTimeout time.Duration - EphemeralTTL time.Duration + Bucket string } // Store represents the backend K/V storage @@ -63,10 +65,10 @@ type Store interface { Watch(key string, stopCh <-chan struct{}) (<-chan *KVPair, error) // WatchTree watches for changes on child nodes under - // a given a directory + // a given directory WatchTree(directory string, stopCh <-chan struct{}) (<-chan []*KVPair, error) - // CreateLock for a given key. + // NewLock creates a lock for a given key. // The returned Locker is not held and must be acquired // with `.Lock`. The Value is optional. NewLock(key string, options *LockOptions) (Locker, error) @@ -97,8 +99,7 @@ type KVPair struct { // WriteOptions contains optional request parameters type WriteOptions struct { - Heartbeat time.Duration - Ephemeral bool + TTL time.Duration } // LockOptions contains optional request parameters @@ -107,10 +108,6 @@ type LockOptions struct { TTL time.Duration // Optional, expiration ttl associated with the lock } -// WatchCallback is used for watch methods on keys -// and is triggered on key change -type WatchCallback func(entries ...*KVPair) - // Locker provides locking mechanism on top of the store. // Similar to `sync.Lock` except it may return errors. type Locker interface { diff --git a/vendor/src/github.com/docker/libkv/store/zookeeper/zookeeper.go b/vendor/src/github.com/docker/libkv/store/zookeeper/zookeeper.go index d12edf6792..ff6b481947 100644 --- a/vendor/src/github.com/docker/libkv/store/zookeeper/zookeeper.go +++ b/vendor/src/github.com/docker/libkv/store/zookeeper/zookeeper.go @@ -4,11 +4,14 @@ import ( "strings" "time" + "github.com/docker/libkv" "github.com/docker/libkv/store" zk "github.com/samuel/go-zookeeper/zk" ) -const defaultTimeout = 10 * time.Second +const ( + defaultTimeout = 10 * time.Second +) // Zookeeper is the receiver type for // the Store interface @@ -24,6 +27,11 @@ type zookeeperLock struct { value []byte } +// Register registers zookeeper to libkv +func Register() { + libkv.AddStore(store.ZK, New) +} + // New creates a new Zookeeper client given a // list of endpoints and an optional tls config func New(endpoints []string, options *store.Config) (store.Store, error) { @@ -55,14 +63,13 @@ func (s *Zookeeper) setTimeout(time time.Duration) { // Get the value at "key", returns the last modified index // to use in conjunction to Atomic calls func (s *Zookeeper) Get(key string) (pair *store.KVPair, err error) { - resp, meta, err := s.client.Get(store.Normalize(key)) - if err != nil { - return nil, err - } + resp, meta, err := s.client.Get(s.normalize(key)) - // If resp is nil, the key does not exist - if resp == nil { - return nil, store.ErrKeyNotFound + if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } + return nil, err } pair = &store.KVPair{ @@ -80,10 +87,10 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { for i := 1; i <= len(path); i++ { newpath := "/" + strings.Join(path[:i], "/") if i == len(path) && ephemeral { - _, err := s.client.Create(newpath, []byte{1}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(newpath, []byte{}, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) return err } - _, err := s.client.Create(newpath, []byte{1}, 0, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(newpath, []byte{}, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Skip if node already exists if err != zk.ErrNodeExists { @@ -96,7 +103,7 @@ func (s *Zookeeper) createFullPath(path []string, ephemeral bool) error { // Put a value at "key" func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) error { - fkey := store.Normalize(key) + fkey := s.normalize(key) exists, err := s.Exists(key) if err != nil { @@ -104,10 +111,10 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro } if !exists { - if opts != nil && opts.Ephemeral { - s.createFullPath(store.SplitKey(key), opts.Ephemeral) + if opts != nil && opts.TTL > 0 { + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), true) } else { - s.createFullPath(store.SplitKey(key), false) + s.createFullPath(store.SplitKey(strings.TrimSuffix(key, "/")), false) } } @@ -117,13 +124,16 @@ func (s *Zookeeper) Put(key string, value []byte, opts *store.WriteOptions) erro // Delete a value at "key" func (s *Zookeeper) Delete(key string) error { - err := s.client.Delete(store.Normalize(key), -1) + err := s.client.Delete(s.normalize(key), -1) + if err == zk.ErrNoNode { + return store.ErrKeyNotFound + } return err } // Exists checks if the key exists inside the store func (s *Zookeeper) Exists(key string) (bool, error) { - exists, _, err := s.client.Exists(store.Normalize(key)) + exists, _, err := s.client.Exists(s.normalize(key)) if err != nil { return false, err } @@ -151,7 +161,7 @@ func (s *Zookeeper) Watch(key string, stopCh <-chan struct{}) (<-chan *store.KVP // to listening to any event that may occur on that key watchCh <- pair for { - _, _, eventCh, err := s.client.GetW(store.Normalize(key)) + _, _, eventCh, err := s.client.GetW(s.normalize(key)) if err != nil { return } @@ -195,7 +205,7 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan watchCh <- entries for { - _, _, eventCh, err := s.client.ChildrenW(store.Normalize(directory)) + _, _, eventCh, err := s.client.ChildrenW(s.normalize(directory)) if err != nil { return } @@ -218,8 +228,11 @@ func (s *Zookeeper) WatchTree(directory string, stopCh <-chan struct{}) (<-chan // List child nodes of a given directory func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { - keys, stat, err := s.client.Children(store.Normalize(directory)) + keys, stat, err := s.client.Children(s.normalize(directory)) if err != nil { + if err == zk.ErrNoNode { + return nil, store.ErrKeyNotFound + } return nil, err } @@ -227,8 +240,12 @@ func (s *Zookeeper) List(directory string) ([]*store.KVPair, error) { // FIXME Costly Get request for each child key.. for _, key := range keys { - pair, err := s.Get(directory + store.Normalize(key)) + pair, err := s.Get(strings.TrimSuffix(directory, "/") + s.normalize(key)) if err != nil { + // If node is not found: List is out of date, retry + if err == zk.ErrNoNode { + return s.List(directory) + } return nil, err } @@ -253,7 +270,7 @@ func (s *Zookeeper) DeleteTree(directory string) error { for _, pair := range pairs { reqs = append(reqs, &zk.DeleteRequest{ - Path: store.Normalize(directory + "/" + pair.Key), + Path: s.normalize(directory + "/" + pair.Key), Version: -1, }) } @@ -268,7 +285,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, var lastIndex uint64 if previous != nil { - meta, err := s.client.Set(store.Normalize(key), value, int32(previous.LastIndex)) + meta, err := s.client.Set(s.normalize(key), value, int32(previous.LastIndex)) if err != nil { // Compare Failed if err == zk.ErrBadVersion { @@ -279,7 +296,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, lastIndex = uint64(meta.Version) } else { // Interpret previous == nil as create operation. - _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)) + _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)) if err != nil { // Zookeeper will complain if the directory doesn't exist. if err == zk.ErrNoNode { @@ -290,7 +307,7 @@ func (s *Zookeeper) AtomicPut(key string, value []byte, previous *store.KVPair, // Failed to create the directory. return false, nil, err } - if _, err := s.client.Create(store.Normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { + if _, err := s.client.Create(s.normalize(key), value, 0, zk.WorldACL(zk.PermAll)); err != nil { return false, nil, err } @@ -319,7 +336,7 @@ func (s *Zookeeper) AtomicDelete(key string, previous *store.KVPair) (bool, erro return false, store.ErrPreviousNotSpecified } - err := s.client.Delete(store.Normalize(key), int32(previous.LastIndex)) + err := s.client.Delete(s.normalize(key), int32(previous.LastIndex)) if err != nil { if err == zk.ErrBadVersion { return false, store.ErrKeyModified @@ -343,9 +360,9 @@ func (s *Zookeeper) NewLock(key string, options *store.LockOptions) (lock store. lock = &zookeeperLock{ client: s.client, - key: store.Normalize(key), + key: s.normalize(key), value: value, - lock: zk.NewLock(s.client, store.Normalize(key), zk.WorldACL(zk.PermAll)), + lock: zk.NewLock(s.client, s.normalize(key), zk.WorldACL(zk.PermAll)), } return lock, err @@ -377,3 +394,9 @@ func (l *zookeeperLock) Unlock() error { func (s *Zookeeper) Close() { s.client.Close() } + +// Normalize the key for usage in Zookeeper +func (s *Zookeeper) normalize(key string) string { + key = store.Normalize(key) + return strings.TrimSuffix(key, "/") +} diff --git a/vendor/src/github.com/docker/libnetwork/datastore/datastore.go b/vendor/src/github.com/docker/libnetwork/datastore/datastore.go index 8a195aa902..f504860f0a 100644 --- a/vendor/src/github.com/docker/libnetwork/datastore/datastore.go +++ b/vendor/src/github.com/docker/libnetwork/datastore/datastore.go @@ -6,6 +6,9 @@ import ( "github.com/docker/libkv" "github.com/docker/libkv/store" + "github.com/docker/libkv/store/consul" + "github.com/docker/libkv/store/etcd" + "github.com/docker/libkv/store/zookeeper" "github.com/docker/libnetwork/config" "github.com/docker/libnetwork/types" ) @@ -66,6 +69,12 @@ const ( var rootChain = []string{"docker", "libnetwork"} +func init() { + consul.Register() + zookeeper.Register() + etcd.Register() +} + //Key provides convenient method to create a Key func Key(key ...string) string { keychain := append(rootChain, key...) diff --git a/vendor/src/github.com/docker/libnetwork/drivers/remote/api/api.go b/vendor/src/github.com/docker/libnetwork/drivers/remote/api/api.go index 2a0d297ce5..2d441ab7d2 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/remote/api/api.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/remote/api/api.go @@ -16,6 +16,12 @@ func (r *Response) GetError() string { return r.Err } +// GetCapabilityResponse is the response of GetCapability request +type GetCapabilityResponse struct { + Response + Scope string +} + // CreateNetworkRequest requests a new network. type CreateNetworkRequest struct { // A network ID that remote plugins are expected to store for future diff --git a/vendor/src/github.com/docker/libnetwork/drivers/remote/driver.go b/vendor/src/github.com/docker/libnetwork/drivers/remote/driver.go index 88afd53903..4d1a134f09 100644 --- a/vendor/src/github.com/docker/libnetwork/drivers/remote/driver.go +++ b/vendor/src/github.com/docker/libnetwork/drivers/remote/driver.go @@ -28,16 +28,40 @@ func newDriver(name string, client *plugins.Client) driverapi.Driver { // plugin is activated. func Init(dc driverapi.DriverCallback) error { plugins.Handle(driverapi.NetworkPluginEndpointType, func(name string, client *plugins.Client) { - c := driverapi.Capability{ - Scope: driverapi.GlobalScope, + // negotiate driver capability with client + d := newDriver(name, client) + c, err := d.(*driver).getCapabilities() + if err != nil { + log.Errorf("error getting capability for %s due to %v", name, err) + return } - if err := dc.RegisterDriver(name, newDriver(name, client), c); err != nil { + if err = dc.RegisterDriver(name, d, *c); err != nil { log.Errorf("error registering driver for %s due to %v", name, err) } }) return nil } +// Get capability from client +func (d *driver) getCapabilities() (*driverapi.Capability, error) { + var capResp api.GetCapabilityResponse + if err := d.call("GetCapabilities", nil, &capResp); err != nil { + return nil, err + } + + c := &driverapi.Capability{} + switch capResp.Scope { + case "global": + c.Scope = driverapi.GlobalScope + case "local": + c.Scope = driverapi.LocalScope + default: + return nil, fmt.Errorf("invalid capability: expecting 'local' or 'global', got %s", capResp.Scope) + } + + return c, nil +} + // Config is not implemented for remote drivers, since it is assumed // to be supplied to the remote process out-of-band (e.g., as command // line arguments).