diff --git a/pkg/pubsub/publisher.go b/pkg/pubsub/publisher.go index 534b74ad4d..ab457cfba9 100644 --- a/pkg/pubsub/publisher.go +++ b/pkg/pubsub/publisher.go @@ -19,6 +19,8 @@ func NewPublisher(publishTimeout time.Duration, buffer int) *Publisher { type subscriber chan interface{} +// Publisher is basic pub/sub structure. Allows to send events and subscribe +// to them. Can be safely used from multiple goroutines. type Publisher struct { m sync.RWMutex buffer int @@ -56,9 +58,16 @@ func (p *Publisher) Publish(v interface{}) { p.m.RLock() for sub := range p.subscribers { // send under a select as to not block if the receiver is unavailable + if p.timeout > 0 { + select { + case sub <- v: + case <-time.After(p.timeout): + } + continue + } select { case sub <- v: - case <-time.After(p.timeout): + default: } } p.m.RUnlock()