From b2c8bc6e49707e3378044a1ba47068fc8b6154f6 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Mon, 23 Apr 2018 15:46:58 -0700 Subject: [PATCH] Implement our own atomic counter, #3830 --- lib/sidekiq/launcher.rb | 8 ++++---- lib/sidekiq/processor.rb | 27 ++++++++++++++++++++++----- test/test_processor.rb | 8 ++++---- 3 files changed, 30 insertions(+), 13 deletions(-) diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index bdb67891..80f1b4b8 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -72,8 +72,8 @@ module Sidekiq key = identity fails = procd = 0 begin - Processor::FAILURE.update {|curr| fails = curr; 0 } - Processor::PROCESSED.update {|curr| procd = curr; 0 } + fails = Processor::FAILURE.reset + procd = Processor::PROCESSED.reset workers_key = "#{key}:workers" nowdate = Time.now.utc.strftime("%Y-%m-%d") @@ -112,8 +112,8 @@ module Sidekiq # ignore all redis/network issues logger.error("heartbeat: #{e.message}") # don't lose the counts if there was a network issue - Processor::PROCESSED.increment(procd) - Processor::FAILURE.increment(fails) + Processor::PROCESSED.incr(procd) + Processor::FAILURE.incr(fails) end end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index cdf4bcdd..1603d40f 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -5,7 +5,6 @@ require 'sidekiq/job_logger' require 'sidekiq/job_retry' require 'thread' require 'concurrent/map' -require 'concurrent/atomic/atomic_fixnum' module Sidekiq ## @@ -187,9 +186,27 @@ module Sidekiq worker.perform(*cloned_args) end + # Ruby doesn't provide atomic counters out of the box so we'll + # implement something simple ourselves. + # https://bugs.ruby-lang.org/issues/14706 + class Counter + def initialize + @value = 0 + @lock = Mutex.new + end + + def incr(amount=1) + @lock.synchronize { @value = @value + amount } + end + + def reset + @lock.synchronize { val = @value; @value = 0; val } + end + end + + PROCESSED = Counter.new + FAILURE = Counter.new WORKER_STATE = Concurrent::Map.new - PROCESSED = Concurrent::AtomicFixnum.new - FAILURE = Concurrent::AtomicFixnum.new def stats(job_hash, queue) tid = Sidekiq::Logging.tid @@ -198,11 +215,11 @@ module Sidekiq begin yield rescue Exception - FAILURE.increment + FAILURE.incr raise ensure WORKER_STATE.delete(tid) - PROCESSED.increment + PROCESSED.incr end end diff --git a/test/test_processor.rb b/test/test_processor.rb index c498e8ad..2e7f4366 100644 --- a/test/test_processor.rb +++ b/test/test_processor.rb @@ -312,9 +312,9 @@ class TestProcessor < Sidekiq::Test end it 'increments processed stat' do - Sidekiq::Processor::PROCESSED.value = 0 + Sidekiq::Processor::PROCESSED.reset successful_job - assert_equal 1, Sidekiq::Processor::PROCESSED.value + assert_equal 1, Sidekiq::Processor::PROCESSED.reset end end @@ -330,9 +330,9 @@ class TestProcessor < Sidekiq::Test end it 'increments failed stat' do - Sidekiq::Processor::FAILURE.value = 0 + Sidekiq::Processor::FAILURE.reset failed_job - assert_equal 1, Sidekiq::Processor::FAILURE.value + assert_equal 1, Sidekiq::Processor::FAILURE.reset end end end