mirror of
https://github.com/endofunky/sidetiq.git
synced 2022-11-09 13:53:30 -05:00
commit
3834c2e3e7
4 changed files with 123 additions and 10 deletions
|
@ -131,15 +131,20 @@ module Sidetiq
|
||||||
|
|
||||||
def enqueue(worker, time)
|
def enqueue(worker, time)
|
||||||
key = "sidetiq:#{worker.name}"
|
key = "sidetiq:#{worker.name}"
|
||||||
|
time_f = time.to_f
|
||||||
|
|
||||||
synchronize_clockworks("#{key}:lock") do |redis|
|
synchronize_clockworks("#{key}:lock") do |redis|
|
||||||
status = redis.get(key)
|
next_run = (redis.get("#{key}:next") || -1).to_f
|
||||||
|
|
||||||
if status.nil? || status.to_f < time.to_f
|
if next_run < time_f
|
||||||
time_f = time.to_f
|
Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f}) (last: #{next_run})"
|
||||||
Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f})"
|
|
||||||
redis.set(key, time_f)
|
redis.mset("#{key}:last", next_run, "#{key}:next", time_f)
|
||||||
worker.perform_at(time)
|
|
||||||
|
arity = [worker.instance_method(:perform).arity - 1, -1].max
|
||||||
|
args = [next_run, time_f][0..arity]
|
||||||
|
|
||||||
|
worker.perform_at(time, *args)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -176,4 +181,3 @@ module Sidetiq
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -12,12 +12,28 @@ module Sidetiq
|
||||||
# end
|
# end
|
||||||
module Schedulable
|
module Schedulable
|
||||||
module ClassMethods
|
module ClassMethods
|
||||||
|
def last_scheduled_occurrence
|
||||||
|
get_timestamp "last"
|
||||||
|
end
|
||||||
|
|
||||||
|
def next_scheduled_occurrence
|
||||||
|
get_timestamp "next"
|
||||||
|
end
|
||||||
|
|
||||||
def tiq(&block) # :nodoc:
|
def tiq(&block) # :nodoc:
|
||||||
clock = Sidetiq::Clock.instance
|
clock = Sidetiq::Clock.instance
|
||||||
clock.synchronize do
|
clock.synchronize do
|
||||||
clock.schedule_for(self).instance_eval(&block)
|
clock.schedule_for(self).instance_eval(&block)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def get_timestamp(key)
|
||||||
|
Sidekiq.redis do |redis|
|
||||||
|
(redis.get("sidetiq:#{name}:#{key}") || -1).to_f
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.included(klass) # :nodoc:
|
def self.included(klass) # :nodoc:
|
||||||
|
@ -25,4 +41,3 @@ module Sidetiq
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
require_relative 'helper'
|
require_relative 'helper'
|
||||||
|
|
||||||
class TestClock < Sidetiq::TestCase
|
class TestClock < Sidetiq::TestCase
|
||||||
class FakeWorker;
|
class FakeWorker
|
||||||
|
def perform
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def test_delegates_to_instance
|
def test_delegates_to_instance
|
||||||
|
@ -61,5 +63,72 @@ class TestClock < Sidetiq::TestCase
|
||||||
clock.tick
|
clock.tick
|
||||||
clock.tick
|
clock.tick
|
||||||
end
|
end
|
||||||
end
|
|
||||||
|
|
||||||
|
class LastTickWorker
|
||||||
|
def perform last_tick
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_enqueues_jobs_with_default_last_tick_arg_on_first_run
|
||||||
|
schedule = Sidetiq::Schedule.new(Sidetiq::Clock::START_TIME)
|
||||||
|
schedule.hourly
|
||||||
|
|
||||||
|
time = Time.local(2011, 1, 1, 1, 30)
|
||||||
|
|
||||||
|
clock.stubs(:gettime).returns(time, time + 3600)
|
||||||
|
clock.stubs(:schedules).returns(LastTickWorker => schedule)
|
||||||
|
|
||||||
|
expected_first_tick = time + 1800
|
||||||
|
expected_second_tick = expected_first_tick + 3600
|
||||||
|
|
||||||
|
LastTickWorker.expects(:perform_at).with(expected_first_tick, -1).once
|
||||||
|
LastTickWorker.expects(:perform_at).with(expected_second_tick, expected_first_tick.to_f).once
|
||||||
|
|
||||||
|
clock.tick
|
||||||
|
clock.tick
|
||||||
|
end
|
||||||
|
|
||||||
|
class LastAndScheduledTicksWorker
|
||||||
|
def perform last_tick, scheduled_tick
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_enqueues_jobs_with_last_run_timestamp_and_next_run_timestamp
|
||||||
|
schedule = Sidetiq::Schedule.new(Sidetiq::Clock::START_TIME)
|
||||||
|
schedule.hourly
|
||||||
|
|
||||||
|
time = Time.local(2011, 1, 1, 1, 30)
|
||||||
|
|
||||||
|
clock.stubs(:gettime).returns(time, time + 3600)
|
||||||
|
clock.stubs(:schedules).returns(LastAndScheduledTicksWorker => schedule)
|
||||||
|
|
||||||
|
expected_first_tick = time + 1800
|
||||||
|
expected_second_tick = expected_first_tick + 3600
|
||||||
|
|
||||||
|
LastAndScheduledTicksWorker.expects(:perform_at).with(expected_first_tick, -1, expected_first_tick.to_f).once
|
||||||
|
clock.tick
|
||||||
|
|
||||||
|
LastAndScheduledTicksWorker.expects(:perform_at).with(expected_second_tick, expected_first_tick.to_f, expected_second_tick.to_f).once
|
||||||
|
clock.tick
|
||||||
|
end
|
||||||
|
|
||||||
|
class SplatArgsWorker
|
||||||
|
def perform arg1, *args
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_enqueues_jobs_correctly_for_splat_args_perform_methods
|
||||||
|
schedule = Sidetiq::Schedule.new(Sidetiq::Clock::START_TIME)
|
||||||
|
schedule.hourly
|
||||||
|
|
||||||
|
time = Time.local(2011, 1, 1, 1, 30)
|
||||||
|
|
||||||
|
clock.stubs(:gettime).returns(time, time + 3600)
|
||||||
|
clock.stubs(:schedules).returns(SplatArgsWorker => schedule)
|
||||||
|
|
||||||
|
expected_first_tick = time + 1800
|
||||||
|
|
||||||
|
SplatArgsWorker.expects(:perform_at).with(expected_first_tick, -1, expected_first_tick.to_f).once
|
||||||
|
clock.tick
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
25
test/test_worker.rb
Normal file
25
test/test_worker.rb
Normal file
|
@ -0,0 +1,25 @@
|
||||||
|
require_relative 'helper'
|
||||||
|
|
||||||
|
class TestWorker < Sidetiq::TestCase
|
||||||
|
class FakeWorker
|
||||||
|
include Sidetiq::Schedulable
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_timestamps_for_new_worker
|
||||||
|
assert FakeWorker.last_scheduled_occurrence == -1
|
||||||
|
assert FakeWorker.next_scheduled_occurrence == -1
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_timestamps_for_existing_worker
|
||||||
|
last_run = (Time.now - 100).to_f
|
||||||
|
next_run = (Time.now + 100).to_f
|
||||||
|
|
||||||
|
Sidekiq.redis do |redis|
|
||||||
|
redis.set "sidetiq:TestWorker::FakeWorker:last", last_run
|
||||||
|
redis.set "sidetiq:TestWorker::FakeWorker:next", next_run
|
||||||
|
end
|
||||||
|
|
||||||
|
assert FakeWorker.last_scheduled_occurrence == last_run
|
||||||
|
assert FakeWorker.next_scheduled_occurrence == next_run
|
||||||
|
end
|
||||||
|
end
|
Loading…
Add table
Add a link
Reference in a new issue