From dab7decb78280407e28876cabd60c2422b415b00 Mon Sep 17 00:00:00 2001 From: Tobias Svensson Date: Fri, 1 Feb 2013 16:53:34 +0000 Subject: [PATCH] Add a locking mechanism so jobs won't get enqueued more often than they should. --- README.md | 27 +++++++++++++++++++-------- lib/sidetiq/clock.rb | 41 +++++++++++++++++++++++++++++++++++------ test/helper.rb | 8 ++++++++ test/test_clock.rb | 31 ++++++++++++++++++++++++++++--- test/test_config.rb | 2 +- test/test_errors.rb | 2 +- test/test_schedule.rb | 2 +- test/test_sidetiq.rb | 37 ------------------------------------- test/test_version.rb | 2 +- test/test_web.rb | 2 +- 10 files changed, 95 insertions(+), 59 deletions(-) delete mode 100644 test/test_sidetiq.rb diff --git a/README.md b/README.md index b787d01..0ca4f9a 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,14 @@ Recurring jobs for [Sidekiq](http://mperham.github.com/sidekiq/). Sidetiq provides a simple API for defining recurring workers for Sidekiq. - Flexible DSL based on [ice_cube](http://seejohnrun.github.com/ice_cube/) + - High-resolution timer using `clock_gettime(3)` (or `mach_absolute_time()` on Apple Mac OS X), allowing for accurate sub-second clock ticks. +- Sidetiq uses a locking mechanism (based on `setnx` and `pexpire`) internally + so Sidetiq clocks can run in each Sidekiq process without interfering with + each other. + ## DEPENDENCIES - [Sidekiq](http://mperham.github.com/sidekiq/) @@ -38,8 +43,8 @@ class MyWorker include Sidekiq::Worker include Sidetiq::Schedulable - # Every other month on the first monday and last tuesday at 12 o'clock. - tiq { monthly(2).day_of_week(1 => [1], 2 => [-1]).hour_of_day(12) } + # Daily at midnight + tiq { daily } end ``` @@ -60,6 +65,18 @@ class MyWorker end ``` +Or complex schedules: + +```ruby +class MyWorker + include Sidekiq::Worker + include Sidetiq::Schedulable + + # Every other month on the first monday and last tuesday at 12 o'clock. + tiq { monthly(2).day_of_week(1 => [1], 2 => [-1]).hour_of_day(12) } +end +``` + The first time the tiq method is called, Sidetiq will automatically spin up it's clock thread and enqueue jobs for their next occurrence using `#perform_at`. Note that by default Sidekiq only polls every 15 seconds. @@ -90,12 +107,6 @@ require 'sidetiq/web' ![Screenshot](http://f.cl.ly/items/1P2u1v091F3V1n381g2I/Screen%20Shot%202013-02-01%20at%2012.16.17.png) -## CONSIDERATIONS - -If workers are spread across multiple machines multiple jobs might be enqueued -at the same time. This can be avoided by using a locking library for Sidekiq, -such as [sidekiq-unique-jobs](https://github.com/form26/sidekiq-unique-jobs). - ## CONTRIBUTE If you'd like to contribute to Sidetiq, start by forking my repo on GitHub: diff --git a/lib/sidetiq/clock.rb b/lib/sidetiq/clock.rb index 20089ce..eb79268 100644 --- a/lib/sidetiq/clock.rb +++ b/lib/sidetiq/clock.rb @@ -1,13 +1,16 @@ module Sidetiq configure do |config| config.priority = Thread.main.priority - config.resolution = 0.2 + config.resolution = 1 + config.lock_expire = 1000 end class Clock include Singleton include MonitorMixin + START_TIME = Time.local(2010, 1, 1) + attr_reader :schedules def initialize @@ -17,18 +20,15 @@ module Sidetiq end def schedule_for(worker) - schedules[worker] ||= Sidetiq::Schedule.new + schedules[worker] ||= Sidetiq::Schedule.new(START_TIME) end def tick @tick = gettime - synchronize do schedules.each do |worker, schedule| if schedule.schedule_next?(@tick) - occurrence = schedule.next_occurrence - Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{occurrence.to_s})" - worker.perform_at(occurrence) + enqueue(worker, schedule.next_occurrence(@tick)) end end end @@ -36,6 +36,35 @@ module Sidetiq private + def enqueue(worker, time) + key = "sidetiq:#{worker.name}" + + synchronize_clockworks("#{key}:lock") do |redis| + status = redis.get(key) + + if status.nil? || status.to_f < time.to_f + time_f = time.to_f + Sidekiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f})" + redis.set(key, time_f) + worker.perform_at(time) + end + end + end + + def synchronize_clockworks(lock) + Sidekiq.redis do |redis| + if redis.setnx(lock, 1) + Sidekiq.logger.debug "Sidetiq::Clock lock #{lock} #{Thread.current.inspect}" + + redis.pexpire(lock, Sidetiq.config.lock_expire) + yield redis + redis.del(lock) + + Sidekiq.logger.debug "Sidetiq::Clock unlock #{lock} #{Thread.current.inspect}" + end + end + end + def start! Sidekiq.logger.info "Sidetiq::Clock start" thr = Thread.start { clock { tick } } diff --git a/test/helper.rb b/test/helper.rb index 70cdb2e..e731f2b 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -4,6 +4,7 @@ SimpleCov.start { add_filter "/test/" } require 'minitest/autorun' require 'mocha/setup' require 'rack/test' +require 'mock_redis' require 'sidekiq' require 'sidekiq/testing' @@ -20,3 +21,10 @@ end # Keep the test output clean Sidekiq.logger = Logger.new(nil) + +class Sidetiq::TestCase < MiniTest::Unit::TestCase + def setup + Sidekiq.redis { |r| r.flushall } + end +end + diff --git a/test/test_clock.rb b/test/test_clock.rb index e0dfdbd..d5804e2 100644 --- a/test/test_clock.rb +++ b/test/test_clock.rb @@ -1,12 +1,37 @@ require_relative 'helper' -class TestClock < MiniTest::Unit::TestCase +class TestClock < Sidetiq::TestCase + def clock + @clock ||= Sidetiq::Clock.instance + end + def test_gettime_seconds - assert_equal Sidetiq::Clock.instance.gettime.tv_sec, Time.now.tv_sec + assert_equal clock.gettime.tv_sec, Time.now.tv_sec end def test_gettime_nsec - refute_nil Sidetiq::Clock.instance.gettime.tv_nsec + refute_nil clock.gettime.tv_nsec + end + + class FakeWorker; end + + def test_enqueues_jobs_by_schedule + schedule = Sidetiq::Schedule.new(Sidetiq::Clock::START_TIME) + schedule.daily + + clock.stubs(:schedules).returns(FakeWorker => schedule) + + FakeWorker.expects(:perform_at).times(10) + + 10.times do |i| + clock.stubs(:gettime).returns(Time.local(2011, 1, i + 1, 1)) + clock.tick + end + + clock.stubs(:gettime).returns(Time.local(2011, 1, 10, 2)) + clock.tick + clock.tick + clock.tick end end diff --git a/test/test_config.rb b/test/test_config.rb index 0de0129..936230d 100644 --- a/test/test_config.rb +++ b/test/test_config.rb @@ -1,6 +1,6 @@ require_relative 'helper' -class TestConfig < MiniTest::Unit::TestCase +class TestConfig < Sidetiq::TestCase def setup @saved = Sidetiq.config Sidetiq.config = OpenStruct.new diff --git a/test/test_errors.rb b/test/test_errors.rb index 6768c34..a8c919b 100644 --- a/test/test_errors.rb +++ b/test/test_errors.rb @@ -1,6 +1,6 @@ require_relative 'helper' -class TestErrors < MiniTest::Unit::TestCase +class TestErrors < Sidetiq::TestCase def test_error_superclass assert_equal StandardError, Sidetiq::Error.superclass end diff --git a/test/test_schedule.rb b/test/test_schedule.rb index c7651e6..4880233 100644 --- a/test/test_schedule.rb +++ b/test/test_schedule.rb @@ -1,6 +1,6 @@ require_relative 'helper' -class TestSchedule < MiniTest::Unit::TestCase +class TestSchedule < Sidetiq::TestCase def test_super assert_equal IceCube::Schedule, Sidetiq::Schedule.superclass end diff --git a/test/test_sidetiq.rb b/test/test_sidetiq.rb deleted file mode 100644 index 36c8c21..0000000 --- a/test/test_sidetiq.rb +++ /dev/null @@ -1,37 +0,0 @@ -require_relative 'helper' - -class TestSidetiq < MiniTest::Unit::TestCase - class Worker - include Sidekiq::Worker - include Sidetiq::Schedulable - - tiq do - daily.hour_of_day(0) - end - end - - def clock - @clock ||= Sidetiq::Clock.instance - end - - def tick - clock.tick - end - - def test_scheduling - assert_equal 0, Worker.jobs.size # sanity - - clock.stubs(:gettime).returns(Time.now + (24 * 60 * 60)) - tick - assert_equal 1, Worker.jobs.size - - clock.stubs(:gettime).returns(Time.now + (2 * 24 * 60 * 60)) - tick - assert_equal 2, Worker.jobs.size - - clock.stubs(:gettime).returns(Time.now + (2 * 24 * 60 * 60 + 1)) - tick - assert_equal 2, Worker.jobs.size - end -end - diff --git a/test/test_version.rb b/test/test_version.rb index 58b3b50..7d8d33b 100644 --- a/test/test_version.rb +++ b/test/test_version.rb @@ -1,6 +1,6 @@ require_relative 'helper' -class TestVersion < MiniTest::Unit::TestCase +class TestVersion < Sidetiq::TestCase def test_major assert_instance_of Fixnum, Sidetiq::VERSION::MAJOR end diff --git a/test/test_web.rb b/test/test_web.rb index a7a32c2..a339daf 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -1,6 +1,6 @@ require_relative 'helper' -class TestWeb < MiniTest::Unit::TestCase +class TestWeb < Sidetiq::TestCase include Rack::Test::Methods class Worker