require 'monitor'
require 'thread'
require 'drb/drb'
require 'rinda/rinda'

module Rinda
  # A tuple wrapped together with its lifetime state: an expiry time, an
  # optional renewer object that can extend that time, and a cancel flag.
  #
  # Mixes in DRbUndumped (DRb hands such objects out by reference).
  class TupleEntry
    include DRbUndumped

    # Epoch seconds of 9999-12-31 23:59:59 UTC, used as the deadline for
    # entries that should never expire.  The previous sentinel, 2**31-1,
    # is 2038-01-19: from that instant on every "never expires" entry
    # would have been reported as expired.
    NEVER_EXPIRES_AT = 253_402_300_799

    # Time at which the entry expires, or nil when no expiry could be
    # computed (a nil value is treated as already expired).
    attr_accessor :expires

    # ary:: the tuple contents, wrapped through make_tuple.
    # sec:: lifetime specification; see renew for the accepted forms.
    def initialize(ary, sec=nil)
      @cancel = false
      @ary = make_tuple(ary)
      @renewer = nil
      renew(sec)
    end

    # Marks the entry as canceled; alive? is false from then on.
    def cancel
      @cancel = true
    end

    # True while the entry is neither canceled nor expired.  Because it
    # calls expired?, it may invoke the renewer.
    def alive?
      !canceled? && !expired?
    end

    # The value of the wrapped tuple object.
    def value; @ary.value; end

    # True once cancel has been called.
    def canceled?; @cancel; end

    # True when the entry has passed its expiry time and could not be
    # renewed.  When the deadline has passed and a renewer is registered,
    # the renewer is asked once for a new lifetime before answering.
    def expired?
      return true unless @expires
      return false if @expires > Time.now
      return true if @renewer.nil?
      renew(@renewer)
      return true unless @expires
      return @expires < Time.now
    end

    # Resets the expiry time.  +sec_or_renewer+ is either a lifetime
    # (Numeric seconds, true, or nil) or an object responding to +renew+
    # that returns such a lifetime; in the latter case the object is
    # remembered and consulted again by expired?.
    def renew(sec_or_renewer)
      sec, @renewer = get_renewer(sec_or_renewer)
      @expires = make_expires(sec)
    end

    # Converts a lifetime into an absolute expiry Time:
    # Numeric:: that many seconds from now.
    # true::    a time in the distant past, i.e. expires immediately.
    # nil::     never expires (see NEVER_EXPIRES_AT).
    # Any other value yields nil, which expired? treats as expired.
    def make_expires(sec=nil)
      case sec
      when Numeric
        Time.now + sec
      when true
        Time.at(1)
      when nil
        Time.at(NEVER_EXPIRES_AT)
      end
    end

    # Element lookup, delegated to the wrapped tuple object.
    def [](key)
      @ary[key]
    end

    # Number of elements, delegated to the wrapped tuple object.
    def size
      @ary.size
    end

    # Builds the object that wraps +ary+; subclasses override this to
    # store a different kind of tuple.
    def make_tuple(ary)
      Rinda::Tuple.new(ary)
    end

    private

    # Splits a lifetime specification into [lifetime, renewer].  Plain
    # lifetimes have no renewer.  Anything else is asked for its lifetime
    # via +renew+; if that call fails the object itself is returned as the
    # lifetime with no renewer, which make_expires turns into a nil
    # expiry.
    def get_renewer(it)
      case it
      when Numeric, true, nil
        return it, nil
      else
        begin
          return it.renew, it
        rescue Exception
          # NOTE(review): deliberately broad best-effort rescue -- any
          # failure of the renewer makes the entry expire instead of
          # propagating.  Narrowing it would change which errors reach
          # callers of expired?; confirm before changing.
          return it, nil
        end
      end
    end
  end

  # A TupleEntry whose payload is a Rinda::Template, so that it can be
  # matched against tuples.
  class TemplateEntry < TupleEntry
    # ary::     the template pattern.
    # expires:: lifetime specification handed on to TupleEntry.
    def initialize(ary, expires=nil)
      super(ary, expires)
      # A second Template built from the same pattern; match uses this
      # one, while the inherited accessors use the one from make_tuple.
      @template = Rinda::Template.new(ary)
    end

    # Wraps the pattern in a Rinda::Template rather than a plain tuple.
    def make_tuple(ary)
      Rinda::Template.new(ary)
    end

    # Result of matching +tuple+ against this entry's template.
    def match(tuple); @template.match(tuple); end

    # Case-equality form of match.
    def ===(tuple); match(tuple); end
  end

  # A template entry a thread can block on.  It owns a condition variable
  # created from +place+ (via place.new_cond) and remembers the tuple that
  # satisfied it.
  class WaitTemplateEntry < TemplateEntry
    # The tuple handed to read, or nil if none has been delivered yet.
    attr_reader :found

    # place::   object providing new_cond and synchronize (TupleSpace
    #           passes itself).
    # ary::     the template pattern.
    # expires:: lifetime specification handed on to TemplateEntry.
    def initialize(place, ary, expires=nil)
      super(ary, expires)
      @found = nil
      @place = place
      @cond = place.new_cond
    end

    # Cancels the entry and wakes a thread waiting on it so that it can
    # notice the cancellation.
    def cancel
      super
      signal
    end

    # Blocks on the condition variable.  The callers in this file invoke
    # it from inside place.synchronize.
    def wait
      @cond.wait
    end

    # Records +tuple+ as the result and wakes a waiting thread.
    def read(tuple)
      @found = tuple
      signal
    end

    # Signals the condition variable while holding the place's lock.
    def signal
      @place.synchronize { @cond.signal }
    end
  end

  # A template entry that collects event notifications in a queue.  The
  # pattern it matches is the pair [event, template-for-tuple].
  class NotifyTemplateEntry < TemplateEntry
    # place::   accepted but not used by this class.
    # event::   event name pattern (first element of the pair).
    # tuple::   tuple pattern, wrapped in a Rinda::Template.
    # expires:: lifetime specification handed on to TemplateEntry.
    def initialize(place, event, tuple, expires=nil)
      super([event, Rinda::Template.new(tuple)], expires)
      @done = false
      @queue = Queue.new
    end

    # Enqueues the event +ev+ for a consumer calling pop or each.
    def notify(ev)
      @queue.push(ev)
    end

    # Removes and returns the next queued event, blocking while the queue
    # is empty.  An event whose first element is 'close' marks the entry
    # as finished; any later call raises RequestExpiredError.
    def pop
      raise RequestExpiredError if @done
      event = @queue.pop
      @done = true if event[0] == 'close'
      event
    end

    # Yields each event (the closing one included) until the entry is
    # finished.  Any StandardError raised along the way is swallowed, and
    # the entry is always canceled on the way out.
    def each
      yield(pop) until @done
    rescue
    ensure
      cancel
    end
  end

  # A collection of entries grouped by their size, so that a lookup only
  # scans entries with the same number of elements as the probe.
  class TupleBag
    def initialize
      @hash = {}
    end

    # Adds +ary+ to the group for its size and returns that group.
    def push(ary)
      (@hash[ary.size] ||= []).push(ary)
    end

    # Removes +ary+ from the group for its size; returns what
    # Array#delete returns (the removed item, or nil if absent).
    def delete(ary)
      same_size_as(ary).delete(ary)
    end

    # All live entries of the template's size that +template+ matches.
    def find_all(template)
      same_size_as(template).select do |candidate|
        candidate.alive? && template.match(candidate)
      end
    end

    # The first live entry of the template's size that +template+
    # matches, or nil.
    def find(template)
      same_size_as(template).detect do |candidate|
        candidate.alive? && template.match(candidate)
      end
    end

    # All live stored templates of the tuple's size that match +tuple+.
    # Here the stored entries do the matching, not the argument.
    def find_all_template(tuple)
      same_size_as(tuple).select do |stored|
        stored.alive? && stored.match(tuple)
      end
    end

    # Drops every entry that is no longer alive and returns the dropped
    # entries, in storage order within each size group.
    def delete_unless_alive
      removed = []
      @hash.keys.each do |size|
        survivors, dead = @hash[size].partition { |item| item.alive? }
        @hash[size] = survivors
        removed.concat(dead)
      end
      removed
    end

    private

    # The group holding entries of the same size as +item+, or a
    # throwaway empty array when there is none.
    def same_size_as(item)
      @hash.fetch(item.size, [])
    end
  end

  # A shared store of tuples.  Tuples are added with write, matched by
  # template with read / read_all, removed with take / move, and observed
  # with notify.  All access to the internal bags happens inside this
  # object's monitor (MonitorMixin#synchronize).
  class TupleSpace
    include DRbUndumped
    include MonitorMixin

    # timeout:: stored in @timeout; the background sweep started here
    #           runs every timeout * 2 seconds.
    def initialize(timeout=60)
      super()
      @bag = TupleBag.new
      @read_waiter = TupleBag.new
      @take_waiter = TupleBag.new
      @notify_waiter = TupleBag.new
      @timeout = timeout
      @period = timeout * 2
      @keeper = keeper
    end

    # Adds +tuple+ with lifetime +sec+ and returns its TupleEntry.
    #
    # Waiting readers whose template matches receive the tuple and a
    # 'write' event is published in every case.  An entry that is already
    # expired is never stored: a 'delete' event follows the 'write' at
    # once.  Otherwise the entry is stored and matching take-waiters are
    # woken so they can try to claim it.
    def write(tuple, sec=nil)
      entry = TupleEntry.new(tuple, sec)
      synchronize do
        stillborn = entry.expired?
        @bag.push(entry) unless stillborn
        @read_waiter.find_all_template(entry).each do |waiter|
          waiter.read(tuple)
        end
        unless stillborn
          @take_waiter.find_all_template(entry).each do |waiter|
            waiter.signal
          end
        end
        notify_event('write', entry.value)
        notify_event('delete', entry.value) if stillborn
      end
      entry
    end

    # Removes and returns a tuple matching +tuple+; move without a port.
    def take(tuple, sec=nil, &block)
      move(nil, tuple, sec, &block)
    end

    # Removes a tuple matching the template +tuple+, pushes its value onto
    # +port+ when one is given, and returns the value.
    #
    # If a block is given it receives the waiting template entry before
    # the search starts.  When nothing matches right away, nil is returned
    # if the template is already expired; otherwise the call blocks until
    # a match appears.  While blocked it raises RequestCanceledError if
    # the template is canceled and RequestExpiredError if it expires.
    def move(port, tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?
      synchronize do
        entry = @bag.find(template)
        return claim(port, entry) if entry
        return nil if template.expired?

        begin
          @take_waiter.push(template)
          while true
            raise RequestCanceledError if template.canceled?
            raise RequestExpiredError if template.expired?
            entry = @bag.find(template)
            return claim(port, entry) if entry
            template.wait
          end
        ensure
          # Whatever the outcome, stop being listed as a waiter.
          @take_waiter.delete(template)
        end
      end
    end

    # Returns, without removing it, a tuple matching the template +tuple+.
    #
    # If a block is given it receives the waiting template entry first.
    # A stored match is returned by value at once.  With no match, nil is
    # returned if the template is already expired; otherwise the call
    # waits once and then raises RequestCanceledError / RequestExpiredError
    # if the template was canceled / has expired, or else returns whatever
    # tuple a writer delivered to the template.
    def read(tuple, sec=nil)
      template = WaitTemplateEntry.new(self, tuple, sec)
      yield(template) if block_given?
      synchronize do
        hit = @bag.find(template)
        return hit.value if hit
        return nil if template.expired?

        begin
          @read_waiter.push(template)
          template.wait
          raise RequestCanceledError if template.canceled?
          raise RequestExpiredError if template.expired?
          return template.found
        ensure
          @read_waiter.delete(template)
        end
      end
    end

    # Values of all stored tuples matching the template +tuple+; never
    # blocks waiting for a match.
    def read_all(tuple)
      template = WaitTemplateEntry.new(self, tuple, nil)
      synchronize do
        @bag.find_all(template).collect { |entry| entry.value }
      end
    end

    # Registers interest in +event+ for tuples matching +tuple+ and
    # returns the NotifyTemplateEntry that will collect the events.
    def notify(event, tuple, sec=nil)
      template = NotifyTemplateEntry.new(self, event, tuple, sec)
      synchronize { @notify_waiter.push(template) }
      template
    end

    private

    # Takes +entry+ out of the bag on behalf of move: delivers its value
    # to +port+ (if any), removes the entry, publishes a 'take' event and
    # returns the value.  Must be called inside synchronize.
    def claim(port, entry)
      value = entry.value
      port.push(value) if port
      @bag.delete(entry)
      notify_event('take', value)
      value
    end

    # One sweep over all bags: dead read/take waiters are signalled so
    # their threads wake up, dead notify entries are sent ['close'], and a
    # 'delete' event is published for every dead tuple.
    def keep_clean
      synchronize do
        @read_waiter.delete_unless_alive.each { |waiter| waiter.signal }
        @take_waiter.delete_unless_alive.each { |waiter| waiter.signal }
        @notify_waiter.delete_unless_alive.each do |listener|
          listener.notify(['close'])
        end
        @bag.delete_unless_alive.each do |entry|
          notify_event('delete', entry.value)
        end
      end
    end

    # Delivers the pair [event, tuple] to every registered notify entry
    # whose template matches it.
    def notify_event(event, tuple)
      ev = [event, tuple]
      @notify_waiter.find_all_template(ev).each do |listener|
        listener.notify(ev)
      end
    end

    # Starts and returns the background thread that runs keep_clean every
    # @period seconds, sleeping before the first sweep.
    def keeper
      Thread.new do
        loop do
          sleep(@period)
          keep_clean
        end
      end
    end
  end
end