From f5c1b9158767510faa42c99f4e4c4691c0a9ff9c Mon Sep 17 00:00:00 2001 From: Tobias Svensson Date: Tue, 17 Sep 2013 16:48:41 +0100 Subject: [PATCH] Store schedules on the workers instead of a global, mutable Hash. --- lib/sidetiq.rb | 6 ++++ lib/sidetiq/api.rb | 4 +-- lib/sidetiq/clock.rb | 9 +++--- lib/sidetiq/handler.rb | 12 ++++---- lib/sidetiq/middleware/history.rb | 8 ++--- lib/sidetiq/schedulable.rb | 15 +++++++++- lib/sidetiq/views/sidetiq.erb | 5 ++-- lib/sidetiq/web.rb | 20 +++++++------ .../last_and_scheduled_ticks_worker.rb | 5 ++++ test/fixtures/last_tick_worker.rb | 5 ++++ test/fixtures/optional_arguments_worker.rb | 4 ++- test/fixtures/simple_worker.rb | 2 ++ test/fixtures/splat_args_worker.rb | 5 ++++ test/test_clock.rb | 30 +++++++------------ test/test_sidetiq.rb | 11 +++---- test/test_web.rb | 5 ++-- test/test_worker.rb | 2 +- 17 files changed, 89 insertions(+), 59 deletions(-) diff --git a/lib/sidetiq.rb b/lib/sidetiq.rb index d50599b..c4dff0e 100644 --- a/lib/sidetiq.rb +++ b/lib/sidetiq.rb @@ -2,6 +2,7 @@ require 'ostruct' require 'singleton' require 'socket' +require 'time' # gems require 'ice_cube' @@ -12,6 +13,7 @@ require 'celluloid' require 'sidetiq/config' require 'sidetiq/logging' require 'sidetiq/api' +require 'sidetiq/subclass_tracking' require 'sidetiq/clock' require 'sidetiq/handler' require 'sidetiq/lock/meta_data' @@ -58,3 +60,7 @@ module Sidetiq Sidetiq::Supervisor.handler end end + +if Sidekiq.server? + Sidetiq::Supervisor.run! +end diff --git a/lib/sidetiq/api.rb b/lib/sidetiq/api.rb index 91c9519..07c7f5a 100644 --- a/lib/sidetiq/api.rb +++ b/lib/sidetiq/api.rb @@ -3,12 +3,12 @@ module Sidetiq module API # Public: Returns an Array of workers including Sidetiq::Schedulable. def workers - schedules.keys + Sidetiq::Schedulable.subclasses(true) end # Public: Returns a Hash of Sidetiq::Schedule instances. def schedules - clock.schedules.dup + workers.map(&:schedule) end # Public: Currently scheduled recurring jobs. diff --git a/lib/sidetiq/clock.rb b/lib/sidetiq/clock.rb index 14e4577..d143bf4 100644 --- a/lib/sidetiq/clock.rb +++ b/lib/sidetiq/clock.rb @@ -8,7 +8,6 @@ module Sidetiq def initialize # :nodoc: super - @schedules = {} end # Public: Get the schedule for `worker`. @@ -22,7 +21,9 @@ module Sidetiq # # Returns a Sidetiq::Schedule instances. def schedule_for(worker) - schedules[worker] ||= Sidetiq::Schedule.new + if worker.respond_to?(:schedule) + worker.schedule + end end # Public: Issue a single clock tick. @@ -35,8 +36,8 @@ module Sidetiq # Returns a hash of Sidetiq::Schedule instances. def tick tick = gettime - schedules.each do |worker, sched| - Sidetiq.handler.dispatch(worker,sched, tick) + Sidetiq.workers.each do |worker| + Sidetiq.handler.dispatch(worker, tick) end end diff --git a/lib/sidetiq/handler.rb b/lib/sidetiq/handler.rb index 875db36..8d09779 100644 --- a/lib/sidetiq/handler.rb +++ b/lib/sidetiq/handler.rb @@ -3,18 +3,20 @@ module Sidetiq include Logging include Sidekiq::ExceptionHandler - def dispatch(worker, sched, tick) - return unless sched.schedule_next?(tick) + def dispatch(worker, tick) + schedule = worker.schedule + + return unless schedule.schedule_next?(tick) Lock::Redis.new(worker).synchronize do |redis| - if sched.backfill? && (last = worker.last_scheduled_occurrence) > 0 + if schedule.backfill? && (last = worker.last_scheduled_occurrence) > 0 last = Sidetiq.config.utc ? Time.at(last).utc : Time.at(last) - sched.occurrences_between(last + 1, tick).each do |past_t| + schedule.occurrences_between(last + 1, tick).each do |past_t| enqueue(worker, past_t, redis) end end - enqueue(worker, sched.next_occurrence(tick), redis) + enqueue(worker, schedule.next_occurrence(tick), redis) end rescue StandardError => e handle_exception(e, context: "Sidetiq::Handler#dispatch") diff --git a/lib/sidetiq/middleware/history.rb b/lib/sidetiq/middleware/history.rb index a3a3e1b..4e3bcaa 100644 --- a/lib/sidetiq/middleware/history.rb +++ b/lib/sidetiq/middleware/history.rb @@ -39,12 +39,10 @@ module Sidetiq def save_entry_for_worker(entry, worker) Sidekiq.redis do |redis| - redis.pipelined do |pipe| - list_name = "sidetiq:#{worker.class.name}:history" + list_name = "sidetiq:#{worker.class.name}:history" - pipe.lpush(list_name, JSON.dump(entry)) - pipe.ltrim(list_name, 0, Sidetiq.config.worker_history - 1) - end + redis.lpush(list_name, JSON.dump(entry)) + redis.ltrim(list_name, 0, Sidetiq.config.worker_history - 1) end end end diff --git a/lib/sidetiq/schedulable.rb b/lib/sidetiq/schedulable.rb index f53a829..1dc2a17 100644 --- a/lib/sidetiq/schedulable.rb +++ b/lib/sidetiq/schedulable.rb @@ -11,14 +11,24 @@ module Sidetiq # recurrence { daily } # end module Schedulable + extend SubclassTracking + module ClassMethods include Logging + include SubclassTracking + + attr_writer :schedule # Public: Returns a Float timestamp of the last scheduled run. def last_scheduled_occurrence get_timestamp "last" end + # Public: Returns the Sidetiq::Schedule for this worker. + def schedule + @schedule ||= Sidetiq::Schedule.new + end + # Public: Returns a Float timestamp of the next scheduled run. def next_scheduled_occurrence get_timestamp "next" @@ -32,7 +42,6 @@ module Sidetiq end def recurrence(options = {}, &block) # :nodoc: - schedule = Sidetiq.clock.schedule_for(self) schedule.instance_eval(&block) schedule.set_options(options) end @@ -47,7 +56,11 @@ module Sidetiq end def self.included(klass) # :nodoc: + super + klass.extend(Sidetiq::Schedulable::ClassMethods) + klass.extend(Sidetiq::SubclassTracking) + subclasses << klass end end end diff --git a/lib/sidetiq/views/sidetiq.erb b/lib/sidetiq/views/sidetiq.erb index b4b2abc..50552b3 100644 --- a/lib/sidetiq/views/sidetiq.erb +++ b/lib/sidetiq/views/sidetiq.erb @@ -11,7 +11,7 @@ <%= erb File.read(File.join(File.dirname(__FILE__), 'views', '_home_nav.erb')) %>
- <% if @schedules.length > 0 %> + <% if @workers.length > 0 %> @@ -19,7 +19,8 @@ - <% @schedules.each do |worker, schedule| %> + <% @workers.each do |worker| %> + <% schedule = worker.schedule %>
WorkerNext Run Actions
"><%= worker.name %> diff --git a/lib/sidetiq/web.rb b/lib/sidetiq/web.rb index ddcb112..80f3ad6 100644 --- a/lib/sidetiq/web.rb +++ b/lib/sidetiq/web.rb @@ -6,7 +6,7 @@ module Sidetiq def self.registered(app) app.get "/sidetiq" do - @schedules = Sidetiq.schedules + @workers = Sidetiq.workers @time = Sidetiq.clock.gettime erb File.read(File.join(VIEWS, 'sidetiq.erb')) end @@ -22,9 +22,11 @@ module Sidetiq @time = Sidetiq.clock.gettime - @worker, @schedule = Sidetiq.schedules.select do |worker, _| + @worker = Sidetiq.workers.detect do |worker| worker.name == name - end.flatten + end + + @schedule = @worker.schedule erb File.read(File.join(VIEWS, 'schedule.erb')) end @@ -34,12 +36,12 @@ module Sidetiq @time = Sidetiq.clock.gettime - @worker, @schedule = Sidetiq.schedules.select do |worker, _| + @worker = Sidetiq.workers.detect do |worker| worker.name == name - end.flatten + end @history = Sidekiq.redis do |redis| - redis.lrange("sidetiq:#{@worker.name}:history", 0, -1) + redis.lrange("sidetiq:#{name}:history", 0, -1) end erb File.read(File.join(VIEWS, 'history.erb')) @@ -48,9 +50,9 @@ module Sidetiq app.post "/sidetiq/:name/trigger" do halt 404 unless (name = params[:name]) - worker, _ = Sidetiq.schedules.select do |w, _| - w.name == name - end.flatten + worker = Sidetiq.workers.detect do |worker| + worker.name == name + end worker.perform_async diff --git a/test/fixtures/last_and_scheduled_ticks_worker.rb b/test/fixtures/last_and_scheduled_ticks_worker.rb index c036b20..291d432 100644 --- a/test/fixtures/last_and_scheduled_ticks_worker.rb +++ b/test/fixtures/last_and_scheduled_ticks_worker.rb @@ -1,4 +1,9 @@ class LastAndScheduledTicksWorker + include Sidekiq::Worker + include Sidetiq::Schedulable + + recurrence { hourly } + def perform(last_tick, scheduled_tick) end end diff --git a/test/fixtures/last_tick_worker.rb b/test/fixtures/last_tick_worker.rb index ab20c50..fb063ab 100644 --- a/test/fixtures/last_tick_worker.rb +++ b/test/fixtures/last_tick_worker.rb @@ -1,4 +1,9 @@ class LastTickWorker + include Sidekiq::Worker + include Sidetiq::Schedulable + + recurrence { hourly } + def perform(last_tick) end end diff --git a/test/fixtures/optional_arguments_worker.rb b/test/fixtures/optional_arguments_worker.rb index 21e74c2..375a68e 100644 --- a/test/fixtures/optional_arguments_worker.rb +++ b/test/fixtures/optional_arguments_worker.rb @@ -1,7 +1,9 @@ class OptionalArgumentWorker - include Sidekiq::Worker + include Sidekiq::Worker include Sidetiq::Schedulable + recurrence { hourly } + def perform(last_tick = nil) end end diff --git a/test/fixtures/simple_worker.rb b/test/fixtures/simple_worker.rb index dfe2899..10bd652 100644 --- a/test/fixtures/simple_worker.rb +++ b/test/fixtures/simple_worker.rb @@ -2,6 +2,8 @@ class SimpleWorker include Sidekiq::Worker include Sidetiq::Schedulable + recurrence { daily } + def perform end end diff --git a/test/fixtures/splat_args_worker.rb b/test/fixtures/splat_args_worker.rb index 62333c3..d01135c 100644 --- a/test/fixtures/splat_args_worker.rb +++ b/test/fixtures/splat_args_worker.rb @@ -1,4 +1,9 @@ class SplatArgsWorker + include Sidekiq::Worker + include Sidetiq::Schedulable + + recurrence { hourly } + def perform(arg1, *args) end end diff --git a/test/test_clock.rb b/test/test_clock.rb index 040fdf7..77fbe9b 100644 --- a/test/test_clock.rb +++ b/test/test_clock.rb @@ -18,6 +18,7 @@ class TestClock < Sidetiq::TestCase def test_backfilling BackfillWorker.jobs.clear + Sidetiq.stubs(:workers).returns([BackfillWorker]) start = Sidetiq::Schedule::START_TIME BackfillWorker.stubs(:last_scheduled_occurrence).returns(start.to_f) @@ -32,10 +33,7 @@ class TestClock < Sidetiq::TestCase end def test_enqueues_jobs_by_schedule - schedule = Sidetiq::Schedule.new - schedule.daily - - clock.stubs(:schedules).returns(SimpleWorker => schedule) + Sidetiq.stubs(:workers).returns([SimpleWorker]) SimpleWorker.expects(:perform_at).times(10) @@ -51,13 +49,11 @@ class TestClock < Sidetiq::TestCase end def test_enqueues_jobs_with_default_last_tick_arg_on_first_run - schedule = Sidetiq::Schedule.new - schedule.hourly - time = Time.local(2011, 1, 1, 1, 30) clock.stubs(:gettime).returns(time, time + 3600) - clock.stubs(:schedules).returns(LastTickWorker => schedule) + + Sidetiq.stubs(:workers).returns([LastTickWorker]) expected_first_tick = time + 1800 expected_second_tick = expected_first_tick + 3600 @@ -71,13 +67,11 @@ class TestClock < Sidetiq::TestCase end def test_enqueues_jobs_with_last_run_timestamp_and_next_run_timestamp - schedule = Sidetiq::Schedule.new - schedule.hourly - time = Time.local(2011, 1, 1, 1, 30) clock.stubs(:gettime).returns(time, time + 3600) - clock.stubs(:schedules).returns(LastAndScheduledTicksWorker => schedule) + + Sidetiq.stubs(:workers).returns([LastAndScheduledTicksWorker]) expected_first_tick = time + 1800 expected_second_tick = expected_first_tick + 3600 @@ -95,13 +89,11 @@ class TestClock < Sidetiq::TestCase end def test_enqueues_jobs_with_last_run_timestamp_if_optional_argument - schedule = Sidetiq::Schedule.new - schedule.hourly - time = Time.local(2011, 1, 1, 1, 30) clock.stubs(:gettime).returns(time, time + 3600) - clock.stubs(:schedules).returns(OptionalArgumentWorker => schedule) + + Sidetiq.stubs(:workers).returns([OptionalArgumentWorker]) expected_first_tick = time + 1800 @@ -111,13 +103,11 @@ class TestClock < Sidetiq::TestCase end def test_enqueues_jobs_correctly_for_splat_args_perform_methods - schedule = Sidetiq::Schedule.new - schedule.hourly - time = Time.local(2011, 1, 1, 1, 30) clock.stubs(:gettime).returns(time, time + 3600) - clock.stubs(:schedules).returns(SplatArgsWorker => schedule) + + Sidetiq.stubs(:workers).returns([SplatArgsWorker]) expected_first_tick = time + 1800 diff --git a/test/test_sidetiq.rb b/test/test_sidetiq.rb index 33cf55b..bb5cbd0 100644 --- a/test/test_sidetiq.rb +++ b/test/test_sidetiq.rb @@ -4,13 +4,11 @@ class TestSidetiq < Sidetiq::TestCase def test_schedules schedules = Sidetiq.schedules - assert_equal 2, schedules.length + assert_includes schedules, ScheduledWorker.schedule + assert_includes schedules, BackfillWorker.schedule - assert_includes schedules.keys, ScheduledWorker - assert_includes schedules.keys, BackfillWorker - - assert_kind_of Sidetiq::Schedule, schedules[ScheduledWorker] - assert_kind_of Sidetiq::Schedule, schedules[BackfillWorker] + assert_kind_of Sidetiq::Schedule, ScheduledWorker.schedule + assert_kind_of Sidetiq::Schedule, BackfillWorker.schedule end def test_workers @@ -18,7 +16,6 @@ class TestSidetiq < Sidetiq::TestCase assert_includes workers, ScheduledWorker assert_includes workers, BackfillWorker - assert_equal 2, workers.length end def test_scheduled diff --git a/test/test_web.rb b/test/test_web.rb index 7112800..f06f724 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -14,6 +14,7 @@ class TestWeb < Sidetiq::TestCase def setup super ScheduledWorker.jobs.clear + Sidetiq.stubs(:workers).returns([ScheduledWorker]) end def test_home_tab @@ -27,7 +28,7 @@ class TestWeb < Sidetiq::TestCase get '/sidetiq' assert_equal 200, last_response.status - clock.schedules.each do |worker, schedule| + Sidetiq.workers.each do |worker| assert_match /#{worker.name}/, last_response.body assert_match /#{worker.get_sidekiq_options['queue']}/, last_response.body end @@ -46,7 +47,7 @@ class TestWeb < Sidetiq::TestCase def test_schedule_page get "/sidetiq/ScheduledWorker/schedule" assert_equal 200, last_response.status - schedule = clock.schedules[ScheduledWorker] + schedule = ScheduledWorker.schedule schedule.recurrence_rules.each do |rule| assert_match /#{rule.to_s}/, last_response.body diff --git a/test/test_worker.rb b/test/test_worker.rb index 033ae65..b15f051 100644 --- a/test/test_worker.rb +++ b/test/test_worker.rb @@ -24,6 +24,6 @@ class TestWorker < Sidetiq::TestCase end def test_options - assert Sidetiq.schedules[BackfillWorker].backfill? + assert BackfillWorker.schedule.backfill? end end