thread_sync.c: avoid reaching across stacks of dead threads
rb_ensure is insufficient cleanup for fork, so we must reinitialize
all waitqueues in the child process.  Unfortunately this increases
the footprint of ConditionVariable, Queue and SizedQueue by 8 bytes
on 32-bit systems (16 bytes on 64-bit).

[ruby-core:86316] [Bug #14634]

git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@62934 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
commit a2d63ea2fb
parent 98e9444b5f

4 changed files with 139 additions and 5 deletions
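To make the failure mode concrete, here is a minimal stand-alone sketch of the
scenario the commit message describes (it mirrors the regression tests added
below; the exact symptom before this fix depended on platform and timing,
typically a crash or hang in the forked child):

    # Several threads block on a ConditionVariable, then the process forks
    # while they are still waiting.
    mutex   = Mutex.new
    condvar = ConditionVariable.new

    waiters = (1..10).map do
      Thread.new { mutex.synchronize { condvar.wait(mutex) } }
    end
    Thread.pass until waiters.all?(&:stop?)  # let every waiter block

    pid = fork do
      # Only the forking thread survives in the child, yet before this commit
      # the condvar's wait queue still pointed into the dead waiters' stacks,
      # so waking it could touch freed memory.
      mutex.synchronize { condvar.broadcast }
      exit!(0)
    end
    _, status = Process.waitpid2(pid)
    p status  # expected: clean exit, no segfault

    # The parent still owns the real waiters; wake and reap them normally.
    until waiters.empty?
      mutex.synchronize { condvar.broadcast }
      waiters.delete_if { |t| t.join(0.01) }
    end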
@@ -219,4 +219,23 @@ INPUT
       Marshal.dump(condvar)
     end
   end
+
+  def test_condvar_fork
+    mutex = Mutex.new
+    condvar = ConditionVariable.new
+    thrs = (1..10).map do
+      Thread.new { mutex.synchronize { condvar.wait(mutex) } }
+    end
+    thrs.each { 3.times { Thread.pass } }
+    pid = fork do
+      mutex.synchronize { condvar.broadcast }
+      exit!(0)
+    end
+    _, s = Process.waitpid2(pid)
+    assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
+    until thrs.empty?
+      mutex.synchronize { condvar.broadcast }
+      thrs.delete_if { |t| t.join(0.01) }
+    end
+  end if Process.respond_to?(:fork)
 end
@@ -565,4 +565,52 @@ class TestQueue < Test::Unit::TestCase
       puts 'exit'
     INPUT
   end
+
+  def test_fork_while_queue_waiting
+    q = Queue.new
+    sq = SizedQueue.new(1)
+    thq = Thread.new { q.pop }
+    thsq = Thread.new { sq.pop }
+    Thread.pass until thq.stop? && thsq.stop?
+
+    pid = fork do
+      exit!(1) if q.num_waiting != 0
+      exit!(2) if sq.num_waiting != 0
+      exit!(6) unless q.empty?
+      exit!(7) unless sq.empty?
+      q.push :child_q
+      sq.push :child_sq
+      exit!(3) if q.pop != :child_q
+      exit!(4) if sq.pop != :child_sq
+      exit!(0)
+    end
+    _, s = Process.waitpid2(pid)
+    assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
+
+    q.push :thq
+    sq.push :thsq
+    assert_equal :thq, thq.value
+    assert_equal :thsq, thsq.value
+
+    sq.push(1)
+    th = Thread.new { q.pop; sq.pop }
+    thsq = Thread.new { sq.push(2) }
+    Thread.pass until th.stop? && thsq.stop?
+    pid = fork do
+      exit!(1) if q.num_waiting != 0
+      exit!(2) if sq.num_waiting != 0
+      exit!(3) unless q.empty?
+      exit!(4) if sq.empty?
+      exit!(5) if sq.pop != 1
+      exit!(0)
+    end
+    _, s = Process.waitpid2(pid)
+    assert_predicate s, :success?, 'no segfault [ruby-core:86316] [Bug #14634]'
+
+    assert_predicate thsq, :stop?
+    assert_equal 1, sq.pop
+    assert_same sq, thsq.value
+    q.push('restart th')
+    assert_equal 2, th.value
+  end if Process.respond_to?(:fork)
 end
thread.c (2 additions)
@@ -4216,6 +4216,8 @@ rb_thread_atfork_internal(rb_thread_t *th, void (*atfork)(rb_thread_t *, const rb_thread_t *))
     }
     rb_vm_living_threads_init(vm);
     rb_vm_living_threads_insert(vm, th);
+    rb_thread_sync_reset_all();
+
     vm->sleeper = 0;
     clear_coverage();
 }
thread_sync.c

@@ -4,6 +4,14 @@
 static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
 static VALUE rb_eClosedQueueError;
 
+/*
+ * keep these globally so we can walk and reinitialize them at fork
+ * in the child process
+ */
+static LIST_HEAD(szqueue_list);
+static LIST_HEAD(queue_list);
+static LIST_HEAD(condvar_list);
+
 /* sync_waiter is always on-stack */
 struct sync_waiter {
     rb_thread_t *th;
@@ -54,6 +62,7 @@ typedef struct rb_mutex_struct {
 static void rb_mutex_abandon_all(rb_mutex_t *mutexes);
 static void rb_mutex_abandon_keeping_mutexes(rb_thread_t *th);
 static void rb_mutex_abandon_locking_mutex(rb_thread_t *th);
+static void rb_thread_sync_reset_all(void);
 #endif
 static const char* rb_mutex_unlock_th(rb_mutex_t *mutex, rb_thread_t volatile *th);
 
@@ -538,7 +547,9 @@ void rb_mutex_allow_trap(VALUE self, int val)
 /* Queue */
 
 #define queue_waitq(q) UNALIGNED_MEMBER_PTR(q, waitq)
+#define queue_live(q) UNALIGNED_MEMBER_PTR(q, live)
 PACKED_STRUCT_UNALIGNED(struct rb_queue {
+    struct list_node live;
     struct list_head waitq;
     const VALUE que;
     int num_waiting;
@@ -546,6 +557,7 @@ PACKED_STRUCT_UNALIGNED(struct rb_queue {
 
 #define szqueue_waitq(sq) UNALIGNED_MEMBER_PTR(sq, q.waitq)
 #define szqueue_pushq(sq) UNALIGNED_MEMBER_PTR(sq, pushq)
+#define szqueue_live(sq) UNALIGNED_MEMBER_PTR(sq, q.live)
 PACKED_STRUCT_UNALIGNED(struct rb_szqueue {
     struct rb_queue q;
     int num_waiting_push;
@@ -562,6 +574,14 @@ queue_mark(void *ptr)
     rb_gc_mark(q->que);
 }
 
+static void
+queue_free(void *ptr)
+{
+    struct rb_queue *q = ptr;
+    list_del(queue_live(q));
+    ruby_xfree(ptr);
+}
+
 static size_t
 queue_memsize(const void *ptr)
 {
@@ -570,7 +590,7 @@ queue_memsize(const void *ptr)
 
 static const rb_data_type_t queue_data_type = {
     "queue",
-    {queue_mark, RUBY_TYPED_DEFAULT_FREE, queue_memsize,},
+    {queue_mark, queue_free, queue_memsize,},
     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
 };
 
@@ -582,6 +602,7 @@ queue_alloc(VALUE klass)
 
     obj = TypedData_Make_Struct(klass, struct rb_queue, &queue_data_type, q);
     list_head_init(queue_waitq(q));
+    list_add(&queue_list, queue_live(q));
     return obj;
 }
 
@@ -604,6 +625,14 @@ szqueue_mark(void *ptr)
     queue_mark(&sq->q);
 }
 
+static void
+szqueue_free(void *ptr)
+{
+    struct rb_szqueue *sq = ptr;
+    list_del(szqueue_live(sq));
+    ruby_xfree(ptr);
+}
+
 static size_t
 szqueue_memsize(const void *ptr)
 {
@@ -612,7 +641,7 @@ szqueue_memsize(const void *ptr)
 
 static const rb_data_type_t szqueue_data_type = {
     "sized_queue",
-    {szqueue_mark, RUBY_TYPED_DEFAULT_FREE, szqueue_memsize,},
+    {szqueue_mark, szqueue_free, szqueue_memsize,},
     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
 };
 
@@ -624,6 +653,7 @@ szqueue_alloc(VALUE klass)
                                        &szqueue_data_type, sq);
     list_head_init(szqueue_waitq(sq));
     list_head_init(szqueue_pushq(sq));
+    list_add(&szqueue_list, szqueue_live(sq));
     return obj;
 }
 
@@ -878,7 +908,7 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
             list_add_tail(&qw.as.q->waitq, &qw.w.node);
             qw.as.q->num_waiting++;
 
-            rb_ensure(queue_sleep, Qfalse, queue_sleep_done, (VALUE)&qw);
+            rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&qw);
         }
     }
 
@@ -1120,7 +1150,7 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
             list_add_tail(pushq, &qw.w.node);
             sq->num_waiting_push++;
 
-            rb_ensure(queue_sleep, Qfalse, szqueue_sleep_done, (VALUE)&qw);
+            rb_ensure(queue_sleep, self, szqueue_sleep_done, (VALUE)&qw);
         }
     }
 
@@ -1233,6 +1263,7 @@ rb_szqueue_empty_p(VALUE self)
 /* TODO: maybe this can be IMEMO */
 struct rb_condvar {
     struct list_head waitq;
+    struct list_node live;
 };
 
 /*
@@ -1263,6 +1294,14 @@ struct rb_condvar {
  * }
  */
 
+static void
+condvar_free(void *ptr)
+{
+    struct rb_condvar *cv = ptr;
+    list_del(&cv->live);
+    ruby_xfree(ptr);
+}
+
 static size_t
 condvar_memsize(const void *ptr)
 {
@@ -1271,7 +1310,7 @@ condvar_memsize(const void *ptr)
 
 static const rb_data_type_t cv_data_type = {
     "condvar",
-    {0, RUBY_TYPED_DEFAULT_FREE, condvar_memsize,},
+    {0, condvar_free, condvar_memsize,},
     0, 0, RUBY_TYPED_FREE_IMMEDIATELY|RUBY_TYPED_WB_PROTECTED
 };
 
@@ -1293,6 +1332,7 @@ condvar_alloc(VALUE klass)
 
     obj = TypedData_Make_Struct(klass, struct rb_condvar, &cv_data_type, cv);
     list_head_init(&cv->waitq);
+    list_add(&condvar_list, &cv->live);
 
     return obj;
 }
@@ -1406,6 +1446,31 @@ define_thread_class(VALUE outer, const char *name, VALUE super)
     return klass;
 }
 
+#if defined(HAVE_WORKING_FORK)
+/* we must not reference stacks of dead threads in a forked child */
+static void
+rb_thread_sync_reset_all(void)
+{
+    struct rb_queue *q = 0;
+    struct rb_szqueue *sq = 0;
+    struct rb_condvar *cv = 0;
+
+    list_for_each(&queue_list, q, live) {
+        list_head_init(queue_waitq(q));
+        q->num_waiting = 0;
+    }
+    list_for_each(&szqueue_list, sq, q.live) {
+        list_head_init(szqueue_waitq(sq));
+        list_head_init(szqueue_pushq(sq));
+        sq->num_waiting_push = 0;
+        sq->q.num_waiting = 0;
+    }
+    list_for_each(&condvar_list, cv, live) {
+        list_head_init(&cv->waitq);
+    }
+}
+#endif
+
 static void
 Init_thread_sync(void)
 {