mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Synchronize the lazy setters in Server
They're all at risk of races on the first requests.
This commit is contained in:
parent
a928aa3d3f
commit
16a6603956
5 changed files with 47 additions and 15 deletions
|
@ -1,3 +1,5 @@
|
|||
require 'thread'
|
||||
|
||||
module ActionCable
|
||||
module Server
|
||||
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
|
||||
|
@ -13,7 +15,12 @@ module ActionCable
|
|||
def self.logger; config.logger; end
|
||||
delegate :logger, to: :config
|
||||
|
||||
attr_reader :mutex
|
||||
|
||||
def initialize
|
||||
@mutex = Mutex.new
|
||||
|
||||
@remote_connections = @stream_event_loop = @worker_pool = @channel_classes = @pubsub = nil
|
||||
end
|
||||
|
||||
# Called by rack to setup the server.
|
||||
|
@ -29,29 +36,31 @@ module ActionCable
|
|||
|
||||
# Gateway to RemoteConnections. See that class for details.
|
||||
def remote_connections
|
||||
@remote_connections ||= RemoteConnections.new(self)
|
||||
@remote_connections || @mutex.synchronize { @remote_connections ||= RemoteConnections.new(self) }
|
||||
end
|
||||
|
||||
def stream_event_loop
|
||||
@stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new
|
||||
@stream_event_loop || @mutex.synchronize { @stream_event_loop ||= ActionCable::Connection::StreamEventLoop.new }
|
||||
end
|
||||
|
||||
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
|
||||
def worker_pool
|
||||
@worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size)
|
||||
@worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
|
||||
end
|
||||
|
||||
# Requires and returns a hash of all the channel class constants keyed by name.
|
||||
def channel_classes
|
||||
@channel_classes ||= begin
|
||||
config.channel_paths.each { |channel_path| require channel_path }
|
||||
config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
|
||||
@channel_classes || @mutex.synchronize do
|
||||
@channel_classes ||= begin
|
||||
config.channel_paths.each { |channel_path| require channel_path }
|
||||
config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# Adapter used for all streams/broadcasting.
|
||||
def pubsub
|
||||
@pubsub ||= config.pubsub_adapter.new(self)
|
||||
@pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
|
||||
end
|
||||
|
||||
# All the identifiers applied to the connection class associated with this server.
|
||||
|
|
|
@ -4,8 +4,8 @@ module ActionCable
|
|||
module SubscriptionAdapter
|
||||
class Async < Inline # :nodoc:
|
||||
private
|
||||
def subscriber_map
|
||||
@subscriber_map ||= AsyncSubscriberMap.new
|
||||
def new_subscriber_map
|
||||
AsyncSubscriberMap.new
|
||||
end
|
||||
|
||||
class AsyncSubscriberMap < SubscriberMap
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
module ActionCable
|
||||
module SubscriptionAdapter
|
||||
class Inline < Base # :nodoc:
|
||||
def initialize(*)
|
||||
super
|
||||
@subscriber_map = nil
|
||||
end
|
||||
|
||||
def broadcast(channel, payload)
|
||||
subscriber_map.broadcast(channel, payload)
|
||||
end
|
||||
|
@ -19,7 +24,11 @@ module ActionCable
|
|||
|
||||
private
|
||||
def subscriber_map
|
||||
@subscriber_map ||= SubscriberMap.new
|
||||
@subscriber_map || @server.mutex.synchronize { @subscriber_map ||= new_subscriber_map }
|
||||
end
|
||||
|
||||
def new_subscriber_map
|
||||
SubscriberMap.new
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -5,6 +5,11 @@ require 'thread'
|
|||
module ActionCable
|
||||
module SubscriptionAdapter
|
||||
class PostgreSQL < Base # :nodoc:
|
||||
def initialize(*)
|
||||
super
|
||||
@listener = nil
|
||||
end
|
||||
|
||||
def broadcast(channel, payload)
|
||||
with_connection do |pg_conn|
|
||||
pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'")
|
||||
|
@ -37,7 +42,7 @@ module ActionCable
|
|||
|
||||
private
|
||||
def listener
|
||||
@listener ||= Listener.new(self)
|
||||
@listener || @server.mutex.synchronize { @listener ||= Listener.new(self) }
|
||||
end
|
||||
|
||||
class Listener < SubscriberMap
|
||||
|
|
|
@ -13,6 +13,11 @@ module ActionCable
|
|||
class Redis < Base # :nodoc:
|
||||
@@mutex = Mutex.new
|
||||
|
||||
def initialize(*)
|
||||
super
|
||||
@redis_connection_for_broadcasts = @redis_connection_for_subscriptions = nil
|
||||
end
|
||||
|
||||
def broadcast(channel, payload)
|
||||
redis_connection_for_broadcasts.publish(channel, payload)
|
||||
end
|
||||
|
@ -35,15 +40,19 @@ module ActionCable
|
|||
private
|
||||
def redis_connection_for_subscriptions
|
||||
ensure_reactor_running
|
||||
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
|
||||
redis.on(:reconnect_failed) do
|
||||
@logger.info "[ActionCable] Redis reconnect failed."
|
||||
@redis_connection_for_subscriptions || @server.mutex.synchronize do
|
||||
@redis_connection_for_subscriptions ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis|
|
||||
redis.on(:reconnect_failed) do
|
||||
@logger.info "[ActionCable] Redis reconnect failed."
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def redis_connection_for_broadcasts
|
||||
@redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
|
||||
@redis_connection_for_broadcasts || @server.mutex.synchronize do
|
||||
@redis_connection_for_broadcasts ||= ::Redis.new(@server.config.cable)
|
||||
end
|
||||
end
|
||||
|
||||
def ensure_reactor_running
|
||||
|
|
Loading…
Reference in a new issue