From 26600bef236fe4d807ed2991955f73faa8da68c7 Mon Sep 17 00:00:00 2001 From: Lee Henson Date: Fri, 8 Mar 2013 15:08:19 +0000 Subject: [PATCH 1/3] send last and scheduled invocation timestamps to worker.perform, if required --- lib/sidetiq/clock.rb | 18 ++++++----- test/test_clock.rb | 73 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 82 insertions(+), 9 deletions(-) diff --git a/lib/sidetiq/clock.rb b/lib/sidetiq/clock.rb index 1461484..c6433d6 100644 --- a/lib/sidetiq/clock.rb +++ b/lib/sidetiq/clock.rb @@ -131,15 +131,20 @@ module Sidetiq def enqueue(worker, time) key = "sidetiq:#{worker.name}" + time_f = time.to_f synchronize_clockworks("#{key}:lock") do |redis| - status = redis.get(key) + next_run = (redis.get("#{key}:next") || -1).to_f - if status.nil? || status.to_f < time.to_f - time_f = time.to_f - Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f})" - redis.set(key, time_f) - worker.perform_at(time) + if next_run < time_f + Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f}) (last: #{next_run})" + + redis.mset("#{key}:last", next_run, "#{key}:next", time_f) + + arity = [worker.instance_method(:perform).arity - 1, -1].max + args = [next_run, time_f][0..arity] + + worker.perform_at(time, *args) end end end @@ -176,4 +181,3 @@ module Sidetiq end end end - diff --git a/test/test_clock.rb b/test/test_clock.rb index 7754cc4..d8e0989 100644 --- a/test/test_clock.rb +++ b/test/test_clock.rb @@ -1,7 +1,9 @@ require_relative 'helper' class TestClock < Sidetiq::TestCase - class FakeWorker; + class FakeWorker + def perform + end end def test_delegates_to_instance @@ -61,5 +63,72 @@ class TestClock < Sidetiq::TestCase clock.tick clock.tick end -end + class LastTickWorker + def perform last_tick + end + end + + def test_enqueues_jobs_with_default_last_tick_arg_on_first_run + schedule = Sidetiq::Schedule.new(Sidetiq::Clock::START_TIME) + schedule.hourly + + time = Time.local(2011, 1, 1, 1, 30) + + clock.stubs(:gettime).returns(time, time + 3600) + clock.stubs(:schedules).returns(LastTickWorker => schedule) + + expected_first_tick = time + 1800 + expected_second_tick = expected_first_tick + 3600 + + LastTickWorker.expects(:perform_at).with(expected_first_tick, -1).once + LastTickWorker.expects(:perform_at).with(expected_second_tick, expected_first_tick.to_f).once + + clock.tick + clock.tick + end + + class LastAndScheduledTicksWorker + def perform last_tick, scheduled_tick + end + end + + def test_enqueues_jobs_with_last_run_timestamp_and_next_run_timestamp + schedule = Sidetiq::Schedule.new(Sidetiq::Clock::START_TIME) + schedule.hourly + + time = Time.local(2011, 1, 1, 1, 30) + + clock.stubs(:gettime).returns(time, time + 3600) + clock.stubs(:schedules).returns(LastAndScheduledTicksWorker => schedule) + + expected_first_tick = time + 1800 + expected_second_tick = expected_first_tick + 3600 + + LastAndScheduledTicksWorker.expects(:perform_at).with(expected_first_tick, -1, expected_first_tick.to_f).once + clock.tick + + LastAndScheduledTicksWorker.expects(:perform_at).with(expected_second_tick, expected_first_tick.to_f, expected_second_tick.to_f).once + clock.tick + end + + class SplatArgsWorker + def perform arg1, *args + end + end + + def test_enqueues_jobs_correctly_for_splat_args_perform_methods + schedule = Sidetiq::Schedule.new(Sidetiq::Clock::START_TIME) + schedule.hourly + + time = Time.local(2011, 1, 1, 1, 30) + + clock.stubs(:gettime).returns(time, time + 3600) + clock.stubs(:schedules).returns(SplatArgsWorker => schedule) + + expected_first_tick = time + 1800 + + SplatArgsWorker.expects(:perform_at).with(expected_first_tick, -1, expected_first_tick.to_f).once + clock.tick + end +end From 91e90f418ee88fc8dfeeeec0c0cd4aafecfe7269 Mon Sep 17 00:00:00 2001 From: Lee Henson Date: Fri, 8 Mar 2013 16:02:34 +0000 Subject: [PATCH 2/3] add class methods to get last/next run timestamps for a worker --- lib/sidetiq/schedulable.rb | 17 ++++++++++++++++- test/test_worker.rb | 25 +++++++++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) create mode 100644 test/test_worker.rb diff --git a/lib/sidetiq/schedulable.rb b/lib/sidetiq/schedulable.rb index 4beaf60..67b1978 100644 --- a/lib/sidetiq/schedulable.rb +++ b/lib/sidetiq/schedulable.rb @@ -12,12 +12,28 @@ module Sidetiq # end module Schedulable module ClassMethods + def last_scheduled_occurrence + get_timestamp "last" + end + + def next_scheduled_occurrence + get_timestamp "next" + end + def tiq(&block) # :nodoc: clock = Sidetiq::Clock.instance clock.synchronize do clock.schedule_for(self).instance_eval(&block) end end + + private + + def get_timestamp key + Sidekiq.redis do |redis| + (redis.get("sidetiq:#{name}:#{key}") || -1).to_f + end + end end def self.included(klass) # :nodoc: @@ -25,4 +41,3 @@ module Sidetiq end end end - diff --git a/test/test_worker.rb b/test/test_worker.rb new file mode 100644 index 0000000..a69f60c --- /dev/null +++ b/test/test_worker.rb @@ -0,0 +1,25 @@ +require_relative 'helper' + +class TestWorker < Sidetiq::TestCase + class FakeWorker + include Sidetiq::Schedulable + end + + def test_timestamps_for_new_worker + assert FakeWorker.last_scheduled_occurrence == -1 + assert FakeWorker.next_scheduled_occurrence == -1 + end + + def test_timestamps_for_existing_worker + last_run = (Time.now - 100).to_f + next_run = (Time.now + 100).to_f + + Sidekiq.redis do |redis| + redis.set "sidetiq:TestWorker::FakeWorker:last", last_run + redis.set "sidetiq:TestWorker::FakeWorker:next", next_run + end + + assert FakeWorker.last_scheduled_occurrence == last_run + assert FakeWorker.next_scheduled_occurrence == next_run + end +end From 65eb63db586c2d183f45e8856a112888e5b1a820 Mon Sep 17 00:00:00 2001 From: Lee Henson Date: Fri, 8 Mar 2013 16:07:25 +0000 Subject: [PATCH 3/3] add parentheses --- lib/sidetiq/schedulable.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sidetiq/schedulable.rb b/lib/sidetiq/schedulable.rb index 67b1978..4519216 100644 --- a/lib/sidetiq/schedulable.rb +++ b/lib/sidetiq/schedulable.rb @@ -29,7 +29,7 @@ module Sidetiq private - def get_timestamp key + def get_timestamp(key) Sidekiq.redis do |redis| (redis.get("sidetiq:#{name}:#{key}") || -1).to_f end