2018-09-17 12:41:14 -04:00
|
|
|
# frozen_string_literal: true
|
|
|
|
|
2020-10-25 15:59:40 -04:00
|
|
|
require 'puma/queue_close' unless ::Queue.instance_methods.include? :close
|
2019-02-19 19:38:21 -05:00
|
|
|
|
2012-07-23 13:26:52 -04:00
|
|
|
module Puma
  # Monitors a collection of IO objects, calling a block whenever
  # any monitored object either receives data or times out, or when the Reactor shuts down.
  #
  # The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev,
  # Java NIO or just plain IO#select). The call to `NIO::Selector#select` will
  # 'wakeup' any IO object that receives data.
  #
  # This class additionally tracks a timeout for every added object,
  # and wakes up any object when its timeout elapses.
  #
  # The implementation uses a Queue to synchronize adding new objects from the internal select loop.
  class Reactor
    # Create a new Reactor to monitor IO objects added by #add.
    # The provided block will be invoked when an IO has data available to read,
    # its timeout elapses, or when the Reactor shuts down.
    #
    # The block receives the monitored object. While the reactor is running, a
    # truthy return value makes the reactor stop monitoring that object.
    def initialize(&block)
      # Loaded lazily so that nio4r is only required once a Reactor is created.
      require 'nio'
      @selector = NIO::Selector.new
      # Objects handed to #add wait here until the select loop registers them.
      @input = Queue.new
      # Monitored objects; re-sorted by #timeout_at whenever new objects are registered.
      @timeouts = []
      @block = block
    end

    # Run the internal select loop, using a background thread by default.
    #
    # With a truthy +background+ the loop runs in a new thread (kept in @thread
    # so that #shutdown can join it) and this method returns immediately.
    # Otherwise the loop runs on the calling thread and this method only
    # returns once the loop has finished.
    def run(background = true)
      if background
        @thread = Thread.new do
          Puma.set_thread_name "reactor"
          select_loop
        end
      else
        select_loop
      end
    end

    # Add a new client to monitor.
    # The object must respond to #timeout and #timeout_at.
    #
    # Returns true once the client is queued and the select loop has been woken.
    # Returns false if the reactor is already shut down, i.e. the input queue
    # has been closed or the selector has already been closed.
    def add(client)
      @input << client
      @selector.wakeup
      true
    rescue ClosedQueueError, IOError
      # ClosedQueueError: #shutdown already closed the input queue.
      # IOError: the selector is already closed (the same condition #shutdown
      # ignores when it calls Selector#wakeup).
      false
    end

    # Shutdown the reactor, blocking until the background thread is finished.
    #
    # Closing the input queue makes later #add calls return false and lets the
    # select loop finish once the queue has been drained.
    def shutdown
      @input.close
      begin
        @selector.wakeup
      rescue IOError # Ignore if selector is already closed
      end
      @thread.join if @thread
    end

    private

    # Body of the reactor: repeatedly waits on the selector, wakes up objects
    # that received data or timed out, and registers objects queued by #add.
    # The loop ends once the input queue is both closed and empty; every object
    # still being monitored is then passed to the block one last time and the
    # selector is closed.
    def select_loop
      begin
        until @input.closed? && @input.empty?
          # Wakeup any registered object that receives incoming data.
          # Block until the earliest timeout or Selector#wakeup is called.
          # (A nil timeout is passed when nothing is being monitored yet.)
          timeout = (earliest = @timeouts.first) && earliest.timeout
          @selector.select(timeout) { |monitor| wakeup!(monitor.value) }

          # Wakeup all objects that timed out. @timeouts is kept ordered by
          # #timeout_at, so only the leading run of expired objects is taken.
          timed_out = @timeouts.take_while { |client| client.timeout == 0 }
          timed_out.each { |client| wakeup!(client) }

          unless @input.empty?
            # Register everything queued by #add, then restore the deadline order.
            until @input.empty?
              client = @input.pop
              register(client) if client.io_ok?
            end
            @timeouts.sort_by!(&:timeout_at)
          end
        end
      rescue StandardError => e
        STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
        STDERR.puts e.backtrace
        # Re-enter the loop so that one failing iteration does not end the reactor.
        # NOTE(review): this retry is unbounded; an error that is raised on every
        # iteration will be logged and retried indefinitely.
        retry
      end

      # Wakeup all remaining objects on shutdown.
      @timeouts.each(&@block)
      @selector.close
    end

    # Start monitoring the object.
    #
    # Registers the client's IO with the selector for readability, stores the
    # client as the monitor's value, and adds the client to the timeout list.
    def register(client)
      @selector.register(client.to_io, :r).value = client
      @timeouts << client
    rescue ArgumentError
      # unreadable clients raise error when processed by NIO
      # (such a client is not added to the timeout list either)
    end

    # 'Wake up' a monitored object by calling the provided block.
    # Stop monitoring the object if the block returns `true`.
    def wakeup!(client)
      if @block.call client
        @selector.deregister client.to_io
        @timeouts.delete client
      end
    end
  end
end
|