From d0c4e448632f49b73bf63862a4b20b142ff7f2c9 Mon Sep 17 00:00:00 2001 From: Arnaud Porterie Date: Wed, 4 Jun 2014 20:47:09 +0200 Subject: [PATCH] Refactor events publishing Events subscription (/events API endpoint) attributes pseudo-unique identifiers to incoming subscribers: originally its host, then its subscription time. This is unecessary and leads to code complexity. Introduce a JSONMessagePublisher to provide simple pub/sub mechanism for JSONMessage, and rely on this new type to publish events to all subscribed listeners. The original logic is kept for the 'since' and 'until' parameters, and for client disconnection handling. Docker-DCO-1.1-Signed-off-by: Arnaud Porterie (github: icecrime) --- server/server.go | 107 +++++++++++------------------ server/server_unit_test.go | 8 +-- utils/jsonmessagepublisher.go | 61 ++++++++++++++++ utils/jsonmessagepublisher_test.go | 73 ++++++++++++++++++++ 4 files changed, 176 insertions(+), 73 deletions(-) create mode 100644 utils/jsonmessagepublisher.go create mode 100644 utils/jsonmessagepublisher_test.go diff --git a/server/server.go b/server/server.go index 76a51e796f..620c8b20b8 100644 --- a/server/server.go +++ b/server/server.go @@ -248,85 +248,63 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status { return engine.StatusOK } -func (srv *Server) EvictListener(from int64) { - srv.Lock() - if old, ok := srv.listeners[from]; ok { - delete(srv.listeners, from) - close(old) - } - srv.Unlock() -} - func (srv *Server) Events(job *engine.Job) engine.Status { if len(job.Args) != 0 { return job.Errorf("Usage: %s", job.Name) } var ( - from = time.Now().UTC().UnixNano() since = job.GetenvInt64("since") until = job.GetenvInt64("until") timeout = time.NewTimer(time.Unix(until, 0).Sub(time.Now())) ) - sendEvent := func(event *utils.JSONMessage) error { - b, err := json.Marshal(event) - if err != nil { - return fmt.Errorf("JSON error") - } - _, err = job.Stdout.Write(b) - return err + + // If no until, disable timeout + if until == 0 { + timeout.Stop() } listener := make(chan utils.JSONMessage) - srv.Lock() - if old, ok := srv.listeners[from]; ok { - delete(srv.listeners, from) - close(old) + srv.eventPublisher.Subscribe(listener) + defer srv.eventPublisher.Unsubscribe(listener) + + // When sending an event JSON serialization errors are ignored, but all + // other errors lead to the eviction of the listener. + sendEvent := func(event *utils.JSONMessage) error { + if b, err := json.Marshal(event); err == nil { + if _, err = job.Stdout.Write(b); err != nil { + return err + } + } + return nil } - srv.listeners[from] = listener - srv.Unlock() - job.Stdout.Write(nil) // flush + + job.Stdout.Write(nil) + + // Resend every event in the [since, until] time interval. if since != 0 { - // If since, send previous events that happened after the timestamp and until timestamp for _, event := range srv.GetEvents() { if event.Time >= since && (event.Time <= until || until == 0) { - err := sendEvent(&event) - if err != nil && err.Error() == "JSON error" { - continue - } - if err != nil { - // On error, evict the listener - srv.EvictListener(from) + if err := sendEvent(&event); err != nil { return job.Error(err) } } } } - // If no until, disable timeout - if until == 0 { - timeout.Stop() - } for { select { case event, ok := <-listener: - if !ok { // Channel is closed: listener was evicted + if !ok { return engine.StatusOK } - err := sendEvent(&event) - if err != nil && err.Error() == "JSON error" { - continue - } - if err != nil { - // On error, evict the listener - srv.EvictListener(from) + if err := sendEvent(&event); err != nil { return job.Error(err) } case <-timeout.C: return engine.StatusOK } } - return engine.StatusOK } func (srv *Server) ContainerExport(job *engine.Job) engine.Status { @@ -797,7 +775,7 @@ func (srv *Server) DockerInfo(job *engine.Job) engine.Status { v.SetInt("NFd", utils.GetTotalUsedFds()) v.SetInt("NGoroutines", runtime.NumGoroutine()) v.Set("ExecutionDriver", srv.daemon.ExecutionDriver().Name()) - v.SetInt("NEventsListener", len(srv.listeners)) + v.SetInt("NEventsListener", srv.eventPublisher.SubscribersCount()) v.Set("KernelVersion", kernelVersion) v.Set("IndexServerAddress", registry.IndexServerAddress()) v.Set("InitSha1", dockerversion.INITSHA1) @@ -2387,12 +2365,12 @@ func NewServer(eng *engine.Engine, config *daemonconfig.Config) (*Server, error) return nil, err } srv := &Server{ - Eng: eng, - daemon: daemon, - pullingPool: make(map[string]chan struct{}), - pushingPool: make(map[string]chan struct{}), - events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events - listeners: make(map[int64]chan utils.JSONMessage), + Eng: eng, + daemon: daemon, + pullingPool: make(map[string]chan struct{}), + pushingPool: make(map[string]chan struct{}), + events: make([]utils.JSONMessage, 0, 64), //only keeps the 64 last events + eventPublisher: utils.NewJSONMessagePublisher(), } daemon.SetServer(srv) return srv, nil @@ -2402,14 +2380,7 @@ func (srv *Server) LogEvent(action, id, from string) *utils.JSONMessage { now := time.Now().UTC().Unix() jm := utils.JSONMessage{Status: action, ID: id, From: from, Time: now} srv.AddEvent(jm) - srv.Lock() - for _, c := range srv.listeners { - select { // non blocking channel - case c <- jm: - default: - } - } - srv.Unlock() + srv.eventPublisher.Publish(jm) return &jm } @@ -2461,12 +2432,12 @@ func (srv *Server) Close() error { type Server struct { sync.RWMutex - daemon *daemon.Daemon - pullingPool map[string]chan struct{} - pushingPool map[string]chan struct{} - events []utils.JSONMessage - listeners map[int64]chan utils.JSONMessage - Eng *engine.Engine - running bool - tasks sync.WaitGroup + daemon *daemon.Daemon + pullingPool map[string]chan struct{} + pushingPool map[string]chan struct{} + events []utils.JSONMessage + eventPublisher *utils.JSONMessagePublisher + Eng *engine.Engine + running bool + tasks sync.WaitGroup } diff --git a/server/server_unit_test.go b/server/server_unit_test.go index 47e4be8280..e6c5d49b82 100644 --- a/server/server_unit_test.go +++ b/server/server_unit_test.go @@ -47,16 +47,14 @@ func TestPools(t *testing.T) { func TestLogEvent(t *testing.T) { srv := &Server{ - events: make([]utils.JSONMessage, 0, 64), - listeners: make(map[int64]chan utils.JSONMessage), + events: make([]utils.JSONMessage, 0, 64), + eventPublisher: utils.NewJSONMessagePublisher(), } srv.LogEvent("fakeaction", "fakeid", "fakeimage") listener := make(chan utils.JSONMessage) - srv.Lock() - srv.listeners[1337] = listener - srv.Unlock() + srv.eventPublisher.Subscribe(listener) srv.LogEvent("fakeaction2", "fakeid", "fakeimage") diff --git a/utils/jsonmessagepublisher.go b/utils/jsonmessagepublisher.go new file mode 100644 index 0000000000..659e6c8304 --- /dev/null +++ b/utils/jsonmessagepublisher.go @@ -0,0 +1,61 @@ +package utils + +import ( + "sync" + "time" +) + +func NewJSONMessagePublisher() *JSONMessagePublisher { + return &JSONMessagePublisher{} +} + +type JSONMessageListener chan<- JSONMessage + +type JSONMessagePublisher struct { + m sync.RWMutex + subscribers []JSONMessageListener +} + +func (p *JSONMessagePublisher) Subscribe(l JSONMessageListener) { + p.m.Lock() + p.subscribers = append(p.subscribers, l) + p.m.Unlock() +} + +func (p *JSONMessagePublisher) SubscribersCount() int { + p.m.RLock() + count := len(p.subscribers) + p.m.RUnlock() + return count +} + +// Unsubscribe closes and removes the specified listener from the list of +// previously registed ones. +// It returns a boolean value indicating if the listener was successfully +// found, closed and unregistered. +func (p *JSONMessagePublisher) Unsubscribe(l JSONMessageListener) bool { + p.m.Lock() + defer p.m.Unlock() + + for i, subscriber := range p.subscribers { + if subscriber == l { + close(l) + p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...) + return true + } + } + return false +} + +func (p *JSONMessagePublisher) Publish(m JSONMessage) { + p.m.RLock() + for _, subscriber := range p.subscribers { + // We give each subscriber a 100ms time window to receive the event, + // after which we move to the next. + select { + case subscriber <- m: + case <-time.After(100 * time.Millisecond): + } + } + p.m.RUnlock() +} diff --git a/utils/jsonmessagepublisher_test.go b/utils/jsonmessagepublisher_test.go new file mode 100644 index 0000000000..2e1a820ca3 --- /dev/null +++ b/utils/jsonmessagepublisher_test.go @@ -0,0 +1,73 @@ +package utils + +import ( + "testing" + "time" +) + +func assertSubscribersCount(t *testing.T, q *JSONMessagePublisher, expected int) { + if q.SubscribersCount() != expected { + t.Fatalf("Expected %d registered subscribers, got %d", expected, q.SubscribersCount()) + } +} + +func TestJSONMessagePublisherSubscription(t *testing.T) { + q := NewJSONMessagePublisher() + l1 := make(chan JSONMessage) + l2 := make(chan JSONMessage) + + assertSubscribersCount(t, q, 0) + q.Subscribe(l1) + assertSubscribersCount(t, q, 1) + q.Subscribe(l2) + assertSubscribersCount(t, q, 2) + + q.Unsubscribe(l1) + q.Unsubscribe(l2) + assertSubscribersCount(t, q, 0) +} + +func TestJSONMessagePublisherPublish(t *testing.T) { + q := NewJSONMessagePublisher() + l1 := make(chan JSONMessage) + l2 := make(chan JSONMessage) + + go func() { + for { + select { + case <-l1: + close(l1) + l1 = nil + case <-l2: + close(l2) + l2 = nil + case <-time.After(1 * time.Second): + q.Unsubscribe(l1) + q.Unsubscribe(l2) + t.Fatal("Timeout waiting for broadcasted message") + } + } + }() + + q.Subscribe(l1) + q.Subscribe(l2) + q.Publish(JSONMessage{}) +} + +func TestJSONMessagePublishTimeout(t *testing.T) { + q := NewJSONMessagePublisher() + l := make(chan JSONMessage) + q.Subscribe(l) + + c := make(chan struct{}) + go func() { + q.Publish(JSONMessage{}) + close(c) + }() + + select { + case <-c: + case <-time.After(time.Second): + t.Fatal("Timeout publishing message") + } +}