# frozen_string_literal: true
require 'sidekiq'
require 'sidekiq/util'
require 'sidekiq/api'

module Sidekiq
  module Scheduled
    SETS = %w(retry schedule)

    class Enq
      def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
        # A job's "score" in Redis is the time at which it should be processed.
        # Just check Redis for the set of jobs with a timestamp before now.
        Sidekiq.redis do |conn|
          sorted_sets.each do |sorted_set|
            # Get the next item in the queue if its score (time to execute) is <= now.
            # We need to go through the list one at a time to reduce the risk of
            # losing jobs if something goes wrong between popping them from the
            # scheduled set and pushing them onto a work queue.
            while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do

              # Pop the job off the scheduled set and add it to the work queue. If it
              # can't be removed from the set, another process already popped it, so
              # move on to the next one.
              if conn.zrem(sorted_set, job)
                Sidekiq::Client.push(Sidekiq.load_json(job))
                Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
              end
            end
          end
        end
      end
    end
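
    # Illustration (a sketch, not part of this file; `MyWorker` is a hypothetical
    # Sidekiq::Worker used only as an example):
    #
    #   MyWorker.perform_in(300)              # run ~300 seconds from now
    #   MyWorker.perform_at(Time.now + 300)   # equivalent, explicit timestamp
    #
    # Either call stores the job's JSON payload in the "schedule" sorted set with
    # a score of roughly Time.now.to_f + 300; once that time has passed,
    # enqueue_jobs above removes it from the set and pushes it onto MyWorker's
    # queue via Sidekiq::Client.push.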

    ##
    # The Poller checks Redis every N seconds for jobs in the retry or scheduled
    # set that have passed their timestamp and should be enqueued. If so, it
    # just pops the job back onto its original queue so the
    # workers can pick it up like any other job.
    class Poller
      include Util

      INITIAL_WAIT = 10

      def initialize
        @enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
        @sleeper = ConnectionPool::TimedStack.new
        @done = false
        @thread = nil
      end

      # Shut down this instance; blocks until the thread is dead.
      def terminate
        @done = true
        if @thread
          t = @thread
          @thread = nil
          @sleeper << 0
          t.value
        end
      end
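
      # Start the scheduler thread. Calling this more than once is safe:
      # `@thread ||=` simply returns the thread that is already running.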
      def start
        @thread ||= safe_thread("scheduler") do
          initial_wait

          while !@done
            enqueue
            wait
          end
          Sidekiq.logger.info("Scheduler exiting...")
        end
      end
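
      # Illustrative lifecycle (a sketch; in a real deployment Sidekiq's launcher
      # owns the Poller rather than application code):
      #
      #   poller = Sidekiq::Scheduled::Poller.new
      #   poller.start      # spawns the "scheduler" thread
      #   # ... process runs ...
      #   poller.terminate  # pushes a wakeup into @sleeper and joins the thread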

      def enqueue
        begin
          @enq.enqueue_jobs
        rescue => ex
          # Most likely a problem with redis networking.
          # Punt and try again at the next interval
          logger.error ex.message
          handle_exception(ex)
        end
      end

      private
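
      # Block for up to random_poll_interval seconds. ConnectionPool::TimedStack#pop
      # raises Timeout::Error if nothing is pushed within that window, which is the
      # normal "time to poll again" path; terminate pushes a value to wake us early.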
      def wait
        @sleeper.pop(random_poll_interval)
      rescue Timeout::Error
        # expected
      rescue => ex
        # if poll_interval_average hasn't been calculated yet, random_poll_interval
        # can raise an error trying to reach Redis.
        logger.error ex.message
        handle_exception(ex)
        sleep 5
      end

      def random_poll_interval
        # We want one Sidekiq process to schedule jobs every N seconds. We have M processes
        # and **don't** want to coordinate.
        #
        # So in an N*M second timespan, we want each process to schedule once. The basic loop is:
        #
        # * sleep a random amount within that N*M timespan
        # * wake up and schedule
        #
        # We want to avoid one edge case: imagine a set of 2 processes, scheduling every 5 seconds,
        # so N*M = 10. Each process decides to randomly sleep 8 seconds, now we've failed to meet
        # that 5 second average. Thankfully each schedule cycle will sleep randomly so the next
        # iteration could see each process sleep for 1 second, undercutting our average.
        #
        # So below 10 processes, we special case and ensure the processes sleep closer to the average.
        # In the example above, each process should schedule every 10 seconds on average. We special
        # case smaller clusters to add 50% so they would sleep somewhere between 5 and 15 seconds.
        # As we run more processes, the scheduling interval average will approach an even spread
        # between 0 and poll interval so we don't need this artificial boost.
        #
        if process_count < 10
          # For small clusters, calculate a random interval that is ±50% the desired average.
          poll_interval_average * rand + poll_interval_average.to_f / 2
        else
          # With 10+ processes, we should have enough randomness to get decent polling
          # across the entire timespan
          poll_interval_average * rand
        end
      end
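
      # Worked example (a sketch, assuming Sidekiq's default
      # average_scheduled_poll_interval of 15 seconds): with 3 known processes,
      # poll_interval_average is 45, so each cycle sleeps somewhere between 22.5
      # and 67.5 seconds (±50% of the average); with 12 processes it is 180 and
      # each cycle sleeps a uniformly random 0-180 seconds.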

      # We do our best to tune the poll interval to the size of the active Sidekiq
      # cluster. If you have 30 processes and poll every 15 seconds, that means one
      # Sidekiq is checking Redis every 0.5 seconds - way too often for most people
      # and really bad if the retry or scheduled sets are large.
      #
      # Instead try to avoid polling more than once every 15 seconds. If you have
      # 30 Sidekiq processes, we'll poll every 30 * 15 or 450 seconds.
      # To keep things statistically random, we'll sleep a random amount between
      # 225 and 675 seconds for each poll or 450 seconds on average. Otherwise restarting
      # all your Sidekiq processes at the same time will lead to them all polling at
      # the same time: the thundering herd problem.
      #
      # We only do this if poll_interval_average is unset (the default).
      def poll_interval_average
        Sidekiq.options[:poll_interval_average] ||= scaled_poll_interval
      end
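
      # For example (illustrative only; typically done in an initializer before the
      # server boots), the computed value can be overridden outright:
      #
      #   Sidekiq.options[:poll_interval_average] = 30
      #
      # or the per-process average used for scaling can be changed:
      #
      #   Sidekiq.options[:average_scheduled_poll_interval] = 30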

      # Calculates an average poll interval based on the number of known Sidekiq processes.
      # This minimizes a single point of failure by dispersing check-ins but without taxing
      # Redis if you run many Sidekiq processes.
      def scaled_poll_interval
        process_count * Sidekiq.options[:average_scheduled_poll_interval]
      end
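
      # Sidekiq::ProcessSet reflects processes that have registered a heartbeat in
      # Redis; on a fresh boot it can be empty, so fall back to a count of 1.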
      def process_count
        pcount = Sidekiq::ProcessSet.new.size
        pcount = 1 if pcount == 0
        pcount
      end

      def initial_wait
        # Have all processes sleep between 10-15 seconds: 10 seconds
        # to give time for the heartbeat to register (if the poll interval is going to be
        # calculated by the number of workers), and 5 random seconds to ensure they don't
        # all hit Redis at the same time.
        total = 0
        total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average]
        total += (5 * rand)

        @sleeper.pop(total)
      rescue Timeout::Error
      end
    end
  end
end