mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
Standardised scheduler interface.
This commit is contained in:
parent
905e9c8093
commit
d387029f39
Notes:
git
2020-09-14 13:44:39 +09:00
13 changed files with 313 additions and 216 deletions
|
@ -128,6 +128,7 @@ COMMONOBJS = array.$(OBJEXT) \
|
|||
regparse.$(OBJEXT) \
|
||||
regsyntax.$(OBJEXT) \
|
||||
ruby.$(OBJEXT) \
|
||||
scheduler.$(OBJEXT) \
|
||||
signal.$(OBJEXT) \
|
||||
sprintf.$(OBJEXT) \
|
||||
st.$(OBJEXT) \
|
||||
|
@ -12123,6 +12124,8 @@ ruby.$(OBJEXT): {$(VPATH)}thread_native.h
|
|||
ruby.$(OBJEXT): {$(VPATH)}util.h
|
||||
ruby.$(OBJEXT): {$(VPATH)}vm_core.h
|
||||
ruby.$(OBJEXT): {$(VPATH)}vm_opts.h
|
||||
scheduler.$(OBJEXT): {$(VPATH)}scheduler.c
|
||||
scheduler.$(OBJEXT): {$(VPATH)}internal/scheduler.h
|
||||
setproctitle.$(OBJEXT): $(hdrdir)/ruby.h
|
||||
setproctitle.$(OBJEXT): $(hdrdir)/ruby/ruby.h
|
||||
setproctitle.$(OBJEXT): {$(VPATH)}assert.h
|
||||
|
|
|
@ -39,35 +39,6 @@
|
|||
#define FIONREAD_POSSIBLE_P(fd) ((void)(fd),Qtrue)
|
||||
#endif
|
||||
|
||||
static VALUE io_ready_p _((VALUE io));
|
||||
static VALUE io_wait_readable _((int argc, VALUE *argv, VALUE io));
|
||||
static VALUE io_wait_writable _((int argc, VALUE *argv, VALUE io));
|
||||
void Init_wait _((void));
|
||||
|
||||
static struct timeval *
|
||||
get_timeout(int argc, VALUE *argv, struct timeval *timerec)
|
||||
{
|
||||
VALUE timeout = Qnil;
|
||||
rb_check_arity(argc, 0, 1);
|
||||
if (!argc || NIL_P(timeout = argv[0])) {
|
||||
return NULL;
|
||||
}
|
||||
else {
|
||||
*timerec = rb_time_interval(timeout);
|
||||
return timerec;
|
||||
}
|
||||
}
|
||||
|
||||
static int
|
||||
wait_for_single_fd(rb_io_t *fptr, int events, struct timeval *tv)
|
||||
{
|
||||
int i = rb_wait_for_single_fd(fptr->fd, events, tv);
|
||||
if (i < 0)
|
||||
rb_sys_fail(0);
|
||||
rb_io_check_closed(fptr);
|
||||
return (i & events);
|
||||
}
|
||||
|
||||
/*
|
||||
* call-seq:
|
||||
* io.nread -> int
|
||||
|
@ -79,13 +50,12 @@ wait_for_single_fd(rb_io_t *fptr, int events, struct timeval *tv)
|
|||
static VALUE
|
||||
io_nread(VALUE io)
|
||||
{
|
||||
rb_io_t *fptr;
|
||||
int len;
|
||||
rb_io_t *fptr = NULL;
|
||||
ioctl_arg n;
|
||||
|
||||
GetOpenFile(io, fptr);
|
||||
rb_io_check_readable(fptr);
|
||||
len = rb_io_read_pending(fptr);
|
||||
int len = rb_io_read_pending(fptr);
|
||||
if (len > 0) return INT2FIX(len);
|
||||
if (!FIONREAD_POSSIBLE_P(fptr->fd)) return INT2FIX(0);
|
||||
if (ioctl(fptr->fd, FIONREAD, &n)) return INT2FIX(0);
|
||||
|
@ -93,76 +63,113 @@ io_nread(VALUE io)
|
|||
return INT2FIX(0);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
io_wait_event(VALUE io, int event, VALUE timeout)
|
||||
{
|
||||
VALUE result = rb_io_wait(io, RB_INT2NUM(event), timeout);
|
||||
|
||||
if (!RB_TEST(result)) {
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
int mask = RB_NUM2INT(result);
|
||||
|
||||
if (mask & event) {
|
||||
return io;
|
||||
} else {
|
||||
return Qfalse;
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
* call-seq:
|
||||
* io.ready? -> true or false
|
||||
*
|
||||
* Returns true if input available without blocking, or false.
|
||||
* Returns +true+ if input available without blocking, or +false+.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
io_ready_p(VALUE io)
|
||||
{
|
||||
rb_io_t *fptr;
|
||||
struct timeval tv = {0, 0};
|
||||
|
||||
GetOpenFile(io, fptr);
|
||||
rb_io_check_readable(fptr);
|
||||
if (rb_io_read_pending(fptr)) return Qtrue;
|
||||
if (wait_for_single_fd(fptr, RB_WAITFD_IN, &tv))
|
||||
return Qtrue;
|
||||
return Qfalse;
|
||||
|
||||
return io_wait_event(io, RUBY_IO_READABLE, RB_INT2NUM(0));
|
||||
}
|
||||
|
||||
/*
|
||||
* call-seq:
|
||||
* io.wait_readable -> IO, true or nil
|
||||
* io.wait_readable(timeout) -> IO, true or nil
|
||||
* io.wait_readable -> true or false
|
||||
* io.wait_readable(timeout) -> true or false
|
||||
*
|
||||
* Waits until IO is readable without blocking and returns +self+, or
|
||||
* +nil+ when times out.
|
||||
* Waits until IO is readable and returns +true+, or
|
||||
* +false+ when times out.
|
||||
* Returns +true+ immediately when buffered data is available.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
io_wait_readable(int argc, VALUE *argv, VALUE io)
|
||||
{
|
||||
rb_io_t *fptr;
|
||||
struct timeval timerec;
|
||||
struct timeval *tv;
|
||||
rb_io_t *fptr = NULL;
|
||||
|
||||
GetOpenFile(io, fptr);
|
||||
RB_IO_POINTER(io, fptr);
|
||||
rb_io_check_readable(fptr);
|
||||
tv = get_timeout(argc, argv, &timerec);
|
||||
|
||||
if (rb_io_read_pending(fptr)) return Qtrue;
|
||||
if (wait_for_single_fd(fptr, RB_WAITFD_IN, tv)) {
|
||||
return io;
|
||||
}
|
||||
return Qnil;
|
||||
|
||||
rb_check_arity(argc, 0, 1);
|
||||
VALUE timeout = (argc == 1 ? argv[0] : Qnil);
|
||||
|
||||
return io_wait_event(io, RUBY_IO_READABLE, timeout);
|
||||
}
|
||||
|
||||
/*
|
||||
* call-seq:
|
||||
* io.wait_writable -> IO
|
||||
* io.wait_writable(timeout) -> IO or nil
|
||||
* io.wait_writable -> true or false
|
||||
* io.wait_writable(timeout) -> true or false
|
||||
*
|
||||
* Waits until IO is writable without blocking and returns +self+ or
|
||||
* +nil+ when times out.
|
||||
* Waits until IO is writable and returns +true+ or
|
||||
* +false+ when times out.
|
||||
*/
|
||||
static VALUE
|
||||
io_wait_writable(int argc, VALUE *argv, VALUE io)
|
||||
{
|
||||
rb_io_t *fptr;
|
||||
struct timeval timerec;
|
||||
struct timeval *tv;
|
||||
rb_io_t *fptr = NULL;
|
||||
|
||||
GetOpenFile(io, fptr);
|
||||
RB_IO_POINTER(io, fptr);
|
||||
rb_io_check_writable(fptr);
|
||||
tv = get_timeout(argc, argv, &timerec);
|
||||
if (wait_for_single_fd(fptr, RB_WAITFD_OUT, tv)) {
|
||||
return io;
|
||||
}
|
||||
return Qnil;
|
||||
|
||||
rb_check_arity(argc, 0, 1);
|
||||
VALUE timeout = (argc == 1 ? argv[0] : Qnil);
|
||||
|
||||
return io_wait_event(io, RUBY_IO_WRITABLE, timeout);
|
||||
}
|
||||
|
||||
/*
|
||||
* call-seq:
|
||||
* io.wait_priority -> true or false
|
||||
* io.wait_priority(timeout) -> true or false
|
||||
*
|
||||
* Waits until IO is priority and returns +true+ or
|
||||
* +false+ when times out.
|
||||
*/
|
||||
static VALUE
|
||||
io_wait_priority(int argc, VALUE *argv, VALUE io)
|
||||
{
|
||||
rb_io_t *fptr = NULL;
|
||||
|
||||
RB_IO_POINTER(io, fptr);
|
||||
rb_io_check_readable(fptr);
|
||||
|
||||
if (rb_io_read_pending(fptr)) return Qtrue;
|
||||
|
||||
rb_check_arity(argc, 0, 1);
|
||||
VALUE timeout = argc == 1 ? argv[0] : Qnil;
|
||||
|
||||
return io_wait_event(io, RUBY_IO_PRIORITY, timeout);
|
||||
}
|
||||
|
||||
static int
|
||||
|
@ -201,41 +208,60 @@ wait_mode_sym(VALUE mode)
|
|||
|
||||
/*
|
||||
* call-seq:
|
||||
* io.wait(timeout = nil, mode = :read) -> IO, true or nil
|
||||
* io.wait(events, timeout) -> event mask or false.
|
||||
* io.wait(timeout = nil, mode = :read) -> event mask or false (deprecated)
|
||||
*
|
||||
* Waits until the IO becomes ready for the specified events and returns the
|
||||
* subset of events that become ready, or +false+ when times out.
|
||||
*
|
||||
* The events can be a bit mask of +IO::READABLE+, +IO::WRITABLE+ or
|
||||
* +IO::PRIORITY+.
|
||||
*
|
||||
* Waits until IO is readable or writable without blocking and returns
|
||||
* +self+, or +nil+ when times out.
|
||||
* Returns +true+ immediately when buffered data is available.
|
||||
*
|
||||
* Optional parameter +mode+ is one of +:read+, +:write+, or
|
||||
* +:read_write+.
|
||||
* +:read_write+ (deprecated).
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
io_wait_readwrite(int argc, VALUE *argv, VALUE io)
|
||||
io_wait(int argc, VALUE *argv, VALUE io)
|
||||
{
|
||||
rb_io_t *fptr;
|
||||
struct timeval timerec;
|
||||
struct timeval *tv = NULL;
|
||||
int event = 0;
|
||||
int i;
|
||||
VALUE timeout = Qnil;
|
||||
rb_io_event_t events = 0;
|
||||
|
||||
GetOpenFile(io, fptr);
|
||||
for (i = 0; i < argc; ++i) {
|
||||
if (SYMBOL_P(argv[i])) {
|
||||
event |= wait_mode_sym(argv[i]);
|
||||
}
|
||||
else {
|
||||
*(tv = &timerec) = rb_time_interval(argv[i]);
|
||||
}
|
||||
if (argc < 2 || (argc >= 2 && RB_SYMBOL_P(argv[1]))) {
|
||||
if (argc > 0) {
|
||||
timeout = argv[0];
|
||||
}
|
||||
|
||||
for (int i = 1; i < argc; i += 1) {
|
||||
events |= wait_mode_sym(argv[i]);
|
||||
}
|
||||
} else if (argc == 2) {
|
||||
events = RB_NUM2UINT(argv[0]);
|
||||
|
||||
if (argv[1] != Qnil) {
|
||||
timeout = argv[1];
|
||||
}
|
||||
} else {
|
||||
// TODO error
|
||||
return Qnil;
|
||||
}
|
||||
/* rb_time_interval() and might_mode() might convert the argument */
|
||||
rb_io_check_closed(fptr);
|
||||
if (!event) event = RB_WAITFD_IN;
|
||||
if ((event & RB_WAITFD_IN) && rb_io_read_pending(fptr))
|
||||
return Qtrue;
|
||||
if (wait_for_single_fd(fptr, event, tv))
|
||||
return io;
|
||||
return Qnil;
|
||||
|
||||
if (events == 0) {
|
||||
events = RUBY_IO_READABLE;
|
||||
}
|
||||
|
||||
if (events & RUBY_IO_READABLE) {
|
||||
rb_io_t *fptr = NULL;
|
||||
RB_IO_POINTER(io, fptr);
|
||||
|
||||
if (rb_io_read_pending(fptr)) {
|
||||
return Qtrue;
|
||||
}
|
||||
}
|
||||
|
||||
return io_wait_event(io, events, timeout);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -247,7 +273,10 @@ Init_wait(void)
|
|||
{
|
||||
rb_define_method(rb_cIO, "nread", io_nread, 0);
|
||||
rb_define_method(rb_cIO, "ready?", io_ready_p, 0);
|
||||
rb_define_method(rb_cIO, "wait", io_wait_readwrite, -1);
|
||||
|
||||
rb_define_method(rb_cIO, "wait", io_wait, -1);
|
||||
|
||||
rb_define_method(rb_cIO, "wait_readable", io_wait_readable, -1);
|
||||
rb_define_method(rb_cIO, "wait_writable", io_wait_writable, -1);
|
||||
rb_define_method(rb_cIO, "wait_priority", io_wait_priority, -1);
|
||||
}
|
||||
|
|
|
@ -147,14 +147,17 @@ VALUE rb_io_get_io(VALUE io);
|
|||
VALUE rb_io_check_io(VALUE io);
|
||||
VALUE rb_io_get_write_io(VALUE io);
|
||||
VALUE rb_io_set_write_io(VALUE io, VALUE w);
|
||||
int rb_io_wait_readable(int);
|
||||
int rb_io_wait_writable(int);
|
||||
int rb_wait_for_single_fd(int fd, int events, struct timeval *tv);
|
||||
void rb_io_set_nonblock(rb_io_t *fptr);
|
||||
int rb_io_extract_encoding_option(VALUE opt, rb_encoding **enc_p, rb_encoding **enc2_p, int *fmode_p);
|
||||
void rb_io_extract_modeenc(VALUE *vmode_p, VALUE *vperm_p, VALUE opthash, int *oflags_p, int *fmode_p, rb_io_enc_t *convconfig_p);
|
||||
ssize_t rb_io_bufwrite(VALUE io, const void *buf, size_t size);
|
||||
|
||||
int rb_io_wait_readable(int);
|
||||
int rb_io_wait_writable(int);
|
||||
int rb_wait_for_single_fd(int fd, int events, struct timeval *tv);
|
||||
|
||||
VALUE rb_io_wait(VALUE io, VALUE events, VALUE timeout);
|
||||
|
||||
/* compatibility for ruby 1.8 and older */
|
||||
#define rb_io_mode_flags(modestr) [<"rb_io_mode_flags() is obsolete; use rb_io_modestr_fmode()">]
|
||||
#define rb_io_modenum_flags(oflags) [<"rb_io_modenum_flags() is obsolete; use rb_io_oflags_fmode()">]
|
||||
|
|
1
inits.c
1
inits.c
|
@ -65,6 +65,7 @@ rb_call_inits(void)
|
|||
CALL(VM);
|
||||
CALL(ISeq);
|
||||
CALL(Thread);
|
||||
CALL(Scheduler);
|
||||
CALL(process);
|
||||
CALL(Cont);
|
||||
CALL(Rational);
|
||||
|
|
27
internal/scheduler.h
Normal file
27
internal/scheduler.h
Normal file
|
@ -0,0 +1,27 @@
|
|||
#ifndef RUBY_SCHEDULER_H /*-*-C-*-vi:se ft=c:*/
|
||||
#define RUBY_SCHEDULER_H
|
||||
/**
|
||||
* @file
|
||||
* @author Ruby developers <ruby-core@ruby-lang.org>
|
||||
* @copyright This file is a part of the programming language Ruby.
|
||||
* Permission is hereby granted, to either redistribute and/or
|
||||
* modify this file, provided that the conditions mentioned in the
|
||||
* file COPYING are met. Consult the file for details.
|
||||
* @brief Internal header for Scheduler.
|
||||
*/
|
||||
#include "ruby/ruby.h"
|
||||
#include "ruby/intern.h"
|
||||
|
||||
VALUE rb_scheduler_timeout(struct timeval *timeout);
|
||||
|
||||
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
|
||||
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
|
||||
|
||||
VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout);
|
||||
VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io);
|
||||
VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
|
||||
|
||||
VALUE rb_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, VALUE offset, VALUE length);
|
||||
VALUE rb_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, VALUE offset, VALUE length);
|
||||
|
||||
#endif /* RUBY_SCHEDULER_H */
|
|
@ -37,6 +37,8 @@ void rb_mutex_allow_trap(VALUE self, int val);
|
|||
VALUE rb_uninterruptible(VALUE (*b_proc)(VALUE), VALUE data);
|
||||
VALUE rb_mutex_owned_p(VALUE self);
|
||||
|
||||
int rb_thread_wait_for_single_fd(int fd, int events, struct timeval * timeout);
|
||||
|
||||
VALUE rb_thread_scheduler_get(VALUE thread);
|
||||
VALUE rb_thread_scheduler_set(VALUE thread, VALUE scheduler);
|
||||
|
||||
|
|
81
io.c
81
io.c
|
@ -13,6 +13,8 @@
|
|||
|
||||
#include "ruby/internal/config.h"
|
||||
|
||||
#include "internal/scheduler.h"
|
||||
|
||||
#ifdef _WIN32
|
||||
# include "ruby/ruby.h"
|
||||
# include "ruby/io.h"
|
||||
|
@ -213,6 +215,8 @@ static VALUE sym_DATA;
|
|||
static VALUE sym_HOLE;
|
||||
#endif
|
||||
|
||||
static VALUE rb_io_initialize(int argc, VALUE *argv, VALUE io);
|
||||
|
||||
struct argf {
|
||||
VALUE filename, current_file;
|
||||
long last_lineno; /* $. */
|
||||
|
@ -1256,13 +1260,65 @@ io_fflush(rb_io_t *fptr)
|
|||
return 0;
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_io_wait(VALUE io, VALUE events, VALUE timeout) {
|
||||
VALUE scheduler = rb_thread_current_scheduler();
|
||||
|
||||
if (scheduler != Qnil) {
|
||||
return rb_scheduler_io_wait(scheduler, io, events, timeout);
|
||||
}
|
||||
|
||||
rb_io_t * fptr = NULL;
|
||||
RB_IO_POINTER(io, fptr);
|
||||
|
||||
struct timeval tv_storage;
|
||||
struct timeval *tv = NULL;
|
||||
|
||||
if (timeout != Qnil) {
|
||||
tv_storage = rb_time_interval(timeout);
|
||||
tv = &tv_storage;
|
||||
}
|
||||
|
||||
int ready = rb_thread_wait_for_single_fd(fptr->fd, RB_NUM2INT(events), tv);
|
||||
|
||||
if (ready < 0) {
|
||||
rb_sys_fail(0);
|
||||
}
|
||||
|
||||
// Not sure if this is necessary:
|
||||
rb_io_check_closed(fptr);
|
||||
|
||||
if (ready > 0) {
|
||||
return RB_INT2NUM(ready);
|
||||
} else {
|
||||
return Qfalse;
|
||||
}
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_io_from_fd(int f)
|
||||
{
|
||||
VALUE io = rb_obj_alloc(rb_cIO);
|
||||
VALUE argv[] = {RB_INT2NUM(f)};
|
||||
|
||||
rb_io_initialize(1, argv, io);
|
||||
|
||||
rb_io_t *fptr;
|
||||
RB_IO_POINTER(io, fptr);
|
||||
|
||||
fptr->mode &= ~FMODE_PREP;
|
||||
|
||||
return io;
|
||||
}
|
||||
|
||||
int
|
||||
rb_io_wait_readable(int f)
|
||||
{
|
||||
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
|
||||
if (scheduler != Qnil) {
|
||||
VALUE result = rb_funcall(scheduler, rb_intern("wait_readable_fd"), 1, INT2NUM(f));
|
||||
return RTEST(result);
|
||||
return RTEST(
|
||||
rb_scheduler_io_wait_readable(scheduler, rb_io_from_fd(f))
|
||||
);
|
||||
}
|
||||
|
||||
io_fd_check_closed(f);
|
||||
|
@ -1291,8 +1347,9 @@ rb_io_wait_writable(int f)
|
|||
{
|
||||
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
|
||||
if (scheduler != Qnil) {
|
||||
VALUE result = rb_funcall(scheduler, rb_intern("wait_writable_fd"), 1, INT2NUM(f));
|
||||
return RTEST(result);
|
||||
return RTEST(
|
||||
rb_scheduler_io_wait_writable(scheduler, rb_io_from_fd(f))
|
||||
);
|
||||
}
|
||||
|
||||
io_fd_check_closed(f);
|
||||
|
@ -1325,6 +1382,20 @@ rb_io_wait_writable(int f)
|
|||
}
|
||||
}
|
||||
|
||||
int
|
||||
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
||||
{
|
||||
VALUE scheduler = rb_thread_current_scheduler();
|
||||
|
||||
if (scheduler != Qnil) {
|
||||
return RTEST(
|
||||
rb_scheduler_io_wait(scheduler, rb_io_from_fd(fd), RB_INT2NUM(events), rb_scheduler_timeout(timeout))
|
||||
);
|
||||
}
|
||||
|
||||
return rb_thread_wait_for_single_fd(fd, events, timeout);
|
||||
}
|
||||
|
||||
static void
|
||||
make_writeconv(rb_io_t *fptr)
|
||||
{
|
||||
|
@ -10975,7 +11046,7 @@ rb_thread_scheduler_wait_for_single_fd(void * _args)
|
|||
{
|
||||
struct wait_for_single_fd *args = (struct wait_for_single_fd *)_args;
|
||||
|
||||
args->result = rb_funcall(args->scheduler, rb_intern("wait_for_single_fd"), 3, INT2NUM(args->fd), INT2NUM(args->events), Qnil);
|
||||
args->result = rb_scheduler_io_wait(args->scheduler, rb_io_from_fd(args->fd), INT2NUM(args->events), Qnil);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -13,6 +13,8 @@
|
|||
|
||||
#include "ruby/internal/config.h"
|
||||
|
||||
#include "internal/scheduler.h"
|
||||
|
||||
#include <ctype.h>
|
||||
#include <errno.h>
|
||||
#include <signal.h>
|
||||
|
@ -4927,7 +4929,7 @@ rb_f_sleep(int argc, VALUE *argv, VALUE _)
|
|||
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
|
||||
|
||||
if (scheduler != Qnil) {
|
||||
rb_funcallv(scheduler, rb_intern("wait_sleep"), argc, argv);
|
||||
rb_scheduler_kernel_sleepv(scheduler, argc, argv);
|
||||
}
|
||||
else {
|
||||
if (argc == 0) {
|
||||
|
|
71
scheduler.c
Normal file
71
scheduler.c
Normal file
|
@ -0,0 +1,71 @@
|
|||
/**********************************************************************
|
||||
|
||||
scheduler.c
|
||||
|
||||
$Author$
|
||||
|
||||
Copyright (C) 2020 Samuel Grant Dawson Williams
|
||||
|
||||
**********************************************************************/
|
||||
|
||||
#include "internal/scheduler.h"
|
||||
#include "ruby/io.h"
|
||||
|
||||
static ID id_kernel_sleep;
|
||||
static ID id_io_read;
|
||||
static ID id_io_write;
|
||||
static ID id_io_wait;
|
||||
|
||||
void
|
||||
Init_Scheduler(void)
|
||||
{
|
||||
id_kernel_sleep = rb_intern_const("kernel_sleep");
|
||||
id_io_read = rb_intern_const("io_read");
|
||||
id_io_write = rb_intern_const("io_write");
|
||||
id_io_wait = rb_intern_const("io_wait");
|
||||
}
|
||||
|
||||
VALUE
|
||||
rb_scheduler_timeout(struct timeval *timeout) {
|
||||
if (timeout) {
|
||||
return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
|
||||
}
|
||||
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE timeout)
|
||||
{
|
||||
return rb_funcall(scheduler, id_kernel_sleep, 1, timeout);
|
||||
}
|
||||
|
||||
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
|
||||
{
|
||||
return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
|
||||
}
|
||||
|
||||
VALUE rb_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeout)
|
||||
{
|
||||
return rb_funcall(scheduler, id_io_wait, 3, io, events, timeout);
|
||||
}
|
||||
|
||||
VALUE rb_scheduler_io_wait_readable(VALUE scheduler, VALUE io)
|
||||
{
|
||||
return rb_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_READABLE), Qnil);
|
||||
}
|
||||
|
||||
VALUE rb_scheduler_io_wait_writable(VALUE scheduler, VALUE io)
|
||||
{
|
||||
return rb_scheduler_io_wait(scheduler, io, RB_UINT2NUM(RUBY_IO_WRITABLE), Qnil);
|
||||
}
|
||||
|
||||
VALUE rb_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, VALUE offset, VALUE length)
|
||||
{
|
||||
return rb_funcall(scheduler, id_io_read, 4, io, buffer, offset, length);
|
||||
}
|
||||
|
||||
VALUE rb_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, VALUE offset, VALUE length)
|
||||
{
|
||||
// We should ensure string has capacity to receive data, and then resize it afterwards.
|
||||
return rb_funcall(scheduler, id_io_write, 4, io, buffer, offset, length);
|
||||
}
|
|
@ -14,15 +14,11 @@ class Scheduler
|
|||
@readable = {}
|
||||
@writable = {}
|
||||
@waiting = {}
|
||||
@blocking = []
|
||||
|
||||
@ios = ObjectSpace::WeakMap.new
|
||||
end
|
||||
|
||||
attr :readable
|
||||
attr :writable
|
||||
attr :waiting
|
||||
attr :blocking
|
||||
|
||||
def next_timeout
|
||||
_fiber, timeout = @waiting.min_by{|key, value| value}
|
||||
|
@ -70,47 +66,11 @@ class Scheduler
|
|||
end
|
||||
end
|
||||
|
||||
def for_fd(fd)
|
||||
@ios[fd] ||= ::IO.for_fd(fd, autoclose: false)
|
||||
end
|
||||
|
||||
def wait_readable(io)
|
||||
@readable[io] = Fiber.current
|
||||
|
||||
Fiber.yield
|
||||
|
||||
@readable.delete(io)
|
||||
|
||||
return true
|
||||
end
|
||||
|
||||
def wait_readable_fd(fd)
|
||||
wait_readable(
|
||||
for_fd(fd)
|
||||
)
|
||||
end
|
||||
|
||||
def wait_writable(io)
|
||||
@writable[io] = Fiber.current
|
||||
|
||||
Fiber.yield
|
||||
|
||||
@writable.delete(io)
|
||||
|
||||
return true
|
||||
end
|
||||
|
||||
def wait_writable_fd(fd)
|
||||
wait_writable(
|
||||
for_fd(fd)
|
||||
)
|
||||
end
|
||||
|
||||
def current_time
|
||||
Process.clock_gettime(Process::CLOCK_MONOTONIC)
|
||||
end
|
||||
|
||||
def wait_sleep(duration = nil)
|
||||
def kernel_sleep(duration = nil)
|
||||
@waiting[Fiber.current] = current_time + duration
|
||||
|
||||
Fiber.yield
|
||||
|
@ -118,7 +78,7 @@ class Scheduler
|
|||
return true
|
||||
end
|
||||
|
||||
def wait_any(io, events, duration)
|
||||
def io_wait(io, events, duration)
|
||||
unless (events & IO::READABLE).zero?
|
||||
@readable[io] = Fiber.current
|
||||
end
|
||||
|
@ -135,23 +95,6 @@ class Scheduler
|
|||
return true
|
||||
end
|
||||
|
||||
def wait_for_single_fd(fd, events, duration)
|
||||
wait_any(
|
||||
for_fd(fd),
|
||||
events,
|
||||
duration
|
||||
)
|
||||
end
|
||||
|
||||
def enter_blocking_region
|
||||
# puts "Enter blocking region: #{caller.first}"
|
||||
end
|
||||
|
||||
def exit_blocking_region
|
||||
# puts "Exit blocking region: #{caller.first}"
|
||||
@blocking << caller.first
|
||||
end
|
||||
|
||||
def fiber(&block)
|
||||
fiber = Fiber.new(blocking: false, &block)
|
||||
|
||||
|
|
|
@ -10,20 +10,4 @@ class TestFiberScheduler < Test::Unit::TestCase
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
def test_fiber_blocking
|
||||
scheduler = Scheduler.new
|
||||
|
||||
thread = Thread.new do
|
||||
Thread.current.scheduler = scheduler
|
||||
|
||||
# Close is always a blocking operation.
|
||||
IO.pipe.each(&:close)
|
||||
end
|
||||
|
||||
thread.join
|
||||
|
||||
assert_not_empty scheduler.blocking
|
||||
assert_match(/test_scheduler\.rb:\d+:in `close'/, scheduler.blocking.last)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -43,5 +43,4 @@ class TestFiberSleep < Test::Unit::TestCase
|
|||
|
||||
assert_operator seconds, :>=, 2, "actual: %p" % seconds
|
||||
end
|
||||
|
||||
end
|
||||
|
|
44
thread.c
44
thread.c
|
@ -112,8 +112,6 @@ static VALUE sym_immediate;
|
|||
static VALUE sym_on_blocking;
|
||||
static VALUE sym_never;
|
||||
|
||||
static ID id_wait_for_single_fd;
|
||||
|
||||
enum SLEEP_FLAGS {
|
||||
SLEEP_DEADLOCKABLE = 0x1,
|
||||
SLEEP_SPURIOUS_CHECK = 0x2
|
||||
|
@ -1603,7 +1601,6 @@ rb_nogvl(void *(*func)(void *), void *data1,
|
|||
rb_thread_t *th = rb_ec_thread_ptr(ec);
|
||||
int saved_errno = 0;
|
||||
VALUE ubf_th = Qfalse;
|
||||
VALUE scheduler = th->scheduler;
|
||||
|
||||
if (ubf == RUBY_UBF_IO || ubf == RUBY_UBF_PROCESS) {
|
||||
ubf = ubf_select;
|
||||
|
@ -1618,10 +1615,6 @@ rb_nogvl(void *(*func)(void *), void *data1,
|
|||
}
|
||||
}
|
||||
|
||||
if (scheduler != Qnil) {
|
||||
rb_funcall(scheduler, rb_intern("enter_blocking_region"), 0);
|
||||
}
|
||||
|
||||
BLOCKING_REGION(th, {
|
||||
val = func(data1);
|
||||
saved_errno = errno;
|
||||
|
@ -1637,10 +1630,6 @@ rb_nogvl(void *(*func)(void *), void *data1,
|
|||
thread_value(rb_thread_kill(ubf_th));
|
||||
}
|
||||
|
||||
if (scheduler != Qnil) {
|
||||
rb_funcall(scheduler, rb_intern("exit_blocking_region"), 0);
|
||||
}
|
||||
|
||||
errno = saved_errno;
|
||||
|
||||
return val;
|
||||
|
@ -3749,7 +3738,7 @@ rb_thread_scheduler(VALUE klass)
|
|||
return rb_thread_scheduler_if_nonblocking(rb_thread_current());
|
||||
}
|
||||
|
||||
static VALUE
|
||||
VALUE
|
||||
rb_thread_current_scheduler()
|
||||
{
|
||||
return rb_thread_scheduler_if_nonblocking(rb_thread_current());
|
||||
|
@ -4332,15 +4321,6 @@ rb_thread_fd_select(int max, rb_fdset_t * read, rb_fdset_t * write, rb_fdset_t *
|
|||
return (int)rb_ensure(do_select, (VALUE)&set, select_set_free, (VALUE)&set);
|
||||
}
|
||||
|
||||
static VALUE
|
||||
rb_thread_timeout(struct timeval *timeout) {
|
||||
if (timeout) {
|
||||
return rb_float_new((double)timeout->tv_sec + (0.000001f * timeout->tv_usec));
|
||||
}
|
||||
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
#ifdef USE_POLL
|
||||
|
||||
/* The same with linux kernel. TODO: make platform independent definition. */
|
||||
|
@ -4356,7 +4336,7 @@ rb_thread_timeout(struct timeval *timeout) {
|
|||
* returns a mask of events
|
||||
*/
|
||||
int
|
||||
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
||||
rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
||||
{
|
||||
struct pollfd fds[2];
|
||||
int result = 0, lerrno;
|
||||
|
@ -4367,14 +4347,6 @@ rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
|||
struct waiting_fd wfd;
|
||||
int state;
|
||||
|
||||
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
|
||||
if (scheduler != Qnil) {
|
||||
VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
|
||||
rb_thread_timeout(timeout)
|
||||
);
|
||||
return RTEST(result);
|
||||
}
|
||||
|
||||
wfd.th = GET_THREAD();
|
||||
wfd.fd = fd;
|
||||
|
||||
|
@ -4513,16 +4485,8 @@ select_single_cleanup(VALUE ptr)
|
|||
}
|
||||
|
||||
int
|
||||
rb_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
||||
rb_thread_wait_for_single_fd(int fd, int events, struct timeval *timeout)
|
||||
{
|
||||
VALUE scheduler = rb_thread_scheduler_if_nonblocking(rb_thread_current());
|
||||
if (scheduler != Qnil) {
|
||||
VALUE result = rb_funcall(scheduler, id_wait_for_single_fd, 3, INT2NUM(fd), INT2NUM(events),
|
||||
rb_thread_timeout(timeout)
|
||||
);
|
||||
return RTEST(result);
|
||||
}
|
||||
|
||||
rb_fdset_t rfds, wfds, efds;
|
||||
struct select_args args;
|
||||
int r;
|
||||
|
@ -5450,8 +5414,6 @@ Init_Thread(void)
|
|||
sym_immediate = ID2SYM(rb_intern("immediate"));
|
||||
sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
|
||||
|
||||
id_wait_for_single_fd = rb_intern("wait_for_single_fd");
|
||||
|
||||
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
|
||||
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
|
||||
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
|
||||
|
|
Loading…
Reference in a new issue