Add support to keep open streaming connections with Puma (#1858)

Also run async tests against Puma.

Co-authored-by: Jordan Owens <jkowens@gmail.com>
Co-authored-by: Patrik Ragnarsson <patrik@starkast.net>
This commit is contained in:
Jordan Owens 2023-02-10 12:35:35 -05:00 committed by GitHub
parent 386a4794c5
commit baa6bf783e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 44 additions and 15 deletions

View File

@ -1,11 +1,13 @@
#!/usr/bin/env ruby -I ../lib -I lib #!/usr/bin/env ruby -I ../lib -I lib
# frozen_string_literal: true # frozen_string_literal: true
require_relative 'rainbows' # This example does *not* work properly with WEBrick or other
# servers that buffer output. To shut down the server, close any
# open browser tabs that are connected to the chat server.
require 'sinatra' require 'sinatra'
set :server, :rainbows set :server, :puma
connections = [] connections = Set.new
get '/' do get '/' do
halt erb(:login) unless params[:user] halt erb(:login) unless params[:user]
@ -14,13 +16,22 @@ end
get '/stream', provides: 'text/event-stream' do get '/stream', provides: 'text/event-stream' do
stream :keep_open do |out| stream :keep_open do |out|
connections << out if connections.add?(out)
out.callback { connections.delete(out) } out.callback { connections.delete(out) }
end
out << "heartbeat:\n"
sleep 1
rescue
out.close
end end
end end
post '/' do post '/' do
connections.each { |out| out << "data: #{params[:msg]}\n\n" } connections.each do |out|
out << "data: #{params[:msg]}\n\n"
rescue
out.close
end
204 # response without entity body 204 # response without entity body
end end
@ -37,10 +48,10 @@ __END__
</html> </html>
@@ login @@ login
<form action='/'> <form action="/">
<label for='user'>User Name:</label> <label for='user'>User Name:</label>
<input name='user' value='' /> <input name="user" value="" />
<input type='submit' value="GO!" /> <input type="submit" value="GO!" />
</form> </form>
@@ chat @@ chat

View File

@ -474,8 +474,9 @@ module Sinatra
@back.call(self) @back.call(self)
rescue Exception => e rescue Exception => e
@scheduler.schedule { raise e } @scheduler.schedule { raise e }
ensure
close unless @keep_open
end end
close unless @keep_open
end end
end end
@ -506,7 +507,16 @@ module Sinatra
def stream(keep_open = false) def stream(keep_open = false)
scheduler = env['async.callback'] ? EventMachine : Stream scheduler = env['async.callback'] ? EventMachine : Stream
current = @params.dup current = @params.dup
body Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } } stream = if scheduler == Stream && keep_open
Stream.new(scheduler, false) do |out|
until out.closed?
with_params(current) { yield(out) }
end
end
else
Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } }
end
body stream
end end
# Specify response freshness policy for HTTP caches (Cache-Control header). # Specify response freshness policy for HTTP caches (Cache-Control header).

View File

@ -37,7 +37,7 @@ end
set :out, nil set :out, nil
get '/async' do get '/async' do
stream(:keep_open) { |o| (settings.out = o) << "hi!" } stream(:keep_open) { |o| (settings.out = o) << "hi!"; sleep 1 }
end end
get '/send' do get '/send' do
@ -66,7 +66,7 @@ end
class Subclass < Sinatra::Base class Subclass < Sinatra::Base
set :out, nil set :out, nil
get '/subclass/async' do get '/subclass/async' do
stream(:keep_open) { |o| (settings.out = o) << "hi!" } stream(:keep_open) { |o| (settings.out = o) << "hi!"; sleep 1 }
end end
get '/subclass/send' do get '/subclass/send' do

View File

@ -4,7 +4,7 @@ module IntegrationAsyncHelper
def it(message, &block) def it(message, &block)
base_port = 5100 + Process.pid % 100 base_port = 5100 + Process.pid % 100
%w(rainbows).each_with_index do |server_name, index| %w(rainbows puma).each_with_index do |server_name, index|
server = IntegrationHelper::BaseServer.new(server_name, base_port + index) server = IntegrationHelper::BaseServer.new(server_name, base_port + index)
next unless server.installed? next unless server.installed?

View File

@ -44,7 +44,14 @@ module IntegrationHelper
def installed? def installed?
return @installed unless @installed.nil? return @installed unless @installed.nil?
s = server == 'HTTP' ? 'net/http/server' : server s = case server
when 'HTTP'
'net/http/server'
when 'puma'
'puma/rack/handler'
else
server
end
require s require s
@installed = true @installed = true
rescue LoadError rescue LoadError

View File

@ -133,6 +133,7 @@ class StreamingTest < Minitest::Test
env['async.close'] = close env['async.close'] = close
stream(:keep_open) do |out| stream(:keep_open) do |out|
out.callback { ran = true } out.callback { ran = true }
out.close
end end
end end
end end