mirror of
				https://github.com/ruby/ruby.git
				synced 2022-11-09 12:17:21 -05:00 
			
		
		
		
	
		
			
				
	
	
		
			187 lines
		
	
	
	
		
			5 KiB
		
	
	
	
		
			Ruby
		
	
	
	
	
	
			
		
		
	
	
			187 lines
		
	
	
	
		
			5 KiB
		
	
	
	
		
			Ruby
		
	
	
	
	
	
class Ractor
 | 
						|
  # Create a new Ractor with args and a block.
 | 
						|
  # args are passed via incoming channel.
 | 
						|
  # A block (Proc) will be isolated (can't access to outer variables)
 | 
						|
  #
 | 
						|
  # A ractor has default two channels:
 | 
						|
  # an incoming channel and an outgoing channel.
 | 
						|
  #
 | 
						|
  # Other ractors send objects to the ractor via the incoming channel and
 | 
						|
  # the ractor receives them.
 | 
						|
  # The ractor send objects via the outgoing channel and other ractors can
 | 
						|
  # receive them.
 | 
						|
  #
 | 
						|
  # The result of the block is sent via the outgoing channel
 | 
						|
  # and other
 | 
						|
  #
 | 
						|
  #   r = Ractor.new do
 | 
						|
  #     Ractor.receive # receive via r's mailbox => 1
 | 
						|
  #     Ractor.receive # receive via r's mailbox => 2
 | 
						|
  #     Ractor.yield 3 # yield a message (3) and wait for taking by another ractor.
 | 
						|
  #     'ok'           # the return value will be yielded.
 | 
						|
  #                    # and r's incoming/outgoing ports are closed automatically.
 | 
						|
  #   end
 | 
						|
  #   r.send 1 # send a message (1) into r's mailbox.
 | 
						|
  #   r <<   2 # << is an alias of `send`.
 | 
						|
  #   p r.take   # take a message from r's outgoing port => 3
 | 
						|
  #   p r.take   # => 'ok'
 | 
						|
  #   p r.take   # raise Ractor::ClosedError
 | 
						|
  #
 | 
						|
  # other options:
 | 
						|
  #   name: Ractor's name
 | 
						|
  #
 | 
						|
  def self.new(*args, name: nil, &block)
 | 
						|
    b = block # TODO: builtin bug
 | 
						|
    raise ArgumentError, "must be called with a block" unless block
 | 
						|
    loc = caller_locations(1, 1).first
 | 
						|
    loc = "#{loc.path}:#{loc.lineno}"
 | 
						|
    __builtin_ractor_create(loc, name, args, b)
 | 
						|
  end
 | 
						|
 | 
						|
  # return current Ractor
 | 
						|
  def self.current
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      rb_ec_ractor_ptr(ec)->self
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  def self.count
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      ULONG2NUM(GET_VM()->ractor.cnt);
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  # Multiplex multiple Ractor communications.
 | 
						|
  #
 | 
						|
  #   r, obj = Ractor.select(r1, r2)
 | 
						|
  #   #=> wait for taking from r1 or r2
 | 
						|
  #   #   returned obj is a taken object from Ractor r
 | 
						|
  #
 | 
						|
  #   r, obj = Ractor.select(r1, r2, Ractor.current)
 | 
						|
  #   #=> wait for taking from r1 or r2
 | 
						|
  #   #         or receive from incoming queue
 | 
						|
  #   #   If receive is succeed, then obj is received value
 | 
						|
  #   #   and r is :receive (Ractor.current)
 | 
						|
  #
 | 
						|
  #   r, obj = Ractor.select(r1, r2, Ractor.current, yield_value: obj)
 | 
						|
  #   #=> wait for taking from r1 or r2
 | 
						|
  #   #         or receive from incoming queue
 | 
						|
  #   #         or yield (Ractor.yield) obj
 | 
						|
  #   #   If yield is succeed, then obj is nil
 | 
						|
  #   #   and r is :yield
 | 
						|
  #
 | 
						|
  def self.select(*ractors, yield_value: yield_unspecified = true, move: false)
 | 
						|
    __builtin_cstmt! %q{
 | 
						|
      const VALUE *rs = RARRAY_CONST_PTR_TRANSIENT(ractors);
 | 
						|
      VALUE rv;
 | 
						|
      VALUE v = ractor_select(ec, rs, RARRAY_LENINT(ractors),
 | 
						|
                              yield_unspecified == Qtrue ? Qundef : yield_value,
 | 
						|
                              (bool)RTEST(move) ? true : false, &rv);
 | 
						|
      return rb_ary_new_from_args(2, rv, v);
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  # Receive an incoming message from Ractor's incoming queue.
 | 
						|
  def self.receive
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      ractor_receive(ec, rb_ec_ractor_ptr(ec))
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  class << self
 | 
						|
    alias recv receive
 | 
						|
  end
 | 
						|
 | 
						|
  private def receive
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      // TODO: check current actor
 | 
						|
      ractor_receive(ec, RACTOR_PTR(self))
 | 
						|
    }
 | 
						|
  end
 | 
						|
  alias recv receive
 | 
						|
 | 
						|
  # Send a message to a Ractor's incoming queue.
 | 
						|
  #
 | 
						|
  # # Example:
 | 
						|
  #   r = Ractor.new do
 | 
						|
  #     p Ractor.receive #=> 'ok'
 | 
						|
  #   end
 | 
						|
  #   r.send 'ok' # send to r's incoming queue.
 | 
						|
  def send(obj, move: false)
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      ractor_send(ec, RACTOR_PTR(self), obj, move)
 | 
						|
    }
 | 
						|
  end
 | 
						|
  alias << send
 | 
						|
 | 
						|
  # yield a message to the ractor's outgoing port.
 | 
						|
  def self.yield(obj, move: false)
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      ractor_yield(ec, rb_ec_ractor_ptr(ec), obj, move)
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  # Take a message from ractor's outgoing port.
 | 
						|
  #
 | 
						|
  # Example:
 | 
						|
  #   r = Ractor.new{ 'oK' }
 | 
						|
  #   p r.take #=> 'ok'
 | 
						|
  def take
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      ractor_take(ec, RACTOR_PTR(self))
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  def inspect
 | 
						|
    loc  = __builtin_cexpr! %q{ RACTOR_PTR(self)->loc }
 | 
						|
    name = __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
 | 
						|
    id   = __builtin_cexpr! %q{ INT2FIX(RACTOR_PTR(self)->id) }
 | 
						|
    status = __builtin_cexpr! %q{
 | 
						|
      rb_str_new2(ractor_status_str(RACTOR_PTR(self)->status_))
 | 
						|
    }
 | 
						|
    "#<Ractor:##{id}#{name ? ' '+name : ''}#{loc ? " " + loc : ''} #{status}>"
 | 
						|
  end
 | 
						|
 | 
						|
  def name
 | 
						|
    __builtin_cexpr! %q{ RACTOR_PTR(self)->name }
 | 
						|
  end
 | 
						|
 | 
						|
  class RemoteError
 | 
						|
    attr_reader :ractor
 | 
						|
  end
 | 
						|
 | 
						|
  # Closes the incoming port and returns its previous state.
 | 
						|
  def close_incoming
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      ractor_close_incoming(ec, RACTOR_PTR(self));
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  # Closes the outgoing port and returns its previous state.
 | 
						|
  def close_outgoing
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      ractor_close_outgoing(ec, RACTOR_PTR(self));
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  # Closes both incoming and outgoing ports.
 | 
						|
  def close
 | 
						|
    close_incoming
 | 
						|
    close_outgoing
 | 
						|
 | 
						|
    self
 | 
						|
  end
 | 
						|
 | 
						|
  # utility method
 | 
						|
  def self.shareable? obj
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      rb_ractor_shareable_p(obj) ? Qtrue : Qfalse;
 | 
						|
    }
 | 
						|
  end
 | 
						|
 | 
						|
  def self.make_shareable obj
 | 
						|
    __builtin_cexpr! %q{
 | 
						|
      rb_ractor_make_shareable(obj);
 | 
						|
    }
 | 
						|
  end
 | 
						|
end
 |