Merge pull request #695 from mavenugo/watch_retry

Add watch retrigger when store restarts
This commit is contained in:
Jana Radhakrishnan 2015-10-22 11:44:33 -07:00
commit 15871f32fc
5 changed files with 72 additions and 17 deletions

View File

@ -218,7 +218,14 @@ func (c *controller) initDiscovery(watcher discovery.Watcher) error {
}
c.discovery = hostdiscovery.NewHostDiscovery(watcher)
return c.discovery.Watch(c.hostJoinCallback, c.hostLeaveCallback)
return c.discovery.Watch(c.activeCallback, c.hostJoinCallback, c.hostLeaveCallback)
}
func (c *controller) activeCallback() {
ds := c.getStore(datastore.GlobalScope)
if ds != nil && !ds.Active() {
ds.RestartWatch()
}
}
func (c *controller) hostJoinCallback(nodes []net.IP) {

View File

@ -34,6 +34,10 @@ type DataStore interface {
Watchable() bool
// Watch for changes on a KVObject
Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KVObject, error)
// RestartWatch retriggers stopped Watches
RestartWatch()
// Active returns if the store is active
Active() bool
// List returns of a list of KVObjects belonging to the parent
// key. The caller must pass a KVObject of the same type as
// the objects that need to be listed
@ -53,9 +57,11 @@ var (
)
type datastore struct {
scope string
store store.Store
cache *cache
scope string
store store.Store
cache *cache
watchCh chan struct{}
active bool
sync.Mutex
}
@ -204,7 +210,7 @@ func newClient(scope string, kv string, addr string, config *store.Config, cache
return nil, err
}
ds := &datastore{scope: scope, store: store}
ds := &datastore{scope: scope, store: store, active: true, watchCh: make(chan struct{})}
if cached {
ds.cache = newCache(ds)
}
@ -239,6 +245,10 @@ func (ds *datastore) Scope() string {
return ds.scope
}
func (ds *datastore) Active() bool {
return ds.active
}
func (ds *datastore) Watchable() bool {
return ds.scope != LocalScope
}
@ -259,6 +269,15 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
kvoCh := make(chan KVObject)
go func() {
retry_watch:
var err error
// Make sure to get a new instance of watch channel
ds.Lock()
watchCh := ds.watchCh
ds.Unlock()
loop:
for {
select {
case <-stopCh:
@ -269,12 +288,15 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
// for the watch can exit resulting in a nil value in
// channel.
if kvPair == nil {
close(sCh)
return
ds.Lock()
ds.active = false
ds.Unlock()
break loop
}
dstO := ctor.New()
if err := dstO.SetValue(kvPair.Value); err != nil {
if err = dstO.SetValue(kvPair.Value); err != nil {
log.Printf("Could not unmarshal kvpair value = %s", string(kvPair.Value))
break
}
@ -283,11 +305,31 @@ func (ds *datastore) Watch(kvObject KVObject, stopCh <-chan struct{}) (<-chan KV
kvoCh <- dstO
}
}
// Wait on watch channel for a re-trigger when datastore becomes active
<-watchCh
kvpCh, err = ds.store.Watch(Key(kvObject.Key()...), sCh)
if err != nil {
log.Printf("Could not watch the key %s in store: %v", Key(kvObject.Key()...), err)
}
goto retry_watch
}()
return kvoCh, nil
}
func (ds *datastore) RestartWatch() {
ds.Lock()
defer ds.Unlock()
ds.active = true
watchCh := ds.watchCh
ds.watchCh = make(chan struct{})
close(watchCh)
}
func (ds *datastore) KVStore() store.Store {
return ds.store
}

View File

@ -34,7 +34,7 @@ func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery {
return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})}
}
func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error {
func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error {
h.Lock()
d := h.watcher
h.Unlock()
@ -42,15 +42,16 @@ func (h *hostDiscovery) Watch(joinCallback JoinCallback, leaveCallback LeaveCall
return types.BadRequestErrorf("invalid discovery watcher")
}
discoveryCh, errCh := d.Watch(h.stopChan)
go h.monitorDiscovery(discoveryCh, errCh, joinCallback, leaveCallback)
go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback)
return nil
}
func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error, joinCallback JoinCallback, leaveCallback LeaveCallback) {
func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error,
activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
for {
select {
case entries := <-ch:
h.processCallback(entries, joinCallback, leaveCallback)
h.processCallback(entries, activeCallback, joinCallback, leaveCallback)
case err := <-errCh:
if err != nil {
log.Errorf("discovery error: %v", err)
@ -71,7 +72,8 @@ func (h *hostDiscovery) StopDiscovery() error {
return nil
}
func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback JoinCallback, leaveCallback LeaveCallback) {
func (h *hostDiscovery) processCallback(entries discovery.Entries,
activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) {
updated := hosts(entries)
h.Lock()
existing := h.nodes
@ -79,6 +81,7 @@ func (h *hostDiscovery) processCallback(entries discovery.Entries, joinCallback
h.nodes = updated
h.Unlock()
activeCallback()
if len(added) > 0 {
joinCallback(added)
}

View File

@ -5,13 +5,16 @@ import "net"
// JoinCallback provides a callback event for new node joining the cluster
type JoinCallback func(entries []net.IP)
// ActiveCallback provides a callback event for active discovery event
type ActiveCallback func()
// LeaveCallback provides a callback event for node leaving the cluster
type LeaveCallback func(entries []net.IP)
// HostDiscovery primary interface
type HostDiscovery interface {
//Watch Node join and leave cluster events
Watch(joinCallback JoinCallback, leaveCallback LeaveCallback) error
Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error
// StopDiscovery stops the discovery perocess
StopDiscovery() error
// Fetch returns a list of host IPs that are currently discovered

View File

@ -44,7 +44,7 @@ func TestAddedCallback(t *testing.T) {
added := false
removed := false
hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
if !added {
t.Fatalf("Expecting a Added callback notification. But none received")
}
@ -57,7 +57,7 @@ func TestRemovedCallback(t *testing.T) {
added := false
removed := false
hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
if !removed {
t.Fatalf("Expecting a Removed callback notification. But none received")
}
@ -70,7 +70,7 @@ func TestNoCallback(t *testing.T) {
added := false
removed := false
hd.processCallback(update, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
hd.processCallback(update, func() {}, func(hosts []net.IP) { added = true }, func(hosts []net.IP) { removed = true })
if added || removed {
t.Fatalf("Not expecting any callback notification. But received a callback")
}