From 8bc677f65580ae08bdeccd35719ed83d00de2ec6 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Fri, 9 Oct 2015 15:33:42 -0700 Subject: [PATCH] code shuffling and cleanup --- lib/sidekiq/fetch.rb | 64 +++++++++++++++++++++------------------- lib/sidekiq/manager.rb | 10 +++---- lib/sidekiq/processor.rb | 21 +++++++++++-- lib/sidekiq/scheduled.rb | 11 ++++--- 4 files changed, 61 insertions(+), 45 deletions(-) diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 7c463f01..6071b64f 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -2,8 +2,26 @@ require 'sidekiq' module Sidekiq class BasicFetch + # We want the fetch operation to timeout every few seconds so the thread + # can check if the process is shutting down. TIMEOUT = 2 + UnitOfWork = Struct.new(:queue, :message) do + def acknowledge + # nothing to do + end + + def queue_name + queue.gsub(/.*queue:/, ''.freeze) + end + + def requeue + Sidekiq.redis do |conn| + conn.rpush("queue:#{queue_name}", message) + end + end + end + def initialize(options) @strictly_ordered_queues = !!options[:strict] @queues = options[:queues].map { |q| "queue:#{q}" } @@ -18,6 +36,22 @@ module Sidekiq UnitOfWork.new(*work) if work end + # Creating the Redis#brpop command takes into account any + # configured queue weights. By default Redis#brpop returns + # data from the first queue that has pending elements. We + # recreate the queue command each time we invoke Redis#brpop + # to honor weights and avoid queue starvation. + def queues_cmd + if @strictly_ordered_queues + @queues + else + queues = @queues.shuffle.uniq + queues << TIMEOUT + queues + end + end + + # By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it # an instance method will make it async to the Fetcher actor def self.bulk_requeue(inprogress, options) @@ -42,35 +76,5 @@ module Sidekiq Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}") end - UnitOfWork = Struct.new(:queue, :message) do - def acknowledge - # nothing to do - end - - def queue_name - queue.gsub(/.*queue:/, ''.freeze) - end - - def requeue - Sidekiq.redis do |conn| - conn.rpush("queue:#{queue_name}", message) - end - end - end - - # Creating the Redis#brpop command takes into account any - # configured queue weights. By default Redis#brpop returns - # data from the first queue that has pending elements. We - # recreate the queue command each time we invoke Redis#brpop - # to honor weights and avoid queue starvation. - def queues_cmd - if @strictly_ordered_queues - @queues - else - queues = @queues.shuffle.uniq - queues << TIMEOUT - queues - end - end end end diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index fec1672c..b0658edc 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -12,19 +12,17 @@ module Sidekiq # # Tasks: # - # 1. start: Spin up Processors. Issue fetch requests for each. - # 2. processor_done: Handle job success, issue fetch request. - # 3. processor_died: Handle job failure, throw away Processor, issue fetch request. - # 4. quiet: shutdown idle Processors, ignore further fetch requests. + # 1. start: Spin up Processors. + # 3. processor_died: Handle job failure, throw away Processor, create new one. + # 4. quiet: shutdown idle Processors. # 5. stop: hard stop the Processors by deadline. # - # Note that only the last task requires a Thread since it has to monitor + # Note that only the last task requires its own Thread since it has to monitor # the shutdown process. The other tasks are performed by other threads. # class Manager include Util - #attr_writer :fetcher attr_reader :workers attr_reader :options diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 1ad28196..cc728472 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -6,9 +6,21 @@ require 'concurrent/atomic/atomic_fixnum' module Sidekiq ## - # The Processor receives a message from the Manager and actually - # processes it. It instantiates the worker, runs the middleware - # chain and then calls Sidekiq::Worker#perform. + # The Processor is a standalone thread which: + # + # 1. fetches a job from Redis + # 2. executes the job + # a. instantiate the Worker + # b. run the middleware chain + # c. call #perform + # + # A Processor can exit due to shutdown (processor_stopped) + # or due to an error during job execution (processor_died) + # + # If an error occurs in the job execution, the + # Processor calls the Manager to create a new one + # to replace itself and exits. + # class Processor include Util @@ -21,6 +33,7 @@ module Sidekiq @down = false @done = false @job = nil + @thread = nil @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options) end @@ -54,6 +67,8 @@ module Sidekiq process_one end @mgr.processor_stopped(self) + rescue Sidekiq::Shutdown + @mgr.processor_stopped(self) rescue Exception => ex @mgr.processor_died(self, ex) end diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index fae3b095..3d5b989a 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -43,18 +43,17 @@ module Sidekiq def initialize @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new + @sleeper = ConnectionPool::TimedStack.new @done = false - @queue = ConnectionPool::TimedStack.new end - # Shut down this Fetcher instance, will pause until - # the thread is dead. + # Shut down this instance, will pause until the thread is dead. def terminate @done = true if @thread t = @thread @thread = nil - @queue << 0 + @sleeper << 0 t.value end end @@ -85,7 +84,7 @@ module Sidekiq private def wait - @queue.pop(random_poll_interval) + @sleeper.pop(random_poll_interval) rescue Timeout::Error end @@ -128,7 +127,7 @@ module Sidekiq total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average] total += (5 * rand) - @queue.pop(total) + @sleeper.pop(total) rescue Timeout::Error end