mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
3ba0eec20c
* Use separate stream handler builders for easy override and testing. * Fix worker pool execution that was silently failing since it only expected connection receivers. Sparked by code in #24162.
253 lines
8.7 KiB
Ruby
253 lines
8.7 KiB
Ruby
require 'test_helper'
|
|
require 'concurrent'
|
|
|
|
require 'active_support/core_ext/hash/indifferent_access'
|
|
require 'pathname'
|
|
|
|
require 'faye/websocket'
|
|
require 'json'
|
|
|
|
class ClientTest < ActionCable::TestCase
|
|
WAIT_WHEN_EXPECTING_EVENT = 8
|
|
WAIT_WHEN_NOT_EXPECTING_EVENT = 0.5
|
|
|
|
def setup
|
|
ActionCable.instance_variable_set(:@server, nil)
|
|
server = ActionCable.server
|
|
server.config.logger = Logger.new(StringIO.new).tap { |l| l.level = Logger::UNKNOWN }
|
|
|
|
server.config.cable = { adapter: 'async' }.with_indifferent_access
|
|
server.config.use_faye = ENV['FAYE'].present?
|
|
|
|
# and now the "real" setup for our test:
|
|
server.config.disable_request_forgery_protection = true
|
|
server.config.channel_paths = [ File.expand_path('client/echo_channel.rb', __dir__) ]
|
|
|
|
Thread.new { EventMachine.run } unless EventMachine.reactor_running?
|
|
Thread.pass until EventMachine.reactor_running?
|
|
|
|
# faye-websocket is warning-rich
|
|
@previous_verbose, $VERBOSE = $VERBOSE, nil
|
|
end
|
|
|
|
def teardown
|
|
$VERBOSE = @previous_verbose
|
|
end
|
|
|
|
def with_puma_server(rack_app = ActionCable.server, port = 3099)
|
|
server = ::Puma::Server.new(rack_app, ::Puma::Events.strings)
|
|
server.add_tcp_listener '127.0.0.1', port
|
|
server.min_threads = 1
|
|
server.max_threads = 4
|
|
|
|
t = Thread.new { server.run.join }
|
|
yield port
|
|
|
|
ensure
|
|
server.stop(true) if server
|
|
t.join if t
|
|
end
|
|
|
|
class SyncClient
|
|
attr_reader :pings
|
|
|
|
def initialize(port)
|
|
@ws = Faye::WebSocket::Client.new("ws://127.0.0.1:#{port}/")
|
|
@messages = Queue.new
|
|
@closed = Concurrent::Event.new
|
|
@has_messages = Concurrent::Semaphore.new(0)
|
|
@pings = 0
|
|
|
|
open = Concurrent::Event.new
|
|
error = nil
|
|
|
|
@ws.on(:error) do |event|
|
|
if open.set?
|
|
@messages << RuntimeError.new(event.message)
|
|
else
|
|
error = event.message
|
|
open.set
|
|
end
|
|
end
|
|
|
|
@ws.on(:open) do |event|
|
|
open.set
|
|
end
|
|
|
|
@ws.on(:message) do |event|
|
|
message = JSON.parse(event.data)
|
|
if message['type'] == 'ping'
|
|
@pings += 1
|
|
else
|
|
@messages << message
|
|
@has_messages.release
|
|
end
|
|
end
|
|
|
|
@ws.on(:close) do |event|
|
|
@closed.set
|
|
end
|
|
|
|
open.wait(WAIT_WHEN_EXPECTING_EVENT)
|
|
raise error if error
|
|
end
|
|
|
|
def read_message
|
|
@has_messages.try_acquire(1, WAIT_WHEN_EXPECTING_EVENT)
|
|
|
|
msg = @messages.pop(true)
|
|
raise msg if msg.is_a?(Exception)
|
|
|
|
msg
|
|
end
|
|
|
|
def read_messages(expected_size = 0)
|
|
list = []
|
|
loop do
|
|
if @has_messages.try_acquire(1, list.size < expected_size ? WAIT_WHEN_EXPECTING_EVENT : WAIT_WHEN_NOT_EXPECTING_EVENT)
|
|
msg = @messages.pop(true)
|
|
raise msg if msg.is_a?(Exception)
|
|
|
|
list << msg
|
|
else
|
|
break
|
|
end
|
|
end
|
|
list
|
|
end
|
|
|
|
def send_message(message)
|
|
@ws.send(JSON.generate(message))
|
|
end
|
|
|
|
def close
|
|
sleep WAIT_WHEN_NOT_EXPECTING_EVENT
|
|
|
|
unless @messages.empty?
|
|
raise "#{@messages.size} messages unprocessed"
|
|
end
|
|
|
|
@ws.close
|
|
wait_for_close
|
|
end
|
|
|
|
def wait_for_close
|
|
@closed.wait(WAIT_WHEN_EXPECTING_EVENT)
|
|
end
|
|
|
|
def closed?
|
|
@closed.set?
|
|
end
|
|
end
|
|
|
|
def faye_client(port)
|
|
SyncClient.new(port)
|
|
end
|
|
|
|
def test_single_client
|
|
with_puma_server do |port|
|
|
c = faye_client(port)
|
|
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
|
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
|
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
|
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
|
|
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "message"=>{"dong"=>"hello"}}, c.read_message)
|
|
c.close
|
|
end
|
|
end
|
|
|
|
def test_interacting_clients
|
|
with_puma_server do |port|
|
|
clients = 10.times.map { faye_client(port) }
|
|
|
|
barrier_1 = Concurrent::CyclicBarrier.new(clients.size)
|
|
barrier_2 = Concurrent::CyclicBarrier.new(clients.size)
|
|
|
|
clients.map {|c| Concurrent::Future.execute {
|
|
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
|
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
|
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
|
|
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
|
|
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
|
|
barrier_1.wait WAIT_WHEN_EXPECTING_EVENT
|
|
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'bulk', message: 'hello')
|
|
barrier_2.wait WAIT_WHEN_EXPECTING_EVENT
|
|
assert_equal clients.size, c.read_messages(clients.size).size
|
|
} }.each(&:wait!)
|
|
|
|
clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
|
|
end
|
|
end
|
|
|
|
def test_many_clients
|
|
with_puma_server do |port|
|
|
clients = 100.times.map { faye_client(port) }
|
|
|
|
clients.map {|c| Concurrent::Future.execute {
|
|
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
|
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
|
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "type"=>"confirm_subscription"}, c.read_message)
|
|
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
|
|
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
|
|
} }.each(&:wait!)
|
|
|
|
clients.map {|c| Concurrent::Future.execute { c.close } }.each(&:wait!)
|
|
end
|
|
end
|
|
|
|
def test_disappearing_client
|
|
with_puma_server do |port|
|
|
c = faye_client(port)
|
|
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
|
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
|
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
|
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'delay', message: 'hello')
|
|
c.close # disappear before write
|
|
|
|
c = faye_client(port)
|
|
assert_equal({"type" => "welcome"}, c.read_message) # pop the first welcome message off the stack
|
|
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
|
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
|
c.send_message command: 'message', identifier: JSON.generate(channel: 'EchoChannel'), data: JSON.generate(action: 'ding', message: 'hello')
|
|
assert_equal({"identifier"=>'{"channel":"EchoChannel"}', "message"=>{"dong"=>"hello"}}, c.read_message)
|
|
c.close # disappear before read
|
|
end
|
|
end
|
|
|
|
def test_unsubscribe_client
|
|
with_puma_server do |port|
|
|
app = ActionCable.server
|
|
identifier = JSON.generate(channel: 'EchoChannel')
|
|
|
|
c = faye_client(port)
|
|
assert_equal({"type" => "welcome"}, c.read_message)
|
|
c.send_message command: 'subscribe', identifier: identifier
|
|
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
|
assert_equal(1, app.connections.count)
|
|
assert(app.remote_connections.where(identifier: identifier))
|
|
|
|
subscriptions = app.connections.first.subscriptions.send(:subscriptions)
|
|
assert_not_equal 0, subscriptions.size, 'Missing EchoChannel subscription'
|
|
channel = subscriptions.first[1]
|
|
channel.expects(:unsubscribed)
|
|
c.close
|
|
sleep 0.1 # Data takes a moment to process
|
|
|
|
# All data is removed: No more connection or subscription information!
|
|
assert_equal(0, app.connections.count)
|
|
end
|
|
end
|
|
|
|
def test_server_restart
|
|
with_puma_server do |port|
|
|
c = faye_client(port)
|
|
assert_equal({"type" => "welcome"}, c.read_message)
|
|
c.send_message command: 'subscribe', identifier: JSON.generate(channel: 'EchoChannel')
|
|
assert_equal({"identifier"=>"{\"channel\":\"EchoChannel\"}", "type"=>"confirm_subscription"}, c.read_message)
|
|
|
|
ActionCable.server.restart
|
|
c.wait_for_close
|
|
assert c.closed?
|
|
end
|
|
end
|
|
end
|