1
0
Fork 0
mirror of https://github.com/endofunky/sidetiq.git synced 2022-11-09 13:53:30 -05:00

Add backfill functionality.

This commit is contained in:
Tobias Svensson 2013-03-11 17:11:37 +00:00
parent e597658f7e
commit 6cc8fdadc6
10 changed files with 149 additions and 34 deletions

View file

@ -6,6 +6,7 @@ HEAD
- Add `Sidetiq.scheduled`.
- Add `Sidetiq.retries`.
- Add `Sidetiq.logger`. This defaults to the Sidekiq logger.
- Add support for job backfills.
- Clean up tests.
- Sidetiq::Schedule no longer inherits from IceCube::Schedule.

View file

@ -13,6 +13,7 @@ Table Of Contents
* [Dependencies](#section_Dependencies)
* [Installation](#section_Installation)
* [Introduction](#section_Introduction)
* [Backfills](#section_Backfills)
* [Configuration](#section_Configuration)
* [Logging](#section_Configuration_Logging)
* [API](#section_API)
@ -148,6 +149,32 @@ end
Additionally, Sidetiq includes a middleware that will check if the clock
thread is still alive and restart it if necessary.
<a name='section_Backfills''></a>
Backfills
---------
In certain cases it is desirable that missed jobs will be enqueued
retroactively, for example when a critical, hourly job isn't run due to
server downtime. To solve this, `#tiq` takes a *backfill* option. If
missing job occurrences have been detected, Sidetiq will then enqueue
the jobs automatically. It will also ensure that the timestamps passed to
`#perform` are as expected:
```ruby
class MyWorker
include Sidekiq::Worker
include Sidetiq::Schedulable
tiq backfill: true do
hourly
end
def perform(last_occurrence, current_occurrence)
# do stuff ...
end
end
```
<a name='section_Configuration'></a>
Configuration
-------------

View file

@ -51,10 +51,16 @@ module Sidetiq
def tick
tick = gettime
synchronize do
schedules.each do |worker, schedule|
if schedule.schedule_next?(tick)
enqueue(worker, schedule.next_occurrence(tick))
end
schedules.each do |worker, sched|
synchronize_clockworks(worker) do |redis|
if sched.backfill? && (last = worker.last_scheduled_occurrence) > 0
last = Sidetiq.config.utc ? Time.at(last).utc : Time.at(last)
sched.occurrences_between(last + 1, tick).each do |past_t|
enqueue(worker, past_t, redis)
end
end
enqueue(worker, sched.next_occurrence(tick), redis)
end if sched.schedule_next?(tick)
end
end
end
@ -126,27 +132,26 @@ module Sidetiq
private
def enqueue(worker, time)
key = "sidetiq:#{worker.name}"
time_f = time.to_f
def enqueue(worker, time, redis)
key = "sidetiq:#{worker.name}"
time_f = time.to_f
next_run = (redis.get("#{key}:next") || -1).to_f
synchronize_clockworks("#{key}:lock") do |redis|
next_run = (redis.get("#{key}:next") || -1).to_f
if next_run < time_f
Sidetiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f}) (last: #{next_run})"
if next_run < time_f
Sidetiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f}) (last: #{next_run})"
redis.mset("#{key}:last", next_run, "#{key}:next", time_f)
redis.mset("#{key}:last", next_run, "#{key}:next", time_f)
arity = [worker.instance_method(:perform).arity - 1, -1].max
args = [next_run, time_f][0..arity]
arity = [worker.instance_method(:perform).arity - 1, -1].max
args = [next_run, time_f][0..arity]
worker.perform_at(time, *args)
end
worker.perform_at(time, *args)
end
end
def synchronize_clockworks(lock)
def synchronize_clockworks(klass)
lock = "sidetiq:#{klass.name}:lock"
Sidekiq.redis do |redis|
if redis.setnx(lock, 1)
Sidetiq.logger.debug "Sidetiq::Clock lock #{lock}"

View file

@ -4,12 +4,12 @@ module Sidetiq
# Examples
#
# class MyWorker
# include Sidekiq::Worker
# include Sidetiq::Schedulable
# include Sidekiq::Worker
# include Sidetiq::Schedulable
#
# # Daily at midnight
# tiq { daily }
# end
# # Daily at midnight
# tiq { daily }
# end
module Schedulable
module ClassMethods
# Public: Returns a Float timestamp of the last scheduled run.
@ -22,14 +22,16 @@ module Sidetiq
get_timestamp "next"
end
def tiq(&block) # :nodoc:
def tiq(options = {}, &block) # :nodoc:
clock = Sidetiq::Clock.instance
clock.synchronize do
clock.schedule_for(self).instance_eval(&block)
schedule = clock.schedule_for(self)
schedule.instance_eval(&block)
schedule.set_options(options)
end
end
private
private
def get_timestamp(key)
Sidekiq.redis do |redis|

View file

@ -1,6 +1,12 @@
module Sidetiq
# Public: Recurrence schedule.
class Schedule
# :nodoc:
attr_reader :last_occurrence
# Public: Writer for backfilling option.
attr_writer :backfill
# Public: Start time offset from epoch used for calculating run
# times in the Sidetiq schedules.
START_TIME = Sidetiq.config.utc ? Time.utc(2010, 1, 1) : Time.local(2010, 1, 1)
@ -54,6 +60,16 @@ module Sidetiq
def to_s
@schedule.to_s
end
# Public: Inquirer for backfilling option.
def backfill?
!!@backfill
end
# Internal: Set schedule options.
def set_options(hash)
self.backfill = hash[:backfill] if !hash[:backfill].nil?
end
end
end

11
test/fixtures/backfill_worker.rb vendored Normal file
View file

@ -0,0 +1,11 @@
class BackfillWorker
include Sidekiq::Worker
include Sidetiq::Schedulable
tiq backfill: true do
daily
end
def perform
end
end

View file

@ -40,6 +40,21 @@ class TestClock < Sidetiq::TestCase
Sidetiq.config.utc = false
end
def test_backfilling
BackfillWorker.jobs.clear
start = Sidetiq::Schedule::START_TIME
BackfillWorker.stubs(:last_scheduled_occurrence).returns(start.to_f)
clock.stubs(:gettime).returns(start)
clock.tick
BackfillWorker.jobs.clear
clock.stubs(:gettime).returns(start + 86400 * 10 + 1)
clock.tick
assert_equal 10, BackfillWorker.jobs.length
end
def test_enqueues_jobs_by_schedule
schedule = Sidetiq::Schedule.new
schedule.daily
@ -72,7 +87,8 @@ class TestClock < Sidetiq::TestCase
expected_second_tick = expected_first_tick + 3600
LastTickWorker.expects(:perform_at).with(expected_first_tick, -1).once
LastTickWorker.expects(:perform_at).with(expected_second_tick, expected_first_tick.to_f).once
LastTickWorker.expects(:perform_at).with(expected_second_tick,
expected_first_tick.to_f).once
clock.tick
clock.tick
@ -90,10 +106,15 @@ class TestClock < Sidetiq::TestCase
expected_first_tick = time + 1800
expected_second_tick = expected_first_tick + 3600
LastAndScheduledTicksWorker.expects(:perform_at).with(expected_first_tick, -1, expected_first_tick.to_f).once
LastAndScheduledTicksWorker.expects(:perform_at)
.with(expected_first_tick, -1, expected_first_tick.to_f).once
clock.tick
LastAndScheduledTicksWorker.expects(:perform_at).with(expected_second_tick, expected_first_tick.to_f, expected_second_tick.to_f).once
LastAndScheduledTicksWorker.expects(:perform_at)
.with(expected_second_tick, expected_first_tick.to_f,
expected_second_tick.to_f).once
clock.tick
end
@ -108,7 +129,8 @@ class TestClock < Sidetiq::TestCase
expected_first_tick = time + 1800
SplatArgsWorker.expects(:perform_at).with(expected_first_tick, -1, expected_first_tick.to_f).once
SplatArgsWorker.expects(:perform_at)
.with(expected_first_tick, -1, expected_first_tick.to_f).once
clock.tick
end
end

View file

@ -17,5 +17,22 @@ class TestSchedule < Sidetiq::TestCase
assert sched.schedule_next?(Time.now + (2 * 24 * 60 * 60))
refute sched.schedule_next?(Time.now + (2 * 24 * 60 * 60))
end
def test_backfill
sched = Sidetiq::Schedule.new
refute sched.backfill?
sched.backfill = true
assert sched.backfill?
end
def test_set_options
sched = Sidetiq::Schedule.new
sched.set_options(backfill: true)
assert sched.backfill?
sched.set_options(backfill: false)
refute sched.backfill?
end
end

View file

@ -2,13 +2,23 @@ require_relative 'helper'
class TestSidetiq < Sidetiq::TestCase
def test_schedules
assert_equal Sidetiq.schedules, Sidetiq::Clock.schedules
assert_equal [ScheduledWorker], Sidetiq.schedules.keys
assert_kind_of Sidetiq::Schedule, Sidetiq.schedules[ScheduledWorker]
schedules = Sidetiq.schedules
assert_equal 2, schedules.length
assert_includes schedules.keys, ScheduledWorker
assert_includes schedules.keys, BackfillWorker
assert_kind_of Sidetiq::Schedule, schedules[ScheduledWorker]
assert_kind_of Sidetiq::Schedule, schedules[BackfillWorker]
end
def test_workers
assert_equal [ScheduledWorker], Sidetiq.workers
workers = Sidetiq.workers
assert_includes workers, ScheduledWorker
assert_includes workers, BackfillWorker
assert_equal 2, workers.length
end
def test_scheduled

View file

@ -22,4 +22,8 @@ class TestWorker < Sidetiq::TestCase
assert FakeWorker.last_scheduled_occurrence == last_run
assert FakeWorker.next_scheduled_occurrence == next_run
end
def test_options
assert Sidetiq.schedules[BackfillWorker].backfill?
end
end