mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Cable: Extract stream handler construction
* Use separate stream handler builders for easy override and testing. * Fix worker pool execution that was silently failing since it only expected connection receivers. Sparked by code in #24162.
This commit is contained in:
parent
7ad4690b21
commit
3ba0eec20c
6 changed files with 75 additions and 25 deletions
|
@ -73,18 +73,13 @@ module ActionCable
|
||||||
# Defaults to `coder: nil` which does no decoding, passes raw messages.
|
# Defaults to `coder: nil` which does no decoding, passes raw messages.
|
||||||
def stream_from(broadcasting, callback = nil, coder: nil, &block)
|
def stream_from(broadcasting, callback = nil, coder: nil, &block)
|
||||||
broadcasting = String(broadcasting)
|
broadcasting = String(broadcasting)
|
||||||
|
|
||||||
# Don't send the confirmation until pubsub#subscribe is successful
|
# Don't send the confirmation until pubsub#subscribe is successful
|
||||||
defer_subscription_confirmation!
|
defer_subscription_confirmation!
|
||||||
|
|
||||||
if user_handler = callback || block
|
# Build a stream handler by wrapping the user-provided callback with
|
||||||
user_handler = -> message { handler.(coder.decode(message)) } if coder
|
# a decoder or defaulting to a JSON-decoding retransmitter.
|
||||||
handler = -> message do
|
handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder)
|
||||||
connection.worker_pool.async_invoke(user_handler, :call, message)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
handler = default_stream_handler(broadcasting, coder: coder)
|
|
||||||
end
|
|
||||||
|
|
||||||
streams << [ broadcasting, handler ]
|
streams << [ broadcasting, handler ]
|
||||||
|
|
||||||
connection.server.event_loop.post do
|
connection.server.event_loop.post do
|
||||||
|
@ -120,13 +115,60 @@ module ActionCable
|
||||||
@_streams ||= []
|
@_streams ||= []
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Always wrap the outermost handler to invoke the user handler on the
|
||||||
|
# worker pool rather than blocking the event loop.
|
||||||
|
def worker_pool_stream_handler(broadcasting, user_handler, coder: nil)
|
||||||
|
handler = stream_handler(broadcasting, user_handler, coder: coder)
|
||||||
|
|
||||||
|
-> message do
|
||||||
|
connection.worker_pool.async_invoke handler, :call, message, connection: connection
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# May be overridden to add instrumentation, logging, specialized error
|
||||||
|
# handling, or other forms of handler decoration.
|
||||||
|
#
|
||||||
|
# TODO: Tests demonstrating this.
|
||||||
|
def stream_handler(broadcasting, user_handler, coder: nil)
|
||||||
|
if user_handler
|
||||||
|
stream_decoder user_handler, coder: coder
|
||||||
|
else
|
||||||
|
default_stream_handler broadcasting, coder: coder
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# May be overridden to change the default stream handling behavior
|
||||||
|
# which decodes JSON and transmits to client.
|
||||||
|
#
|
||||||
|
# TODO: Tests demonstrating this.
|
||||||
|
#
|
||||||
|
# TODO: Room for optimization. Update transmit API to be coder-aware
|
||||||
|
# so we can no-op when pubsub and connection are both JSON-encoded.
|
||||||
|
# Then we can skip decode+encode if we're just proxying messages.
|
||||||
def default_stream_handler(broadcasting, coder:)
|
def default_stream_handler(broadcasting, coder:)
|
||||||
coder ||= ActiveSupport::JSON
|
coder ||= ActiveSupport::JSON
|
||||||
|
stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting
|
||||||
|
end
|
||||||
|
|
||||||
|
def stream_decoder(handler = identity_handler, coder:)
|
||||||
|
if coder
|
||||||
|
-> message { handler.(coder.decode(message)) }
|
||||||
|
else
|
||||||
|
handler
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def stream_transmitter(handler = identity_handler, broadcasting:)
|
||||||
|
via = "streamed from #{broadcasting}"
|
||||||
|
|
||||||
-> (message) do
|
-> (message) do
|
||||||
transmit coder.decode(message), via: "streamed from #{broadcasting}"
|
transmit handler.(message), via: via
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def identity_handler
|
||||||
|
-> message { message }
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -12,8 +12,10 @@ module ActionCable
|
||||||
define_callbacks :work
|
define_callbacks :work
|
||||||
include ActiveRecordConnectionManagement
|
include ActiveRecordConnectionManagement
|
||||||
|
|
||||||
|
attr_reader :executor
|
||||||
|
|
||||||
def initialize(max_size: 5)
|
def initialize(max_size: 5)
|
||||||
@pool = Concurrent::ThreadPoolExecutor.new(
|
@executor = Concurrent::ThreadPoolExecutor.new(
|
||||||
min_threads: 1,
|
min_threads: 1,
|
||||||
max_threads: max_size,
|
max_threads: max_size,
|
||||||
max_queue: 0,
|
max_queue: 0,
|
||||||
|
@ -23,11 +25,11 @@ module ActionCable
|
||||||
# Stop processing work: any work that has not already started
|
# Stop processing work: any work that has not already started
|
||||||
# running will be discarded from the queue
|
# running will be discarded from the queue
|
||||||
def halt
|
def halt
|
||||||
@pool.kill
|
@executor.kill
|
||||||
end
|
end
|
||||||
|
|
||||||
def stopping?
|
def stopping?
|
||||||
@pool.shuttingdown?
|
@executor.shuttingdown?
|
||||||
end
|
end
|
||||||
|
|
||||||
def work(connection)
|
def work(connection)
|
||||||
|
@ -40,14 +42,14 @@ module ActionCable
|
||||||
self.connection = nil
|
self.connection = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
def async_invoke(receiver, method, *args)
|
def async_invoke(receiver, method, *args, connection: receiver)
|
||||||
@pool.post do
|
@executor.post do
|
||||||
invoke(receiver, method, *args)
|
invoke(receiver, method, *args, connection: connection)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def invoke(receiver, method, *args)
|
def invoke(receiver, method, *args, connection:)
|
||||||
work(receiver) do
|
work(connection) do
|
||||||
begin
|
begin
|
||||||
receiver.send method, *args
|
receiver.send method, *args
|
||||||
rescue Exception => e
|
rescue Exception => e
|
||||||
|
|
|
@ -142,6 +142,7 @@ module ActionCable::StreamTests
|
||||||
connection.websocket.expects(:transmit)
|
connection.websocket.expects(:transmit)
|
||||||
@server.broadcast 'test_room_1', { foo: 'bar' }, coder: DummyEncoder
|
@server.broadcast 'test_room_1', { foo: 'bar' }, coder: DummyEncoder
|
||||||
wait_for_async
|
wait_for_async
|
||||||
|
wait_for_executor connection.server.worker_pool.executor
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -226,7 +226,9 @@ class ClientTest < ActionCable::TestCase
|
||||||
assert_equal(1, app.connections.count)
|
assert_equal(1, app.connections.count)
|
||||||
assert(app.remote_connections.where(identifier: identifier))
|
assert(app.remote_connections.where(identifier: identifier))
|
||||||
|
|
||||||
channel = app.connections.first.subscriptions.send(:subscriptions).first[1]
|
subscriptions = app.connections.first.subscriptions.send(:subscriptions)
|
||||||
|
assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription'
|
||||||
|
channel = subscriptions.first[1]
|
||||||
channel.expects(:unsubscribed)
|
channel.expects(:unsubscribed)
|
||||||
c.close
|
c.close
|
||||||
sleep 0.1 # Data takes a moment to process
|
sleep 0.1 # Data takes a moment to process
|
||||||
|
|
|
@ -49,10 +49,7 @@ end
|
||||||
|
|
||||||
module ConcurrentRubyConcurrencyHelpers
|
module ConcurrentRubyConcurrencyHelpers
|
||||||
def wait_for_async
|
def wait_for_async
|
||||||
e = Concurrent.global_io_executor
|
wait_for_executor Concurrent.global_io_executor
|
||||||
until e.completed_task_count == e.scheduled_task_count
|
|
||||||
sleep 0.1
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def run_in_eventmachine
|
def run_in_eventmachine
|
||||||
|
@ -67,4 +64,10 @@ class ActionCable::TestCase < ActiveSupport::TestCase
|
||||||
else
|
else
|
||||||
include ConcurrentRubyConcurrencyHelpers
|
include ConcurrentRubyConcurrencyHelpers
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def wait_for_executor(executor)
|
||||||
|
until executor.completed_task_count == executor.scheduled_task_count
|
||||||
|
sleep 0.1
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -33,12 +33,12 @@ class WorkerTest < ActiveSupport::TestCase
|
||||||
end
|
end
|
||||||
|
|
||||||
test "invoke" do
|
test "invoke" do
|
||||||
@worker.invoke @receiver, :run
|
@worker.invoke @receiver, :run, connection: @receiver.connection
|
||||||
assert_equal :run, @receiver.last_action
|
assert_equal :run, @receiver.last_action
|
||||||
end
|
end
|
||||||
|
|
||||||
test "invoke with arguments" do
|
test "invoke with arguments" do
|
||||||
@worker.invoke @receiver, :process, "Hello"
|
@worker.invoke @receiver, :process, "Hello", connection: @receiver.connection
|
||||||
assert_equal [ :process, "Hello" ], @receiver.last_action
|
assert_equal [ :process, "Hello" ], @receiver.last_action
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue