mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
commit
3c696859e8
7 changed files with 97 additions and 18 deletions
|
@ -19,8 +19,8 @@ class SinatraWorker
|
|||
end
|
||||
|
||||
get '/' do
|
||||
@failed = $redis.get('stat:failed')
|
||||
@processed = $redis.get('stat:processed')
|
||||
@failed = Sidekiq::Stats.failed
|
||||
@processed = Sidekiq::Stats.processed
|
||||
@messages = $redis.lrange('sinkiq-example-messages', 0, -1)
|
||||
erb :index
|
||||
end
|
||||
|
|
|
@ -4,6 +4,7 @@ require 'sidekiq/client'
|
|||
require 'sidekiq/worker'
|
||||
require 'sidekiq/redis_connection'
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/stats'
|
||||
|
||||
require 'sidekiq/extensions/class_methods'
|
||||
require 'sidekiq/extensions/action_mailer'
|
||||
|
|
36
lib/sidekiq/stats.rb
Normal file
36
lib/sidekiq/stats.rb
Normal file
|
@ -0,0 +1,36 @@
|
|||
module Sidekiq
|
||||
module_function
|
||||
|
||||
def info
|
||||
results = {}
|
||||
processed, failed, queues = Sidekiq.redis { |conn|
|
||||
conn.multi do
|
||||
conn.get('stat:processed')
|
||||
conn.get('stat:failed')
|
||||
conn.smembers('queues')
|
||||
end
|
||||
}
|
||||
results[:queues_with_sizes] = Sidekiq.redis do |conn|
|
||||
queues.inject({}) { |memo, q|
|
||||
memo[q] = conn.llen("queue:#{q}")
|
||||
memo
|
||||
}.sort_by { |_, size| size }
|
||||
end
|
||||
results[:processed] = (processed || 0).to_i
|
||||
results[:failed] = (failed || 0).to_i
|
||||
results[:backlog] = results[:queues_with_sizes].
|
||||
map {|_, size| size }.
|
||||
inject(0) {|memo, val| memo + val }
|
||||
results
|
||||
end
|
||||
|
||||
def size(*queues)
|
||||
return info[:backlog] if queues.empty?
|
||||
|
||||
Sidekiq.redis { |conn|
|
||||
conn.multi {
|
||||
queues.map { |q| conn.llen("queue:#{q}") }
|
||||
}
|
||||
}.inject(0) { |memo, count| memo += count }
|
||||
end
|
||||
end
|
|
@ -60,12 +60,16 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def info
|
||||
@info ||= Sidekiq.info
|
||||
end
|
||||
|
||||
def processed
|
||||
Sidekiq.redis { |conn| conn.get('stat:processed') } || 0
|
||||
info[:processed]
|
||||
end
|
||||
|
||||
def failed
|
||||
Sidekiq.redis { |conn| conn.get('stat:failed') } || 0
|
||||
info[:failed]
|
||||
end
|
||||
|
||||
def zcard(name)
|
||||
|
@ -73,15 +77,11 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def queues
|
||||
@queues ||= Sidekiq.redis do |conn|
|
||||
conn.smembers('queues').map do |q|
|
||||
[q, conn.llen("queue:#{q}") || 0]
|
||||
end.sort { |x,y| x[1] <=> y[1] }
|
||||
end
|
||||
@queues ||= Sidekiq.info[:queues_with_sizes]
|
||||
end
|
||||
|
||||
def backlog
|
||||
queues.map {|name, size| size }.inject(0) {|memo, val| memo + val }
|
||||
info[:backlog]
|
||||
end
|
||||
|
||||
def retries_with_score(score)
|
||||
|
|
|
@ -25,4 +25,5 @@ Gem::Specification.new do |gem|
|
|||
gem.add_development_dependency 'rake'
|
||||
gem.add_development_dependency 'actionmailer', '~> 3'
|
||||
gem.add_development_dependency 'activerecord', '~> 3'
|
||||
gem.add_development_dependency 'pry'
|
||||
end
|
||||
|
|
|
@ -4,6 +4,8 @@ if ENV.has_key?("SIMPLECOV")
|
|||
SimpleCov.start
|
||||
end
|
||||
|
||||
require 'pry'
|
||||
|
||||
require 'minitest/unit'
|
||||
require 'minitest/pride'
|
||||
require 'minitest/autorun'
|
||||
|
|
|
@ -11,6 +11,7 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
|
||||
class DumbWorker
|
||||
include Sidekiq::Worker
|
||||
sidekiq_options :queue => 'dumbq'
|
||||
|
||||
def perform(arg)
|
||||
raise 'bang' if arg == nil
|
||||
|
@ -31,15 +32,15 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
boss.expect(:processor_done!, nil, [processor])
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
|
||||
assert_equal 0, conn.get('stat:failed').to_i
|
||||
assert_equal 0, conn.get('stat:processed').to_i
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 0, Sidekiq.info[:processed]
|
||||
|
||||
processor.process(msg, 'xyzzy')
|
||||
processor.process(msg, 'xyzzy')
|
||||
processor.process(msg, 'xyzzy')
|
||||
|
||||
assert_equal 0, conn.get('stat:failed').to_i
|
||||
assert_equal 3, conn.get('stat:processed').to_i
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 3, Sidekiq.info[:processed]
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -49,8 +50,8 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
|
||||
@redis.with do |conn|
|
||||
assert_equal [], conn.smembers('workers')
|
||||
assert_equal 0, conn.get('stat:failed').to_i
|
||||
assert_equal 0, conn.get('stat:processed').to_i
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 0, Sidekiq.info[:processed]
|
||||
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
|
||||
|
@ -59,8 +60,46 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
processor.process(msg, 'xyzzy')
|
||||
end
|
||||
|
||||
assert_equal 1, conn.get('stat:failed').to_i
|
||||
assert_equal 1, conn.get('stat:processed').to_i
|
||||
assert_equal 1, Sidekiq.info[:failed]
|
||||
assert_equal 1, Sidekiq.info[:processed]
|
||||
end
|
||||
end
|
||||
|
||||
describe "info counts" do
|
||||
before do
|
||||
@redis.with do |conn|
|
||||
conn.rpush 'queue:foo', '{}'
|
||||
conn.sadd 'queues', 'foo'
|
||||
|
||||
conn.rpush 'queue:bar', '{}'
|
||||
conn.rpush 'queue:bar', '{}'
|
||||
conn.sadd 'queues', 'bar'
|
||||
|
||||
conn.rpush 'queue:baz', '{}'
|
||||
conn.sadd 'queues', 'baz'
|
||||
end
|
||||
end
|
||||
|
||||
describe "queues_with_sizes" do
|
||||
it "returns queue names and corresponding job counts" do
|
||||
assert_equal [["foo", 1], ["baz", 1], ["bar", 2]], Sidekiq.info[:queues_with_sizes]
|
||||
end
|
||||
end
|
||||
|
||||
describe "backlog" do
|
||||
it "returns count of all jobs yet to be processed" do
|
||||
assert_equal 4, Sidekiq.info[:backlog]
|
||||
end
|
||||
end
|
||||
|
||||
describe "size" do
|
||||
it "returns size of queues" do
|
||||
assert_equal 0, Sidekiq.size("foox")
|
||||
assert_equal 1, Sidekiq.size(:foo)
|
||||
assert_equal 1, Sidekiq.size("foo")
|
||||
assert_equal 3, Sidekiq.size("foo", "bar")
|
||||
assert_equal 4, Sidekiq.size
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue