diff --git a/ChangeLog b/ChangeLog index 215f9c051e..1005586840 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,34 @@ +Thu Aug 27 07:45:34 2015 Koichi Sasada + + * thread_tools.c: add Queue#close(exception=false) and + SizedQueue#close(exception=false). + [Feature #10600] + + Trying to deq from a closed empty queue return nil + if exception parameter equals to false (default). + + If exception parameter is truthy, it raises + ClosedQueueError (< StopIteration). + ClosedQueueError inherits StopIteration so that you can write: + + loop{ e = q.deq; (using e) } + + Trying to close a closed queue raises ClosedQueueError. + + Blocking threads to wait deq for Queue and SizedQueue will be + restarted immediately by returning nil (exception=false) or + raising a ClosedQueueError (exception=true). + + Blocking threads to wait enq for SizedQueue will be + restarted by raising a ClosedQueueError immediately. + + The above specification is not proposed specification, so that + we need to continue discussion to conclude specification this + method. + + * test/thread/test_queue.rb: add tests originally written by + John Anderson and modify detailed behavior. + Wed Aug 26 10:52:02 2015 Nobuyoshi Nakada * re.c (rb_memsearch_wchar, rb_memsearch_qchar): test matching diff --git a/test/thread/test_queue.rb b/test/thread/test_queue.rb index 8fb0adca3f..6195140d58 100644 --- a/test/thread/test_queue.rb +++ b/test/thread/test_queue.rb @@ -43,6 +43,7 @@ class TestQueue < Test::Unit::TestCase } }.join + # close the queue the old way to test for backwards-compatibility num_threads.times { to_workers.push nil } workers.each { |t| t.join } @@ -277,4 +278,293 @@ class TestQueue < Test::Unit::TestCase Marshal.dump(q) end end + + def test_close + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate.call + assert_equal false, q.closed? + q << :something + assert_equal q, q.close + assert q.closed? + assert_raise_with_message(ClosedQueueError, /closed/){q << :nothing} + assert_equal q.pop, :something + assert_nil q.pop + # non-blocking + assert_raise_with_message(ThreadError, /queue empty/){q.pop(non_block=true)} + end + end + + # test that waiting producers are woken up on close + def close_wakeup( num_items, num_threads, &qcreate ) + raise "This test won't work with num_items(#{num_items}) >= num_threads(#{num_threads})" if num_items >= num_threads + + # create the Queue + q = yield + threads = num_threads.times.map{Thread.new{q.pop}} + num_items.times{|i| q << i} + + # wait until queue empty + (Thread.pass; sleep 0.01) until q.size == 0 + + # now there should be some waiting consumers + assert_equal num_threads - num_items, threads.count{|thr| thr.status} + + # tell them all to go away + q.close + + # wait for them to go away + Thread.pass until threads.all?{|thr| thr.status == false} + + # check that they've gone away. Convert nil to -1 so we can sort and do the comparison + expected_values = [-1] * (num_threads - num_items) + num_items.times.to_a + assert_equal expected_values, threads.map{|thr| thr.value || -1 }.sort + end + + def test_queue_close_wakeup + close_wakeup(15, 18){Queue.new} + end + + def test_size_queue_close_wakeup + close_wakeup(5, 8){SizedQueue.new 9} + end + + def test_sized_queue_one_closed_interrupt + q = SizedQueue.new 1 + q << :one + t1 = Thread.new { q << :two } + sleep 0.01 until t1.stop? + q.close + + t1.kill.join + assert_equal 1, q.size + assert_equal :one, q.pop + assert q.empty?, "queue not empty" + end + + # make sure that shutdown state is handled properly by empty? for the non-blocking case + def test_empty_non_blocking + return + q = SizedQueue.new 3 + 3.times{|i| q << i} + + # these all block cos the queue is full + prod_threads = 4.times.map{|i| Thread.new{q << 3+i}} + sleep 0.01 until prod_threads.all?{|thr| thr.status == 'sleep'} + q.close + + items = [] + # sometimes empty? is false but pop will raise ThreadError('empty'), + # meaning a value is not immediately available but will be soon. + until q.empty? + items << q.pop(non_block=true) rescue nil + end + items.compact! + + assert_equal 7.times.to_a, items.sort + assert q.empty? + end + + def test_sized_queue_closed_push_non_blocking + q = SizedQueue.new 7 + q.close + assert_raise_with_message(ClosedQueueError, /queue closed/){q.push(non_block=true)} + end + + def test_blocked_pushers + q = SizedQueue.new 3 + prod_threads = 6.times.map do |i| + thr = Thread.new{q << i}; thr[:pc] = i; thr + end + + # wait until some producer threads have finished, and the other 3 are blocked + sleep 0.01 while prod_threads.reject{|t| t.status}.count < 3 + # this would ensure that all producer threads call push before close + # sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3 + q.close + + # more than prod_threads + cons_threads = 10.times.map do |i| + thr = Thread.new{q.pop}; thr[:pc] = i; thr + end + + # values that came from the queue + popped_values = cons_threads.map &:value + + # wait untl all threads have finished + sleep 0.01 until prod_threads.find_all{|t| t.status}.count == 0 + + # pick only the producer threads that got in before close + successful_prod_threads = prod_threads.reject{|thr| thr.status == nil} + assert_nothing_raised{ successful_prod_threads.map(&:value) } + + # the producer threads that tried to push after q.close should all fail + unsuccessful_prod_threads = prod_threads - successful_prod_threads + unsuccessful_prod_threads.each do |thr| + assert_raise(ClosedQueueError){ thr.value } + end + + assert_equal cons_threads.size, popped_values.size + assert_equal 0, q.size + + # check that consumer threads with values match producers that called push before close + assert_equal successful_prod_threads.map{|thr| thr[:pc]}, popped_values.compact.sort + assert_nil q.pop + end + + def test_deny_pushers + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + prod_threads = nil + q = qcreate[] + synq = Queue.new + producers_start = Thread.new do + prod_threads = 20.times.map do |i| + Thread.new{ synq.pop; q << i } + end + end + q.close + synq.close # start producer threads + + # wait for all threads to be finished, because of exceptions + # NOTE: thr.status will be nil (raised) or false (terminated) + sleep 0.01 until prod_threads && prod_threads.all?{|thr| !thr.status} + + # check that all threads failed to call push + prod_threads.each do |thr| + assert_kind_of ClosedQueueError, (thr.value rescue $!) + end + end + end + + # size should account for waiting pushers during shutdown + def sized_queue_size_close + q = SizedQueue.new 4 + 4.times{|i| q << i} + Thread.new{ q << 5 } + Thread.new{ q << 6 } + assert_equal 4, q.size + assert_equal 4, q.items + q.close + assert_equal 6, q.size + assert_equal 4, q.items + end + + def test_blocked_pushers_empty + q = SizedQueue.new 3 + prod_threads = 6.times.map do |i| + Thread.new{ q << i} + end + + # this ensures that all producer threads call push before close + sleep 0.01 while prod_threads.select{|t| t.status == 'sleep'}.count < 3 + q.close + + ary = [] + until q.empty? + ary << q.pop + end + assert_equal 0, q.size + + assert_equal 3, ary.size + ary.each{|e| assert [0,1,2,3,4,5].include?(e)} + assert_nil q.pop + + prod_threads.each{|t| + begin + t.join + rescue => e + end + } + end + + # test thread wakeup on one-element SizedQueue with close + def test_one_element_sized_queue + q = SizedQueue.new 1 + t = Thread.new{ q.pop } + q.close + assert_nil t.value + end + + def test_close_token_exception + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + q.close true + assert_raise(ClosedQueueError){q.pop} + end + end + + def test_close_token_loop + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + popped_items = [] + consumer_thread = Thread.new{loop{popped_items << q.pop}; :done} + 7.times{|i| q << i} + q.close true + sleep 0.1 unless q.empty? + assert_equal :done, consumer_thread.value + assert_equal 7.times.to_a, popped_items + end + end + + def test_close_twice + [->{Queue.new}, ->{SizedQueue.new 3}].each do |qcreate| + q = qcreate[] + q.close + assert_raise(ClosedQueueError){q.close} + + q = qcreate[] + q.close(true) + assert_raise(ClosedQueueError){q.close(false)} + end + end + + def test_queue_close_multi_multi + q = SizedQueue.new rand(800..1200) + + count_items = rand(3000..5000) + count_producers = rand(10..20) + + producers = count_producers.times.map do + Thread.new do + sleep(rand / 100) + count_items.times{|i| q << [i,"#{i} for #{Thread.current.inspect}"]} + end + end + + consumers = rand(7..12).times.map do + Thread.new do + count = 0 + loop do + i, st = q.pop + count += 1 if i.is_a?(Fixnum) && st.is_a?(String) + end + count + end + end + + # No dead or finished threads + assert (consumers + producers).all?{|thr| thr.status =~ /\Arun|sleep\Z/}, 'no threads runnning' + + # just exercising the concurrency of the support methods. + counter = Thread.new do + until q.closed? && q.empty? + raise if q.size > q.max + # otherwise this exercise causes too much contention on the lock + sleep 0.01 + end + end + + producers.each &:join + q.close true + + # results not randomly distributed. Not sure why. + # consumers.map{|thr| thr.value}.each do |x| + # assert_not_equal 0, x + # end + + all_items_count = consumers.map{|thr| thr.value}.inject(:+) + assert_equal count_items * count_producers, all_items_count + + # don't leak this thread + assert_nothing_raised{counter.join} + end end diff --git a/thread_tools.c b/thread_tools.c index eee185e6c3..06e73751bb 100644 --- a/thread_tools.c +++ b/thread_tools.c @@ -1,6 +1,7 @@ /* included by thraed.c */ VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable; +VALUE rb_eClosedQueueError; /* Mutex */ @@ -521,6 +522,9 @@ enum { END_QUEUE }; +#define QUEUE_CLOSED FL_USER5 +#define QUEUE_CLOSE_EXCEPTION FL_USER6 + #define GET_QUEUE_QUE(q) get_array((q), QUEUE_QUE) #define GET_QUEUE_WAITERS(q) get_array((q), QUEUE_WAITERS) #define GET_SZQUEUE_WAITERS(q) get_array((q), SZQUEUE_WAITERS) @@ -566,6 +570,77 @@ wakeup_all_threads(VALUE list) rb_ary_clear(list); } +static unsigned long +queue_length(VALUE self) +{ + VALUE que = GET_QUEUE_QUE(self); + return RARRAY_LEN(que); +} + +static unsigned long +queue_num_waiting(VALUE self) +{ + VALUE waiters = GET_QUEUE_WAITERS(self); + return RARRAY_LEN(waiters); +} + +static unsigned long +szqueue_num_waiting_producer(VALUE self) +{ + VALUE waiters = GET_SZQUEUE_WAITERS(self); + return RARRAY_LEN(waiters); +} + +static int +queue_closed_p(VALUE self) +{ + return FL_TEST_RAW(self, QUEUE_CLOSED) != 0; +} + +static void +raise_closed_queue_error(VALUE self) +{ + rb_raise(rb_eClosedQueueError, "queue closed"); +} + +static VALUE +queue_closed_result(VALUE self) +{ + assert(queue_length(self) == 0); + + if (FL_TEST(self, QUEUE_CLOSE_EXCEPTION)) { + raise_closed_queue_error(self); + } + return Qnil; +} + +static VALUE +queue_do_close(VALUE self, int argc, VALUE *argv, int is_szq) +{ + VALUE exception = Qfalse; + + if (queue_closed_p(self)) raise_closed_queue_error(self); + + rb_scan_args(argc, argv, "01", &exception); + FL_SET(self, QUEUE_CLOSED); + + if (RTEST(exception)) { + FL_SET(self, QUEUE_CLOSE_EXCEPTION); + } + + if (queue_num_waiting(self) > 0) { + VALUE waiters = GET_QUEUE_WAITERS(self); + wakeup_all_threads(waiters); + } + + if (is_szq && szqueue_num_waiting_producer(self) > 0) { + VALUE waiters = GET_SZQUEUE_WAITERS(self); + wakeup_all_threads(waiters); + } + + return self; +} + /* * Document-class: Queue * @@ -611,11 +686,74 @@ rb_queue_initialize(VALUE self) static VALUE queue_do_push(VALUE self, VALUE obj) { + if (queue_closed_p(self)) { + raise_closed_queue_error(self); + } rb_ary_push(GET_QUEUE_QUE(self), obj); wakeup_first_thread(GET_QUEUE_WAITERS(self)); return self; } +/* + * Document-method: Queue#close + * call-seq: + * close(exception=false) + * + * Closes the queue. A closed queue cannot be re-opened. + * + * After the call to close completes, the following are true: + * + * - +closed?+ will return true + * + * - calling enq/push/<< will raise ClosedQueueError('queue closed') + * + * - when +empty?+ is false, calling deq/pop/shift will return an object + * from the queue as usual. + * + * - when +empty?+ is true, deq(non_block=false) will not suspend and + * will either return nil. If +exception+ parameter is true, raise ClosedQueueError error. + * deq(non_block=true) will ignore the parameter and raise a ThreadError('queue empty'). + * + * ClosedQueueError is inherited from StopIteration, so that you can break loop block. + * + * Example: + * + * q = Queue.new + * Thread.new{ + * while e = q.deq # wait for nil to break loop + * # ... + * end + * } + * q.close # equals to q.close(false) + * + * q = Queue.new + * Thread.new{ + * loop{ + * e = q.deq; ... # braek with ClosedQueueError + * } + * } + * q.close(true) + */ + +static VALUE +rb_queue_close(int argc, VALUE *argv, VALUE self) +{ + return queue_do_close(self, argc, argv, FALSE); +} + +/* + * Document-method: Queue#closed? + * call-seq: closed? + * + * Returns +true+ if the queue is closed. + */ + +static VALUE +rb_queue_closed_p(VALUE self) +{ + return queue_closed_p(self) ? Qtrue : Qfalse; +} + /* * Document-method: Queue#push * call-seq: @@ -632,20 +770,6 @@ rb_queue_push(VALUE self, VALUE obj) return queue_do_push(self, obj); } -static unsigned long -queue_length(VALUE self) -{ - VALUE que = GET_QUEUE_QUE(self); - return RARRAY_LEN(que); -} - -static unsigned long -queue_num_waiting(VALUE self) -{ - VALUE waiters = GET_QUEUE_WAITERS(self); - return RARRAY_LEN(waiters); -} - struct waiting_delete { VALUE waiting; VALUE th; @@ -676,8 +800,16 @@ queue_do_pop(VALUE self, int should_block) if (!should_block) { rb_raise(rb_eThreadError, "queue empty"); } - rb_ary_push(args.waiting, args.th); - rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + else if (queue_closed_p(self)) { + return queue_closed_result(self); + } + else { + assert(queue_length(self) == 0); + assert(queue_closed_p(self) == 0); + + rb_ary_push(args.waiting, args.th); + rb_ensure(queue_sleep, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + } } return rb_ary_shift(GET_QUEUE_QUE(self)); @@ -804,6 +936,24 @@ rb_szqueue_initialize(VALUE self, VALUE vmax) return self; } +/* + * Document-method: SizedQueue#close + * call-seq: + * close(exception=false) + * + * Similar to Queue#close. + * + * The difference is behavior with waiting enqueuing threads. + * + * If there are waiting enqueuing threads, they are interrupted by + * raising ClosedQueueError('queue closed'). + */ +static VALUE +rb_szqueue_close(int argc, VALUE *argv, VALUE self) +{ + return queue_do_close(self, argc, argv, TRUE); +} + /* * Document-method: SizedQueue#max * @@ -879,9 +1029,20 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) if (!should_block) { rb_raise(rb_eThreadError, "queue full"); } - rb_ary_push(args.waiting, args.th); - rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + else if (queue_closed_p(self)) { + goto closed; + } + else { + rb_ary_push(args.waiting, args.th); + rb_ensure((VALUE (*)())rb_thread_sleep_deadly, (VALUE)0, queue_delete_from_waiting, (VALUE)&args); + } } + + if (queue_closed_p(self)) { + closed: + raise_closed_queue_error(self); + } + return queue_do_push(self, argv[0]); } @@ -941,9 +1102,7 @@ rb_szqueue_clear(VALUE self) static VALUE rb_szqueue_num_waiting(VALUE self) { - long len = queue_num_waiting(self); - VALUE waiters = GET_SZQUEUE_WAITERS(self); - len += RARRAY_LEN(waiters); + long len = queue_num_waiting(self) + szqueue_num_waiting_producer(self); return ULONG2NUM(len); } @@ -1106,14 +1265,14 @@ Init_thread_tools(void) rb_cThread, "Queue", rb_cObject, rb_struct_alloc_noinit, "que", "waiters", NULL); - rb_cSizedQueue = rb_struct_define_without_accessor_under( - rb_cThread, - "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, - "que", "waiters", "queue_waiters", "size", NULL); + + rb_eClosedQueueError = rb_define_class("ClosedQueueError", rb_eStopIteration); rb_define_method(rb_cQueue, "initialize", rb_queue_initialize, 0); rb_undef_method(rb_cQueue, "initialize_copy"); rb_define_method(rb_cQueue, "marshal_dump", undumpable, 0); + rb_define_method(rb_cQueue, "close", rb_queue_close, -1); + rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0); rb_define_method(rb_cQueue, "push", rb_queue_push, 1); rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1); rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0); @@ -1127,7 +1286,13 @@ Init_thread_tools(void) rb_define_alias(rb_cQueue, "shift", "pop"); /* Alias for #pop. */ rb_define_alias(rb_cQueue, "size", "length"); /* Alias for #length. */ + rb_cSizedQueue = rb_struct_define_without_accessor_under( + rb_cThread, + "SizedQueue", rb_cQueue, rb_struct_alloc_noinit, + "que", "waiters", "queue_waiters", "size", NULL); + rb_define_method(rb_cSizedQueue, "initialize", rb_szqueue_initialize, 1); + rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, -1); rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0); rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1); rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);