1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

code shuffling and cleanup

This commit is contained in:
Mike Perham 2015-10-09 15:33:42 -07:00
parent 1e9e209bfd
commit 8bc677f655
4 changed files with 61 additions and 45 deletions

View file

@ -2,8 +2,26 @@ require 'sidekiq'
module Sidekiq
class BasicFetch
# We want the fetch operation to timeout every few seconds so the thread
# can check if the process is shutting down.
TIMEOUT = 2
UnitOfWork = Struct.new(:queue, :message) do
def acknowledge
# nothing to do
end
def queue_name
queue.gsub(/.*queue:/, ''.freeze)
end
def requeue
Sidekiq.redis do |conn|
conn.rpush("queue:#{queue_name}", message)
end
end
end
def initialize(options)
@strictly_ordered_queues = !!options[:strict]
@queues = options[:queues].map { |q| "queue:#{q}" }
@ -18,6 +36,22 @@ module Sidekiq
UnitOfWork.new(*work) if work
end
# Creating the Redis#brpop command takes into account any
# configured queue weights. By default Redis#brpop returns
# data from the first queue that has pending elements. We
# recreate the queue command each time we invoke Redis#brpop
# to honor weights and avoid queue starvation.
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle.uniq
queues << TIMEOUT
queues
end
end
# By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it
# an instance method will make it async to the Fetcher actor
def self.bulk_requeue(inprogress, options)
@ -42,35 +76,5 @@ module Sidekiq
Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end
UnitOfWork = Struct.new(:queue, :message) do
def acknowledge
# nothing to do
end
def queue_name
queue.gsub(/.*queue:/, ''.freeze)
end
def requeue
Sidekiq.redis do |conn|
conn.rpush("queue:#{queue_name}", message)
end
end
end
# Creating the Redis#brpop command takes into account any
# configured queue weights. By default Redis#brpop returns
# data from the first queue that has pending elements. We
# recreate the queue command each time we invoke Redis#brpop
# to honor weights and avoid queue starvation.
def queues_cmd
if @strictly_ordered_queues
@queues
else
queues = @queues.shuffle.uniq
queues << TIMEOUT
queues
end
end
end
end

View file

@ -12,19 +12,17 @@ module Sidekiq
#
# Tasks:
#
# 1. start: Spin up Processors. Issue fetch requests for each.
# 2. processor_done: Handle job success, issue fetch request.
# 3. processor_died: Handle job failure, throw away Processor, issue fetch request.
# 4. quiet: shutdown idle Processors, ignore further fetch requests.
# 1. start: Spin up Processors.
# 3. processor_died: Handle job failure, throw away Processor, create new one.
# 4. quiet: shutdown idle Processors.
# 5. stop: hard stop the Processors by deadline.
#
# Note that only the last task requires a Thread since it has to monitor
# Note that only the last task requires its own Thread since it has to monitor
# the shutdown process. The other tasks are performed by other threads.
#
class Manager
include Util
#attr_writer :fetcher
attr_reader :workers
attr_reader :options

View file

@ -6,9 +6,21 @@ require 'concurrent/atomic/atomic_fixnum'
module Sidekiq
##
# The Processor receives a message from the Manager and actually
# processes it. It instantiates the worker, runs the middleware
# chain and then calls Sidekiq::Worker#perform.
# The Processor is a standalone thread which:
#
# 1. fetches a job from Redis
# 2. executes the job
# a. instantiate the Worker
# b. run the middleware chain
# c. call #perform
#
# A Processor can exit due to shutdown (processor_stopped)
# or due to an error during job execution (processor_died)
#
# If an error occurs in the job execution, the
# Processor calls the Manager to create a new one
# to replace itself and exits.
#
class Processor
include Util
@ -21,6 +33,7 @@ module Sidekiq
@down = false
@done = false
@job = nil
@thread = nil
@strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
end
@ -54,6 +67,8 @@ module Sidekiq
process_one
end
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end

View file

@ -43,18 +43,17 @@ module Sidekiq
def initialize
@enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
@sleeper = ConnectionPool::TimedStack.new
@done = false
@queue = ConnectionPool::TimedStack.new
end
# Shut down this Fetcher instance, will pause until
# the thread is dead.
# Shut down this instance, will pause until the thread is dead.
def terminate
@done = true
if @thread
t = @thread
@thread = nil
@queue << 0
@sleeper << 0
t.value
end
end
@ -85,7 +84,7 @@ module Sidekiq
private
def wait
@queue.pop(random_poll_interval)
@sleeper.pop(random_poll_interval)
rescue Timeout::Error
end
@ -128,7 +127,7 @@ module Sidekiq
total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average]
total += (5 * rand)
@queue.pop(total)
@sleeper.pop(total)
rescue Timeout::Error
end