diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 84594fb3d6..200c9d053c 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -73,18 +73,13 @@ module ActionCable # Defaults to `coder: nil` which does no decoding, passes raw messages. def stream_from(broadcasting, callback = nil, coder: nil, &block) broadcasting = String(broadcasting) + # Don't send the confirmation until pubsub#subscribe is successful defer_subscription_confirmation! - if user_handler = callback || block - user_handler = -> message { handler.(coder.decode(message)) } if coder - handler = -> message do - connection.worker_pool.async_invoke(user_handler, :call, message) - end - else - handler = default_stream_handler(broadcasting, coder: coder) - end - + # Build a stream handler by wrapping the user-provided callback with + # a decoder or defaulting to a JSON-decoding retransmitter. + handler = worker_pool_stream_handler(broadcasting, callback || block, coder: coder) streams << [ broadcasting, handler ] connection.server.event_loop.post do @@ -120,13 +115,60 @@ module ActionCable @_streams ||= [] end + # Always wrap the outermost handler to invoke the user handler on the + # worker pool rather than blocking the event loop. + def worker_pool_stream_handler(broadcasting, user_handler, coder: nil) + handler = stream_handler(broadcasting, user_handler, coder: coder) + + -> message do + connection.worker_pool.async_invoke handler, :call, message, connection: connection + end + end + + # May be overridden to add instrumentation, logging, specialized error + # handling, or other forms of handler decoration. + # + # TODO: Tests demonstrating this. + def stream_handler(broadcasting, user_handler, coder: nil) + if user_handler + stream_decoder user_handler, coder: coder + else + default_stream_handler broadcasting, coder: coder + end + end + + # May be overridden to change the default stream handling behavior + # which decodes JSON and transmits to client. + # + # TODO: Tests demonstrating this. + # + # TODO: Room for optimization. Update transmit API to be coder-aware + # so we can no-op when pubsub and connection are both JSON-encoded. + # Then we can skip decode+encode if we're just proxying messages. def default_stream_handler(broadcasting, coder:) coder ||= ActiveSupport::JSON + stream_transmitter stream_decoder(coder: coder), broadcasting: broadcasting + end + + def stream_decoder(handler = identity_handler, coder:) + if coder + -> message { handler.(coder.decode(message)) } + else + handler + end + end + + def stream_transmitter(handler = identity_handler, broadcasting:) + via = "streamed from #{broadcasting}" -> (message) do - transmit coder.decode(message), via: "streamed from #{broadcasting}" + transmit handler.(message), via: via end end + + def identity_handler + -> message { message } + end end end end diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 46a8989f34..a638ff72e7 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -12,8 +12,10 @@ module ActionCable define_callbacks :work include ActiveRecordConnectionManagement + attr_reader :executor + def initialize(max_size: 5) - @pool = Concurrent::ThreadPoolExecutor.new( + @executor = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size, max_queue: 0, @@ -23,11 +25,11 @@ module ActionCable # Stop processing work: any work that has not already started # running will be discarded from the queue def halt - @pool.kill + @executor.kill end def stopping? - @pool.shuttingdown? + @executor.shuttingdown? end def work(connection) @@ -40,14 +42,14 @@ module ActionCable self.connection = nil end - def async_invoke(receiver, method, *args) - @pool.post do - invoke(receiver, method, *args) + def async_invoke(receiver, method, *args, connection: receiver) + @executor.post do + invoke(receiver, method, *args, connection: connection) end end - def invoke(receiver, method, *args) - work(receiver) do + def invoke(receiver, method, *args, connection:) + work(connection) do begin receiver.send method, *args rescue Exception => e diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index df129a7c62..0b0c72ccf6 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -142,6 +142,7 @@ module ActionCable::StreamTests connection.websocket.expects(:transmit) @server.broadcast 'test_room_1', { foo: 'bar' }, coder: DummyEncoder wait_for_async + wait_for_executor connection.server.worker_pool.executor end end diff --git a/actioncable/test/client_test.rb b/actioncable/test/client_test.rb index 5ac453db35..fe503fd703 100644 --- a/actioncable/test/client_test.rb +++ b/actioncable/test/client_test.rb @@ -226,7 +226,9 @@ class ClientTest < ActionCable::TestCase assert_equal(1, app.connections.count) assert(app.remote_connections.where(identifier: identifier)) - channel = app.connections.first.subscriptions.send(:subscriptions).first[1] + subscriptions = app.connections.first.subscriptions.send(:subscriptions) + assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription' + channel = subscriptions.first[1] channel.expects(:unsubscribed) c.close sleep 0.1 # Data takes a moment to process diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index de1ee96770..0a9ee7ce77 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -49,10 +49,7 @@ end module ConcurrentRubyConcurrencyHelpers def wait_for_async - e = Concurrent.global_io_executor - until e.completed_task_count == e.scheduled_task_count - sleep 0.1 - end + wait_for_executor Concurrent.global_io_executor end def run_in_eventmachine @@ -67,4 +64,10 @@ class ActionCable::TestCase < ActiveSupport::TestCase else include ConcurrentRubyConcurrencyHelpers end + + def wait_for_executor(executor) + until executor.completed_task_count == executor.scheduled_task_count + sleep 0.1 + end + end end diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb index 7016da3493..e2c81fe312 100644 --- a/actioncable/test/worker_test.rb +++ b/actioncable/test/worker_test.rb @@ -33,12 +33,12 @@ class WorkerTest < ActiveSupport::TestCase end test "invoke" do - @worker.invoke @receiver, :run + @worker.invoke @receiver, :run, connection: @receiver.connection assert_equal :run, @receiver.last_action end test "invoke with arguments" do - @worker.invoke @receiver, :process, "Hello" + @worker.invoke @receiver, :process, "Hello", connection: @receiver.connection assert_equal [ :process, "Hello" ], @receiver.last_action end end