From baa6bf783ea6dd8d2a708aa604c8ee583a90ee23 Mon Sep 17 00:00:00 2001 From: Jordan Owens Date: Fri, 10 Feb 2023 12:35:35 -0500 Subject: [PATCH] Add support to keep open streaming connections with Puma (#1858) Also run async tests against Puma. Co-authored-by: Jordan Owens Co-authored-by: Patrik Ragnarsson --- examples/chat.rb | 29 ++++++++++++++++++++--------- lib/sinatra/base.rb | 14 ++++++++++++-- test/integration/app.rb | 4 ++-- test/integration_async_helper.rb | 2 +- test/integration_helper.rb | 9 ++++++++- test/streaming_test.rb | 1 + 6 files changed, 44 insertions(+), 15 deletions(-) diff --git a/examples/chat.rb b/examples/chat.rb index 9e5f8f89..afba7a3e 100755 --- a/examples/chat.rb +++ b/examples/chat.rb @@ -1,11 +1,13 @@ #!/usr/bin/env ruby -I ../lib -I lib # frozen_string_literal: true -require_relative 'rainbows' +# This example does *not* work properly with WEBrick or other +# servers that buffer output. To shut down the server, close any +# open browser tabs that are connected to the chat server. require 'sinatra' -set :server, :rainbows -connections = [] +set :server, :puma +connections = Set.new get '/' do halt erb(:login) unless params[:user] @@ -14,13 +16,22 @@ end get '/stream', provides: 'text/event-stream' do stream :keep_open do |out| - connections << out - out.callback { connections.delete(out) } + if connections.add?(out) + out.callback { connections.delete(out) } + end + out << "heartbeat:\n" + sleep 1 + rescue + out.close end end post '/' do - connections.each { |out| out << "data: #{params[:msg]}\n\n" } + connections.each do |out| + out << "data: #{params[:msg]}\n\n" + rescue + out.close + end 204 # response without entity body end @@ -37,10 +48,10 @@ __END__ @@ login -
+ - - + +
@@ chat diff --git a/lib/sinatra/base.rb b/lib/sinatra/base.rb index ba330a46..c063c4a9 100644 --- a/lib/sinatra/base.rb +++ b/lib/sinatra/base.rb @@ -474,8 +474,9 @@ module Sinatra @back.call(self) rescue Exception => e @scheduler.schedule { raise e } + ensure + close unless @keep_open end - close unless @keep_open end end @@ -506,7 +507,16 @@ module Sinatra def stream(keep_open = false) scheduler = env['async.callback'] ? EventMachine : Stream current = @params.dup - body Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } } + stream = if scheduler == Stream && keep_open + Stream.new(scheduler, false) do |out| + until out.closed? + with_params(current) { yield(out) } + end + end + else + Stream.new(scheduler, keep_open) { |out| with_params(current) { yield(out) } } + end + body stream end # Specify response freshness policy for HTTP caches (Cache-Control header). diff --git a/test/integration/app.rb b/test/integration/app.rb index 23632499..abbe7009 100644 --- a/test/integration/app.rb +++ b/test/integration/app.rb @@ -37,7 +37,7 @@ end set :out, nil get '/async' do - stream(:keep_open) { |o| (settings.out = o) << "hi!" } + stream(:keep_open) { |o| (settings.out = o) << "hi!"; sleep 1 } end get '/send' do @@ -66,7 +66,7 @@ end class Subclass < Sinatra::Base set :out, nil get '/subclass/async' do - stream(:keep_open) { |o| (settings.out = o) << "hi!" } + stream(:keep_open) { |o| (settings.out = o) << "hi!"; sleep 1 } end get '/subclass/send' do diff --git a/test/integration_async_helper.rb b/test/integration_async_helper.rb index cf05839d..fceceb5e 100644 --- a/test/integration_async_helper.rb +++ b/test/integration_async_helper.rb @@ -4,7 +4,7 @@ module IntegrationAsyncHelper def it(message, &block) base_port = 5100 + Process.pid % 100 - %w(rainbows).each_with_index do |server_name, index| + %w(rainbows puma).each_with_index do |server_name, index| server = IntegrationHelper::BaseServer.new(server_name, base_port + index) next unless server.installed? diff --git a/test/integration_helper.rb b/test/integration_helper.rb index f7edfa55..7525cf10 100644 --- a/test/integration_helper.rb +++ b/test/integration_helper.rb @@ -44,7 +44,14 @@ module IntegrationHelper def installed? return @installed unless @installed.nil? - s = server == 'HTTP' ? 'net/http/server' : server + s = case server + when 'HTTP' + 'net/http/server' + when 'puma' + 'puma/rack/handler' + else + server + end require s @installed = true rescue LoadError diff --git a/test/streaming_test.rb b/test/streaming_test.rb index 800bcca2..83b3bd0b 100644 --- a/test/streaming_test.rb +++ b/test/streaming_test.rb @@ -133,6 +133,7 @@ class StreamingTest < Minitest::Test env['async.close'] = close stream(:keep_open) do |out| out.callback { ran = true } + out.close end end end