1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00
ruby--ruby/test/fiber/scheduler.rb
Samuel Williams 7f175e5648
Avoid missed wakeup with fiber scheduler and Fiber.blocking. (#6588)
* Ensure that blocked fibers don't prevent valid wakeups.
2022-10-20 13:38:52 +13:00

363 lines
7.5 KiB
Ruby

# frozen_string_literal: true
# This is an example and simplified scheduler for test purposes.
# It is not efficient for a large number of file descriptors as it uses IO.select().
# Production Fiber schedulers should use epoll/kqueue/etc.
require 'fiber'
require 'socket'
begin
require 'io/nonblock'
rescue LoadError
# Ignore.
end
class Scheduler
experimental = Warning[:experimental]
begin
Warning[:experimental] = false
IO::Buffer.new(0)
ensure
Warning[:experimental] = experimental
end
def initialize
@readable = {}
@writable = {}
@waiting = {}
@closed = false
@lock = Thread::Mutex.new
@blocking = Hash.new.compare_by_identity
@ready = []
@urgent = IO.pipe
end
attr :readable
attr :writable
attr :waiting
def next_timeout
_fiber, timeout = @waiting.min_by{|key, value| value}
if timeout
offset = timeout - current_time
if offset < 0
return 0
else
return offset
end
end
end
def run
# $stderr.puts [__method__, Fiber.current].inspect
while @readable.any? or @writable.any? or @waiting.any? or @blocking.any?
# Can only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys + [@urgent.first], @writable.keys, [], next_timeout)
# puts "readable: #{readable}" if readable&.any?
# puts "writable: #{writable}" if writable&.any?
selected = {}
readable&.each do |io|
if fiber = @readable.delete(io)
@writable.delete(io) if @writable[io] == fiber
selected[fiber] = IO::READABLE
elsif io == @urgent.first
@urgent.first.read_nonblock(1024)
end
end
writable&.each do |io|
if fiber = @writable.delete(io)
@readable.delete(io) if @readable[io] == fiber
selected[fiber] = selected.fetch(fiber, 0) | IO::WRITABLE
end
end
selected.each do |fiber, events|
fiber.resume(events)
end
if @waiting.any?
time = current_time
waiting, @waiting = @waiting, {}
waiting.each do |fiber, timeout|
if fiber.alive?
if timeout <= time
fiber.resume
else
@waiting[fiber] = timeout
end
end
end
end
if @ready.any?
ready = nil
@lock.synchronize do
ready, @ready = @ready, []
end
ready.each do |fiber|
fiber.resume
end
end
end
end
def scheduler_close
close(true)
end
def close(internal = false)
# $stderr.puts [__method__, Fiber.current].inspect
unless internal
if Fiber.scheduler == self
return Fiber.set_scheduler(nil)
end
end
if @closed
raise "Scheduler already closed!"
end
self.run
ensure
if @urgent
@urgent.each(&:close)
@urgent = nil
end
@closed ||= true
# We freeze to detect any unintended modifications after the scheduler is closed:
self.freeze
end
def closed?
@closed
end
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def timeout_after(duration, klass, message, &block)
fiber = Fiber.current
self.fiber do
sleep(duration)
if fiber&.alive?
fiber.raise(klass, message)
end
end
begin
yield(duration)
ensure
fiber = nil
end
end
def process_wait(pid, flags)
# $stderr.puts [__method__, pid, flags, Fiber.current].inspect
# This is a very simple way to implement a non-blocking wait:
Thread.new do
Process::Status.wait(pid, flags)
end.value
end
def io_wait(io, events, duration)
# $stderr.puts [__method__, io, events, duration, Fiber.current].inspect
unless (events & IO::READABLE).zero?
@readable[io] = Fiber.current
end
unless (events & IO::WRITABLE).zero?
@writable[io] = Fiber.current
end
Fiber.yield
ensure
@readable.delete(io)
@writable.delete(io)
end
def io_select(...)
# Emulate the operation using a non-blocking thread:
Thread.new do
IO.select(...)
end.value
end
# Used for Kernel#sleep and Thread::Mutex#sleep
def kernel_sleep(duration = nil)
# $stderr.puts [__method__, duration, Fiber.current].inspect
self.block(:sleep, duration)
return true
end
# Used when blocking on synchronization (Thread::Mutex#lock,
# Thread::Queue#pop, Thread::SizedQueue#push, ...)
def block(blocker, timeout = nil)
# $stderr.puts [__method__, blocker, timeout].inspect
fiber = Fiber.current
if timeout
@waiting[fiber] = current_time + timeout
begin
Fiber.yield
ensure
# Remove from @waiting in the case #unblock was called before the timeout expired:
@waiting.delete(fiber)
end
else
@blocking[fiber] = true
begin
Fiber.yield
ensure
@blocking.delete(fiber)
end
end
end
# Used when synchronization wakes up a previously-blocked fiber
# (Thread::Mutex#unlock, Thread::Queue#push, ...).
# This might be called from another thread.
def unblock(blocker, fiber)
# $stderr.puts [__method__, blocker, fiber].inspect
# $stderr.puts blocker.backtrace.inspect
# $stderr.puts fiber.backtrace.inspect
@lock.synchronize do
@ready << fiber
end
io = @urgent.last
io.write_nonblock('.')
end
def fiber(&block)
fiber = Fiber.new(blocking: false, &block)
fiber.resume
return fiber
end
def address_resolve(hostname)
Thread.new do
Addrinfo.getaddrinfo(hostname, nil).map(&:ip_address).uniq
end.value
end
end
class IOBufferScheduler < Scheduler
EAGAIN = -Errno::EAGAIN::Errno
def io_read(io, buffer, length, offset)
total = 0
io.nonblock = true
while true
maximum_size = buffer.size - offset
result = blocking{buffer.read(io, maximum_size, offset)}
if result > 0
total += result
offset += result
break if total >= length
elsif result == 0
break
elsif result == EAGAIN
if length > 0
self.io_wait(io, IO::READABLE, nil)
else
return result
end
elsif result < 0
return result
end
end
return total
end
def io_write(io, buffer, length, offset)
total = 0
io.nonblock = true
while true
maximum_size = buffer.size - offset
result = blocking{buffer.write(io, maximum_size, offset)}
if result > 0
total += result
offset += result
break if total >= length
elsif result == 0
break
elsif result == EAGAIN
if length > 0
self.io_wait(io, IO::WRITABLE, nil)
else
return result
end
elsif result < 0
return result
end
end
return total
end
def blocking(&block)
Fiber.blocking(&block)
end
end
class BrokenUnblockScheduler < Scheduler
def unblock(blocker, fiber)
super
raise "Broken unblock!"
end
end
class SleepingUnblockScheduler < Scheduler
# This method is invoked when the thread is exiting.
def unblock(blocker, fiber)
super
# This changes the current thread state to `THREAD_RUNNING` which causes `thread_join_sleep` to hang.
sleep(0.1)
end
end
class SleepingBlockingScheduler < Scheduler
def kernel_sleep(duration = nil)
# Deliberaly sleep in a blocking state which can trigger a deadlock if the implementation is not correct.
Fiber.blocking{sleep 0.0001}
self.block(:sleep, duration)
return true
end
end