mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Switch to Sidekiq.info
Working on pipelining as much as possible.
This commit is contained in:
parent
7f778c656e
commit
1b8d31b17a
4 changed files with 57 additions and 46 deletions
|
@ -1,35 +1,45 @@
|
|||
module Sidekiq
|
||||
module Stats
|
||||
module_function
|
||||
module_function
|
||||
|
||||
def processed
|
||||
(Sidekiq.redis { |conn| conn.get('stat:processed') } || 0).to_i
|
||||
def info
|
||||
results = {}
|
||||
futures = {}
|
||||
queues_with_sizes = Sidekiq.redis { |conn|
|
||||
conn.pipelined do
|
||||
futures[:processed] = conn.get('stat:processed')
|
||||
futures[:failed] = conn.get('stat:failed')
|
||||
futures[:queues] = conn.smembers('queues')
|
||||
end
|
||||
}
|
||||
queues_with_sizes = Sidekiq.redis do |conn|
|
||||
futures[:queues].value.inject({}) { |memo, q|
|
||||
memo[q] = conn.llen("queue:#{q}")
|
||||
memo
|
||||
}.sort_by { |_, size| size }
|
||||
end
|
||||
results[:processed] = (futures[:processed].value || 0).to_i
|
||||
results[:failed] = (futures[:failed].value || 0).to_i
|
||||
results[:backlog] = queues_with_sizes.
|
||||
map {|_, size| size }.
|
||||
inject(0) {|memo, val| memo + val }
|
||||
results
|
||||
end
|
||||
|
||||
def failed
|
||||
(Sidekiq.redis { |conn| conn.get('stat:failed') } || 0).to_i
|
||||
end
|
||||
def queues_with_sizes
|
||||
Sidekiq.redis { |conn|
|
||||
conn.smembers('queues').inject({}) { |memo, q|
|
||||
memo[q] = conn.llen("queue:#{q}")
|
||||
memo
|
||||
}.sort_by { |_, size| size }
|
||||
}
|
||||
end
|
||||
|
||||
def queues_with_sizes
|
||||
Sidekiq.redis { |conn|
|
||||
conn.smembers('queues').inject({}) { |memo, q|
|
||||
memo[q] = conn.llen("queue:#{q}")
|
||||
memo
|
||||
}.sort_by { |_, size| size }
|
||||
def size(*queues)
|
||||
return info[:backlog] if queues.empty?
|
||||
queues.
|
||||
map(&:to_s).
|
||||
inject(0) { |memo, queue|
|
||||
memo += Sidekiq.redis { |conn| conn.llen("queue:#{queue}") }
|
||||
}
|
||||
end
|
||||
|
||||
def backlog
|
||||
queues_with_sizes.map {|_, size| size }.inject(0) {|memo, val| memo + val }
|
||||
end
|
||||
|
||||
def size(*queues)
|
||||
return backlog if queues.empty?
|
||||
queues.
|
||||
map(&:to_s).
|
||||
inject(0) { |memo, queue|
|
||||
memo += Sidekiq.redis { |conn| conn.llen("queue:#{queue}") }
|
||||
}
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -61,11 +61,11 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def processed
|
||||
Sidekiq::Stats.processed
|
||||
Sidekiq.info[:processed]
|
||||
end
|
||||
|
||||
def failed
|
||||
Sidekiq::Stats.failed
|
||||
Sidekiq.info[:failed]
|
||||
end
|
||||
|
||||
def zcard(name)
|
||||
|
@ -73,11 +73,11 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def queues
|
||||
@queues ||= Sidekiq::Stats.queues_with_sizes
|
||||
@queues ||= Sidekiq.queues_with_sizes
|
||||
end
|
||||
|
||||
def backlog
|
||||
Sidekiq::Stats.backlog
|
||||
Sidekiq.info[:backlog]
|
||||
end
|
||||
|
||||
def retries_with_score(score)
|
||||
|
|
|
@ -25,4 +25,5 @@ Gem::Specification.new do |gem|
|
|||
gem.add_development_dependency 'rake'
|
||||
gem.add_development_dependency 'actionmailer', '~> 3'
|
||||
gem.add_development_dependency 'activerecord', '~> 3'
|
||||
gem.add_development_dependency 'pry'
|
||||
end
|
||||
|
|
|
@ -32,15 +32,15 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
boss.expect(:processor_done!, nil, [processor])
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
|
||||
assert_equal 0, Sidekiq::Stats.failed
|
||||
assert_equal 0, Sidekiq::Stats.processed
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 0, Sidekiq.info[:processed]
|
||||
|
||||
processor.process(msg, 'xyzzy')
|
||||
processor.process(msg, 'xyzzy')
|
||||
processor.process(msg, 'xyzzy')
|
||||
|
||||
assert_equal 0, Sidekiq::Stats.failed
|
||||
assert_equal 3, Sidekiq::Stats.processed
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 3, Sidekiq.info[:processed]
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -50,8 +50,8 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
|
||||
@redis.with do |conn|
|
||||
assert_equal [], conn.smembers('workers')
|
||||
assert_equal 0, Sidekiq::Stats.failed
|
||||
assert_equal 0, Sidekiq::Stats.processed
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 0, Sidekiq.info[:processed]
|
||||
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
|
||||
|
@ -60,8 +60,8 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
processor.process(msg, 'xyzzy')
|
||||
end
|
||||
|
||||
assert_equal 1, Sidekiq::Stats.failed
|
||||
assert_equal 1, Sidekiq::Stats.processed
|
||||
assert_equal 1, Sidekiq.info[:failed]
|
||||
assert_equal 1, Sidekiq.info[:processed]
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -77,24 +77,24 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
describe "queues_with_counts" do
|
||||
describe "queues_with_sizes" do
|
||||
it "returns queue names and corresponding job counts" do
|
||||
assert_equal [["foo", 1], ["bar", 2]], Sidekiq::Stats.queues_with_sizes
|
||||
assert_equal [["foo", 1], ["bar", 2]], Sidekiq.queues_with_sizes
|
||||
end
|
||||
end
|
||||
|
||||
describe "backlog" do
|
||||
it "returns count of all jobs yet to be processed" do
|
||||
assert_equal 3, Sidekiq::Stats.backlog
|
||||
assert_equal 3, Sidekiq.info[:backlog]
|
||||
end
|
||||
end
|
||||
|
||||
describe "size" do
|
||||
it "returns size of queues" do
|
||||
assert_equal 1, Sidekiq::Stats.size(:foo)
|
||||
assert_equal 1, Sidekiq::Stats.size("foo")
|
||||
assert_equal 3, Sidekiq::Stats.size("foo", "bar")
|
||||
assert_equal 3, Sidekiq::Stats.size
|
||||
assert_equal 1, Sidekiq.size(:foo)
|
||||
assert_equal 1, Sidekiq.size("foo")
|
||||
assert_equal 3, Sidekiq.size("foo", "bar")
|
||||
assert_equal 3, Sidekiq.size
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue