2012-05-25 23:21:42 -04:00
|
|
|
require 'sidekiq'
|
|
|
|
require 'sidekiq/util'
|
2013-05-10 23:43:53 -04:00
|
|
|
require 'sidekiq/actor'
|
2014-10-06 11:53:06 -04:00
|
|
|
require 'sidekiq/api'
|
2012-05-25 23:21:42 -04:00
|
|
|
|
|
|
|
module Sidekiq
|
|
|
|
module Scheduled
|
2015-01-27 18:07:56 -05:00
|
|
|
# Names of the Redis sorted sets scanned for jobs whose timestamp has passed.
SETS = %w[retry schedule]
|
2012-05-25 23:21:42 -04:00
|
|
|
|
2015-01-27 18:07:56 -05:00
|
|
|
# Default enqueuer: takes due jobs out of the given sorted sets and hands them
# to Sidekiq::Client.
class Enq
  # Enqueues every job in +sorted_sets+ whose score is <= +now+.
  #
  # now         - upper score bound used for the range query; defaults to the
  #               current time as a float string.
  # sorted_sets - names of the sorted sets to scan; defaults to SETS.
  def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
    # A job's "score" in Redis is the time at which it should be processed.
    # Just check Redis for the set of jobs with a timestamp before now.
    Sidekiq.redis do |conn|
      sorted_sets.each do |sorted_set|
        # Get the next item in the queue if its score (time to execute) is <= now.
        # We need to go through the list one at a time to reduce the risk of something
        # going wrong between the time jobs are popped from the scheduled queue and when
        # they are pushed onto a work queue and losing the jobs.
        while (job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first)
          # Pop item off the queue and add it to the work queue. If the job can't be popped from
          # the queue, it's because another process already popped it so we can move on to the
          # next one.
          next unless conn.zrem(sorted_set, job)

          Sidekiq::Client.push(Sidekiq.load_json(job))
          Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
        end
      end
    end
  end
end
|
2012-05-25 23:21:42 -04:00
|
|
|
|
|
|
|
##
# The Poller checks Redis every N seconds for jobs in the retry or scheduled
# sets that have passed their timestamp and should be enqueued. If so, it
# just pops the job back onto its original queue so the
# workers can pick it up like any other job.
|
2012-05-25 23:21:42 -04:00
|
|
|
class Poller
|
2013-05-10 23:43:53 -04:00
|
|
|
include Util
|
|
|
|
include Actor
|
2012-05-25 23:21:42 -04:00
|
|
|
|
2015-01-27 18:07:56 -05:00
|
|
|
INITIAL_WAIT = 10
|
|
|
|
|
|
|
|
# Builds the enqueuer stored in @enq. A custom class can be supplied through
# Sidekiq.options[:scheduled_enq]; otherwise Sidekiq::Scheduled::Enq is used.
def initialize
  enq_class = Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq
  @enq = enq_class.new
end
|
2012-05-25 23:21:42 -04:00
|
|
|
|
2012-06-14 11:36:00 -04:00
|
|
|
# Runs one scheduling pass and arms the timer for the next one.
#
# first_time - when true, initial_wait runs before the pass.
#
# Errors raised while enqueueing, or while working out the next delay, are
# logged and swallowed so that the next pass is always scheduled.
def poll(first_time=false)
  watchdog('scheduling poller thread died!') do
    initial_wait if first_time

    begin
      @enq.enqueue_jobs
    rescue => ex
      # Most likely a problem with redis networking.
      # Punt and try again at the next interval
      logger.error ex.message
      logger.error ex.backtrace.first
    end

    delay = begin
      random_poll_interval
    rescue => ex
      # Working out the interval can fail too (while the average is still
      # uncomputed it goes through Sidekiq::ProcessSet). Do not let that
      # stop the polling loop; retry after a short fixed delay instead.
      logger.error ex.message
      logger.error ex.backtrace.first
      5
    end

    after(delay) { poll }
  end
end
|
|
|
|
|
2012-06-14 11:36:00 -04:00
|
|
|
private
|
|
|
|
|
2015-04-24 12:00:58 -04:00
|
|
|
# Calculates a random interval that is ±50% the desired average, i.e. a
# Float in the range [0.5 * average, 1.5 * average).
def random_poll_interval
  average = poll_interval_average.to_f
  average * rand + average / 2
end
|
|
|
|
|
2014-06-07 18:12:31 -04:00
|
|
|
# We do our best to tune the average poll interval to the size of the active
# Sidekiq cluster. If you have 30 processes and poll every 15 seconds, that means
# one Sidekiq is checking Redis every 0.5 seconds - way too often for most people
# and really bad if the retry or scheduled sets are large.
#
# Instead try to avoid polling more than once every 15 seconds. If you have
# 30 Sidekiq processes, we'll set the average poll interval to 30 * 15 or 450 seconds.
# To keep things statistically random, we'll sleep a random amount between
# 225 and 675 seconds for each poll or 450 seconds on average. Otherwise restarting
# all your Sidekiq processes at the same time will lead to them all polling at
# the same time: the thundering herd problem.
#
# We only do this if Sidekiq.options[:poll_interval_average] is unset (the
# default). The computed value is stored back into that option, so later
# calls reuse it instead of recalculating.
def poll_interval_average
  Sidekiq.options[:poll_interval_average] ||= scaled_poll_interval
end
|
|
|
|
|
|
|
|
# Calculates an average poll interval based on the number of known Sidekiq processes.
# This minimizes a single point of failure by dispersing check-ins but without taxing
# Redis if you run many Sidekiq processes.
def scaled_poll_interval
  # An empty process set counts as one process so the interval never
  # collapses to zero.
  process_count = [Sidekiq::ProcessSet.new.size, 1].max
  process_count * Sidekiq.options[:average_scheduled_poll_interval]
end
|
|
|
|
|
2014-06-07 18:12:31 -04:00
|
|
|
# Sleeps before the very first poll.
#
# When the poll interval is going to be calculated from the number of
# processes, sleep INITIAL_WAIT seconds first to give time for the heartbeat
# to register. In every case add up to 5 random seconds to ensure the
# processes don't all hit Redis at the same time. That is 10-15 seconds in
# total, or 0-5 seconds when an interval was configured explicitly.
def initial_wait
  # :poll_interval_average is the option poll_interval_average reads;
  # :poll_interval is still honoured as the older key checked here before.
  explicit_interval = Sidekiq.options[:poll_interval_average] || Sidekiq.options[:poll_interval]
  sleep(INITIAL_WAIT) unless explicit_interval
  sleep(5 * rand)
rescue Celluloid::Task::TerminatedError
  # Hit Ctrl-C when Sidekiq is finished booting and we have a chance
  # to get here.
end
|
|
|
|
|
2012-05-25 23:21:42 -04:00
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|