mirror of
https://github.com/endofunky/sidetiq.git
synced 2022-11-09 13:53:30 -05:00
Better protection against stale locks and race-conditions.
Locking is now done using WATCH/MULTI/EXEC/UNWATCH and additionally includes a host and process specific identifier to prevent accidental unlocks from other Sidekiq processes.
This commit is contained in:
parent
8f9592d7d8
commit
bc726c6552
4 changed files with 98 additions and 19 deletions
|
@ -2,6 +2,7 @@
|
|||
require 'monitor'
|
||||
require 'ostruct'
|
||||
require 'singleton'
|
||||
require 'socket'
|
||||
|
||||
# gems
|
||||
require 'ice_cube'
|
||||
|
@ -10,6 +11,7 @@ require 'sidekiq'
|
|||
# internal
|
||||
require 'sidetiq/config'
|
||||
require 'sidetiq/clock'
|
||||
require 'sidetiq/lock'
|
||||
require 'sidetiq/middleware'
|
||||
require 'sidetiq/schedule'
|
||||
require 'sidetiq/schedulable'
|
||||
|
|
|
@ -52,7 +52,7 @@ module Sidetiq
|
|||
tick = gettime
|
||||
mon_synchronize do
|
||||
schedules.each do |worker, sched|
|
||||
synchronize_clockworks(worker) do |redis|
|
||||
Lock.new(worker).synchronize do |redis|
|
||||
if sched.backfill? && (last = worker.last_scheduled_occurrence) > 0
|
||||
last = Sidetiq.config.utc ? Time.at(last).utc : Time.at(last)
|
||||
sched.occurrences_between(last + 1, tick).each do |past_t|
|
||||
|
@ -150,24 +150,6 @@ module Sidetiq
|
|||
end
|
||||
end
|
||||
|
||||
def synchronize_clockworks(klass)
|
||||
lock = "sidetiq:#{klass.name}:lock"
|
||||
|
||||
Sidekiq.redis do |redis|
|
||||
if redis.setnx(lock, 1)
|
||||
Sidetiq.logger.debug "Sidetiq::Clock lock #{lock}"
|
||||
|
||||
begin
|
||||
redis.pexpire(lock, Sidetiq.config.lock_expire)
|
||||
yield redis
|
||||
ensure
|
||||
redis.del(lock)
|
||||
Sidetiq.logger.debug "Sidetiq::Clock unlock #{lock}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def clock
|
||||
loop do
|
||||
sleep_time = time { yield }
|
||||
|
|
65
lib/sidetiq/lock.rb
Normal file
65
lib/sidetiq/lock.rb
Normal file
|
@ -0,0 +1,65 @@
|
|||
module Sidetiq
|
||||
class Lock # :nodoc: all
|
||||
attr_reader :key, :timeout
|
||||
|
||||
OWNER = "#{Socket.gethostname}:#{Process.pid}"
|
||||
|
||||
def initialize(key, timeout = Sidetiq.config.lock_expire)
|
||||
@key = key.kind_of?(Class) ? "sidetiq:#{key.name}:lock" : "sidetiq:#{key}:lock"
|
||||
@timeout = timeout
|
||||
end
|
||||
|
||||
def synchronize
|
||||
Sidekiq.redis do |redis|
|
||||
if lock(redis)
|
||||
|
||||
begin
|
||||
yield redis
|
||||
ensure
|
||||
unlock(redis)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def lock(redis)
|
||||
acquired = false
|
||||
|
||||
watch(redis, key) do
|
||||
if !redis.exists(key)
|
||||
acquired = !!redis.multi do |multi|
|
||||
multi.psetex(key, timeout, OWNER)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Sidetiq.logger.info "Sidetiq::Clock lock #{key}" if acquired
|
||||
|
||||
acquired
|
||||
end
|
||||
|
||||
def unlock(redis)
|
||||
watch(redis, key) do
|
||||
if redis.get(key) == OWNER
|
||||
redis.multi do |multi|
|
||||
multi.del(key)
|
||||
end
|
||||
|
||||
Sidetiq.logger.info "Sidetiq::Clock unlock #{key}"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def watch(redis, *args)
|
||||
redis.watch(*args)
|
||||
|
||||
begin
|
||||
yield
|
||||
ensure
|
||||
redis.unwatch
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
30
test/test_lock.rb
Normal file
30
test/test_lock.rb
Normal file
|
@ -0,0 +1,30 @@
|
|||
require_relative 'helper'
|
||||
|
||||
class TestLock < Sidetiq::TestCase
|
||||
def test_locking
|
||||
lock_name = SecureRandom.hex(8)
|
||||
key = SecureRandom.hex(8)
|
||||
|
||||
Sidekiq.redis do |redis|
|
||||
redis.set(key, 0)
|
||||
|
||||
5.times.map do
|
||||
Thread.start do
|
||||
locked(lock_name) do |r|
|
||||
sleep 0.1
|
||||
r.incr(key)
|
||||
end
|
||||
end
|
||||
end.each(&:join)
|
||||
|
||||
assert_equal "1", redis.get(key)
|
||||
end
|
||||
end
|
||||
|
||||
def locked(lock_name)
|
||||
Sidetiq::Lock.new(lock_name).synchronize do |redis|
|
||||
yield redis
|
||||
end
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in a new issue