mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Controller actions are processed in a separate thread for live
responses. Processing controller actions in a separate thread allows us to work around the rack api - we can allow the user to set status and headers, then block until the first bytes are written. As soon as the first bytes are written, the main thread can return the status, headers, and (essentially) a queue for the body.
This commit is contained in:
parent
06c9e176ca
commit
38cfbb8aa7
3 changed files with 105 additions and 7 deletions
|
@ -6,8 +6,7 @@ module ActionController
|
|||
class Response < ActionDispatch::Response
|
||||
class Buffer < ActionDispatch::Response::Buffer # :nodoc:
|
||||
def initialize(response)
|
||||
@response = response
|
||||
@buf = Queue.new
|
||||
super(response, Queue.new)
|
||||
end
|
||||
|
||||
def write(string)
|
||||
|
@ -59,5 +58,30 @@ module ActionController
|
|||
buf
|
||||
end
|
||||
end
|
||||
|
||||
def process(name)
|
||||
t1 = Thread.current
|
||||
locals = t1.keys.map { |key| [key, t1[key]] }
|
||||
|
||||
# This processes the action in a child thread. It lets us return the
|
||||
# response code and headers back up the rack stack, and still process
|
||||
# the body in parallel with sending data to the client
|
||||
Thread.new {
|
||||
t2 = Thread.current
|
||||
t2.abort_on_exception = true
|
||||
|
||||
# Since we're processing the view in a different thread, copy the
|
||||
# thread locals from the main thread to the child thread. :'(
|
||||
locals.each { |k,v| t2[k] = v }
|
||||
|
||||
begin
|
||||
super(name)
|
||||
ensure
|
||||
@_response.commit!
|
||||
end
|
||||
}
|
||||
|
||||
@_response.await_commit
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -517,8 +517,8 @@ module ActionController
|
|||
end
|
||||
|
||||
def setup_controller_request_and_response
|
||||
@request = TestRequest.new
|
||||
@response = TestResponse.new
|
||||
@request = build_request
|
||||
@response = build_response
|
||||
@response.request = @request
|
||||
|
||||
@controller = nil unless defined? @controller
|
||||
|
@ -539,6 +539,14 @@ module ActionController
|
|||
end
|
||||
end
|
||||
|
||||
def build_request
|
||||
TestRequest.new
|
||||
end
|
||||
|
||||
def build_response
|
||||
TestResponse.new
|
||||
end
|
||||
|
||||
included do
|
||||
include ActionController::TemplateAssertions
|
||||
include ActionDispatch::Assertions
|
||||
|
|
|
@ -1,16 +1,41 @@
|
|||
require 'abstract_unit'
|
||||
require 'active_support/concurrency/latch'
|
||||
|
||||
module ActionController
|
||||
class StreamingResponseTest < ActionController::TestCase
|
||||
class LiveStreamTest < ActionController::TestCase
|
||||
class TestController < ActionController::Base
|
||||
include ActionController::Live
|
||||
|
||||
attr_accessor :latch, :tc
|
||||
|
||||
def self.controller_path
|
||||
'test'
|
||||
end
|
||||
|
||||
def basic_stream
|
||||
response.headers['Content-Type'] = 'text/event-stream'
|
||||
%w{ hello world }.each do |word|
|
||||
response.stream.write word
|
||||
end
|
||||
response.stream.close
|
||||
end
|
||||
|
||||
def blocking_stream
|
||||
response.headers['Content-Type'] = 'text/event-stream'
|
||||
%w{ hello world }.each do |word|
|
||||
response.stream.write word
|
||||
latch.await
|
||||
end
|
||||
response.stream.close
|
||||
end
|
||||
|
||||
def thread_locals
|
||||
tc.assert_equal 'aaron', Thread.current[:setting]
|
||||
tc.refute_equal Thread.current.object_id, Thread.current[:originating_thread]
|
||||
|
||||
response.headers['Content-Type'] = 'text/event-stream'
|
||||
%w{ hello world }.each do |word|
|
||||
response.stream.write word
|
||||
response.stream.write "\n"
|
||||
end
|
||||
response.stream.close
|
||||
end
|
||||
|
@ -18,9 +43,50 @@ module ActionController
|
|||
|
||||
tests TestController
|
||||
|
||||
class TestResponse < Live::Response
|
||||
def recycle!
|
||||
initialize
|
||||
end
|
||||
end
|
||||
|
||||
def build_response
|
||||
TestResponse.new
|
||||
end
|
||||
|
||||
def test_write_to_stream
|
||||
@controller = TestController.new
|
||||
get :basic_stream
|
||||
assert_equal "hello\nworld\n", @response.body
|
||||
assert_equal "helloworld", @response.body
|
||||
assert_equal 'text/event-stream', @response.headers['Content-Type']
|
||||
end
|
||||
|
||||
def test_async_stream
|
||||
@controller.latch = ActiveSupport::Concurrency::Latch.new
|
||||
parts = ['hello', 'world']
|
||||
|
||||
@controller.request = @request
|
||||
@controller.response = @response
|
||||
|
||||
t = Thread.new(@response) { |resp|
|
||||
resp.stream.each do |part|
|
||||
assert_equal parts.shift, part
|
||||
ol = @controller.latch
|
||||
@controller.latch = ActiveSupport::Concurrency::Latch.new
|
||||
ol.release
|
||||
end
|
||||
}
|
||||
|
||||
@controller.process :blocking_stream
|
||||
|
||||
assert t.join
|
||||
end
|
||||
|
||||
def test_thread_locals_get_copied
|
||||
@controller.tc = self
|
||||
Thread.current[:originating_thread] = Thread.current.object_id
|
||||
Thread.current[:setting] = 'aaron'
|
||||
|
||||
get :thread_locals
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue