Add a locking mechanism so jobs won't get enqueued more often than they should.

This commit is contained in:
Tobias Svensson 2013-02-01 16:53:34 +00:00
parent 9d0c33ac27
commit dab7decb78
10 changed files with 95 additions and 59 deletions

View File

@ -10,9 +10,14 @@ Recurring jobs for [Sidekiq](http://mperham.github.com/sidekiq/).
Sidetiq provides a simple API for defining recurring workers for Sidekiq.
- Flexible DSL based on [ice_cube](http://seejohnrun.github.com/ice_cube/)
- High-resolution timer using `clock_gettime(3)` (or `mach_absolute_time()` on
Apple Mac OS X), allowing for accurate sub-second clock ticks.
- Sidetiq uses a locking mechanism (based on `setnx` and `pexpire`) internally
so Sidetiq clocks can run in each Sidekiq process without interfering with
each other.
## DEPENDENCIES
- [Sidekiq](http://mperham.github.com/sidekiq/)
@ -38,8 +43,8 @@ class MyWorker
include Sidekiq::Worker
include Sidetiq::Schedulable
# Every other month on the first monday and last tuesday at 12 o'clock.
tiq { monthly(2).day_of_week(1 => [1], 2 => [-1]).hour_of_day(12) }
# Daily at midnight
tiq { daily }
end
```
@ -60,6 +65,18 @@ class MyWorker
end
```
Or complex schedules:
```ruby
class MyWorker
include Sidekiq::Worker
include Sidetiq::Schedulable
# Every other month on the first monday and last tuesday at 12 o'clock.
tiq { monthly(2).day_of_week(1 => [1], 2 => [-1]).hour_of_day(12) }
end
```
The first time the tiq method is called, Sidetiq will automatically spin up
it's clock thread and enqueue jobs for their next occurrence using
`#perform_at`. Note that by default Sidekiq only polls every 15 seconds.
@ -90,12 +107,6 @@ require 'sidetiq/web'
![Screenshot](http://f.cl.ly/items/1P2u1v091F3V1n381g2I/Screen%20Shot%202013-02-01%20at%2012.16.17.png)
## CONSIDERATIONS
If workers are spread across multiple machines multiple jobs might be enqueued
at the same time. This can be avoided by using a locking library for Sidekiq,
such as [sidekiq-unique-jobs](https://github.com/form26/sidekiq-unique-jobs).
## CONTRIBUTE
If you'd like to contribute to Sidetiq, start by forking my repo on GitHub:

View File

@ -1,13 +1,16 @@
module Sidetiq
configure do |config|
config.priority = Thread.main.priority
config.resolution = 0.2
config.resolution = 1
config.lock_expire = 1000
end
class Clock
include Singleton
include MonitorMixin
START_TIME = Time.local(2010, 1, 1)
attr_reader :schedules
def initialize
@ -17,18 +20,15 @@ module Sidetiq
end
def schedule_for(worker)
schedules[worker] ||= Sidetiq::Schedule.new
schedules[worker] ||= Sidetiq::Schedule.new(START_TIME)
end
def tick
@tick = gettime
synchronize do
schedules.each do |worker, schedule|
if schedule.schedule_next?(@tick)
occurrence = schedule.next_occurrence
Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{occurrence.to_s})"
worker.perform_at(occurrence)
enqueue(worker, schedule.next_occurrence(@tick))
end
end
end
@ -36,6 +36,35 @@ module Sidetiq
private
def enqueue(worker, time)
key = "sidetiq:#{worker.name}"
synchronize_clockworks("#{key}:lock") do |redis|
status = redis.get(key)
if status.nil? || status.to_f < time.to_f
time_f = time.to_f
Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f})"
redis.set(key, time_f)
worker.perform_at(time)
end
end
end
def synchronize_clockworks(lock)
Sidekiq.redis do |redis|
if redis.setnx(lock, 1)
Sidekiq.logger.debug "Sidetiq::Clock lock #{lock} #{Thread.current.inspect}"
redis.pexpire(lock, Sidetiq.config.lock_expire)
yield redis
redis.del(lock)
Sidekiq.logger.debug "Sidetiq::Clock unlock #{lock} #{Thread.current.inspect}"
end
end
end
def start!
Sidekiq.logger.info "Sidetiq::Clock start"
thr = Thread.start { clock { tick } }

View File

@ -4,6 +4,7 @@ SimpleCov.start { add_filter "/test/" }
require 'minitest/autorun'
require 'mocha/setup'
require 'rack/test'
require 'mock_redis'
require 'sidekiq'
require 'sidekiq/testing'
@ -20,3 +21,10 @@ end
# Keep the test output clean
Sidekiq.logger = Logger.new(nil)
class Sidetiq::TestCase < MiniTest::Unit::TestCase
def setup
Sidekiq.redis { |r| r.flushall }
end
end

View File

@ -1,12 +1,37 @@
require_relative 'helper'
class TestClock < MiniTest::Unit::TestCase
class TestClock < Sidetiq::TestCase
def clock
@clock ||= Sidetiq::Clock.instance
end
def test_gettime_seconds
assert_equal Sidetiq::Clock.instance.gettime.tv_sec, Time.now.tv_sec
assert_equal clock.gettime.tv_sec, Time.now.tv_sec
end
def test_gettime_nsec
refute_nil Sidetiq::Clock.instance.gettime.tv_nsec
refute_nil clock.gettime.tv_nsec
end
class FakeWorker; end
def test_enqueues_jobs_by_schedule
schedule = Sidetiq::Schedule.new(Sidetiq::Clock::START_TIME)
schedule.daily
clock.stubs(:schedules).returns(FakeWorker => schedule)
FakeWorker.expects(:perform_at).times(10)
10.times do |i|
clock.stubs(:gettime).returns(Time.local(2011, 1, i + 1, 1))
clock.tick
end
clock.stubs(:gettime).returns(Time.local(2011, 1, 10, 2))
clock.tick
clock.tick
clock.tick
end
end

View File

@ -1,6 +1,6 @@
require_relative 'helper'
class TestConfig < MiniTest::Unit::TestCase
class TestConfig < Sidetiq::TestCase
def setup
@saved = Sidetiq.config
Sidetiq.config = OpenStruct.new

View File

@ -1,6 +1,6 @@
require_relative 'helper'
class TestErrors < MiniTest::Unit::TestCase
class TestErrors < Sidetiq::TestCase
def test_error_superclass
assert_equal StandardError, Sidetiq::Error.superclass
end

View File

@ -1,6 +1,6 @@
require_relative 'helper'
class TestSchedule < MiniTest::Unit::TestCase
class TestSchedule < Sidetiq::TestCase
def test_super
assert_equal IceCube::Schedule, Sidetiq::Schedule.superclass
end

View File

@ -1,37 +0,0 @@
require_relative 'helper'
class TestSidetiq < MiniTest::Unit::TestCase
class Worker
include Sidekiq::Worker
include Sidetiq::Schedulable
tiq do
daily.hour_of_day(0)
end
end
def clock
@clock ||= Sidetiq::Clock.instance
end
def tick
clock.tick
end
def test_scheduling
assert_equal 0, Worker.jobs.size # sanity
clock.stubs(:gettime).returns(Time.now + (24 * 60 * 60))
tick
assert_equal 1, Worker.jobs.size
clock.stubs(:gettime).returns(Time.now + (2 * 24 * 60 * 60))
tick
assert_equal 2, Worker.jobs.size
clock.stubs(:gettime).returns(Time.now + (2 * 24 * 60 * 60 + 1))
tick
assert_equal 2, Worker.jobs.size
end
end

View File

@ -1,6 +1,6 @@
require_relative 'helper'
class TestVersion < MiniTest::Unit::TestCase
class TestVersion < Sidetiq::TestCase
def test_major
assert_instance_of Fixnum, Sidetiq::VERSION::MAJOR
end

View File

@ -1,6 +1,6 @@
require_relative 'helper'
class TestWeb < MiniTest::Unit::TestCase
class TestWeb < Sidetiq::TestCase
include Rack::Test::Methods
class Worker