add #stream helper

This commit is contained in:
Konstantin Haase 2011-08-17 11:51:45 +02:00
parent 9f6923214f
commit 49d4c90052
4 changed files with 219 additions and 4 deletions

View File

@ -188,6 +188,9 @@ That way we can, for instance, easily implement a streaming example:
get('/') { Stream.new }
You can also use the +stream+ helper method (described below) to reduce boiler
plate and embed the streaming logic in the route.
=== Custom Route Matchers
As shown above, Sinatra ships with built-in support for using String patterns
@ -813,6 +816,54 @@ Similar to the body, you can also set the status code and headers:
Like +body+, +headers+ and +status+ with no arguments can be used to access
their current values.
=== Streaming Responses
Sometimes you want to start sending out data while still generating parts of
the response body. In extreme examples, you want to keep sending data until
the client closes the connection. You can use the +stream+ helper to avoid
creating your own wrapper:
get '/' do
stream do |out|
out << "It's gonna be legen -\n"
sleep 0.5
out << " (wait for it) \n"
sleep 1
out << "- dary!\n"
end
end
This allows you to implement streaming APIs,
{Server Sent Events}[http://dev.w3.org/html5/eventsource/] and can be used as
basis for {WebSockets}[http://en.wikipedia.org/wiki/WebSocket]. It can also be
used to increase throughput if some but not all content depends on a slow
resource.
Note that the streaming behavior, especially the number of concurrent request,
highly depends on the web server used to serve the application. Some servers,
like WEBRick, might not even support streaming at all. If the server does not
support streaming, the body will be sent all at once after the block passed to
+stream+ finished executing.
If the optional parameter is set to +false+, it will not call +close+ on the
stream object, allowing you to close it at any later point in the execution
flow. This only works on evented servers, like Thin and Rainbows. Other
servers will still close the stream.
set :server, :thin
connections = []
get '/' do
# keep stream open
stream(false) { |out| connections << out }
end
post '/' do
# write to all open streams
connections.each { |out| out << params[:message] << "\n" }
"message sent"
end
=== Logging
In the request scope, the +logger+ helper exposes a +Logger+ instance:
@ -1243,6 +1294,9 @@ You can access those options via <tt>settings</tt>:
Use an explicit array when setting multiple values:
<tt>set :static_cache_control, [:public, :max_age => 300]</tt>
[threaded] If set to +true+, will tell Thin to use
<tt>EventMachine.defer</tt> for processing the request.
[views] views folder.
== Error Handling

View File

@ -78,7 +78,11 @@ module Sinatra
elsif Array === body and not [204, 304].include?(status.to_i)
headers["Content-Length"] = body.inject(0) { |l, p| l + Rack::Utils.bytesize(p) }.to_s
end
super
# Rack::Response#finish sometimes returns self as response body. We don't want that.
status, headers, result = super
result = body if result == self
[status, headers, result]
end
end
@ -225,6 +229,61 @@ module Sinatra
not_found
end
# Class of the response body in case you use #stream.
#
# Three things really matter: The front and back block (back being the
# blog generating content, front the one sending it to the client) and
# the scheduler, integrating with whatever concurrency feature the Rack
# handler is using.
#
# Scheduler has to respond to defer and schedule.
class Stream
def self.schedule(*) yield end
def self.defer(*) yield end
def initialize(scheduler = self.class, close = true, &back)
@back, @scheduler, @callback, @close = back.to_proc, scheduler, nil, close
end
def close
@scheduler.schedule { @callback.call if @callback }
end
def each(&front)
@front = front
@scheduler.defer do
begin
@back.call(self)
rescue Exception => e
@scheduler.schedule { raise e }
end
close if @close
end
end
def <<(data)
@scheduler.schedule { @front.call(data.to_s) }
self
end
def callback(&block)
@callback = block
end
alias errback callback
end
# Allows to start sending data to the client even though later parts of
# the response body have not yet been generated.
#
# The close parameter specifies whether Stream#close should be called
# after the block has been executed. This is only relevant for evented
# servers like Thin or Rainbows.
def stream(close = true, &block)
scheduler = env['async.callback'] ? EventMachine : Stream
body Stream.new(scheduler, close, &block)
end
# Specify response freshness policy for HTTP caches (Cache-Control header).
# Any number of non-value directives (:public, :private, :no_cache,
# :no_store, :must_revalidate, :proxy_revalidate) may be passed along with
@ -1204,8 +1263,9 @@ module Sinatra
"on #{port} for #{environment} with backup from #{handler_name}"
end
[:INT, :TERM].each { |sig| trap(sig) { quit!(server, handler_name) } }
server.threaded = settings.threaded if server.respond_to? :threaded=
set :running, true
yield handler if block_given?
yield server if block_given?
end
rescue Errno::EADDRINUSE => e
$stderr.puts "== Someone is already performing on port #{port}!"
@ -1277,7 +1337,7 @@ module Sinatra
servers = Array(server)
servers.each do |server_name|
begin
return Rack::Handler.get(server_name)
return Rack::Handler.get(server_name.to_s)
rescue LoadError
rescue NameError
end
@ -1406,6 +1466,7 @@ module Sinatra
set :views, Proc.new { root && File.join(root, 'views') }
set :reload_templates, Proc.new { development? }
set :lock, false
set :threaded, true
set :public_folder, Proc.new { root && File.join(root, 'public') }
set :static, Proc.new { public_folder && File.exist?(public_folder) }

View File

@ -37,7 +37,7 @@ class ResponseTest < Test::Unit::TestCase
@response.body = ['Hello', 'World!', '✈']
status, headers, body = @response.finish
assert_equal '14', headers['Content-Length']
assert_equal @response.body, body.body
assert_equal @response.body, body
end
it 'does not call #to_ary or #inject on the body' do

100
test/streaming_test.rb Normal file
View File

@ -0,0 +1,100 @@
require File.expand_path('../helper', __FILE__)
class StreamingTest < Test::Unit::TestCase
Stream = Sinatra::Helpers::Stream
it 'returns the concatinated body' do
mock_app do
get '/' do
stream do |out|
out << "Hello" << " "
out << "World!"
end
end
end
get('/')
assert_body "Hello World!"
end
it 'always yields strings' do
stream = Stream.new { |out| out << :foo }
stream.each { |str| assert_equal 'foo', str }
end
it 'postpones body generation' do
step = 0
stream = Stream.new do |out|
10.times do
out << step
step += 1
end
end
stream.each do |s|
assert_equal s, step.to_s
step += 1
end
end
it 'calls the callback after it is done' do
step = 0
final = 0
stream = Stream.new { |o| 10.times { step += 1 }}
stream.callback { final = step }
stream.each { |str| }
assert_equal 10, final
end
it 'does not trigger the callback if close is set to false' do
step = 0
final = 0
stream = Stream.new(Stream, false) { |o| 10.times { step += 1 } }
stream.callback { final = step }
stream.each { |str| }
assert_equal 0, final
end
class MockScheduler
def initialize(*) @schedule, @defer = [], [] end
def schedule(&block) @schedule << block end
def defer(&block) @defer << block end
def schedule!(*) @schedule.pop.call until @schedule.empty? end
def defer!(*) @defer.pop.call until @defer.empty? end
end
it 'allows dropping in another scheduler' do
scheduler = MockScheduler.new
processing = sending = done = false
stream = Stream.new(scheduler) do |out|
processing = true
out << :foo
end
stream.each { sending = true}
stream.callback { done = true }
scheduler.schedule!
assert !processing
assert !sending
assert !done
scheduler.defer!
assert processing
assert !sending
assert !done
scheduler.schedule!
assert sending
assert done
end
it 'schedules exceptions to be raised on the main thread/event loop/...' do
scheduler = MockScheduler.new
Stream.new(scheduler) { fail 'should be caught' }.each { }
scheduler.defer!
assert_raise(RuntimeError) { scheduler.schedule! }
end
end