mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
* thread.c: rename methods:
from Thread.async_interrupt_timing to Thread.handle_interrupt, from Thread.async_interrupted? to Thread.pending_interrupt?. Also rename option from `defer' to `never'. [ruby-core:51074] [ruby-trunk - Feature #6762] * vm_core.c, thread.c: rename functions and data structure `async_errinfo' to `pending_interrupt'. * thread.c: add global variables sym_immediate, sym_on_blocking and sym_never. * cont.c, process.c, vm.c, signal.c: ditto. * lib/sync.rb, lib/thread.rb: catch up this renaming. * test/ruby/test_thread.rb: ditto. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@38577 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
80b55686f0
commit
0f9b33c793
10 changed files with 208 additions and 170 deletions
20
ChangeLog
20
ChangeLog
|
@ -1,3 +1,23 @@
|
||||||
|
Sun Dec 23 19:09:16 2012 Koichi Sasada <ko1@atdot.net>
|
||||||
|
|
||||||
|
* thread.c: rename methods:
|
||||||
|
from Thread.async_interrupt_timing to Thread.handle_interrupt,
|
||||||
|
from Thread.async_interrupted? to Thread.pending_interrupt?.
|
||||||
|
Also rename option from `defer' to `never'.
|
||||||
|
[ruby-core:51074] [ruby-trunk - Feature #6762]
|
||||||
|
|
||||||
|
* vm_core.c, thread.c: rename functions and data structure
|
||||||
|
`async_errinfo' to `pending_interrupt'.
|
||||||
|
|
||||||
|
* thread.c: add global variables sym_immediate, sym_on_blocking and
|
||||||
|
sym_never.
|
||||||
|
|
||||||
|
* cont.c, process.c, vm.c, signal.c: ditto.
|
||||||
|
|
||||||
|
* lib/sync.rb, lib/thread.rb: catch up this renaming.
|
||||||
|
|
||||||
|
* test/ruby/test_thread.rb: ditto.
|
||||||
|
|
||||||
Sun Dec 23 17:57:30 2012 Nobuyoshi Nakada <nobu@ruby-lang.org>
|
Sun Dec 23 17:57:30 2012 Nobuyoshi Nakada <nobu@ruby-lang.org>
|
||||||
|
|
||||||
* lib/profiler.rb (Profiler__::PROFILE_PROC, print_profile): store
|
* lib/profiler.rb (Profiler__::PROFILE_PROC, print_profile): store
|
||||||
|
|
4
cont.c
4
cont.c
|
@ -1162,12 +1162,12 @@ rb_fiber_start(void)
|
||||||
|
|
||||||
if (state) {
|
if (state) {
|
||||||
if (state == TAG_RAISE || state == TAG_FATAL) {
|
if (state == TAG_RAISE || state == TAG_FATAL) {
|
||||||
rb_threadptr_async_errinfo_enque(th, th->errinfo);
|
rb_threadptr_pending_interrupt_enque(th, th->errinfo);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
VALUE err = rb_vm_make_jump_tag_but_local_jump(state, th->errinfo);
|
VALUE err = rb_vm_make_jump_tag_but_local_jump(state, th->errinfo);
|
||||||
if (!NIL_P(err))
|
if (!NIL_P(err))
|
||||||
rb_threadptr_async_errinfo_enque(th, err);
|
rb_threadptr_pending_interrupt_enque(th, err);
|
||||||
}
|
}
|
||||||
RUBY_VM_SET_INTERRUPT(th);
|
RUBY_VM_SET_INTERRUPT(th);
|
||||||
}
|
}
|
||||||
|
|
|
@ -135,7 +135,7 @@ module Sync_m
|
||||||
|
|
||||||
def sync_lock(m = EX)
|
def sync_lock(m = EX)
|
||||||
return unlock if m == UN
|
return unlock if m == UN
|
||||||
Thread.async_interrupt_timing(StandardError => :on_blocking) do
|
Thread.handle_interrupt(StandardError => :on_blocking) do
|
||||||
while true
|
while true
|
||||||
@sync_mutex.synchronize do
|
@sync_mutex.synchronize do
|
||||||
begin
|
begin
|
||||||
|
@ -227,7 +227,7 @@ module Sync_m
|
||||||
end
|
end
|
||||||
|
|
||||||
def sync_synchronize(mode = EX)
|
def sync_synchronize(mode = EX)
|
||||||
Thread.async_interrupt_timing(StandardError => :on_blocking) do
|
Thread.handle_interrupt(StandardError => :on_blocking) do
|
||||||
sync_lock(mode)
|
sync_lock(mode)
|
||||||
begin
|
begin
|
||||||
yield
|
yield
|
||||||
|
|
|
@ -63,9 +63,9 @@ class ConditionVariable
|
||||||
# even if no other thread doesn't signal.
|
# even if no other thread doesn't signal.
|
||||||
#
|
#
|
||||||
def wait(mutex, timeout=nil)
|
def wait(mutex, timeout=nil)
|
||||||
Thread.async_interrupt_timing(StandardError => :defer) do
|
Thread.handle_interrupt(StandardError => :never) do
|
||||||
begin
|
begin
|
||||||
Thread.async_interrupt_timing(StandardError => :on_blocking) do
|
Thread.handle_interrupt(StandardError => :on_blocking) do
|
||||||
@waiters_mutex.synchronize do
|
@waiters_mutex.synchronize do
|
||||||
@waiters[Thread.current] = true
|
@waiters[Thread.current] = true
|
||||||
end
|
end
|
||||||
|
@ -84,7 +84,7 @@ class ConditionVariable
|
||||||
# Wakes up the first thread in line waiting for this lock.
|
# Wakes up the first thread in line waiting for this lock.
|
||||||
#
|
#
|
||||||
def signal
|
def signal
|
||||||
Thread.async_interrupt_timing(StandardError => :on_blocking) do
|
Thread.handle_interrupt(StandardError => :on_blocking) do
|
||||||
begin
|
begin
|
||||||
t, _ = @waiters_mutex.synchronize { @waiters.shift }
|
t, _ = @waiters_mutex.synchronize { @waiters.shift }
|
||||||
t.run if t
|
t.run if t
|
||||||
|
@ -99,7 +99,7 @@ class ConditionVariable
|
||||||
# Wakes up all threads waiting for this lock.
|
# Wakes up all threads waiting for this lock.
|
||||||
#
|
#
|
||||||
def broadcast
|
def broadcast
|
||||||
Thread.async_interrupt_timing(StandardError => :on_blocking) do
|
Thread.handle_interrupt(StandardError => :on_blocking) do
|
||||||
threads = nil
|
threads = nil
|
||||||
@waiters_mutex.synchronize do
|
@waiters_mutex.synchronize do
|
||||||
threads = @waiters.keys
|
threads = @waiters.keys
|
||||||
|
@ -160,7 +160,7 @@ class Queue
|
||||||
# Pushes +obj+ to the queue.
|
# Pushes +obj+ to the queue.
|
||||||
#
|
#
|
||||||
def push(obj)
|
def push(obj)
|
||||||
Thread.async_interrupt_timing(StandardError => :on_blocking) do
|
Thread.handle_interrupt(StandardError => :on_blocking) do
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
@que.push obj
|
@que.push obj
|
||||||
@cond.signal
|
@cond.signal
|
||||||
|
@ -184,7 +184,7 @@ class Queue
|
||||||
# thread isn't suspended, and an exception is raised.
|
# thread isn't suspended, and an exception is raised.
|
||||||
#
|
#
|
||||||
def pop(non_block=false)
|
def pop(non_block=false)
|
||||||
Thread.async_interrupt_timing(StandardError => :on_blocking) do
|
Thread.handle_interrupt(StandardError => :on_blocking) do
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
while true
|
while true
|
||||||
if @que.empty?
|
if @que.empty?
|
||||||
|
@ -300,7 +300,7 @@ class SizedQueue < Queue
|
||||||
# until space becomes available.
|
# until space becomes available.
|
||||||
#
|
#
|
||||||
def push(obj)
|
def push(obj)
|
||||||
Thread.async_interrupt_timing(RuntimeError => :on_blocking) do
|
Thread.handle_interrupt(RuntimeError => :on_blocking) do
|
||||||
@mutex.synchronize do
|
@mutex.synchronize do
|
||||||
while true
|
while true
|
||||||
break if @que.length < @max
|
break if @que.length < @max
|
||||||
|
|
|
@ -1106,7 +1106,7 @@ after_exec(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
#define before_fork() before_exec()
|
#define before_fork() before_exec()
|
||||||
#define after_fork() (rb_threadptr_async_errinfo_clear(GET_THREAD()), after_exec())
|
#define after_fork() (rb_threadptr_pending_interrupt_clear(GET_THREAD()), after_exec())
|
||||||
|
|
||||||
#include "dln.h"
|
#include "dln.h"
|
||||||
|
|
||||||
|
|
2
signal.c
2
signal.c
|
@ -669,7 +669,7 @@ signal_exec(VALUE cmd, int safe, int sig)
|
||||||
cur_th->interrupt_mask = old_interrupt_mask;
|
cur_th->interrupt_mask = old_interrupt_mask;
|
||||||
|
|
||||||
if (state) {
|
if (state) {
|
||||||
/* XXX: should be replaced with rb_threadptr_async_errinfo_enque() */
|
/* XXX: should be replaced with rb_threadptr_pending_interrupt_enque() */
|
||||||
JUMP_TAG(state);
|
JUMP_TAG(state);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -516,12 +516,12 @@ class TestThread < Test::Unit::TestCase
|
||||||
assert_equal("Can't call on top of Fiber or Thread", error.message, bug5083)
|
assert_equal("Can't call on top of Fiber or Thread", error.message, bug5083)
|
||||||
end
|
end
|
||||||
|
|
||||||
def make_async_interrupt_timing_test_thread1 flag
|
def make_handle_interrupt_test_thread1 flag
|
||||||
r = []
|
r = []
|
||||||
ready_p = false
|
ready_p = false
|
||||||
th = Thread.new{
|
th = Thread.new{
|
||||||
begin
|
begin
|
||||||
Thread.async_interrupt_timing(RuntimeError => flag){
|
Thread.handle_interrupt(RuntimeError => flag){
|
||||||
begin
|
begin
|
||||||
ready_p = true
|
ready_p = true
|
||||||
sleep 0.5
|
sleep 0.5
|
||||||
|
@ -543,46 +543,46 @@ class TestThread < Test::Unit::TestCase
|
||||||
r
|
r
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_async_interrupt_timing
|
def test_handle_interrupt
|
||||||
[[:defer, :c2],
|
[[:never, :c2],
|
||||||
[:immediate, :c1],
|
[:immediate, :c1],
|
||||||
[:on_blocking, :c1]].each{|(flag, c)|
|
[:on_blocking, :c1]].each{|(flag, c)|
|
||||||
assert_equal([flag, c], [flag] + make_async_interrupt_timing_test_thread1(flag))
|
assert_equal([flag, c], [flag] + make_handle_interrupt_test_thread1(flag))
|
||||||
}
|
}
|
||||||
# TODO: complex cases are needed.
|
# TODO: complex cases are needed.
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_async_interrupt_timing_invalid_argument
|
def test_handle_interrupt_invalid_argument
|
||||||
assert_raise(ArgumentError) {
|
assert_raise(ArgumentError) {
|
||||||
Thread.async_interrupt_timing(RuntimeError => :immediate) # no block
|
Thread.handle_interrupt(RuntimeError => :immediate) # no block
|
||||||
}
|
}
|
||||||
assert_raise(ArgumentError) {
|
assert_raise(ArgumentError) {
|
||||||
Thread.async_interrupt_timing(RuntimeError => :never) {} # never?
|
Thread.handle_interrupt(RuntimeError => :xyzzy) {}
|
||||||
}
|
}
|
||||||
assert_raise(TypeError) {
|
assert_raise(TypeError) {
|
||||||
Thread.async_interrupt_timing([]) {} # array
|
Thread.handle_interrupt([]) {} # array
|
||||||
}
|
}
|
||||||
end
|
end
|
||||||
|
|
||||||
def for_test_async_interrupt_with_return
|
def for_test_handle_interrupt_with_return
|
||||||
Thread.async_interrupt_timing(Object => :defer){
|
Thread.handle_interrupt(Object => :never){
|
||||||
Thread.current.raise RuntimeError.new("have to be rescured")
|
Thread.current.raise RuntimeError.new("have to be rescured")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rescue
|
rescue
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_async_interrupt_with_return
|
def test_handle_interrupt_with_return
|
||||||
assert_nothing_raised do
|
assert_nothing_raised do
|
||||||
for_test_async_interrupt_with_return
|
for_test_handle_interrupt_with_return
|
||||||
_dummy_for_check_ints=nil
|
_dummy_for_check_ints=nil
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_async_interrupt_with_break
|
def test_handle_interrupt_with_break
|
||||||
assert_nothing_raised do
|
assert_nothing_raised do
|
||||||
begin
|
begin
|
||||||
Thread.async_interrupt_timing(Object => :defer){
|
Thread.handle_interrupt(Object => :never){
|
||||||
Thread.current.raise RuntimeError.new("have to be rescured")
|
Thread.current.raise RuntimeError.new("have to be rescured")
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
@ -592,13 +592,13 @@ class TestThread < Test::Unit::TestCase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_async_interrupt_blocking
|
def test_handle_interrupt_blocking
|
||||||
r=:ng
|
r=:ng
|
||||||
e=Class.new(Exception)
|
e=Class.new(Exception)
|
||||||
th_s = Thread.current
|
th_s = Thread.current
|
||||||
begin
|
begin
|
||||||
th = Thread.start{
|
th = Thread.start{
|
||||||
Thread.async_interrupt_timing(Object => :on_blocking){
|
Thread.handle_interrupt(Object => :on_blocking){
|
||||||
begin
|
begin
|
||||||
Thread.current.raise RuntimeError
|
Thread.current.raise RuntimeError
|
||||||
r=:ok
|
r=:ok
|
||||||
|
@ -617,12 +617,12 @@ class TestThread < Test::Unit::TestCase
|
||||||
assert_equal(:ok,r)
|
assert_equal(:ok,r)
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_async_interrupt_and_io
|
def test_handle_interrupt_and_io
|
||||||
assert_in_out_err([], <<-INPUT, %w(ok), [])
|
assert_in_out_err([], <<-INPUT, %w(ok), [])
|
||||||
th_waiting = true
|
th_waiting = true
|
||||||
|
|
||||||
t = Thread.new {
|
t = Thread.new {
|
||||||
Thread.async_interrupt_timing(RuntimeError => :on_blocking) {
|
Thread.handle_interrupt(RuntimeError => :on_blocking) {
|
||||||
nil while th_waiting
|
nil while th_waiting
|
||||||
# async interrupt should be raised _before_ writing puts arguments
|
# async interrupt should be raised _before_ writing puts arguments
|
||||||
puts "ng"
|
puts "ng"
|
||||||
|
@ -637,12 +637,12 @@ class TestThread < Test::Unit::TestCase
|
||||||
INPUT
|
INPUT
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_async_interrupt_and_p
|
def test_handle_interrupt_and_p
|
||||||
assert_in_out_err([], <<-INPUT, %w(:ok :ok), [])
|
assert_in_out_err([], <<-INPUT, %w(:ok :ok), [])
|
||||||
th_waiting = true
|
th_waiting = true
|
||||||
|
|
||||||
t = Thread.new {
|
t = Thread.new {
|
||||||
Thread.async_interrupt_timing(RuntimeError => :on_blocking) {
|
Thread.handle_interrupt(RuntimeError => :on_blocking) {
|
||||||
nil while th_waiting
|
nil while th_waiting
|
||||||
# p shouldn't provide interruptible point
|
# p shouldn't provide interruptible point
|
||||||
p :ok
|
p :ok
|
||||||
|
@ -657,9 +657,9 @@ class TestThread < Test::Unit::TestCase
|
||||||
INPUT
|
INPUT
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_async_interrupted?
|
def test_handle_interrupted?
|
||||||
q = Queue.new
|
q = Queue.new
|
||||||
Thread.async_interrupt_timing(RuntimeError => :defer){
|
Thread.handle_interrupt(RuntimeError => :never){
|
||||||
th = Thread.new{
|
th = Thread.new{
|
||||||
q.push :e
|
q.push :e
|
||||||
begin
|
begin
|
||||||
|
@ -669,7 +669,7 @@ class TestThread < Test::Unit::TestCase
|
||||||
q.push :ng1
|
q.push :ng1
|
||||||
end
|
end
|
||||||
begin
|
begin
|
||||||
Thread.async_interrupt_timing(Object => :immediate){} if Thread.async_interrupted?
|
Thread.handle_interrupthandle_interrupt(Object => :immediate){} if Thread.pending_interrupt?
|
||||||
rescue => e
|
rescue => e
|
||||||
q.push :ok
|
q.push :ok
|
||||||
end
|
end
|
||||||
|
|
258
thread.c
258
thread.c
|
@ -63,13 +63,17 @@
|
||||||
VALUE rb_cMutex;
|
VALUE rb_cMutex;
|
||||||
VALUE rb_cThreadShield;
|
VALUE rb_cThreadShield;
|
||||||
|
|
||||||
|
static VALUE sym_immediate;
|
||||||
|
static VALUE sym_on_blocking;
|
||||||
|
static VALUE sym_never;
|
||||||
|
|
||||||
static void sleep_timeval(rb_thread_t *th, struct timeval time, int spurious_check);
|
static void sleep_timeval(rb_thread_t *th, struct timeval time, int spurious_check);
|
||||||
static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check);
|
static void sleep_wait_for_interrupt(rb_thread_t *th, double sleepsec, int spurious_check);
|
||||||
static void sleep_forever(rb_thread_t *th, int nodeadlock, int spurious_check);
|
static void sleep_forever(rb_thread_t *th, int nodeadlock, int spurious_check);
|
||||||
static double timeofday(void);
|
static double timeofday(void);
|
||||||
static int rb_threadptr_dead(rb_thread_t *th);
|
static int rb_threadptr_dead(rb_thread_t *th);
|
||||||
static void rb_check_deadlock(rb_vm_t *vm);
|
static void rb_check_deadlock(rb_vm_t *vm);
|
||||||
static int rb_threadptr_async_errinfo_empty_p(rb_thread_t *th);
|
static int rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th);
|
||||||
|
|
||||||
#define eKillSignal INT2FIX(0)
|
#define eKillSignal INT2FIX(0)
|
||||||
#define eTerminateSignal INT2FIX(1)
|
#define eTerminateSignal INT2FIX(1)
|
||||||
|
@ -344,7 +348,7 @@ terminate_i(st_data_t key, st_data_t val, rb_thread_t *main_thread)
|
||||||
|
|
||||||
if (th != main_thread) {
|
if (th != main_thread) {
|
||||||
thread_debug("terminate_i: %p\n", (void *)th);
|
thread_debug("terminate_i: %p\n", (void *)th);
|
||||||
rb_threadptr_async_errinfo_enque(th, eTerminateSignal);
|
rb_threadptr_pending_interrupt_enque(th, eTerminateSignal);
|
||||||
rb_threadptr_interrupt(th);
|
rb_threadptr_interrupt(th);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
@ -599,10 +603,10 @@ thread_create_core(VALUE thval, VALUE args, VALUE (*fn)(ANYARGS))
|
||||||
th->priority = current_th->priority;
|
th->priority = current_th->priority;
|
||||||
th->thgroup = current_th->thgroup;
|
th->thgroup = current_th->thgroup;
|
||||||
|
|
||||||
th->async_errinfo_queue = rb_ary_tmp_new(0);
|
th->pending_interrupt_queue = rb_ary_tmp_new(0);
|
||||||
th->async_errinfo_queue_checked = 0;
|
th->pending_interrupt_queue_checked = 0;
|
||||||
th->async_errinfo_mask_stack = rb_ary_dup(current_th->async_errinfo_mask_stack);
|
th->pending_interrupt_mask_stack = rb_ary_dup(current_th->pending_interrupt_mask_stack);
|
||||||
RBASIC(th->async_errinfo_mask_stack)->klass = 0;
|
RBASIC(th->pending_interrupt_mask_stack)->klass = 0;
|
||||||
|
|
||||||
th->interrupt_mask = 0;
|
th->interrupt_mask = 0;
|
||||||
|
|
||||||
|
@ -1418,47 +1422,47 @@ thread_s_pass(VALUE klass)
|
||||||
/*****************************************************/
|
/*****************************************************/
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* rb_threadptr_async_errinfo_* - manage async errors queue
|
* rb_threadptr_pending_interrupt_* - manage asynchronous error queue
|
||||||
*
|
*
|
||||||
* Async events such as an exception throwed by Thread#raise,
|
* Async events such as an exception throwed by Thread#raise,
|
||||||
* Thread#kill and thread termination (after main thread termination)
|
* Thread#kill and thread termination (after main thread termination)
|
||||||
* will be queued to th->async_errinfo_queue.
|
* will be queued to th->pending_interrupt_queue.
|
||||||
* - clear: clear the queue.
|
* - clear: clear the queue.
|
||||||
* - enque: enque err object into queue.
|
* - enque: enque err object into queue.
|
||||||
* - deque: deque err object from queue.
|
* - deque: deque err object from queue.
|
||||||
* - active_p: return 1 if the queue should be checked.
|
* - active_p: return 1 if the queue should be checked.
|
||||||
*
|
*
|
||||||
* All rb_threadptr_async_errinfo_* functions are called by
|
* All rb_threadptr_pending_interrupt_* functions are called by
|
||||||
* a GVL acquired thread, of course.
|
* a GVL acquired thread, of course.
|
||||||
* Note that all "rb_" prefix APIs need GVL to call.
|
* Note that all "rb_" prefix APIs need GVL to call.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
void
|
void
|
||||||
rb_threadptr_async_errinfo_clear(rb_thread_t *th)
|
rb_threadptr_pending_interrupt_clear(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
rb_ary_clear(th->async_errinfo_queue);
|
rb_ary_clear(th->pending_interrupt_queue);
|
||||||
}
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v)
|
rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v)
|
||||||
{
|
{
|
||||||
rb_ary_push(th->async_errinfo_queue, v);
|
rb_ary_push(th->pending_interrupt_queue, v);
|
||||||
th->async_errinfo_queue_checked = 0;
|
th->pending_interrupt_queue_checked = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum async_interrupt_timing {
|
enum handle_interrupt_timing {
|
||||||
INTERRUPT_NONE,
|
INTERRUPT_NONE,
|
||||||
INTERRUPT_IMMEDIATE,
|
INTERRUPT_IMMEDIATE,
|
||||||
INTERRUPT_ON_BLOCKING,
|
INTERRUPT_ON_BLOCKING,
|
||||||
INTERRUPT_DEFER
|
INTERRUPT_NEVER
|
||||||
};
|
};
|
||||||
|
|
||||||
static enum async_interrupt_timing
|
static enum handle_interrupt_timing
|
||||||
rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err)
|
rb_threadptr_pending_interrupt_check_mask(rb_thread_t *th, VALUE err)
|
||||||
{
|
{
|
||||||
VALUE mask;
|
VALUE mask;
|
||||||
long mask_stack_len = RARRAY_LEN(th->async_errinfo_mask_stack);
|
long mask_stack_len = RARRAY_LEN(th->pending_interrupt_mask_stack);
|
||||||
VALUE *mask_stack = RARRAY_PTR(th->async_errinfo_mask_stack);
|
VALUE *mask_stack = RARRAY_PTR(th->pending_interrupt_mask_stack);
|
||||||
VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */
|
VALUE ancestors = rb_mod_ancestors(err); /* TODO: GC guard */
|
||||||
long ancestors_len = RARRAY_LEN(ancestors);
|
long ancestors_len = RARRAY_LEN(ancestors);
|
||||||
VALUE *ancestors_ptr = RARRAY_PTR(ancestors);
|
VALUE *ancestors_ptr = RARRAY_PTR(ancestors);
|
||||||
|
@ -1473,14 +1477,14 @@ rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err)
|
||||||
|
|
||||||
/* TODO: remove rb_intern() */
|
/* TODO: remove rb_intern() */
|
||||||
if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
|
if ((sym = rb_hash_aref(mask, klass)) != Qnil) {
|
||||||
if (sym == ID2SYM(rb_intern("immediate"))) {
|
if (sym == sym_immediate) {
|
||||||
return INTERRUPT_IMMEDIATE;
|
return INTERRUPT_IMMEDIATE;
|
||||||
}
|
}
|
||||||
else if (sym == ID2SYM(rb_intern("on_blocking"))) {
|
else if (sym == sym_on_blocking) {
|
||||||
return INTERRUPT_ON_BLOCKING;
|
return INTERRUPT_ON_BLOCKING;
|
||||||
}
|
}
|
||||||
else if (sym == ID2SYM(rb_intern("defer"))) {
|
else if (sym == sym_never) {
|
||||||
return INTERRUPT_DEFER;
|
return INTERRUPT_NEVER;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
rb_raise(rb_eThreadError, "unknown mask signature");
|
rb_raise(rb_eThreadError, "unknown mask signature");
|
||||||
|
@ -1493,17 +1497,17 @@ rb_threadptr_async_errinfo_check_mask(rb_thread_t *th, VALUE err)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
rb_threadptr_async_errinfo_empty_p(rb_thread_t *th)
|
rb_threadptr_pending_interrupt_empty_p(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
return RARRAY_LEN(th->async_errinfo_queue) == 0;
|
return RARRAY_LEN(th->pending_interrupt_queue) == 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
rb_threadptr_async_errinfo_include_p(rb_thread_t *th, VALUE err)
|
rb_threadptr_pending_interrupt_include_p(rb_thread_t *th, VALUE err)
|
||||||
{
|
{
|
||||||
int i;
|
int i;
|
||||||
for (i=0; i<RARRAY_LEN(th->async_errinfo_queue); i++) {
|
for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
|
||||||
VALUE e = RARRAY_PTR(th->async_errinfo_queue)[i];
|
VALUE e = RARRAY_PTR(th->pending_interrupt_queue)[i];
|
||||||
if (rb_class_inherited_p(e, err)) {
|
if (rb_class_inherited_p(e, err)) {
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
@ -1512,15 +1516,15 @@ rb_threadptr_async_errinfo_include_p(rb_thread_t *th, VALUE err)
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_threadptr_async_errinfo_deque(rb_thread_t *th, enum async_interrupt_timing timing)
|
rb_threadptr_pending_interrupt_deque(rb_thread_t *th, enum handle_interrupt_timing timing)
|
||||||
{
|
{
|
||||||
#if 1 /* 1 to enable Thread#async_interrupt_timing, 0 to ignore it */
|
#if 1 /* 1 to enable Thread#handle_interrupt, 0 to ignore it */
|
||||||
int i;
|
int i;
|
||||||
|
|
||||||
for (i=0; i<RARRAY_LEN(th->async_errinfo_queue); i++) {
|
for (i=0; i<RARRAY_LEN(th->pending_interrupt_queue); i++) {
|
||||||
VALUE err = RARRAY_PTR(th->async_errinfo_queue)[i];
|
VALUE err = RARRAY_PTR(th->pending_interrupt_queue)[i];
|
||||||
|
|
||||||
enum async_interrupt_timing mask_timing = rb_threadptr_async_errinfo_check_mask(th, CLASS_OF(err));
|
enum handle_interrupt_timing mask_timing = rb_threadptr_pending_interrupt_check_mask(th, CLASS_OF(err));
|
||||||
|
|
||||||
switch (mask_timing) {
|
switch (mask_timing) {
|
||||||
case INTERRUPT_ON_BLOCKING:
|
case INTERRUPT_ON_BLOCKING:
|
||||||
|
@ -1530,37 +1534,37 @@ rb_threadptr_async_errinfo_deque(rb_thread_t *th, enum async_interrupt_timing ti
|
||||||
/* fall through */
|
/* fall through */
|
||||||
case INTERRUPT_NONE: /* default: IMMEDIATE */
|
case INTERRUPT_NONE: /* default: IMMEDIATE */
|
||||||
case INTERRUPT_IMMEDIATE:
|
case INTERRUPT_IMMEDIATE:
|
||||||
rb_ary_delete_at(th->async_errinfo_queue, i);
|
rb_ary_delete_at(th->pending_interrupt_queue, i);
|
||||||
return err;
|
return err;
|
||||||
case INTERRUPT_DEFER:
|
case INTERRUPT_NEVER:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
th->async_errinfo_queue_checked = 1;
|
th->pending_interrupt_queue_checked = 1;
|
||||||
return Qundef;
|
return Qundef;
|
||||||
#else
|
#else
|
||||||
VALUE err = rb_ary_shift(th->async_errinfo_queue);
|
VALUE err = rb_ary_shift(th->pending_interrupt_queue);
|
||||||
if (rb_threadptr_async_errinfo_empty_p(th)) {
|
if (rb_threadptr_pending_interrupt_empty_p(th)) {
|
||||||
th->async_errinfo_queue_checked = 1;
|
th->pending_interrupt_queue_checked = 1;
|
||||||
}
|
}
|
||||||
return err;
|
return err;
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
|
rb_threadptr_pending_interrupt_active_p(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* For optimization, we don't check async errinfo queue
|
* For optimization, we don't check async errinfo queue
|
||||||
* if it nor a thread interrupt mask were not changed
|
* if it nor a thread interrupt mask were not changed
|
||||||
* since last check.
|
* since last check.
|
||||||
*/
|
*/
|
||||||
if (th->async_errinfo_queue_checked) {
|
if (th->pending_interrupt_queue_checked) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rb_threadptr_async_errinfo_empty_p(th)) {
|
if (rb_threadptr_pending_interrupt_empty_p(th)) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1568,13 +1572,9 @@ rb_threadptr_async_errinfo_active_p(rb_thread_t *th)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
async_interrupt_timing_arg_check_i(VALUE key, VALUE val)
|
handle_interrupt_arg_check_i(VALUE key, VALUE val)
|
||||||
{
|
{
|
||||||
VALUE immediate = ID2SYM(rb_intern("immediate"));
|
if (val != sym_immediate && val != sym_on_blocking && val != sym_never) {
|
||||||
VALUE on_blocking = ID2SYM(rb_intern("on_blocking"));
|
|
||||||
VALUE defer = ID2SYM(rb_intern("defer"));
|
|
||||||
|
|
||||||
if (val != immediate && val != on_blocking && val != defer) {
|
|
||||||
rb_raise(rb_eArgError, "unknown mask signature");
|
rb_raise(rb_eArgError, "unknown mask signature");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1583,20 +1583,20 @@ async_interrupt_timing_arg_check_i(VALUE key, VALUE val)
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* call-seq:
|
* call-seq:
|
||||||
* Thread.async_interrupt_timing(hash) { ... } -> result of the block
|
* Thread.handle_interrupt(hash) { ... } -> result of the block
|
||||||
*
|
*
|
||||||
* Thread.Thread#async_interrupt_timing controls async interrupt timing.
|
* Thread.Thread#handle_interrupt changes async interrupt timing.
|
||||||
*
|
*
|
||||||
* _async_interrupt_ means asynchronous event and corresponding procedure
|
* _interrupt_ means asynchronous event and corresponding procedure
|
||||||
* by Thread#raise, Thread#kill, signal trap (not supported yet)
|
* by Thread#raise, Thread#kill, signal trap (not supported yet)
|
||||||
* and main thread termination (if main thread terminates, then all
|
* and main thread termination (if main thread terminates, then all
|
||||||
* other thread will be killed).
|
* other thread will be killed).
|
||||||
*
|
*
|
||||||
* _hash_ has pairs of ExceptionClass and TimingSymbol. TimingSymbol
|
* _hash_ has pairs of ExceptionClass and TimingSymbol. TimingSymbol
|
||||||
* is one of them:
|
* is one of them:
|
||||||
* - :immediate Invoke async interrupt immediately.
|
* - :immediate Invoke interrupts immediately.
|
||||||
* - :on_blocking Invoke async interrupt while _BlockingOperation_.
|
* - :on_blocking Invoke interrupts while _BlockingOperation_.
|
||||||
* - :defer Defer all async interrupt.
|
* - :never Never invoke all interrupts.
|
||||||
*
|
*
|
||||||
* _BlockingOperation_ means that the operation will block the calling thread,
|
* _BlockingOperation_ means that the operation will block the calling thread,
|
||||||
* such as read and write. On CRuby implementation, _BlockingOperation_ is
|
* such as read and write. On CRuby implementation, _BlockingOperation_ is
|
||||||
|
@ -1605,9 +1605,9 @@ async_interrupt_timing_arg_check_i(VALUE key, VALUE val)
|
||||||
* Masked async interrupts are delayed until they are enabled.
|
* Masked async interrupts are delayed until they are enabled.
|
||||||
* This method is similar to sigprocmask(3).
|
* This method is similar to sigprocmask(3).
|
||||||
*
|
*
|
||||||
* TODO (DOC): Thread#async_interrupt_timing is stacked.
|
* TODO (DOC): Thread#handle_interrupt is stacked.
|
||||||
* TODO (DOC): check ancestors.
|
* TODO (DOC): check ancestors.
|
||||||
* TODO (DOC): to prevent all async interrupt, {Object => :defer} works.
|
* TODO (DOC): to prevent all async interrupt, {Object => :never} works.
|
||||||
*
|
*
|
||||||
* NOTE: Asynchronous interrupts are difficult to use.
|
* NOTE: Asynchronous interrupts are difficult to use.
|
||||||
* If you need to communicate between threads,
|
* If you need to communicate between threads,
|
||||||
|
@ -1617,16 +1617,16 @@ async_interrupt_timing_arg_check_i(VALUE key, VALUE val)
|
||||||
*
|
*
|
||||||
* # example: Guard from Thread#raise
|
* # example: Guard from Thread#raise
|
||||||
* th = Thread.new do
|
* th = Thread.new do
|
||||||
* Thead.async_interrupt_timing(RuntimeError => :defer) {
|
* Thead.handle_interrupt(RuntimeError => :never) {
|
||||||
* begin
|
* begin
|
||||||
* # Thread#raise doesn't async interrupt here.
|
* # Thread#raise doesn't async interrupt here.
|
||||||
* # You can write resource allocation code safely.
|
* # You can write resource allocation code safely.
|
||||||
* Thread.async_interrupt_timing(RuntimeError => :immediate) {
|
* Thread.handle_interrupt(RuntimeError => :immediate) {
|
||||||
* # ...
|
* # ...
|
||||||
* # It is possible to be interrupted by Thread#raise.
|
* # It is possible to be interrupted by Thread#raise.
|
||||||
* }
|
* }
|
||||||
* ensure
|
* ensure
|
||||||
* # Thread#raise doesn't async interrupt here.
|
* # Thread#raise doesn't interrupt here.
|
||||||
* # You can write resource dealocation code safely.
|
* # You can write resource dealocation code safely.
|
||||||
* end
|
* end
|
||||||
* }
|
* }
|
||||||
|
@ -1637,10 +1637,10 @@ async_interrupt_timing_arg_check_i(VALUE key, VALUE val)
|
||||||
*
|
*
|
||||||
* # example: Guard from TimeoutError
|
* # example: Guard from TimeoutError
|
||||||
* require 'timeout'
|
* require 'timeout'
|
||||||
* Thread.async_interrupt_timing(TimeoutError => :defer) {
|
* Thread.handle_interrupt(TimeoutError => :never) {
|
||||||
* timeout(10){
|
* timeout(10){
|
||||||
* # TimeoutError doesn't occur here
|
* # TimeoutError doesn't occur here
|
||||||
* Thread.async_interrupt_timing(TimeoutError => :on_blocking) {
|
* Thread.handle_interrupt(TimeoutError => :on_blocking) {
|
||||||
* # possible to be killed by TimeoutError
|
* # possible to be killed by TimeoutError
|
||||||
* # while blocking operation
|
* # while blocking operation
|
||||||
* }
|
* }
|
||||||
|
@ -1649,20 +1649,20 @@ async_interrupt_timing_arg_check_i(VALUE key, VALUE val)
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* # example: Stack control settings
|
* # example: Stack control settings
|
||||||
* Thread.async_interrupt_timing(FooError => :defer) {
|
* Thread.handle_interrupt(FooError => :never) {
|
||||||
* Thread.async_interrupt_timing(BarError => :defer) {
|
* Thread.handle_interrupt(BarError => :never) {
|
||||||
* # FooError and BarError are prohibited.
|
* # FooError and BarError are prohibited.
|
||||||
* }
|
* }
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
* # example: check ancestors
|
* # example: check ancestors
|
||||||
* Thread.async_interrupt_timing(Exception => :defer) {
|
* Thread.handle_interrupt(Exception => :never) {
|
||||||
* # all exceptions inherited from Exception are prohibited.
|
* # all exceptions inherited from Exception are prohibited.
|
||||||
* }
|
* }
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_thread_s_async_interrupt_timing(VALUE self, VALUE mask_arg)
|
rb_thread_s_handle_interrupt(VALUE self, VALUE mask_arg)
|
||||||
{
|
{
|
||||||
VALUE mask;
|
VALUE mask;
|
||||||
rb_thread_t *th = GET_THREAD();
|
rb_thread_t *th = GET_THREAD();
|
||||||
|
@ -1674,10 +1674,10 @@ rb_thread_s_async_interrupt_timing(VALUE self, VALUE mask_arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
mask = rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash");
|
mask = rb_convert_type(mask_arg, T_HASH, "Hash", "to_hash");
|
||||||
rb_hash_foreach(mask, async_interrupt_timing_arg_check_i, 0);
|
rb_hash_foreach(mask, handle_interrupt_arg_check_i, 0);
|
||||||
rb_ary_push(th->async_errinfo_mask_stack, mask);
|
rb_ary_push(th->pending_interrupt_mask_stack, mask);
|
||||||
if (!rb_threadptr_async_errinfo_empty_p(th)) {
|
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
|
||||||
th->async_errinfo_queue_checked = 0;
|
th->pending_interrupt_queue_checked = 0;
|
||||||
RUBY_VM_SET_INTERRUPT(th);
|
RUBY_VM_SET_INTERRUPT(th);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1687,9 +1687,9 @@ rb_thread_s_async_interrupt_timing(VALUE self, VALUE mask_arg)
|
||||||
}
|
}
|
||||||
TH_POP_TAG();
|
TH_POP_TAG();
|
||||||
|
|
||||||
rb_ary_pop(th->async_errinfo_mask_stack);
|
rb_ary_pop(th->pending_interrupt_mask_stack);
|
||||||
if (!rb_threadptr_async_errinfo_empty_p(th)) {
|
if (!rb_threadptr_pending_interrupt_empty_p(th)) {
|
||||||
th->async_errinfo_queue_checked = 0;
|
th->pending_interrupt_queue_checked = 0;
|
||||||
RUBY_VM_SET_INTERRUPT(th);
|
RUBY_VM_SET_INTERRUPT(th);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1701,23 +1701,56 @@ rb_thread_s_async_interrupt_timing(VALUE self, VALUE mask_arg)
|
||||||
|
|
||||||
return r;
|
return r;
|
||||||
}
|
}
|
||||||
|
/*
|
||||||
|
* call-seq:
|
||||||
|
* target_thread.pending_interrupt?(err = nil) -> true/false
|
||||||
|
*
|
||||||
|
* Check async queue is empty or not.
|
||||||
|
*/
|
||||||
|
static VALUE
|
||||||
|
rb_thread_pending_interrupt_p(int argc, VALUE *argv, VALUE target_thread)
|
||||||
|
{
|
||||||
|
rb_thread_t *target_th;
|
||||||
|
|
||||||
|
GetThreadPtr(target_thread, target_th);
|
||||||
|
|
||||||
|
if (rb_threadptr_pending_interrupt_empty_p(target_th)) {
|
||||||
|
return Qfalse;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
if (argc == 1) {
|
||||||
|
VALUE err;
|
||||||
|
rb_scan_args(argc, argv, "01", &err);
|
||||||
|
if (!rb_obj_is_kind_of(err, rb_cModule)) {
|
||||||
|
rb_raise(rb_eTypeError, "class or module required for rescue clause");
|
||||||
|
}
|
||||||
|
if (rb_threadptr_pending_interrupt_include_p(target_th, err)) {
|
||||||
|
return Qtrue;
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return Qfalse;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Qtrue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* call-seq:
|
* call-seq:
|
||||||
* Thread.async_interrupted?(err = nil) -> true/false
|
* Thread.pending_interrupt?(err = nil) -> true/false
|
||||||
*
|
*
|
||||||
* Check async queue is empty or not.
|
* Check async queue is empty or not.
|
||||||
*
|
*
|
||||||
* Thread.async_interrupt_timing can defer asynchronous events.
|
* Thread.handle_interrupt can defer asynchronous events.
|
||||||
* This method returns deferred event are there.
|
* This method returns deferred event are there.
|
||||||
* If you find this method return true, then you may finish
|
* If you find this method return true, then you may finish
|
||||||
* defer block.
|
* never block.
|
||||||
*
|
*
|
||||||
* For example, the following method processes defferred async event
|
* For example, the following method processes defferred async event
|
||||||
* immediately.
|
* immediately.
|
||||||
*
|
*
|
||||||
* def Thread.kick_async_interrupt_immediately
|
* def Thread.kick_interrupt_immediately
|
||||||
* Thread.async_interrupt_timing(Object => :immediate) {
|
* Thread.handle_interrupt(Object => :immediate) {
|
||||||
* Thread.pass
|
* Thread.pass
|
||||||
* }
|
* }
|
||||||
* end
|
* end
|
||||||
|
@ -1727,12 +1760,12 @@ rb_thread_s_async_interrupt_timing(VALUE self, VALUE mask_arg)
|
||||||
* Examples:
|
* Examples:
|
||||||
*
|
*
|
||||||
* th = Thread.new{
|
* th = Thread.new{
|
||||||
* Thread.async_interrupt_timing(RuntimeError => :on_blocking){
|
* Thread.handle_interrupt(RuntimeError => :on_blocking){
|
||||||
* while true
|
* while true
|
||||||
* ...
|
* ...
|
||||||
* # reach safe point to invoke interrupt
|
* # reach safe point to invoke interrupt
|
||||||
* if Thread.async_interrupted?
|
* if Thread.pending_interrupt?
|
||||||
* Thread.async_interrupt_timing(Object => :immediate){}
|
* Thread.handle_interrupt(Object => :immediate){}
|
||||||
* end
|
* end
|
||||||
* ...
|
* ...
|
||||||
* end
|
* end
|
||||||
|
@ -1746,7 +1779,7 @@ rb_thread_s_async_interrupt_timing(VALUE self, VALUE mask_arg)
|
||||||
*
|
*
|
||||||
* flag = true
|
* flag = true
|
||||||
* th = Thread.new{
|
* th = Thread.new{
|
||||||
* Thread.async_interrupt_timing(RuntimeError => :on_blocking){
|
* Thread.handle_interrupt(RuntimeError => :on_blocking){
|
||||||
* while true
|
* while true
|
||||||
* ...
|
* ...
|
||||||
* # reach safe point to invoke interrupt
|
* # reach safe point to invoke interrupt
|
||||||
|
@ -1760,35 +1793,15 @@ rb_thread_s_async_interrupt_timing(VALUE self, VALUE mask_arg)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_thread_s_async_interrupt_p(int argc, VALUE *argv, VALUE self)
|
rb_thread_s_pending_interrupt_p(int argc, VALUE *argv, VALUE self)
|
||||||
{
|
{
|
||||||
rb_thread_t *cur_th = GET_THREAD();
|
return rb_thread_pending_interrupt_p(argc, argv, GET_THREAD()->self);
|
||||||
|
|
||||||
if (rb_threadptr_async_errinfo_empty_p(cur_th)) {
|
|
||||||
return Qfalse;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
if (argc == 1) {
|
|
||||||
VALUE err;
|
|
||||||
rb_scan_args(argc, argv, "01", &err);
|
|
||||||
if (!rb_obj_is_kind_of(err, rb_cModule)) {
|
|
||||||
rb_raise(rb_eTypeError, "class or module required for rescue clause");
|
|
||||||
}
|
|
||||||
if (rb_threadptr_async_errinfo_include_p(cur_th, err)) {
|
|
||||||
return Qtrue;
|
|
||||||
}
|
|
||||||
else {
|
|
||||||
return Qfalse;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return Qtrue;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static void
|
static void
|
||||||
rb_threadptr_to_kill(rb_thread_t *th)
|
rb_threadptr_to_kill(rb_thread_t *th)
|
||||||
{
|
{
|
||||||
rb_threadptr_async_errinfo_clear(th);
|
rb_threadptr_pending_interrupt_clear(th);
|
||||||
th->status = THREAD_RUNNABLE;
|
th->status = THREAD_RUNNABLE;
|
||||||
th->to_kill = 1;
|
th->to_kill = 1;
|
||||||
th->errinfo = INT2FIX(TAG_FATAL);
|
th->errinfo = INT2FIX(TAG_FATAL);
|
||||||
|
@ -1805,7 +1818,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
|
||||||
rb_atomic_t old;
|
rb_atomic_t old;
|
||||||
int sig;
|
int sig;
|
||||||
int timer_interrupt;
|
int timer_interrupt;
|
||||||
int async_errinfo_interrupt;
|
int pending_interrupt;
|
||||||
int finalizer_interrupt;
|
int finalizer_interrupt;
|
||||||
int trap_interrupt;
|
int trap_interrupt;
|
||||||
|
|
||||||
|
@ -1819,7 +1832,7 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
|
timer_interrupt = interrupt & TIMER_INTERRUPT_MASK;
|
||||||
async_errinfo_interrupt = interrupt & ASYNC_ERRINFO_INTERRUPT_MASK;
|
pending_interrupt = interrupt & PENDING_INTERRUPT_MASK;
|
||||||
finalizer_interrupt = interrupt & FINALIZER_INTERRUPT_MASK;
|
finalizer_interrupt = interrupt & FINALIZER_INTERRUPT_MASK;
|
||||||
trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
|
trap_interrupt = interrupt & TRAP_INTERRUPT_MASK;
|
||||||
|
|
||||||
|
@ -1834,8 +1847,8 @@ rb_threadptr_execute_interrupts(rb_thread_t *th, int blocking_timing)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* exception from another thread */
|
/* exception from another thread */
|
||||||
if (async_errinfo_interrupt && rb_threadptr_async_errinfo_active_p(th)) {
|
if (pending_interrupt && rb_threadptr_pending_interrupt_active_p(th)) {
|
||||||
VALUE err = rb_threadptr_async_errinfo_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
|
VALUE err = rb_threadptr_pending_interrupt_deque(th, blocking_timing ? INTERRUPT_ON_BLOCKING : INTERRUPT_NONE);
|
||||||
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
|
thread_debug("rb_thread_execute_interrupts: %"PRIdVALUE"\n", err);
|
||||||
|
|
||||||
if (err == Qundef) {
|
if (err == Qundef) {
|
||||||
|
@ -1906,7 +1919,7 @@ rb_threadptr_raise(rb_thread_t *th, int argc, VALUE *argv)
|
||||||
else {
|
else {
|
||||||
exc = rb_make_exception(argc, argv);
|
exc = rb_make_exception(argc, argv);
|
||||||
}
|
}
|
||||||
rb_threadptr_async_errinfo_enque(th, exc);
|
rb_threadptr_pending_interrupt_enque(th, exc);
|
||||||
rb_threadptr_interrupt(th);
|
rb_threadptr_interrupt(th);
|
||||||
return Qnil;
|
return Qnil;
|
||||||
}
|
}
|
||||||
|
@ -1976,7 +1989,7 @@ thread_fd_close_i(st_data_t key, st_data_t val, st_data_t data)
|
||||||
|
|
||||||
if (th->waiting_fd == fd) {
|
if (th->waiting_fd == fd) {
|
||||||
VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
|
VALUE err = th->vm->special_exceptions[ruby_error_closed_stream];
|
||||||
rb_threadptr_async_errinfo_enque(th, err);
|
rb_threadptr_pending_interrupt_enque(th, err);
|
||||||
rb_threadptr_interrupt(th);
|
rb_threadptr_interrupt(th);
|
||||||
}
|
}
|
||||||
return ST_CONTINUE;
|
return ST_CONTINUE;
|
||||||
|
@ -2061,7 +2074,7 @@ rb_thread_kill(VALUE thread)
|
||||||
rb_threadptr_to_kill(th);
|
rb_threadptr_to_kill(th);
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
rb_threadptr_async_errinfo_enque(th, eKillSignal);
|
rb_threadptr_pending_interrupt_enque(th, eKillSignal);
|
||||||
rb_threadptr_interrupt(th);
|
rb_threadptr_interrupt(th);
|
||||||
}
|
}
|
||||||
return thread;
|
return thread;
|
||||||
|
@ -4831,6 +4844,10 @@ Init_Thread(void)
|
||||||
VALUE cThGroup;
|
VALUE cThGroup;
|
||||||
rb_thread_t *th = GET_THREAD();
|
rb_thread_t *th = GET_THREAD();
|
||||||
|
|
||||||
|
sym_never = ID2SYM(rb_intern("never"));
|
||||||
|
sym_immediate = ID2SYM(rb_intern("immediate"));
|
||||||
|
sym_on_blocking = ID2SYM(rb_intern("on_blocking"));
|
||||||
|
|
||||||
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
|
rb_define_singleton_method(rb_cThread, "new", thread_s_new, -1);
|
||||||
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
|
rb_define_singleton_method(rb_cThread, "start", thread_start, -2);
|
||||||
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
|
rb_define_singleton_method(rb_cThread, "fork", thread_start, -2);
|
||||||
|
@ -4847,8 +4864,9 @@ Init_Thread(void)
|
||||||
rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
|
rb_define_singleton_method(rb_cThread, "DEBUG", rb_thread_s_debug, 0);
|
||||||
rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
|
rb_define_singleton_method(rb_cThread, "DEBUG=", rb_thread_s_debug_set, 1);
|
||||||
#endif
|
#endif
|
||||||
rb_define_singleton_method(rb_cThread, "async_interrupt_timing", rb_thread_s_async_interrupt_timing, 1);
|
rb_define_singleton_method(rb_cThread, "handle_interrupt", rb_thread_s_handle_interrupt, 1);
|
||||||
rb_define_singleton_method(rb_cThread, "async_interrupted?", rb_thread_s_async_interrupt_p, -1);
|
rb_define_singleton_method(rb_cThread, "pending_interrupt?", rb_thread_s_pending_interrupt_p, -1);
|
||||||
|
rb_define_method(rb_cThread, "pending_interrupt?", rb_thread_pending_interrupt_p, -1);
|
||||||
|
|
||||||
rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
|
rb_define_method(rb_cThread, "initialize", thread_initialize, -2);
|
||||||
rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
|
rb_define_method(rb_cThread, "raise", thread_raise_m, -1);
|
||||||
|
@ -4921,9 +4939,9 @@ Init_Thread(void)
|
||||||
native_mutex_initialize(&th->vm->thread_destruct_lock);
|
native_mutex_initialize(&th->vm->thread_destruct_lock);
|
||||||
native_mutex_initialize(&th->interrupt_lock);
|
native_mutex_initialize(&th->interrupt_lock);
|
||||||
|
|
||||||
th->async_errinfo_queue = rb_ary_tmp_new(0);
|
th->pending_interrupt_queue = rb_ary_tmp_new(0);
|
||||||
th->async_errinfo_queue_checked = 0;
|
th->pending_interrupt_queue_checked = 0;
|
||||||
th->async_errinfo_mask_stack = rb_ary_tmp_new(0);
|
th->pending_interrupt_mask_stack = rb_ary_tmp_new(0);
|
||||||
|
|
||||||
th->interrupt_mask = 0;
|
th->interrupt_mask = 0;
|
||||||
}
|
}
|
||||||
|
@ -5058,8 +5076,8 @@ rb_uninterruptible(VALUE (*b_proc)(ANYARGS), VALUE data)
|
||||||
VALUE interrupt_mask = rb_hash_new();
|
VALUE interrupt_mask = rb_hash_new();
|
||||||
rb_thread_t *cur_th = GET_THREAD();
|
rb_thread_t *cur_th = GET_THREAD();
|
||||||
|
|
||||||
rb_hash_aset(interrupt_mask, rb_cObject, ID2SYM(rb_intern("defer")));
|
rb_hash_aset(interrupt_mask, rb_cObject, sym_never);
|
||||||
rb_ary_push(cur_th->async_errinfo_mask_stack, interrupt_mask);
|
rb_ary_push(cur_th->pending_interrupt_mask_stack, interrupt_mask);
|
||||||
|
|
||||||
return rb_ensure(b_proc, data, rb_ary_pop, cur_th->async_errinfo_mask_stack);
|
return rb_ensure(b_proc, data, rb_ary_pop, cur_th->pending_interrupt_mask_stack);
|
||||||
}
|
}
|
||||||
|
|
4
vm.c
4
vm.c
|
@ -1780,8 +1780,8 @@ rb_thread_mark(void *ptr)
|
||||||
RUBY_MARK_UNLESS_NULL(th->thgroup);
|
RUBY_MARK_UNLESS_NULL(th->thgroup);
|
||||||
RUBY_MARK_UNLESS_NULL(th->value);
|
RUBY_MARK_UNLESS_NULL(th->value);
|
||||||
RUBY_MARK_UNLESS_NULL(th->errinfo);
|
RUBY_MARK_UNLESS_NULL(th->errinfo);
|
||||||
RUBY_MARK_UNLESS_NULL(th->async_errinfo_queue);
|
RUBY_MARK_UNLESS_NULL(th->pending_interrupt_queue);
|
||||||
RUBY_MARK_UNLESS_NULL(th->async_errinfo_mask_stack);
|
RUBY_MARK_UNLESS_NULL(th->pending_interrupt_mask_stack);
|
||||||
RUBY_MARK_UNLESS_NULL(th->root_svar);
|
RUBY_MARK_UNLESS_NULL(th->root_svar);
|
||||||
RUBY_MARK_UNLESS_NULL(th->top_self);
|
RUBY_MARK_UNLESS_NULL(th->top_self);
|
||||||
RUBY_MARK_UNLESS_NULL(th->top_wrapper);
|
RUBY_MARK_UNLESS_NULL(th->top_wrapper);
|
||||||
|
|
22
vm_core.h
22
vm_core.h
|
@ -542,9 +542,9 @@ typedef struct rb_thread_struct {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
/* async errinfo queue */
|
/* async errinfo queue */
|
||||||
VALUE async_errinfo_queue;
|
VALUE pending_interrupt_queue;
|
||||||
int async_errinfo_queue_checked;
|
int pending_interrupt_queue_checked;
|
||||||
VALUE async_errinfo_mask_stack;
|
VALUE pending_interrupt_mask_stack;
|
||||||
|
|
||||||
rb_atomic_t interrupt_flag;
|
rb_atomic_t interrupt_flag;
|
||||||
unsigned long interrupt_mask;
|
unsigned long interrupt_mask;
|
||||||
|
@ -894,16 +894,16 @@ GET_THREAD(void)
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
TIMER_INTERRUPT_MASK = 0x01,
|
TIMER_INTERRUPT_MASK = 0x01,
|
||||||
ASYNC_ERRINFO_INTERRUPT_MASK = 0x02,
|
PENDING_INTERRUPT_MASK = 0x02,
|
||||||
FINALIZER_INTERRUPT_MASK = 0x04,
|
FINALIZER_INTERRUPT_MASK = 0x04,
|
||||||
TRAP_INTERRUPT_MASK = 0x08
|
TRAP_INTERRUPT_MASK = 0x08
|
||||||
};
|
};
|
||||||
|
|
||||||
#define RUBY_VM_SET_TIMER_INTERRUPT(th) ATOMIC_OR((th)->interrupt_flag, TIMER_INTERRUPT_MASK)
|
#define RUBY_VM_SET_TIMER_INTERRUPT(th) ATOMIC_OR((th)->interrupt_flag, TIMER_INTERRUPT_MASK)
|
||||||
#define RUBY_VM_SET_INTERRUPT(th) ATOMIC_OR((th)->interrupt_flag, ASYNC_ERRINFO_INTERRUPT_MASK)
|
#define RUBY_VM_SET_INTERRUPT(th) ATOMIC_OR((th)->interrupt_flag, PENDING_INTERRUPT_MASK)
|
||||||
#define RUBY_VM_SET_FINALIZER_INTERRUPT(th) ATOMIC_OR((th)->interrupt_flag, FINALIZER_INTERRUPT_MASK)
|
#define RUBY_VM_SET_FINALIZER_INTERRUPT(th) ATOMIC_OR((th)->interrupt_flag, FINALIZER_INTERRUPT_MASK)
|
||||||
#define RUBY_VM_SET_TRAP_INTERRUPT(th) ATOMIC_OR((th)->interrupt_flag, TRAP_INTERRUPT_MASK)
|
#define RUBY_VM_SET_TRAP_INTERRUPT(th) ATOMIC_OR((th)->interrupt_flag, TRAP_INTERRUPT_MASK)
|
||||||
#define RUBY_VM_INTERRUPTED(th) ((th)->interrupt_flag & ~(th)->interrupt_mask & (ASYNC_ERRINFO_INTERRUPT_MASK|TRAP_INTERRUPT_MASK))
|
#define RUBY_VM_INTERRUPTED(th) ((th)->interrupt_flag & ~(th)->interrupt_mask & (PENDING_INTERRUPT_MASK|TRAP_INTERRUPT_MASK))
|
||||||
#define RUBY_VM_INTERRUPTED_ANY(th) ((th)->interrupt_flag & ~(th)->interrupt_mask)
|
#define RUBY_VM_INTERRUPTED_ANY(th) ((th)->interrupt_flag & ~(th)->interrupt_mask)
|
||||||
|
|
||||||
int rb_signal_buff_size(void);
|
int rb_signal_buff_size(void);
|
||||||
|
@ -914,16 +914,16 @@ void rb_threadptr_signal_exit(rb_thread_t *th);
|
||||||
void rb_threadptr_execute_interrupts(rb_thread_t *, int);
|
void rb_threadptr_execute_interrupts(rb_thread_t *, int);
|
||||||
void rb_threadptr_interrupt(rb_thread_t *th);
|
void rb_threadptr_interrupt(rb_thread_t *th);
|
||||||
void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th);
|
void rb_threadptr_unlock_all_locking_mutexes(rb_thread_t *th);
|
||||||
void rb_threadptr_async_errinfo_clear(rb_thread_t *th);
|
void rb_threadptr_pending_interrupt_clear(rb_thread_t *th);
|
||||||
void rb_threadptr_async_errinfo_enque(rb_thread_t *th, VALUE v);
|
void rb_threadptr_pending_interrupt_enque(rb_thread_t *th, VALUE v);
|
||||||
int rb_threadptr_async_errinfo_active_p(rb_thread_t *th);
|
int rb_threadptr_pending_interrupt_active_p(rb_thread_t *th);
|
||||||
|
|
||||||
void rb_thread_lock_unlock(rb_thread_lock_t *);
|
void rb_thread_lock_unlock(rb_thread_lock_t *);
|
||||||
void rb_thread_lock_destroy(rb_thread_lock_t *);
|
void rb_thread_lock_destroy(rb_thread_lock_t *);
|
||||||
|
|
||||||
#define RUBY_VM_CHECK_INTS_BLOCKING(th) do { \
|
#define RUBY_VM_CHECK_INTS_BLOCKING(th) do { \
|
||||||
if (UNLIKELY(!rb_threadptr_async_errinfo_empty_p(th))) { \
|
if (UNLIKELY(!rb_threadptr_pending_interrupt_empty_p(th))) { \
|
||||||
th->async_errinfo_queue_checked = 0; \
|
th->pending_interrupt_queue_checked = 0; \
|
||||||
RUBY_VM_SET_INTERRUPT(th); \
|
RUBY_VM_SET_INTERRUPT(th); \
|
||||||
rb_threadptr_execute_interrupts(th, 1); \
|
rb_threadptr_execute_interrupts(th, 1); \
|
||||||
} \
|
} \
|
||||||
|
|
Loading…
Reference in a new issue