1
0
Fork 0
mirror of https://github.com/puma/puma.git synced 2022-11-09 13:48:40 -05:00
puma--puma/lib/puma/reactor.rb

110 lines
3.3 KiB
Ruby
Raw Normal View History

# frozen_string_literal: true
require 'puma/queue_close' unless ::Queue.instance_methods.include? :close
module Puma
# Monitors a collection of IO objects, calling a block whenever
# any monitored object either receives data or times out, or when the Reactor shuts down.
#
# The waiting/wake up is performed with nio4r, which will use the appropriate backend (libev,
# Java NIO or just plain IO#select). The call to `NIO::Selector#select` will
# 'wakeup' any IO object that receives data.
#
# This class additionally tracks a timeout for every added object,
# and wakes up any object when its timeout elapses.
#
# The implementation uses a Queue to hand newly added objects over to the internal select loop.
class Reactor
  # Create a new Reactor to monitor IO objects added by #add.
  # The provided block will be invoked when an IO has data available to read,
  # its timeout elapses, or when the Reactor shuts down.
  def initialize(&block)
    # Loaded here rather than at file level, so nio4r is only required once
    # a Reactor is actually constructed.
    require 'nio'
    @selector = NIO::Selector.new
    # Objects handed over by #add; drained by the select loop.
    @input = Queue.new
    # Registered objects; re-sorted by #timeout_at whenever new ones are registered.
    @timeouts = []
    @block = block
  end

  # Run the internal select loop, using a background thread by default.
  # With `background` false, the loop runs on the calling thread and this
  # method only returns once the loop has finished.
  def run(background=true)
    if background
      @thread = Thread.new do
        Puma.set_thread_name "reactor"
        select_loop
      end
    else
      select_loop
    end
  end

  # Add a new IO object to monitor.
  # The object must respond to #timeout and #timeout_at.
  # Returns true once the object is queued and the selector has been woken up.
  # Returns false if the reactor is already shut down, i.e. the input queue
  # is closed or the selector has already been closed.
  def add(io)
    @input << io
    @selector.wakeup
    true
  rescue ClosedQueueError, IOError
    # IOError: the selector was closed by a concurrent shutdown between the
    # push and the wakeup. #shutdown treats this error the same way.
    false
  end

  # Shutdown the reactor, blocking until the background thread is finished.
  def shutdown
    @input.close
    begin
      @selector.wakeup
    rescue IOError # Ignore if selector is already closed
    end
    @thread.join if @thread
  end

  private

  # Body of the reactor: loops until the input queue is closed and drained,
  # then wakes up every remaining object and closes the selector.
  def select_loop
    begin
      until @input.closed? && @input.empty?
        # Wakeup any registered object that receives incoming data.
        # Block until the earliest timeout or Selector#wakeup is called.
        timeout = (earliest = @timeouts.first) && earliest.timeout
        @selector.select(timeout) { |mon| wakeup!(mon.value) }

        # Wakeup all objects that timed out. The expired entries are
        # snapshotted first because wakeup! may delete from @timeouts.
        timed_out = @timeouts.take_while { |t| t.timeout == 0 }
        timed_out.each { |io| wakeup!(io) }

        unless @input.empty?
          until @input.empty?
            client = @input.pop
            register(client) if client.io_ok?
          end
          # Keep the earliest deadline at the front of the list.
          @timeouts.sort_by!(&:timeout_at)
        end
      end
    rescue StandardError => e
      STDERR.puts "Error in reactor loop escaped: #{e.message} (#{e.class})"
      STDERR.puts e.backtrace
      retry
    end

    begin
      # Wakeup all remaining objects on shutdown.
      @timeouts.each(&@block)
    ensure
      # Release the selector even if the block raised during the final wake-up.
      @selector.close
    end
  end

  # Start monitoring the object: register it with the selector for reads,
  # attach it as the monitor's value, and track it for timeouts.
  def register(io)
    @selector.register(io, :r).value = io
    @timeouts << io
  end

  # 'Wake up' a monitored object by calling the provided block.
  # Stop monitoring the object if the block returns `true`.
  def wakeup!(io)
    if @block.call(io)
      @selector.deregister(io)
      @timeouts.delete(io)
    end
  end
end
end