2013-01-31 12:42:19 -05:00
|
|
|
module Sidetiq
|
|
|
|
configure do |config|
|
2013-01-31 12:50:37 -05:00
|
|
|
config.priority = Thread.main.priority
|
2013-02-01 11:53:34 -05:00
|
|
|
config.resolution = 1
|
|
|
|
config.lock_expire = 1000
|
2013-01-31 12:42:19 -05:00
|
|
|
end
|
|
|
|
|
|
|
|
class Clock
|
|
|
|
include Singleton
|
|
|
|
include MonitorMixin
|
|
|
|
|
2013-02-01 11:53:34 -05:00
|
|
|
START_TIME = Time.local(2010, 1, 1)
|
|
|
|
|
2013-01-31 12:42:19 -05:00
|
|
|
attr_reader :schedules
|
|
|
|
|
|
|
|
def initialize
|
|
|
|
super
|
|
|
|
@schedules = {}
|
|
|
|
start!
|
|
|
|
end
|
|
|
|
|
|
|
|
def schedule_for(worker)
|
2013-02-01 11:53:34 -05:00
|
|
|
schedules[worker] ||= Sidetiq::Schedule.new(START_TIME)
|
2013-01-31 12:42:19 -05:00
|
|
|
end
|
|
|
|
|
|
|
|
def tick
|
|
|
|
@tick = gettime
|
|
|
|
synchronize do
|
|
|
|
schedules.each do |worker, schedule|
|
|
|
|
if schedule.schedule_next?(@tick)
|
2013-02-01 11:53:34 -05:00
|
|
|
enqueue(worker, schedule.next_occurrence(@tick))
|
2013-01-31 12:42:19 -05:00
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
private
|
|
|
|
|
2013-02-01 11:53:34 -05:00
|
|
|
def enqueue(worker, time)
|
|
|
|
key = "sidetiq:#{worker.name}"
|
|
|
|
|
|
|
|
synchronize_clockworks("#{key}:lock") do |redis|
|
|
|
|
status = redis.get(key)
|
|
|
|
|
|
|
|
if status.nil? || status.to_f < time.to_f
|
|
|
|
time_f = time.to_f
|
|
|
|
Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f})"
|
|
|
|
redis.set(key, time_f)
|
|
|
|
worker.perform_at(time)
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
|
|
|
def synchronize_clockworks(lock)
|
|
|
|
Sidekiq.redis do |redis|
|
|
|
|
if redis.setnx(lock, 1)
|
|
|
|
Sidekiq.logger.debug "Sidetiq::Clock lock #{lock} #{Thread.current.inspect}"
|
|
|
|
|
|
|
|
redis.pexpire(lock, Sidetiq.config.lock_expire)
|
|
|
|
yield redis
|
|
|
|
redis.del(lock)
|
|
|
|
|
|
|
|
Sidekiq.logger.debug "Sidetiq::Clock unlock #{lock} #{Thread.current.inspect}"
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|
2013-01-31 12:42:19 -05:00
|
|
|
def start!
|
|
|
|
Sidekiq.logger.info "Sidetiq::Clock start"
|
|
|
|
thr = Thread.start { clock { tick } }
|
|
|
|
thr.abort_on_exception = true
|
|
|
|
thr.priority = Sidetiq.config.resolution
|
|
|
|
end
|
|
|
|
|
|
|
|
def clock
|
|
|
|
loop do
|
|
|
|
yield
|
|
|
|
Thread.pass
|
|
|
|
sleep Sidetiq.config.resolution
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
end
|
|
|
|
|