mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
commit
b2826161c2
8 changed files with 124 additions and 112 deletions
|
@ -19,8 +19,9 @@ class SinatraWorker
|
|||
end
|
||||
|
||||
get '/' do
|
||||
@failed = Sidekiq.info[:failed]
|
||||
@processed = Sidekiq.info[:processed]
|
||||
stats = Sidekiq::Stats.new
|
||||
@failed = stats.failed
|
||||
@processed = stats.processed
|
||||
@messages = $redis.lrange('sinkiq-example-messages', 0, -1)
|
||||
erb :index
|
||||
end
|
||||
|
|
|
@ -2,6 +2,39 @@ require 'sidekiq'
|
|||
|
||||
module Sidekiq
|
||||
|
||||
class Stats
|
||||
def processed
|
||||
count = Sidekiq.redis do |conn|
|
||||
conn.get("stat:processed")
|
||||
end
|
||||
count.nil? ? 0 : count.to_i
|
||||
end
|
||||
|
||||
def failed
|
||||
count = Sidekiq.redis do |conn|
|
||||
conn.get("stat:failed")
|
||||
end
|
||||
count.nil? ? 0 : count.to_i
|
||||
end
|
||||
|
||||
def queues
|
||||
Sidekiq.redis do |conn|
|
||||
queues = conn.smembers('queues')
|
||||
|
||||
array_of_arrays = queues.inject({}) do |memo, queue|
|
||||
memo[queue] = conn.llen("queue:#{queue}")
|
||||
memo
|
||||
end.sort_by { |_, size| size }
|
||||
|
||||
Hash[array_of_arrays.reverse]
|
||||
end
|
||||
end
|
||||
|
||||
def enqueued
|
||||
queues.values.inject(&:+) || 0
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Encapsulates a queue within Sidekiq.
|
||||
# Allows enumeration of all jobs within the queue
|
||||
|
|
|
@ -1,31 +1,8 @@
|
|||
module Sidekiq
|
||||
module_function
|
||||
|
||||
def info
|
||||
results = {}
|
||||
processed, failed, queues = Sidekiq.redis { |conn|
|
||||
conn.multi do
|
||||
conn.get('stat:processed')
|
||||
conn.get('stat:failed')
|
||||
conn.smembers('queues')
|
||||
end
|
||||
}
|
||||
results[:queues_with_sizes] = Sidekiq.redis do |conn|
|
||||
queues.inject({}) { |memo, q|
|
||||
memo[q] = conn.llen("queue:#{q}")
|
||||
memo
|
||||
}.sort_by { |_, size| size }
|
||||
end
|
||||
results[:processed] = (processed || 0).to_i
|
||||
results[:failed] = (failed || 0).to_i
|
||||
results[:backlog] = results[:queues_with_sizes].
|
||||
map {|_, size| size }.
|
||||
inject(0) {|memo, val| memo + val }
|
||||
results
|
||||
end
|
||||
|
||||
def size(*queues)
|
||||
return info[:backlog] if queues.empty?
|
||||
return Sidekiq::Stats.new.enqueued if queues.empty?
|
||||
|
||||
Sidekiq.redis { |conn|
|
||||
conn.multi {
|
||||
|
|
|
@ -34,28 +34,16 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def info
|
||||
@info ||= Sidekiq.info
|
||||
def stats
|
||||
@stats ||= Sidekiq::Stats.new
|
||||
end
|
||||
|
||||
def processed
|
||||
info[:processed]
|
||||
def scheduled_job_count
|
||||
Sidekiq::ScheduledSet.new.size
|
||||
end
|
||||
|
||||
def failed
|
||||
info[:failed]
|
||||
end
|
||||
|
||||
def zcard(name)
|
||||
Sidekiq.redis { |conn| conn.zcard(name) }
|
||||
end
|
||||
|
||||
def queues
|
||||
@queues ||= Sidekiq.info[:queues_with_sizes]
|
||||
end
|
||||
|
||||
def backlog
|
||||
info[:backlog]
|
||||
def retry_job_count
|
||||
Sidekiq::RetrySet.new.size
|
||||
end
|
||||
|
||||
def retries_with_score(score)
|
||||
|
@ -122,7 +110,7 @@ module Sidekiq
|
|||
end
|
||||
|
||||
get "/queues" do
|
||||
@queues = queues
|
||||
@queues = Sidekiq::Stats.new.queues
|
||||
slim :queues
|
||||
end
|
||||
|
||||
|
|
|
@ -1,6 +1,79 @@
|
|||
require 'helper'
|
||||
|
||||
class TestApi < MiniTest::Unit::TestCase
|
||||
describe "stats" do
|
||||
before do
|
||||
Sidekiq.redis {|c| c.flushdb }
|
||||
end
|
||||
|
||||
describe "processed" do
|
||||
it "is initially zero" do
|
||||
s = Sidekiq::Stats.new
|
||||
assert_equal 0, s.processed
|
||||
end
|
||||
|
||||
it "returns number of processed jobs" do
|
||||
Sidekiq.redis { |conn| conn.set("stat:processed", 5) }
|
||||
s = Sidekiq::Stats.new
|
||||
assert_equal 5, s.processed
|
||||
end
|
||||
end
|
||||
|
||||
describe "failed" do
|
||||
it "is initially zero" do
|
||||
s = Sidekiq::Stats.new
|
||||
assert_equal 0, s.processed
|
||||
end
|
||||
|
||||
it "returns number of failed jobs" do
|
||||
Sidekiq.redis { |conn| conn.set("stat:failed", 5) }
|
||||
s = Sidekiq::Stats.new
|
||||
assert_equal 5, s.failed
|
||||
end
|
||||
end
|
||||
|
||||
describe "queues" do
|
||||
it "is initially empty" do
|
||||
s = Sidekiq::Stats.new
|
||||
assert_equal 0, s.queues.size
|
||||
end
|
||||
|
||||
it "returns a hash of queue and size in order" do
|
||||
Sidekiq.redis do |conn|
|
||||
conn.rpush 'queue:foo', '{}'
|
||||
conn.sadd 'queues', 'foo'
|
||||
|
||||
3.times { conn.rpush 'queue:bar', '{}' }
|
||||
conn.sadd 'queues', 'bar'
|
||||
end
|
||||
|
||||
s = Sidekiq::Stats.new
|
||||
assert_equal ({ "foo" => 1, "bar" => 3 }), s.queues
|
||||
assert_equal "bar", s.queues.first.first
|
||||
end
|
||||
end
|
||||
|
||||
describe "enqueued" do
|
||||
it "is initially empty" do
|
||||
s = Sidekiq::Stats.new
|
||||
assert_equal 0, s.enqueued
|
||||
end
|
||||
|
||||
it "returns total enqueued jobs" do
|
||||
Sidekiq.redis do |conn|
|
||||
conn.rpush 'queue:foo', '{}'
|
||||
conn.sadd 'queues', 'foo'
|
||||
|
||||
3.times { conn.rpush 'queue:bar', '{}' }
|
||||
conn.sadd 'queues', 'bar'
|
||||
end
|
||||
|
||||
s = Sidekiq::Stats.new
|
||||
assert_equal 4, s.enqueued
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'with an empty database' do
|
||||
before do
|
||||
Sidekiq.redis {|c| c.flushdb }
|
||||
|
|
|
@ -18,54 +18,6 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
it 'updates global stats in the success case' do
|
||||
msg = Sidekiq.dump_json({ 'class' => DumbWorker.to_s, 'args' => [""] })
|
||||
boss = MiniTest::Mock.new
|
||||
actor = MiniTest::Mock.new
|
||||
|
||||
@redis.with do |conn|
|
||||
|
||||
set = conn.smembers('workers')
|
||||
assert_equal 0, set.size
|
||||
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
3.times do
|
||||
actor.expect(:processor_done, nil, [processor])
|
||||
boss.expect(:async, actor, [])
|
||||
end
|
||||
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 0, Sidekiq.info[:processed]
|
||||
|
||||
processor.process(msg, 'xyzzy')
|
||||
processor.process(msg, 'xyzzy')
|
||||
processor.process(msg, 'xyzzy')
|
||||
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 3, Sidekiq.info[:processed]
|
||||
end
|
||||
end
|
||||
|
||||
it 'updates global stats in the error case' do
|
||||
msg = Sidekiq.dump_json({ 'class' => DumbWorker.to_s, 'args' => [nil] })
|
||||
boss = MiniTest::Mock.new
|
||||
|
||||
@redis.with do |conn|
|
||||
assert_equal [], conn.smembers('workers')
|
||||
assert_equal 0, Sidekiq.info[:failed]
|
||||
assert_equal 0, Sidekiq.info[:processed]
|
||||
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
|
||||
assert_raises RuntimeError do
|
||||
processor.process(msg, 'xyzzy')
|
||||
end
|
||||
|
||||
assert_equal 1, Sidekiq.info[:failed]
|
||||
assert_equal 1, Sidekiq.info[:processed]
|
||||
end
|
||||
end
|
||||
|
||||
describe "info counts" do
|
||||
before do
|
||||
@redis.with do |conn|
|
||||
|
@ -80,18 +32,6 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
describe "queues_with_sizes" do
|
||||
it "returns queue names and corresponding job counts" do
|
||||
assert_equal [["foo", 1], ["baz", 2], ["bar", 3]], Sidekiq.info[:queues_with_sizes]
|
||||
end
|
||||
end
|
||||
|
||||
describe "backlog" do
|
||||
it "returns count of all jobs yet to be processed" do
|
||||
assert_equal 6, Sidekiq.info[:backlog]
|
||||
end
|
||||
end
|
||||
|
||||
describe "size" do
|
||||
it "returns size of queues" do
|
||||
assert_equal 0, Sidekiq.size("foox")
|
||||
|
|
|
@ -1,19 +1,19 @@
|
|||
ul.unstyled.summary
|
||||
li
|
||||
span.count #{number_with_delimiter(processed)}
|
||||
span.count #{number_with_delimiter(stats.processed)}
|
||||
span.desc Processed
|
||||
li
|
||||
span.count #{number_with_delimiter(failed)}
|
||||
span.count #{number_with_delimiter(stats.failed)}
|
||||
span.desc Failed
|
||||
li
|
||||
span.count #{number_with_delimiter(workers.size)}
|
||||
span.desc Busy
|
||||
li
|
||||
span.count #{number_with_delimiter(zcard('schedule'))}
|
||||
span.count #{number_with_delimiter(scheduled_job_count)}
|
||||
span.desc Scheduled
|
||||
li
|
||||
span.count #{number_with_delimiter(zcard('retry'))}
|
||||
span.count #{number_with_delimiter(retry_job_count)}
|
||||
span.desc Retries
|
||||
li
|
||||
span.count #{number_with_delimiter(backlog)}
|
||||
span.desc Queue
|
||||
span.count #{number_with_delimiter(stats.enqueued)}
|
||||
span.desc Enqueued
|
||||
|
|
|
@ -5,7 +5,7 @@ table class="queues table table-hover table-bordered table-striped table-white"
|
|||
th Queue
|
||||
th Size
|
||||
th Actions
|
||||
- queues.each do |(queue, size)|
|
||||
- @queues.each do |queue, size|
|
||||
tr
|
||||
td
|
||||
a href="#{root_path}queues/#{queue}" #{queue}
|
||||
|
|
Loading…
Reference in a new issue