1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Merge pull request #24540 from sgrif/sg-actioncable-callbacks

Run Action Cable callbacks through the worker pool
This commit is contained in:
Sean Griffin 2016-04-14 11:25:08 -06:00
commit 4769fc4b4d
2 changed files with 30 additions and 5 deletions

View file

@ -76,8 +76,11 @@ module ActionCable
# Don't send the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
if handler = callback || block
handler = -> message { handler.(coder.decode(message)) } if coder
if user_handler = callback || block
user_handler = -> message { handler.(coder.decode(message)) } if coder
handler = -> message do
connection.worker_pool.async_invoke(user_handler, :call, message)
end
else
handler = default_stream_handler(broadcasting, coder: coder)
end

View file

@ -116,11 +116,22 @@ module ActionCable::StreamTests
require 'action_cable/subscription_adapter/inline'
class UserCallbackChannel < ActionCable::Channel::Base
def subscribed
stream_from :channel do
Thread.current[:ran_callback] = true
end
end
end
class StreamEncodingTest < ActionCable::TestCase
setup do
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
@server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel)
@server.stubs(:channel_classes).returns(
ChatChannel.name => ChatChannel,
UserCallbackChannel.name => UserCallbackChannel,
)
end
test 'custom encoder' do
@ -134,6 +145,17 @@ module ActionCable::StreamTests
end
end
test "user supplied callbacks are run through the worker pool" do
run_in_eventmachine do
connection = open_connection
receive(connection, command: 'subscribe', channel: UserCallbackChannel.name, identifiers: { id: 1 })
@server.broadcast 'channel', {}
wait_for_async
refute Thread.current[:ran_callback], "User callback was not run through the worker pool"
end
end
private
def subscribe_to(connection, identifiers:)
receive connection, command: 'subscribe', identifiers: identifiers
@ -151,8 +173,8 @@ module ActionCable::StreamTests
end
end
def receive(connection, command:, identifiers:)
identifier = JSON.generate(channel: 'ActionCable::StreamTests::ChatChannel', **identifiers)
def receive(connection, command:, identifiers:, channel: 'ActionCable::StreamTests::ChatChannel')
identifier = JSON.generate(channel: channel, **identifiers)
connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier)
wait_for_async
end