# Mirror of https://github.com/rails/rails.git (synced 2022-11-09 12:12:34 -05:00).
# 515 lines, 14 KiB, Ruby.
require "abstract_unit"
require "concurrent/atomic/count_down_latch"

# Re-raise in the main thread any exception that terminates another thread,
# so a failure inside a streaming thread cannot pass silently.
Thread.abort_on_exception = true
module ActionController
# Tests for server-sent events written through SSE on a controller that
# includes ActionController::Live. Each test requests one action and asserts
# on the text found in the response body.
class SSETest < ActionController::TestCase
  # Controller whose actions each write a small, fixed set of SSE messages.
  class SSETestController < ActionController::Base
    include ActionController::Live

    # Sets the event-stream content type, then writes one message from a
    # pre-serialized JSON string and one from a hash.
    def basic_sse
      response.headers["Content-Type"] = "text/event-stream"
      sse = SSE.new(response.stream)
      sse.write("{\"name\":\"John\"}")
      sse.write(name: "Ryan")
    ensure
      # +sse+ is still nil if anything raised before it was assigned; guard
      # so a NoMethodError on nil does not replace the original exception.
      sse.close if sse
    end

    # Writes two messages through an SSE built with a default event name.
    def sse_with_event
      sse = SSE.new(response.stream, event: "send-name")
      sse.write("{\"name\":\"John\"}")
      sse.write(name: "Ryan")
    ensure
      sse.close if sse # nil when SSE.new raised
    end

    # Writes one message relying on the constructor-level retry option and a
    # second one passing its own retry option to #write.
    def sse_with_retry
      sse = SSE.new(response.stream, retry: 1000)
      sse.write("{\"name\":\"John\"}")
      sse.write({ name: "Ryan" }, retry: 1500)
    ensure
      sse.close if sse # nil when SSE.new raised
    end

    # Writes two messages, each with its own id option.
    def sse_with_id
      sse = SSE.new(response.stream)
      sse.write("{\"name\":\"John\"}", id: 1)
      sse.write({ name: "Ryan" }, id: 2)
    ensure
      sse.close if sse # nil when SSE.new raised
    end

    # Writes a single message whose payload contains an embedded newline.
    def sse_with_multiple_line_message
      sse = SSE.new(response.stream)
      sse.write("first line.\nsecond line.")
    ensure
      sse.close if sse # nil when SSE.new raised
    end
  end

  tests SSETestController

  # Reads the response body before the assertions run.
  # NOTE(review): presumably reading the body blocks until the controller has
  # closed the stream -- confirm against ActionDispatch::Response#body.
  def wait_for_response_stream_close
    response.body
  end

  # Both payloads must appear as "data:" fields.
  def test_basic_sse
    get :basic_sse

    wait_for_response_stream_close
    assert_match(/data: {\"name\":\"John\"}/, response.body)
    assert_match(/data: {\"name\":\"Ryan\"}/, response.body)
  end

  # The event name given to the constructor must appear in the body.
  def test_sse_with_event_name
    get :sse_with_event

    wait_for_response_stream_close
    assert_match(/data: {\"name\":\"John\"}/, response.body)
    assert_match(/data: {\"name\":\"Ryan\"}/, response.body)
    assert_match(/event: send-name/, response.body)
  end

  # The first message carries the constructor retry value, the second the
  # value passed to #write. Messages are separated by a blank line.
  def test_sse_with_retry
    get :sse_with_retry

    wait_for_response_stream_close
    first_response, second_response = response.body.split("\n\n")
    assert_match(/data: {\"name\":\"John\"}/, first_response)
    assert_match(/retry: 1000/, first_response)

    assert_match(/data: {\"name\":\"Ryan\"}/, second_response)
    assert_match(/retry: 1500/, second_response)
  end

  # Each message must carry the id it was written with.
  def test_sse_with_id
    get :sse_with_id

    wait_for_response_stream_close
    first_response, second_response = response.body.split("\n\n")
    assert_match(/data: {\"name\":\"John\"}/, first_response)
    assert_match(/id: 1/, first_response)

    assert_match(/data: {\"name\":\"Ryan\"}/, second_response)
    assert_match(/id: 2/, second_response)
  end

  # Every line of a multi-line payload must get its own "data:" prefix.
  def test_sse_with_multiple_line_message
    get :sse_with_multiple_line_message

    wait_for_response_stream_close
    first_response, second_response = response.body.split("\n")
    assert_match(/data: first line/, first_response)
    assert_match(/data: second line/, second_response)
  end
end
# Tests for controllers that include ActionController::Live: writing to the
# response stream, coordinating with the streaming thread, and error handling
# before and after the response has been committed.
class LiveStreamTest < ActionController::TestCase
  # Test-local error class. Inside this namespace the bare constant
  # +Exception+ resolves to this class rather than to ::Exception.
  class Exception < StandardError
  end

  # Controller providing one action per streaming scenario under test.
  class TestController < ActionController::Base
    include ActionController::Live

    # latch / error_latch: count-down latches assigned by the tests to
    # synchronize with an action. tc: the running test case, used by
    # #thread_locals to assert from inside the action.
    attr_accessor :latch, :tc, :error_latch

    # Fixed controller path; the missing-template assertions below expect
    # "test/doesntexist".
    def self.controller_path
      "test"
    end

    # Sets a cookie, writes a body and closes the response.
    def set_cookie
      cookies[:hello] = "world"
      response.stream.write "hello world"
      response.close
    end

    # Ordinary non-streaming render on a Live controller.
    def render_text
      render plain: "zomg"
    end

    # Streams markup without setting a Content-Type explicitly.
    def default_header
      response.stream.write "<html><body>hi</body></html>"
      response.stream.close
    end

    # Writes two chunks and closes the stream.
    def basic_stream
      response.headers["Content-Type"] = "text/event-stream"
      %w{ hello world }.each do |word|
        response.stream.write word
      end
      response.stream.close
    end

    # Writes two chunks, waiting on +latch+ after each one so the test
    # controls when the next chunk is produced.
    def blocking_stream
      response.headers["Content-Type"] = "text/event-stream"
      %w{ hello world }.each do |word|
        response.stream.write word
        latch.wait
      end
      response.stream.close
    end

    # Writes to the stream, then references a constant that has to be
    # autoloaded from the fixtures directory.
    def write_sleep_autoload
      path = File.join(File.dirname(__FILE__), "../fixtures")
      ActiveSupport::Dependencies.autoload_paths << path

      response.headers["Content-Type"] = "text/event-stream"
      response.stream.write "before load"
      sleep 0.01
      # NOTE(review): +silence_warning+ (singular) is not defined in this
      # file; it looks like a typo for ActiveSupport's +silence_warnings+.
      # Confirm it is provided elsewhere (e.g. abstract_unit) before relying
      # on this block actually running.
      silence_warning do
        ::LoadMe
      end
      response.stream.close
      latch.count_down
    ensure
      # Always drop the fixture path again, including when the action raised,
      # so the extra autoload path cannot leak into later tests.
      ActiveSupport::Dependencies.autoload_paths.reject! { |entry| entry == path }
    end

    # Asserts, from inside the action, that the thread-local set by the test
    # is visible here, then streams two chunks.
    def thread_locals
      tc.assert_equal "aaron", Thread.current[:setting]

      response.headers["Content-Type"] = "text/event-stream"
      %w{ hello world }.each do |word|
        response.stream.write word
      end
      response.stream.close
    end

    # Renders only when the request is stale for the given etag.
    def with_stale
      render plain: "stale" if stale?(etag: "123", template: false)
    end

    # Renders a template that does not exist, before anything was written.
    def exception_in_view
      render "doesntexist"
    end

    # Writes to the stream first, then renders a template that does not exist.
    def exception_in_view_after_commit
      response.stream.write ""
      render "doesntexist"
    end

    # Registers an on_error callback that emits an error event and closes
    # the stream, then raises after the response is committed.
    def exception_with_callback
      response.headers["Content-Type"] = "text/event-stream"

      response.stream.on_error do
        response.stream.write %(data: "500 Internal Server Error"\n\n)
        response.stream.close
      end

      response.stream.write "" # make sure the response is committed
      raise "An exception occurred..."
    end

    # Raises the test-local Exception before anything is written.
    def exception_in_controller
      raise Exception, "Exception in controller"
    end

    # Raises BadRequest before anything is written.
    def bad_request_error
      raise ActionController::BadRequest
    end

    # Raises after commit while the on_error callback itself also raises.
    def exception_in_exception_callback
      response.headers["Content-Type"] = "text/event-stream"
      response.stream.on_error do
        raise "We need to go deeper."
      end
      response.stream.write ""
      # The test sends no :widget param, so this line raises.
      response.stream.write params[:widget][:didnt_check_for_nil]
    end

    # Keeps writing until a write fails; the on_error callback logs and
    # releases +error_latch+.
    def overfill_buffer_and_die
      logger = ActionController::Base.logger || Logger.new($stdout)
      response.stream.on_error do
        logger.warn "Error while streaming."
        error_latch.count_down
      end

      # Write until the buffer is full. It doesn't expose that
      # information directly, so we must hard-code its size:
      10.times do
        response.stream.write "."
      end
      # .. plus one more, because the #each frees up a slot:
      response.stream.write "."

      latch.count_down

      # This write will block, and eventually raise
      response.stream.write "x"

      20.times do
        response.stream.write "."
      end
    end

    # Opts out of disconnect errors, keeps writing, then logs completion and
    # releases +latch+.
    def ignore_client_disconnect
      response.stream.ignore_disconnect = true

      response.stream.write "" # commit

      # These writes will be ignored
      15.times do
        response.stream.write "x"
      end

      logger.info "Work complete"
      latch.count_down
    end
  end

  tests TestController

  # Asserts the response stream is closed, committed and sent.
  def assert_stream_closed
    assert response.stream.closed?, "stream should be closed"
    assert response.committed?, "response should be committed"
    assert response.sent?, "response should be sent"
  end

  # Swaps ActionController::Base.logger for one writing to a StringIO, yields
  # that StringIO, and restores the previous logger afterwards.
  def capture_log_output
    output = StringIO.new
    old_logger, ActionController::Base.logger = ActionController::Base.logger, ActiveSupport::Logger.new(output)

    begin
      yield output
    ensure
      ActionController::Base.logger = old_logger
    end
  end

  # Overrides new_controller_thread on the controller under test so the
  # action runs in a plain Thread.
  def setup
    super

    def @controller.new_controller_thread
      Thread.new { yield }
    end
  end

  def test_set_cookie
    get :set_cookie
    assert_equal({ "hello" => "world" }, @response.cookies)
    assert_equal "hello world", @response.body
  end

  def test_write_to_stream
    get :basic_stream
    assert_equal "helloworld", @response.body
    assert_equal "text/event-stream", @response.headers["Content-Type"]
  end

  def test_delayed_autoload_after_write_within_interlock_hook
    # Simulate InterlockHook
    ActiveSupport::Dependencies.interlock.start_running
    res = get :write_sleep_autoload
    res.each {}
    ActiveSupport::Dependencies.interlock.done_running
  end

  # Consumes the stream from a second thread, releasing the controller's
  # latch after each chunk so the chunks are produced one at a time.
  def test_async_stream
    rubinius_skip "https://github.com/rubinius/rubinius/issues/2934"

    @controller.latch = Concurrent::CountDownLatch.new
    parts = ["hello", "world"]

    get :blocking_stream

    t = Thread.new(response) { |resp|
      resp.await_commit
      resp.stream.each do |part|
        assert_equal parts.shift, part
        # Install a fresh latch for the next chunk before releasing the
        # one the action is currently waiting on.
        ol = @controller.latch
        @controller.latch = Concurrent::CountDownLatch.new
        ol.count_down
      end
    }

    assert t.join(3), "timeout expired before the thread terminated"
  end

  # Closes the body while the action is still writing and expects the
  # on_error callback to log.
  def test_abort_with_full_buffer
    @controller.latch = Concurrent::CountDownLatch.new
    @controller.error_latch = Concurrent::CountDownLatch.new

    capture_log_output do |output|
      get :overfill_buffer_and_die, format: "plain"

      t = Thread.new(response) { |resp|
        resp.await_commit
        _, _, body = resp.to_a
        body.each do
          @controller.latch.wait
          body.close
          break
        end
      }

      t.join
      @controller.error_latch.wait
      assert_match "Error while streaming", output.rewind && output.read
    end
  end

  # Closes the body early and expects the action to still run to completion.
  def test_ignore_client_disconnect
    @controller.latch = Concurrent::CountDownLatch.new

    capture_log_output do |output|
      get :ignore_client_disconnect

      t = Thread.new(response) { |resp|
        resp.await_commit
        _, _, body = resp.to_a
        body.each do
          body.close
          break
        end
      }

      t.join
      Timeout.timeout(3) do
        @controller.latch.wait
      end
      assert_match "Work complete", output.rewind && output.read
    end
  end

  # The assertion itself lives in TestController#thread_locals.
  def test_thread_locals_get_copied
    previous_thread = Thread.current[:originating_thread]
    previous_setting = Thread.current[:setting]

    @controller.tc = self
    Thread.current[:originating_thread] = Thread.current.object_id
    Thread.current[:setting] = "aaron"

    get :thread_locals
  ensure
    # Put the thread-locals back so they do not leak into later tests that
    # run on this thread.
    Thread.current[:originating_thread] = previous_thread
    Thread.current[:setting] = previous_setting
  end

  def test_live_stream_default_header
    get :default_header
    assert response.headers["Content-Type"]
  end

  def test_render_text
    get :render_text
    assert_equal "zomg", response.body
    assert_stream_closed
  end

  # Before commit the missing template error propagates to the caller; after
  # commit it is logged and a redirect-to-500 script ends the body.
  def test_exception_handling_html
    assert_raises(ActionView::MissingTemplate) do
      get :exception_in_view
    end

    capture_log_output do |output|
      get :exception_in_view_after_commit
      assert_match %r((window\.location = "/500\.html"</script></html>)$), response.body
      assert_match "Missing template test/doesntexist", output.rewind && output.read
      assert_stream_closed
    end
    assert response.body
    assert_stream_closed
  end

  # Same as the HTML case, but for a JSON request the body stays empty.
  def test_exception_handling_plain_text
    assert_raises(ActionView::MissingTemplate) do
      get :exception_in_view, format: :json
    end

    capture_log_output do |output|
      get :exception_in_view_after_commit, format: :json
      assert_equal "", response.body
      assert_match "Missing template test/doesntexist", output.rewind && output.read
      assert_stream_closed
    end
  end

  def test_exception_callback_when_committed
    current_threads = Thread.list

    capture_log_output do |output|
      get :exception_with_callback, format: "text/event-stream"

      # Wait on the execution of all threads
      (Thread.list - current_threads).each(&:join)

      assert_equal %(data: "500 Internal Server Error"\n\n), response.body
      assert_match "An exception occurred...", output.rewind && output.read
      assert_stream_closed
    end
  end

  def test_exception_in_controller_before_streaming
    assert_raises(ActionController::LiveStreamTest::Exception) do
      get :exception_in_controller, format: "text/event-stream"
    end
  end

  def test_bad_request_in_controller_before_streaming
    assert_raises(ActionController::BadRequest) do
      get :bad_request_error, format: "text/event-stream"
    end
  end

  # An error raised by the on_error callback itself must be logged too.
  def test_exceptions_raised_handling_exceptions_and_committed
    capture_log_output do |output|
      get :exception_in_exception_callback, format: "text/event-stream"
      assert_equal "", response.body
      assert_match "We need to go deeper", output.rewind && output.read
      assert_stream_closed
    end
  end

  def test_stale_without_etag
    get :with_stale
    assert_equal 200, response.status.to_i
  end

  def test_stale_with_etag
    @request.if_none_match = %(W/"#{Digest::MD5.hexdigest('123')}")
    get :with_stale
    assert_equal 304, response.status.to_i
  end
end
# Direct test of ActionController::Live::Buffer, without going through a
# controller action.
class BufferTest < ActionController::TestCase
  # A buffer built with a nil response must still answer call_on_error with
  # a truthy value.
  def test_nil_callback
    buf = ActionController::Live::Buffer.new nil
    assert buf.call_on_error
  end
end
end
# Integration test that reaches a Live controller through a route set rather
# than by invoking the action directly.
class LiveStreamRouterTest < ActionDispatch::IntegrationTest
  # Minimal Live controller with a single SSE action.
  class TestController < ActionController::Base
    include ActionController::Live

    # Writes two server-sent events and closes the stream.
    def index
      response.headers["Content-Type"] = "text/event-stream"
      sse = SSE.new(response.stream)
      sse.write("{\"name\":\"John\"}")
      sse.write(name: "Ryan")
    ensure
      # +sse+ is still nil if anything raised before it was assigned; guard
      # so a NoMethodError on nil does not replace the original exception.
      sse.close if sse
    end
  end

  # Rack entry point: the test class itself is the app (see #app) and
  # delegates every request to its private route set.
  def self.call(env)
    routes.call(env)
  end

  # Lazily built route set owned by this test class.
  def self.routes
    @routes ||= ActionDispatch::Routing::RouteSet.new
  end

  routes.draw do
    get "/test" => "live_stream_router_test/test#index"
  end

  # The application the integration session sends requests to.
  def app
    self.class
  end

  test "streaming served through the router" do
    get "/test"

    assert_response :ok
    assert_match(/data: {\"name\":\"John\"}/, response.body)
    assert_match(/data: {\"name\":\"Ryan\"}/, response.body)
  end
end