diff --git a/cont.c b/cont.c index 1ea9056248..a10e0580e6 100644 --- a/cont.c +++ b/cont.c @@ -241,12 +241,17 @@ struct rb_fiber_struct { */ unsigned int transferred : 1; + /* Whether the fiber is allowed to implicitly yield. */ + unsigned int blocking : 1; + struct coroutine_context context; struct fiber_pool_stack stack; }; static struct fiber_pool shared_fiber_pool = {NULL, NULL, 0, 0, 0, 0}; +static ID fiber_initialize_keywords[2] = {0}; + /* * FreeBSD require a first (i.e. addr) argument of mmap(2) is not NULL * if MAP_STACK is passed. @@ -1733,7 +1738,7 @@ fiber_alloc(VALUE klass) } static rb_fiber_t* -fiber_t_alloc(VALUE fiber_value) +fiber_t_alloc(VALUE fiber_value, unsigned int blocking) { rb_fiber_t *fiber; rb_thread_t *th = GET_THREAD(); @@ -1746,6 +1751,7 @@ fiber_t_alloc(VALUE fiber_value) fiber = ZALLOC(rb_fiber_t); fiber->cont.self = fiber_value; fiber->cont.type = FIBER_CONTEXT; + fiber->blocking = blocking; cont_init(&fiber->cont, th); fiber->cont.saved_ec.fiber_ptr = fiber; @@ -1763,9 +1769,9 @@ fiber_t_alloc(VALUE fiber_value) } static VALUE -fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool) +fiber_initialize(VALUE self, VALUE proc, struct fiber_pool * fiber_pool, unsigned int blocking) { - rb_fiber_t *fiber = fiber_t_alloc(self); + rb_fiber_t *fiber = fiber_t_alloc(self, blocking); fiber->first_proc = proc; fiber->stack.base = NULL; @@ -1793,17 +1799,66 @@ fiber_prepare_stack(rb_fiber_t *fiber) sec->local_storage_recursive_hash_for_trace = Qnil; } +static struct fiber_pool * +rb_fiber_pool_default(VALUE pool) +{ + return &shared_fiber_pool; +} + +/* :nodoc: */ +static VALUE +rb_fiber_initialize_kw(int argc, VALUE* argv, VALUE self, int kw_splat) +{ + VALUE pool = Qnil; + VALUE blocking = Qtrue; + + if (kw_splat != RB_NO_KEYWORDS) { + VALUE options = Qnil; + VALUE arguments[2] = {Qundef}; + + argc = rb_scan_args_kw(kw_splat, argc, argv, ":", &options); + rb_get_kwargs(options, fiber_initialize_keywords, 0, 2, arguments); + + blocking = arguments[0]; + pool = arguments[1]; + } + + return fiber_initialize(self, rb_block_proc(), rb_fiber_pool_default(pool), RTEST(blocking)); +} + /* :nodoc: */ static VALUE rb_fiber_initialize(int argc, VALUE* argv, VALUE self) { - return fiber_initialize(self, rb_block_proc(), &shared_fiber_pool); + return rb_fiber_initialize_kw(argc, argv, self, rb_keyword_given_p()); } VALUE rb_fiber_new(rb_block_call_func_t func, VALUE obj) { - return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), &shared_fiber_pool); + return fiber_initialize(fiber_alloc(rb_cFiber), rb_proc_new(func, obj), rb_fiber_pool_default(Qnil), 1); +} + +static VALUE +rb_f_fiber_kw(int argc, VALUE* argv, int kw_splat) +{ + rb_thread_t * th = GET_THREAD(); + VALUE scheduler = th->scheduler; + VALUE fiber = Qnil; + + if (scheduler != Qnil) { + fiber = rb_funcall_passing_block_kw(scheduler, rb_intern("fiber"), argc, argv, kw_splat); + } else { + rb_raise(rb_eRuntimeError, "No scheduler is available!"); + } + + return fiber; +} + +static VALUE +rb_f_fiber(int argc, VALUE *argv, VALUE obj) +{ + return rb_f_fiber_kw(argc, argv, rb_keyword_given_p()); } static void rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt); @@ -1820,6 +1875,10 @@ rb_fiber_start(void) VM_ASSERT(th->ec == ruby_current_execution_context_ptr); VM_ASSERT(FIBER_RESUMED_P(fiber)); + if (fiber->blocking) { + th->blocking += 1; + } + EC_PUSH_TAG(th->ec); if ((state = EC_EXEC_TAG()) == TAG_NONE) { rb_context_t *cont = &VAR_FROM_MEMORY(fiber)->cont; @@ -1892,6 +1951,7 @@ rb_threadptr_root_fiber_setup(rb_thread_t *th) fiber->cont.type = FIBER_CONTEXT; fiber->cont.saved_ec.fiber_ptr = fiber; fiber->cont.saved_ec.thread_ptr = th; + fiber->blocking = 1; fiber_status_set(fiber, FIBER_RESUMED); /* skip CREATED */ th->ec = &fiber->cont.saved_ec; } @@ -2044,11 +2104,15 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int } } + VM_ASSERT(FIBER_RUNNABLE_P(fiber)); + if (is_resume) { fiber->prev = fiber_current(); } - VM_ASSERT(FIBER_RUNNABLE_P(fiber)); + if (fiber_current()->blocking) { + th->blocking -= 1; + } cont->argc = argc; cont->kw_splat = kw_splat; @@ -2060,6 +2124,10 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int fiber_stack_release(fiber); } + if (fiber_current()->blocking) { + th->blocking += 1; + } + RUBY_VM_CHECK_INTS(th->ec); EXEC_EVENT_HOOK(th->ec, RUBY_EVENT_FIBER_SWITCH, th->self, 0, 0, 0, Qnil); @@ -2073,6 +2141,12 @@ rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS); } +VALUE +rb_fiber_blocking_p(VALUE fiber) +{ + return (fiber_ptr(fiber)->blocking == 0) ? Qfalse : Qtrue; +} + void rb_fiber_close(rb_fiber_t *fiber) { @@ -2442,6 +2516,9 @@ Init_Cont(void) fiber_pool_initialize(&shared_fiber_pool, stack_size, FIBER_POOL_INITIAL_SIZE, vm_stack_size); + fiber_initialize_keywords[0] = rb_intern_const("blocking"); + fiber_initialize_keywords[1] = rb_intern_const("pool"); + char * fiber_shared_fiber_pool_free_stacks = getenv("RUBY_SHARED_FIBER_POOL_FREE_STACKS"); if (fiber_shared_fiber_pool_free_stacks) { shared_fiber_pool.free_stacks = atoi(fiber_shared_fiber_pool_free_stacks); @@ -2452,11 +2529,14 @@ Init_Cont(void) rb_eFiberError = rb_define_class("FiberError", rb_eStandardError); rb_define_singleton_method(rb_cFiber, "yield", rb_fiber_s_yield, -1); rb_define_method(rb_cFiber, "initialize", rb_fiber_initialize, -1); + rb_define_method(rb_cFiber, "blocking?", rb_fiber_blocking_p, 0); rb_define_method(rb_cFiber, "resume", rb_fiber_m_resume, -1); rb_define_method(rb_cFiber, "raise", rb_fiber_raise, -1); rb_define_method(rb_cFiber, "to_s", fiber_to_s, 0); rb_define_alias(rb_cFiber, "inspect", "to_s"); + rb_define_global_function("Fiber", rb_f_fiber, -1); + #ifdef RB_EXPERIMENTAL_FIBER_POOL rb_cFiberPool = rb_define_class("Pool", rb_cFiber); rb_define_alloc_func(rb_cFiberPool, fiber_pool_alloc); diff --git a/doc/fiber.rdoc b/doc/fiber.rdoc new file mode 100644 index 0000000000..d3c19a0d14 --- /dev/null +++ b/doc/fiber.rdoc @@ -0,0 +1,137 @@ += Fiber + +Fiber is a flow-control primitive which enable cooperative scheduling. This is +in contrast to threads which can be preemptively scheduled at any time. While +having a similar memory profiles, the cost of context switching fibers can be +significantly less than threads as it does not involve a system call. + +== Design + +=== Scheduler + +The per-thread fiber scheduler interface is used to intercept blocking +operations. A typical implementation would be a wrapper for a gem like +EventMachine or Async. This design provides separation of concerns between the +event loop implementation and application code. It also allows for layered +schedulers which can perform instrumentation. + + class Scheduler + # Wait for the given file descriptor to become readable. + def wait_readable(io) + end + + # Wait for the given file descriptor to become writable. + def wait_writable(io) + end + + # Wait for the given file descriptor to match the specified events within + # the specified timeout. + # @param event [Integer] a bit mask of +IO::WAIT_READABLE+, + # `IO::WAIT_WRITABLE` and `IO::WAIT_PRIORITY`. + # @param timeout [#to_f] the amount of time to wait for the event. + def wait_any(io, events, timeout) + end + + # Sleep the current task for the specified duration, or forever if not + # specified. + # @param duration [#to_f] the amount of time to sleep. + def wait_sleep(duration = nil) + end + + # The Ruby virtual machine is going to enter a system level blocking + # operation. + def enter_blocking_region + end + + # The Ruby virtual machine has completed the system level blocking + # operation. + def exit_blocking_region + end + + # Intercept the creation of a non-blocking fiber. + def fiber(&block) + Fiber.new(blocking: false, &block) + end + + # Invoked when the thread exits. + def run + # Implement event loop here. + end + end + +On CRuby, the following extra methods need to be implemented to handle the +public C interface: + + class Scheduler + # Wrapper for rb_wait_readable(int) C function. + def wait_readable_fd(fd) + wait_readable(::IO.from_fd(fd, autoclose: false)) + end + + # Wrapper for rb_wait_readable(int) C function. + def wait_writable_fd(fd) + wait_writable(::IO.from_fd(fd, autoclose: false)) + end + + # Wrapper for rb_wait_for_single_fd(int) C function. + def wait_for_single_fd(fd, events, duration) + wait_any(::IO.from_fd(fd, autoclose: false), events, duration) + end + end + +=== Non-blocking Fibers + +By default fibers are blocking. Non-blocking fibers may invoke specific +scheduler hooks when a blocking operation occurs, and these hooks may introduce +context switching points. + + Fiber.new(blocking: false) do + puts Fiber.current.blocking? # false + + # May invoke `Thread.scheduler&.wait_readable`. + io.read(...) + + # May invoke `Thread.scheduler&.wait_writable`. + io.write(...) + + # Will invoke `Thread.scheduler&.wait_sleep`. + sleep(n) + end.resume + +We also introduce a new method which simplifes the creation of these +non-blocking fibers: + + Fiber do + puts Fiber.current.blocking? # false + end + +The purpose of this method is to allow the scheduler to internally decide the +policy for when to start the fiber, and whether to use symmetric or asymmetric +fibers. + +=== Mutex + +Locking a mutex causes the +Thread#scheduler+ to not be used while the mutex +is held by that thread. On +Mutex#lock+, fiber switching via the scheduler +is disabled and operations become blocking for all fibers of the same +Thread+. +On +Mutex#unlock+, the scheduler is enabled again. + + mutex = Mutex.new + + puts Thread.current.blocking? # 1 (true) + + Fiber.new(blocking: false) do + puts Thread.current.blocking? # false + mutex.synchronize do + puts Thread.current.blocking? # (1) true + end + + puts Thread.current.blocking? # false + end.resume + +=== Non-blocking I/O + +By default, I/O is non-blocking. Not all operating systems support non-blocking +I/O. Windows is a notable example where socket I/O can be non-blocking but pipe +I/O is blocking. Provided that there *is* a scheduler and the current thread *is +non-blocking*, the operation will invoke the scheduler. diff --git a/ext/socket/ancdata.c b/ext/socket/ancdata.c index 84463af061..e0fc247f8c 100644 --- a/ext/socket/ancdata.c +++ b/ext/socket/ancdata.c @@ -2,7 +2,6 @@ #include -int rsock_cmsg_cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */ static VALUE sym_wait_readable, sym_wait_writable; #if defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) @@ -1429,10 +1428,7 @@ make_io_for_unix_rights(VALUE ctl, struct cmsghdr *cmh, char *msg_end) if (fstat(fd, &stbuf) == -1) rb_raise(rb_eSocket, "invalid fd in SCM_RIGHTS"); rb_update_max_fd(fd); - if (rsock_cmsg_cloexec_state < 0) - rsock_cmsg_cloexec_state = rsock_detect_cloexec(fd); - if (rsock_cmsg_cloexec_state == 0 || fd <= 2) - rb_maygvl_fd_fix_cloexec(fd); + rb_maygvl_fd_fix_cloexec(fd); if (S_ISSOCK(stbuf.st_mode)) io = rsock_init_sock(rb_obj_alloc(rb_cSocket), fd); else diff --git a/ext/socket/init.c b/ext/socket/init.c index 6d17ecfb4e..0604e8b72f 100644 --- a/ext/socket/init.c +++ b/ext/socket/init.c @@ -408,84 +408,30 @@ rsock_write_nonblock(VALUE sock, VALUE str, VALUE ex) } #endif /* MSG_DONTWAIT_RELIABLE */ -/* returns true if SOCK_CLOEXEC is supported */ -int rsock_detect_cloexec(int fd) +static int +rsock_socket0(int domain, int type, int proto) { #ifdef SOCK_CLOEXEC - int flags = fcntl(fd, F_GETFD); - - if (flags == -1) - rb_bug("rsock_detect_cloexec: fcntl(%d, F_GETFD) failed: %s", fd, strerror(errno)); - - if (flags & FD_CLOEXEC) - return 1; + type |= SOCK_CLOEXEC; #endif - return 0; -} -#ifdef SOCK_CLOEXEC -static int -rsock_socket0(int domain, int type, int proto) -{ - int ret; - static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */ +#ifdef SOCK_NONBLOCK + type |= SOCK_NONBLOCK; +#endif - if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */ - ret = socket(domain, type|SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT, proto); - if (ret >= 0) { - if (ret <= 2) - goto fix_cloexec; - goto update_max_fd; - } - } - else if (cloexec_state < 0) { /* usually runs once only for detection */ - ret = socket(domain, type|SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT, proto); - if (ret >= 0) { - cloexec_state = rsock_detect_cloexec(ret); - if (cloexec_state == 0 || ret <= 2) - goto fix_cloexec; - goto update_max_fd; - } - else if (ret == -1 && errno == EINVAL) { - /* SOCK_CLOEXEC is available since Linux 2.6.27. Linux 2.6.18 fails with EINVAL */ - ret = socket(domain, type, proto); - if (ret != -1) { - cloexec_state = 0; - /* fall through to fix_cloexec */ - } - } - } - else { /* cloexec_state == 0 */ - ret = socket(domain, type, proto); - } - if (ret == -1) + int result = socket(domain, type, proto); + + if (result == -1) return -1; -fix_cloexec: - rb_maygvl_fd_fix_cloexec(ret); - if (RSOCK_NONBLOCK_DEFAULT) { - rsock_make_fd_nonblock(ret); - } -update_max_fd: - rb_update_max_fd(ret); - return ret; + rb_fd_fix_cloexec(result); + +#ifndef SOCK_NONBLOCK + rsock_make_fd_nonblock(result); +#endif + + return result; } -#else /* !SOCK_CLOEXEC */ -static int -rsock_socket0(int domain, int type, int proto) -{ - int ret = socket(domain, type, proto); - - if (ret == -1) - return -1; - rb_fd_fix_cloexec(ret); - if (RSOCK_NONBLOCK_DEFAULT) { - rsock_make_fd_nonblock(ret); - } - - return ret; -} -#endif /* !SOCK_CLOEXEC */ int rsock_socket(int domain, int type, int proto) @@ -637,6 +583,10 @@ rsock_connect(int fd, const struct sockaddr *sockaddr, int len, int socks) void rsock_make_fd_nonblock(int fd) { +#ifdef _WIN32 + return; +#endif + int flags; #ifdef F_GETFL flags = fcntl(fd, F_GETFL); @@ -653,56 +603,34 @@ rsock_make_fd_nonblock(int fd) } static int -cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len, - int nonblock) +cloexec_accept(int socket, struct sockaddr *address, socklen_t *address_len) { - int ret; socklen_t len0 = 0; -#ifdef HAVE_ACCEPT4 - static int try_accept4 = 1; -#endif - if (RSOCK_NONBLOCK_DEFAULT) { - nonblock = 1; - } if (address_len) len0 = *address_len; + #ifdef HAVE_ACCEPT4 - if (try_accept4) { - int flags = 0; -#ifdef SOCK_CLOEXEC - flags |= SOCK_CLOEXEC; -#endif + int flags = SOCK_CLOEXEC; + #ifdef SOCK_NONBLOCK - if (nonblock) { - flags |= SOCK_NONBLOCK; - } + flags |= SOCK_NONBLOCK; #endif - ret = accept4(socket, address, address_len, flags); - /* accept4 is available since Linux 2.6.28, glibc 2.10. */ - if (ret != -1) { - if (ret <= 2) - rb_maygvl_fd_fix_cloexec(ret); + + int result = accept4(socket, address, address_len, flags); + if (result == -1) return -1; + #ifndef SOCK_NONBLOCK - if (nonblock) { - rsock_make_fd_nonblock(ret); - } + rsock_make_fd_nonblock(result); #endif - if (address_len && len0 < *address_len) *address_len = len0; - return ret; - } - if (errno != ENOSYS) { - return -1; - } - try_accept4 = 0; - } +#else + int result = accept(socket, address, address_len); + if (result == -1) return -1; + + rb_maygvl_fd_fix_cloexec(result); + rsock_make_fd_nonblock(result); #endif - ret = accept(socket, address, address_len); - if (ret == -1) return -1; + if (address_len && len0 < *address_len) *address_len = len0; - rb_maygvl_fd_fix_cloexec(ret); - if (nonblock) { - rsock_make_fd_nonblock(ret); - } - return ret; + return result; } VALUE @@ -712,7 +640,7 @@ rsock_s_accept_nonblock(VALUE klass, VALUE ex, rb_io_t *fptr, int fd2; rb_io_set_nonblock(fptr); - fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len, 1); + fd2 = cloexec_accept(fptr->fd, (struct sockaddr*)sockaddr, len); if (fd2 < 0) { int e = errno; switch (e) { @@ -744,7 +672,7 @@ static VALUE accept_blocking(void *data) { struct accept_arg *arg = data; - return (VALUE)cloexec_accept(arg->fd, arg->sockaddr, arg->len, 0); + return (VALUE)cloexec_accept(arg->fd, arg->sockaddr, arg->len); } VALUE diff --git a/ext/socket/rubysocket.h b/ext/socket/rubysocket.h index 867e215273..fcea2a5a05 100644 --- a/ext/socket/rubysocket.h +++ b/ext/socket/rubysocket.h @@ -36,13 +36,7 @@ # if defined(_MSC_VER) # undef HAVE_TYPE_STRUCT_SOCKADDR_DL # endif -/* - * FIXME: failures if we make nonblocking the default - * [ruby-core:89973] [ruby-core:89976] [ruby-core:89977] [Bug #14968] - */ -# define RSOCK_NONBLOCK_DEFAULT (0) #else -# define RSOCK_NONBLOCK_DEFAULT (0) # include # include # ifdef HAVE_NETINET_IN_SYSTM_H @@ -260,7 +254,6 @@ typedef union { #define INET_SOCKS 2 extern int rsock_do_not_reverse_lookup; -extern int rsock_cmsg_cloexec_state; #define FMODE_NOREVLOOKUP 0x100 /* common socket families only */ diff --git a/ext/socket/socket.c b/ext/socket/socket.c index bfeb30340c..e4504620fb 100644 --- a/ext/socket/socket.c +++ b/ext/socket/socket.c @@ -168,93 +168,47 @@ pair_yield(VALUE pair) #endif #if defined HAVE_SOCKETPAIR - +static int +rsock_socketpair0(int domain, int type, int protocol, int descriptors[2]) +{ #ifdef SOCK_CLOEXEC -static int -rsock_socketpair0(int domain, int type, int protocol, int sv[2]) -{ - int ret; - static int cloexec_state = -1; /* <0: unknown, 0: ignored, >0: working */ - static const int default_flags = SOCK_CLOEXEC|RSOCK_NONBLOCK_DEFAULT; + type |= SOCK_CLOEXEC; +#endif - if (cloexec_state > 0) { /* common path, if SOCK_CLOEXEC is defined */ - ret = socketpair(domain, type|default_flags, protocol, sv); - if (ret == 0 && (sv[0] <= 2 || sv[1] <= 2)) { - goto fix_cloexec; /* highly unlikely */ - } - goto update_max_fd; - } - else if (cloexec_state < 0) { /* usually runs once only for detection */ - ret = socketpair(domain, type|default_flags, protocol, sv); - if (ret == 0) { - cloexec_state = rsock_detect_cloexec(sv[0]); - if ((cloexec_state == 0) || (sv[0] <= 2 || sv[1] <= 2)) - goto fix_cloexec; - goto update_max_fd; - } - else if (ret == -1 && errno == EINVAL) { - /* SOCK_CLOEXEC is available since Linux 2.6.27. Linux 2.6.18 fails with EINVAL */ - ret = socketpair(domain, type, protocol, sv); - if (ret != -1) { - /* The reason of EINVAL may be other than SOCK_CLOEXEC. - * So disable SOCK_CLOEXEC only if socketpair() succeeds without SOCK_CLOEXEC. - * Ex. Socket.pair(:UNIX, 0xff) fails with EINVAL. - */ - cloexec_state = 0; - } - } - } - else { /* cloexec_state == 0 */ - ret = socketpair(domain, type, protocol, sv); - } - if (ret == -1) { +#ifdef SOCK_NONBLOCK + type |= SOCK_NONBLOCK; +#endif + + int result = socketpair(domain, type, protocol, descriptors); + + if (result == -1) return -1; - } -fix_cloexec: - rb_maygvl_fd_fix_cloexec(sv[0]); - rb_maygvl_fd_fix_cloexec(sv[1]); - if (RSOCK_NONBLOCK_DEFAULT) { - rsock_make_fd_nonblock(sv[0]); - rsock_make_fd_nonblock(sv[1]); - } +#ifndef SOCK_CLOEXEC + rb_fd_fix_cloexec(descriptors[0]); + rb_fd_fix_cloexec(descriptors[1]); +#endif -update_max_fd: - rb_update_max_fd(sv[0]); - rb_update_max_fd(sv[1]); +#ifndef SOCK_NONBLOCK + rsock_make_fd_nonblock(descriptors[0]); + rsock_make_fd_nonblock(descriptors[1]); +#endif - return ret; + return result; } -#else /* !SOCK_CLOEXEC */ -static int -rsock_socketpair0(int domain, int type, int protocol, int sv[2]) -{ - int ret = socketpair(domain, type, protocol, sv); - - if (ret == -1) - return -1; - - rb_fd_fix_cloexec(sv[0]); - rb_fd_fix_cloexec(sv[1]); - if (RSOCK_NONBLOCK_DEFAULT) { - rsock_make_fd_nonblock(sv[0]); - rsock_make_fd_nonblock(sv[1]); - } - return ret; -} -#endif /* !SOCK_CLOEXEC */ static int -rsock_socketpair(int domain, int type, int protocol, int sv[2]) +rsock_socketpair(int domain, int type, int protocol, int descriptors[2]) { - int ret; + int result; - ret = rsock_socketpair0(domain, type, protocol, sv); - if (ret < 0 && rb_gc_for_fd(errno)) { - ret = rsock_socketpair0(domain, type, protocol, sv); + result = rsock_socketpair0(domain, type, protocol, descriptors); + + if (result < 0 && rb_gc_for_fd(errno)) { + result = rsock_socketpair0(domain, type, protocol, descriptors); } - return ret; + return result; } /* diff --git a/ext/socket/unixsocket.c b/ext/socket/unixsocket.c index 0c3a01d21e..53a50958ed 100644 --- a/ext/socket/unixsocket.c +++ b/ext/socket/unixsocket.c @@ -455,11 +455,7 @@ retry: #endif rb_update_max_fd(fd); - - if (rsock_cmsg_cloexec_state < 0) - rsock_cmsg_cloexec_state = rsock_detect_cloexec(fd); - if (rsock_cmsg_cloexec_state == 0 || fd <= 2) - rb_maygvl_fd_fix_cloexec(fd); + rb_maygvl_fd_fix_cloexec(fd); if (klass == Qnil) return INT2FIX(fd); diff --git a/include/ruby/internal/intern/cont.h b/include/ruby/internal/intern/cont.h index 6988f753c3..cfa5630af2 100644 --- a/include/ruby/internal/intern/cont.h +++ b/include/ruby/internal/intern/cont.h @@ -28,6 +28,7 @@ RBIMPL_SYMBOL_EXPORT_BEGIN() /* cont.c */ VALUE rb_fiber_new(rb_block_call_func_t, VALUE); +VALUE rb_fiber_new_kw(rb_block_call_func_t, VALUE, int kw_splat); VALUE rb_fiber_resume(VALUE fib, int argc, const VALUE *argv); VALUE rb_fiber_resume_kw(VALUE fib, int argc, const VALUE *argv, int kw_splat); VALUE rb_fiber_yield(int argc, const VALUE *argv); diff --git a/include/ruby/internal/intern/thread.h b/include/ruby/internal/intern/thread.h index a12a371058..4a840cd881 100644 --- a/include/ruby/internal/intern/thread.h +++ b/include/ruby/internal/intern/thread.h @@ -71,6 +71,10 @@ VALUE rb_mutex_unlock(VALUE mutex); VALUE rb_mutex_sleep(VALUE self, VALUE timeout); VALUE rb_mutex_synchronize(VALUE mutex, VALUE (*func)(VALUE arg), VALUE arg); +VALUE rb_thread_scheduler_get(VALUE); +VALUE rb_thread_scheduler_set(VALUE, VALUE); +VALUE rb_current_thread_scheduler(void); + RBIMPL_SYMBOL_EXPORT_END() #endif /* RBIMPL_INTERN_THREAD_H */ diff --git a/io.c b/io.c index fc817f96b2..26202e0806 100644 --- a/io.c +++ b/io.c @@ -177,15 +177,6 @@ off_t __syscall(quad_t number, ...); #define rename(f, t) rb_w32_urename((f), (t)) #endif -#if defined(_WIN32) -# define RUBY_PIPE_NONBLOCK_DEFAULT (0) -#elif defined(O_NONBLOCK) - /* disabled for [Bug #15356] (Rack::Deflater + rails) failure: */ -# define RUBY_PIPE_NONBLOCK_DEFAULT (0) -#else /* any platforms where O_NONBLOCK does not exist? */ -# define RUBY_PIPE_NONBLOCK_DEFAULT (0) -#endif - VALUE rb_cIO; VALUE rb_eEOFError; VALUE rb_eIOError; @@ -406,44 +397,37 @@ rb_fd_set_nonblock(int fd) } int -rb_cloexec_pipe(int fildes[2]) +rb_cloexec_pipe(int descriptors[2]) { - int ret; - -#if defined(HAVE_PIPE2) - static int try_pipe2 = 1; - if (try_pipe2) { - ret = pipe2(fildes, O_CLOEXEC | RUBY_PIPE_NONBLOCK_DEFAULT); - if (ret != -1) - return ret; - /* pipe2 is available since Linux 2.6.27, glibc 2.9. */ - if (errno == ENOSYS) { - try_pipe2 = 0; - ret = pipe(fildes); - } - } - else { - ret = pipe(fildes); - } +#ifdef HAVE_PIPE2 + int result = pipe2(descriptors, O_CLOEXEC | O_NONBLOCK); #else - ret = pipe(fildes); + int result = pipe(descriptors); #endif - if (ret < 0) return ret; + + if (result < 0) + return result; + #ifdef __CYGWIN__ - if (ret == 0 && fildes[1] == -1) { - close(fildes[0]); - fildes[0] = -1; - errno = ENFILE; - return -1; + if (ret == 0 && descriptors[1] == -1) { + close(descriptors[0]); + descriptors[0] = -1; + errno = ENFILE; + return -1; } #endif - rb_maygvl_fd_fix_cloexec(fildes[0]); - rb_maygvl_fd_fix_cloexec(fildes[1]); - if (RUBY_PIPE_NONBLOCK_DEFAULT) { - rb_fd_set_nonblock(fildes[0]); - rb_fd_set_nonblock(fildes[1]); - } - return ret; + +#ifndef HAVE_PIPE2 + rb_maygvl_fd_fix_cloexec(descriptors[0]); + rb_maygvl_fd_fix_cloexec(descriptors[1]); + +#ifndef _WIN32 + rb_fd_set_nonblock(descriptors[0]); + rb_fd_set_nonblock(descriptors[1]); +#endif +#endif + + return result; } int @@ -1270,6 +1254,12 @@ io_fflush(rb_io_t *fptr) int rb_io_wait_readable(int f) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + VALUE result = rb_funcall(scheduler, rb_intern("wait_readable_fd"), 1, INT2NUM(f)); + return RTEST(result); + } + io_fd_check_closed(f); switch (errno) { case EINTR: @@ -1294,6 +1284,12 @@ rb_io_wait_readable(int f) int rb_io_wait_writable(int f) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + VALUE result = rb_funcall(scheduler, rb_intern("wait_writable_fd"), 1, INT2NUM(f)); + return RTEST(result); + } + io_fd_check_closed(f); switch (errno) { case EINTR: @@ -10897,6 +10893,23 @@ maygvl_copy_stream_continue_p(int has_gvl, struct copy_stream_struct *stp) return FALSE; } +struct wait_for_single_fd { + VALUE scheduler; + + int fd; + short events; + + VALUE result; +}; + +void * rb_thread_scheduler_wait_for_single_fd(void * _args) { + struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args; + + args->result = rb_funcall(args->scheduler, rb_intern("wait_for_single_fd"), 3, INT2NUM(args->fd), INT2NUM(args->events), Qnil); + + return NULL; +} + #if USE_POLL # define IOWAIT_SYSCALL "poll" STATIC_ASSERT(pollin_expected, POLLIN == RB_WAITFD_IN); @@ -10904,6 +10917,13 @@ STATIC_ASSERT(pollout_expected, POLLOUT == RB_WAITFD_OUT); static int nogvl_wait_for_single_fd(int fd, short events) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events}; + rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args); + return RTEST(args.result); + } + struct pollfd fds; fds.fd = fd; @@ -10916,6 +10936,13 @@ nogvl_wait_for_single_fd(int fd, short events) static int nogvl_wait_for_single_fd(int fd, short events) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + struct wait_for_single_fd args = {.scheduler = scheduler, .fd = fd, .events = events}; + rb_thread_call_with_gvl(rb_thread_scheduler_wait_for_single_fd, &args); + return RTEST(args.result); + } + rb_fdset_t fds; int ret; @@ -13283,6 +13310,10 @@ Init_IO(void) rb_cIO = rb_define_class("IO", rb_cObject); rb_include_module(rb_cIO, rb_mEnumerable); + rb_define_const(rb_cIO, "WAIT_READABLE", INT2NUM(RB_WAITFD_IN)); + rb_define_const(rb_cIO, "WAIT_PRIORITY", INT2NUM(RB_WAITFD_PRI)); + rb_define_const(rb_cIO, "WAIT_WRITABLE", INT2NUM(RB_WAITFD_OUT)); + /* exception to wait for reading. see IO.select. */ rb_mWaitReadable = rb_define_module_under(rb_cIO, "WaitReadable"); /* exception to wait for writing. see IO.select. */ diff --git a/process.c b/process.c index 74301bba91..ee5b164dd7 100644 --- a/process.c +++ b/process.c @@ -4892,9 +4892,14 @@ rb_f_spawn(int argc, VALUE *argv, VALUE _) static VALUE rb_f_sleep(int argc, VALUE *argv, VALUE _) { - time_t beg, end; + VALUE scheduler = rb_current_thread_scheduler(); - beg = time(0); + if (scheduler != Qnil) { + VALUE result = rb_funcallv(scheduler, rb_intern("wait_sleep"), argc, argv); + return RTEST(result); + } + + time_t beg = time(0); if (argc == 0) { rb_thread_sleep_forever(); } @@ -4903,7 +4908,7 @@ rb_f_sleep(int argc, VALUE *argv, VALUE _) rb_thread_wait_for(rb_time_interval(argv[0])); } - end = time(0) - beg; + time_t end = time(0) - beg; return INT2FIX(end); } diff --git a/spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb b/spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb index b8def7e930..df44a50afa 100644 --- a/spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb +++ b/spec/ruby/library/socket/basicsocket/read_nonblock_spec.rb @@ -24,7 +24,7 @@ describe "BasicSocket#read_nonblock" do platform_is :linux do it 'does not set the IO in nonblock mode' do require 'io/nonblock' - @r.should_not.nonblock? + @r.nonblock = false IO.select([@r], nil, nil, 2) @r.read_nonblock(3).should == "aaa" @r.should_not.nonblock? @@ -34,7 +34,7 @@ describe "BasicSocket#read_nonblock" do platform_is_not :linux, :windows do it 'sets the IO in nonblock mode' do require 'io/nonblock' - @r.should_not.nonblock? + @r.nonblock = false IO.select([@r], nil, nil, 2) @r.read_nonblock(3).should == "aaa" @r.should.nonblock? diff --git a/spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb b/spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb index c385beab24..523e732959 100644 --- a/spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb +++ b/spec/ruby/library/socket/basicsocket/write_nonblock_spec.rb @@ -25,7 +25,7 @@ describe "BasicSocket#write_nonblock" do platform_is :linux do it 'does not set the IO in nonblock mode' do require 'io/nonblock' - @w.should_not.nonblock? + @w.nonblock = false @w.write_nonblock("aaa").should == 3 @w.should_not.nonblock? end @@ -34,7 +34,7 @@ describe "BasicSocket#write_nonblock" do platform_is_not :linux, :windows do it 'sets the IO in nonblock mode' do require 'io/nonblock' - @w.should_not.nonblock? + @w.nonblock = false @w.write_nonblock("aaa").should == 3 @w.should.nonblock? end diff --git a/test/ruby/test_fiber.rb b/test/ruby/test_fiber.rb index 7070fdf03c..4d103a7f76 100644 --- a/test/ruby/test_fiber.rb +++ b/test/ruby/test_fiber.rb @@ -347,51 +347,6 @@ class TestFiber < Test::Unit::TestCase EOS end - def invoke_rec script, vm_stack_size, machine_stack_size, use_length = true - env = {} - env['RUBY_FIBER_VM_STACK_SIZE'] = vm_stack_size.to_s if vm_stack_size - env['RUBY_FIBER_MACHINE_STACK_SIZE'] = machine_stack_size.to_s if machine_stack_size - out = Dir.mktmpdir("test_fiber") {|tmpdir| - out, err, status = EnvUtil.invoke_ruby([env, '-e', script], '', true, true, chdir: tmpdir, timeout: 30) - assert(!status.signaled?, FailDesc[status, nil, err]) - out - } - use_length ? out.length : out - end - - def test_stack_size - skip 'too unstable on riscv' if RUBY_PLATFORM =~ /riscv/ - h_default = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', nil, nil, false)) - h_0 = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', 0, 0, false)) - h_large = eval(invoke_rec('p RubyVM::DEFAULT_PARAMS', 1024 * 1024 * 5, 1024 * 1024 * 10, false)) - - assert_operator(h_default[:fiber_vm_stack_size], :>, h_0[:fiber_vm_stack_size]) - assert_operator(h_default[:fiber_vm_stack_size], :<, h_large[:fiber_vm_stack_size]) - assert_operator(h_default[:fiber_machine_stack_size], :>=, h_0[:fiber_machine_stack_size]) - assert_operator(h_default[:fiber_machine_stack_size], :<=, h_large[:fiber_machine_stack_size]) - - # check VM machine stack size - script = '$stdout.sync=true; def rec; print "."; rec; end; Fiber.new{rec}.resume' - size_default = invoke_rec script, nil, nil - assert_operator(size_default, :>, 0) - size_0 = invoke_rec script, 0, nil - assert_operator(size_default, :>, size_0) - size_large = invoke_rec script, 1024 * 1024 * 5, nil - assert_operator(size_default, :<, size_large) - - return if /mswin|mingw/ =~ RUBY_PLATFORM - - # check machine stack size - # Note that machine stack size may not change size (depend on OSs) - script = '$stdout.sync=true; def rec; print "."; 1.times{1.times{1.times{rec}}}; end; Fiber.new{rec}.resume' - vm_stack_size = 1024 * 1024 - size_default = invoke_rec script, vm_stack_size, nil - size_0 = invoke_rec script, vm_stack_size, 0 - assert_operator(size_default, :>=, size_0) - size_large = invoke_rec script, vm_stack_size, 1024 * 1024 * 10 - assert_operator(size_default, :<=, size_large) - end - def test_separate_lastmatch bug7678 = '[ruby-core:51331]' /a/ =~ "a" diff --git a/test/ruby/test_stack.rb b/test/ruby/test_stack.rb new file mode 100644 index 0000000000..6657b9e83c --- /dev/null +++ b/test/ruby/test_stack.rb @@ -0,0 +1,81 @@ +# frozen_string_literal: false +require 'test/unit' +require 'tmpdir' + +class TestStack < Test::Unit::TestCase + LARGE_VM_STACK_SIZE = 1024*1024*5 + LARGE_MACHINE_STACK_SIZE = 1024*1024*10 + + def initialize(*) + super + + @h_default = nil + @h_0 = nil + @h_large = nil + end + + def invoke_ruby script, vm_stack_size: nil, machine_stack_size: nil + env = {} + env['RUBY_FIBER_VM_STACK_SIZE'] = vm_stack_size.to_s if vm_stack_size + env['RUBY_FIBER_MACHINE_STACK_SIZE'] = machine_stack_size.to_s if machine_stack_size + + stdout, stderr, status = EnvUtil.invoke_ruby([env, '-e', script], '', true, true, timeout: 30) + assert(!status.signaled?, FailDesc[status, nil, stderr]) + + return stdout + end + + def h_default + @h_default ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS')) + end + + def h_0 + @h_0 ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS', + vm_stack_size: 0, + machine_stack_size: 0 + )) + end + + def h_large + @h_large ||= eval(invoke_ruby('p RubyVM::DEFAULT_PARAMS', + vm_stack_size: LARGE_VM_STACK_SIZE, + machine_stack_size: LARGE_MACHINE_STACK_SIZE + )) + end + + def test_relative_stack_sizes + assert_operator(h_default[:fiber_vm_stack_size], :>, h_0[:fiber_vm_stack_size]) + assert_operator(h_default[:fiber_vm_stack_size], :<, h_large[:fiber_vm_stack_size]) + assert_operator(h_default[:fiber_machine_stack_size], :>=, h_0[:fiber_machine_stack_size]) + assert_operator(h_default[:fiber_machine_stack_size], :<=, h_large[:fiber_machine_stack_size]) + end + + def test_vm_stack_size + script = '$stdout.sync=true; def rec; print "."; rec; end; Fiber.new{rec}.resume' + + size_default = invoke_ruby(script).bytesize + assert_operator(size_default, :>, 0) + + size_0 = invoke_ruby(script, vm_stack_size: 0).bytesize + assert_operator(size_default, :>, size_0) + + size_large = invoke_ruby(script, vm_stack_size: LARGE_VM_STACK_SIZE).bytesize + assert_operator(size_default, :<, size_large) + end + + # Depending on OS, machine stack size may not change size. + def test_machine_stack_size + return if /mswin|mingw/ =~ RUBY_PLATFORM + + script = '$stdout.sync=true; def rec; print "."; 1.times{1.times{1.times{rec}}}; end; Fiber.new{rec}.resume' + + vm_stack_size = 1024 * 1024 + size_default = invoke_ruby(script, vm_stack_size: vm_stack_size).bytesize + + size_0 = invoke_ruby(script, vm_stack_size: vm_stack_size, machine_stack_size: 0).bytesize + assert_operator(size_default, :>=, size_0) + + size_large = invoke_ruby(script, vm_stack_size: vm_stack_size, machine_stack_size: LARGE_MACHINE_STACK_SIZE).bytesize + assert_operator(size_default, :<=, size_large) + end +end diff --git a/test/scheduler/http.rb b/test/scheduler/http.rb new file mode 100755 index 0000000000..e2a007bc84 --- /dev/null +++ b/test/scheduler/http.rb @@ -0,0 +1,53 @@ + +require 'benchmark' + +TOPICS = ["cats", "dogs", "pigs", "skeletons", "zombies", "ocelots", "villagers", "pillagers"] + +require 'net/http' +require 'uri' +require 'json' + +require_relative 'scheduler' + +def fetch_topics(topics) + responses = {} + + topics.each do |topic| + Fiber.new(blocking: Fiber.current.blocking?) do + uri = URI("https://www.google.com/search?q=#{topic}") + responses[topic] = Net::HTTP.get(uri).scan(topic).size + end.resume + end + + Thread.scheduler&.run + + return responses +end + +def sweep(repeats: 3, **options) + times = (1..8).map do |i| + $stderr.puts "Measuring #{i} topic(s)..." + topics = TOPICS[0...i] + + Thread.new do + Benchmark.realtime do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + repeats.times do + Fiber.new(**options) do + pp fetch_topics(topics) + end.resume + + scheduler.run + end + end + end.value / repeats + end + + puts options.inspect + puts JSON.dump(times.map{|value| value.round(3)}) +end + +sweep(blocking: true) +sweep(blocking: false) diff --git a/test/scheduler/scheduler.rb b/test/scheduler/scheduler.rb new file mode 100644 index 0000000000..b2d36cc728 --- /dev/null +++ b/test/scheduler/scheduler.rb @@ -0,0 +1,163 @@ +# frozen_string_literal: true + +require 'fiber' + +begin + require 'io/nonblock' +rescue LoadError + # Ignore. +end + +class Scheduler + def initialize + @readable = {} + @writable = {} + @waiting = {} + @blocking = [] + + @ios = ObjectSpace::WeakMap.new + end + + attr :fiber + + attr :readable + attr :writable + attr :waiting + attr :blocking + + def next_timeout + fiber, timeout = @waiting.min_by{|key, value| value} + + if timeout + offset = timeout - current_time + + if offset < 0 + return 0 + else + return offset + end + end + end + + def run + while @readable.any? or @writable.any? or @waiting.any? + # Can only handle file descriptors up to 1024... + readable, writable = IO.select(@readable.keys, @writable.keys, [], next_timeout) + + # puts "readable: #{readable}" if readable&.any? + # puts "writable: #{writable}" if writable&.any? + + readable&.each do |io| + @readable[io]&.resume + end + + writable&.each do |io| + @writable[io]&.resume + end + + if @waiting.any? + time = current_time + waiting = @waiting + @waiting = {} + + waiting.each do |fiber, timeout| + if timeout <= time + fiber.resume + else + @waiting[fiber] = timeout + end + end + end + end + end + + def for_fd(fd) + @ios[fd] ||= ::IO.for_fd(fd, autoclose: false) + end + + def wait_readable(io) + @readable[io] = Fiber.current + + Fiber.yield + + @readable.delete(io) + + return true + end + + def wait_readable_fd(fd) + wait_readable( + for_fd(fd) + ) + end + + def wait_writable(io) + @writable[io] = Fiber.current + + Fiber.yield + + @writable.delete(io) + + return true + end + + def wait_writable_fd(fd) + wait_writable( + for_fd(fd) + ) + end + + def current_time + Process.clock_gettime(Process::CLOCK_MONOTONIC) + end + + def wait_sleep(duration = nil) + @waiting[Fiber.current] = current_time + duration + + Fiber.yield + + return true + end + + def wait_any(io, events, duration) + unless (events & IO::WAIT_READABLE).zero? + @readable[io] = Fiber.current + end + + unless (events & IO::WAIT_WRITABLE).zero? + @writable[io] = Fiber.current + end + + Fiber.yield + + @readable.delete(io) + @writable.delete(io) + + return true + end + + def wait_for_single_fd(fd, events, duration) + wait_any( + for_fd(fd), + events, + duration + ) + end + + def enter_blocking_region + # puts "Enter blocking region: #{caller.first}" + end + + def exit_blocking_region + # puts "Exit blocking region: #{caller.first}" + @blocking << caller.first + end + + def fiber(&block) + fiber = Fiber.new(blocking: false, &block) + + fiber.resume + + return fiber + end +end diff --git a/test/scheduler/test_enumerator.rb b/test/scheduler/test_enumerator.rb new file mode 100644 index 0000000000..7c97382c52 --- /dev/null +++ b/test/scheduler/test_enumerator.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true +require 'test/unit' +require 'socket' +require_relative 'scheduler' + +class TestSchedulerEnumerator < Test::Unit::TestCase + MESSAGE = "Hello World" + + def test_read_characters + skip unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + skip unless i.nonblock? && o.nonblock? + + message = String.new + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + e = i.to_enum(:each_char) + + Fiber do + o.write("Hello World") + o.close + end + + Fiber do + begin + while c = e.next + message << c + end + rescue StopIteration + # Ignore. + end + + i.close + end + end + + thread.join + + assert_equal(MESSAGE, message) + end +end diff --git a/test/scheduler/test_fiber.rb b/test/scheduler/test_fiber.rb new file mode 100644 index 0000000000..3452591cd9 --- /dev/null +++ b/test/scheduler/test_fiber.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestSchedulerFiber < Test::Unit::TestCase + def test_fiber_without_scheduler + # Cannot create fiber without scheduler. + assert_raise RuntimeError do + Fiber do + end + end + end + + def test_fiber_blocking + scheduler = Scheduler.new + + thread = Thread.new do + Thread.current.scheduler = scheduler + + # Close is always a blocking operation. + IO.pipe.each(&:close) + end + + thread.join + + assert_not_empty scheduler.blocking + assert_match /test_fiber.rb:\d+:in `close'/, scheduler.blocking.last + end +end diff --git a/test/scheduler/test_http.rb b/test/scheduler/test_http.rb new file mode 100644 index 0000000000..82aa73ca35 --- /dev/null +++ b/test/scheduler/test_http.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +require 'net/http' +require 'uri' +require 'openssl' + +require 'test/unit' +require_relative 'scheduler' + +class TestSchedulerHTTP < Test::Unit::TestCase + def test_get + Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + Fiber do + uri = URI("https://www.ruby-lang.org/en/") + + http = Net::HTTP.new uri.host, uri.port + http.use_ssl = true + http.verify_mode = OpenSSL::SSL::VERIFY_NONE + body = http.get(uri.path).body + + assert !body.empty? + end + end.join + end +end diff --git a/test/scheduler/test_io.rb b/test/scheduler/test_io.rb new file mode 100644 index 0000000000..ef46d1ac2c --- /dev/null +++ b/test/scheduler/test_io.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestSchedulerIO < Test::Unit::TestCase + MESSAGE = "Hello World" + + def test_read + skip unless defined?(UNIXSocket) + + i, o = UNIXSocket.pair + skip unless i.nonblock? && o.nonblock? + + message = nil + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + Fiber do + message = i.read(20) + i.close + end + + Fiber do + o.write("Hello World") + o.close + end + end + + thread.join + + assert_equal MESSAGE, message + end +end diff --git a/test/scheduler/test_mutex.rb b/test/scheduler/test_mutex.rb new file mode 100644 index 0000000000..8395e5522f --- /dev/null +++ b/test/scheduler/test_mutex.rb @@ -0,0 +1,47 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestSchedulerMutex < Test::Unit::TestCase + def test_mutex_synchronize + mutex = Mutex.new + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + Fiber do + assert_equal Thread.scheduler, scheduler + + mutex.synchronize do + assert_nil Thread.scheduler + end + end + end + + thread.join + end + + def test_mutex_deadlock + mutex = Mutex.new + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + Fiber do + assert_equal Thread.scheduler, scheduler + + mutex.synchronize do + Fiber.yield + end + end + + assert_raise ThreadError do + mutex.lock + end + end + + thread.join + end +end diff --git a/test/scheduler/test_sleep.rb b/test/scheduler/test_sleep.rb new file mode 100644 index 0000000000..0be760341e --- /dev/null +++ b/test/scheduler/test_sleep.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true +require 'test/unit' +require_relative 'scheduler' + +class TestSchedulerSleep < Test::Unit::TestCase + ITEMS = [0, 1, 2, 3, 4] + + def test_sleep + items = [] + + thread = Thread.new do + scheduler = Scheduler.new + Thread.current.scheduler = scheduler + + 5.times do |i| + Fiber do + sleep(i/100.0) + items << i + end + end + + # Should be 5 fibers waiting: + assert_equal scheduler.waiting.size, 5 + end + + thread.join + + assert_equal ITEMS, items + end +end diff --git a/test/socket/test_basicsocket.rb b/test/socket/test_basicsocket.rb index c8e9b23f83..7b1c9b4a06 100644 --- a/test/socket/test_basicsocket.rb +++ b/test/socket/test_basicsocket.rb @@ -159,8 +159,6 @@ class TestSocket_BasicSocket < Test::Unit::TestCase set_nb = true buf = String.new if ssock.respond_to?(:nonblock?) - assert_not_predicate(ssock, :nonblock?) - assert_not_predicate(csock, :nonblock?) csock.nonblock = ssock.nonblock = false # Linux may use MSG_DONTWAIT to avoid setting O_NONBLOCK diff --git a/thread.c b/thread.c index 7ecc535b88..13fef6be9a 100644 --- a/thread.c +++ b/thread.c @@ -109,6 +109,8 @@ static VALUE sym_immediate; static VALUE sym_on_blocking; static VALUE sym_never; +static ID id_wait_for_single_fd; + enum SLEEP_FLAGS { SLEEP_DEADLOCKABLE = 0x1, SLEEP_SPURIOUS_CHECK = 0x2 @@ -708,6 +710,11 @@ thread_do_start(rb_thread_t *th) else { th->value = (*th->invoke_arg.func.func)(th->invoke_arg.func.arg); } + + VALUE scheduler = th->scheduler; + if (scheduler != Qnil) { + rb_funcall(scheduler, rb_intern("run"), 0); + } } void rb_ec_clear_current_thread_trace_func(const rb_execution_context_t *ec); @@ -1471,6 +1478,7 @@ rb_nogvl(void *(*func)(void *), void *data1, rb_thread_t *th = rb_ec_thread_ptr(ec); int saved_errno = 0; VALUE ubf_th = Qfalse; + VALUE scheduler = th->scheduler; if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) { ubf = ubf_select; @@ -1485,6 +1493,10 @@ rb_nogvl(void *(*func)(void *), void *data1, } } + if (scheduler != Qnil) { + rb_funcall(scheduler, rb_intern("enter_blocking_region"), 0); + } + BLOCKING_REGION(th, { val = func(data1); saved_errno = errno; @@ -1500,6 +1512,10 @@ rb_nogvl(void *(*func)(void *), void *data1, thread_value(rb_thread_kill(ubf_th)); } + if (scheduler != Qnil) { + rb_funcall(scheduler, rb_intern("exit_blocking_region"), 0); + } + errno = saved_errno; return val; @@ -3574,6 +3590,63 @@ rb_thread_variables(VALUE thread) return ary; } +VALUE rb_thread_scheduler_get(VALUE thread) +{ + rb_thread_t * th = rb_thread_ptr(thread); + + VM_ASSERT(th); + + return th->scheduler; +} + +VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler) +{ + rb_thread_t * th = rb_thread_ptr(thread); + + VM_ASSERT(th); + + th->scheduler = scheduler; + + return th->scheduler; +} + +/* + * call-seq: + * Thread.scheduler -> scheduler or nil + * + * Returns the current scheduler if scheduling operations are permitted. + * + */ + +static VALUE +rb_thread_scheduler(VALUE klass) +{ + return rb_current_thread_scheduler(); +} + +VALUE rb_current_thread_scheduler(void) +{ + rb_thread_t * th = GET_THREAD(); + + VM_ASSERT(th); + + if (th->blocking == 0) + return th->scheduler; + else + return Qnil; +} + +static VALUE +rb_thread_blocking_p(VALUE thread) +{ + unsigned blocking = rb_thread_ptr(thread)->blocking; + + if (blocking == 0) + return Qfalse; + + return INT2NUM(blocking); +} + /* * call-seq: * thr.thread_variable?(key) -> true or false @@ -4129,6 +4202,15 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t * return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set); } +static VALUE +rb_thread_timeout(struct timeval *timeout) { + if (timeout) { + return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec)); + } + + return Qnil; +} + #ifdef USE_POLL /* The same with linux kernel. TODO: make platform independent definition. */ @@ -4155,6 +4237,14 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) struct waiting_fd wfd; int state; + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events), + rb_thread_timeout(timeout) + ); + return RTEST(result); + } + wfd.th = GET_THREAD(); wfd.fd = fd; list_add(&wfd.th->vm->waiting_fds, &wfd.wfd_node); @@ -4287,8 +4377,16 @@ select_single_cleanup(VALUE ptr) } int -rb_wait_for_single_fd(int fd, int events, struct timeval *tv) +rb_wait_for_single_fd(int fd, int events, struct timeval *timeout) { + VALUE scheduler = rb_current_thread_scheduler(); + if (scheduler != Qnil) { + VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events), + rb_thread_timeout(timeout) + ); + return RTEST(result); + } + rb_fdset_t rfds, wfds, efds; struct select_args args; int r; @@ -4298,7 +4396,7 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *tv) args.read = (events & RB_WAITFD_IN) ? init_set_fd(fd, &rfds) : NULL; args.write = (events & RB_WAITFD_OUT) ? init_set_fd(fd, &wfds) : NULL; args.except = (events & RB_WAITFD_PRI) ? init_set_fd(fd, &efds) : NULL; - args.tv = tv; + args.tv = timeout; args.wfd.fd = fd; args.wfd.th = GET_THREAD(); @@ -5185,6 +5283,8 @@ Init_Thread(void) sym_immediate = ID2SYM(rb_intern("immediate")); sym_on_blocking = ID2SYM(rb_intern("on_blocking")); + id_wait_for_single_fd = rb_intern("wait_for_single_fd"); + rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1); rb_define_singleton_method(rb_cThread, "start", thread_start, -2); rb_define_singleton_method(rb_cThread, "fork", thread_start, -2); @@ -5223,6 +5323,7 @@ Init_Thread(void) rb_define_method(rb_cThread, "keys", rb_thread_keys, 0); rb_define_method(rb_cThread, "priority", rb_thread_priority, 0); rb_define_method(rb_cThread, "priority=", rb_thread_priority_set, 1); + rb_define_method(rb_cThread, "blocking?", rb_thread_blocking_p, 0); rb_define_method(rb_cThread, "status", rb_thread_status, 0); rb_define_method(rb_cThread, "thread_variable_get", rb_thread_variable_get, 1); rb_define_method(rb_cThread, "thread_variable_set", rb_thread_variable_set, 2); @@ -5239,6 +5340,10 @@ Init_Thread(void) rb_define_method(rb_cThread, "backtrace", rb_thread_backtrace_m, -1); rb_define_method(rb_cThread, "backtrace_locations", rb_thread_backtrace_locations_m, -1); + rb_define_singleton_method(rb_cThread, "scheduler", rb_thread_scheduler, 0); + rb_define_method(rb_cThread, "scheduler", rb_thread_scheduler_get, 0); + rb_define_method(rb_cThread, "scheduler=", rb_thread_scheduler_set, 1); + rb_define_method(rb_cThread, "name", rb_thread_getname, 0); rb_define_method(rb_cThread, "name=", rb_thread_setname, 1); rb_define_method(rb_cThread, "to_s", rb_thread_to_s, 0); diff --git a/thread_sync.c b/thread_sync.c index 2e20812f4d..3689dee789 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -190,6 +190,8 @@ mutex_locked(rb_thread_t *th, VALUE self) mutex->next_mutex = th->keeping_mutexes; } th->keeping_mutexes = mutex; + + th->blocking += 1; } /* @@ -365,6 +367,8 @@ rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t *th) struct sync_waiter *cur = 0, *next; rb_mutex_t **th_mutex = &th->keeping_mutexes; + th->blocking -= 1; + mutex->th = 0; list_for_each_safe(&mutex->waitq, cur, next, node) { list_del_init(&cur->node); @@ -404,8 +408,9 @@ rb_mutex_unlock(VALUE self) { const char *err; rb_mutex_t *mutex = mutex_ptr(self); + rb_thread_t *th = GET_THREAD(); - err = rb_mutex_unlock_th(mutex, GET_THREAD()); + err = rb_mutex_unlock_th(mutex, th); if (err) rb_raise(rb_eThreadError, "%s", err); return self; diff --git a/vm.c b/vm.c index 6f9c999adb..d286bc7210 100644 --- a/vm.c +++ b/vm.c @@ -2620,6 +2620,8 @@ thread_mark(void *ptr) RUBY_MARK_UNLESS_NULL(th->locking_mutex); RUBY_MARK_UNLESS_NULL(th->name); + RUBY_MARK_UNLESS_NULL(th->scheduler); + RUBY_MARK_LEAVE("thread"); } @@ -2734,6 +2736,10 @@ th_init(rb_thread_t *th, VALUE self) th->self = self; rb_threadptr_root_fiber_setup(th); + /* All threads are blocking until a non-blocking fiber is scheduled */ + th->blocking = 1; + th->scheduler = Qnil; + if (self == 0) { size_t size = th->vm->default_params.thread_vm_stack_size / sizeof(VALUE); rb_ec_initialize_vm_stack(th->ec, ALLOC_N(VALUE, size), size); @@ -3294,12 +3300,14 @@ Init_VM(void) vm->self = TypedData_Wrap_Struct(rb_cRubyVM, &vm_data_type, vm); /* create main thread */ - th->self = TypedData_Wrap_Struct(rb_cThread, &thread_data_type, th); + th->self = TypedData_Wrap_Struct(rb_cThread, &thread_data_type, th); + vm->main_thread = th; vm->running_thread = th; th->vm = vm; th->top_wrapper = 0; th->top_self = rb_vm_top_self(); + rb_thread_set_current(th); rb_vm_living_threads_insert(vm, th); diff --git a/vm_core.h b/vm_core.h index e725e4d8d0..4f122cacb8 100644 --- a/vm_core.h +++ b/vm_core.h @@ -969,13 +969,15 @@ typedef struct rb_thread_struct { rb_fiber_t *root_fiber; rb_jmpbuf_t root_jmpbuf; + VALUE scheduler; + unsigned blocking; + /* misc */ VALUE name; #ifdef USE_SIGALTSTACK void *altstack; #endif - } rb_thread_t; typedef enum {