1
0
Fork 0
mirror of https://github.com/ruby/ruby.git synced 2022-11-09 12:17:21 -05:00

relax Fiber#transfer's restriction

Using Fiber#transfer with Fiber#resume for a same Fiber is
limited (once Fiber#transfer is called for a fiber, the fiber
can not be resumed more). This restriction was introduced to
protect the resume/yield chain, but we realized that it is too much
to protect the chain. Instead of the current restriction, we
introduce some other protections.

(1) can not transfer to the resuming fiber.
(2) can not transfer to the yielding fiber.
(3) can not resume transferred fiber.
(4) can not yield from not-resumed fiber.

[Bug #17221]

Also at the end of a transferred fiber, it had continued on root fiber.
However, if the root fiber resumed a fiber (and that fiber can resumed
another fiber), this behavior also breaks the resume/yield chain.
So at the end of a transferred fiber, switch to the edge of resume
chain from root fiber.
For example, root fiber resumed f1 and f1 resumed f2, transferred to
f3 and f3 terminated, then continue from the fiber f2 (it was continued
from root fiber without this patch).
This commit is contained in:
Koichi Sasada 2020-10-08 01:15:32 +09:00
parent 2fd71112fb
commit bf3b2a4374
Notes: git 2020-10-12 22:59:06 +09:00
7 changed files with 269 additions and 91 deletions

104
cont.c
View file

@ -207,6 +207,7 @@ typedef struct rb_context_struct {
/* /*
* Fiber status:
* Fiber status: * Fiber status:
* [Fiber.new] ------> FIBER_CREATED * [Fiber.new] ------> FIBER_CREATED
* | [Fiber#resume] * | [Fiber#resume]
@ -235,14 +236,11 @@ struct rb_fiber_struct {
rb_context_t cont; rb_context_t cont;
VALUE first_proc; VALUE first_proc;
struct rb_fiber_struct *prev; struct rb_fiber_struct *prev;
BITFIELD(enum fiber_status, status, 2); VALUE resuming_fiber;
/* If a fiber invokes by "transfer",
* then this fiber can't be invoked by "resume" any more after that.
* You shouldn't mix "transfer" and "resume".
*/
unsigned int transferred : 1;
BITFIELD(enum fiber_status, status, 2);
/* Whether the fiber is allowed to implicitly yield. */ /* Whether the fiber is allowed to implicitly yield. */
unsigned int yielding : 1;
unsigned int blocking : 1; unsigned int blocking : 1;
struct coroutine_context context; struct coroutine_context context;
@ -919,11 +917,13 @@ cont_mark(void *ptr)
RUBY_MARK_LEAVE("cont"); RUBY_MARK_LEAVE("cont");
} }
#if 0
static int static int
fiber_is_root_p(const rb_fiber_t *fiber) fiber_is_root_p(const rb_fiber_t *fiber)
{ {
return fiber == fiber->cont.saved_ec.thread_ptr->root_fiber; return fiber == fiber->cont.saved_ec.thread_ptr->root_fiber;
} }
#endif
static void static void
cont_free(void *ptr) cont_free(void *ptr)
@ -2010,25 +2010,33 @@ fiber_current(void)
} }
static inline rb_fiber_t* static inline rb_fiber_t*
return_fiber(void) return_fiber(bool terminate)
{ {
rb_fiber_t *fiber = fiber_current(); rb_fiber_t *fiber = fiber_current();
rb_fiber_t *prev = fiber->prev; rb_fiber_t *prev = fiber->prev;
if (!prev) { if (prev) {
fiber->prev = NULL;
prev->resuming_fiber = Qnil;
return prev;
}
else {
if (!terminate) {
rb_raise(rb_eFiberError, "attempt to yield on a not resumed fiber");
}
rb_thread_t *th = GET_THREAD(); rb_thread_t *th = GET_THREAD();
rb_fiber_t *root_fiber = th->root_fiber; rb_fiber_t *root_fiber = th->root_fiber;
VM_ASSERT(root_fiber != NULL); VM_ASSERT(root_fiber != NULL);
if (root_fiber == fiber) { // search resuming fiber
rb_raise(rb_eFiberError, "can't yield from root fiber"); for (fiber = root_fiber;
RTEST(fiber->resuming_fiber);
fiber = fiber_ptr(fiber->resuming_fiber)) {
} }
return root_fiber;
} return fiber;
else {
fiber->prev = NULL;
return prev;
} }
} }
@ -2073,7 +2081,7 @@ fiber_store(rb_fiber_t *next_fiber, rb_thread_t *th)
} }
static inline VALUE static inline VALUE
fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int kw_splat) fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int kw_splat, VALUE resuming_fiber, bool yielding)
{ {
VALUE value; VALUE value;
rb_context_t *cont = &fiber->cont; rb_context_t *cont = &fiber->cont;
@ -2120,11 +2128,21 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
VM_ASSERT(FIBER_RUNNABLE_P(fiber)); VM_ASSERT(FIBER_RUNNABLE_P(fiber));
if (is_resume) { rb_fiber_t *current_fiber = fiber_current();
VM_ASSERT(!RTEST(current_fiber->resuming_fiber));
if (RTEST(resuming_fiber)) {
current_fiber->resuming_fiber = resuming_fiber;
fiber->prev = fiber_current(); fiber->prev = fiber_current();
fiber->yielding = 0;
} }
if (fiber_current()->blocking) { VM_ASSERT(!current_fiber->yielding);
if (yielding) {
current_fiber->yielding = 1;
}
if (current_fiber->blocking) {
th->blocking -= 1; th->blocking -= 1;
} }
@ -2134,7 +2152,7 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
value = fiber_store(fiber, th); value = fiber_store(fiber, th);
if (is_resume && FIBER_TERMINATED_P(fiber)) { if (RTEST(resuming_fiber) && FIBER_TERMINATED_P(fiber)) {
fiber_stack_release(fiber); fiber_stack_release(fiber);
} }
@ -2152,7 +2170,7 @@ fiber_switch(rb_fiber_t *fiber, int argc, const VALUE *argv, int is_resume, int
VALUE VALUE
rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv) rb_fiber_transfer(VALUE fiber_value, int argc, const VALUE *argv)
{ {
return fiber_switch(fiber_ptr(fiber_value), argc, argv, 0, RB_NO_KEYWORDS); return fiber_switch(fiber_ptr(fiber_value), argc, argv, RB_NO_KEYWORDS, Qfalse, false);
} }
VALUE VALUE
@ -2181,29 +2199,38 @@ rb_fiber_terminate(rb_fiber_t *fiber, int need_interrupt)
fiber->cont.machine.stack = NULL; fiber->cont.machine.stack = NULL;
fiber->cont.machine.stack_size = 0; fiber->cont.machine.stack_size = 0;
next_fiber = return_fiber(); next_fiber = return_fiber(true);
if (need_interrupt) RUBY_VM_SET_INTERRUPT(&next_fiber->cont.saved_ec); if (need_interrupt) RUBY_VM_SET_INTERRUPT(&next_fiber->cont.saved_ec);
fiber_switch(next_fiber, 1, &value, 0, RB_NO_KEYWORDS); fiber_switch(next_fiber, 1, &value, RB_NO_KEYWORDS, Qfalse, false);
} }
VALUE VALUE
rb_fiber_resume_kw(VALUE fiber_value, int argc, const VALUE *argv, int kw_splat) rb_fiber_resume_kw(VALUE fiber_value, int argc, const VALUE *argv, int kw_splat)
{ {
rb_fiber_t *fiber = fiber_ptr(fiber_value); rb_fiber_t *fiber = fiber_ptr(fiber_value);
rb_fiber_t *current_fiber = fiber_current();
if (argc == -1 && FIBER_CREATED_P(fiber)) { if (argc == -1 && FIBER_CREATED_P(fiber)) {
rb_raise(rb_eFiberError, "cannot raise exception on unborn fiber"); rb_raise(rb_eFiberError, "cannot raise exception on unborn fiber");
} }
else if (FIBER_TERMINATED_P(fiber)) {
if (fiber->prev != 0 || fiber_is_root_p(fiber)) { rb_raise(rb_eFiberError, "attempt to resume a terminated fiber");
rb_raise(rb_eFiberError, "double resume"); }
else if (fiber == current_fiber) {
rb_raise(rb_eFiberError, "attempt to resume the current fiber");
}
else if (fiber->prev != NULL) {
rb_raise(rb_eFiberError, "attempt to resume a resumed fiber (double resume)");
}
else if (RTEST(fiber->resuming_fiber)) {
rb_raise(rb_eFiberError, "attempt to resume a resuming fiber");
}
else if (fiber->prev == NULL &&
(!fiber->yielding && fiber->status != FIBER_CREATED)) {
rb_raise(rb_eFiberError, "attempt to resume a transferring fiber");
} }
if (fiber->transferred != 0) { return fiber_switch(fiber, argc, argv, kw_splat, fiber_value, false);
rb_raise(rb_eFiberError, "cannot resume transferred Fiber");
}
return fiber_switch(fiber, argc, argv, 1, kw_splat);
} }
VALUE VALUE
@ -2215,13 +2242,13 @@ rb_fiber_resume(VALUE fiber_value, int argc, const VALUE *argv)
VALUE VALUE
rb_fiber_yield_kw(int argc, const VALUE *argv, int kw_splat) rb_fiber_yield_kw(int argc, const VALUE *argv, int kw_splat)
{ {
return fiber_switch(return_fiber(), argc, argv, 0, kw_splat); return fiber_switch(return_fiber(false), argc, argv, kw_splat, Qfalse, true);
} }
VALUE VALUE
rb_fiber_yield(int argc, const VALUE *argv) rb_fiber_yield(int argc, const VALUE *argv)
{ {
return fiber_switch(return_fiber(), argc, argv, 0, RB_NO_KEYWORDS); return fiber_switch(return_fiber(false), argc, argv, RB_NO_KEYWORDS, Qfalse, true);
} }
void void
@ -2362,8 +2389,13 @@ static VALUE
rb_fiber_m_transfer(int argc, VALUE *argv, VALUE fiber_value) rb_fiber_m_transfer(int argc, VALUE *argv, VALUE fiber_value)
{ {
rb_fiber_t *fiber = fiber_ptr(fiber_value); rb_fiber_t *fiber = fiber_ptr(fiber_value);
fiber->transferred = 1; if (RTEST(fiber->resuming_fiber)) {
return fiber_switch(fiber, argc, argv, 0, rb_keyword_given_p()); rb_raise(rb_eFiberError, "attempt to transfer to a resuming fiber");
}
if (fiber->yielding) {
rb_raise(rb_eFiberError, "attempt to transfer to a yielding fiber");
}
return fiber_switch(fiber, argc, argv, rb_keyword_given_p(), Qfalse, false);
} }
/* /*
@ -2411,8 +2443,8 @@ fiber_to_s(VALUE fiber_value)
const rb_proc_t *proc; const rb_proc_t *proc;
char status_info[0x20]; char status_info[0x20];
if (fiber->transferred) { if (RTEST(fiber->resuming_fiber)) {
snprintf(status_info, 0x20, " (%s, transferred)", fiber_status_name(fiber->status)); snprintf(status_info, 0x20, " (%s by resuming)", fiber_status_name(fiber->status));
} }
else { else {
snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status)); snprintf(status_info, 0x20, " (%s)", fiber_status_name(fiber->status));

View file

@ -6,9 +6,40 @@ describe "Fiber#resume" do
end end
describe "Fiber#resume" do describe "Fiber#resume" do
it "raises a FiberError if the Fiber tries to resume itself" do it "runs until Fiber.yield" do
fiber = Fiber.new { fiber.resume } obj = mock('obj')
-> { fiber.resume }.should raise_error(FiberError, /double resume/) obj.should_not_receive(:do)
fiber = Fiber.new { 1 + 2; Fiber.yield; obj.do }
fiber.resume
end
it "resumes from the last call to Fiber.yield on subsequent invocations" do
fiber = Fiber.new { Fiber.yield :first; :second }
fiber.resume.should == :first
fiber.resume.should == :second
end
it "sets the block parameters to its arguments on the first invocation" do
first = mock('first')
first.should_receive(:arg).with(:first).twice
fiber = Fiber.new { |arg| first.arg arg; Fiber.yield; first.arg arg; }
fiber.resume :first
fiber.resume :second
end
ruby_version_is '3.0' do
it "raises a FiberError if the Fiber tries to resume itself" do
fiber = Fiber.new { fiber.resume }
-> { fiber.resume }.should raise_error(FiberError, /current fiber/)
end
end
ruby_version_is '' ... '3.0' do
it "raises a FiberError if the Fiber tries to resume itself" do
fiber = Fiber.new { fiber.resume }
-> { fiber.resume }.should raise_error(FiberError, /double resume/)
end
end end
it "returns control to the calling Fiber if called from one" do it "returns control to the calling Fiber if called from one" do

View file

@ -42,10 +42,22 @@ describe "Fiber.current" do
fiber3 = Fiber.new do fiber3 = Fiber.new do
states << :fiber3 states << :fiber3
fiber2.transfer fiber2.transfer
flunk ruby_version_is '3.0' do
states << :fiber3_terminated
end
ruby_version_is '' ... '3.0' do
flunk
end
end end
fiber3.resume fiber3.resume
states.should == [:fiber3, :fiber2, :fiber]
ruby_version_is "" ... "3.0" do
states.should == [:fiber3, :fiber2, :fiber]
end
ruby_version_is "3.0" do
states.should == [:fiber3, :fiber2, :fiber, :fiber3_terminated]
end
end end
end end

View file

@ -3,10 +3,21 @@ require_relative '../../spec_helper'
require 'fiber' require 'fiber'
describe "Fiber#resume" do describe "Fiber#resume" do
it "raises a FiberError if the Fiber has transferred control to another Fiber" do ruby_version_is '' ... '3.0' do
fiber1 = Fiber.new { true } it "raises a FiberError if the Fiber has transferred control to another Fiber" do
fiber2 = Fiber.new { fiber1.transfer; Fiber.yield } fiber1 = Fiber.new { true }
fiber2.resume fiber2 = Fiber.new { fiber1.transfer; Fiber.yield }
-> { fiber2.resume }.should raise_error(FiberError) fiber2.resume
-> { fiber2.resume }.should raise_error(FiberError)
end
end
ruby_version_is '3.0' do
it "can work with Fiber#transfer" do
fiber1 = Fiber.new { true }
fiber2 = Fiber.new { fiber1.transfer; Fiber.yield 10 ; Fiber.yield 20; raise }
fiber2.resume.should == 10
fiber2.resume.should == 20
end
end end
end end

View file

@ -11,7 +11,13 @@ describe "Fiber#transfer" do
it "transfers control from one Fiber to another when called from a Fiber" do it "transfers control from one Fiber to another when called from a Fiber" do
fiber1 = Fiber.new { :fiber1 } fiber1 = Fiber.new { :fiber1 }
fiber2 = Fiber.new { fiber1.transfer; :fiber2 } fiber2 = Fiber.new { fiber1.transfer; :fiber2 }
fiber2.resume.should == :fiber1
ruby_version_is '' ... '3.0' do
fiber2.resume.should == :fiber1
end
ruby_version_is '3.0' do
fiber2.resume.should == :fiber2
end
end end
it "returns to the root Fiber when finished" do it "returns to the root Fiber when finished" do
@ -34,12 +40,24 @@ describe "Fiber#transfer" do
states.should == [:start, :end] states.should == [:start, :end]
end end
it "can transfer control to a Fiber that has transferred to another Fiber" do ruby_version_is '' ... '3.0' do
states = [] it "can transfer control to a Fiber that has transferred to another Fiber" do
fiber1 = Fiber.new { states << :fiber1 } states = []
fiber2 = Fiber.new { states << :fiber2_start; fiber1.transfer; states << :fiber2_end} fiber1 = Fiber.new { states << :fiber1 }
fiber2.resume.should == [:fiber2_start, :fiber1] fiber2 = Fiber.new { states << :fiber2_start; fiber1.transfer; states << :fiber2_end}
fiber2.transfer.should == [:fiber2_start, :fiber1, :fiber2_end] fiber2.resume.should == [:fiber2_start, :fiber1]
fiber2.transfer.should == [:fiber2_start, :fiber1, :fiber2_end]
end
end
ruby_version_is '3.0' do
it "can not transfer control to a Fiber that has suspended by Fiber.yield" do
states = []
fiber1 = Fiber.new { states << :fiber1 }
fiber2 = Fiber.new { states << :fiber2_start; Fiber.yield fiber1.transfer; states << :fiber2_end}
fiber2.resume.should == [:fiber2_start, :fiber1]
-> { fiber2.transfer }.should raise_error(FiberError)
end
end end
it "raises a FiberError when transferring to a Fiber which resumes itself" do it "raises a FiberError when transferring to a Fiber which resumes itself" do
@ -83,4 +101,28 @@ describe "Fiber#transfer" do
thread.join thread.join
states.should == [0, 1, 2, 3] states.should == [0, 1, 2, 3]
end end
ruby_version_is "" ... "3.0" do
it "runs until Fiber.yield" do
obj = mock('obj')
obj.should_not_receive(:do)
fiber = Fiber.new { 1 + 2; Fiber.yield; obj.do }
fiber.transfer
end
it "resumes from the last call to Fiber.yield on subsequent invocations" do
fiber = Fiber.new { Fiber.yield :first; :second }
fiber.transfer.should == :first
fiber.transfer.should == :second
end
it "sets the block parameters to its arguments on the first invocation" do
first = mock('first')
first.should_receive(:arg).with(:first).twice
fiber = Fiber.new { |arg| first.arg arg; Fiber.yield; first.arg arg; }
fiber.transfer :first
fiber.transfer :second
end
end
end end

View file

@ -35,32 +35,11 @@ describe :fiber_resume, shared: true do
fiber.send(@method) fiber.send(@method)
end end
it "runs until Fiber.yield" do
obj = mock('obj')
obj.should_not_receive(:do)
fiber = Fiber.new { 1 + 2; Fiber.yield; obj.do }
fiber.send(@method)
end
it "resumes from the last call to Fiber.yield on subsequent invocations" do
fiber = Fiber.new { Fiber.yield :first; :second }
fiber.send(@method).should == :first
fiber.send(@method).should == :second
end
it "accepts any number of arguments" do it "accepts any number of arguments" do
fiber = Fiber.new { |a| } fiber = Fiber.new { |a| }
-> { fiber.send(@method, *(1..10).to_a) }.should_not raise_error -> { fiber.send(@method, *(1..10).to_a) }.should_not raise_error
end end
it "sets the block parameters to its arguments on the first invocation" do
first = mock('first')
first.should_receive(:arg).with(:first).twice
fiber = Fiber.new { |arg| first.arg arg; Fiber.yield; first.arg arg; }
fiber.send(@method, :first)
fiber.send(@method, :second)
end
it "raises a FiberError if the Fiber is dead" do it "raises a FiberError if the Fiber is dead" do
fiber = Fiber.new { true } fiber = Fiber.new { true }
fiber.send(@method) fiber.send(@method)

View file

@ -184,6 +184,34 @@ class TestFiber < Test::Unit::TestCase
assert_equal([:baz], ary) assert_equal([:baz], ary)
end end
def test_terminate_transferred_fiber
root_fiber = Fiber.current
log = []
fa1 = fa2 = fb1 = r1 = nil
fa1 = Fiber.new{
fa2 = Fiber.new{
log << :fa2_terminate
}
fa2.resume
log << :fa1_terminate
}
fb1 = Fiber.new{
fa1.transfer
log << :fb1_terminate
}
r1 = Fiber.new{
fb1.transfer
log << :r1_terminate
}
r1.resume
log << :root_terminate
assert_equal [:fa2_terminate, :fa1_terminate, :r1_terminate, :root_terminate], log
end
def test_tls def test_tls
# #
def tvar(var, val) def tvar(var, val)
@ -278,29 +306,72 @@ class TestFiber < Test::Unit::TestCase
assert_instance_of(Class, Fiber.new(&Class.new.method(:undef_method)).resume(:to_s), bug5083) assert_instance_of(Class, Fiber.new(&Class.new.method(:undef_method)).resume(:to_s), bug5083)
end end
def test_prohibit_resume_transferred_fiber def test_prohibit_transfer_to_resuming_fiber
root_fiber = Fiber.current
assert_raise(FiberError){ assert_raise(FiberError){
root_fiber = Fiber.current fiber = Fiber.new{ root_fiber.transfer }
f = Fiber.new{ fiber.resume
root_fiber.transfer }
}
f.transfer fa1 = Fiber.new{
f.resume fa2 = Fiber.new{ root_fiber.transfer }
}
fb1 = Fiber.new{
fb2 = Fiber.new{ root_fiber.transfer }
}
fa1.transfer
fb1.transfer
assert_raise(FiberError){
fa1.transfer
} }
assert_raise(FiberError){ assert_raise(FiberError){
g=nil fb1.transfer
f=Fiber.new{
g.resume
g.resume
}
g=Fiber.new{
f.resume
f.resume
}
f.transfer
} }
end end
def test_prohibit_transfer_to_yielding_fiber
root_fiber = Fiber.current
f1 = f2 = f3 = nil
f1 = Fiber.new{
f2 = Fiber.new{
f3 = Fiber.new{
p f3: Fiber.yield
}
f3.resume
}
f2.resume
}
f1.resume
assert_raise(FiberError){ f3.transfer 10 }
end
def test_prohibit_resume_to_transferring_fiber
root_fiber = Fiber.current
assert_raise(FiberError){
Fiber.new{
root_fiber.resume
}.transfer
}
f1 = f2 = nil
f1 = Fiber.new do
f2.transfer
end
f2 = Fiber.new do
f1.resume # attempt to resume transferring fiber
end
assert_raise(FiberError){
f1.transfer
}
end
def test_fork_from_fiber def test_fork_from_fiber
skip 'fork not supported' unless Process.respond_to?(:fork) skip 'fork not supported' unless Process.respond_to?(:fork)
pid = nil pid = nil