1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Extract enqueue logic,

This commit is contained in:
Mike Perham 2015-01-27 15:07:56 -08:00
parent 24b35ec580
commit 339571a4e0

View file

@ -5,8 +5,32 @@ require 'sidekiq/api'
module Sidekiq
module Scheduled
SETS = %w(retry schedule)
INITIAL_WAIT = 10
class Enq
def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
# Get the next item in the queue if it's score (time to execute) is <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
end
end
##
# The Poller checks Redis every N seconds for jobs in the retry or scheduled
@ -17,34 +41,18 @@ module Sidekiq
include Util
include Actor
SETS = %w(retry schedule)
INITIAL_WAIT = 10
def initialize
@enq = (Sidekiq.options[:poll_enq] || Sidekiq::Scheduled::Enq).new
end
def poll(first_time=false)
watchdog('scheduling poller thread died!') do
initial_wait if first_time
begin
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
# Get the next item in the queue if it's score (time to execute) is <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
@enq.enqueue_jobs
rescue => ex
# Most likely a problem with redis networking.
# Punt and try again at the next interval