mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Listener no longer needs to be a singleton
We now only create one adapter instance for the server, so it can hold the listener. This in turn allows the listener to get the PG connection from the adapter, which will be a good place to allow more flexible configuration.
This commit is contained in:
parent
bc413e814b
commit
78ff63ee41
1 changed files with 58 additions and 52 deletions
|
@ -5,6 +5,20 @@ module ActionCable
|
|||
class Postgres < Base
|
||||
# The storage instance used for broadcasting. Not intended for direct user use.
|
||||
def broadcast(channel, payload)
|
||||
with_connection do |pg_conn|
|
||||
pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
|
||||
end
|
||||
end
|
||||
|
||||
def subscribe(channel, callback, success_callback = nil)
|
||||
listener.subscribe_to(channel, callback, success_callback)
|
||||
end
|
||||
|
||||
def unsubscribe(channel, callback)
|
||||
listener.unsubscribe_to(channel, callback)
|
||||
end
|
||||
|
||||
def with_connection(&block) # :nodoc:
|
||||
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
|
||||
pg_conn = ar_conn.raw_connection
|
||||
|
||||
|
@ -12,78 +26,70 @@ module ActionCable
|
|||
raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter'
|
||||
end
|
||||
|
||||
pg_conn.exec("NOTIFY #{channel}, '#{payload}'")
|
||||
yield pg_conn
|
||||
end
|
||||
end
|
||||
|
||||
def subscribe(channel, message_callback, success_callback = nil)
|
||||
Listener.instance.subscribe_to(channel, message_callback, success_callback)
|
||||
end
|
||||
private
|
||||
def listener
|
||||
@listener ||= Listener.new(self)
|
||||
end
|
||||
|
||||
def unsubscribe(channel, message_callback)
|
||||
Listener.instance.unsubscribe_to(channel, message_callback)
|
||||
end
|
||||
class Listener
|
||||
def initialize(adapter)
|
||||
@adapter = adapter
|
||||
@subscribers = Hash.new { |h,k| h[k] = [] }
|
||||
@sync = Mutex.new
|
||||
@queue = Queue.new
|
||||
|
||||
class Listener
|
||||
include Singleton
|
||||
|
||||
attr_accessor :subscribers
|
||||
|
||||
def initialize
|
||||
@subscribers = Hash.new {|h,k| h[k] = [] }
|
||||
@sync = Mutex.new
|
||||
@queue = Queue.new
|
||||
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
listen
|
||||
Thread.new do
|
||||
Thread.current.abort_on_exception = true
|
||||
listen
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def listen
|
||||
ActiveRecord::Base.connection_pool.with_connection do |ar_conn|
|
||||
pg_conn = ar_conn.raw_connection
|
||||
def listen
|
||||
@adapter.with_connection do |pg_conn|
|
||||
loop do
|
||||
until @queue.empty?
|
||||
value = @queue.pop(true)
|
||||
if value.first == :listen
|
||||
pg_conn.exec("LISTEN #{value[1]}")
|
||||
::EM.next_tick(&value[2]) if value[2]
|
||||
elsif value.first == :unlisten
|
||||
pg_conn.exec("UNLISTEN #{value[1]}")
|
||||
end
|
||||
|
||||
loop do
|
||||
until @queue.empty?
|
||||
value = @queue.pop(true)
|
||||
if value.first == :listen
|
||||
pg_conn.exec("LISTEN #{value[1]}")
|
||||
::EM.next_tick(&value[2]) if value[2]
|
||||
elsif value.first == :unlisten
|
||||
pg_conn.exec("UNLISTEN #{value[1]}")
|
||||
end
|
||||
end
|
||||
|
||||
pg_conn.wait_for_notify(1) do |chan, pid, message|
|
||||
@subscribers[chan].each do |callback|
|
||||
::EM.next_tick { callback.call(message) }
|
||||
pg_conn.wait_for_notify(1) do |chan, pid, message|
|
||||
@subscribers[chan].each do |callback|
|
||||
::EM.next_tick { callback.call(message) }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def subscribe_to(channel, callback, success_callback)
|
||||
@sync.synchronize do
|
||||
if @subscribers[channel].empty?
|
||||
@queue.push([:listen, channel, success_callback])
|
||||
def subscribe_to(channel, callback, success_callback)
|
||||
@sync.synchronize do
|
||||
if @subscribers[channel].empty?
|
||||
@queue.push([:listen, channel, success_callback])
|
||||
end
|
||||
|
||||
@subscribers[channel] << callback
|
||||
end
|
||||
|
||||
@subscribers[channel] << callback
|
||||
end
|
||||
end
|
||||
|
||||
def unsubscribe_to(channel, callback)
|
||||
@sync.synchronize do
|
||||
@subscribers[channel].delete(callback)
|
||||
def unsubscribe_to(channel, callback)
|
||||
@sync.synchronize do
|
||||
@subscribers[channel].delete(callback)
|
||||
|
||||
if @subscribers[channel].empty?
|
||||
@queue.push([:unlisten, channel])
|
||||
if @subscribers[channel].empty?
|
||||
@queue.push([:unlisten, channel])
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue