mirror of
https://github.com/ruby/ruby.git
synced 2022-11-09 12:17:21 -05:00
* thread_sync.c: reduce the specification of Queue#close.
* Queue#close accepts no arguments. * deq'ing on closed queue returns nil, always. [Feature #10600] * test/thread/test_queue.rb: catch up this fix. git-svn-id: svn+ssh://ci.ruby-lang.org/ruby/trunk@52691 b2dd03c8-39d4-4d8f-98ff-823fe69b080e
This commit is contained in:
parent
a77fb26da8
commit
e2609033ab
3 changed files with 23 additions and 62 deletions
|
@ -1,3 +1,12 @@
|
||||||
|
Sat Nov 21 09:18:10 2015 Koichi Sasada <ko1@atdot.net>
|
||||||
|
|
||||||
|
* thread_sync.c: reduce the specification of Queue#close.
|
||||||
|
* Queue#close accepts no arguments.
|
||||||
|
* deq'ing on closed queue returns nil, always.
|
||||||
|
[Feature #10600]
|
||||||
|
|
||||||
|
* test/thread/test_queue.rb: catch up this fix.
|
||||||
|
|
||||||
Sat Nov 21 08:44:21 2015 Koichi Sasada <ko1@atdot.net>
|
Sat Nov 21 08:44:21 2015 Koichi Sasada <ko1@atdot.net>
|
||||||
|
|
||||||
* compile.c (iseq_compile_each): add debug information to NODE_STR
|
* compile.c (iseq_compile_each): add debug information to NODE_STR
|
||||||
|
|
|
@ -289,6 +289,7 @@ class TestQueue < Test::Unit::TestCase
|
||||||
assert_raise_with_message(ClosedQueueError, /closed/){q << :nothing}
|
assert_raise_with_message(ClosedQueueError, /closed/){q << :nothing}
|
||||||
assert_equal q.pop, :something
|
assert_equal q.pop, :something
|
||||||
assert_nil q.pop
|
assert_nil q.pop
|
||||||
|
assert_nil q.pop
|
||||||
# non-blocking
|
# non-blocking
|
||||||
assert_raise_with_message(ThreadError, /queue empty/){q.pop(non_block=true)}
|
assert_raise_with_message(ThreadError, /queue empty/){q.pop(non_block=true)}
|
||||||
end
|
end
|
||||||
|
@ -484,36 +485,11 @@ class TestQueue < Test::Unit::TestCase
|
||||||
assert_nil t.value
|
assert_nil t.value
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_close_token_exception
|
|
||||||
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
|
||||||
q = qcreate[]
|
|
||||||
q.close true
|
|
||||||
assert_raise(ClosedQueueError){q.pop}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_close_token_loop
|
|
||||||
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
|
||||||
q = qcreate[]
|
|
||||||
popped_items = []
|
|
||||||
consumer_thread = Thread.new{loop{popped_items << q.pop}; :done}
|
|
||||||
7.times{|i| q << i}
|
|
||||||
q.close true
|
|
||||||
sleep 0.1 unless q.empty?
|
|
||||||
assert_equal :done, consumer_thread.value
|
|
||||||
assert_equal 7.times.to_a, popped_items
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def test_close_twice
|
def test_close_twice
|
||||||
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
[->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate|
|
||||||
q = qcreate[]
|
q = qcreate[]
|
||||||
q.close
|
q.close
|
||||||
assert_nothing_raised(ClosedQueueError){q.close}
|
assert_nothing_raised(ClosedQueueError){q.close}
|
||||||
|
|
||||||
q = qcreate[]
|
|
||||||
q.close(true)
|
|
||||||
assert_nothing_raised(ClosedQueueError){q.close(false)}
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -533,8 +509,8 @@ class TestQueue < Test::Unit::TestCase
|
||||||
consumers = rand(7..12).times.map do
|
consumers = rand(7..12).times.map do
|
||||||
Thread.new do
|
Thread.new do
|
||||||
count = 0
|
count = 0
|
||||||
loop do
|
while e = q.pop
|
||||||
i, st = q.pop
|
i, st = e
|
||||||
count += 1 if i.is_a?(Fixnum) && st.is_a?(String)
|
count += 1 if i.is_a?(Fixnum) && st.is_a?(String)
|
||||||
end
|
end
|
||||||
count
|
count
|
||||||
|
@ -554,7 +530,7 @@ class TestQueue < Test::Unit::TestCase
|
||||||
end
|
end
|
||||||
|
|
||||||
producers.each &:join
|
producers.each &:join
|
||||||
q.close true
|
q.close
|
||||||
|
|
||||||
# results not randomly distributed. Not sure why.
|
# results not randomly distributed. Not sure why.
|
||||||
# consumers.map{|thr| thr.value}.each do |x|
|
# consumers.map{|thr| thr.value}.each do |x|
|
||||||
|
|
|
@ -523,7 +523,6 @@ enum {
|
||||||
};
|
};
|
||||||
|
|
||||||
#define QUEUE_CLOSED FL_USER5
|
#define QUEUE_CLOSED FL_USER5
|
||||||
#define QUEUE_CLOSE_EXCEPTION FL_USER6
|
|
||||||
|
|
||||||
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
|
#define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE)
|
||||||
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
|
#define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS)
|
||||||
|
@ -607,26 +606,15 @@ static VALUE
|
||||||
queue_closed_result(VALUE self)
|
queue_closed_result(VALUE self)
|
||||||
{
|
{
|
||||||
assert(queue_length(self) == 0);
|
assert(queue_length(self) == 0);
|
||||||
|
|
||||||
if (FL_TEST(self, QUEUE_CLOSE_EXCEPTION)) {
|
|
||||||
raise_closed_queue_error(self);
|
|
||||||
}
|
|
||||||
return Qnil;
|
return Qnil;
|
||||||
}
|
}
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
queue_do_close(VALUE self, int argc, VALUE *argv, int is_szq)
|
queue_do_close(VALUE self, int is_szq)
|
||||||
{
|
{
|
||||||
VALUE exception = Qfalse;
|
|
||||||
|
|
||||||
if (!queue_closed_p(self)) {
|
if (!queue_closed_p(self)) {
|
||||||
rb_scan_args(argc, argv, "01", &exception);
|
|
||||||
FL_SET(self, QUEUE_CLOSED);
|
FL_SET(self, QUEUE_CLOSED);
|
||||||
|
|
||||||
if (RTEST(exception)) {
|
|
||||||
FL_SET(self, QUEUE_CLOSE_EXCEPTION);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (queue_num_waiting(self) > 0) {
|
if (queue_num_waiting(self) > 0) {
|
||||||
VALUE waiters = GET_QUEUE_WAITERS(self);
|
VALUE waiters = GET_QUEUE_WAITERS(self);
|
||||||
wakeup_all_threads(waiters);
|
wakeup_all_threads(waiters);
|
||||||
|
@ -697,7 +685,7 @@ queue_do_push(VALUE self, VALUE obj)
|
||||||
/*
|
/*
|
||||||
* Document-method: Queue#close
|
* Document-method: Queue#close
|
||||||
* call-seq:
|
* call-seq:
|
||||||
* close(exception=false)
|
* close
|
||||||
*
|
*
|
||||||
* Closes the queue. A closed queue cannot be re-opened.
|
* Closes the queue. A closed queue cannot be re-opened.
|
||||||
*
|
*
|
||||||
|
@ -707,15 +695,11 @@ queue_do_push(VALUE self, VALUE obj)
|
||||||
*
|
*
|
||||||
* - +close+ will be ignored.
|
* - +close+ will be ignored.
|
||||||
*
|
*
|
||||||
* - calling enq/push/<< will raise ClosedQueueError('queue closed')
|
* - calling enq/push/<< will return nil.
|
||||||
*
|
*
|
||||||
* - when +empty?+ is false, calling deq/pop/shift will return an object
|
* - when +empty?+ is false, calling deq/pop/shift will return an object
|
||||||
* from the queue as usual.
|
* from the queue as usual.
|
||||||
*
|
*
|
||||||
* - when +empty?+ is true, deq(non_block=false) will not suspend and
|
|
||||||
* will either return nil. If +exception+ parameter is true, raise ClosedQueueError error.
|
|
||||||
* deq(non_block=true) will ignore the parameter and raise a ThreadError('queue empty').
|
|
||||||
*
|
|
||||||
* ClosedQueueError is inherited from StopIteration, so that you can break loop block.
|
* ClosedQueueError is inherited from StopIteration, so that you can break loop block.
|
||||||
*
|
*
|
||||||
* Example:
|
* Example:
|
||||||
|
@ -726,21 +710,13 @@ queue_do_push(VALUE self, VALUE obj)
|
||||||
* # ...
|
* # ...
|
||||||
* end
|
* end
|
||||||
* }
|
* }
|
||||||
* q.close # equals to q.close(false)
|
* q.close
|
||||||
*
|
|
||||||
* q = Queue.new
|
|
||||||
* Thread.new{
|
|
||||||
* loop{
|
|
||||||
* e = q.deq; ... # break with ClosedQueueError
|
|
||||||
* }
|
|
||||||
* }
|
|
||||||
* q.close(true)
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_queue_close(int argc, VALUE *argv, VALUE self)
|
rb_queue_close(VALUE self)
|
||||||
{
|
{
|
||||||
return queue_do_close(self, argc, argv, FALSE);
|
return queue_do_close(self, FALSE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -951,9 +927,9 @@ rb_szqueue_initialize(VALUE self, VALUE vmax)
|
||||||
* raising ClosedQueueError('queue closed').
|
* raising ClosedQueueError('queue closed').
|
||||||
*/
|
*/
|
||||||
static VALUE
|
static VALUE
|
||||||
rb_szqueue_close(int argc, VALUE *argv, VALUE self)
|
rb_szqueue_close(VALUE self)
|
||||||
{
|
{
|
||||||
return queue_do_close(self, argc, argv, TRUE);
|
return queue_do_close(self, TRUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1273,7 +1249,7 @@ Init_thread_sync(void)
|
||||||
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
|
rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0);
|
||||||
rb_undef_method(rb_cQueue, "initialize_copy");
|
rb_undef_method(rb_cQueue, "initialize_copy");
|
||||||
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
|
rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0);
|
||||||
rb_define_method(rb_cQueue, "close", rb_queue_close, -1);
|
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
|
||||||
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
||||||
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
|
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
|
||||||
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
|
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
|
||||||
|
@ -1294,7 +1270,7 @@ Init_thread_sync(void)
|
||||||
"que", "waiters", "queue_waiters", "size", NULL);
|
"que", "waiters", "queue_waiters", "size", NULL);
|
||||||
|
|
||||||
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
|
rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1);
|
||||||
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1);
|
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
|
||||||
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
||||||
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
||||||
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
|
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
|
||||||
|
|
Loading…
Add table
Reference in a new issue