1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Thread scheduler for light weight concurrency.

This commit is contained in:
Samuel Williams 2020-05-14 22:10:55 +12:00 committed by GitHub
parent 336119dfc5
commit 0e3b0fcdba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
Notes: git 2020-05-14 19:11:27 +09:00
Merged: https://github.com/ruby/ruby/pull/3032

Merged-By: ioquatix <samuel@codeotaku.com>
28 changed files with 1018 additions and 309 deletions

92
cont.c
View file

@ -241,12 +241,17 @@ struct rb_fiber_struct {
*/
unsigned int transferred : 1;
/* Whether the fiber is allowed to implicitly yield. */
unsigned int blocking : 1;
struct coroutine_context context;
struct fiber_pool_stack stack;
};
static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0};
static ID fiber_initialize_keywords[2] = {0};
/*
* FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL
* if MAP_STACK is passed.
@ -1733,7 +1738,7 @@ fiber_alloc(VALUE klass)
}
static rb_fiber_t*
fiber_t_alloc(VALUE fiber_value)
fiber_t_alloc(VALUE fiber_value, unsigned int blocking)
{
rb_fiber_t *fiber;
rb_thread_t *th = GET_THREAD();
@ -1746,6 +1751,7 @@ fiber_t_alloc(VALUE fiber_value)
fiber = ZALLOC(rb_fiber_t);
fiber->cont.self = fiber_value;
fiber->cont.type = FIBER_CONTEXT;
fiber->blocking = blocking;
cont_init(&fiber->cont, th);
fiber->cont.saved_ec.fiber_ptr = fiber;
@ -1763,9 +1769,9 @@ fiber_t_alloc(VALUE fiber_value)
}
static VALUE
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool)
fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking)
{
rb_fiber_t *fiber = fiber_t_alloc(self);
rb_fiber_t *fiber = fiber_t_alloc(self, blocking);
fiber->first_proc = proc;
fiber->stack.base = NULL;
@ -1793,17 +1799,66 @@ fiber_prepare_stack(rb_fiber_t *fiber)
sec->local_storage_recursive_hash_for_trace = Qnil;
}
static struct fiber_pool *
rb_fiber_pool_default(VALUE pool)
{
return &shared_fiber_pool;
}
/* :nodoc: */
static VALUE
rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat)
{
VALUE pool = Qnil;
VALUE blocking = Qtrue;
if (kw_splat != RB_NO_KEYWORDS) {
VALUE options = Qnil;
VALUE arguments[2] = {Qundef};
argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options);
rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments);
blocking = arguments[0];
pool = arguments[1];
}
return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking));
}
/* :nodoc: */
static VALUE
rb_fiber_initialize(int argc, VALUE* argv, VALUE self)
{
return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool);
return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p());
}
VALUE
rb_fiber_new(rb_block_call_func_t func, VALUE obj)
{
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool);
return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1);
}
static VALUE
rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat)
{
rb_thread_t * th = GET_THREAD();
VALUE scheduler = th->scheduler;
VALUE fiber = Qnil;
if (scheduler != Qnil) {
fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat);
} else {
rb_raise(rb_eRuntimeError, "No scheduler is available!");
}
return fiber;
}
static VALUE
rb_f_fiber(int argc, VALUE *argv, VALUE obj)
{
return rb_f_fiber_kw(argc, argv, rb_keyword_given_p());
}
static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt);
@ -1820,6 +1875,10 @@ rb_fiber_start(void)
VM_ASSERT(th->ec == ruby_current_execution_context_ptr);
VM_ASSERT(FIBER_RESUMED_P(fiber));
if (fiber->blocking) {
th->blocking += 1;
}
EC_PUSH_TAG(th->ec);
if ((state = EC_EXEC_TAG()) == TAG_NONE) {
rb_context_t *cont = &VAR_FROM_MEMORY(fiber)->cont;
@ -1892,6 +1951,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th)
fiber->cont.type = FIBER_CONTEXT;
fiber->cont.saved_ec.fiber_ptr = fiber;
fiber->cont.saved_ec.thread_ptr = th;
fiber->blocking = 1;
fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */
th->ec = &fiber->cont.saved_ec;
}
@ -2044,11 +2104,15 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
}
}
VM_ASSERT(FIBER_RUNNABLE_P(fiber));
if (is_resume) {
fiber->prev = fiber_current();
}
VM_ASSERT(FIBER_RUNNABLE_P(fiber));
if (fiber_current()->blocking) {
th->blocking -= 1;
}
cont->argc = argc;
cont->kw_splat = kw_splat;
@ -2060,6 +2124,10 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
fiber_stack_release(fiber);
}
if (fiber_current()->blocking) {
th->blocking += 1;
}
RUBY_VM_CHECK_INTS(th->ec);
EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil);
@ -2073,6 +2141,12 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv)
return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS);
}
VALUE
rb_fiber_blocking_p(VALUE fiber)
{
return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue;
}
void
rb_fiber_close(rb_fiber_t *fiber)
{
@ -2442,6 +2516,9 @@ Init_Cont(void)
fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size);
fiber_initialize_keywords[0] = rb_intern_const("blocking");
fiber_initialize_keywords[1] = rb_intern_const("pool");
char * fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS");
if (fiber_shared_fiber_pool_free_stacks) {
shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks);
@ -2452,11 +2529,14 @@ Init_Cont(void)
rb_eFiberError = rb_define_class("FiberError", rb_eStandardError);
rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1);
rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1);
rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0);
rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1);
rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1);
rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0);
rb_define_alias(rb_cFiber, "inspect", "to_s");
rb_define_global_function("Fiber", rb_f_fiber, -1);
#ifdef RB_EXPERIMENTAL_FIBER_POOL
rb_cFiberPool = rb_define_class("Pool", rb_cFiber);
rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc);

137
doc/fiber.rdoc Normal file
View file

@ -0,0 +1,137 @@
= Fiber
Fiber is a flow-control primitive which enable cooperative scheduling. This is
in contrast to threads which can be preemptively scheduled at any time. While
having a similar memory profiles, the cost of context switching fibers can be
significantly less than threads as it does not involve a system call.
== Design
=== Scheduler
The per-thread fiber scheduler interface is used to intercept blocking
operations. A typical implementation would be a wrapper for a gem like
EventMachine or Async. This design provides separation of concerns between the
event loop implementation and application code. It also allows for layered
schedulers which can perform instrumentation.
class Scheduler
# Wait for the given file descriptor to become readable.
def wait_readable(io)
end
# Wait for the given file descriptor to become writable.
def wait_writable(io)
end
# Wait for the given file descriptor to match the specified events within
# the specified timeout.
# @param event [Integer] a bit mask of +IO::WAIT_READABLE+,
# `IO::WAIT_WRITABLE` and `IO::WAIT_PRIORITY`.
# @param timeout [#to_f] the amount of time to wait for the event.
def wait_any(io, events, timeout)
end
# Sleep the current task for the specified duration, or forever if not
# specified.
# @param duration [#to_f] the amount of time to sleep.
def wait_sleep(duration = nil)
end
# The Ruby virtual machine is going to enter a system level blocking
# operation.
def enter_blocking_region
end
# The Ruby virtual machine has completed the system level blocking
# operation.
def exit_blocking_region
end
# Intercept the creation of a non-blocking fiber.
def fiber(&block)
Fiber.new(blocking: false, &block)
end
# Invoked when the thread exits.
def run
# Implement event loop here.
end
end
On CRuby, the following extra methods need to be implemented to handle the
public C interface:
class Scheduler
# Wrapper for rb_wait_readable(int) C function.
def wait_readable_fd(fd)
wait_readable(::IO.from_fd(fd, autoclose: false))
end
# Wrapper for rb_wait_readable(int) C function.
def wait_writable_fd(fd)
wait_writable(::IO.from_fd(fd, autoclose: false))
end
# Wrapper for rb_wait_for_single_fd(int) C function.
def wait_for_single_fd(fd, events, duration)
wait_any(::IO.from_fd(fd, autoclose: false), events, duration)
end
end
=== Non-blocking Fibers
By default fibers are blocking. Non-blocking fibers may invoke specific
scheduler hooks when a blocking operation occurs, and these hooks may introduce
context switching points.
Fiber.new(blocking: false) do
puts Fiber.current.blocking? # false
# May invoke `Thread.scheduler&.wait_readable`.
io.read(...)
# May invoke `Thread.scheduler&.wait_writable`.
io.write(...)
# Will invoke `Thread.scheduler&.wait_sleep`.
sleep(n)
end.resume
We also introduce a new method which simplifes the creation of these
non-blocking fibers:
Fiber do
puts Fiber.current.blocking? # false
end
The purpose of this method is to allow the scheduler to internally decide the
policy for when to start the fiber, and whether to use symmetric or asymmetric
fibers.
=== Mutex
Locking a mutex causes the +Thread#scheduler+ to not be used while the mutex
is held by that thread. On +Mutex#lock+, fiber switching via the scheduler
is disabled and operations become blocking for all fibers of the same +Thread+.
On +Mutex#unlock+, the scheduler is enabled again.
mutex = Mutex.new
puts Thread.current.blocking? # 1 (true)
Fiber.new(blocking: false) do
puts Thread.current.blocking? # false
mutex.synchronize do
puts Thread.current.blocking? # (1) true
end
puts Thread.current.blocking? # false
end.resume
=== Non-blocking I/O
By default, I/O is non-blocking. Not all operating systems support non-blocking
I/O. Windows is a notable example where socket I/O can be non-blocking but pipe
I/O is blocking. Provided that there *is* a scheduler and the current thread *is
non-blocking*, the operation will invoke the scheduler.

View file

@ -2,7 +2,6 @@
#include <time.h>
int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
static VALUE sym_wait_readable, sym_wait_writable;
#if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL)
@ -1429,10 +1428,7 @@ make_io_for_unix_rights(VALUE ctl, struct cmsghdr *cmh, char *msg_end)
if (fstat(fd, &stbuf) == -1)
rb_raise(rb_eSocket, "invalid fd in SCM_RIGHTS");
rb_update_max_fd(fd);
if (rsock_cmsg_cloexec_state < 0)
rsock_cmsg_cloexec_state = rsock_detect_cloexec(fd);
if (rsock_cmsg_cloexec_state == 0 || fd <= 2)
rb_maygvl_fd_fix_cloexec(fd);
rb_maygvl_fd_fix_cloexec(fd);
if (S_ISSOCK(stbuf.st_mode))
io = rsock_init_sock(rb_obj_alloc(rb_cSocket), fd);
else

View file

@ -408,84 +408,30 @@ rsock_write_nonblock(VALUE sock, VALUE str, VALUE ex)
}
#endif /* MSG_DONTWAIT_RELIABLE */
/* returns true if SOCK_CLOEXEC is supported */
int rsock_detect_cloexec(int fd)
static int
rsock_socket0(int domain, int type, int proto)
{
#ifdef SOCK_CLOEXEC
int flags = fcntl(fd, F_GETFD);
if (flags == -1)
rb_bug("rsock_detect_cloexec: fcntl(%d, F_GETFD) failed: %s", fd, strerror(errno));
if (flags & FD_CLOEXEC)
return 1;
type |= SOCK_CLOEXEC;
#endif
return 0;
}
#ifdef SOCK_CLOEXEC
static int
rsock_socket0(int domain, int type, int proto)
{
int ret;
static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
#ifdef SOCK_NONBLOCK
type |= SOCK_NONBLOCK;
#endif
if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */
ret = socket(domain, type|SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT, proto);
if (ret >= 0) {
if (ret <= 2)
goto fix_cloexec;
goto update_max_fd;
}
}
else if (cloexec_state < 0) { /* usually runs once only for detection */
ret = socket(domain, type|SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT, proto);
if (ret >= 0) {
cloexec_state = rsock_detect_cloexec(ret);
if (cloexec_state == 0 || ret <= 2)
goto fix_cloexec;
goto update_max_fd;
}
else if (ret == -1 && errno == EINVAL) {
/* SOCK_CLOEXEC is available since Linux 2.6.27. Linux 2.6.18 fails with EINVAL */
ret = socket(domain, type, proto);
if (ret != -1) {
cloexec_state = 0;
/* fall through to fix_cloexec */
}
}
}
else { /* cloexec_state == 0 */
ret = socket(domain, type, proto);
}
if (ret == -1)
int result = socket(domain, type, proto);
if (result == -1)
return -1;
fix_cloexec:
rb_maygvl_fd_fix_cloexec(ret);
if (RSOCK_NONBLOCK_DEFAULT) {
rsock_make_fd_nonblock(ret);
}
update_max_fd:
rb_update_max_fd(ret);
return ret;
rb_fd_fix_cloexec(result);
#ifndef SOCK_NONBLOCK
rsock_make_fd_nonblock(result);
#endif
return result;
}
#else /* !SOCK_CLOEXEC */
static int
rsock_socket0(int domain, int type, int proto)
{
int ret = socket(domain, type, proto);
if (ret == -1)
return -1;
rb_fd_fix_cloexec(ret);
if (RSOCK_NONBLOCK_DEFAULT) {
rsock_make_fd_nonblock(ret);
}
return ret;
}
#endif /* !SOCK_CLOEXEC */
int
rsock_socket(int domain, int type, int proto)
@ -637,6 +583,10 @@ rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks)
void
rsock_make_fd_nonblock(int fd)
{
#ifdef _WIN32
return;
#endif
int flags;
#ifdef F_GETFL
flags = fcntl(fd, F_GETFL);
@ -653,56 +603,34 @@ rsock_make_fd_nonblock(int fd)
}
static int
cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len,
int nonblock)
cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len)
{
int ret;
socklen_t len0 = 0;
#ifdef HAVE_ACCEPT4
static int try_accept4 = 1;
#endif
if (RSOCK_NONBLOCK_DEFAULT) {
nonblock = 1;
}
if (address_len) len0 = *address_len;
#ifdef HAVE_ACCEPT4
if (try_accept4) {
int flags = 0;
#ifdef SOCK_CLOEXEC
flags |= SOCK_CLOEXEC;
#endif
int flags = SOCK_CLOEXEC;
#ifdef SOCK_NONBLOCK
if (nonblock) {
flags |= SOCK_NONBLOCK;
}
flags |= SOCK_NONBLOCK;
#endif
ret = accept4(socket, address, address_len, flags);
/* accept4 is available since Linux 2.6.28, glibc 2.10. */
if (ret != -1) {
if (ret <= 2)
rb_maygvl_fd_fix_cloexec(ret);
int result = accept4(socket, address, address_len, flags);
if (result == -1) return -1;
#ifndef SOCK_NONBLOCK
if (nonblock) {
rsock_make_fd_nonblock(ret);
}
rsock_make_fd_nonblock(result);
#endif
if (address_len && len0 < *address_len) *address_len = len0;
return ret;
}
if (errno != ENOSYS) {
return -1;
}
try_accept4 = 0;
}
#else
int result = accept(socket, address, address_len);
if (result == -1) return -1;
rb_maygvl_fd_fix_cloexec(result);
rsock_make_fd_nonblock(result);
#endif
ret = accept(socket, address, address_len);
if (ret == -1) return -1;
if (address_len && len0 < *address_len) *address_len = len0;
rb_maygvl_fd_fix_cloexec(ret);
if (nonblock) {
rsock_make_fd_nonblock(ret);
}
return ret;
return result;
}
VALUE
@ -712,7 +640,7 @@ rsock_s_accept_nonblock(VALUE klass, VALUE ex, rb_io_t *fptr,
int fd2;
rb_io_set_nonblock(fptr);
fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, 1);
fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len);
if (fd2 < 0) {
int e = errno;
switch (e) {
@ -744,7 +672,7 @@ static VALUE
accept_blocking(void *data)
{
struct accept_arg *arg = data;
return (VALUE)cloexec_accept(arg->fd, arg->sockaddr, arg->len, 0);
return (VALUE)cloexec_accept(arg->fd, arg->sockaddr, arg->len);
}
VALUE

View file

@ -36,13 +36,7 @@
# if defined(_MSC_VER)
# undef HAVE_TYPE_STRUCT_SOCKADDR_DL
# endif
/*
* FIXME: failures if we make nonblocking the default
* [ruby-core:89973] [ruby-core:89976] [ruby-core:89977] [Bug #14968]
*/
# define RSOCK_NONBLOCK_DEFAULT (0)
#else
# define RSOCK_NONBLOCK_DEFAULT (0)
# include <sys/socket.h>
# include <netinet/in.h>
# ifdef HAVE_NETINET_IN_SYSTM_H
@ -260,7 +254,6 @@ typedef union {
#define INET_SOCKS 2
extern int rsock_do_not_reverse_lookup;
extern int rsock_cmsg_cloexec_state;
#define FMODE_NOREVLOOKUP 0x100
/* common socket families only */

View file

@ -168,93 +168,47 @@ pair_yield(VALUE pair)
#endif
#if defined HAVE_SOCKETPAIR
static int
rsock_socketpair0(int domain, int type, int protocol, int descriptors[2])
{
#ifdef SOCK_CLOEXEC
static int
rsock_socketpair0(int domain, int type, int protocol, int sv[2])
{
int ret;
static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */
static const int default_flags = SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT;
type |= SOCK_CLOEXEC;
#endif
if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */
ret = socketpair(domain, type|default_flags, protocol, sv);
if (ret == 0 && (sv[0] <= 2 || sv[1] <= 2)) {
goto fix_cloexec; /* highly unlikely */
}
goto update_max_fd;
}
else if (cloexec_state < 0) { /* usually runs once only for detection */
ret = socketpair(domain, type|default_flags, protocol, sv);
if (ret == 0) {
cloexec_state = rsock_detect_cloexec(sv[0]);
if ((cloexec_state == 0) || (sv[0] <= 2 || sv[1] <= 2))
goto fix_cloexec;
goto update_max_fd;
}
else if (ret == -1 && errno == EINVAL) {
/* SOCK_CLOEXEC is available since Linux 2.6.27. Linux 2.6.18 fails with EINVAL */
ret = socketpair(domain, type, protocol, sv);
if (ret != -1) {
/* The reason of EINVAL may be other than SOCK_CLOEXEC.
* So disable SOCK_CLOEXEC only if socketpair() succeeds without SOCK_CLOEXEC.
* Ex. Socket.pair(:UNIX, 0xff) fails with EINVAL.
*/
cloexec_state = 0;
}
}
}
else { /* cloexec_state == 0 */
ret = socketpair(domain, type, protocol, sv);
}
if (ret == -1) {
#ifdef SOCK_NONBLOCK
type |= SOCK_NONBLOCK;
#endif
int result = socketpair(domain, type, protocol, descriptors);
if (result == -1)
return -1;
}
fix_cloexec:
rb_maygvl_fd_fix_cloexec(sv[0]);
rb_maygvl_fd_fix_cloexec(sv[1]);
if (RSOCK_NONBLOCK_DEFAULT) {
rsock_make_fd_nonblock(sv[0]);
rsock_make_fd_nonblock(sv[1]);
}
#ifndef SOCK_CLOEXEC
rb_fd_fix_cloexec(descriptors[0]);
rb_fd_fix_cloexec(descriptors[1]);
#endif
update_max_fd:
rb_update_max_fd(sv[0]);
rb_update_max_fd(sv[1]);
#ifndef SOCK_NONBLOCK
rsock_make_fd_nonblock(descriptors[0]);
rsock_make_fd_nonblock(descriptors[1]);
#endif
return ret;
return result;
}
#else /* !SOCK_CLOEXEC */
static int
rsock_socketpair0(int domain, int type, int protocol, int sv[2])
{
int ret = socketpair(domain, type, protocol, sv);
if (ret == -1)
return -1;
rb_fd_fix_cloexec(sv[0]);
rb_fd_fix_cloexec(sv[1]);
if (RSOCK_NONBLOCK_DEFAULT) {
rsock_make_fd_nonblock(sv[0]);
rsock_make_fd_nonblock(sv[1]);
}
return ret;
}
#endif /* !SOCK_CLOEXEC */
static int
rsock_socketpair(int domain, int type, int protocol, int sv[2])
rsock_socketpair(int domain, int type, int protocol, int descriptors[2])
{
int ret;
int result;
ret = rsock_socketpair0(domain, type, protocol, sv);
if (ret < 0 && rb_gc_for_fd(errno)) {
ret = rsock_socketpair0(domain, type, protocol, sv);
result = rsock_socketpair0(domain, type, protocol, descriptors);
if (result < 0 && rb_gc_for_fd(errno)) {
result = rsock_socketpair0(domain, type, protocol, descriptors);
}
return ret;
return result;
}
/*

View file

@ -455,11 +455,7 @@ retry:
#endif
rb_update_max_fd(fd);
if (rsock_cmsg_cloexec_state < 0)
rsock_cmsg_cloexec_state = rsock_detect_cloexec(fd);
if (rsock_cmsg_cloexec_state == 0 || fd <= 2)
rb_maygvl_fd_fix_cloexec(fd);
rb_maygvl_fd_fix_cloexec(fd);
if (klass == Qnil)
return INT2FIX(fd);

View file

@ -28,6 +28,7 @@ RBIMPL_SYMBOL_EXPORT_BEGIN()
/* cont.c */
VALUE rb_fiber_new(rb_block_call_func_t, VALUE);
VALUE rb_fiber_new_kw(rb_block_call_func_t, VALUE, int kw_splat);
VALUE rb_fiber_resume(VALUE fib, int argc, const VALUE *argv);
VALUE rb_fiber_resume_kw(VALUE fib, int argc, const VALUE *argv, int kw_splat);
VALUE rb_fiber_yield(int argc, const VALUE *argv);

View file

@ -71,6 +71,10 @@ VALUE rb_mutex_unlock(VALUE mutex);
VALUE rb_mutex_sleep(VALUE self, VALUE timeout);
VALUE rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg);
VALUE rb_thread_scheduler_get(VALUE);
VALUE rb_thread_scheduler_set(VALUE, VALUE);
VALUE rb_current_thread_scheduler(void);
RBIMPL_SYMBOL_EXPORT_END()
#endif /* RBIMPL_INTERN_THREAD_H */

113
io.c
View file

@ -177,15 +177,6 @@ off_t __syscall(quad_t number, ...);
#define rename(f, t) rb_w32_urename((f), (t))
#endif
#if defined(_WIN32)
# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
#elif defined(O_NONBLOCK)
/* disabled for [Bug #15356] (Rack::Deflater + rails) failure: */
# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
#else /* any platforms where O_NONBLOCK does not exist? */
# define RUBY_PIPE_NONBLOCK_DEFAULT (0)
#endif
VALUE rb_cIO;
VALUE rb_eEOFError;
VALUE rb_eIOError;
@ -406,44 +397,37 @@ rb_fd_set_nonblock(int fd)
}
int
rb_cloexec_pipe(int fildes[2])
rb_cloexec_pipe(int descriptors[2])
{
int ret;
#if defined(HAVE_PIPE2)
static int try_pipe2 = 1;
if (try_pipe2) {
ret = pipe2(fildes, O_CLOEXEC | RUBY_PIPE_NONBLOCK_DEFAULT);
if (ret != -1)
return ret;
/* pipe2 is available since Linux 2.6.27, glibc 2.9. */
if (errno == ENOSYS) {
try_pipe2 = 0;
ret = pipe(fildes);
}
}
else {
ret = pipe(fildes);
}
#ifdef HAVE_PIPE2
int result = pipe2(descriptors, O_CLOEXEC | O_NONBLOCK);
#else
ret = pipe(fildes);
int result = pipe(descriptors);
#endif
if (ret < 0) return ret;
if (result < 0)
return result;
#ifdef __CYGWIN__
if (ret == 0 && fildes[1] == -1) {
close(fildes[0]);
fildes[0] = -1;
errno = ENFILE;
return -1;
if (ret == 0 && descriptors[1] == -1) {
close(descriptors[0]);
descriptors[0] = -1;
errno = ENFILE;
return -1;
}
#endif
rb_maygvl_fd_fix_cloexec(fildes[0]);
rb_maygvl_fd_fix_cloexec(fildes[1]);
if (RUBY_PIPE_NONBLOCK_DEFAULT) {
rb_fd_set_nonblock(fildes[0]);
rb_fd_set_nonblock(fildes[1]);
}
return ret;
#ifndef HAVE_PIPE2
rb_maygvl_fd_fix_cloexec(descriptors[0]);
rb_maygvl_fd_fix_cloexec(descriptors[1]);
#ifndef _WIN32
rb_fd_set_nonblock(descriptors[0]);
rb_fd_set_nonblock(descriptors[1]);
#endif
#endif
return result;
}
int
@ -1270,6 +1254,12 @@ io_fflush(rb_io_t *fptr)
int
rb_io_wait_readable(int f)
{
VALUE scheduler = rb_current_thread_scheduler();
if (scheduler != Qnil) {
VALUE result = rb_funcall(scheduler, rb_intern("wait_readable_fd"), 1, INT2NUM(f));
return RTEST(result);
}
io_fd_check_closed(f);
switch (errno) {
case EINTR:
@ -1294,6 +1284,12 @@ rb_io_wait_readable(int f)
int
rb_io_wait_writable(int f)
{
VALUE scheduler = rb_current_thread_scheduler();
if (scheduler != Qnil) {
VALUE result = rb_funcall(scheduler, rb_intern("wait_writable_fd"), 1, INT2NUM(f));
return RTEST(result);
}
io_fd_check_closed(f);
switch (errno) {
case EINTR:
@ -10897,6 +10893,23 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp)
return FALSE;
}
struct wait_for_single_fd {
VALUE scheduler;
int fd;
short events;
VALUE result;
};
void * rb_thread_scheduler_wait_for_single_fd(void * _args) {
struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args;
args->result = rb_funcall(args->scheduler, rb_intern("wait_for_single_fd"), 3, INT2NUM(args->fd), INT2NUM(args->events), Qnil);
return NULL;
}
#if USE_POLL
# define IOWAIT_SYSCALL "poll"
STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN);
@ -10904,6 +10917,13 @@ STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT);
static int
nogvl_wait_for_single_fd(int fd, short events)
{
VALUE scheduler = rb_current_thread_scheduler();
if (scheduler != Qnil) {
struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
return RTEST(args.result);
}
struct pollfd fds;
fds.fd = fd;
@ -10916,6 +10936,13 @@ nogvl_wait_for_single_fd(int fd, short events)
static int
nogvl_wait_for_single_fd(int fd, short events)
{
VALUE scheduler = rb_current_thread_scheduler();
if (scheduler != Qnil) {
struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events};
rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args);
return RTEST(args.result);
}
rb_fdset_t fds;
int ret;
@ -13283,6 +13310,10 @@ Init_IO(void)
rb_cIO = rb_define_class("IO", rb_cObject);
rb_include_module(rb_cIO, rb_mEnumerable);
rb_define_const(rb_cIO, "WAIT_READABLE", INT2NUM(RB_WAITFD_IN));
rb_define_const(rb_cIO, "WAIT_PRIORITY", INT2NUM(RB_WAITFD_PRI));
rb_define_const(rb_cIO, "WAIT_WRITABLE", INT2NUM(RB_WAITFD_OUT));
/* exception to wait for reading. see IO.select. */
rb_mWaitReadable = rb_define_module_under(rb_cIO, "WaitReadable");
/* exception to wait for writing. see IO.select. */

View file

@ -4892,9 +4892,14 @@ rb_f_spawn(int argc, VALUE *argv, VALUE _)
static VALUE
rb_f_sleep(int argc, VALUE *argv, VALUE _)
{
time_t beg, end;
VALUE scheduler = rb_current_thread_scheduler();
beg = time(0);
if (scheduler != Qnil) {
VALUE result = rb_funcallv(scheduler, rb_intern("wait_sleep"), argc, argv);
return RTEST(result);
}
time_t beg = time(0);
if (argc == 0) {
rb_thread_sleep_forever();
}
@ -4903,7 +4908,7 @@ rb_f_sleep(int argc, VALUE *argv, VALUE _)
rb_thread_wait_for(rb_time_interval(argv[0]));
}
end = time(0) - beg;
time_t end = time(0) - beg;
return INT2FIX(end);
}

View file

@ -24,7 +24,7 @@ describe "BasicSocket#read_nonblock" do
platform_is :linux do
it 'does not set the IO in nonblock mode' do
require 'io/nonblock'
@r.should_not.nonblock?
@r.nonblock = false
IO.select([@r], nil, nil, 2)
@r.read_nonblock(3).should == "aaa"
@r.should_not.nonblock?
@ -34,7 +34,7 @@ describe "BasicSocket#read_nonblock" do
platform_is_not :linux, :windows do
it 'sets the IO in nonblock mode' do
require 'io/nonblock'
@r.should_not.nonblock?
@r.nonblock = false
IO.select([@r], nil, nil, 2)
@r.read_nonblock(3).should == "aaa"
@r.should.nonblock?

View file

@ -25,7 +25,7 @@ describe "BasicSocket#write_nonblock" do
platform_is :linux do
it 'does not set the IO in nonblock mode' do
require 'io/nonblock'
@w.should_not.nonblock?
@w.nonblock = false
@w.write_nonblock("aaa").should == 3
@w.should_not.nonblock?
end
@ -34,7 +34,7 @@ describe "BasicSocket#write_nonblock" do
platform_is_not :linux, :windows do
it 'sets the IO in nonblock mode' do
require 'io/nonblock'
@w.should_not.nonblock?
@w.nonblock = false
@w.write_nonblock("aaa").should == 3
@w.should.nonblock?
end

View file

@ -347,51 +347,6 @@ class TestFiber < Test::Unit::TestCase
EOS
end
def invoke_rec script, vm_stack_size, machine_stack_size, use_length = true
env = {}
env['RUBY_FIBER_VM_STACK_SIZE'] = vm_stack_size.to_s if vm_stack_size
env['RUBY_FIBER_MACHINE_STACK_SIZE'] = machine_stack_size.to_s if machine_stack_size
out = Dir.mktmpdir("test_fiber") {|tmpdir|
out, err, status = EnvUtil.invoke_ruby([env, '-e', script], '', true, true, chdir: tmpdir, timeout: 30)
assert(!status.signaled?, FailDesc[status, nil, err])
out
}
use_length ? out.length : out
end
def test_stack_size
skip 'too unstable on riscv' if RUBY_PLATFORM =~ /riscv/
h_default = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', nil, nil, false))
h_0 = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', 0, 0, false))
h_large = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', 1024 * 1024 * 5, 1024 * 1024 * 10, false))
assert_operator(h_default[:fiber_vm_stack_size], :>, h_0[:fiber_vm_stack_size])
assert_operator(h_default[:fiber_vm_stack_size], :<, h_large[:fiber_vm_stack_size])
assert_operator(h_default[:fiber_machine_stack_size], :>=, h_0[:fiber_machine_stack_size])
assert_operator(h_default[:fiber_machine_stack_size], :<=, h_large[:fiber_machine_stack_size])
# check VM machine stack size
script = '$stdout.sync=true; def rec; print "."; rec; end; Fiber.new{rec}.resume'
size_default = invoke_rec script, nil, nil
assert_operator(size_default, :>, 0)
size_0 = invoke_rec script, 0, nil
assert_operator(size_default, :>, size_0)
size_large = invoke_rec script, 1024 * 1024 * 5, nil
assert_operator(size_default, :<, size_large)
return if /mswin|mingw/ =~ RUBY_PLATFORM
# check machine stack size
# Note that machine stack size may not change size (depend on OSs)
script = '$stdout.sync=true; def rec; print "."; 1.times{1.times{1.times{rec}}}; end; Fiber.new{rec}.resume'
vm_stack_size = 1024 * 1024
size_default = invoke_rec script, vm_stack_size, nil
size_0 = invoke_rec script, vm_stack_size, 0
assert_operator(size_default, :>=, size_0)
size_large = invoke_rec script, vm_stack_size, 1024 * 1024 * 10
assert_operator(size_default, :<=, size_large)
end
def test_separate_lastmatch
bug7678 = '[ruby-core:51331]'
/a/ =~ "a"

81
test/ruby/test_stack.rb Normal file
View file

@ -0,0 +1,81 @@
# frozen_string_literal: false
require 'test/unit'
require 'tmpdir'
class TestStack < Test::Unit::TestCase
LARGE_VM_STACK_SIZE = 1024*1024*5
LARGE_MACHINE_STACK_SIZE = 1024*1024*10
def initialize(*)
super
@h_default = nil
@h_0 = nil
@h_large = nil
end
def invoke_ruby script, vm_stack_size: nil, machine_stack_size: nil
env = {}
env['RUBY_FIBER_VM_STACK_SIZE'] = vm_stack_size.to_s if vm_stack_size
env['RUBY_FIBER_MACHINE_STACK_SIZE'] = machine_stack_size.to_s if machine_stack_size
stdout, stderr, status = EnvUtil.invoke_ruby([env, '-e', script], '', true, true, timeout: 30)
assert(!status.signaled?, FailDesc[status, nil, stderr])
return stdout
end
def h_default
@h_default ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS'))
end
def h_0
@h_0 ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS',
vm_stack_size: 0,
machine_stack_size: 0
))
end
def h_large
@h_large ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS',
vm_stack_size: LARGE_VM_STACK_SIZE,
machine_stack_size: LARGE_MACHINE_STACK_SIZE
))
end
def test_relative_stack_sizes
assert_operator(h_default[:fiber_vm_stack_size], :>, h_0[:fiber_vm_stack_size])
assert_operator(h_default[:fiber_vm_stack_size], :<, h_large[:fiber_vm_stack_size])
assert_operator(h_default[:fiber_machine_stack_size], :>=, h_0[:fiber_machine_stack_size])
assert_operator(h_default[:fiber_machine_stack_size], :<=, h_large[:fiber_machine_stack_size])
end
def test_vm_stack_size
script = '$stdout.sync=true; def rec; print "."; rec; end; Fiber.new{rec}.resume'
size_default = invoke_ruby(script).bytesize
assert_operator(size_default, :>, 0)
size_0 = invoke_ruby(script, vm_stack_size: 0).bytesize
assert_operator(size_default, :>, size_0)
size_large = invoke_ruby(script, vm_stack_size: LARGE_VM_STACK_SIZE).bytesize
assert_operator(size_default, :<, size_large)
end
# Depending on OS, machine stack size may not change size.
def test_machine_stack_size
return if /mswin|mingw/ =~ RUBY_PLATFORM
script = '$stdout.sync=true; def rec; print "."; 1.times{1.times{1.times{rec}}}; end; Fiber.new{rec}.resume'
vm_stack_size = 1024 * 1024
size_default = invoke_ruby(script, vm_stack_size: vm_stack_size).bytesize
size_0 = invoke_ruby(script, vm_stack_size: vm_stack_size, machine_stack_size: 0).bytesize
assert_operator(size_default, :>=, size_0)
size_large = invoke_ruby(script, vm_stack_size: vm_stack_size, machine_stack_size: LARGE_MACHINE_STACK_SIZE).bytesize
assert_operator(size_default, :<=, size_large)
end
end

53
test/scheduler/http.rb Executable file
View file

@ -0,0 +1,53 @@
require 'benchmark'
TOPICS = ["cats", "dogs", "pigs", "skeletons", "zombies", "ocelots", "villagers", "pillagers"]
require 'net/http'
require 'uri'
require 'json'
require_relative 'scheduler'
def fetch_topics(topics)
responses = {}
topics.each do |topic|
Fiber.new(blocking: Fiber.current.blocking?) do
uri = URI("https://www.google.com/search?q=#{topic}")
responses[topic] = Net::HTTP.get(uri).scan(topic).size
end.resume
end
Thread.scheduler&.run
return responses
end
def sweep(repeats: 3, **options)
times = (1..8).map do |i|
$stderr.puts "Measuring #{i} topic(s)..."
topics = TOPICS[0...i]
Thread.new do
Benchmark.realtime do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
repeats.times do
Fiber.new(**options) do
pp fetch_topics(topics)
end.resume
scheduler.run
end
end
end.value / repeats
end
puts options.inspect
puts JSON.dump(times.map{|value| value.round(3)})
end
sweep(blocking: true)
sweep(blocking: false)

163
test/scheduler/scheduler.rb Normal file
View file

@ -0,0 +1,163 @@
# frozen_string_literal: true
require 'fiber'
begin
require 'io/nonblock'
rescue LoadError
# Ignore.
end
class Scheduler
def initialize
@readable = {}
@writable = {}
@waiting = {}
@blocking = []
@ios = ObjectSpace::WeakMap.new
end
attr :fiber
attr :readable
attr :writable
attr :waiting
attr :blocking
def next_timeout
fiber, timeout = @waiting.min_by{|key, value| value}
if timeout
offset = timeout - current_time
if offset < 0
return 0
else
return offset
end
end
end
def run
while @readable.any? or @writable.any? or @waiting.any?
# Can only handle file descriptors up to 1024...
readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout)
# puts "readable: #{readable}" if readable&.any?
# puts "writable: #{writable}" if writable&.any?
readable&.each do |io|
@readable[io]&.resume
end
writable&.each do |io|
@writable[io]&.resume
end
if @waiting.any?
time = current_time
waiting = @waiting
@waiting = {}
waiting.each do |fiber, timeout|
if timeout <= time
fiber.resume
else
@waiting[fiber] = timeout
end
end
end
end
end
def for_fd(fd)
@ios[fd] ||= ::IO.for_fd(fd, autoclose: false)
end
def wait_readable(io)
@readable[io] = Fiber.current
Fiber.yield
@readable.delete(io)
return true
end
def wait_readable_fd(fd)
wait_readable(
for_fd(fd)
)
end
def wait_writable(io)
@writable[io] = Fiber.current
Fiber.yield
@writable.delete(io)
return true
end
def wait_writable_fd(fd)
wait_writable(
for_fd(fd)
)
end
def current_time
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def wait_sleep(duration = nil)
@waiting[Fiber.current] = current_time + duration
Fiber.yield
return true
end
def wait_any(io, events, duration)
unless (events & IO::WAIT_READABLE).zero?
@readable[io] = Fiber.current
end
unless (events & IO::WAIT_WRITABLE).zero?
@writable[io] = Fiber.current
end
Fiber.yield
@readable.delete(io)
@writable.delete(io)
return true
end
def wait_for_single_fd(fd, events, duration)
wait_any(
for_fd(fd),
events,
duration
)
end
def enter_blocking_region
# puts "Enter blocking region: #{caller.first}"
end
def exit_blocking_region
# puts "Exit blocking region: #{caller.first}"
@blocking << caller.first
end
def fiber(&block)
fiber = Fiber.new(blocking: false, &block)
fiber.resume
return fiber
end
end

View file

@ -0,0 +1,45 @@
# frozen_string_literal: true
require 'test/unit'
require 'socket'
require_relative 'scheduler'
class TestSchedulerEnumerator < Test::Unit::TestCase
MESSAGE = "Hello World"
def test_read_characters
skip unless defined?(UNIXSocket)
i, o = UNIXSocket.pair
skip unless i.nonblock? && o.nonblock?
message = String.new
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
e = i.to_enum(:each_char)
Fiber do
o.write("Hello World")
o.close
end
Fiber do
begin
while c = e.next
message << c
end
rescue StopIteration
# Ignore.
end
i.close
end
end
thread.join
assert_equal(MESSAGE, message)
end
end

View file

@ -0,0 +1,29 @@
# frozen_string_literal: true
require 'test/unit'
require_relative 'scheduler'
class TestSchedulerFiber < Test::Unit::TestCase
def test_fiber_without_scheduler
# Cannot create fiber without scheduler.
assert_raise RuntimeError do
Fiber do
end
end
end
def test_fiber_blocking
scheduler = Scheduler.new
thread = Thread.new do
Thread.current.scheduler = scheduler
# Close is always a blocking operation.
IO.pipe.each(&:close)
end
thread.join
assert_not_empty scheduler.blocking
assert_match /test_fiber.rb:\d+:in `close'/, scheduler.blocking.last
end
end

View file

@ -0,0 +1,28 @@
# frozen_string_literal: true
require 'net/http'
require 'uri'
require 'openssl'
require 'test/unit'
require_relative 'scheduler'
class TestSchedulerHTTP < Test::Unit::TestCase
def test_get
Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber do
uri = URI("https://www.ruby-lang.org/en/")
http = Net::HTTP.new uri.host, uri.port
http.use_ssl = true
http.verify_mode = OpenSSL::SSL::VERIFY_NONE
body = http.get(uri.path).body
assert !body.empty?
end
end.join
end
end

35
test/scheduler/test_io.rb Normal file
View file

@ -0,0 +1,35 @@
# frozen_string_literal: true
require 'test/unit'
require_relative 'scheduler'
class TestSchedulerIO < Test::Unit::TestCase
MESSAGE = "Hello World"
def test_read
skip unless defined?(UNIXSocket)
i, o = UNIXSocket.pair
skip unless i.nonblock? && o.nonblock?
message = nil
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber do
message = i.read(20)
i.close
end
Fiber do
o.write("Hello World")
o.close
end
end
thread.join
assert_equal MESSAGE, message
end
end

View file

@ -0,0 +1,47 @@
# frozen_string_literal: true
require 'test/unit'
require_relative 'scheduler'
class TestSchedulerMutex < Test::Unit::TestCase
def test_mutex_synchronize
mutex = Mutex.new
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber do
assert_equal Thread.scheduler, scheduler
mutex.synchronize do
assert_nil Thread.scheduler
end
end
end
thread.join
end
def test_mutex_deadlock
mutex = Mutex.new
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
Fiber do
assert_equal Thread.scheduler, scheduler
mutex.synchronize do
Fiber.yield
end
end
assert_raise ThreadError do
mutex.lock
end
end
thread.join
end
end

View file

@ -0,0 +1,30 @@
# frozen_string_literal: true
require 'test/unit'
require_relative 'scheduler'
class TestSchedulerSleep < Test::Unit::TestCase
ITEMS = [0, 1, 2, 3, 4]
def test_sleep
items = []
thread = Thread.new do
scheduler = Scheduler.new
Thread.current.scheduler = scheduler
5.times do |i|
Fiber do
sleep(i/100.0)
items << i
end
end
# Should be 5 fibers waiting:
assert_equal scheduler.waiting.size, 5
end
thread.join
assert_equal ITEMS, items
end
end

View file

@ -159,8 +159,6 @@ class TestSocket_BasicSocket < Test::Unit::TestCase
set_nb = true
buf = String.new
if ssock.respond_to?(:nonblock?)
assert_not_predicate(ssock, :nonblock?)
assert_not_predicate(csock, :nonblock?)
csock.nonblock = ssock.nonblock = false
# Linux may use MSG_DONTWAIT to avoid setting O_NONBLOCK

109
thread.c
View file

@ -109,6 +109,8 @@ static VALUE sym_immediate;
static VALUE sym_on_blocking;
static VALUE sym_never;
static ID id_wait_for_single_fd;
enum SLEEP_FLAGS {
SLEEP_DEADLOCKABLE = 0x1,
SLEEP_SPURIOUS_CHECK = 0x2
@ -708,6 +710,11 @@ thread_do_start(rb_thread_t *th)
else {
th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg);
}
VALUE scheduler = th->scheduler;
if (scheduler != Qnil) {
rb_funcall(scheduler, rb_intern("run"), 0);
}
}
void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec);
@ -1471,6 +1478,7 @@ rb_nogvl(void *(*func)(void *), void *data1,
rb_thread_t *th = rb_ec_thread_ptr(ec);
int saved_errno = 0;
VALUE ubf_th = Qfalse;
VALUE scheduler = th->scheduler;
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
ubf = ubf_select;
@ -1485,6 +1493,10 @@ rb_nogvl(void *(*func)(void *), void *data1,
}
}
if (scheduler != Qnil) {
rb_funcall(scheduler, rb_intern("enter_blocking_region"), 0);
}
BLOCKING_REGION(th, {
val = func(data1);
saved_errno = errno;
@ -1500,6 +1512,10 @@ rb_nogvl(void *(*func)(void *), void *data1,
thread_value(rb_thread_kill(ubf_th));
}
if (scheduler != Qnil) {
rb_funcall(scheduler, rb_intern("exit_blocking_region"), 0);
}
errno = saved_errno;
return val;
@ -3574,6 +3590,63 @@ rb_thread_variables(VALUE thread)
return ary;
}
VALUE rb_thread_scheduler_get(VALUE thread)
{
rb_thread_t * th = rb_thread_ptr(thread);
VM_ASSERT(th);
return th->scheduler;
}
VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler)
{
rb_thread_t * th = rb_thread_ptr(thread);
VM_ASSERT(th);
th->scheduler = scheduler;
return th->scheduler;
}
/*
* call-seq:
* Thread.scheduler -> scheduler or nil
*
* Returns the current scheduler if scheduling operations are permitted.
*
*/
static VALUE
rb_thread_scheduler(VALUE klass)
{
return rb_current_thread_scheduler();
}
VALUE rb_current_thread_scheduler(void)
{
rb_thread_t * th = GET_THREAD();
VM_ASSERT(th);
if (th->blocking == 0)
return th->scheduler;
else
return Qnil;
}
static VALUE
rb_thread_blocking_p(VALUE thread)
{
unsigned blocking = rb_thread_ptr(thread)->blocking;
if (blocking == 0)
return Qfalse;
return INT2NUM(blocking);
}
/*
* call-seq:
* thr.thread_variable?(key) -> true or false
@ -4129,6 +4202,15 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
}
static VALUE
rb_thread_timeout(struct timeval *timeout) {
if (timeout) {
return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
}
return Qnil;
}
#ifdef USE_POLL
/* The same with linux kernel. TODO: make platform independent definition. */
@ -4155,6 +4237,14 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
struct waiting_fd wfd;
int state;
VALUE scheduler = rb_current_thread_scheduler();
if (scheduler != Qnil) {
VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
rb_thread_timeout(timeout)
);
return RTEST(result);
}
wfd.th = GET_THREAD();
wfd.fd = fd;
list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node);
@ -4287,8 +4377,16 @@ select_single_cleanup(VALUE ptr)
}
int
rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
{
VALUE scheduler = rb_current_thread_scheduler();
if (scheduler != Qnil) {
VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
rb_thread_timeout(timeout)
);
return RTEST(result);
}
rb_fdset_t rfds, wfds, efds;
struct select_args args;
int r;
@ -4298,7 +4396,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv)
args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL;
args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL;
args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL;
args.tv = tv;
args.tv = timeout;
args.wfd.fd = fd;
args.wfd.th = GET_THREAD();
@ -5185,6 +5283,8 @@ Init_Thread(void)
sym_immediate = ID2SYM(rb_intern("immediate"));
sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
id_wait_for_single_fd = rb_intern("wait_for_single_fd");
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
@ -5223,6 +5323,7 @@ Init_Thread(void)
rb_define_method(rb_cThread, "keys", rb_thread_keys, 0);
rb_define_method(rb_cThread, "priority", rb_thread_priority, 0);
rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1);
rb_define_method(rb_cThread, "blocking?", rb_thread_blocking_p, 0);
rb_define_method(rb_cThread, "status", rb_thread_status, 0);
rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1);
rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2);
@ -5239,6 +5340,10 @@ Init_Thread(void)
rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1);
rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1);
rb_define_singleton_method(rb_cThread, "scheduler", rb_thread_scheduler, 0);
rb_define_method(rb_cThread, "scheduler", rb_thread_scheduler_get, 0);
rb_define_method(rb_cThread, "scheduler=", rb_thread_scheduler_set, 1);
rb_define_method(rb_cThread, "name", rb_thread_getname, 0);
rb_define_method(rb_cThread, "name=", rb_thread_setname, 1);
rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0);

View file

@ -190,6 +190,8 @@ mutex_locked(rb_thread_t *th, VALUE self)
mutex->next_mutex = th->keeping_mutexes;
}
th->keeping_mutexes = mutex;
th->blocking += 1;
}
/*
@ -365,6 +367,8 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th)
struct sync_waiter *cur = 0, *next;
rb_mutex_t **th_mutex = &th->keeping_mutexes;
th->blocking -= 1;
mutex->th = 0;
list_for_each_safe(&mutex->waitq, cur, next, node) {
list_del_init(&cur->node);
@ -404,8 +408,9 @@ rb_mutex_unlock(VALUE self)
{
const char *err;
rb_mutex_t *mutex = mutex_ptr(self);
rb_thread_t *th = GET_THREAD();
err = rb_mutex_unlock_th(mutex, GET_THREAD());
err = rb_mutex_unlock_th(mutex, th);
if (err) rb_raise(rb_eThreadError, "%s", err);
return self;

10
vm.c
View file

@ -2620,6 +2620,8 @@ thread_mark(void *ptr)
RUBY_MARK_UNLESS_NULL(th->locking_mutex);
RUBY_MARK_UNLESS_NULL(th->name);
RUBY_MARK_UNLESS_NULL(th->scheduler);
RUBY_MARK_LEAVE("thread");
}
@ -2734,6 +2736,10 @@ th_init(rb_thread_t *th, VALUE self)
th->self = self;
rb_threadptr_root_fiber_setup(th);
/* All threads are blocking until a non-blocking fiber is scheduled */
th->blocking = 1;
th->scheduler = Qnil;
if (self == 0) {
size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE);
rb_ec_initialize_vm_stack(th->ec, ALLOC_N(VALUE, size), size);
@ -3294,12 +3300,14 @@ Init_VM(void)
vm->self = TypedData_Wrap_Struct(rb_cRubyVM, &vm_data_type, vm);
/* create main thread */
th->self = TypedData_Wrap_Struct(rb_cThread, &thread_data_type, th);
th->self = TypedData_Wrap_Struct(rb_cThread, &thread_data_type, th);
vm->main_thread = th;
vm->running_thread = th;
th->vm = vm;
th->top_wrapper = 0;
th->top_self = rb_vm_top_self();
rb_thread_set_current(th);
rb_vm_living_threads_insert(vm, th);

View file

@ -969,13 +969,15 @@ typedef struct rb_thread_struct {
rb_fiber_t *root_fiber;
rb_jmpbuf_t root_jmpbuf;
VALUE scheduler;
unsigned blocking;
/* misc */
VALUE name;
#ifdef USE_SIGALTSTACK
void *altstack;
#endif
} rb_thread_t;
typedef enum {