diff --git a/spec/ruby/shared/sizedqueue/enque.rb b/spec/ruby/shared/sizedqueue/enque.rb index 6ef12349f8..126470594a 100644 --- a/spec/ruby/shared/sizedqueue/enque.rb +++ b/spec/ruby/shared/sizedqueue/enque.rb @@ -47,4 +47,61 @@ describe :sizedqueue_enq, shared: true do t.join q.pop.should == 1 end + + describe "with a timeout" do + ruby_version_is "3.2" do + it "returns self if the item was pushed in time" do + q = @object.call(1) + q << 1 + + t = Thread.new { + q.send(@method, 2, timeout: 1).should == q + } + Thread.pass until t.status == "sleep" && q.num_waiting == 1 + q.pop + t.join + end + + it "does nothing if the timeout is nil" do + q = @object.call(1) + q << 1 + t = Thread.new { + q.send(@method, 2, timeout: nil).should == q + } + t.join(0.2).should == nil + q.pop + t.join + end + + it "returns nil if no item is available in time" do + q = @object.call(1) + q << 1 + t = Thread.new { + q.send(@method, 2, timeout: 0.1).should == nil + } + t.join + end + + it "raise TypeError if timeout is not a valid numeric" do + q = @object.call(1) + -> { q.send(@method, 2, timeout: "1") }.should raise_error( + TypeError, + "no implicit conversion to float from string", + ) + + -> { q.send(@method, 2, timeout: false) }.should raise_error( + TypeError, + "no implicit conversion to float from false", + ) + end + + it "raise ArgumentError if non_block = true is passed too" do + q = @object.call(1) + -> { q.send(@method, 2, true, timeout: 1) }.should raise_error( + ArgumentError, + "can't set a timeout if non_block is enabled", + ) + end + end + end end diff --git a/test/ruby/test_thread_queue.rb b/test/ruby/test_thread_queue.rb index 1c852474b4..bd5728389d 100644 --- a/test/ruby/test_thread_queue.rb +++ b/test/ruby/test_thread_queue.rb @@ -168,6 +168,24 @@ class TestThreadQueue < Test::Unit::TestCase end end + def test_sized_queue_push_timeout + q = Thread::SizedQueue.new(1) + + q << 1 + assert_equal 1, q.size + + t1 = Thread.new { q.push(2, timeout: 1) } + assert_equal t1, t1.join(2) + assert_nil t1.value + + t2 = Thread.new { q.push(2, timeout: 0.1) } + assert_equal t2, t2.join(0.2) + assert_nil t2.value + ensure + t1&.kill + t2&.kill + end + def test_sized_queue_push_interrupt q = Thread::SizedQueue.new(1) q.push(1) diff --git a/thread_sync.c b/thread_sync.c index 63db1c4392..4ae404ec05 100644 --- a/thread_sync.c +++ b/thread_sync.c @@ -1229,39 +1229,15 @@ rb_szqueue_max_set(VALUE self, VALUE vmax) return vmax; } -static int -szqueue_push_should_block(int argc, const VALUE *argv) -{ - int should_block = 1; - rb_check_arity(argc, 1, 2); - if (argc > 1) { - should_block = !RTEST(argv[1]); - } - return should_block; -} - -/* - * Document-method: Thread::SizedQueue#push - * call-seq: - * push(object, non_block=false) - * enq(object, non_block=false) - * <<(object) - * - * Pushes +object+ to the queue. - * - * If there is no space left in the queue, waits until space becomes - * available, unless +non_block+ is true. If +non_block+ is true, the - * thread isn't suspended, and +ThreadError+ is raised. - */ - static VALUE -rb_szqueue_push(int argc, VALUE *argv, VALUE self) +rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout) { + rb_hrtime_t end = queue_timeout2hrtime(timeout); + bool timed_out = false; struct rb_szqueue *sq = szqueue_ptr(self); - int should_block = szqueue_push_should_block(argc, argv); while (queue_length(self, &sq->q) >= sq->max) { - if (!should_block) { + if (RTEST(non_block)) { rb_raise(rb_eThreadError, "queue full"); } else if (queue_closed_p(self)) { @@ -1281,11 +1257,14 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) struct queue_sleep_arg queue_sleep_arg = { .self = self, - .timeout = Qnil, - .end = 0 + .timeout = timeout, + .end = end }; - rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter); + if (!NIL_P(timeout) && rb_hrtime_now() >= end) { + timed_out = true; + break; + } } } @@ -1293,7 +1272,9 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self) raise_closed_queue_error(self); } - return queue_do_push(self, &sq->q, argv[0]); + if (timed_out) return Qnil; + + return queue_do_push(self, &sq->q, object); } static VALUE @@ -1611,13 +1592,10 @@ Init_thread_sync(void) rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0); rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0); rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1); - rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1); rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0); rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0); rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0); rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0); - rb_define_alias(rb_cSizedQueue, "enq", "push"); - rb_define_alias(rb_cSizedQueue, "<<", "push"); rb_define_alias(rb_cSizedQueue, "size", "length"); /* CVar */ diff --git a/thread_sync.rb b/thread_sync.rb index d567ca51af..7e4c341ad0 100644 --- a/thread_sync.rb +++ b/thread_sync.rb @@ -41,5 +41,28 @@ class Thread end alias_method :deq, :pop alias_method :shift, :pop + + # call-seq: + # push(object, non_block=false, timeout: nil) + # enq(object, non_block=false, timeout: nil) + # <<(object) + # + # Pushes +object+ to the queue. + # + # If there is no space left in the queue, waits until space becomes + # available, unless +non_block+ is true. If +non_block+ is true, the + # thread isn't suspended, and +ThreadError+ is raised. + # + # If +timeout+ seconds have passed and no space is available +nil+ is + # returned. + # Otherwise it returns +self+. + def push(object, non_block = false, timeout: nil) + if non_block && timeout + raise ArgumentError, "can't set a timeout if non_block is enabled" + end + Primitive.rb_szqueue_push(object, non_block, timeout) + end + alias_method :enq, :push + alias_method :<<, :push end end