1
0
Fork 0
mirror of https://github.com/endofunky/sidetiq.git synced 2022-11-09 13:53:30 -05:00

Store schedules on the workers instead of a global, mutable Hash.

This commit is contained in:
Tobias Svensson 2013-09-17 16:48:41 +01:00
parent fdedc078f9
commit f5c1b91587
17 changed files with 89 additions and 59 deletions

View file

@ -2,6 +2,7 @@
require 'ostruct' require 'ostruct'
require 'singleton' require 'singleton'
require 'socket' require 'socket'
require 'time'
# gems # gems
require 'ice_cube' require 'ice_cube'
@ -12,6 +13,7 @@ require 'celluloid'
require 'sidetiq/config' require 'sidetiq/config'
require 'sidetiq/logging' require 'sidetiq/logging'
require 'sidetiq/api' require 'sidetiq/api'
require 'sidetiq/subclass_tracking'
require 'sidetiq/clock' require 'sidetiq/clock'
require 'sidetiq/handler' require 'sidetiq/handler'
require 'sidetiq/lock/meta_data' require 'sidetiq/lock/meta_data'
@ -58,3 +60,7 @@ module Sidetiq
Sidetiq::Supervisor.handler Sidetiq::Supervisor.handler
end end
end end
if Sidekiq.server?
Sidetiq::Supervisor.run!
end

View file

@ -3,12 +3,12 @@ module Sidetiq
module API module API
# Public: Returns an Array of workers including Sidetiq::Schedulable. # Public: Returns an Array of workers including Sidetiq::Schedulable.
def workers def workers
schedules.keys Sidetiq::Schedulable.subclasses(true)
end end
# Public: Returns a Hash of Sidetiq::Schedule instances. # Public: Returns a Hash of Sidetiq::Schedule instances.
def schedules def schedules
clock.schedules.dup workers.map(&:schedule)
end end
# Public: Currently scheduled recurring jobs. # Public: Currently scheduled recurring jobs.

View file

@ -8,7 +8,6 @@ module Sidetiq
def initialize # :nodoc: def initialize # :nodoc:
super super
@schedules = {}
end end
# Public: Get the schedule for `worker`. # Public: Get the schedule for `worker`.
@ -22,7 +21,9 @@ module Sidetiq
# #
# Returns a Sidetiq::Schedule instances. # Returns a Sidetiq::Schedule instances.
def schedule_for(worker) def schedule_for(worker)
schedules[worker] ||= Sidetiq::Schedule.new if worker.respond_to?(:schedule)
worker.schedule
end
end end
# Public: Issue a single clock tick. # Public: Issue a single clock tick.
@ -35,8 +36,8 @@ module Sidetiq
# Returns a hash of Sidetiq::Schedule instances. # Returns a hash of Sidetiq::Schedule instances.
def tick def tick
tick = gettime tick = gettime
schedules.each do |worker, sched| Sidetiq.workers.each do |worker|
Sidetiq.handler.dispatch(worker,sched, tick) Sidetiq.handler.dispatch(worker, tick)
end end
end end

View file

@ -3,18 +3,20 @@ module Sidetiq
include Logging include Logging
include Sidekiq::ExceptionHandler include Sidekiq::ExceptionHandler
def dispatch(worker, sched, tick) def dispatch(worker, tick)
return unless sched.schedule_next?(tick) schedule = worker.schedule
return unless schedule.schedule_next?(tick)
Lock::Redis.new(worker).synchronize do |redis| Lock::Redis.new(worker).synchronize do |redis|
if sched.backfill? && (last = worker.last_scheduled_occurrence) > 0 if schedule.backfill? && (last = worker.last_scheduled_occurrence) > 0
last = Sidetiq.config.utc ? Time.at(last).utc : Time.at(last) last = Sidetiq.config.utc ? Time.at(last).utc : Time.at(last)
sched.occurrences_between(last + 1, tick).each do |past_t| schedule.occurrences_between(last + 1, tick).each do |past_t|
enqueue(worker, past_t, redis) enqueue(worker, past_t, redis)
end end
end end
enqueue(worker, sched.next_occurrence(tick), redis) enqueue(worker, schedule.next_occurrence(tick), redis)
end end
rescue StandardError => e rescue StandardError => e
handle_exception(e, context: "Sidetiq::Handler#dispatch") handle_exception(e, context: "Sidetiq::Handler#dispatch")

View file

@ -39,12 +39,10 @@ module Sidetiq
def save_entry_for_worker(entry, worker) def save_entry_for_worker(entry, worker)
Sidekiq.redis do |redis| Sidekiq.redis do |redis|
redis.pipelined do |pipe|
list_name = "sidetiq:#{worker.class.name}:history" list_name = "sidetiq:#{worker.class.name}:history"
pipe.lpush(list_name, JSON.dump(entry)) redis.lpush(list_name, JSON.dump(entry))
pipe.ltrim(list_name, 0, Sidetiq.config.worker_history - 1) redis.ltrim(list_name, 0, Sidetiq.config.worker_history - 1)
end
end end
end end
end end

View file

@ -11,14 +11,24 @@ module Sidetiq
# recurrence { daily } # recurrence { daily }
# end # end
module Schedulable module Schedulable
extend SubclassTracking
module ClassMethods module ClassMethods
include Logging include Logging
include SubclassTracking
attr_writer :schedule
# Public: Returns a Float timestamp of the last scheduled run. # Public: Returns a Float timestamp of the last scheduled run.
def last_scheduled_occurrence def last_scheduled_occurrence
get_timestamp "last" get_timestamp "last"
end end
# Public: Returns the Sidetiq::Schedule for this worker.
def schedule
@schedule ||= Sidetiq::Schedule.new
end
# Public: Returns a Float timestamp of the next scheduled run. # Public: Returns a Float timestamp of the next scheduled run.
def next_scheduled_occurrence def next_scheduled_occurrence
get_timestamp "next" get_timestamp "next"
@ -32,7 +42,6 @@ module Sidetiq
end end
def recurrence(options = {}, &block) # :nodoc: def recurrence(options = {}, &block) # :nodoc:
schedule = Sidetiq.clock.schedule_for(self)
schedule.instance_eval(&block) schedule.instance_eval(&block)
schedule.set_options(options) schedule.set_options(options)
end end
@ -47,7 +56,11 @@ module Sidetiq
end end
def self.included(klass) # :nodoc: def self.included(klass) # :nodoc:
super
klass.extend(Sidetiq::Schedulable::ClassMethods) klass.extend(Sidetiq::Schedulable::ClassMethods)
klass.extend(Sidetiq::SubclassTracking)
subclasses << klass
end end
end end
end end

View file

@ -11,7 +11,7 @@
<%= erb File.read(File.join(File.dirname(__FILE__), 'views', '_home_nav.erb')) %> <%= erb File.read(File.join(File.dirname(__FILE__), 'views', '_home_nav.erb')) %>
<div class="span9"> <div class="span9">
<% if @schedules.length > 0 %> <% if @workers.length > 0 %>
<table class="table table-striped table-bordered table-white" style="width: 100%; margin: 0; table-layout:fixed;"> <table class="table table-striped table-bordered table-white" style="width: 100%; margin: 0; table-layout:fixed;">
<thead> <thead>
<th style="width: 50%">Worker</th> <th style="width: 50%">Worker</th>
@ -19,7 +19,8 @@
<th style="width: 30%">Next Run</th> <th style="width: 30%">Next Run</th>
<th style="width: 10%">Actions</th> <th style="width: 10%">Actions</th>
</thead> </thead>
<% @schedules.each do |worker, schedule| %> <% @workers.each do |worker| %>
<% schedule = worker.schedule %>
<tr> <tr>
<td> <td>
<a href="<%= "#{root_path}sidetiq/#{worker.name}/schedule" %>"><%= worker.name %></a> <a href="<%= "#{root_path}sidetiq/#{worker.name}/schedule" %>"><%= worker.name %></a>

View file

@ -6,7 +6,7 @@ module Sidetiq
def self.registered(app) def self.registered(app)
app.get "/sidetiq" do app.get "/sidetiq" do
@schedules = Sidetiq.schedules @workers = Sidetiq.workers
@time = Sidetiq.clock.gettime @time = Sidetiq.clock.gettime
erb File.read(File.join(VIEWS, 'sidetiq.erb')) erb File.read(File.join(VIEWS, 'sidetiq.erb'))
end end
@ -22,9 +22,11 @@ module Sidetiq
@time = Sidetiq.clock.gettime @time = Sidetiq.clock.gettime
@worker, @schedule = Sidetiq.schedules.select do |worker, _| @worker = Sidetiq.workers.detect do |worker|
worker.name == name worker.name == name
end.flatten end
@schedule = @worker.schedule
erb File.read(File.join(VIEWS, 'schedule.erb')) erb File.read(File.join(VIEWS, 'schedule.erb'))
end end
@ -34,12 +36,12 @@ module Sidetiq
@time = Sidetiq.clock.gettime @time = Sidetiq.clock.gettime
@worker, @schedule = Sidetiq.schedules.select do |worker, _| @worker = Sidetiq.workers.detect do |worker|
worker.name == name worker.name == name
end.flatten end
@history = Sidekiq.redis do |redis| @history = Sidekiq.redis do |redis|
redis.lrange("sidetiq:#{@worker.name}:history", 0, -1) redis.lrange("sidetiq:#{name}:history", 0, -1)
end end
erb File.read(File.join(VIEWS, 'history.erb')) erb File.read(File.join(VIEWS, 'history.erb'))
@ -48,9 +50,9 @@ module Sidetiq
app.post "/sidetiq/:name/trigger" do app.post "/sidetiq/:name/trigger" do
halt 404 unless (name = params[:name]) halt 404 unless (name = params[:name])
worker, _ = Sidetiq.schedules.select do |w, _| worker = Sidetiq.workers.detect do |worker|
w.name == name worker.name == name
end.flatten end
worker.perform_async worker.perform_async

View file

@ -1,4 +1,9 @@
class LastAndScheduledTicksWorker class LastAndScheduledTicksWorker
include Sidekiq::Worker
include Sidetiq::Schedulable
recurrence { hourly }
def perform(last_tick, scheduled_tick) def perform(last_tick, scheduled_tick)
end end
end end

View file

@ -1,4 +1,9 @@
class LastTickWorker class LastTickWorker
include Sidekiq::Worker
include Sidetiq::Schedulable
recurrence { hourly }
def perform(last_tick) def perform(last_tick)
end end
end end

View file

@ -2,6 +2,8 @@ class OptionalArgumentWorker
include Sidekiq::Worker include Sidekiq::Worker
include Sidetiq::Schedulable include Sidetiq::Schedulable
recurrence { hourly }
def perform(last_tick = nil) def perform(last_tick = nil)
end end
end end

View file

@ -2,6 +2,8 @@ class SimpleWorker
include Sidekiq::Worker include Sidekiq::Worker
include Sidetiq::Schedulable include Sidetiq::Schedulable
recurrence { daily }
def perform def perform
end end
end end

View file

@ -1,4 +1,9 @@
class SplatArgsWorker class SplatArgsWorker
include Sidekiq::Worker
include Sidetiq::Schedulable
recurrence { hourly }
def perform(arg1, *args) def perform(arg1, *args)
end end
end end

View file

@ -18,6 +18,7 @@ class TestClock < Sidetiq::TestCase
def test_backfilling def test_backfilling
BackfillWorker.jobs.clear BackfillWorker.jobs.clear
Sidetiq.stubs(:workers).returns([BackfillWorker])
start = Sidetiq::Schedule::START_TIME start = Sidetiq::Schedule::START_TIME
BackfillWorker.stubs(:last_scheduled_occurrence).returns(start.to_f) BackfillWorker.stubs(:last_scheduled_occurrence).returns(start.to_f)
@ -32,10 +33,7 @@ class TestClock < Sidetiq::TestCase
end end
def test_enqueues_jobs_by_schedule def test_enqueues_jobs_by_schedule
schedule = Sidetiq::Schedule.new Sidetiq.stubs(:workers).returns([SimpleWorker])
schedule.daily
clock.stubs(:schedules).returns(SimpleWorker => schedule)
SimpleWorker.expects(:perform_at).times(10) SimpleWorker.expects(:perform_at).times(10)
@ -51,13 +49,11 @@ class TestClock < Sidetiq::TestCase
end end
def test_enqueues_jobs_with_default_last_tick_arg_on_first_run def test_enqueues_jobs_with_default_last_tick_arg_on_first_run
schedule = Sidetiq::Schedule.new
schedule.hourly
time = Time.local(2011, 1, 1, 1, 30) time = Time.local(2011, 1, 1, 1, 30)
clock.stubs(:gettime).returns(time, time + 3600) clock.stubs(:gettime).returns(time, time + 3600)
clock.stubs(:schedules).returns(LastTickWorker => schedule)
Sidetiq.stubs(:workers).returns([LastTickWorker])
expected_first_tick = time + 1800 expected_first_tick = time + 1800
expected_second_tick = expected_first_tick + 3600 expected_second_tick = expected_first_tick + 3600
@ -71,13 +67,11 @@ class TestClock < Sidetiq::TestCase
end end
def test_enqueues_jobs_with_last_run_timestamp_and_next_run_timestamp def test_enqueues_jobs_with_last_run_timestamp_and_next_run_timestamp
schedule = Sidetiq::Schedule.new
schedule.hourly
time = Time.local(2011, 1, 1, 1, 30) time = Time.local(2011, 1, 1, 1, 30)
clock.stubs(:gettime).returns(time, time + 3600) clock.stubs(:gettime).returns(time, time + 3600)
clock.stubs(:schedules).returns(LastAndScheduledTicksWorker => schedule)
Sidetiq.stubs(:workers).returns([LastAndScheduledTicksWorker])
expected_first_tick = time + 1800 expected_first_tick = time + 1800
expected_second_tick = expected_first_tick + 3600 expected_second_tick = expected_first_tick + 3600
@ -95,13 +89,11 @@ class TestClock < Sidetiq::TestCase
end end
def test_enqueues_jobs_with_last_run_timestamp_if_optional_argument def test_enqueues_jobs_with_last_run_timestamp_if_optional_argument
schedule = Sidetiq::Schedule.new
schedule.hourly
time = Time.local(2011, 1, 1, 1, 30) time = Time.local(2011, 1, 1, 1, 30)
clock.stubs(:gettime).returns(time, time + 3600) clock.stubs(:gettime).returns(time, time + 3600)
clock.stubs(:schedules).returns(OptionalArgumentWorker => schedule)
Sidetiq.stubs(:workers).returns([OptionalArgumentWorker])
expected_first_tick = time + 1800 expected_first_tick = time + 1800
@ -111,13 +103,11 @@ class TestClock < Sidetiq::TestCase
end end
def test_enqueues_jobs_correctly_for_splat_args_perform_methods def test_enqueues_jobs_correctly_for_splat_args_perform_methods
schedule = Sidetiq::Schedule.new
schedule.hourly
time = Time.local(2011, 1, 1, 1, 30) time = Time.local(2011, 1, 1, 1, 30)
clock.stubs(:gettime).returns(time, time + 3600) clock.stubs(:gettime).returns(time, time + 3600)
clock.stubs(:schedules).returns(SplatArgsWorker => schedule)
Sidetiq.stubs(:workers).returns([SplatArgsWorker])
expected_first_tick = time + 1800 expected_first_tick = time + 1800

View file

@ -4,13 +4,11 @@ class TestSidetiq < Sidetiq::TestCase
def test_schedules def test_schedules
schedules = Sidetiq.schedules schedules = Sidetiq.schedules
assert_equal 2, schedules.length assert_includes schedules, ScheduledWorker.schedule
assert_includes schedules, BackfillWorker.schedule
assert_includes schedules.keys, ScheduledWorker assert_kind_of Sidetiq::Schedule, ScheduledWorker.schedule
assert_includes schedules.keys, BackfillWorker assert_kind_of Sidetiq::Schedule, BackfillWorker.schedule
assert_kind_of Sidetiq::Schedule, schedules[ScheduledWorker]
assert_kind_of Sidetiq::Schedule, schedules[BackfillWorker]
end end
def test_workers def test_workers
@ -18,7 +16,6 @@ class TestSidetiq < Sidetiq::TestCase
assert_includes workers, ScheduledWorker assert_includes workers, ScheduledWorker
assert_includes workers, BackfillWorker assert_includes workers, BackfillWorker
assert_equal 2, workers.length
end end
def test_scheduled def test_scheduled

View file

@ -14,6 +14,7 @@ class TestWeb < Sidetiq::TestCase
def setup def setup
super super
ScheduledWorker.jobs.clear ScheduledWorker.jobs.clear
Sidetiq.stubs(:workers).returns([ScheduledWorker])
end end
def test_home_tab def test_home_tab
@ -27,7 +28,7 @@ class TestWeb < Sidetiq::TestCase
get '/sidetiq' get '/sidetiq'
assert_equal 200, last_response.status assert_equal 200, last_response.status
clock.schedules.each do |worker, schedule| Sidetiq.workers.each do |worker|
assert_match /#{worker.name}/, last_response.body assert_match /#{worker.name}/, last_response.body
assert_match /#{worker.get_sidekiq_options['queue']}/, last_response.body assert_match /#{worker.get_sidekiq_options['queue']}/, last_response.body
end end
@ -46,7 +47,7 @@ class TestWeb < Sidetiq::TestCase
def test_schedule_page def test_schedule_page
get "/sidetiq/ScheduledWorker/schedule" get "/sidetiq/ScheduledWorker/schedule"
assert_equal 200, last_response.status assert_equal 200, last_response.status
schedule = clock.schedules[ScheduledWorker] schedule = ScheduledWorker.schedule
schedule.recurrence_rules.each do |rule| schedule.recurrence_rules.each do |rule|
assert_match /#{rule.to_s}/, last_response.body assert_match /#{rule.to_s}/, last_response.body

View file

@ -24,6 +24,6 @@ class TestWorker < Sidetiq::TestCase
end end
def test_options def test_options
assert Sidetiq.schedules[BackfillWorker].backfill? assert BackfillWorker.schedule.backfill?
end end
end end