mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
thread_pthread.c: call SUSPENDED event when entering native_sleep
[Bug #18900] Thread#join and a few other codepaths are using native sleep as a way to suspend the current thread. So we should call the relevant hook when this happen, otherwise some thread may transition directly from `RESUMED` to `READY`.
This commit is contained in:
parent
61c7ae4d27
commit
587d2d199b
Notes:
git
2022-07-08 00:49:26 +09:00
3 changed files with 86 additions and 48 deletions
|
@ -8,6 +8,19 @@ static rb_atomic_t resumed_count = 0;
|
|||
static rb_atomic_t suspended_count = 0;
|
||||
static rb_atomic_t exited_count = 0;
|
||||
|
||||
#if __STDC_VERSION__ >= 201112
|
||||
#define RB_THREAD_LOCAL_SPECIFIER _Thread_local
|
||||
#elif defined(__GNUC__) && !defined(RB_THREAD_LOCAL_SPECIFIER_IS_UNSUPPORTED)
|
||||
/* note that ICC (linux) and Clang are covered by __GNUC__ */
|
||||
#define RB_THREAD_LOCAL_SPECIFIER __thread
|
||||
#else
|
||||
#define RB_THREAD_LOCAL_SPECIFIER
|
||||
#endif
|
||||
|
||||
static RB_THREAD_LOCAL_SPECIFIER unsigned int local_ready_count = 0;
|
||||
static RB_THREAD_LOCAL_SPECIFIER unsigned int local_resumed_count = 0;
|
||||
static RB_THREAD_LOCAL_SPECIFIER unsigned int local_suspended_count = 0;
|
||||
|
||||
void
|
||||
ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_data, void *user_data)
|
||||
{
|
||||
|
@ -17,12 +30,15 @@ ex_callback(rb_event_flag_t event, const rb_internal_thread_event_data_t *event_
|
|||
break;
|
||||
case RUBY_INTERNAL_THREAD_EVENT_READY:
|
||||
RUBY_ATOMIC_INC(ready_count);
|
||||
local_ready_count++;
|
||||
break;
|
||||
case RUBY_INTERNAL_THREAD_EVENT_RESUMED:
|
||||
RUBY_ATOMIC_INC(resumed_count);
|
||||
local_resumed_count++;
|
||||
break;
|
||||
case RUBY_INTERNAL_THREAD_EVENT_SUSPENDED:
|
||||
RUBY_ATOMIC_INC(suspended_count);
|
||||
local_suspended_count++;
|
||||
break;
|
||||
case RUBY_INTERNAL_THREAD_EVENT_EXITED:
|
||||
RUBY_ATOMIC_INC(exited_count);
|
||||
|
@ -44,6 +60,16 @@ thread_counters(VALUE thread)
|
|||
return array;
|
||||
}
|
||||
|
||||
static VALUE
|
||||
thread_local_counters(VALUE thread)
|
||||
{
|
||||
VALUE array = rb_ary_new2(3);
|
||||
rb_ary_push(array, UINT2NUM(local_ready_count));
|
||||
rb_ary_push(array, UINT2NUM(local_resumed_count));
|
||||
rb_ary_push(array, UINT2NUM(local_suspended_count));
|
||||
return array;
|
||||
}
|
||||
|
||||
static VALUE
|
||||
thread_reset_counters(VALUE thread)
|
||||
{
|
||||
|
@ -52,6 +78,9 @@ thread_reset_counters(VALUE thread)
|
|||
RUBY_ATOMIC_SET(resumed_count, 0);
|
||||
RUBY_ATOMIC_SET(suspended_count, 0);
|
||||
RUBY_ATOMIC_SET(exited_count, 0);
|
||||
local_ready_count = 0;
|
||||
local_resumed_count = 0;
|
||||
local_suspended_count = 0;
|
||||
return Qtrue;
|
||||
}
|
||||
|
||||
|
@ -104,6 +133,7 @@ Init_instrumentation(void)
|
|||
VALUE mBug = rb_define_module("Bug");
|
||||
VALUE klass = rb_define_module_under(mBug, "ThreadInstrumentation");
|
||||
rb_define_singleton_method(klass, "counters", thread_counters, 0);
|
||||
rb_define_singleton_method(klass, "local_counters", thread_local_counters, 0);
|
||||
rb_define_singleton_method(klass, "reset_counters", thread_reset_counters, 0);
|
||||
rb_define_singleton_method(klass, "register_callback", thread_register_callback, 0);
|
||||
rb_define_singleton_method(klass, "unregister_callback", thread_unregister_callback, 0);
|
||||
|
|
|
@ -3,76 +3,82 @@ require 'envutil'
|
|||
|
||||
class TestThreadInstrumentation < Test::Unit::TestCase
|
||||
def setup
|
||||
pend("TODO: No windows support yet") if /mswin|mingw|bccwin/ =~ RUBY_PLATFORM
|
||||
pend("No windows support") if /mswin|mingw|bccwin/ =~ RUBY_PLATFORM
|
||||
|
||||
require '-test-/thread/instrumentation'
|
||||
Bug::ThreadInstrumentation.reset_counters
|
||||
Bug::ThreadInstrumentation::register_callback
|
||||
end
|
||||
|
||||
def teardown
|
||||
return if /mswin|mingw|bccwin/ =~ RUBY_PLATFORM
|
||||
Bug::ThreadInstrumentation::unregister_callback
|
||||
end
|
||||
|
||||
THREADS_COUNT = 3
|
||||
|
||||
def test_thread_instrumentation
|
||||
require '-test-/thread/instrumentation'
|
||||
Bug::ThreadInstrumentation.reset_counters
|
||||
Bug::ThreadInstrumentation::register_callback
|
||||
|
||||
begin
|
||||
threads = threaded_cpu_work
|
||||
assert_equal [false] * THREADS_COUNT, threads.map(&:status)
|
||||
counters = Bug::ThreadInstrumentation.counters
|
||||
counters.each do |c|
|
||||
assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}"
|
||||
end
|
||||
|
||||
assert_equal THREADS_COUNT, counters.first
|
||||
assert_in_delta THREADS_COUNT, counters.last, 1 # It's possible that a thread didn't execute its EXIT hook yet.
|
||||
ensure
|
||||
Bug::ThreadInstrumentation::unregister_callback
|
||||
threads = threaded_cpu_work
|
||||
assert_equal [false] * THREADS_COUNT, threads.map(&:status)
|
||||
counters = Bug::ThreadInstrumentation.counters
|
||||
counters.each do |c|
|
||||
assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}"
|
||||
end
|
||||
|
||||
assert_equal THREADS_COUNT, counters.first
|
||||
assert_in_delta THREADS_COUNT, counters.last, 1 # It's possible that a thread didn't execute its EXIT hook yet.
|
||||
end
|
||||
|
||||
def test_join_counters # Bug #18900
|
||||
thr = Thread.new { fib(30) }
|
||||
Bug::ThreadInstrumentation.reset_counters
|
||||
thr.join
|
||||
assert_equal [1, 1, 1], Bug::ThreadInstrumentation.local_counters
|
||||
end
|
||||
|
||||
def test_thread_instrumentation_fork_safe
|
||||
skip "No fork()" unless Process.respond_to?(:fork)
|
||||
|
||||
require '-test-/thread/instrumentation'
|
||||
Bug::ThreadInstrumentation::register_callback
|
||||
|
||||
read_pipe, write_pipe = IO.pipe
|
||||
|
||||
begin
|
||||
pid = fork do
|
||||
Bug::ThreadInstrumentation.reset_counters
|
||||
threads = threaded_cpu_work
|
||||
write_pipe.write(Marshal.dump(threads.map(&:status)))
|
||||
write_pipe.write(Marshal.dump(Bug::ThreadInstrumentation.counters))
|
||||
write_pipe.close
|
||||
exit!(0)
|
||||
end
|
||||
pid = fork do
|
||||
Bug::ThreadInstrumentation.reset_counters
|
||||
threads = threaded_cpu_work
|
||||
write_pipe.write(Marshal.dump(threads.map(&:status)))
|
||||
write_pipe.write(Marshal.dump(Bug::ThreadInstrumentation.counters))
|
||||
write_pipe.close
|
||||
_, status = Process.wait2(pid)
|
||||
assert_predicate status, :success?
|
||||
|
||||
thread_statuses = Marshal.load(read_pipe)
|
||||
assert_equal [false] * THREADS_COUNT, thread_statuses
|
||||
|
||||
counters = Marshal.load(read_pipe)
|
||||
read_pipe.close
|
||||
counters.each do |c|
|
||||
assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}"
|
||||
end
|
||||
|
||||
assert_equal THREADS_COUNT, counters.first
|
||||
assert_in_delta THREADS_COUNT, counters.last, 1 # It's possible that a thread didn't execute its EXIT hook yet.
|
||||
ensure
|
||||
Bug::ThreadInstrumentation::unregister_callback
|
||||
exit!(0)
|
||||
end
|
||||
write_pipe.close
|
||||
_, status = Process.wait2(pid)
|
||||
assert_predicate status, :success?
|
||||
|
||||
thread_statuses = Marshal.load(read_pipe)
|
||||
assert_equal [false] * THREADS_COUNT, thread_statuses
|
||||
|
||||
counters = Marshal.load(read_pipe)
|
||||
read_pipe.close
|
||||
counters.each do |c|
|
||||
assert_predicate c, :nonzero?, "Call counters: #{counters.inspect}"
|
||||
end
|
||||
|
||||
assert_equal THREADS_COUNT, counters.first
|
||||
assert_in_delta THREADS_COUNT, counters.last, 1 # It's possible that a thread didn't execute its EXIT hook yet.
|
||||
end
|
||||
|
||||
def test_thread_instrumentation_unregister
|
||||
require '-test-/thread/instrumentation'
|
||||
Bug::ThreadInstrumentation::unregister_callback
|
||||
assert Bug::ThreadInstrumentation::register_and_unregister_callbacks
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def threaded_cpu_work
|
||||
THREADS_COUNT.times.map { Thread.new { 100.times { |i| i + i } } }.each(&:join)
|
||||
def fib(n = 20)
|
||||
return n if n <= 1
|
||||
fib(n-1) + fib(n-2)
|
||||
end
|
||||
|
||||
def threaded_cpu_work(size = 20)
|
||||
THREADS_COUNT.times.map { Thread.new { fib(size) } }.each(&:join)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2337,6 +2337,8 @@ native_sleep(rb_thread_t *th, rb_hrtime_t *rel)
|
|||
int sigwait_fd = rb_sigwait_fd_get(th);
|
||||
rb_ractor_blocking_threads_inc(th->ractor, __FILE__, __LINE__);
|
||||
|
||||
RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED);
|
||||
|
||||
if (sigwait_fd >= 0) {
|
||||
rb_native_mutex_lock(&th->interrupt_lock);
|
||||
th->unblock.func = ubf_sigwait;
|
||||
|
|
Loading…
Add table
Reference in a new issue