mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Cable message encoding
* Introduce a connection coder responsible for encoding Cable messages as WebSocket messages, defaulting to `ActiveSupport::JSON` and duck- typing to any object responding to `#encode` and `#decode`. * Consolidate encoding responsibility to the connection. No longer explicitly JSON-encode from channels or other sources. Pass Cable messages as Hashes to `#transmit` and rely on it to encode. * Introduce stream encoders responsible for decoding pubsub messages. Preserve the currently raw encoding, but make it easy to use JSON. Same duck type as the connection encoder. * Revert recent data normalization/quoting (#23649) which treated `identifier` and `data` values as nested JSON objects rather than as opaque JSON-encoded strings. That dealt us an awkward hand where we'd decode JSON strings… or not, but always encode as JSON. Embedding JSON object values directly is preferably, no extra JSON encoding, but that should be a purposeful protocol version change rather than ambiguously, inadvertently supporting multiple message formats.
This commit is contained in:
parent
903f447e43
commit
b168eb5819
18 changed files with 258 additions and 167 deletions
|
@ -1,11 +1,16 @@
|
|||
* Add ActiveSupport::Notifications to ActionCable::Channel.
|
||||
* Pubsub: automatic stream decoding.
|
||||
|
||||
*Matthew Wear*
|
||||
stream_for @room, coder: ActiveSupport::JSON do |message|
|
||||
# `message` is a Ruby hash here instead of a JSON string
|
||||
|
||||
* Allow channel identifiers with no backslahes/escaping to be accepted
|
||||
by the subscription storer.
|
||||
The `coder` must respond to `#decode`. Defaults to `coder: nil`
|
||||
which skips decoding entirely.
|
||||
|
||||
*Jon Moss*
|
||||
*Jeremy Daer*
|
||||
|
||||
* Add ActiveSupport::Notifications to ActionCable::Channel.
|
||||
|
||||
*Matthew Wear*
|
||||
|
||||
* Safely support autoloading and class unloading, by preventing concurrent
|
||||
loads, and disconnecting all cables during reload.
|
||||
|
|
|
@ -198,7 +198,7 @@ module ActionCable
|
|||
|
||||
payload = { channel_class: self.class.name, data: data, via: via }
|
||||
ActiveSupport::Notifications.instrument("transmit.action_cable", payload) do
|
||||
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
|
||||
connection.transmit identifier: @identifier, message: data
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -274,7 +274,7 @@ module ActionCable
|
|||
logger.info "#{self.class.name} is transmitting the subscription confirmation"
|
||||
|
||||
ActiveSupport::Notifications.instrument("transmit_subscription_confirmation.action_cable", channel_class: self.class.name) do
|
||||
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
|
||||
connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation]
|
||||
@subscription_confirmation_sent = true
|
||||
end
|
||||
end
|
||||
|
@ -289,7 +289,7 @@ module ActionCable
|
|||
logger.info "#{self.class.name} is transmitting the subscription rejection"
|
||||
|
||||
ActiveSupport::Notifications.instrument("transmit_subscription_rejection.action_cable", channel_class: self.class.name) do
|
||||
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
|
||||
connection.transmit identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -46,9 +46,7 @@ module ActionCable
|
|||
# def subscribed
|
||||
# @room = Chat::Room[params[:room_number]]
|
||||
#
|
||||
# stream_for @room, -> (encoded_message) do
|
||||
# message = ActiveSupport::JSON.decode(encoded_message)
|
||||
#
|
||||
# stream_for @room, coder: ActiveSupport::JSON do |message|
|
||||
# if message['originated_at'].present?
|
||||
# elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
|
||||
#
|
||||
|
@ -71,16 +69,23 @@ module ActionCable
|
|||
|
||||
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
|
||||
# instead of the default of just transmitting the updates straight to the subscriber.
|
||||
def stream_from(broadcasting, callback = nil)
|
||||
# Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
|
||||
# Defaults to `coder: nil` which does no decoding, passes raw messages.
|
||||
def stream_from(broadcasting, callback = nil, coder: nil, &block)
|
||||
broadcasting = String(broadcasting)
|
||||
# Don't send the confirmation until pubsub#subscribe is successful
|
||||
defer_subscription_confirmation!
|
||||
|
||||
callback ||= default_stream_callback(broadcasting)
|
||||
streams << [ broadcasting, callback ]
|
||||
if handler = callback || block
|
||||
handler = -> message { handler.(coder.decode(message)) } if coder
|
||||
else
|
||||
handler = default_stream_handler(broadcasting, coder: coder)
|
||||
end
|
||||
|
||||
streams << [ broadcasting, handler ]
|
||||
|
||||
connection.server.event_loop.post do
|
||||
pubsub.subscribe(broadcasting, callback, lambda do
|
||||
pubsub.subscribe(broadcasting, handler, lambda do
|
||||
transmit_subscription_confirmation
|
||||
logger.info "#{self.class.name} is streaming from #{broadcasting}"
|
||||
end)
|
||||
|
@ -90,8 +95,11 @@ module ActionCable
|
|||
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
|
||||
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
|
||||
# to the subscriber.
|
||||
def stream_for(model, callback = nil)
|
||||
stream_from(broadcasting_for([ channel_name, model ]), callback)
|
||||
#
|
||||
# Pass `coder: ActiveSupport::JSON` to decode messages as JSON before passing to the callback.
|
||||
# Defaults to `coder: nil` which does no decoding, passes raw messages.
|
||||
def stream_for(model, callback = nil, coder: nil, &block)
|
||||
stream_from(broadcasting_for([ channel_name, model ]), callback || block, coder: coder)
|
||||
end
|
||||
|
||||
# Unsubscribes all streams associated with this channel from the pubsub queue.
|
||||
|
@ -109,9 +117,11 @@ module ActionCable
|
|||
@_streams ||= []
|
||||
end
|
||||
|
||||
def default_stream_callback(broadcasting)
|
||||
def default_stream_handler(broadcasting, coder:)
|
||||
coder ||= ActiveSupport::JSON
|
||||
|
||||
-> (message) do
|
||||
transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
|
||||
transmit coder.decode(message), via: "streamed from #{broadcasting}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -51,8 +51,8 @@ module ActionCable
|
|||
attr_reader :server, :env, :subscriptions, :logger, :worker_pool
|
||||
delegate :event_loop, :pubsub, to: :server
|
||||
|
||||
def initialize(server, env)
|
||||
@server, @env = server, env
|
||||
def initialize(server, env, coder: ActiveSupport::JSON)
|
||||
@server, @env, @coder = server, env, coder
|
||||
|
||||
@worker_pool = server.worker_pool
|
||||
@logger = new_tagged_logger
|
||||
|
@ -67,7 +67,7 @@ module ActionCable
|
|||
|
||||
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
|
||||
# This method should not be called directly -- instead rely upon on the #connect (and #disconnect) callbacks.
|
||||
def process # :nodoc:
|
||||
def process #:nodoc:
|
||||
logger.info started_request_message
|
||||
|
||||
if websocket.possible? && allow_request_origin?
|
||||
|
@ -77,20 +77,22 @@ module ActionCable
|
|||
end
|
||||
end
|
||||
|
||||
# Data received over the WebSocket connection is handled by this method. It's expected that everything inbound is JSON encoded.
|
||||
# The data is routed to the proper channel that the connection has subscribed to.
|
||||
def receive(data_in_json)
|
||||
# Decodes WebSocket messages and dispatches them to subscribed channels.
|
||||
# WebSocket message transfer encoding is always JSON.
|
||||
def receive(websocket_message) #:nodoc:
|
||||
send_async :dispatch_websocket_message, websocket_message
|
||||
end
|
||||
|
||||
def dispatch_websocket_message(websocket_message) #:nodoc:
|
||||
if websocket.alive?
|
||||
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
|
||||
subscriptions.execute_command decode(websocket_message)
|
||||
else
|
||||
logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
|
||||
logger.error "Ignoring message processed after the WebSocket was closed: #{websocket_message.inspect})"
|
||||
end
|
||||
end
|
||||
|
||||
# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
|
||||
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
|
||||
def transmit(data) # :nodoc:
|
||||
websocket.transmit data
|
||||
def transmit(cable_message) # :nodoc:
|
||||
websocket.transmit encode(cable_message)
|
||||
end
|
||||
|
||||
# Close the WebSocket connection.
|
||||
|
@ -115,7 +117,7 @@ module ActionCable
|
|||
end
|
||||
|
||||
def beat
|
||||
transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i)
|
||||
transmit type: ActionCable::INTERNAL[:message_types][:ping], message: Time.now.to_i
|
||||
end
|
||||
|
||||
def on_open # :nodoc:
|
||||
|
@ -152,6 +154,14 @@ module ActionCable
|
|||
attr_reader :message_buffer
|
||||
|
||||
private
|
||||
def encode(cable_message)
|
||||
@coder.encode cable_message
|
||||
end
|
||||
|
||||
def decode(websocket_message)
|
||||
@coder.decode websocket_message
|
||||
end
|
||||
|
||||
def handle_open
|
||||
connect if respond_to?(:connect)
|
||||
subscribe_to_internal_channel
|
||||
|
@ -178,7 +188,7 @@ module ActionCable
|
|||
# Send welcome message to the internal connection monitor channel.
|
||||
# This ensures the connection monitor state is reset after a successful
|
||||
# websocket connection.
|
||||
transmit ActiveSupport::JSON.encode(type: ActionCable::INTERNAL[:message_types][:welcome])
|
||||
transmit type: ActionCable::INTERNAL[:message_types][:welcome]
|
||||
end
|
||||
|
||||
def allow_request_origin?
|
||||
|
|
|
@ -11,7 +11,7 @@ module ActionCable
|
|||
|
||||
def subscribe_to_internal_channel
|
||||
if connection_identifier.present?
|
||||
callback = -> (message) { process_internal_message(message) }
|
||||
callback = -> (message) { process_internal_message decode(message) }
|
||||
@_internal_subscriptions ||= []
|
||||
@_internal_subscriptions << [ internal_channel, callback ]
|
||||
|
||||
|
@ -27,8 +27,6 @@ module ActionCable
|
|||
end
|
||||
|
||||
def process_internal_message(message)
|
||||
message = ActiveSupport::JSON.decode(message)
|
||||
|
||||
case message['type']
|
||||
when 'disconnect'
|
||||
logger.info "Removing connection (#{connection_identifier})"
|
||||
|
|
|
@ -30,7 +30,7 @@ module ActionCable
|
|||
|
||||
protected
|
||||
attr_reader :connection
|
||||
attr_accessor :buffered_messages
|
||||
attr_reader :buffered_messages
|
||||
|
||||
private
|
||||
def valid?(message)
|
||||
|
@ -38,7 +38,7 @@ module ActionCable
|
|||
end
|
||||
|
||||
def receive(message)
|
||||
connection.send_async :receive, message
|
||||
connection.receive message
|
||||
end
|
||||
|
||||
def buffer(message)
|
||||
|
|
|
@ -23,13 +23,13 @@ module ActionCable
|
|||
end
|
||||
|
||||
def add(data)
|
||||
id_options = decode_hash(data['identifier'])
|
||||
identifier = normalize_identifier(id_options)
|
||||
id_key = data['identifier']
|
||||
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
|
||||
|
||||
subscription_klass = connection.server.channel_classes[id_options[:channel]]
|
||||
|
||||
if subscription_klass
|
||||
subscriptions[identifier] ||= subscription_klass.new(connection, identifier, id_options)
|
||||
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
|
||||
else
|
||||
logger.error "Subscription class not found (#{data.inspect})"
|
||||
end
|
||||
|
@ -37,7 +37,7 @@ module ActionCable
|
|||
|
||||
def remove(data)
|
||||
logger.info "Unsubscribing from channel: #{data['identifier']}"
|
||||
remove_subscription subscriptions[normalize_identifier(data['identifier'])]
|
||||
remove_subscription subscriptions[data['identifier']]
|
||||
end
|
||||
|
||||
def remove_subscription(subscription)
|
||||
|
@ -46,7 +46,7 @@ module ActionCable
|
|||
end
|
||||
|
||||
def perform_action(data)
|
||||
find(data).perform_action(decode_hash(data['data']))
|
||||
find(data).perform_action ActiveSupport::JSON.decode(data['data'])
|
||||
end
|
||||
|
||||
def identifiers
|
||||
|
@ -63,21 +63,8 @@ module ActionCable
|
|||
private
|
||||
delegate :logger, to: :connection
|
||||
|
||||
def normalize_identifier(identifier)
|
||||
identifier = ActiveSupport::JSON.encode(identifier) if identifier.is_a?(Hash)
|
||||
identifier
|
||||
end
|
||||
|
||||
# If `data` is a Hash, this means that the original JSON
|
||||
# sent by the client had no backslashes in it, and does
|
||||
# not need to be decoded again.
|
||||
def decode_hash(data)
|
||||
data = ActiveSupport::JSON.decode(data) unless data.is_a?(Hash)
|
||||
data.with_indifferent_access
|
||||
end
|
||||
|
||||
def find(data)
|
||||
if subscription = subscriptions[normalize_identifier(data['identifier'])]
|
||||
if subscription = subscriptions[data['identifier']]
|
||||
subscription
|
||||
else
|
||||
raise "Unable to find subscription with identifier: #{data['identifier']}"
|
||||
|
|
|
@ -19,27 +19,28 @@ module ActionCable
|
|||
# new Notification data['title'], body: data['body']
|
||||
module Broadcasting
|
||||
# Broadcast a hash directly to a named <tt>broadcasting</tt>. This will later be JSON encoded.
|
||||
def broadcast(broadcasting, message)
|
||||
broadcaster_for(broadcasting).broadcast(message)
|
||||
def broadcast(broadcasting, message, coder: ActiveSupport::JSON)
|
||||
broadcaster_for(broadcasting, coder: coder).broadcast(message)
|
||||
end
|
||||
|
||||
# Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have an object that
|
||||
# may need multiple spots to transmit to a specific broadcasting over and over.
|
||||
def broadcaster_for(broadcasting)
|
||||
Broadcaster.new(self, String(broadcasting))
|
||||
def broadcaster_for(broadcasting, coder: ActiveSupport::JSON)
|
||||
Broadcaster.new(self, String(broadcasting), coder: coder)
|
||||
end
|
||||
|
||||
private
|
||||
class Broadcaster
|
||||
attr_reader :server, :broadcasting
|
||||
attr_reader :server, :broadcasting, :coder
|
||||
|
||||
def initialize(server, broadcasting)
|
||||
@server, @broadcasting = server, broadcasting
|
||||
def initialize(server, broadcasting, coder:)
|
||||
@server, @broadcasting, @coder = server, broadcasting, coder
|
||||
end
|
||||
|
||||
def broadcast(message)
|
||||
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
|
||||
server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message)
|
||||
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message.inspect}"
|
||||
encoded = coder ? coder.encode(message) : message
|
||||
server.pubsub.broadcast broadcasting, encoded
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -146,12 +146,12 @@ class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
|
|||
test "transmitting data" do
|
||||
@channel.perform_action 'action' => :get_latest
|
||||
|
||||
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" }
|
||||
expected = { "identifier" => "{id: 1}", "message" => { "data" => "latest" }}
|
||||
assert_equal expected, @connection.last_transmission
|
||||
end
|
||||
|
||||
test "subscription confirmation" do
|
||||
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
|
||||
expected = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
|
||||
assert_equal expected, @connection.last_transmission
|
||||
end
|
||||
|
||||
|
|
|
@ -18,7 +18,7 @@ class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
|
|||
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
|
||||
@channel = SecretChannel.new @connection, "{id: 1}", { id: 1 }
|
||||
|
||||
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "reject_subscription"
|
||||
expected = { "identifier" => "{id: 1}", "type" => "reject_subscription" }
|
||||
assert_equal expected, @connection.last_transmission
|
||||
end
|
||||
|
||||
|
|
|
@ -2,18 +2,43 @@ require 'test_helper'
|
|||
require 'stubs/test_connection'
|
||||
require 'stubs/room'
|
||||
|
||||
class ActionCable::Channel::StreamTest < ActionCable::TestCase
|
||||
module ActionCable::StreamTests
|
||||
class Connection < ActionCable::Connection::Base
|
||||
attr_reader :websocket
|
||||
|
||||
def send_async(method, *args)
|
||||
send method, *args
|
||||
end
|
||||
end
|
||||
|
||||
class ChatChannel < ActionCable::Channel::Base
|
||||
def subscribed
|
||||
if params[:id]
|
||||
@room = Room.new params[:id]
|
||||
stream_from "test_room_#{@room.id}"
|
||||
stream_from "test_room_#{@room.id}", coder: pick_coder(params[:coder])
|
||||
end
|
||||
end
|
||||
|
||||
def send_confirmation
|
||||
transmit_subscription_confirmation
|
||||
end
|
||||
|
||||
private def pick_coder(coder)
|
||||
case coder
|
||||
when nil, 'json'
|
||||
ActiveSupport::JSON
|
||||
when 'custom'
|
||||
DummyEncoder
|
||||
when 'none'
|
||||
nil
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
module DummyEncoder
|
||||
extend self
|
||||
def encode(*) '{ "foo": "encoded" }' end
|
||||
def decode(*) { foo: 'decoded' } end
|
||||
end
|
||||
|
||||
class SymbolChannel < ActionCable::Channel::Base
|
||||
|
@ -22,69 +47,114 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
test "streaming start and stop" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
|
||||
class StreamTest < ActionCable::TestCase
|
||||
test "streaming start and stop" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
|
||||
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
|
||||
channel.unsubscribe_from_channel
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
|
||||
channel.unsubscribe_from_channel
|
||||
end
|
||||
end
|
||||
|
||||
test "stream from non-string channel" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
channel = SymbolChannel.new connection, ""
|
||||
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
|
||||
channel.unsubscribe_from_channel
|
||||
end
|
||||
end
|
||||
|
||||
test "stream_for" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:stream_tests:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
|
||||
channel = ChatChannel.new connection, ""
|
||||
channel.stream_for Room.new(1)
|
||||
end
|
||||
end
|
||||
|
||||
test "stream_from subscription confirmation" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
|
||||
ChatChannel.new connection, "{id: 1}", { id: 1 }
|
||||
assert_nil connection.last_transmission
|
||||
|
||||
wait_for_async
|
||||
|
||||
confirmation = { "identifier" => "{id: 1}", "type" => "confirm_subscription" }
|
||||
connection.transmit(confirmation)
|
||||
|
||||
assert_equal confirmation, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
|
||||
end
|
||||
end
|
||||
|
||||
test "subscription confirmation should only be sent out once" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
|
||||
channel = ChatChannel.new connection, "test_channel"
|
||||
channel.send_confirmation
|
||||
channel.send_confirmation
|
||||
|
||||
wait_for_async
|
||||
|
||||
expected = { "identifier" => "test_channel", "type" => "confirm_subscription" }
|
||||
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"
|
||||
|
||||
assert_equal 1, connection.transmissions.size
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
test "stream from non-string channel" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("channel", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
channel = SymbolChannel.new connection, ""
|
||||
require 'action_cable/subscription_adapter/inline'
|
||||
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) }
|
||||
channel.unsubscribe_from_channel
|
||||
class StreamEncodingTest < ActionCable::TestCase
|
||||
setup do
|
||||
@server = TestServer.new(subscription_adapter: ActionCable::SubscriptionAdapter::Inline)
|
||||
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
|
||||
@server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel)
|
||||
end
|
||||
end
|
||||
|
||||
test "stream_for" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) }
|
||||
test 'custom encoder' do
|
||||
run_in_eventmachine do
|
||||
connection = open_connection
|
||||
subscribe_to connection, identifiers: { id: 1 }
|
||||
|
||||
channel = ChatChannel.new connection, ""
|
||||
channel.stream_for Room.new(1)
|
||||
connection.websocket.expects(:transmit)
|
||||
@server.broadcast 'test_room_1', { foo: 'bar' }, coder: DummyEncoder
|
||||
wait_for_async
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
def subscribe_to(connection, identifiers:)
|
||||
receive connection, command: 'subscribe', identifiers: identifiers
|
||||
end
|
||||
|
||||
def open_connection
|
||||
env = Rack::MockRequest.env_for '/test', 'HTTP_HOST' => 'localhost', 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'HTTP_ORIGIN' => 'http://rubyonrails.com'
|
||||
|
||||
Connection.new(@server, env).tap do |connection|
|
||||
connection.process
|
||||
assert connection.websocket.possible?
|
||||
|
||||
wait_for_async
|
||||
assert connection.websocket.alive?
|
||||
end
|
||||
end
|
||||
|
||||
def receive(connection, command:, identifiers:)
|
||||
identifier = JSON.generate(channel: 'ActionCable::StreamTests::ChatChannel', **identifiers)
|
||||
connection.dispatch_websocket_message JSON.generate(command: command, identifier: identifier)
|
||||
wait_for_async
|
||||
end
|
||||
end
|
||||
|
||||
test "stream_from subscription confirmation" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
|
||||
ChatChannel.new connection, "{id: 1}", { id: 1 }
|
||||
assert_nil connection.last_transmission
|
||||
|
||||
wait_for_async
|
||||
|
||||
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
|
||||
connection.transmit(expected)
|
||||
|
||||
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
|
||||
end
|
||||
end
|
||||
|
||||
test "subscription confirmation should only be sent out once" do
|
||||
run_in_eventmachine do
|
||||
connection = TestConnection.new
|
||||
|
||||
channel = ChatChannel.new connection, "test_channel"
|
||||
channel.send_confirmation
|
||||
channel.send_confirmation
|
||||
|
||||
wait_for_async
|
||||
|
||||
expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription"
|
||||
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"
|
||||
|
||||
assert_equal 1, connection.transmissions.size
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -75,11 +75,11 @@ class ClientTest < ActionCable::TestCase
|
|||
end
|
||||
|
||||
@ws.on(:message) do |event|
|
||||
hash = JSON.parse(event.data)
|
||||
if hash['type'] == 'ping'
|
||||
message = JSON.parse(event.data)
|
||||
if message['type'] == 'ping'
|
||||
@pings += 1
|
||||
else
|
||||
@messages << hash
|
||||
@messages << message
|
||||
@has_messages.release
|
||||
end
|
||||
end
|
||||
|
@ -116,8 +116,8 @@ class ClientTest < ActionCable::TestCase
|
|||
list
|
||||
end
|
||||
|
||||
def send_message(hash)
|
||||
@ws.send(JSON.dump(hash))
|
||||
def send_message(message)
|
||||
@ws.send(JSON.generate(message))
|
||||
end
|
||||
|
||||
def close
|
||||
|
@ -148,9 +148,9 @@ class ClientTest < ActionCable::TestCase
|
|||
with_puma_server do |port|
|
||||
c = faye_client(port)
|
||||
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
||||
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
|
||||
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
||||
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
||||
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
|
||||
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
|
||||
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
|
||||
c.close
|
||||
end
|
||||
|
@ -165,12 +165,12 @@ class ClientTest < ActionCable::TestCase
|
|||
|
||||
clients.map {|c| Concurrent::Future.execute {
|
||||
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
||||
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
|
||||
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
||||
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
|
||||
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
|
||||
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
|
||||
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
|
||||
barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
|
||||
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'bulk', message: 'hello')
|
||||
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello')
|
||||
barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
|
||||
assert_equal clients.size, c.read_messages(clients.size).size
|
||||
} }.each(&:wait!)
|
||||
|
@ -185,9 +185,9 @@ class ClientTest < ActionCable::TestCase
|
|||
|
||||
clients.map {|c| Concurrent::Future.execute {
|
||||
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
||||
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
|
||||
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
||||
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
|
||||
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
|
||||
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
|
||||
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
|
||||
} }.each(&:wait!)
|
||||
|
||||
|
@ -199,16 +199,16 @@ class ClientTest < ActionCable::TestCase
|
|||
with_puma_server do |port|
|
||||
c = faye_client(port)
|
||||
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
||||
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
|
||||
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
||||
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
||||
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'delay', message: 'hello')
|
||||
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello')
|
||||
c.close # disappear before write
|
||||
|
||||
c = faye_client(port)
|
||||
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
||||
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
|
||||
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
||||
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
||||
c.send_message command: 'message', identifier: JSON.dump(channel: 'EchoChannel'), data: JSON.dump(action: 'ding', message: 'hello')
|
||||
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
|
||||
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
|
||||
c.close # disappear before read
|
||||
end
|
||||
|
@ -217,7 +217,7 @@ class ClientTest < ActionCable::TestCase
|
|||
def test_unsubscribe_client
|
||||
with_puma_server do |port|
|
||||
app = ActionCable.server
|
||||
identifier = JSON.dump(channel: 'EchoChannel')
|
||||
identifier = JSON.generate(channel: 'EchoChannel')
|
||||
|
||||
c = faye_client(port)
|
||||
assert_equal({"type" => "welcome"}, c.read_message)
|
||||
|
@ -240,7 +240,7 @@ class ClientTest < ActionCable::TestCase
|
|||
with_puma_server do |port|
|
||||
c = faye_client(port)
|
||||
assert_equal({"type" => "welcome"}, c.read_message)
|
||||
c.send_message command: 'subscribe', identifier: JSON.dump(channel: 'EchoChannel')
|
||||
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
||||
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
||||
|
||||
ActionCable.server.restart
|
||||
|
|
|
@ -40,8 +40,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
|
|||
open_connection_with_stubbed_pubsub
|
||||
|
||||
@connection.websocket.expects(:close)
|
||||
message = ActiveSupport::JSON.encode('type' => 'disconnect')
|
||||
@connection.process_internal_message message
|
||||
@connection.process_internal_message 'type' => 'disconnect'
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -50,8 +49,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
|
|||
open_connection_with_stubbed_pubsub
|
||||
|
||||
@connection.websocket.expects(:close).never
|
||||
message = ActiveSupport::JSON.encode('type' => 'unknown')
|
||||
@connection.process_internal_message message
|
||||
@connection.process_internal_message 'type' => 'unknown'
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -88,7 +88,7 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase
|
|||
|
||||
channel1 = subscribe_to_chat_channel
|
||||
|
||||
channel2_id = ActiveSupport::JSON.encode({ id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel' })
|
||||
channel2_id = ActiveSupport::JSON.encode(id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel')
|
||||
channel2 = subscribe_to_chat_channel(channel2_id)
|
||||
|
||||
channel1.expects(:unsubscribe_from_channel)
|
||||
|
|
|
@ -3,24 +3,31 @@ require 'stubs/user'
|
|||
class TestConnection
|
||||
attr_reader :identifiers, :logger, :current_user, :server, :transmissions
|
||||
|
||||
def initialize(user = User.new("lifo"))
|
||||
delegate :pubsub, to: :server
|
||||
|
||||
def initialize(user = User.new("lifo"), coder: ActiveSupport::JSON, subscription_adapter: SuccessAdapter)
|
||||
@coder = coder
|
||||
@identifiers = [ :current_user ]
|
||||
|
||||
@current_user = user
|
||||
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
|
||||
@server = TestServer.new
|
||||
@server = TestServer.new(subscription_adapter: subscription_adapter)
|
||||
@transmissions = []
|
||||
end
|
||||
|
||||
def pubsub
|
||||
SuccessAdapter.new(server)
|
||||
end
|
||||
|
||||
def transmit(data)
|
||||
@transmissions << data
|
||||
def transmit(cable_message)
|
||||
@transmissions << encode(cable_message)
|
||||
end
|
||||
|
||||
def last_transmission
|
||||
@transmissions.last
|
||||
decode @transmissions.last if @transmissions.any?
|
||||
end
|
||||
|
||||
def decode(websocket_message)
|
||||
@coder.decode websocket_message
|
||||
end
|
||||
|
||||
def encode(cable_message)
|
||||
@coder.encode cable_message
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,22 +2,26 @@ require 'ostruct'
|
|||
|
||||
class TestServer
|
||||
include ActionCable::Server::Connections
|
||||
include ActionCable::Server::Broadcasting
|
||||
|
||||
attr_reader :logger, :config
|
||||
attr_reader :logger, :config, :mutex
|
||||
|
||||
def initialize
|
||||
def initialize(subscription_adapter: SuccessAdapter)
|
||||
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
|
||||
@config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter)
|
||||
|
||||
@config = OpenStruct.new(log_tags: [], subscription_adapter: subscription_adapter)
|
||||
@config.use_faye = ENV['FAYE'].present?
|
||||
@config.client_socket_class = if @config.use_faye
|
||||
ActionCable::Connection::FayeClientSocket
|
||||
else
|
||||
ActionCable::Connection::ClientSocket
|
||||
end
|
||||
|
||||
@mutex = Monitor.new
|
||||
end
|
||||
|
||||
def pubsub
|
||||
@config.subscription_adapter.new(self)
|
||||
@pubsub ||= @config.subscription_adapter.new(self)
|
||||
end
|
||||
|
||||
def event_loop
|
||||
|
|
|
@ -20,8 +20,7 @@ module CommonSubscriptionAdapterTest
|
|||
end
|
||||
|
||||
def teardown
|
||||
@tx_adapter.shutdown if @tx_adapter && @tx_adapter != @rx_adapter
|
||||
@rx_adapter.shutdown if @rx_adapter
|
||||
[@rx_adapter, @tx_adapter].uniq.each(&:shutdown)
|
||||
end
|
||||
|
||||
|
||||
|
|
|
@ -2,11 +2,13 @@ require 'action_cable'
|
|||
require 'active_support/testing/autorun'
|
||||
|
||||
require 'puma'
|
||||
|
||||
require 'mocha/setup'
|
||||
|
||||
require 'rack/mock'
|
||||
require 'active_support/core_ext/hash/indifferent_access'
|
||||
|
||||
begin
|
||||
require 'byebug'
|
||||
rescue LoadError
|
||||
end
|
||||
|
||||
# Require all the stubs and models
|
||||
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
|
||||
|
|
Loading…
Reference in a new issue