# encoding: utf-8
require 'sidekiq/util'
require 'sidekiq/processor'
require 'sidekiq/fetch'
require 'thread'
require 'set'

module Sidekiq

  ##
  # The Manager is the central coordination point in Sidekiq, controlling
  # the lifecycle of the Processors and feeding them jobs as necessary.
  #
  # Tasks:
  #
  # 1. start: Spin up Processors.
  # 2. processor_died: Handle job failure, throw away Processor, create new one.
  # 3. quiet: shutdown idle Processors.
  # 4. stop: hard stop the Processors by deadline.
  #
  # Note that only the last task requires a Thread since it has to monitor
  # the shutdown process.  The other tasks are performed by other threads.
  #
  class Manager
    include Util

    attr_reader :workers

    SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1

    # Seconds #stop sleeps between checks of the worker pool while it waits
    # for the deadline.
    SHUTDOWN_POLL_INTERVAL = 0.5

    # Builds one Processor per unit of concurrency. #start is what calls
    # Processor#start on them.
    #
    # @param condvar [Object] not used by this class; the parameter is kept
    #   so existing callers keep working.
    # @param options [Hash] read here for +:concurrency+ (default 25) and,
    #   during a hard shutdown, +:fetch+ (default Sidekiq::BasicFetch). The
    #   same hash is handed to every Processor.
    # @raise [ArgumentError] if the concurrency is lower than 1.
    def initialize(condvar, options={})
      logger.debug { options.inspect }
      @options = options
      @count = options[:concurrency] || 25
      raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1

      @done = false
      # The lock exists before any Processor receives a reference to this
      # manager, so no callback can ever observe a missing mutex.
      @plock = Mutex.new
      @workers = Set.new
      @count.times do
        @workers << Processor.new(self, options)
      end
    end

    # Calls Processor#start on every Processor currently in the pool.
    #
    # Works on a snapshot: #processor_died may add a replacement from
    # another thread while this loop runs, and adding to a Set that is being
    # iterated raises RuntimeError in the adding thread.
    def start
      snapshot_workers.each(&:start)
    end

    # Marks the manager as done and calls Processor#terminate on every
    # Processor in the pool. Calls after the first one are no-ops.
    def quiet
      # Flip the flag and take the snapshot in one critical section so that
      # #processor_died either registers its replacement before the snapshot
      # (and it gets terminated below) or sees @done and creates none.
      targets = @plock.synchronize do
        next if @done
        @done = true
        @workers.dup
      end
      return unless targets

      logger.info { "Terminating quiet workers" }
      # Iterate outside the lock: the mutex is not reentrant, so a Processor
      # calling back into the manager must not find it held by this thread.
      targets.each(&:terminate)
    end

    # Quiets the manager, then polls until the pool is empty or less than
    # one poll interval remains before +deadline+; whatever is still in the
    # pool at that point is shut down the hard way.
    #
    # @param deadline [Time] instant compared against Time.now (the code
    #   only requires that +deadline - Time.now+ yields a number of seconds).
    def stop(deadline)
      quiet
      return if @workers.empty?

      logger.info { "Pausing to allow workers to finish..." }
      remaining = deadline - Time.now
      while remaining > SHUTDOWN_POLL_INTERVAL
        return if @workers.empty?
        sleep SHUTDOWN_POLL_INTERVAL
        remaining = deadline - Time.now
      end
      return if @workers.empty?

      hard_shutdown
    end

    # Removes +processor+ from the pool and, unless shutdown has begun,
    # creates, registers and starts a replacement.
    #
    # @param processor [Processor] the Processor to discard.
    # @param reason [Object] accepted for the caller's benefit; not used here.
    def processor_died(processor, reason)
      @plock.synchronize do
        @workers.delete(processor)
        unless @done
          replacement = Processor.new(self, @options)
          @workers << replacement
          replacement.start
        end
      end
    end

    # @return [Boolean] true once #quiet (directly or via #stop) has run.
    def stopped?
      @done
    end

    private

    # Copy of the worker pool taken under the lock, safe to iterate while
    # other threads add to or delete from the live Set.
    def snapshot_workers
      @plock.synchronize { @workers.dup }
    end

    def hard_shutdown
      # We've reached the timeout and we still have busy workers.
      # They must die but their jobs shall live on.
      cleanup = snapshot_workers

      unless cleanup.empty?
        jobs = cleanup.map(&:job).compact

        logger.warn { "Terminating #{cleanup.size} busy worker threads" }
        logger.warn { "Work still in progress #{jobs.inspect}" }

        # Re-enqueue unfinished jobs
        # NOTE: You may notice that we may push a job back to redis before
        # the worker thread is terminated. This is ok because Sidekiq's
        # contract says that jobs are run AT LEAST once. Process termination
        # is delayed until we're certain the jobs are back in Redis because
        # it is worse to lose a job than to run it twice.
        strategy = (@options[:fetch] || Sidekiq::BasicFetch)
        strategy.bulk_requeue(jobs, @options)
      end

      cleanup.each(&:kill)
    end

  end
end