2012-05-25 23:21:42 -04:00
|
|
|
require 'sidekiq'
|
|
|
|
require 'sidekiq/util'
|
2013-05-10 23:43:53 -04:00
|
|
|
require 'sidekiq/actor'
|
2012-05-25 23:21:42 -04:00
|
|
|
|
|
|
|
module Sidekiq
|
|
|
|
module Scheduled
|
|
|
|
|
|
|
|
# Default polling interval, used by Poller#poll_interval when
# Sidekiq.options[:poll_interval] is not set. The Poller's description
# gives the interval in seconds.
POLL_INTERVAL = 15
|
|
|
|
|
|
|
|
##
|
|
|
|
# The Poller checks Redis every N seconds for messages in the retry or scheduled
|
|
|
|
# set that have passed their timestamp and should be enqueued. If so, it
|
|
|
|
# just pops the message back onto its original queue so the
|
|
|
|
# workers can pick it up like any other message.
|
|
|
|
class Poller
|
2013-05-10 23:43:53 -04:00
|
|
|
include Util
|
|
|
|
include Actor
|
2012-05-25 23:21:42 -04:00
|
|
|
|
|
|
|
# Names of the Redis sorted sets that poll drains on every pass.
# Frozen so the shared list cannot be mutated by accident at runtime.
SETS = %w(retry schedule).freeze
|
|
|
|
|
2012-06-14 11:36:00 -04:00
|
|
|
# Runs one polling pass: every message in the SETS sorted sets whose score is
# <= the current time is moved onto its work queue, then the next pass is
# scheduled with after(poll_interval).
#
# first_time - when true, add_jitter is called before Redis is checked.
#              Rescheduled passes call poll without it, so it defaults to false.
#
# Any StandardError raised while working with Redis is logged and swallowed so
# that the next pass is still scheduled.
def poll(first_time=false)
  watchdog('scheduling poller thread died!') do
    add_jitter if first_time

    begin
      # A message's "score" in Redis is the time at which it should be processed.
      # Just check Redis for the set of messages with a timestamp before now.
      now = Time.now.to_f.to_s
      Sidekiq.redis do |conn|
        SETS.each { |sorted_set| enqueue_due_messages(conn, sorted_set, now) }
      end
    rescue => ex
      # Most likely a problem with redis networking.
      # Punt and try again at the next interval
      logger.error ex.message
      # backtrace can be nil or empty; calling .first on nil here would raise
      # inside the handler and skip the rescheduling below.
      first_frame = Array(ex.backtrace).first
      logger.error first_frame if first_frame
    end

    after(poll_interval) { poll }
  end
end

# Moves every message in sorted_set whose score is <= now onto its work queue.
#
# conn       - Redis connection yielded by Sidekiq.redis.
# sorted_set - name of the sorted set to drain (an entry of SETS).
# now        - upper score bound, as passed to ZRANGEBYSCORE.
def enqueue_due_messages(conn, sorted_set, now)
  # Get the next item in the queue if its score (time to execute) is <= now.
  # We need to go through the list one at a time to reduce the risk of something
  # going wrong between the time jobs are popped from the scheduled queue and when
  # they are pushed onto a work queue and losing the jobs.
  while (message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first)
    # Pop item off the queue and add it to the work queue. If the job can't be popped from
    # the queue, it's because another process already popped it so we can move on to the
    # next one.
    if conn.zrem(sorted_set, message)
      Sidekiq::Client.push(Sidekiq.load_json(message))
      logger.debug { "enqueued #{sorted_set}: #{message}" }
    end
  end
end
private :enqueue_due_messages
|
|
|
|
|
2012-06-14 11:36:00 -04:00
|
|
|
private
|
|
|
|
|
2012-06-19 16:39:20 -04:00
|
|
|
# Returns the interval between polls: the :poll_interval entry of
# Sidekiq.options when it is set (neither nil nor false), otherwise
# POLL_INTERVAL.
def poll_interval
  configured = Sidekiq.options[:poll_interval]
  configured || POLL_INTERVAL
end
|
|
|
|
|
2012-06-14 11:36:00 -04:00
|
|
|
# Sleeps for a random fraction of poll_interval (poll_interval * rand).
# poll calls this only when its first_time argument is true.
# NOTE(review): presumably this staggers pollers that boot at the same time;
# the motivation is not stated in this file.
#
# A Celluloid::Task::TerminatedError raised during the sleep is swallowed;
# any other error propagates to the caller.
def add_jitter
  sleep(poll_interval * rand)
rescue Celluloid::Task::TerminatedError
  # Hit Ctrl-C when Sidekiq is finished booting and we have a chance
  # to get here.
end
|
|
|
|
|
2012-05-25 23:21:42 -04:00
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|