1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Implement our own atomic counter, #3830

This commit is contained in:
Mike Perham 2018-04-23 15:46:58 -07:00
parent ad6bb6ae96
commit b2c8bc6e49
3 changed files with 30 additions and 13 deletions

View file

@ -72,8 +72,8 @@ module Sidekiq
key = identity
fails = procd = 0
begin
Processor::FAILURE.update {|curr| fails = curr; 0 }
Processor::PROCESSED.update {|curr| procd = curr; 0 }
fails = Processor::FAILURE.reset
procd = Processor::PROCESSED.reset
workers_key = "#{key}:workers"
nowdate = Time.now.utc.strftime("%Y-%m-%d")
@ -112,8 +112,8 @@ module Sidekiq
# ignore all redis/network issues
logger.error("heartbeat: #{e.message}")
# don't lose the counts if there was a network issue
Processor::PROCESSED.increment(procd)
Processor::FAILURE.increment(fails)
Processor::PROCESSED.incr(procd)
Processor::FAILURE.incr(fails)
end
end

View file

@ -5,7 +5,6 @@ require 'sidekiq/job_logger'
require 'sidekiq/job_retry'
require 'thread'
require 'concurrent/map'
require 'concurrent/atomic/atomic_fixnum'
module Sidekiq
##
@ -187,9 +186,27 @@ module Sidekiq
worker.perform(*cloned_args)
end
# Ruby doesn't provide atomic counters out of the box so we'll
# implement something simple ourselves.
# https://bugs.ruby-lang.org/issues/14706
class Counter
def initialize
@value = 0
@lock = Mutex.new
end
def incr(amount=1)
@lock.synchronize { @value = @value + amount }
end
def reset
@lock.synchronize { val = @value; @value = 0; val }
end
end
PROCESSED = Counter.new
FAILURE = Counter.new
WORKER_STATE = Concurrent::Map.new
PROCESSED = Concurrent::AtomicFixnum.new
FAILURE = Concurrent::AtomicFixnum.new
def stats(job_hash, queue)
tid = Sidekiq::Logging.tid
@ -198,11 +215,11 @@ module Sidekiq
begin
yield
rescue Exception
FAILURE.increment
FAILURE.incr
raise
ensure
WORKER_STATE.delete(tid)
PROCESSED.increment
PROCESSED.incr
end
end

View file

@ -312,9 +312,9 @@ class TestProcessor < Sidekiq::Test
end
it 'increments processed stat' do
Sidekiq::Processor::PROCESSED.value = 0
Sidekiq::Processor::PROCESSED.reset
successful_job
assert_equal 1, Sidekiq::Processor::PROCESSED.value
assert_equal 1, Sidekiq::Processor::PROCESSED.reset
end
end
@ -330,9 +330,9 @@ class TestProcessor < Sidekiq::Test
end
it 'increments failed stat' do
Sidekiq::Processor::FAILURE.value = 0
Sidekiq::Processor::FAILURE.reset
failed_job
assert_equal 1, Sidekiq::Processor::FAILURE.value
assert_equal 1, Sidekiq::Processor::FAILURE.reset
end
end
end