require 'sidekiq' require 'sidekiq/util' require 'sidekiq/actor' module Sidekiq module Scheduled INITIAL_WAIT = 15 ## # The Poller checks Redis every N seconds for messages in the retry or scheduled # set have passed their timestamp and should be enqueued. If so, it # just pops the message back onto its original queue so the # workers can pick it up like any other message. class Poller include Util include Actor SETS = %w(retry schedule) def poll(first_time=false) watchdog('scheduling poller thread died!') do initial_wait if first_time begin # A message's "score" in Redis is the time at which it should be processed. # Just check Redis for the set of messages with a timestamp before now. now = Time.now.to_f.to_s Sidekiq.redis do |conn| SETS.each do |sorted_set| # Get the next item in the queue if it's score (time to execute) is <= now. # We need to go through the list one at a time to reduce the risk of something # going wrong between the time jobs are popped from the scheduled queue and when # they are pushed onto a work queue and losing the jobs. while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do # Pop item off the queue and add it to the work queue. If the job can't be popped from # the queue, it's because another process already popped it so we can move on to the # next one. if conn.zrem(sorted_set, message) Sidekiq::Client.push(Sidekiq.load_json(message)) logger.debug { "enqueued #{sorted_set}: #{message}" } end end end end rescue => ex # Most likely a problem with redis networking. # Punt and try again at the next interval logger.error ex.message logger.error ex.backtrace.first end after(poll_interval * rand) { poll } end end private # We do our best to tune poll_interval to the size of the active Sidekiq # cluster. If you have 30 processes and poll every 15 seconds, that means one # Sidekiq is checking Redis every 0.5 seconds - way too often for most people # and really bad if the retry or scheduled sets are large. # # Instead try to avoid polling more than once every 15 seconds. If you have # 30 Sidekiq processes, we'll set poll_interval to 30 * 15 * 2 or 900 seconds. # To keep things statistically random, we'll sleep a random amount between # 0 and 900 seconds for each poll or 450 seconds on average. Otherwise restarting # all your Sidekiq processes at the same time will lead to them all polling at # the same time: the thundering herd problem. # # We only do this if poll_interval is unset (the default). def poll_interval Sidekiq.options[:poll_interval] ||= begin pcount = Sidekiq.redis {|c| c.scard('processes') } || 1 pcount * 15 * 2 end end def initial_wait begin sleep(INITIAL_WAIT) rescue Celluloid::Task::TerminatedError # Hit Ctrl-C when Sidekiq is finished booting and we have a chance # to get here. end end end end end