From caac5f777ae288b5982708b8690e712e1cae0cf6 Mon Sep 17 00:00:00 2001 From: Koichi Sasada Date: Sun, 20 Oct 2019 04:52:20 +0900 Subject: [PATCH] make monitor.so for performance. (#2576) Recent monitor.rb has performance problem because of interrupt handlers. 'Monitor#synchronize' is frequently used primitive so the performance of this method is important. This patch rewrite 'monitor.rb' with 'monitor.so' (C-extension) and make it faster. See [Feature #16255] for details. Monitor class objects are normal object which include MonitorMixin. This patch introduce a Monitor class which is implemented on C and MonitorMixin uses Monitor object as re-entrant (recursive) Mutex. This technique improve performance because we don't need to care atomicity and we don't need accesses to instance variables any more on Monitor class. --- NEWS | 3 + ext/monitor/depend | 13 ++ ext/monitor/extconf.rb | 2 + {lib => ext/monitor/lib}/monitor.rb | 106 ++++++---------- ext/monitor/monitor.c | 189 ++++++++++++++++++++++++++++ test/monitor/test_monitor.rb | 16 +-- 6 files changed, 250 insertions(+), 79 deletions(-) create mode 100644 ext/monitor/depend create mode 100644 ext/monitor/extconf.rb rename {lib => ext/monitor/lib}/monitor.rb (74%) create mode 100644 ext/monitor/monitor.c diff --git a/NEWS b/NEWS index b72ab731dd..5304df451d 100644 --- a/NEWS +++ b/NEWS @@ -511,6 +511,9 @@ File:: * File.realpath now uses realpath(3) on many platforms, which can significantly improve performance. +Monitor:: + * Monitor class is written in C-extension. [Feature #16255] + Thread:: * VM stack memory allocation is now combined with native thread stack, diff --git a/ext/monitor/depend b/ext/monitor/depend new file mode 100644 index 0000000000..89efe1766b --- /dev/null +++ b/ext/monitor/depend @@ -0,0 +1,13 @@ +# AUTOGENERATED DEPENDENCIES START +monitor.o: $(RUBY_EXTCONF_H) +monitor.o: $(arch_hdrdir)/ruby/config.h +monitor.o: $(hdrdir)/ruby/assert.h +monitor.o: $(hdrdir)/ruby/backward.h +monitor.o: $(hdrdir)/ruby/defines.h +monitor.o: $(hdrdir)/ruby/intern.h +monitor.o: $(hdrdir)/ruby/missing.h +monitor.o: $(hdrdir)/ruby/ruby.h +monitor.o: $(hdrdir)/ruby/st.h +monitor.o: $(hdrdir)/ruby/subst.h +monitor.o: monitor.c +# AUTOGENERATED DEPENDENCIES END diff --git a/ext/monitor/extconf.rb b/ext/monitor/extconf.rb new file mode 100644 index 0000000000..78c53fa0c5 --- /dev/null +++ b/ext/monitor/extconf.rb @@ -0,0 +1,2 @@ +require 'mkmf' +create_makefile('monitor') diff --git a/lib/monitor.rb b/ext/monitor/lib/monitor.rb similarity index 74% rename from lib/monitor.rb rename to ext/monitor/lib/monitor.rb index c4ce06794a..dba942c89a 100644 --- a/lib/monitor.rb +++ b/ext/monitor/lib/monitor.rb @@ -86,10 +86,10 @@ # This Class is implemented as subclass of Array which includes the # MonitorMixin module. # -module MonitorMixin - EXCEPTION_NEVER = {Exception => :never}.freeze - EXCEPTION_IMMEDIATE = {Exception => :immediate}.freeze +require 'monitor.so' + +module MonitorMixin # # FIXME: This isn't documented in Nutshell. # @@ -97,8 +97,6 @@ module MonitorMixin # above calls while_wait and signal, this class should be documented. # class ConditionVariable - class Timeout < Exception; end - # # Releases the lock held in the associated monitor and waits; reacquires the lock on wakeup. # @@ -106,17 +104,13 @@ module MonitorMixin # even if no other thread doesn't signal. # def wait(timeout = nil) - Thread.handle_interrupt(EXCEPTION_NEVER) do - @monitor.__send__(:mon_check_owner) - count = @monitor.__send__(:mon_exit_for_cond) - begin - Thread.handle_interrupt(EXCEPTION_IMMEDIATE) do - @cond.wait(@monitor.instance_variable_get(:@mon_mutex), timeout) - end - return true - ensure - @monitor.__send__(:mon_enter_for_cond, count) - end + @monitor.mon_check_owner + count = @monitor.__send__(:exit_for_cond) + begin + @cond.wait(@monitor.__send__(:mutex_for_cond), timeout) + return true + ensure + @monitor.__send__(:enter_for_cond, count) end end @@ -142,7 +136,7 @@ module MonitorMixin # Wakes up the first thread in line waiting for this lock. # def signal - @monitor.__send__(:mon_check_owner) + @monitor.mon_check_owner @cond.signal end @@ -150,7 +144,7 @@ module MonitorMixin # Wakes up all threads waiting for this lock. # def broadcast - @monitor.__send__(:mon_check_owner) + @monitor.mon_check_owner @cond.broadcast end @@ -171,15 +165,7 @@ module MonitorMixin # Attempts to enter exclusive section. Returns +false+ if lock fails. # def mon_try_enter - if @mon_owner != Thread.current - unless @mon_mutex.try_lock - return false - end - @mon_owner = Thread.current - @mon_count = 0 - end - @mon_count += 1 - return true + @mon_data.try_enter end # For backward compatibility alias try_mon_enter mon_try_enter @@ -188,12 +174,7 @@ module MonitorMixin # Enters exclusive section. # def mon_enter - if @mon_owner != Thread.current - @mon_mutex.lock - @mon_owner = Thread.current - @mon_count = 0 - end - @mon_count += 1 + @mon_data.enter end # @@ -201,25 +182,21 @@ module MonitorMixin # def mon_exit mon_check_owner - @mon_count -=1 - if @mon_count == 0 - @mon_owner = nil - @mon_mutex.unlock - end + @mon_data.exit end # # Returns true if this monitor is locked by any thread # def mon_locked? - @mon_mutex.locked? + @mon_data.mon_locked? end # # Returns true if this monitor is locked by current thread. # def mon_owned? - @mon_mutex.locked? && @mon_owner == Thread.current + @mon_data.mon_owned? end # @@ -227,14 +204,12 @@ module MonitorMixin # section automatically when the block exits. See example under # +MonitorMixin+. # - def mon_synchronize - # Prevent interrupt on handling interrupts; for example timeout errors - # it may break locking state. - Thread.handle_interrupt(EXCEPTION_NEVER){ mon_enter } + def mon_synchronize(&b) + @mon_data.enter begin yield ensure - Thread.handle_interrupt(EXCEPTION_NEVER){ mon_exit } + @mon_data.exit end end alias synchronize mon_synchronize @@ -244,7 +219,7 @@ module MonitorMixin # receiver. # def new_cond - return ConditionVariable.new(self) + return ConditionVariable.new(@mon_data) end private @@ -260,31 +235,15 @@ module MonitorMixin # Initializes the MonitorMixin after being included in a class or when an # object has been extended with the MonitorMixin def mon_initialize - if defined?(@mon_mutex) && @mon_mutex_owner_object_id == object_id + if defined?(@mon_data) && @mon_data_owner_object_id == self.object_id raise ThreadError, "already initialized" end - @mon_mutex = Thread::Mutex.new - @mon_mutex_owner_object_id = object_id - @mon_owner = nil - @mon_count = 0 + @mon_data = ::Monitor.new + @mon_data_owner_object_id = self.object_id end def mon_check_owner - if @mon_owner != Thread.current - raise ThreadError, "current thread not owner" - end - end - - def mon_enter_for_cond(count) - @mon_owner = Thread.current - @mon_count = count - end - - def mon_exit_for_cond - count = @mon_count - @mon_owner = nil - @mon_count = 0 - return count + @mon_data.mon_check_owner end end @@ -299,12 +258,17 @@ end # end # class Monitor - include MonitorMixin - alias try_enter try_mon_enter - alias enter mon_enter - alias exit mon_exit -end + def new_cond + ::MonitorMixin::ConditionVariable.new(self) + end + # for compatibility + alias try_mon_enter try_enter + alias mon_try_enter try_enter + alias mon_enter enter + alias mon_exit exit + alias mon_synchronize synchronize +end # Documentation comments: # - All documentation comes from Nutshell. diff --git a/ext/monitor/monitor.c b/ext/monitor/monitor.c new file mode 100644 index 0000000000..cf9e7fe07d --- /dev/null +++ b/ext/monitor/monitor.c @@ -0,0 +1,189 @@ +#include "ruby/ruby.h" + +/* Thread::Monitor */ + +struct rb_monitor { + long count; + const VALUE owner; + const VALUE mutex; +}; + +static void +monitor_mark(void *ptr) +{ + struct rb_monitor *mc = ptr; + rb_gc_mark(mc->owner); + rb_gc_mark(mc->mutex); +} + +static size_t +monitor_memsize(const void *ptr) +{ + return sizeof(struct rb_monitor); +} + +static const rb_data_type_t monitor_data_type = { + "monitor", + {monitor_mark, RUBY_TYPED_DEFAULT_FREE, monitor_memsize,}, + 0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED +}; + +static VALUE +monitor_alloc(VALUE klass) +{ + struct rb_monitor *mc; + VALUE obj; + + obj = TypedData_Make_Struct(klass, struct rb_monitor, &monitor_data_type, mc); + RB_OBJ_WRITE(obj, &mc->mutex, rb_mutex_new()); + RB_OBJ_WRITE(obj, &mc->owner, Qnil); + mc->count = 0; + + return obj; +} + +static struct rb_monitor * +monitor_ptr(VALUE monitor) +{ + struct rb_monitor *mc; + TypedData_Get_Struct(monitor, struct rb_monitor, &monitor_data_type, mc); + return mc; +} + +static int +mc_owner_p(struct rb_monitor *mc) +{ + return mc->owner == rb_thread_current(); +} + +static VALUE +monitor_try_enter(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + + if (!mc_owner_p(mc)) { + if (!rb_mutex_trylock(mc->mutex)) { + return Qfalse; + } + RB_OBJ_WRITE(monitor, &mc->owner, rb_thread_current()); + mc->count = 0; + } + mc->count += 1; + return Qtrue; +} + +static VALUE +monitor_enter(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + if (!mc_owner_p(mc)) { + rb_mutex_lock(mc->mutex); + RB_OBJ_WRITE(monitor, &mc->owner, rb_thread_current()); + mc->count = 0; + } + mc->count++; + return Qnil; +} + +static VALUE +monitor_exit(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + mc->count--; + if (mc->count == 0) { + RB_OBJ_WRITE(monitor, &mc->owner, Qnil); + rb_mutex_unlock(mc->mutex); + } + return Qnil; +} + +static VALUE +monitor_locked_p(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + return rb_mutex_locked_p(mc->mutex); +} + +static VALUE +monitor_owned_p(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + return (rb_mutex_locked_p(mc->mutex) && mc_owner_p(mc)) ? Qtrue : Qfalse; +} + +static VALUE +monitor_check_owner(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + if (!mc_owner_p(mc)) { + rb_raise(rb_eThreadError, "current thread not owner"); + } + return Qnil; +} + +static VALUE +monitor_enter_for_cond(VALUE monitor, VALUE count) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + RB_OBJ_WRITE(monitor, &mc->owner, rb_thread_current()); + mc->count = NUM2LONG(count); + return Qnil; +} + +static VALUE +monitor_exit_for_cond(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + long cnt = mc->count; + RB_OBJ_WRITE(monitor, &mc->owner, Qnil); + mc->count = 0; + return LONG2NUM(cnt); +} + +static VALUE +monitor_mutex_for_cond(VALUE monitor) +{ + struct rb_monitor *mc = monitor_ptr(monitor); + return mc->mutex; +} + +static VALUE +monitor_sync_body(VALUE monitor) +{ + return rb_yield_values(0); +} + +static VALUE +monitor_sync_ensure(VALUE monitor) +{ + return monitor_exit(monitor); +} + +static VALUE +monitor_synchronize(VALUE monitor) +{ + monitor_enter(monitor); + return rb_ensure(monitor_sync_body, monitor, monitor_sync_ensure, monitor); +} + +void +Init_monitor(void) +{ + VALUE rb_cMonitor = rb_define_class("Monitor", rb_cObject); + rb_define_alloc_func(rb_cMonitor, monitor_alloc); + + rb_define_method(rb_cMonitor, "try_enter", monitor_try_enter, 0); + rb_define_method(rb_cMonitor, "enter", monitor_enter, 0); + rb_define_method(rb_cMonitor, "exit", monitor_exit, 0); + rb_define_method(rb_cMonitor, "synchronize", monitor_synchronize, 0); + + /* internal methods for MonitorMixin */ + rb_define_method(rb_cMonitor, "mon_locked?", monitor_locked_p, 0); + rb_define_method(rb_cMonitor, "mon_check_owner", monitor_check_owner, 0); + rb_define_method(rb_cMonitor, "mon_owned?", monitor_owned_p, 0); + + /* internal methods for MonitorMixin::ConditionalVariable */ + rb_define_private_method(rb_cMonitor, "enter_for_cond", monitor_enter_for_cond, 1); + rb_define_private_method(rb_cMonitor, "exit_for_cond", monitor_exit_for_cond, 0); + rb_define_private_method(rb_cMonitor, "mutex_for_cond", monitor_mutex_for_cond, 0); +} diff --git a/test/monitor/test_monitor.rb b/test/monitor/test_monitor.rb index 9d07bf75f7..49c34e067e 100644 --- a/test/monitor/test_monitor.rb +++ b/test/monitor/test_monitor.rb @@ -273,24 +273,24 @@ class TestMonitor < Test::Unit::TestCase end def test_wait_interruption - queue = Queue.new cond = @monitor.new_cond - @monitor.define_singleton_method(:mon_enter_for_cond) do |*args| - queue.deq - super(*args) - end + th = Thread.start { @monitor.synchronize do begin cond.wait(0.1) + @monitor.mon_owned? rescue Interrupt - @monitor.instance_variable_get(:@mon_owner) + @monitor.mon_owned? end end } sleep(0.1) th.raise(Interrupt) - queue.enq(nil) - assert_equal th, th.value + + begin + assert_equal true, th.value + rescue Interrupt + end end end