mirror of
https://github.com/moby/moby.git
synced 2022-11-09 12:21:53 -05:00
d0c4e44863
Events subscription (/events API endpoint) attributes pseudo-unique identifiers to incoming subscribers: originally its host, then its subscription time. This is unecessary and leads to code complexity. Introduce a JSONMessagePublisher to provide simple pub/sub mechanism for JSONMessage, and rely on this new type to publish events to all subscribed listeners. The original logic is kept for the 'since' and 'until' parameters, and for client disconnection handling. Docker-DCO-1.1-Signed-off-by: Arnaud Porterie <icecrime@gmail.com> (github: icecrime)
61 lines
1.3 KiB
Go
61 lines
1.3 KiB
Go
package utils
|
|
|
|
import (
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
func NewJSONMessagePublisher() *JSONMessagePublisher {
|
|
return &JSONMessagePublisher{}
|
|
}
|
|
|
|
type JSONMessageListener chan<- JSONMessage
|
|
|
|
type JSONMessagePublisher struct {
|
|
m sync.RWMutex
|
|
subscribers []JSONMessageListener
|
|
}
|
|
|
|
func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) {
|
|
p.m.Lock()
|
|
p.subscribers = append(p.subscribers, l)
|
|
p.m.Unlock()
|
|
}
|
|
|
|
func (p *JSONMessagePublisher) SubscribersCount() int {
|
|
p.m.RLock()
|
|
count := len(p.subscribers)
|
|
p.m.RUnlock()
|
|
return count
|
|
}
|
|
|
|
// Unsubscribe closes and removes the specified listener from the list of
|
|
// previously registed ones.
|
|
// It returns a boolean value indicating if the listener was successfully
|
|
// found, closed and unregistered.
|
|
func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool {
|
|
p.m.Lock()
|
|
defer p.m.Unlock()
|
|
|
|
for i, subscriber := range p.subscribers {
|
|
if subscriber == l {
|
|
close(l)
|
|
p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...)
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
func (p *JSONMessagePublisher) Publish(m JSONMessage) {
|
|
p.m.RLock()
|
|
for _, subscriber := range p.subscribers {
|
|
// We give each subscriber a 100ms time window to receive the event,
|
|
// after which we move to the next.
|
|
select {
|
|
case subscriber <- m:
|
|
case <-time.After(100 * time.Millisecond):
|
|
}
|
|
}
|
|
p.m.RUnlock()
|
|
}
|