mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Add Channel#ensure_confirmation_sent; call #subscribe_to_channel after initializing
This commit is contained in:
parent
03a209e92a
commit
3e68d8b872
7 changed files with 39 additions and 33 deletions
|
@ -145,22 +145,13 @@ module ActionCable
|
|||
# When a channel is streaming via pubsub, we want to delay the confirmation
|
||||
# transmission until pubsub subscription is confirmed.
|
||||
#
|
||||
# We use atomic fixnum to track the number of waiting tasks to avoid race conditions
|
||||
# The counter starts at 1 because it's awaiting a call to #subscribe_to_channel
|
||||
@defer_subscription_confirmation_counter = Concurrent::AtomicFixnum.new(1)
|
||||
|
||||
@reject_subscription = nil
|
||||
@subscription_confirmation_sent = nil
|
||||
|
||||
delegate_connection_identifiers
|
||||
subscribe_to_channel
|
||||
end
|
||||
|
||||
# This method is called after subscription has been added to the channel.
|
||||
# Send confirmation here to avoid race conditions when client tries to perform actions
|
||||
# right after receiving confirmation.
|
||||
def registered!
|
||||
@defer_subscription_confirmation_counter.decrement
|
||||
transmit_subscription_confirmation unless defer_subscription_confirmation?
|
||||
end
|
||||
|
||||
# Extract the action name from the passed data and process it via the channel. The process will ensure
|
||||
|
@ -179,6 +170,17 @@ module ActionCable
|
|||
end
|
||||
end
|
||||
|
||||
# This method is called after subscription has been added to the connection
|
||||
# and confirms or rejects the subscription.
|
||||
def subscribe_to_channel
|
||||
run_callbacks :subscribe do
|
||||
subscribed
|
||||
end
|
||||
|
||||
reject_subscription if subscription_rejected?
|
||||
ensure_confirmation_sent
|
||||
end
|
||||
|
||||
# Called by the cable connection when it's cut, so the channel has a chance to cleanup with callbacks.
|
||||
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
|
||||
def unsubscribe_from_channel # :nodoc:
|
||||
|
@ -211,22 +213,24 @@ module ActionCable
|
|||
end
|
||||
end
|
||||
|
||||
def ensure_confirmation_sent
|
||||
return if subscription_rejected?
|
||||
@defer_subscription_confirmation_counter.decrement
|
||||
transmit_subscription_confirmation unless defer_subscription_confirmation?
|
||||
end
|
||||
|
||||
def defer_subscription_confirmation!
|
||||
@defer_subscription_confirmation_counter.increment
|
||||
end
|
||||
|
||||
def defer_subscription_confirmation?
|
||||
@defer_subscription_confirmation_counter.value.positive?
|
||||
@defer_subscription_confirmation_counter.value > 0
|
||||
end
|
||||
|
||||
def subscription_confirmation_sent?
|
||||
@subscription_confirmation_sent
|
||||
end
|
||||
|
||||
def registered?
|
||||
@registered
|
||||
end
|
||||
|
||||
def reject
|
||||
@reject_subscription = true
|
||||
end
|
||||
|
@ -244,14 +248,6 @@ module ActionCable
|
|||
end
|
||||
end
|
||||
|
||||
def subscribe_to_channel
|
||||
run_callbacks :subscribe do
|
||||
subscribed
|
||||
end
|
||||
|
||||
reject_subscription if subscription_rejected?
|
||||
end
|
||||
|
||||
def extract_action(data)
|
||||
(data["action"].presence || :receive).to_sym
|
||||
end
|
||||
|
|
|
@ -84,8 +84,7 @@ module ActionCable
|
|||
|
||||
connection.server.event_loop.post do
|
||||
pubsub.subscribe(broadcasting, handler, lambda do
|
||||
@defer_subscription_confirmation_counter.decrement
|
||||
transmit_subscription_confirmation unless defer_subscription_confirmation?
|
||||
ensure_confirmation_sent
|
||||
logger.info "#{self.class.name} is streaming from #{broadcasting}"
|
||||
end)
|
||||
end
|
||||
|
|
|
@ -26,12 +26,14 @@ module ActionCable
|
|||
id_key = data["identifier"]
|
||||
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
|
||||
|
||||
return if subscriptions.key?(id_key)
|
||||
|
||||
subscription_klass = id_options[:channel].safe_constantize
|
||||
|
||||
if subscription_klass && ActionCable::Channel::Base >= subscription_klass
|
||||
subscription = subscription_klass.new(connection, id_key, id_options)
|
||||
subscriptions[id_key] ||= subscription
|
||||
subscription.registered!
|
||||
subscriptions[id_key] = subscription
|
||||
subscription.subscribe_to_channel
|
||||
else
|
||||
logger.error "Subscription class not found: #{id_options[:channel].inspect}"
|
||||
end
|
||||
|
|
|
@ -77,11 +77,13 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
|
|||
@channel = ChatChannel.new @connection, "{id: 1}", id: 1
|
||||
end
|
||||
|
||||
test "should subscribe to a channel on initialize" do
|
||||
test "should subscribe to a channel" do
|
||||
@channel.subscribe_to_channel
|
||||
assert_equal 1, @channel.room.id
|
||||
end
|
||||
|
||||
test "on subscribe callbacks" do
|
||||
@channel.subscribe_to_channel
|
||||
assert @channel.subscribed
|
||||
end
|
||||
|
||||
|
@ -90,6 +92,8 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
|
|||
end
|
||||
|
||||
test "unsubscribing from a channel" do
|
||||
@channel.subscribe_to_channel
|
||||
|
||||
assert @channel.room
|
||||
assert @channel.subscribed?
|
||||
|
||||
|
@ -154,9 +158,9 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
|
|||
assert_nil @connection.last_transmission
|
||||
end
|
||||
|
||||
test "subscription confirmation on registration" do
|
||||
test "subscription confirmation on subscribe_to_channel" do
|
||||
expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
|
||||
@channel.registered!
|
||||
@channel.subscribe_to_channel
|
||||
assert_equal expected, @connection.last_transmission
|
||||
end
|
||||
|
||||
|
@ -213,7 +217,7 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
|
|||
|
||||
test "notification for transmit_subscription_confirmation" do
|
||||
begin
|
||||
@channel.registered!
|
||||
@channel.subscribe_to_channel
|
||||
|
||||
events = []
|
||||
ActiveSupport::Notifications.subscribe "transmit_subscription_confirmation.action_cable" do |*args|
|
||||
|
|
|
@ -62,6 +62,7 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
|
|||
@connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil))
|
||||
channel = ChatChannel.new @connection, "{id: 1}", id: 1
|
||||
|
||||
channel.subscribe_to_channel
|
||||
channel.unsubscribe_from_channel
|
||||
assert_equal [], channel.send(:active_periodic_timers)
|
||||
end
|
||||
|
|
|
@ -20,6 +20,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
|
|||
test "subscription rejection" do
|
||||
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
|
||||
@channel = SecretChannel.new @connection, "{id: 1}", id: 1
|
||||
@channel.subscribe_to_channel
|
||||
|
||||
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
|
||||
assert_equal expected, @connection.last_transmission
|
||||
|
@ -28,6 +29,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
|
|||
test "does not execute action if subscription is rejected" do
|
||||
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
|
||||
@channel = SecretChannel.new @connection, "{id: 1}", id: 1
|
||||
@channel.subscribe_to_channel
|
||||
|
||||
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
|
||||
assert_equal expected, @connection.last_transmission
|
||||
|
|
|
@ -53,6 +53,7 @@ module ActionCable::StreamTests
|
|||
connection = TestConnection.new
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
channel = ChatChannel.new connection, "{id: 1}", id: 1
|
||||
channel.subscribe_to_channel
|
||||
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
|
||||
channel.unsubscribe_from_channel
|
||||
|
@ -64,6 +65,7 @@ module ActionCable::StreamTests
|
|||
connection = TestConnection.new
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
channel = SymbolChannel.new connection, ""
|
||||
channel.subscribe_to_channel
|
||||
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
|
||||
channel.unsubscribe_from_channel
|
||||
|
@ -76,6 +78,7 @@ module ActionCable::StreamTests
|
|||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
|
||||
channel = ChatChannel.new connection, ""
|
||||
channel.subscribe_to_channel
|
||||
channel.stream_for Room.new(1)
|
||||
end
|
||||
end
|
||||
|
@ -85,7 +88,7 @@ module ActionCable::StreamTests
|
|||
connection = TestConnection.new
|
||||
|
||||
channel = ChatChannel.new connection, "{id: 1}", id: 1
|
||||
channel.registered!
|
||||
channel.subscribe_to_channel
|
||||
|
||||
assert_nil connection.last_transmission
|
||||
|
||||
|
@ -103,7 +106,6 @@ module ActionCable::StreamTests
|
|||
connection = TestConnection.new
|
||||
|
||||
channel = ChatChannel.new connection, "test_channel"
|
||||
channel.registered!
|
||||
channel.send_confirmation
|
||||
channel.send_confirmation
|
||||
|
||||
|
|
Loading…
Reference in a new issue