diff --git a/actionpack/lib/action_controller/metal/live.rb b/actionpack/lib/action_controller/metal/live.rb index efeeefda9d..48818cb8e4 100644 --- a/actionpack/lib/action_controller/metal/live.rb +++ b/actionpack/lib/action_controller/metal/live.rb @@ -6,8 +6,7 @@ module ActionController class Response < ActionDispatch::Response class Buffer < ActionDispatch::Response::Buffer # :nodoc: def initialize(response) - @response = response - @buf = Queue.new + super(response, Queue.new) end def write(string) @@ -59,5 +58,30 @@ module ActionController buf end end + + def process(name) + t1 = Thread.current + locals = t1.keys.map { |key| [key, t1[key]] } + + # This processes the action in a child thread. It lets us return the + # response code and headers back up the rack stack, and still process + # the body in parallel with sending data to the client + Thread.new { + t2 = Thread.current + t2.abort_on_exception = true + + # Since we're processing the view in a different thread, copy the + # thread locals from the main thread to the child thread. :'( + locals.each { |k,v| t2[k] = v } + + begin + super(name) + ensure + @_response.commit! + end + } + + @_response.await_commit + end end end diff --git a/actionpack/lib/action_controller/test_case.rb b/actionpack/lib/action_controller/test_case.rb index c4c825ba6b..ca1ecc43a1 100644 --- a/actionpack/lib/action_controller/test_case.rb +++ b/actionpack/lib/action_controller/test_case.rb @@ -517,8 +517,8 @@ module ActionController end def setup_controller_request_and_response - @request = TestRequest.new - @response = TestResponse.new + @request = build_request + @response = build_response @response.request = @request @controller = nil unless defined? @controller @@ -539,6 +539,14 @@ module ActionController end end + def build_request + TestRequest.new + end + + def build_response + TestResponse.new + end + included do include ActionController::TemplateAssertions include ActionDispatch::Assertions diff --git a/actionpack/test/controller/live_stream_test.rb b/actionpack/test/controller/live_stream_test.rb index 6ee6444065..5bb37a271d 100644 --- a/actionpack/test/controller/live_stream_test.rb +++ b/actionpack/test/controller/live_stream_test.rb @@ -1,16 +1,41 @@ require 'abstract_unit' +require 'active_support/concurrency/latch' module ActionController - class StreamingResponseTest < ActionController::TestCase + class LiveStreamTest < ActionController::TestCase class TestController < ActionController::Base + include ActionController::Live + + attr_accessor :latch, :tc + def self.controller_path 'test' end def basic_stream + response.headers['Content-Type'] = 'text/event-stream' + %w{ hello world }.each do |word| + response.stream.write word + end + response.stream.close + end + + def blocking_stream + response.headers['Content-Type'] = 'text/event-stream' + %w{ hello world }.each do |word| + response.stream.write word + latch.await + end + response.stream.close + end + + def thread_locals + tc.assert_equal 'aaron', Thread.current[:setting] + tc.refute_equal Thread.current.object_id, Thread.current[:originating_thread] + + response.headers['Content-Type'] = 'text/event-stream' %w{ hello world }.each do |word| response.stream.write word - response.stream.write "\n" end response.stream.close end @@ -18,9 +43,50 @@ module ActionController tests TestController + class TestResponse < Live::Response + def recycle! + initialize + end + end + + def build_response + TestResponse.new + end + def test_write_to_stream + @controller = TestController.new get :basic_stream - assert_equal "hello\nworld\n", @response.body + assert_equal "helloworld", @response.body + assert_equal 'text/event-stream', @response.headers['Content-Type'] + end + + def test_async_stream + @controller.latch = ActiveSupport::Concurrency::Latch.new + parts = ['hello', 'world'] + + @controller.request = @request + @controller.response = @response + + t = Thread.new(@response) { |resp| + resp.stream.each do |part| + assert_equal parts.shift, part + ol = @controller.latch + @controller.latch = ActiveSupport::Concurrency::Latch.new + ol.release + end + } + + @controller.process :blocking_stream + + assert t.join + end + + def test_thread_locals_get_copied + @controller.tc = self + Thread.current[:originating_thread] = Thread.current.object_id + Thread.current[:setting] = 'aaron' + + get :thread_locals end end end