1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Properly support reloading for Action Cable channels

This commit is contained in:
Matthew Draper 2016-06-02 04:08:48 +09:30
parent 7b75ca100d
commit a8df1bc345
10 changed files with 56 additions and 84 deletions

View file

@ -26,12 +26,12 @@ module ActionCable
id_key = data['identifier'] id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
subscription_klass = connection.server.channel_classes[id_options[:channel]] subscription_klass = id_options[:channel].safe_constantize
if subscription_klass if subscription_klass && ActionCable::Channel::Base >= subscription_klass
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
else else
logger.error "Subscription class not found (#{data.inspect})" logger.error "Subscription class not found: #{id_options[:channel].inspect}"
end end
end end

View file

@ -31,11 +31,8 @@ module ActionCable
self.cable = Rails.application.config_for(config_path).with_indifferent_access self.cable = Rails.application.config_for(config_path).with_indifferent_access
end end
if 'ApplicationCable::Connection'.safe_constantize previous_connection_class = self.connection_class
self.connection_class = ApplicationCable::Connection self.connection_class = -> { 'ApplicationCable::Connection'.safe_constantize || previous_connection_class.call }
end
self.channel_paths = Rails.application.paths['app/channels'].existent
options.each { |k,v| send("#{k}=", v) } options.each { |k,v| send("#{k}=", v) }
end end

View file

@ -19,13 +19,13 @@ module ActionCable
def initialize def initialize
@mutex = Monitor.new @mutex = Monitor.new
@remote_connections = @event_loop = @worker_pool = @channel_classes = @pubsub = nil @remote_connections = @event_loop = @worker_pool = @pubsub = nil
end end
# Called by Rack to setup the server. # Called by Rack to setup the server.
def call(env) def call(env)
setup_heartbeat_timer setup_heartbeat_timer
config.connection_class.new(self, env).process config.connection_class.call.new(self, env).process
end end
# Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections. # Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections.
@ -67,16 +67,6 @@ module ActionCable
@worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) } @worker_pool || @mutex.synchronize { @worker_pool ||= ActionCable::Server::Worker.new(max_size: config.worker_pool_size) }
end end
# Requires and returns a hash of all of the channel class constants, which are keyed by name.
def channel_classes
@channel_classes || @mutex.synchronize do
@channel_classes ||= begin
config.channel_paths.each { |channel_path| require channel_path }
config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
end
end
end
# Adapter used for all streams/broadcasting. # Adapter used for all streams/broadcasting.
def pubsub def pubsub
@pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) } @pubsub || @mutex.synchronize { @pubsub ||= config.pubsub_adapter.new(self) }
@ -84,7 +74,7 @@ module ActionCable
# All of the identifiers applied to the connection class associated with this server. # All of the identifiers applied to the connection class associated with this server.
def connection_identifiers def connection_identifiers
config.connection_class.identifiers config.connection_class.call.identifiers
end end
end end

View file

@ -8,23 +8,15 @@ module ActionCable
attr_accessor :disable_request_forgery_protection, :allowed_request_origins attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :cable, :url, :mount_path attr_accessor :cable, :url, :mount_path
attr_accessor :channel_paths # :nodoc:
def initialize def initialize
@log_tags = [] @log_tags = []
@connection_class = ActionCable::Connection::Base @connection_class = -> { ActionCable::Connection::Base }
@worker_pool_size = 4 @worker_pool_size = 4
@disable_request_forgery_protection = false @disable_request_forgery_protection = false
end end
def channel_class_names
@channel_class_names ||= channel_paths.collect do |channel_path|
Pathname.new(channel_path).basename.to_s.split('.').first.camelize
end
end
# Returns constant of subscription adapter specified in config/cable.yml. # Returns constant of subscription adapter specified in config/cable.yml.
# If the adapter cannot be found, this will default to the Redis adapter. # If the adapter cannot be found, this will default to the Redis adapter.
# Also makes sure proper dependencies are required. # Also makes sure proper dependencies are required.

View file

@ -128,10 +128,6 @@ module ActionCable::StreamTests
setup do setup do
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
@server.config.allowed_request_origins = %w( http://rubyonrails.com ) @server.config.allowed_request_origins = %w( http://rubyonrails.com )
@server.stubs(:channel_classes).returns(
ChatChannel.name => ChatChannel,
UserCallbackChannel.name => UserCallbackChannel,
)
end end
test 'custom encoder' do test 'custom encoder' do

View file

@ -1,22 +0,0 @@
class EchoChannel < ActionCable::Channel::Base
def subscribed
stream_from "global"
end
def unsubscribed
'Goodbye from EchoChannel!'
end
def ding(data)
transmit(dong: data['message'])
end
def delay(data)
sleep 1
transmit(dong: data['message'])
end
def bulk(data)
ActionCable.server.broadcast "global", wide: data['message']
end
end

View file

@ -11,6 +11,29 @@ class ClientTest < ActionCable::TestCase
WAIT_WHEN_EXPECTING_EVENT = 8 WAIT_WHEN_EXPECTING_EVENT = 8
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5 WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
class EchoChannel < ActionCable::Channel::Base
def subscribed
stream_from "global"
end
def unsubscribed
'Goodbye from EchoChannel!'
end
def ding(data)
transmit(dong: data['message'])
end
def delay(data)
sleep 1
transmit(dong: data['message'])
end
def bulk(data)
ActionCable.server.broadcast "global", wide: data['message']
end
end
def setup def setup
ActionCable.instance_variable_set(:@server, nil) ActionCable.instance_variable_set(:@server, nil)
server = ActionCable.server server = ActionCable.server
@ -21,7 +44,6 @@ class ClientTest < ActionCable::TestCase
# and now the "real" setup for our test: # and now the "real" setup for our test:
server.config.disable_request_forgery_protection = true server.config.disable_request_forgery_protection = true
server.config.channel_paths = [ File.expand_path('client/echo_channel.rb', __dir__) ]
Thread.new { EventMachine.run } unless EventMachine.reactor_running? Thread.new { EventMachine.run } unless EventMachine.reactor_running?
Thread.pass until EventMachine.reactor_running? Thread.pass until EventMachine.reactor_running?
@ -148,10 +170,10 @@ class ClientTest < ActionCable::TestCase
with_puma_server do |port| with_puma_server do |port|
c = faye_client(port) c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message) assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
c.close c.close
end end
end end
@ -165,12 +187,12 @@ class ClientTest < ActionCable::TestCase
clients.map {|c| Concurrent::Future.execute { clients.map {|c| Concurrent::Future.execute {
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
barrier_1.wait WAIT_WHEN_EXPECTING_EVENT barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello') c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello')
barrier_2.wait WAIT_WHEN_EXPECTING_EVENT barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
assert_equal clients.size, c.read_messages(clients.size).size assert_equal clients.size, c.read_messages(clients.size).size
} }.each(&:wait!) } }.each(&:wait!)
@ -185,10 +207,10 @@ class ClientTest < ActionCable::TestCase
clients.map {|c| Concurrent::Future.execute { clients.map {|c| Concurrent::Future.execute {
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message) assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
} }.each(&:wait!) } }.each(&:wait!)
clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!) clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
@ -199,17 +221,17 @@ class ClientTest < ActionCable::TestCase
with_puma_server do |port| with_puma_server do |port|
c = faye_client(port) c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello') c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello')
c.close # disappear before write c.close # disappear before write
c = faye_client(port) c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello') c.send_message command: 'message', identifier: JSON.generate(channel: 'ClientTest::EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message) assert_equal({"identifier"=>'{"channel":"ClientTest::EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
c.close # disappear before read c.close # disappear before read
end end
end end
@ -217,12 +239,12 @@ class ClientTest < ActionCable::TestCase
def test_unsubscribe_client def test_unsubscribe_client
with_puma_server do |port| with_puma_server do |port|
app = ActionCable.server app = ActionCable.server
identifier = JSON.generate(channel: 'EchoChannel') identifier = JSON.generate(channel: 'ClientTest::EchoChannel')
c = faye_client(port) c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message) assert_equal({"type" => "welcome"}, c.read_message)
c.send_message command: 'subscribe', identifier: identifier c.send_message command: 'subscribe', identifier: identifier
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
assert_equal(1, app.connections.count) assert_equal(1, app.connections.count)
assert(app.remote_connections.where(identifier: identifier)) assert(app.remote_connections.where(identifier: identifier))
@ -242,8 +264,8 @@ class ClientTest < ActionCable::TestCase
with_puma_server do |port| with_puma_server do |port|
c = faye_client(port) c = faye_client(port)
assert_equal({"type" => "welcome"}, c.read_message) assert_equal({"type" => "welcome"}, c.read_message)
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel') c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'ClientTest::EchoChannel')
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message) assert_equal({"identifier"=>"{\"channel\":\"ClientTest::EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
ActionCable.server.restart ActionCable.server.restart
c.wait_for_close c.wait_for_close

View file

@ -24,7 +24,6 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase
setup do setup do
@server = TestServer.new @server = TestServer.new
@server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel)
@chat_identifier = ActiveSupport::JSON.encode(id: 1, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel') @chat_identifier = ActiveSupport::JSON.encode(id: 1, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel')
end end

View file

@ -1,4 +1,3 @@
# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading.
module ApplicationCable module ApplicationCable
class Channel < ActionCable::Channel::Base class Channel < ActionCable::Channel::Base
end end

View file

@ -1,4 +1,3 @@
# Be sure to restart your server when you modify this file. Action Cable runs in a loop that does not support auto reloading.
module ApplicationCable module ApplicationCable
class Connection < ActionCable::Connection::Base class Connection < ActionCable::Connection::Base
end end