mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Use a mutex to synchronize access to shared WORKER_STATE hash (#3959)
On jruby, the Hash implementation is not threadsafe, and leads to jobs not being deleted from the busy list. See #3958
This commit is contained in:
parent
da3fbed14b
commit
927680cae7
3 changed files with 32 additions and 5 deletions
|
@ -6,6 +6,7 @@ HEAD
|
|||
---------
|
||||
|
||||
- Raise error for duplicate queue names in config to avoid unexpected fetch algorithm change [#3911]
|
||||
- Wrap WORKER_STATE in a mutex since Hash is not threadsafe on jruby [#3958]
|
||||
|
||||
5.2.1
|
||||
-----------
|
||||
|
|
|
@ -203,15 +203,41 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
# jruby's Hash implementation is not threadsafe, so we wrap it in a mutex here
|
||||
class SharedWorkerState
|
||||
def initialize
|
||||
@worker_state = {}
|
||||
@lock = Mutex.new
|
||||
end
|
||||
|
||||
def set(tid, hash)
|
||||
@lock.synchronize { @worker_state[tid] = hash }
|
||||
end
|
||||
|
||||
def delete(tid)
|
||||
@lock.synchronize { @worker_state.delete(tid) }
|
||||
end
|
||||
|
||||
def dup
|
||||
@lock.synchronize { @worker_state.dup }
|
||||
end
|
||||
|
||||
def size
|
||||
@lock.synchronize { @worker_state.size }
|
||||
end
|
||||
|
||||
def clear
|
||||
@lock.synchronize { @worker_state.clear }
|
||||
end
|
||||
end
|
||||
|
||||
PROCESSED = Counter.new
|
||||
FAILURE = Counter.new
|
||||
# This is mutable global state but because each thread is storing
|
||||
# its own unique key/value, there's no thread-safety issue AFAIK.
|
||||
WORKER_STATE = {}
|
||||
WORKER_STATE = SharedWorkerState.new
|
||||
|
||||
def stats(job_hash, queue)
|
||||
tid = Sidekiq::Logging.tid
|
||||
WORKER_STATE[tid] = {:queue => queue, :payload => job_hash, :run_at => Time.now.to_i }
|
||||
WORKER_STATE.set(tid, {:queue => queue, :payload => job_hash, :run_at => Time.now.to_i })
|
||||
|
||||
begin
|
||||
yield
|
||||
|
|
|
@ -20,7 +20,7 @@ class TestLauncher < Sidekiq::Test
|
|||
@launcher.manager = @mgr
|
||||
@id = @launcher.identity
|
||||
|
||||
Sidekiq::Processor::WORKER_STATE['a'] = {'b' => 1}
|
||||
Sidekiq::Processor::WORKER_STATE.set('a', {'b' => 1})
|
||||
|
||||
@proctitle = $0
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue