=begin = distributed Ruby --- dRuby 2.0.4 Copyright (c) 1999-2003 Masatoshi SEKI You can redistribute it and/or modify it under the same terms as Ruby. =end require 'socket' require 'thread' require 'fcntl' module DRb class DRbError < RuntimeError; end class DRbConnError < DRbError; end class DRbIdConv def to_obj(ref) ObjectSpace._id2ref(ref) end def to_id(obj) obj.nil? ? nil : obj.__id__ end end module DRbUndumped def _dump(dummy) raise TypeError, 'can\'t dump' end end class DRbServerNotFound < DRbError; end class DRbBadURI < DRbError; end class DRbBadScheme < DRbError; end class DRbUnknownError < DRbError def initialize(unknown) @unknown = unknown super(unknown.name) end attr_reader :unknown def self._load(s) Marshal::load(s) end def _dump(lv) Marshal::dump(@unknown) end end class DRbUnknown def initialize(err, buf) case err when /uninitialized constant (\S+)/ @name = $1 when /undefined class\/module (\S+)/ @name = $1 else @name = nil end @buf = buf end attr_reader :name, :buf def self._load(s) begin Marshal::load(s) rescue NameError, ArgumentError DRbUnknown.new($!, s) end end def _dump(lv) @buf end def reload self.class._load(@buf) end def exception DRbUnknownError.new(self) end end class DRbMessage def initialize(config) @load_limit = config[:load_limit] @argc_limit = config[:argc_limit] end def dump(obj) obj = DRbObject.new(obj) if obj.kind_of? DRbUndumped begin str = Marshal::dump(obj) rescue str = Marshal::dump(DRbObject.new(obj)) end [str.size].pack('N') + str end def load(soc) sz = soc.read(4) # sizeof (N) raise(DRbConnError, 'connection closed') if sz.nil? raise(DRbConnError, 'premature header') if sz.size < 4 sz = sz.unpack('N')[0] raise(DRbConnError, "too large packet #{sz}") if @load_limit < sz str = soc.read(sz) raise(DRbConnError, 'connection closed') if sz.nil? raise(DRbConnError, 'premature marshal format(can\'t read)') if str.size < sz begin Marshal::load(str) rescue NameError, ArgumentError DRbUnknown.new($!, str) end end def send_request(stream, ref, msg_id, arg, b) ary = [] ary.push(dump(ref.__drbref)) ary.push(dump(msg_id.id2name)) ary.push(dump(arg.length)) arg.each do |e| ary.push(dump(e)) end ary.push(dump(b)) stream.write(ary.join('')) end def recv_request(stream) ref = load(stream) ro = DRb.to_obj(ref) msg = load(stream) argc = load(stream) raise ArgumentError, 'too many arguments' if @argc_limit < argc argv = Array.new(argc, nil) argc.times do |n| argv[n] = load(stream) end block = load(stream) return ro, msg, argv, block end def send_reply(stream, succ, result) stream.write(dump(succ) + dump(result)) end def recv_reply(stream) succ = load(stream) result = load(stream) [succ, result] end end module DRbProtocol module_function def add_protocol(prot) @protocol.push(prot) end module_function def open(uri, config, first=true) @protocol.each do |prot| begin return prot.open(uri, config) rescue DRbBadScheme rescue DRbConnError raise($!) rescue raise(DRbConnError, "#{uri} - #{$!.inspect}") end end if first && (config[:auto_load] != false) auto_load(uri, config) return open(uri, config, false) end raise DRbBadURI, 'can\'t parse uri:' + uri end module_function def open_server(uri, config, first=true) @protocol.each do |prot| begin return prot.open_server(uri, config) rescue DRbBadScheme end end if first && (config[:auto_load] != false) auto_load(uri, config) return open_server(uri, config, false) end raise DRbBadURI, 'can\'t parse uri:' + uri end module_function def uri_option(uri, config, first=true) @protocol.each do |prot| begin uri, opt = prot.uri_option(uri, config) # opt = nil if opt == '' return uri, opt rescue DRbBadScheme end end if first && (config[:auto_load] != false) auto_load(uri, config) return uri_option(uri, config, false) end raise DRbBadURI, 'can\'t parse uri:' + uri end module_function def auto_load(uri, config) if uri =~ /^drb([a-z0-9]+):/ require("drb/#{$1}") rescue nil end end end class DRbTCPSocket private def self.parse_uri(uri) if uri =~ /^druby:\/\/(.*?):(\d+)(\?(.*))?$/ host = $1 port = $2.to_i option = $4 [host, port, option] else raise(DRbBadScheme, uri) unless uri =~ /^druby:/ raise(DRbBadURI, 'can\'t parse uri:' + uri) end end public def self.open(uri, config) host, port, option = parse_uri(uri) host.untaint port.untaint soc = TCPSocket.open(host, port) self.new(uri, soc, config) end def self.open_server(uri, config) uri = 'druby://:0' unless uri host, port, opt = parse_uri(uri) if host.size == 0 soc = TCPServer.open(port) host = Socket.gethostname else soc = TCPServer.open(host, port) end port = soc.addr[1] if port == 0 uri = "druby://#{host}:#{port}" self.new(uri, soc, config) end def self.uri_option(uri, config) host, port, option = parse_uri(uri) return "druby://#{host}:#{port}", option end def initialize(uri, soc, config={}) @uri = uri @socket = soc @config = config @acl = config[:tcp_acl] @msg = DRbMessage.new(config) set_sockopt(@socket) end attr_reader :uri def peeraddr @socket.peeraddr end def stream; @socket; end def send_request(ref, msg_id, arg, b) @msg.send_request(stream, ref, msg_id, arg, b) end def recv_request @msg.recv_request(stream) end def send_reply(succ, result) @msg.send_reply(stream, succ, result) end def recv_reply @msg.recv_reply(stream) end public def close if @socket @socket.close @socket = nil end end def accept while true s = @socket.accept break if (@acl ? @acl.allow_socket?(s) : true) s.close end self.class.new(nil, s, @config) end def alive? return false unless @socket if IO.select([@socket], nil, nil, 0) close return false end true end def set_sockopt(soc) soc.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_NODELAY, 1) soc.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK) if defined? Fcntl::O_NONBLOCK soc.fcntl(Fcntl::F_SETFL, Fcntl::FD_CLOEXEC) if defined? Fcntl::FD_CLOEXEC end end module DRbProtocol @protocol = [DRbTCPSocket] # default end class DRbURIOption def initialize(option) @option = option.to_s end attr :option def to_s; @option; end def ==(other) return false unless DRbURIOption === other @option == other.option end def hash @option.hash end alias eql? == end class DRbObject def self._load(s) uri, ref = Marshal.load(s) if DRb.here?(uri) return DRb.to_obj(ref) end it = self.new(nil) it.reinit(uri, ref) it end def self.new_with_uri(uri) self.new(nil, uri) end def _dump(lv) Marshal.dump([@uri, @ref]) end def initialize(obj, uri=nil) @uri = nil @ref = nil if obj.nil? return if uri.nil? @uri, option = DRbProtocol.uri_option(uri, DRb.config) @ref = DRbURIOption.new(option) unless option.nil? else @uri = uri ? uri : (DRb.uri rescue nil) @ref = obj ? DRb.to_id(obj) : nil end end def reinit(uri, ref) @uri = uri @ref = ref end def __drburi @uri end def __drbref @ref end undef :to_s undef :to_a undef :respond_to? def method_missing(msg_id, *a, &b) if DRb.here?(@uri) obj = DRb.to_obj(@ref) DRb.current_server.check_insecure_method(obj, msg_id) return obj.__send__(msg_id, *a, &b) end succ, result = DRbConn.open(@uri) do |conn| conn.send_message(self, msg_id, a, b) end return result if succ unless DRbUnknown === result prefix = "(#{@uri}) " bt = [] result.backtrace.each do |x| break if /`__send__'$/ =~ x if /^\(druby:\/\// =~ x bt.push(x) else bt.push(prefix + x) end end raise result, result.message, bt + caller else raise result end end end class DRbConn POOL_SIZE = 16 @mutex = Mutex.new @pool = [] def self.open(remote_uri) begin conn = nil @mutex.synchronize do #FIXME new_pool = [] @pool.each do |c| if conn.nil? and c.uri == remote_uri conn = c if c.alive? else new_pool.push c end end @pool = new_pool end conn = self.new(remote_uri) unless conn succ, result = yield(conn) return succ, result ensure @mutex.synchronize do if @pool.size > POOL_SIZE or ! succ conn.close if conn else @pool.unshift(conn) end end end end def initialize(remote_uri) @uri = remote_uri @protocol = DRbProtocol.open(remote_uri, DRb.config) end attr_reader :uri def send_message(ref, msg_id, arg, block) @protocol.send_request(ref, msg_id, arg, block) @protocol.recv_reply end def close @protocol.close @protocol = nil end def alive? @protocol.alive? end end class DRbServer @@acl = nil @@idconv = DRbIdConv.new @@secondary_server = nil @@argc_limit = 256 @@load_limit = 256 * 102400 @@verbose = false def self.default_argc_limit(argc) @@argc_limit = argc end def self.default_load_limit(sz) @@load_limit = sz end def self.default_acl(acl) @@acl = acl end def self.default_id_conv(idconv) @@idconv = idconv end def self.verbose=(on) @@verbose = on end def self.verbose @@verbose end def self.make_config(hash={}) default_config = { :idconv => @@idconv, :verbose => @@verbose, :tcp_acl => @@acl, :load_limit => @@load_limit, :argc_limit => @@argc_limit } default_config.update(hash) end def initialize(uri=nil, front=nil, config_or_acl=nil) if Hash === config_or_acl config = config_or_acl.dup else acl = config_or_acl || @@acl config = { :tcp_acl => acl } end @config = self.class.make_config(config) @protocol = DRbProtocol.open_server(uri, @config) @uri = @protocol.uri @front = front @idconv = @config[:idconv] @grp = ThreadGroup.new @thread = run Thread.exclusive do DRb.primary_server = self unless DRb.primary_server end end attr_reader :uri, :thread, :front attr_reader :config def verbose=(v); @config[:verbose]=v; end def verbose; @config[:verbose]; end def alive? @thread.alive? end def stop_service @thread.kill end def to_obj(ref) return front if ref.nil? return front[ref.to_s] if DRbURIOption === ref @idconv.to_obj(ref) end def to_id(obj) return nil if obj.__id__ == front.__id__ @idconv.to_id(obj) end private def kill_sub_thread Thread.new do grp = ThreadGroup.new grp.add(Thread.current) list = @grp.list while list.size > 0 list.each do |th| th.kill if th.alive? end list = @grp.list end end end def run Thread.start do begin while true main_loop end ensure @protocol.close if @protocol kill_sub_thread end end end INSECURE_METHOD = [ :__send__ ] def insecure_method?(msg_id) INSECURE_METHOD.include?(msg_id) end def any_to_s(obj) obj.to_s rescue sprintf("#<%s:0x%lx>", obj.class, obj.__id__) end def check_insecure_method(obj, msg_id) return true if Proc === obj && msg_id == :__drb_yield raise(ArgumentError, "#{any_to_s(msg_id)} is not a symbol") unless Symbol == msg_id.class raise(SecurityError, "insecure method `#{msg_id}'") if insecure_method?(msg_id) unless obj.respond_to?(msg_id) desc = any_to_s(obj) if desc.nil? || desc[0] == '#' desc << ":#{obj.class}" end if obj.private_methods.include?(msg_id.to_s) raise NameError, "private method `#{msg_id}' called for #{desc}" else raise NameError, "undefined method `#{msg_id}' called for #{desc}" end end true end public :check_insecure_method class InvokeMethod def initialize(drb_server, client) @drb_server = drb_server @client = client end def perform @result = nil @succ = false setup_message if @block @result = perform_with_block else @result = perform_without_block end @succ = true return @succ, @result rescue StandardError, ScriptError, Interrupt @result = $! return @succ, @result end private def init_with_client obj, msg, argv, block = @client.recv_request @obj = obj @msg_id = msg.intern @argv = argv @block = block end def check_insecure_method @drb_server.check_insecure_method(@obj, @msg_id) end def setup_message init_with_client check_insecure_method end def perform_without_block if Proc === @obj && @msg_id == :__drb_yield if @argv.size == 1 ary = @argv else ary = [@argv] end ary.collect(&@obj)[0] else @obj.__send__(@msg_id, *@argv) end end def rescue_local_jump(err) case err.message when /^retry/ # retry from proc-closure return :retry when /^break/ # break from proc-closure return rescue_break(err) else return :unknown end end end if RUBY_VERSION >= '1.8' require 'drb/invokemethod' class InvokeMethod include InvokeMethod18Mixin end else module InvokeMethod16Mixin def block_yield(x) if x.class == Array block_value = @block.__drb_yield(*x) else block_value = @block.__drb_yield(x) end end def rescue_break(err) return :break end def perform_with_block @obj.__send__(@msg_id, *@argv) do |x| jump_error = nil begin block_value = block_yield(x) rescue LocalJumpError jump_error = $! end if jump_error reason ,= rescue_local_jump(jump_error) case reason when :retry retry when :break break else raise jump_error end end block_value end end end class InvokeMethod include InvokeMethod16Mixin end end def main_loop Thread.start(@protocol.accept) do |client| @grp.add Thread.current Thread.current['DRb'] = { 'client' => client , 'server' => self } loop do begin succ = false invoke_method = InvokeMethod.new(self, client) succ, result = invoke_method.perform if !succ && verbose p result result.backtrace.each do |x| puts x end end client.send_reply(succ, result) rescue nil ensure unless succ client.close return end end end end end end @primary_server = nil def start_service(uri=nil, front=nil, config=nil) @primary_server = DRbServer.new(uri, front, config) end module_function :start_service attr_accessor :primary_server module_function :primary_server=, :primary_server def current_server drb = Thread.current['DRb'] server = (drb && drb['server']) ? drb['server'] : @primary_server raise DRbServerNotFound unless server return server end module_function :current_server def stop_service @primary_server.stop_service if @primary_server @primary_server = nil end module_function :stop_service def uri current_server.uri end module_function :uri def here?(uri) (current_server.uri rescue nil) == uri end module_function :here? def config current_server.config rescue DRbServer.make_config end module_function :config def front current_server.front end module_function :front def to_obj(ref) current_server.to_obj(ref) end def to_id(obj) current_server.to_id(obj) end module_function :to_id module_function :to_obj def thread @primary_server ? @primary_server.thread : nil end module_function :thread def install_id_conv(idconv) DRbServer.default_id_conv(idconv) end module_function :install_id_conv def install_acl(acl) DRbServer.default_acl(acl) end module_function :install_acl end DRbObject = DRb::DRbObject DRbUndumped = DRb::DRbUndumped DRbIdConv = DRb::DRbIdConv