Mirror of https://github.com/mperham/sidekiq.git, synced 2022-11-09 13:52:34 -05:00
Quick stats api. (#4936)
* Lazily fetch slow stats later.
* Put calls to lazy load into stat method.
* Add test for workers_size.
This commit is contained in:
parent 461ef8392f
commit 47ce2a0b38
2 changed files with 55 additions and 25 deletions
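The gist of the change, as a minimal usage sketch (Sidekiq::Stats and its reader methods are existing API; the behavior described in the comments is inferred from the hunks below):

require "sidekiq/api"

stats = Sidekiq::Stats.new    # now runs only fetch_stats_fast!: one pipelined round trip
stats.processed               # already populated by the fast path
stats.default_queue_latency   # also computed in the fast path
stats.workers_size            # first access falls through to fetch_stats_slow!
stats.enqueued                # filled by that same slow fetch, so no further Redis calls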
@@ -8,7 +8,7 @@ require "base64"
 module Sidekiq
   class Stats
     def initialize
-      fetch_stats!
+      fetch_stats_fast!
     end

     def processed
@@ -51,7 +51,8 @@ module Sidekiq
       Sidekiq::Stats::Queues.new.lengths
     end

-    def fetch_stats!
+    # O(1) redis calls
+    def fetch_stats_fast!
       pipe1_res = Sidekiq.redis { |conn|
         conn.pipelined do
           conn.get("stat:processed")
@@ -64,6 +65,34 @@ module Sidekiq
         end
       }

+      default_queue_latency = if (entry = pipe1_res[6].first)
+        job = begin
+          Sidekiq.load_json(entry)
+        rescue
+          {}
+        end
+        now = Time.now.to_f
+        thence = job["enqueued_at"] || now
+        now - thence
+      else
+        0
+      end
+
+      @stats = {
+        processed: pipe1_res[0].to_i,
+        failed: pipe1_res[1].to_i,
+        scheduled_size: pipe1_res[2],
+        retry_size: pipe1_res[3],
+        dead_size: pipe1_res[4],
+        processes_size: pipe1_res[5],
+
+        default_queue_latency: default_queue_latency
+      }
+    end
+
+    # O(number of processes + number of queues) redis calls
+    def fetch_stats_slow!
       processes = Sidekiq.redis { |conn|
         conn.sscan_each("processes").to_a
       }
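For orientation: fetch_stats_fast! issues everything in one conn.pipelined block, i.e. a single network round trip, which is what the "O(1) redis calls" comment refers to. The reply array pipe1_res carries the six counters at indices 0-5 (mapped straight into @stats above) and, at index 6, a zero- or one-element array holding a job payload from the default queue, from which default_queue_latency is computed. Only conn.get("stat:processed") is visible in the hunk; the remaining commands are unchanged context and are paraphrased in this sketch:

pipe1_res = Sidekiq.redis { |conn|
  conn.pipelined do                 # all commands below share one round trip
    conn.get("stat:processed")      # -> pipe1_res[0]
    # ... five more counter reads (failed, scheduled, retry, dead, processes) ...
    # ... plus a peek at the default queue's waiting job -> pipe1_res[6] ...
  end
}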
@@ -83,30 +112,13 @@ module Sidekiq
       workers_size = pipe2_res[0...s].sum(&:to_i)
       enqueued = pipe2_res[s..-1].sum(&:to_i)

-      default_queue_latency = if (entry = pipe1_res[6].first)
-        job = begin
-          Sidekiq.load_json(entry)
-        rescue
-          {}
-        end
-        now = Time.now.to_f
-        thence = job["enqueued_at"] || now
-        now - thence
-      else
-        0
-      end
-      @stats = {
-        processed: pipe1_res[0].to_i,
-        failed: pipe1_res[1].to_i,
-        scheduled_size: pipe1_res[2],
-        retry_size: pipe1_res[3],
-        dead_size: pipe1_res[4],
-        processes_size: pipe1_res[5],
+      @stats[:workers_size] = workers_size
+      @stats[:enqueued] = enqueued
+    end

-        default_queue_latency: default_queue_latency,
-        workers_size: workers_size,
-        enqueued: enqueued
-      }
+    def fetch_stats!
+      fetch_stats_fast!
+      fetch_stats_slow!
     end

     def reset(*stats)
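The slow path costs one busy-count lookup per process plus one length lookup per queue (also pipelined), which is what the "O(number of processes + number of queues)" comment describes. The pipelined block that builds pipe2_res is unchanged context and not part of this diff, so the sketch below approximates its shape rather than quoting the source; the key point is that the first s replies are per-process busy counts (cf. the test setup below) and the rest are queue lengths:

# processes comes from the sscan shown above; queues stands for the list of queue
# names, fetched earlier in the same method (not shown in this diff)
pipe2_res = Sidekiq.redis { |conn|
  conn.pipelined do
    processes.each { |key| conn.hget(key, "busy") }      # per-process busy counts
    queues.each { |queue| conn.llen("queue:#{queue}") }  # per-queue lengths
  end
}

s = processes.size
workers_size = pipe2_res[0...s].sum(&:to_i)   # busy counts come first in the reply array
enqueued = pipe2_res[s..-1].sum(&:to_i)       # queue lengths follow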
@@ -126,6 +138,10 @@ module Sidekiq
     private

     def stat(s)
+      if (s == :enqueued || s == :workers_size) &&
+          @stats.present? && @stats[s].nil?
+        fetch_stats_slow!
+      end
       @stats[s]
     end
@@ -16,6 +16,7 @@ describe 'API' do
       assert_equal 0, s.failed
       assert_equal 0, s.enqueued
       assert_equal 0, s.default_queue_latency
+      assert_equal 0, s.workers_size
     end

     describe "processed" do
@@ -71,6 +72,19 @@ describe 'API' do
       end
     end

+    describe "workers_size" do
+      it 'retrieves the number of busy workers' do
+        Sidekiq.redis do |c|
+          c.sadd("processes", "process_1")
+          c.sadd("processes", "process_2")
+          c.hset("process_1", "busy", 1)
+          c.hset("process_2", "busy", 2)
+        end
+        s = Sidekiq::Stats.new
+        assert_equal 3, s.workers_size
+      end
+    end
+
     describe "queues" do
       it "is initially empty" do
         s = Sidekiq::Stats::Queues.new