diff --git a/libnetwork/controller.go b/libnetwork/controller.go index be027f4988..d8a6d76f84 100644 --- a/libnetwork/controller.go +++ b/libnetwork/controller.go @@ -218,7 +218,14 @@ func (c *controller) initDiscovery(watcher discovery.Watcher) error { } c.discovery = hostdiscovery.NewHostDiscovery(watcher) - return c.discovery.Watch(c.hostJoinCallback, c.hostLeaveCallback) + return c.discovery.Watch(c.activeCallback, c.hostJoinCallback, c.hostLeaveCallback) +} + +func (c *controller) activeCallback() { + ds := c.getStore(datastore.GlobalScope) + if ds != nil && !ds.Active() { + ds.RestartWatch() + } } func (c *controller) hostJoinCallback(nodes []net.IP) { diff --git a/libnetwork/datastore/datastore.go b/libnetwork/datastore/datastore.go index 70be5c9cc1..3919afe0fc 100644 --- a/libnetwork/datastore/datastore.go +++ b/libnetwork/datastore/datastore.go @@ -34,6 +34,10 @@ type DataStore interface { Watchable() bool // Watch for changes on a KVObject Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error) + // RestartWatch retriggers stopped Watches + RestartWatch() + // Active returns if the store is active + Active() bool // List returns of a list of KVObjects belonging to the parent // key. The caller must pass a KVObject of the same type as // the objects that need to be listed @@ -53,9 +57,11 @@ var ( ) type datastore struct { - scope string - store store.Store - cache *cache + scope string + store store.Store + cache *cache + watchCh chan struct{} + active bool sync.Mutex } @@ -204,7 +210,7 @@ func newClient(scope string, kv string, addr string, config *store.Config, cache return nil, err } - ds := &datastore{scope: scope, store: store} + ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{})} if cached { ds.cache = newCache(ds) } @@ -239,6 +245,10 @@ func (ds *datastore) Scope() string { return ds.scope } +func (ds *datastore) Active() bool { + return ds.active +} + func (ds *datastore) Watchable() bool { return ds.scope != LocalScope } @@ -259,6 +269,15 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV kvoCh := make(chan KVObject) go func() { + retry_watch: + var err error + + // Make sure to get a new instance of watch channel + ds.Lock() + watchCh := ds.watchCh + ds.Unlock() + + loop: for { select { case <-stopCh: @@ -269,12 +288,15 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV // for the watch can exit resulting in a nil value in // channel. if kvPair == nil { - close(sCh) - return + ds.Lock() + ds.active = false + ds.Unlock() + break loop } + dstO := ctor.New() - if err := dstO.SetValue(kvPair.Value); err != nil { + if err = dstO.SetValue(kvPair.Value); err != nil { log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value)) break } @@ -283,11 +305,31 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV kvoCh <- dstO } } + + // Wait on watch channel for a re-trigger when datastore becomes active + <-watchCh + + kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh) + if err != nil { + log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err) + } + + goto retry_watch }() return kvoCh, nil } +func (ds *datastore) RestartWatch() { + ds.Lock() + defer ds.Unlock() + + ds.active = true + watchCh := ds.watchCh + ds.watchCh = make(chan struct{}) + close(watchCh) +} + func (ds *datastore) KVStore() store.Store { return ds.store } diff --git a/libnetwork/hostdiscovery/hostdiscovery.go b/libnetwork/hostdiscovery/hostdiscovery.go index cb29e45032..3fe2a64a17 100644 --- a/libnetwork/hostdiscovery/hostdiscovery.go +++ b/libnetwork/hostdiscovery/hostdiscovery.go @@ -34,7 +34,7 @@ func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery { return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})} } -func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error { +func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error { h.Lock() d := h.watcher h.Unlock() @@ -42,15 +42,16 @@ func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCall return types.BadRequestErrorf("invalid discovery watcher") } discoveryCh, errCh := d.Watch(h.stopChan) - go h.monitorDiscovery(discoveryCh, errCh, joinCallback, leaveCallback) + go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback) return nil } -func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error, joinCallback JoinCallback, leaveCallback LeaveCallback) { +func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error, + activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) { for { select { case entries := <-ch: - h.processCallback(entries, joinCallback, leaveCallback) + h.processCallback(entries, activeCallback, joinCallback, leaveCallback) case err := <-errCh: if err != nil { log.Errorf("discovery error: %v", err) @@ -71,7 +72,8 @@ func (h *hostDiscovery) StopDiscovery() error { return nil } -func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback JoinCallback, leaveCallback LeaveCallback) { +func (h *hostDiscovery) processCallback(entries discovery.Entries, + activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) { updated := hosts(entries) h.Lock() existing := h.nodes @@ -79,6 +81,7 @@ func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback h.nodes = updated h.Unlock() + activeCallback() if len(added) > 0 { joinCallback(added) } diff --git a/libnetwork/hostdiscovery/hostdiscovery_api.go b/libnetwork/hostdiscovery/hostdiscovery_api.go index 5be520fca8..b9c17250ce 100644 --- a/libnetwork/hostdiscovery/hostdiscovery_api.go +++ b/libnetwork/hostdiscovery/hostdiscovery_api.go @@ -5,13 +5,16 @@ import "net" // JoinCallback provides a callback event for new node joining the cluster type JoinCallback func(entries []net.IP) +// ActiveCallback provides a callback event for active discovery event +type ActiveCallback func() + // LeaveCallback provides a callback event for node leaving the cluster type LeaveCallback func(entries []net.IP) // HostDiscovery primary interface type HostDiscovery interface { //Watch Node join and leave cluster events - Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error + Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error // StopDiscovery stops the discovery perocess StopDiscovery() error // Fetch returns a list of host IPs that are currently discovered diff --git a/libnetwork/hostdiscovery/hostdiscovery_test.go b/libnetwork/hostdiscovery/hostdiscovery_test.go index 0b3a902306..42adc13dbb 100644 --- a/libnetwork/hostdiscovery/hostdiscovery_test.go +++ b/libnetwork/hostdiscovery/hostdiscovery_test.go @@ -44,7 +44,7 @@ func TestAddedCallback(t *testing.T) { added := false removed := false - hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) + hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) if !added { t.Fatalf("Expecting a Added callback notification. But none received") } @@ -57,7 +57,7 @@ func TestRemovedCallback(t *testing.T) { added := false removed := false - hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) + hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) if !removed { t.Fatalf("Expecting a Removed callback notification. But none received") } @@ -70,7 +70,7 @@ func TestNoCallback(t *testing.T) { added := false removed := false - hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) + hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true }) if added || removed { t.Fatalf("Not expecting any callback notification. But received a callback") }