1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

Add support for non-blocking Process.wait.

This commit is contained in:
Samuel Williams 2020-12-08 09:29:09 +13:00
parent a4a92ae6d9
commit 2553c5f94a
Notes: git 2020-12-09 04:56:08 +09:00
10 changed files with 262 additions and 46 deletions

View file

@ -10002,6 +10002,7 @@ process.$(OBJEXT): $(top_srcdir)/internal/thread.h
process.$(OBJEXT): $(top_srcdir)/internal/variable.h
process.$(OBJEXT): $(top_srcdir)/internal/vm.h
process.$(OBJEXT): $(top_srcdir)/internal/warnings.h
process.$(OBJEXT): {$(VPATH)}$(COROUTINE_H)
process.$(OBJEXT): {$(VPATH)}assert.h
process.$(OBJEXT): {$(VPATH)}backward/2/assume.h
process.$(OBJEXT): {$(VPATH)}backward/2/attributes.h

View file

@ -12,6 +12,17 @@ This is the interface you need to implement.
~~~ ruby
class Scheduler
# Wait for the specified process ID to exit.
# This hook is optional.
# @parameter pid [Integer] The process ID to wait for.
# @parameter flags [Integer] A bit-mask of flags suitable for `Process::Status.wait`.
# @returns [Process::Status] A process status instance.
def process_wait(pid, flags)
Thread.new do
Process::Status.wait(pid, flags)
end.value
end
# Wait for the given file descriptor to match the specified events within
# the specified timeout.
# @parameter event [Integer] A bit mask of `IO::READABLE`,

View file

@ -28,7 +28,9 @@
RBIMPL_SYMBOL_EXPORT_BEGIN()
/* process.c */
void rb_last_status_set(int status, rb_pid_t pid);
RUBY_EXTERN void (* rb_socket_before_fork_func)();
void rb_last_status_set(rb_pid_t pid, int status, int error);
VALUE rb_last_status_get(void);
int rb_proc_exec(const char*);

View file

@ -25,6 +25,9 @@ VALUE rb_scheduler_close(VALUE scheduler);
VALUE rb_scheduler_kernel_sleep(VALUE scheduler, VALUE duration);
VALUE rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv);
int rb_scheduler_supports_process_wait(VALUE scheduler);
VALUE rb_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags);
VALUE rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout);
VALUE rb_scheduler_unblock(VALUE scheduler, VALUE blocker, VALUE fiber);

10
io.c
View file

@ -4913,9 +4913,9 @@ fptr_waitpid(rb_io_t *fptr, int nohang)
{
int status;
if (fptr->pid) {
rb_last_status_clear();
rb_waitpid(fptr->pid, &status, nohang ? WNOHANG : 0);
fptr->pid = 0;
rb_last_status_clear();
rb_waitpid(fptr->pid, &status, nohang ? WNOHANG : 0);
fptr->pid = 0;
}
}
@ -6433,11 +6433,11 @@ pipe_finalize(rb_io_t *fptr, int noraise)
#if !defined(HAVE_WORKING_FORK) && !defined(_WIN32)
int status = 0;
if (fptr->stdio_file) {
status = pclose(fptr->stdio_file);
status = pclose(fptr->stdio_file);
}
fptr->fd = -1;
fptr->stdio_file = 0;
rb_last_status_set(status, fptr->pid);
rb_last_status_set(fptr->pid, status, 0);
#else
fptr_finalize(fptr, noraise);
#endif

216
process.c
View file

@ -14,6 +14,7 @@
#include "ruby/internal/config.h"
#include "internal/scheduler.h"
#include "coroutine/Stack.h"
#include <ctype.h>
#include <errno.h>
@ -568,6 +569,27 @@ proc_get_ppid(VALUE _)
static VALUE rb_cProcessStatus;
struct rb_process_status {
rb_pid_t pid;
int status;
int error;
};
static const rb_data_type_t rb_process_status_type = {
.wrap_struct_name = "Process::Status",
.function = {
.dfree = RUBY_DEFAULT_FREE,
},
.data = NULL,
.flags = RUBY_TYPED_FREE_IMMEDIATELY,
};
static VALUE rb_process_status_allocate(VALUE klass) {
struct rb_process_status *data = NULL;
return TypedData_Make_Struct(klass, struct rb_process_status, &rb_process_status_type, data);
}
VALUE
rb_last_status_get(void)
{
@ -596,13 +618,20 @@ proc_s_last_status(VALUE mod)
}
void
rb_last_status_set(int status, rb_pid_t pid)
rb_last_status_set(rb_pid_t pid, int status, int error)
{
rb_thread_t *th = GET_THREAD();
th->last_status = rb_obj_alloc(rb_cProcessStatus);
rb_ivar_set(th->last_status, id_status, INT2FIX(status));
rb_ivar_set(th->last_status, id_pid, PIDT2NUM(pid));
rb_obj_freeze(th->last_status);
VALUE last_status = rb_process_status_allocate(rb_cProcessStatus);
struct rb_process_status *data = RTYPEDDATA_DATA(last_status);
data->pid = pid;
data->status = status;
data->error = error;
rb_obj_freeze(last_status);
th->last_status = last_status;
}
void
@ -624,9 +653,11 @@ rb_last_status_clear(void)
*/
static VALUE
pst_to_i(VALUE st)
pst_to_i(VALUE self)
{
return rb_ivar_get(st, id_status);
struct rb_process_status *data = RTYPEDDATA_DATA(self);
return RB_INT2NUM(data->status);
}
#define PST2INT(st) NUM2INT(pst_to_i(st))
@ -643,9 +674,11 @@ pst_to_i(VALUE st)
*/
static VALUE
pst_pid(VALUE st)
pst_pid(VALUE self)
{
return rb_attr_get(st, id_pid);
struct rb_process_status *data = RTYPEDDATA_DATA(self);
return PIDT2NUM(data->pid);
}
static VALUE pst_message_status(VALUE str, int status);
@ -1104,6 +1137,7 @@ waitpid_state_init(struct waitpid_state *w, rb_pid_t pid, int options)
w->ret = 0;
w->pid = pid;
w->options = options;
w->errnum = 0;
}
static const rb_hrtime_t *
@ -1214,8 +1248,10 @@ waitpid_wait(struct waitpid_state *w)
*/
rb_native_mutex_lock(&vm->waitpid_lock);
if (w->pid > 0 || list_empty(&vm->waiting_pids))
if (w->pid > 0 || list_empty(&vm->waiting_pids)) {
w->ret = do_waitpid(w->pid, &w->status, w->options | WNOHANG);
}
if (w->ret) {
if (w->ret == -1) w->errnum = errno;
}
@ -1264,35 +1300,125 @@ waitpid_no_SIGCHLD(struct waitpid_state *w)
w->errnum = errno;
}
/*
* call-seq:
* Process::Status.wait(pid=-1, flags=0) -> Process::Status
*
* Waits for a child process to exit and returns a Process::Status object
* containing information on that process. Which child it waits on
* depends on the value of _pid_:
*
* > 0:: Waits for the child whose process ID equals _pid_.
*
* 0:: Waits for any child whose process group ID equals that of the
* calling process.
*
* -1:: Waits for any child process (the default if no _pid_ is
* given).
*
* < -1:: Waits for any child whose process group ID equals the absolute
* value of _pid_.
*
* The _flags_ argument may be a logical or of the flag values
* Process::WNOHANG (do not block if no child available)
* or Process::WUNTRACED (return stopped children that
* haven't been reported). Not all flags are available on all
* platforms, but a flag value of zero will work on all platforms.
*
* Calling this method raises a SystemCallError if there are no child
* processes. Not available on all platforms.
*
* May invoke the scheduler hook _process_wait_.
*
* fork { exit 99 } #=> 27429
* Process::Status.wait #=> pid 27429 exit 99
* $? #=> nil
*
* pid = fork { sleep 3 } #=> 27440
* Time.now #=> 2008-03-08 19:56:16 +0900
* Process::Status.wait(pid, Process::WNOHANG) #=> nil
* Time.now #=> 2008-03-08 19:56:16 +0900
* Process::Status.wait(pid, 0) #=> pid 27440 exit 99
* Time.now #=> 2008-03-08 19:56:19 +0900
*/
VALUE rb_process_status_wait(rb_pid_t pid, int flags)
{
// We only enter the scheduler if we are "blocking":
if (!(flags & WNOHANG)) {
VALUE scheduler = rb_scheduler_current();
if (rb_scheduler_supports_process_wait(scheduler)) {
return rb_scheduler_process_wait(scheduler, pid, flags);
}
}
COROUTINE_STACK_LOCAL(struct waitpid_state, w);
waitpid_state_init(w, pid, flags);
w->ec = GET_EC();
if (WAITPID_USE_SIGCHLD) {
waitpid_wait(w);
}
else {
waitpid_no_SIGCHLD(w);
}
if (w->ret > 0) {
if (ruby_nocldwait) {
w->ret = -1;
w->errnum = ECHILD;
}
}
VALUE status = rb_process_status_allocate(rb_cProcessStatus);
struct rb_process_status *data = RTYPEDDATA_DATA(status);
data->pid = w->ret;
data->status = w->status;
data->error = w->errnum;
COROUTINE_STACK_FREE(w);
return status;
}
VALUE
rb_process_status_waitv(int argc, VALUE *argv, VALUE _)
{
rb_check_arity(argc, 0, 2);
rb_pid_t pid = -1;
int flags = 0;
if (argc >= 1) {
pid = NUM2PIDT(argv[0]);
}
if (argc >= 2) {
flags = RB_NUM2INT(argv[1]);
}
return rb_process_status_wait(pid, flags);
}
rb_pid_t
rb_waitpid(rb_pid_t pid, int *st, int flags)
{
struct waitpid_state w;
VALUE status = rb_process_status_wait(pid, flags);
struct rb_process_status *data = RTYPEDDATA_DATA(status);
waitpid_state_init(&w, pid, flags);
w.ec = GET_EC();
if (st) *st = data->status;
if (WAITPID_USE_SIGCHLD) {
waitpid_wait(&w);
if (data->pid == -1) {
errno = data->error;
}
else {
waitpid_no_SIGCHLD(&w);
rb_obj_freeze(status);
GET_THREAD()->last_status = status;
}
if (st) *st = w.status;
if (w.ret == -1) {
errno = w.errnum;
}
else if (w.ret > 0) {
if (ruby_nocldwait) {
w.ret = -1;
errno = ECHILD;
}
else {
rb_last_status_set(w.status, w.ret);
}
}
return w.ret;
RB_GC_GUARD(status);
return data->pid;
}
static VALUE
@ -1312,12 +1438,15 @@ proc_wait(int argc, VALUE *argv)
flags = NUM2UINT(vflags);
}
}
if ((pid = rb_waitpid(pid, &status, flags)) < 0)
rb_sys_fail(0);
if (pid == 0) {
rb_last_status_clear();
return Qnil;
}
return PIDT2NUM(pid);
}
@ -4411,8 +4540,7 @@ rb_spawn_process(struct rb_execarg *eargp, char *errmsg, size_t errmsg_buflen)
#endif
#if defined HAVE_WORKING_FORK && !USE_SPAWNV
pid = fork_check_err(0, rb_exec_atfork, eargp, eargp->redirect_fds,
errmsg, errmsg_buflen, eargp);
pid = fork_check_err(0, rb_exec_atfork, eargp, eargp->redirect_fds, errmsg, errmsg_buflen, eargp);
#else
prog = eargp->use_shell ? eargp->invoke.sh.shell_script : eargp->invoke.cmd.command_name;
@ -4426,32 +4554,37 @@ rb_spawn_process(struct rb_execarg *eargp, char *errmsg, size_t errmsg_buflen)
}
# if defined HAVE_SPAWNV
if (eargp->use_shell) {
pid = proc_spawn_sh(RSTRING_PTR(prog));
pid = proc_spawn_sh(RSTRING_PTR(prog));
}
else {
char **argv = ARGVSTR2ARGV(eargp->invoke.cmd.argv_str);
pid = proc_spawn_cmd(argv, prog, eargp);
pid = proc_spawn_cmd(argv, prog, eargp);
}
if (pid == -1) {
rb_last_status_set(pid, 0x7f << 8, 0);
}
if (pid == -1)
rb_last_status_set(0x7f << 8, 0);
# else
status = system(rb_execarg_commandline(eargp, &prog));
rb_last_status_set((status & 0xff) << 8, 0);
pid = 1; /* dummy */
rb_last_status_set(pid, (status & 0xff) << 8, 0);
# endif
if (eargp->waitpid_state && eargp->waitpid_state != WAITPID_LOCK_ONLY) {
eargp->waitpid_state->pid = pid;
}
rb_execarg_run_options(&sarg, NULL, errmsg, errmsg_buflen);
#endif
return pid;
}
struct spawn_args {
VALUE execarg;
struct {
char *ptr;
size_t buflen;
char *ptr;
size_t buflen;
} errmsg;
};
@ -4587,7 +4720,7 @@ rb_f_system(int argc, VALUE *argv, VALUE _)
else {
waitpid_no_SIGCHLD(w);
}
rb_last_status_set(w->status, w->ret);
rb_last_status_set(w->ret, w->status, 0);
}
#endif
if (w->pid < 0 /* fork failure */ || pid < 0 /* exec failure */) {
@ -8502,8 +8635,11 @@ InitVM_process(void)
rb_define_method(rb_cWaiter, "pid", detach_process_pid, 0);
rb_cProcessStatus = rb_define_class_under(rb_mProcess, "Status", rb_cObject);
rb_define_alloc_func(rb_cProcessStatus, rb_process_status_allocate);
rb_undef_method(CLASS_OF(rb_cProcessStatus), "new");
rb_define_singleton_method(rb_cProcessStatus, "wait", rb_process_status_waitv, -1);
rb_define_method(rb_cProcessStatus, "==", pst_equal, 1);
rb_define_method(rb_cProcessStatus, "&", pst_bitand, 1);
rb_define_method(rb_cProcessStatus, ">>", pst_rshift, 1);

View file

@ -18,6 +18,7 @@ static ID id_block;
static ID id_unblock;
static ID id_kernel_sleep;
static ID id_process_wait;
static ID id_io_read;
static ID id_io_write;
@ -32,6 +33,7 @@ Init_Scheduler(void)
id_unblock = rb_intern_const("unblock");
id_kernel_sleep = rb_intern_const("kernel_sleep");
id_process_wait = rb_intern_const("process_wait");
id_io_read = rb_intern_const("io_read");
id_io_write = rb_intern_const("io_write");
@ -118,6 +120,18 @@ rb_scheduler_kernel_sleepv(VALUE scheduler, int argc, VALUE * argv)
return rb_funcallv(scheduler, id_kernel_sleep, argc, argv);
}
int
rb_scheduler_supports_process_wait(VALUE scheduler)
{
return rb_respond_to(scheduler, id_process_wait);
}
VALUE
rb_scheduler_process_wait(VALUE scheduler, rb_pid_t pid, int flags)
{
return rb_funcall(scheduler, id_process_wait, 2, PIDT2NUM(pid), RB_INT2NUM(flags));
}
VALUE
rb_scheduler_block(VALUE scheduler, VALUE blocker, VALUE timeout)
{

View file

@ -117,6 +117,13 @@ class Scheduler
Process.clock_gettime(Process::CLOCK_MONOTONIC)
end
def process_wait(pid, flags)
# This is a very simple way to implement a non-blocking wait:
Thread.new do
Process::Status.wait(pid, flags)
end.value
end
def io_wait(io, events, duration)
unless (events & IO::READABLE).zero?
@readable[io] = Fiber.current

View file

@ -0,0 +1,36 @@
# frozen_string_literal: true
require 'test/unit'
require_relative 'scheduler'
class TestFiberProcess < Test::Unit::TestCase
def test_process_wait
Thread.new do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
pid = Process.spawn("true")
Process.wait(pid)
# TODO test that scheduler was invoked.
assert_predicate $?, :success?
end
end.join
end
def test_system
Thread.new do
scheduler = Scheduler.new
Fiber.set_scheduler scheduler
Fiber.schedule do
system("true")
# TODO test that scheduler was invoked (currently it's not).
assert_predicate $?, :success?
end
end.join
end
end

View file

@ -2492,6 +2492,12 @@ EOS
assert_same(Process.last_status, $?)
end
def test_last_status_failure
assert_nil system("sad")
assert_not_predicate $?, :success?
assert_equal $?.exitstatus, 127
end
def test_exec_failure_leaves_no_child
assert_raise(Errno::ENOENT) do
spawn('inexistent_command')