1
0
Fork 0
mirror of https://github.com/endofunky/sidetiq.git synced 2022-11-09 13:53:30 -05:00
endofunky--sidetiq/lib/sidetiq/handler.rb

55 lines
1.5 KiB
Ruby
Raw Permalink Normal View History

require 'sidekiq/exception_handler'
2013-09-16 07:20:21 -04:00
module Sidetiq
class Handler
include Logging
include Sidekiq::ExceptionHandler
2013-09-16 07:20:21 -04:00
def dispatch(worker, tick)
schedule = worker.schedule
return unless schedule.schedule_next?(tick)
2013-09-16 07:20:21 -04:00
2013-09-16 08:49:56 -04:00
Lock::Redis.new(worker).synchronize do |redis|
if schedule.backfill? && (last = worker.last_scheduled_occurrence) > 0
2013-09-16 07:20:21 -04:00
last = Sidetiq.config.utc ? Time.at(last).utc : Time.at(last)
schedule.occurrences_between(last + 1, tick).each do |past_t|
2013-09-16 07:20:21 -04:00
enqueue(worker, past_t, redis)
end
end
enqueue(worker, schedule.next_occurrence(tick), redis)
2013-09-16 07:20:21 -04:00
end
rescue StandardError => e
handle_exception(e, context: "Sidetiq::Handler#dispatch")
2013-09-17 09:00:10 -04:00
raise e
2013-09-16 07:20:21 -04:00
end
private
def enqueue(worker, time, redis)
key = "sidetiq:#{worker.name}"
time_f = time.to_f
next_run = (redis.get("#{key}:next") || -1).to_f
if next_run < time_f
info "Enqueue: #{worker.name} (at: #{time_f}) (last: #{next_run})"
redis.mset("#{key}:last", next_run, "#{key}:next", time_f)
case worker.instance_method(:perform).arity.abs
when 0
worker.perform_at(time)
when 1
worker.perform_at(time, next_run)
else
worker.perform_at(time, next_run, time_f)
end
end
rescue StandardError => e
handle_exception(e, context: "Sidetiq::Handler#enqueue")
2013-09-17 09:00:10 -04:00
raise e
2013-09-16 07:20:21 -04:00
end
end
end