1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00
ruby--ruby/test/fiber/scheduler.rb
2020-07-20 13:20:58 +12:00

162 lines
2.7 KiB
Ruby

# frozen_string_literal: true
require 'fiber'
require 'socket'
begin
require 'io/nonblock'
rescue LoadError
# Ignore.
end
class Scheduler
def initialize
@readable = {}
@writable = {}
@waiting = {}
@blocking = []
@ios = ObjectSpace::WeakMap.new
end
attr :readable
attr :writable
attr :waiting
attr :blocking
def next_timeout
_fiber, timeout = @waiting.min_by{|key, value| value}
if timeout
offset = timeout - current_time
if offset < 0
return 0
else
return offset
end
end
end
def run
while @readable.any? or @writable.any? or @waiting.any?
# Can only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout)
# puts "readable: #{readable}" if readable&.any?
# puts "writable: #{writable}" if writable&.any?
readable&.each do |io|
@readable[io]&.resume
end
writable&.each do |io|
@writable[io]&.resume
end
if @waiting.any?
time = current_time
waiting = @waiting
@waiting = {}
waiting.each do |fiber, timeout|
if timeout <= time
fiber.resume
else
@waiting[fiber] = timeout
end
end
end
end
end
def for_fd(fd)
@ios[fd] ||= ::IO.for_fd(fd, autoclose: false)
end
def wait_readable(io)
@readable[io] = Fiber.current
Fiber.yield
@readable.delete(io)
return true
end
def wait_readable_fd(fd)
wait_readable(
for_fd(fd)
)
end
def wait_writable(io)
@writable[io] = Fiber.current
Fiber.yield
@writable.delete(io)
return true
end
def wait_writable_fd(fd)
wait_writable(
for_fd(fd)
)
end
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def wait_sleep(duration = nil)
@waiting[Fiber.current] = current_time + duration
Fiber.yield
return true
end
def wait_any(io, events, duration)
unless (events & IO::WAIT_READABLE).zero?
@readable[io] = Fiber.current
end
unless (events & IO::WAIT_WRITABLE).zero?
@writable[io] = Fiber.current
end
Fiber.yield
@readable.delete(io)
@writable.delete(io)
return true
end
def wait_for_single_fd(fd, events, duration)
wait_any(
for_fd(fd),
events,
duration
)
end
def enter_blocking_region
# puts "Enter blocking region: #{caller.first}"
end
def exit_blocking_region
# puts "Exit blocking region: #{caller.first}"
@blocking << caller.first
end
def fiber(&block)
fiber = Fiber.new(blocking: false, &block)
fiber.resume
return fiber
end
end