diff --git a/lib/sidetiq.rb b/lib/sidetiq.rb index 3e155c2..231621d 100644 --- a/lib/sidetiq.rb +++ b/lib/sidetiq.rb @@ -2,6 +2,7 @@ require 'monitor' require 'ostruct' require 'singleton' +require 'socket' # gems require 'ice_cube' @@ -10,6 +11,7 @@ require 'sidekiq' # internal require 'sidetiq/config' require 'sidetiq/clock' +require 'sidetiq/lock' require 'sidetiq/middleware' require 'sidetiq/schedule' require 'sidetiq/schedulable' diff --git a/lib/sidetiq/clock.rb b/lib/sidetiq/clock.rb index 77842ae..ce89412 100644 --- a/lib/sidetiq/clock.rb +++ b/lib/sidetiq/clock.rb @@ -52,7 +52,7 @@ module Sidetiq tick = gettime mon_synchronize do schedules.each do |worker, sched| - synchronize_clockworks(worker) do |redis| + Lock.new(worker).synchronize do |redis| if sched.backfill? && (last = worker.last_scheduled_occurrence) > 0 last = Sidetiq.config.utc ? Time.at(last).utc : Time.at(last) sched.occurrences_between(last + 1, tick).each do |past_t| @@ -150,24 +150,6 @@ module Sidetiq end end - def synchronize_clockworks(klass) - lock = "sidetiq:#{klass.name}:lock" - - Sidekiq.redis do |redis| - if redis.setnx(lock, 1) - Sidetiq.logger.debug "Sidetiq::Clock lock #{lock}" - - begin - redis.pexpire(lock, Sidetiq.config.lock_expire) - yield redis - ensure - redis.del(lock) - Sidetiq.logger.debug "Sidetiq::Clock unlock #{lock}" - end - end - end - end - def clock loop do sleep_time = time { yield } diff --git a/lib/sidetiq/lock.rb b/lib/sidetiq/lock.rb new file mode 100644 index 0000000..119c1b1 --- /dev/null +++ b/lib/sidetiq/lock.rb @@ -0,0 +1,65 @@ +module Sidetiq + class Lock # :nodoc: all + attr_reader :key, :timeout + + OWNER = "#{Socket.gethostname}:#{Process.pid}" + + def initialize(key, timeout = Sidetiq.config.lock_expire) + @key = key.kind_of?(Class) ? "sidetiq:#{key.name}:lock" : "sidetiq:#{key}:lock" + @timeout = timeout + end + + def synchronize + Sidekiq.redis do |redis| + if lock(redis) + + begin + yield redis + ensure + unlock(redis) + end + end + end + end + + private + + def lock(redis) + acquired = false + + watch(redis, key) do + if !redis.exists(key) + acquired = !!redis.multi do |multi| + multi.psetex(key, timeout, OWNER) + end + end + end + + Sidetiq.logger.info "Sidetiq::Clock lock #{key}" if acquired + + acquired + end + + def unlock(redis) + watch(redis, key) do + if redis.get(key) == OWNER + redis.multi do |multi| + multi.del(key) + end + + Sidetiq.logger.info "Sidetiq::Clock unlock #{key}" + end + end + end + + def watch(redis, *args) + redis.watch(*args) + + begin + yield + ensure + redis.unwatch + end + end + end +end diff --git a/test/test_lock.rb b/test/test_lock.rb new file mode 100644 index 0000000..3ab2505 --- /dev/null +++ b/test/test_lock.rb @@ -0,0 +1,30 @@ +require_relative 'helper' + +class TestLock < Sidetiq::TestCase + def test_locking + lock_name = SecureRandom.hex(8) + key = SecureRandom.hex(8) + + Sidekiq.redis do |redis| + redis.set(key, 0) + + 5.times.map do + Thread.start do + locked(lock_name) do |r| + sleep 0.1 + r.incr(key) + end + end + end.each(&:join) + + assert_equal "1", redis.get(key) + end + end + + def locked(lock_name) + Sidetiq::Lock.new(lock_name).synchronize do |redis| + yield redis + end + end +end +