package hostdiscovery import ( "net" "sync" log "github.com/Sirupsen/logrus" mapset "github.com/deckarep/golang-set" "github.com/docker/docker/pkg/discovery" // Including KV _ "github.com/docker/docker/pkg/discovery/kv" "github.com/docker/libkv/store/consul" "github.com/docker/libkv/store/etcd" "github.com/docker/libkv/store/zookeeper" "github.com/docker/libnetwork/types" ) type hostDiscovery struct { watcher discovery.Watcher nodes mapset.Set stopChan chan struct{} sync.Mutex } func init() { consul.Register() etcd.Register() zookeeper.Register() } // NewHostDiscovery function creates a host discovery object func NewHostDiscovery(watcher discovery.Watcher) HostDiscovery { return &hostDiscovery{watcher: watcher, nodes: mapset.NewSet(), stopChan: make(chan struct{})} } func (h *hostDiscovery) Watch(activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) error { h.Lock() d := h.watcher h.Unlock() if d == nil { return types.BadRequestErrorf("invalid discovery watcher") } discoveryCh, errCh := d.Watch(h.stopChan) go h.monitorDiscovery(discoveryCh, errCh, activeCallback, joinCallback, leaveCallback) return nil } func (h *hostDiscovery) monitorDiscovery(ch <-chan discovery.Entries, errCh <-chan error, activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) { for { select { case entries := <-ch: h.processCallback(entries, activeCallback, joinCallback, leaveCallback) case err := <-errCh: if err != nil { log.Errorf("discovery error: %v", err) } case <-h.stopChan: return } } } func (h *hostDiscovery) StopDiscovery() error { h.Lock() stopChan := h.stopChan h.watcher = nil h.Unlock() close(stopChan) return nil } func (h *hostDiscovery) processCallback(entries discovery.Entries, activeCallback ActiveCallback, joinCallback JoinCallback, leaveCallback LeaveCallback) { updated := hosts(entries) h.Lock() existing := h.nodes added, removed := diff(existing, updated) h.nodes = updated h.Unlock() activeCallback() if len(added) > 0 { joinCallback(added) } if len(removed) > 0 { leaveCallback(removed) } } func diff(existing mapset.Set, updated mapset.Set) (added []net.IP, removed []net.IP) { addSlice := updated.Difference(existing).ToSlice() removeSlice := existing.Difference(updated).ToSlice() for _, ip := range addSlice { added = append(added, net.ParseIP(ip.(string))) } for _, ip := range removeSlice { removed = append(removed, net.ParseIP(ip.(string))) } return } func (h *hostDiscovery) Fetch() []net.IP { h.Lock() defer h.Unlock() ips := []net.IP{} for _, ipstr := range h.nodes.ToSlice() { ips = append(ips, net.ParseIP(ipstr.(string))) } return ips } func hosts(entries discovery.Entries) mapset.Set { hosts := mapset.NewSet() for _, entry := range entries { hosts.Add(entry.Host) } return hosts }