diff --git a/server/server.go b/server/server.go index f8e7539e7e..d82c5fe549 100644 --- a/server/server.go +++ b/server/server.go @@ -198,6 +198,15 @@ func (srv *Server) ContainerKill(job *engine.Job) engine.Status { return engine.StatusOK } +func (srv *Server) EvictListener(from string) { + srv.Lock() + if old, ok := srv.listeners[from]; ok { + delete(srv.listeners, from) + close(old) + } + srv.Unlock() +} + func (srv *Server) Events(job *engine.Job) engine.Status { if len(job.Args) != 1 { return job.Errorf("Usage: %s FROM", job.Name) @@ -215,15 +224,7 @@ func (srv *Server) Events(job *engine.Job) engine.Status { return fmt.Errorf("JSON error") } _, err = job.Stdout.Write(b) - if err != nil { - // On error, evict the listener - utils.Errorf("%s", err) - srv.Lock() - delete(srv.listeners, from) - srv.Unlock() - return err - } - return nil + return err } listener := make(chan utils.JSONMessage) @@ -244,8 +245,9 @@ func (srv *Server) Events(job *engine.Job) engine.Status { continue } if err != nil { - job.Error(err) - return engine.StatusErr + // On error, evict the listener + srv.EvictListener(from) + return job.Error(err) } } } @@ -263,6 +265,8 @@ func (srv *Server) Events(job *engine.Job) engine.Status { continue } if err != nil { + // On error, evict the listener + srv.EvictListener(from) return job.Error(err) } case <-timeout.C: