mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
b168eb5819
* Introduce a connection coder responsible for encoding Cable messages as WebSocket messages, defaulting to `ActiveSupport::JSON` and duck-typing to any object responding to `#encode` and `#decode`. * Consolidate encoding responsibility to the connection. No longer explicitly JSON-encode from channels or other sources. Pass Cable messages as Hashes to `#transmit` and rely on it to encode. * Introduce stream encoders responsible for decoding pubsub messages. Preserve the currently raw encoding, but make it easy to use JSON. Same duck type as the connection encoder. * Revert recent data normalization/quoting (#23649) which treated `identifier` and `data` values as nested JSON objects rather than as opaque JSON-encoded strings. That dealt us an awkward hand where we'd decode JSON strings… or not, but always encode as JSON. Embedding JSON object values directly is preferable, with no extra JSON encoding, but that should be a purposeful protocol version change rather than ambiguously, inadvertently supporting multiple message formats.
251 lines
8.5 KiB
Ruby
251 lines
8.5 KiB
Ruby
require 'test_helper'
|
|
require 'concurrent'
|
|
|
|
require 'active_support/core_ext/hash/indifferent_access'
|
|
require 'pathname'
|
|
|
|
require 'faye/websocket'
|
|
require 'json'
|
|
|
|
# End-to-end tests that talk to ActionCable.server over real WebSocket
# connections: the server is mounted in a Puma instance bound to 127.0.0.1 and
# exercised through Faye::WebSocket clients wrapped by SyncClient.
class ClientTest < ActionCable::TestCase
  # Seconds to wait for an event that is expected to arrive.
  WAIT_WHEN_EXPECTING_EVENT = 8
  # Seconds to wait before concluding that no further event is coming.
  WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5

  # Configures ActionCable.server for the tests and makes sure an EventMachine
  # reactor is running before any client is created.
  def setup
    # Clear the cached @server; presumably ActionCable.server then builds a
    # fresh instance for this test -- confirm against ActionCable.server.
    ActionCable.instance_variable_set(:@server, nil)
    server = ActionCable.server
    server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }

    server.config.cable = { adapter: 'async' }.with_indifferent_access
    server.config.use_faye = ENV['FAYE'].present?

    # and now the "real" setup for our test:
    server.config.disable_request_forgery_protection = true
    server.config.channel_paths = [ File.expand_path('client/echo_channel.rb', __dir__) ]

    Thread.new { EventMachine.run } unless EventMachine.reactor_running?
    Thread.pass until EventMachine.reactor_running?

    # faye-websocket is warning-rich
    @previous_verbose, $VERBOSE = $VERBOSE, nil
  end

  # Restores the $VERBOSE value that #setup silenced.
  def teardown
    $VERBOSE = @previous_verbose
  end

  # Runs +rack_app+ in a Puma server listening on 127.0.0.1:+port+, yields the
  # port to the block, and always stops the server and joins its thread
  # afterwards (even when the block raises).
  #
  # @param rack_app [#call] Rack application to serve (defaults to ActionCable.server)
  # @param port [Integer] TCP port to listen on
  # @yieldparam port [Integer] the port the server is listening on
  def with_puma_server(rack_app = ActionCable.server, port = 3099)
    server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
    server.add_tcp_listener '127.0.0.1', port
    server.min_threads = 1
    server.max_threads = 4

    t = Thread.new { server.run.join }
    yield port

  ensure
    # Either local may still be nil if construction failed part-way through.
    server.stop(true) if server
    t.join if t
  end

  # Blocking wrapper around a Faye WebSocket client. Incoming frames are parsed
  # as JSON; pings are only counted, everything else is queued so the test
  # thread can read it synchronously.
  class SyncClient
    # @return [Integer] number of 'ping' messages received so far
    attr_reader :pings

    # Connects to ws://127.0.0.1:<port>/ and waits (up to
    # WAIT_WHEN_EXPECTING_EVENT seconds) for the socket to open or fail.
    #
    # @param port [Integer] port of the server to connect to
    # @raise [RuntimeError] with the error event's message if an error is
    #   reported before the socket opens
    def initialize(port)
      @ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
      @messages = Queue.new
      @closed = Concurrent::Event.new
      # Holds one permit per entry pushed onto @messages, so readers can wait
      # with a timeout before doing a non-blocking pop.
      @has_messages = Concurrent::Semaphore.new(0)
      @pings = 0

      open = Concurrent::Event.new
      error = nil

      @ws.on(:error) do |event|
        if open.set?
          # After the socket is open, hand errors to the reading thread through
          # the message queue; #read_message / #read_messages re-raise them.
          @messages << RuntimeError.new(event.message)
          # Release a permit for the queued error as well. Without it the
          # readers never learn the entry exists: #read_messages would time
          # out and skip it, and #read_message would stall for the full timeout.
          @has_messages.release
        else
          # Failure while connecting: remember it and unblock the constructor.
          error = event.message
          open.set
        end
      end

      @ws.on(:open) do
        open.set
      end

      @ws.on(:message) do |event|
        message = JSON.parse(event.data)
        if message['type'] == 'ping'
          @pings += 1
        else
          @messages << message
          @has_messages.release
        end
      end

      @ws.on(:close) do
        @closed.set
      end

      open.wait(WAIT_WHEN_EXPECTING_EVENT)
      raise error if error
    end

    # Returns the next queued message, waiting up to WAIT_WHEN_EXPECTING_EVENT
    # seconds for one to arrive.
    #
    # @return [Hash] the parsed message
    # @raise [Exception] a queued connection error, re-raised here
    # @raise [ThreadError] from the non-blocking pop when the queue is still
    #   empty after the wait
    def read_message
      @has_messages.try_acquire(1, WAIT_WHEN_EXPECTING_EVENT)

      msg = @messages.pop(true)
      raise msg if msg.is_a?(Exception)

      msg
    end

    # Collects queued messages until none arrives within the timeout. While
    # fewer than +expected_size+ messages have been read the long timeout is
    # used; after that only the short one, to pick up unexpected extras.
    #
    # @param expected_size [Integer] number of messages the caller expects
    # @return [Array<Hash>] the messages read, in arrival order
    # @raise [Exception] a queued connection error, re-raised here
    def read_messages(expected_size = 0)
      list = []
      loop do
        timeout = list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT
        break unless @has_messages.try_acquire(1, timeout)

        msg = @messages.pop(true)
        raise msg if msg.is_a?(Exception)

        list << msg
      end
      list
    end

    # Sends +message+ to the server as a JSON-encoded frame.
    #
    # @param message [Hash] payload to encode with JSON.generate
    def send_message(message)
      @ws.send(JSON.generate(message))
    end

    # Closes the socket and waits for the close event. Pauses briefly first so
    # late messages can arrive, and fails if anything is left unread.
    #
    # @raise [RuntimeError] if the message queue is not empty
    def close
      sleep WAIT_WHEN_NOT_EXPECTING_EVENT

      unless @messages.empty?
        raise "#{@messages.size} messages unprocessed"
      end

      @ws.close
      wait_for_close
    end

    # Blocks until the close event fires or WAIT_WHEN_EXPECTING_EVENT seconds
    # elapse.
    def wait_for_close
      @closed.wait(WAIT_WHEN_EXPECTING_EVENT)
    end

    # @return [Boolean] whether the close event has been received
    def closed?
      @closed.set?
    end
  end

  # Builds a SyncClient connected to the server on +port+.
  def faye_client(port)
    SyncClient.new(port)
  end

  # One client: welcome, subscribe to EchoChannel, and a 'ding' round trip.
  def test_single_client
    with_puma_server do |port|
      c = faye_client(port)
      assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
      c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
      assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
      c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
      assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
      c.close
    end
  end

  # Ten clients subscribe concurrently; each then sends a 'bulk' message and
  # is expected to read one message per client.
  def test_interacting_clients
    with_puma_server do |port|
      clients = 10.times.map { faye_client(port) }

      # The barriers keep the clients in lock step: nobody sends 'bulk' before
      # every client has finished its 'ding' exchange, and nobody starts
      # reading before every client has sent 'bulk'.
      barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
      barrier_2 = Concurrent::CyclicBarrier.new(clients.size)

      futures = clients.map do |c|
        Concurrent::Future.execute do
          assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
          c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
          assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
          c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
          assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
          barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
          c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello')
          barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
          assert_equal clients.size, c.read_messages(clients.size).size
        end
      end
      futures.each(&:wait!)

      clients.map { |c| Concurrent::Future.execute { c.close } }.each(&:wait!)
    end
  end

  # A hundred clients each perform the subscribe + 'ding' exchange concurrently.
  def test_many_clients
    with_puma_server do |port|
      clients = 100.times.map { faye_client(port) }

      futures = clients.map do |c|
        Concurrent::Future.execute do
          assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
          c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
          assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
          c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
          assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
        end
      end
      futures.each(&:wait!)

      clients.map { |c| Concurrent::Future.execute { c.close } }.each(&:wait!)
    end
  end

  # A client that closes right after requesting a 'delay' action must not
  # prevent a second client from completing a normal exchange.
  def test_disappearing_client
    with_puma_server do |port|
      c = faye_client(port)
      assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
      c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
      assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
      c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello')
      c.close # disappear before write

      c = faye_client(port)
      assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
      c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
      assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
      c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
      assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
      c.close # disappear before read
    end
  end

  # Closing the client must invoke #unsubscribed on its channel and leave the
  # server with no connections.
  def test_unsubscribe_client
    with_puma_server do |port|
      app = ActionCable.server
      identifier = JSON.generate(channel: 'EchoChannel')

      c = faye_client(port)
      assert_equal({"type" => "welcome"}, c.read_message)
      c.send_message command: 'subscribe', identifier: identifier
      assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
      assert_equal(1, app.connections.count)
      assert(app.remote_connections.where(identifier: identifier))

      # Reach into the connection's subscriptions to set an expectation on the
      # one subscribed channel instance.
      channel = app.connections.first.subscriptions.send(:subscriptions).first[1]
      channel.expects(:unsubscribed)
      c.close
      sleep 0.1 # Data takes a moment to process

      # All data is removed: No more connection or subscription information!
      assert_equal(0, app.connections.count)
    end
  end

  # Restarting the Action Cable server must close connected clients.
  def test_server_restart
    with_puma_server do |port|
      c = faye_client(port)
      assert_equal({"type" => "welcome"}, c.read_message)
      c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
      assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)

      ActionCable.server.restart
      c.wait_for_close
      assert c.closed?
    end
  end
end
|