mirror of
https://github.com/endofunky/sidetiq.git
synced 2022-11-09 13:53:30 -05:00
Store more detailed lock metadata.
This commit is contained in:
parent
1896af87bd
commit
0291e6981f
7 changed files with 213 additions and 68 deletions
|
@ -14,7 +14,8 @@ require 'sidetiq/logging'
|
|||
require 'sidetiq/api'
|
||||
require 'sidetiq/clock'
|
||||
require 'sidetiq/handler'
|
||||
require 'sidetiq/lock'
|
||||
require 'sidetiq/lock/meta_data'
|
||||
require 'sidetiq/lock/redis'
|
||||
require 'sidetiq/schedule'
|
||||
require 'sidetiq/schedulable'
|
||||
require 'sidetiq/version'
|
||||
|
|
|
@ -6,7 +6,7 @@ module Sidetiq
|
|||
def dispatch(worker, sched, tick)
|
||||
return unless sched.schedule_next?(tick)
|
||||
|
||||
Lock.new(worker).synchronize do |redis|
|
||||
Lock::Redis.new(worker).synchronize do |redis|
|
||||
if sched.backfill? && (last = worker.last_scheduled_occurrence) > 0
|
||||
last = Sidetiq.config.utc ? Time.at(last).utc : Time.at(last)
|
||||
sched.occurrences_between(last + 1, tick).each do |past_t|
|
||||
|
|
|
@ -1,65 +0,0 @@
|
|||
module Sidetiq
|
||||
class Lock # :nodoc: all
|
||||
include Logging
|
||||
|
||||
attr_reader :key, :timeout
|
||||
|
||||
OWNER = "#{Socket.gethostname}:#{Process.pid}"
|
||||
|
||||
def initialize(key, timeout = Sidetiq.config.lock_expire)
|
||||
@key = key.kind_of?(Class) ? "sidetiq:#{key.name}:lock" : "sidetiq:#{key}:lock"
|
||||
@timeout = timeout
|
||||
end
|
||||
|
||||
def synchronize
|
||||
Sidekiq.redis do |redis|
|
||||
if lock(redis)
|
||||
debug "Sidetiq::Clock lock #{key}"
|
||||
|
||||
begin
|
||||
yield redis
|
||||
ensure
|
||||
unlock(redis)
|
||||
debug "Sidetiq::Clock unlock #{key}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def lock(redis)
|
||||
acquired = false
|
||||
|
||||
watch(redis, key) do
|
||||
if !redis.exists(key)
|
||||
acquired = !!redis.multi do |multi|
|
||||
multi.psetex(key, timeout, OWNER)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
acquired
|
||||
end
|
||||
|
||||
def unlock(redis)
|
||||
watch(redis, key) do
|
||||
if redis.get(key) == OWNER
|
||||
redis.multi do |multi|
|
||||
multi.del(key)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def watch(redis, *args)
|
||||
redis.watch(*args)
|
||||
|
||||
begin
|
||||
yield
|
||||
ensure
|
||||
redis.unwatch
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
49
lib/sidetiq/lock/meta_data.rb
Normal file
49
lib/sidetiq/lock/meta_data.rb
Normal file
|
@ -0,0 +1,49 @@
|
|||
module Sidetiq
|
||||
module Lock
|
||||
class MetaData
|
||||
OWNER = "#{Socket.gethostname}:#{Process.pid}"
|
||||
|
||||
attr_accessor :owner, :timestamp, :key
|
||||
|
||||
class << self
|
||||
include Sidekiq::ExceptionHandler
|
||||
|
||||
def for_new_lock(key)
|
||||
new(owner: OWNER, timestamp: Sidetiq.clock.gettime.to_f, key: key)
|
||||
end
|
||||
|
||||
def from_json(json = "")
|
||||
# Avoid TypeError when nil is passed to JSON.parse.
|
||||
json = "" if json.nil?
|
||||
|
||||
hash = JSON.parse(json, symbolize_names: true)
|
||||
new(hash)
|
||||
rescue JSON::ParserError => e
|
||||
if json != ""
|
||||
# Looks like garbage lock metadata, so report it.
|
||||
handle_exception(e, context: "Garbage lock meta data detected: #{json}")
|
||||
end
|
||||
|
||||
new
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(hash = {})
|
||||
@owner = hash[:owner]
|
||||
@timestamp = hash[:timestamp]
|
||||
@key = hash[:key]
|
||||
end
|
||||
|
||||
def to_json
|
||||
instance_variables.each_with_object({}) do |var, hash|
|
||||
hash[var.to_s.delete("@")] = instance_variable_get(var)
|
||||
end.to_json
|
||||
end
|
||||
|
||||
def to_s
|
||||
"Sidetiq::Lock on #{key} set at #{timestamp} by #{owner}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
70
lib/sidetiq/lock/redis.rb
Normal file
70
lib/sidetiq/lock/redis.rb
Normal file
|
@ -0,0 +1,70 @@
|
|||
module Sidetiq
|
||||
module Lock
|
||||
class Redis
|
||||
include Logging
|
||||
|
||||
attr_reader :key, :timeout
|
||||
|
||||
def initialize(key, timeout = Sidetiq.config.lock_expire)
|
||||
@key = key.kind_of?(Class) ? "sidetiq:#{key.name}:lock" : "sidetiq:#{key}:lock"
|
||||
@timeout = timeout
|
||||
end
|
||||
|
||||
def synchronize
|
||||
Sidekiq.redis do |redis|
|
||||
acquired, meta = lock(redis)
|
||||
|
||||
if acquired
|
||||
debug "Lock: #{meta}"
|
||||
|
||||
begin
|
||||
yield redis
|
||||
ensure
|
||||
unlock(redis)
|
||||
debug "Unlock: #{key}"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def lock(redis)
|
||||
acquired, meta = false, nil
|
||||
|
||||
watch(redis, key) do
|
||||
if !redis.exists(key)
|
||||
acquired = !!redis.multi do |multi|
|
||||
meta = MetaData.for_new_lock(key)
|
||||
multi.psetex(key, timeout, meta.to_json)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
[acquired, meta]
|
||||
end
|
||||
|
||||
def unlock(redis)
|
||||
watch(redis, key) do
|
||||
meta = MetaData.from_json(redis.get(key))
|
||||
|
||||
if meta.owner == Sidetiq::Lock::MetaData::OWNER
|
||||
redis.multi do |multi|
|
||||
multi.del(key)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def watch(redis, *args)
|
||||
redis.watch(*args)
|
||||
|
||||
begin
|
||||
yield
|
||||
ensure
|
||||
redis.unwatch
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -21,8 +21,21 @@ class TestLock < Sidetiq::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
def test_lock_sets_correct_meta_data
|
||||
key = SecureRandom.hex(8)
|
||||
internal_key = "sidetiq:#{key}:lock"
|
||||
|
||||
locked(key) do |redis|
|
||||
json = redis.get(internal_key)
|
||||
md = Sidetiq::Lock::MetaData.from_json(json)
|
||||
|
||||
assert_equal Sidetiq::Lock::MetaData::OWNER, md.owner
|
||||
assert_equal internal_key, md.key
|
||||
end
|
||||
end
|
||||
|
||||
def locked(lock_name)
|
||||
Sidetiq::Lock.new(lock_name).synchronize do |redis|
|
||||
Sidetiq::Lock::Redis.new(lock_name).synchronize do |redis|
|
||||
yield redis
|
||||
end
|
||||
end
|
||||
|
|
77
test/test_lock_meta_data.rb
Normal file
77
test/test_lock_meta_data.rb
Normal file
|
@ -0,0 +1,77 @@
|
|||
require_relative 'helper'
|
||||
|
||||
class TestLockMetaData < Sidetiq::TestCase
|
||||
def new_without_params
|
||||
md = Sidetiq::Lock::MetaData.new
|
||||
|
||||
assert_nil md.owner
|
||||
assert_nil md.timestamp
|
||||
assert_nil md.key
|
||||
end
|
||||
|
||||
def new_with_empty_hash
|
||||
md = Sidetiq::Lock::MetaData.new({})
|
||||
|
||||
assert_nil md.owner
|
||||
assert_nil md.timestamp
|
||||
assert_nil md.key
|
||||
end
|
||||
|
||||
def test_from_json
|
||||
json = { timestamp: 42, owner: "me", key: "foobar" }.to_json
|
||||
md = Sidetiq::Lock::MetaData.from_json(json)
|
||||
|
||||
assert_equal 42, md.timestamp
|
||||
assert_equal "me", md.owner
|
||||
assert_equal "foobar", md.key
|
||||
end
|
||||
|
||||
def test_from_json_with_empty_string_json
|
||||
md = Sidetiq::Lock::MetaData.from_json(nil)
|
||||
|
||||
assert_nil md.owner
|
||||
assert_nil md.timestamp
|
||||
assert_nil md.key
|
||||
end
|
||||
|
||||
def test_from_json_with_nil_json
|
||||
md = Sidetiq::Lock::MetaData.from_json("")
|
||||
|
||||
assert_nil md.owner
|
||||
assert_nil md.timestamp
|
||||
assert_nil md.key
|
||||
end
|
||||
|
||||
def test_from_json_with_malformed_json
|
||||
Sidetiq::Lock::MetaData.expects(:handle_exception).once
|
||||
|
||||
md = Sidetiq::Lock::MetaData.from_json('invalid')
|
||||
|
||||
assert_nil md.owner
|
||||
assert_nil md.timestamp
|
||||
assert_nil md.key
|
||||
end
|
||||
|
||||
def test_for_new_lock
|
||||
md = Sidetiq::Lock::MetaData.for_new_lock("baz")
|
||||
|
||||
assert_equal Sidetiq::Lock::MetaData::OWNER, md.owner
|
||||
assert_equal "baz", md.key
|
||||
|
||||
assert md.timestamp > 0
|
||||
assert md.timestamp < Time.now.to_f
|
||||
end
|
||||
|
||||
def test_to_json
|
||||
md = Sidetiq::Lock::MetaData.new(timestamp: 42, owner: "me", key: "foobar")
|
||||
|
||||
assert_equal '{"owner":"me","timestamp":42,"key":"foobar"}', md.to_json
|
||||
end
|
||||
|
||||
def test_to_s
|
||||
md = Sidetiq::Lock::MetaData.new(timestamp: 42, owner: "me", key: "foobar")
|
||||
|
||||
assert_equal "Sidetiq::Lock on foobar set at 42 by me", md.to_s
|
||||
end
|
||||
end
|
||||
|
Loading…
Reference in a new issue