Merge pull request #26547 from palkan/fix/actioncable-confirmation-race-condition

Avoid race condition on subscription confirmation
This commit is contained in:
Matthew Draper 2016-10-01 15:14:43 +09:30
commit 9588a3d66d
9 changed files with 83 additions and 24 deletions

View File

@ -1,3 +1,11 @@
* Prevent race where the client could receive and act upon a
subscription confirmation before the channel's `subscribed` method
completed.
Fixes #25381.
*Vladimir Dementyev*
* Buffer writes to websocket connections, to avoid blocking threads * Buffer writes to websocket connections, to avoid blocking threads
that could be doing more useful things. that could be doing more useful things.

View File

@ -144,13 +144,14 @@ module ActionCable
# When a channel is streaming via pubsub, we want to delay the confirmation # When a channel is streaming via pubsub, we want to delay the confirmation
# transmission until pubsub subscription is confirmed. # transmission until pubsub subscription is confirmed.
@defer_subscription_confirmation = false #
# The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
@defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
@reject_subscription = nil @reject_subscription = nil
@subscription_confirmation_sent = nil @subscription_confirmation_sent = nil
delegate_connection_identifiers delegate_connection_identifiers
subscribe_to_channel
end end
# Extract the action name from the passed data and process it via the channel. The process will ensure # Extract the action name from the passed data and process it via the channel. The process will ensure
@ -169,6 +170,17 @@ module ActionCable
end end
end end
# This method is called after subscription has been added to the connection
# and confirms or rejects the subscription.
def subscribe_to_channel
run_callbacks :subscribe do
subscribed
end
reject_subscription if subscription_rejected?
ensure_confirmation_sent
end
# Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks. # Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback. # This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
def unsubscribe_from_channel # :nodoc: def unsubscribe_from_channel # :nodoc:
@ -201,12 +213,18 @@ module ActionCable
end end
end end
def ensure_confirmation_sent
return if subscription_rejected?
@defer_subscription_confirmation_counter.decrement
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
def defer_subscription_confirmation! def defer_subscription_confirmation!
@defer_subscription_confirmation = true @defer_subscription_confirmation_counter.increment
end end
def defer_subscription_confirmation? def defer_subscription_confirmation?
@defer_subscription_confirmation @defer_subscription_confirmation_counter.value > 0
end end
def subscription_confirmation_sent? def subscription_confirmation_sent?
@ -230,18 +248,6 @@ module ActionCable
end end
end end
def subscribe_to_channel
run_callbacks :subscribe do
subscribed
end
if subscription_rejected?
reject_subscription
else
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
end
def extract_action(data) def extract_action(data)
(data["action"].presence || :receive).to_sym (data["action"].presence || :receive).to_sym
end end

View File

@ -84,7 +84,7 @@ module ActionCable
connection.server.event_loop.post do connection.server.event_loop.post do
pubsub.subscribe(broadcasting, handler, lambda do pubsub.subscribe(broadcasting, handler, lambda do
transmit_subscription_confirmation ensure_confirmation_sent
logger.info "#{self.class.name} is streaming from #{broadcasting}" logger.info "#{self.class.name} is streaming from #{broadcasting}"
end) end)
end end

View File

@ -26,10 +26,14 @@ module ActionCable
id_key = data["identifier"] id_key = data["identifier"]
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
return if subscriptions.key?(id_key)
subscription_klass = id_options[:channel].safe_constantize subscription_klass = id_options[:channel].safe_constantize
if subscription_klass && ActionCable::Channel::Base >= subscription_klass if subscription_klass && ActionCable::Channel::Base >= subscription_klass
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options) subscription = subscription_klass.new(connection, id_key, id_options)
subscriptions[id_key] = subscription
subscription.subscribe_to_channel
else else
logger.error "Subscription class not found: #{id_options[:channel].inspect}" logger.error "Subscription class not found: #{id_options[:channel].inspect}"
end end

View File

@ -77,11 +77,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
@channel = ChatChannel.new @connection, "{id: 1}", id: 1 @channel = ChatChannel.new @connection, "{id: 1}", id: 1
end end
test "should subscribe to a channel on initialize" do test "should subscribe to a channel" do
@channel.subscribe_to_channel
assert_equal 1, @channel.room.id assert_equal 1, @channel.room.id
end end
test "on subscribe callbacks" do test "on subscribe callbacks" do
@channel.subscribe_to_channel
assert @channel.subscribed assert @channel.subscribed
end end
@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
end end
test "unsubscribing from a channel" do test "unsubscribing from a channel" do
@channel.subscribe_to_channel
assert @channel.room assert @channel.room
assert @channel.subscribed? assert @channel.subscribed?
@ -150,8 +154,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
assert_equal expected, @connection.last_transmission assert_equal expected, @connection.last_transmission
end end
test "subscription confirmation" do test "do not send subscription confirmation on initialize" do
assert_nil @connection.last_transmission
end
test "subscription confirmation on subscribe_to_channel" do
expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" } expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
@channel.subscribe_to_channel
assert_equal expected, @connection.last_transmission assert_equal expected, @connection.last_transmission
end end
@ -208,6 +217,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
test "notification for transmit_subscription_confirmation" do test "notification for transmit_subscription_confirmation" do
begin begin
@channel.subscribe_to_channel
events = [] events = []
ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args| ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args|
events << ActiveSupport::Notifications::Event.new(*args) events << ActiveSupport::Notifications::Event.new(*args)

View File

@ -62,6 +62,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
@connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil)) @connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil))
channel = ChatChannel.new @connection, "{id: 1}", id: 1 channel = ChatChannel.new @connection, "{id: 1}", id: 1
channel.subscribe_to_channel
channel.unsubscribe_from_channel channel.unsubscribe_from_channel
assert_equal [], channel.send(:active_periodic_timers) assert_equal [], channel.send(:active_periodic_timers)
end end

View File

@ -20,6 +20,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
test "subscription rejection" do test "subscription rejection" do
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
@channel = SecretChannel.new @connection, "{id: 1}", id: 1 @channel = SecretChannel.new @connection, "{id: 1}", id: 1
@channel.subscribe_to_channel
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
assert_equal expected, @connection.last_transmission assert_equal expected, @connection.last_transmission
@ -28,6 +29,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
test "does not execute action if subscription is rejected" do test "does not execute action if subscription is rejected" do
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) } @connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
@channel = SecretChannel.new @connection, "{id: 1}", id: 1 @channel = SecretChannel.new @connection, "{id: 1}", id: 1
@channel.subscribe_to_channel
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" } expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
assert_equal expected, @connection.last_transmission assert_equal expected, @connection.last_transmission

View File

@ -53,6 +53,7 @@ module ActionCable::StreamTests
connection = TestConnection.new connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "{id: 1}", id: 1 channel = ChatChannel.new connection, "{id: 1}", id: 1
channel.subscribe_to_channel
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel channel.unsubscribe_from_channel
@ -64,6 +65,7 @@ module ActionCable::StreamTests
connection = TestConnection.new connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = SymbolChannel.new connection, "" channel = SymbolChannel.new connection, ""
channel.subscribe_to_channel
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
channel.unsubscribe_from_channel channel.unsubscribe_from_channel
@ -76,6 +78,7 @@ module ActionCable::StreamTests
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "" channel = ChatChannel.new connection, ""
channel.subscribe_to_channel
channel.stream_for Room.new(1) channel.stream_for Room.new(1)
end end
end end
@ -84,7 +87,9 @@ module ActionCable::StreamTests
run_in_eventmachine do run_in_eventmachine do
connection = TestConnection.new connection = TestConnection.new
ChatChannel.new connection, "{id: 1}", id: 1 channel = ChatChannel.new connection, "{id: 1}", id: 1
channel.subscribe_to_channel
assert_nil connection.last_transmission assert_nil connection.last_transmission
wait_for_async wait_for_async
@ -114,7 +119,7 @@ module ActionCable::StreamTests
end end
end end
require "action_cable/subscription_adapter/inline" require "action_cable/subscription_adapter/async"
class UserCallbackChannel < ActionCable::Channel::Base class UserCallbackChannel < ActionCable::Channel::Base
def subscribed def subscribed
@ -124,9 +129,16 @@ module ActionCable::StreamTests
end end
end end
class StreamEncodingTest < ActionCable::TestCase class MultiChatChannel < ActionCable::Channel::Base
def subscribed
stream_from "main_room"
stream_from "test_all_rooms"
end
end
class StreamFromTest < ActionCable::TestCase
setup do setup do
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline) @server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Async)
@server.config.allowed_request_origins = %w( http://rubyonrails.com ) @server.config.allowed_request_origins = %w( http://rubyonrails.com )
end end
@ -153,6 +165,17 @@ module ActionCable::StreamTests
end end
end end
test "subscription confirmation should only be sent out once with muptiple stream_from" do
run_in_eventmachine do
connection = open_connection
expected = { "identifier" => { "channel" => MultiChatChannel.name }.to_json, "type" => "confirm_subscription" }
connection.websocket.expects(:transmit).with(expected.to_json)
receive(connection, command: "subscribe", channel: MultiChatChannel.name, identifiers: {})
wait_for_async
end
end
private private
def subscribe_to(connection, identifiers:) def subscribe_to(connection, identifiers:)
receive connection, command: "subscribe", identifiers: identifiers receive connection, command: "subscribe", identifiers: identifiers

View File

@ -66,8 +66,12 @@ class ActionCable::TestCase < ActiveSupport::TestCase
end end
def wait_for_executor(executor) def wait_for_executor(executor)
# do not wait forever, wait 2s
timeout = 2
until executor.completed_task_count == executor.scheduled_task_count until executor.completed_task_count == executor.scheduled_task_count
sleep 0.1 sleep 0.1
timeout -= 0.1
raise "Executor could not complete all tasks in 2 seconds" unless timeout > 0
end end
end end
end end