From 927680cae78731c540fefaabf5bad619b1d1d171 Mon Sep 17 00:00:00 2001 From: Matt Books Date: Tue, 11 Sep 2018 09:53:52 -0700 Subject: [PATCH] Use a mutex to synchronize access to shared WORKER_STATE hash (#3959) On jruby, the Hash implementation is not threadsafe, and leads to jobs not being deleted from the busy list. See #3958 --- Changes.md | 1 + lib/sidekiq/processor.rb | 34 ++++++++++++++++++++++++++++++---- test/test_launcher.rb | 2 +- 3 files changed, 32 insertions(+), 5 deletions(-) diff --git a/Changes.md b/Changes.md index 4412dfaa..6376e507 100644 --- a/Changes.md +++ b/Changes.md @@ -6,6 +6,7 @@ HEAD --------- - Raise error for duplicate queue names in config to avoid unexpected fetch algorithm change [#3911] +- Wrap WORKER_STATE in a mutex since Hash is not threadsafe on jruby [#3958] 5.2.1 ----------- diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 751d6e1c..3c775ad1 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -203,15 +203,41 @@ module Sidekiq end end + # jruby's Hash implementation is not threadsafe, so we wrap it in a mutex here + class SharedWorkerState + def initialize + @worker_state = {} + @lock = Mutex.new + end + + def set(tid, hash) + @lock.synchronize { @worker_state[tid] = hash } + end + + def delete(tid) + @lock.synchronize { @worker_state.delete(tid) } + end + + def dup + @lock.synchronize { @worker_state.dup } + end + + def size + @lock.synchronize { @worker_state.size } + end + + def clear + @lock.synchronize { @worker_state.clear } + end + end + PROCESSED = Counter.new FAILURE = Counter.new - # This is mutable global state but because each thread is storing - # its own unique key/value, there's no thread-safety issue AFAIK. - WORKER_STATE = {} + WORKER_STATE = SharedWorkerState.new def stats(job_hash, queue) tid = Sidekiq::Logging.tid - WORKER_STATE[tid] = {:queue => queue, :payload => job_hash, :run_at => Time.now.to_i } + WORKER_STATE.set(tid, {:queue => queue, :payload => job_hash, :run_at => Time.now.to_i }) begin yield diff --git a/test/test_launcher.rb b/test/test_launcher.rb index 3007184c..b58546d5 100644 --- a/test/test_launcher.rb +++ b/test/test_launcher.rb @@ -20,7 +20,7 @@ class TestLauncher < Sidekiq::Test @launcher.manager = @mgr @id = @launcher.identity - Sidekiq::Processor::WORKER_STATE['a'] = {'b' => 1} + Sidekiq::Processor::WORKER_STATE.set('a', {'b' => 1}) @proctitle = $0 end