mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
361 lines
12 KiB
Ruby
361 lines
12 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "test_helper"
|
|
require "concurrent"
|
|
|
|
require "websocket-client-simple"
|
|
require "json"
|
|
|
|
require "active_support/hash_with_indifferent_access"
|
|
|
|
####
|
|
# 😷 Warning suppression 😷
|
|
# Part of the warning-suppression section: assign @application_data_buffer
# to nil before the original initializer runs.
WebSocket::Frame::Handler::Handler03.prepend(
  Module.new do
    def initialize(*)
      @application_data_buffer = nil
      super
    end
  end
)
|
|
|
|
# Part of the warning-suppression section: assign @masking_key to nil
# before the original initializer runs.
WebSocket::Frame::Data.prepend(
  Module.new do
    def initialize(*)
      @masking_key = nil
      super
    end
  end
)
|
|
#
|
|
####
|
|
|
|
class ClientTest < ActionCable::TestCase
|
|
WAIT_WHEN_EXPECTING_EVENT = 2
|
|
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
|
|
|
|
# Connection class used by the test server; connections are identified by +id+.
class Connection < ActionCable::Connection::Base
  identified_by :id

  # Takes the id from the "id" request parameter, falling back to a random
  # hex string when the parameter is absent.
  def connect
    requested_id = request.params["id"]
    self.id = requested_id || SecureRandom.hex(4)
  end
end
|
|
|
|
# Channel exercised by the client tests below.
class EchoChannel < ActionCable::Channel::Base
  # Every subscriber streams from the shared "global" broadcasting.
  def subscribed
    stream_from "global"
  end

  def unsubscribed
    "Goodbye from EchoChannel!"
  end

  # Transmits the received message back to the caller under the "dong" key.
  def ding(data)
    reply = { dong: data["message"] }
    transmit(reply)
  end

  # Same reply as #ding, but only after sleeping for one second.
  def delay(data)
    sleep 1
    reply = { dong: data["message"] }
    transmit(reply)
  end

  # Broadcasts the received message to "global" under the "wide" key.
  def bulk(data)
    payload = { wide: data["message"] }
    ActionCable.server.broadcast("global", payload)
  end
end
|
|
|
|
# Clears ActionCable's memoized @server and configures the server for these
# tests: silenced logger, async adapter, the test Connection class, and
# request forgery protection disabled.
def setup
  ActionCable.instance_variable_set(:@server, nil)
  server = ActionCable.server

  # Log into a throwaway StringIO at the UNKNOWN level.
  quiet_logger = Logger.new(StringIO.new)
  quiet_logger.level = Logger::UNKNOWN
  server.config.logger = quiet_logger

  server.config.cable = ActiveSupport::HashWithIndifferentAccess.new(adapter: "async")
  server.config.connection_class = lambda { ClientTest::Connection }

  # and now the "real" setup for our test:
  server.config.disable_request_forgery_protection = true
end
|
|
|
|
# Runs +rack_app+ on a Puma server bound to 127.0.0.1:+port+, yields the port
# to the block, and always stops the server and joins its thread afterwards.
#
# rack_app - Rack application to serve (defaults to ActionCable.server).
# port     - TCP port to listen on (defaults to 3099).
def with_puma_server(rack_app = ActionCable.server, port = 3099)
  opts = { min_threads: 1, max_threads: 4 }

  # Compare the major version numerically. Comparing the version Strings
  # directly is lexicographic, so "10.0.0" would sort before "6" and pick
  # the legacy constructor.
  puma_major = Integer(Puma::Const::PUMA_VERSION[/\A\d+/], 10)

  server = if puma_major >= 6
    opts[:log_writer] = ::Puma::LogWriter.strings
    ::Puma::Server.new(rack_app, nil, opts)
  else
    # Puma >= 5.0.3
    ::Puma::Server.new(rack_app, ::Puma::Events.strings, opts)
  end
  server.add_tcp_listener "127.0.0.1", port

  thread = server.run

  begin
    yield port
  ensure
    server.stop

    begin
      thread.join
    rescue IOError
      # Work around https://bugs.ruby-lang.org/issues/13405
      #
      # Puma's sometimes raising while shutting down, when it closes
      # its internal pipe. We can safely ignore that, but we do need
      # to do the step skipped by the exception:
      server.binder.close
    rescue RuntimeError => ex
      # Work around https://bugs.ruby-lang.org/issues/13239
      raise unless ex.message.match?(/can't modify frozen IOError/)

      # Handle this as if it were the IOError: do the same as above.
      server.binder.close
    end
  end
end
|
|
|
|
# Blocking WebSocket client for the tests. Non-ping messages (and errors that
# arrive after the socket has opened) are queued for #read_message /
# #read_messages; ping messages are only counted.
class SyncClient
  attr_reader :pings

  # Connects to ws://127.0.0.1:<port><path> and waits up to
  # WAIT_WHEN_EXPECTING_EVENT seconds for the connection to open.
  def initialize(port, path = "/")
    # Each piece of state is also bound to a local so that the callbacks
    # registered below close over it directly instead of reading ivars.
    inbox = @messages = Queue.new
    closed_event = @closed = Concurrent::Event.new
    inbox_permits = @has_messages = Concurrent::Semaphore.new(0)
    ping_counter = @pings = Concurrent::AtomicFixnum.new(0)

    opened = Concurrent::Promise.new

    @ws = WebSocket::Client::Simple.connect("ws://127.0.0.1:#{port}#{path}") do |ws|
      ws.on(:error) do |error|
        error = error.is_a?(Exception) ? error : RuntimeError.new(error.message)

        if opened.pending?
          # Still connecting: fail the open promise with this error.
          opened.fail(error)
        else
          # Already open: queue the error so the next read raises it.
          inbox << error
          inbox_permits.release
        end
      end

      ws.on(:open) { |_event| opened.set(true) }

      ws.on(:message) do |frame|
        if frame.type == :close
          closed_event.set
          next
        end

        payload = JSON.parse(frame.data)
        if payload["type"] == "ping"
          ping_counter.increment
          next
        end

        inbox << payload
        inbox_permits.release
      end

      ws.on(:close) { |_| closed_event.set }
    end

    opened.wait!(WAIT_WHEN_EXPECTING_EVENT)
  end

  # Returns the next queued message, waiting up to WAIT_WHEN_EXPECTING_EVENT
  # seconds for one to arrive. A queued exception is raised instead.
  def read_message
    @has_messages.try_acquire(1, WAIT_WHEN_EXPECTING_EVENT)

    received = @messages.pop(true)
    raise received if received.is_a?(Exception)

    received
  end

  # Collects queued messages until a wait times out. The longer timeout is
  # used while fewer than +expected_size+ messages have been collected.
  def read_messages(expected_size = 0)
    collected = []
    loop do
      timeout = collected.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT
      break unless @has_messages.try_acquire(1, timeout)

      received = @messages.pop(true)
      raise received if received.is_a?(Exception)

      collected << received
    end
    collected
  end

  # Sends +message+ to the server encoded as JSON.
  def send_message(message)
    encoded = JSON.generate(message)
    @ws.send(encoded)
  end

  # Closes the socket after a short pause; raises if any received message
  # was never read.
  def close
    sleep WAIT_WHEN_NOT_EXPECTING_EVENT

    raise "#{@messages.size} messages unprocessed" unless @messages.empty?

    @ws.close
    wait_for_close
  end

  # Waits up to WAIT_WHEN_EXPECTING_EVENT seconds for the closed event.
  def wait_for_close
    @closed.wait(WAIT_WHEN_EXPECTING_EVENT)
  end

  def closed?
    @closed.set?
  end
end
|
|
|
|
# Builds a SyncClient, forwarding every argument to SyncClient.new.
def websocket_client(*args)
  SyncClient.new(*args)
end
|
|
|
|
# Starts one future per element of +enum+, yielding the element to the given
# block, then returns the futures' values in order (value! re-raises failures).
def concurrently(enum)
  futures = enum.map do |*item|
    Concurrent::Promises.future { yield(*item) }
  end
  futures.map(&:value!)
end
|
|
|
|
# One client: welcome, subscribe to EchoChannel, ding round-trip, close.
def test_single_client
  with_puma_server do |port|
    client = websocket_client(port)
    identifier = JSON.generate(channel: "ClientTest::EchoChannel")

    # Consume the initial welcome message first.
    assert_equal({ "type" => "welcome" }, client.read_message)

    client.send_message(command: "subscribe", identifier: identifier)
    confirmation = { "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }
    assert_equal(confirmation, client.read_message)

    ding = JSON.generate(action: "ding", message: "hello")
    client.send_message(command: "message", identifier: identifier, data: ding)
    dong = { "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "message" => { "dong" => "hello" } }
    assert_equal(dong, client.read_message)

    client.close
  end
end
|
|
|
|
# Ten clients subscribe concurrently, each triggers a "bulk" broadcast, and
# each must then read one broadcast per client.
def test_interacting_clients
  with_puma_server do |port|
    clients = concurrently(10.times) { websocket_client(port) }

    before_bulk = Concurrent::CyclicBarrier.new(clients.size)
    after_bulk = Concurrent::CyclicBarrier.new(clients.size)

    concurrently(clients) do |client|
      identifier = JSON.generate(channel: "ClientTest::EchoChannel")

      # Consume the initial welcome message first.
      assert_equal({ "type" => "welcome" }, client.read_message)

      client.send_message(command: "subscribe", identifier: identifier)
      confirmation = { "identifier" => '{"channel":"ClientTest::EchoChannel"}', "type" => "confirm_subscription" }
      assert_equal(confirmation, client.read_message)

      ding = JSON.generate(action: "ding", message: "hello")
      client.send_message(command: "message", identifier: identifier, data: ding)
      dong = { "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }
      assert_equal(dong, client.read_message)

      # Wait for the other clients before sending the broadcast request...
      before_bulk.wait(WAIT_WHEN_EXPECTING_EVENT)
      bulk = JSON.generate(action: "bulk", message: "hello")
      client.send_message(command: "message", identifier: identifier, data: bulk)

      # ...and again before counting the broadcasts received.
      after_bulk.wait(WAIT_WHEN_EXPECTING_EVENT)
      assert_equal(clients.size, client.read_messages(clients.size).size)
    end

    concurrently(clients, &:close)
  end
end
|
|
|
|
# One hundred concurrent clients each subscribe and do a ding round-trip.
def test_many_clients
  with_puma_server do |port|
    clients = concurrently(100.times) { websocket_client(port) }

    concurrently(clients) do |client|
      identifier = JSON.generate(channel: "ClientTest::EchoChannel")

      # Consume the initial welcome message first.
      assert_equal({ "type" => "welcome" }, client.read_message)

      client.send_message(command: "subscribe", identifier: identifier)
      confirmation = { "identifier" => '{"channel":"ClientTest::EchoChannel"}', "type" => "confirm_subscription" }
      assert_equal(confirmation, client.read_message)

      ding = JSON.generate(action: "ding", message: "hello")
      client.send_message(command: "message", identifier: identifier, data: ding)
      dong = { "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }
      assert_equal(dong, client.read_message)
    end

    concurrently(clients, &:close)
  end
end
|
|
|
|
# A client that closes while a delayed reply is pending must not prevent a
# second client from subscribing and completing a ding round-trip.
def test_disappearing_client
  with_puma_server do |port|
    first_client = websocket_client(port)
    # Consume the initial welcome message first.
    assert_equal({ "type" => "welcome" }, first_client.read_message)

    first_client.send_message(command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel"))
    first_confirmation = { "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }
    assert_equal(first_confirmation, first_client.read_message)

    delayed = JSON.generate(action: "delay", message: "hello")
    first_client.send_message(command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: delayed)
    first_client.close # disappear before write

    second_client = websocket_client(port)
    # Consume the initial welcome message first.
    assert_equal({ "type" => "welcome" }, second_client.read_message)

    second_client.send_message(command: "subscribe", identifier: JSON.generate(channel: "ClientTest::EchoChannel"))
    second_confirmation = { "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }
    assert_equal(second_confirmation, second_client.read_message)

    ding = JSON.generate(action: "ding", message: "hello")
    second_client.send_message(command: "message", identifier: JSON.generate(channel: "ClientTest::EchoChannel"), data: ding)
    dong = { "identifier" => '{"channel":"ClientTest::EchoChannel"}', "message" => { "dong" => "hello" } }
    assert_equal(dong, second_client.read_message)
    second_client.close # disappear before read
  end
end
|
|
|
|
# Closing the client must call the channel's #unsubscribed and leave the
# server with no connections.
def test_unsubscribe_client
  with_puma_server do |port|
    app = ActionCable.server
    identifier = JSON.generate(channel: "ClientTest::EchoChannel")

    client = websocket_client(port)
    assert_equal({ "type" => "welcome" }, client.read_message)

    client.send_message(command: "subscribe", identifier: identifier)
    confirmation = { "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }
    assert_equal(confirmation, client.read_message)
    assert_equal(1, app.connections.count)

    # Reach into the connection's subscriptions to get at the channel instance.
    connection = app.connections.first
    subscriptions = connection.subscriptions.send(:subscriptions)
    assert_not_equal 0, subscriptions.size, "Missing EchoChannel subscription"

    channel = subscriptions.first[1]
    assert_called(channel, :unsubscribed) do
      client.close
      sleep 0.1 # Data takes a moment to process
    end

    # All data is removed: No more connection or subscription information!
    assert_equal(0, app.connections.count)
  end
end
|
|
|
|
# A remote disconnect with default options sends a disconnect message with
# "reconnect" => true and then closes the socket.
def test_remote_disconnect_client
  with_puma_server do |port|
    app = ActionCable.server

    client = websocket_client(port, "/?id=1")
    assert_equal({ "type" => "welcome" }, client.read_message)

    sleep 0.1 # make sure connections is registered
    app.remote_connections.where(id: "1").disconnect

    expected = { "type" => "disconnect", "reason" => "remote", "reconnect" => true }
    assert_equal(expected, client.read_message)

    client.wait_for_close
    assert(client.closed?)
  end
end
|
|
|
|
# A remote disconnect with reconnect: false sends a disconnect message with
# "reconnect" => false and then closes the socket.
def test_remote_disconnect_client_with_reconnect
  with_puma_server do |port|
    app = ActionCable.server

    client = websocket_client(port, "/?id=2")
    assert_equal({ "type" => "welcome" }, client.read_message)

    sleep 0.1 # make sure connections is registered
    app.remote_connections.where(id: "2").disconnect(reconnect: false)

    expected = { "type" => "disconnect", "reason" => "remote", "reconnect" => false }
    assert_equal(expected, client.read_message)

    client.wait_for_close
    assert(client.closed?)
  end
end
|
|
|
|
# Restarting the Action Cable server must close a subscribed client's socket.
def test_server_restart
  with_puma_server do |port|
    client = websocket_client(port)
    assert_equal({ "type" => "welcome" }, client.read_message)

    identifier = JSON.generate(channel: "ClientTest::EchoChannel")
    client.send_message(command: "subscribe", identifier: identifier)
    confirmation = { "identifier" => "{\"channel\":\"ClientTest::EchoChannel\"}", "type" => "confirm_subscription" }
    assert_equal(confirmation, client.read_message)

    ActionCable.server.restart
    client.wait_for_close
    assert_predicate client, :closed?
  end
end
|
|
end
|