Fit all stats into 2 redis pipelines
parent 22072262d7
commit 20c616e971

3 changed files with 100 additions and 85 deletions
@@ -3,28 +3,89 @@ require 'sidekiq'
 module Sidekiq
   class Stats
+    def initialize
+      fetch_stats!
+    end
+
     def processed
-      stat 'processed'
+      stat :processed
     end
 
     def failed
-      stat 'failed'
+      stat :failed
     end
 
     def scheduled_size
-      stat 'scheduled_size'
+      stat :scheduled_size
     end
 
     def retry_size
-      stat 'retry_size'
+      stat :retry_size
     end
 
     def dead_size
-      stat 'dead_size'
+      stat :dead_size
     end
 
+    def enqueued
+      stat :enqueued
+    end
+
+    def processes_size
+      stat :processes_size
+    end
+
+    def workers_size
+      stat :workers_size
+    end
+
+    def default_queue_latency
+      stat :default_queue_latency
+    end
+
     def fetch_stats!
-      stats
+      pipe1_res = Sidekiq.redis do |conn|
+        conn.pipelined do
+          conn.get('stat:processed')
+          conn.get('stat:failed')
+          conn.zcard('schedule')
+          conn.zcard('retry')
+          conn.zcard('dead')
+          conn.scard('processes')
+          conn.lrange("queue:default", -1, -1)
+          conn.smembers('processes')
+          conn.smembers('queues'.freeze)
+        end
+      end
+
+      pipe2_res = Sidekiq.redis do |conn|
+        conn.pipelined do
+          pipe1_res[7].each {|key| conn.hget(key, 'busy') }
+          pipe1_res[8].each {|queue| conn.llen("queue:#{queue}") }
+        end
+      end
+
+      # pop the LLEN replies (tail of pipe2_res) before the HGET replies
+      enqueued = pipe2_res.pop(pipe1_res[8].size).map(&:to_i).inject(0, &:+)
+      workers_size = pipe2_res.pop(pipe1_res[7].size).map(&:to_i).inject(0, &:+)
+
+      default_queue_latency = if (entry = pipe1_res[6].first)
+        Time.now.to_f - Sidekiq.load_json(entry)['enqueued_at']
+      else
+        0
+      end
+      @stats = {
+        processed: pipe1_res[0].to_i,
+        failed: pipe1_res[1].to_i,
+        scheduled_size: pipe1_res[2],
+        retry_size: pipe1_res[3],
+        dead_size: pipe1_res[4],
+        processes_size: pipe1_res[5],
+
+        default_queue_latency: default_queue_latency,
+        workers_size: workers_size,
+        enqueued: enqueued
+      }
+    end
 
     def reset(*stats)
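The change hinges on redis-rb's pipelining contract: conn.pipelined buffers every command issued inside the block and returns their replies as one array, in call order. That is why pipe1_res[7] and pipe1_res[8] are exactly the two SMEMBERS replies that feed the second pipeline, and why the second pipeline's replies can be consumed from the tail with pop: the LLEN replies sit after the HGET replies, so they must come off first. A minimal sketch of that contract, assuming a local Redis and the same block style the diff uses (newer redis-rb versions yield an explicit pipeline object to the block instead):

    require 'redis'

    conn = Redis.new  # assumes a Redis server on localhost:6379

    replies = conn.pipelined do
      conn.get('stat:processed')  # replies[0] -> String or nil
      conn.zcard('retry')         # replies[1] -> Integer
      conn.smembers('queues')     # replies[2] -> Array of queue names
    end

    processed  = replies[0].to_i  # one round-trip served all three commands
    retry_size = replies[1]
    queues     = replies[2]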
@@ -41,77 +102,33 @@ module Sidekiq
       end
     end
 
-    def queues
-      Sidekiq.redis do |conn|
-        queues = conn.smembers('queues'.freeze)
-
-        lengths = conn.pipelined do
-          queues.each do |queue|
-            conn.llen("queue:#{queue}")
-          end
-        end
-
-        i = 0
-        array_of_arrays = queues.inject({}) do |memo, queue|
-          memo[queue] = lengths[i]
-          i += 1
-          memo
-        end.sort_by { |_, size| size }
-
-        Hash[array_of_arrays.reverse]
-      end
-    end
-
-    def enqueued
-      queues.values.inject(&:+) || 0
-    end
-
     private
 
-    def stats(*only)
-      all = %w(processed failed queues enqueued scheduled_size retry_size dead_size)
-      metrics = only.any? ? only : all
-
-      if @all_stats
-        @all_stats.slice(*metrics)
-      else
-        if metrics == all
-          @all_stats = load_stats(*all)
-        else
-          load_stats(*metrics)
-        end
-      end
-    end
-
     def stat(s)
-      stats(s)[s]
+      @stats[s]
     end
 
-    def load_stats(*metrics)
-      read_pipelined = %w(processed failed scheduled_size retry_size dead_size) & metrics
-
-      loaded_stats = {}
-      if read_pipelined.any?
-        results = Sidekiq.redis do |conn|
-          conn.pipelined do
-            read_pipelined.each do |key|
-              case key
-              when 'processed' then conn.get('stat:processed')
-              when 'failed' then conn.get('stat:failed')
-              when 'scheduled_size' then conn.zcard('schedule')
-              when 'retry_size' then conn.zcard('retry')
-              when 'dead_size' then conn.zcard('dead')
-              end
-            end
-          end
-        end
-        read_pipelined.zip(results).each {|metric, v| loaded_stats[metric] = v.to_i }
-      end
-
-      (metrics - read_pipelined).each do |metric|
-        loaded_stats[metric] = public_send(metric)
-      end
-      loaded_stats
-    end
+    class Queues
+      def lengths
+        Sidekiq.redis do |conn|
+          queues = conn.smembers('queues'.freeze)
+
+          lengths = conn.pipelined do
+            queues.each do |queue|
+              conn.llen("queue:#{queue}")
+            end
+          end
+
+          i = 0
+          array_of_arrays = queues.inject({}) do |memo, queue|
+            memo[queue] = lengths[i]
+            i += 1
+            memo
+          end.sort_by { |_, size| size }
+
+          Hash[array_of_arrays.reverse]
+        end
+      end
+    end
 
     class History
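With the lazy per-metric loading gone, Sidekiq::Stats becomes a plain snapshot object: initialize runs both pipelines once, every reader method serves the cached @stats hash, and fetch_stats! re-polls Redis on demand. A usage sketch (method names are the ones defined above):

    stats = Sidekiq::Stats.new  # two pipelined round-trips, then everything is cached

    stats.processed     # served from @stats; no further Redis calls
    stats.enqueued      # likewise: the summed LLEN replies from the second pipeline
    stats.workers_size  # likewise: the summed 'busy' counts across processes

    stats.fetch_stats!  # explicitly refresh the snapshot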
@@ -210,33 +210,31 @@ module Sidekiq
 
     get '/stats' do
       sidekiq_stats = Sidekiq::Stats.new
-      sidekiq_stats.fetch_stats!
-      queue = Sidekiq::Queue.new
       redis_stats = redis_info.select { |k, v| REDIS_KEYS.include? k }
 
       content_type :json
       Sidekiq.dump_json(
         sidekiq: {
-          processed: sidekiq_stats.processed,
-          failed: sidekiq_stats.failed,
-          busy: workers.size,
-          processes: processes.size,
-          enqueued: sidekiq_stats.enqueued,
-          scheduled: sidekiq_stats.scheduled_size,
-          retries: sidekiq_stats.retry_size,
-          dead: sidekiq_stats.dead_size,
-          default_latency: queue.latency
+          processed: sidekiq_stats.processed,
+          failed: sidekiq_stats.failed,
+          busy: sidekiq_stats.workers_size,
+          processes: sidekiq_stats.processes_size,
+          enqueued: sidekiq_stats.enqueued,
+          scheduled: sidekiq_stats.scheduled_size,
+          retries: sidekiq_stats.retry_size,
+          dead: sidekiq_stats.dead_size,
+          default_latency: sidekiq_stats.default_queue_latency
         },
         redis: redis_stats
       )
     end
 
     get '/stats/queues' do
-      stats = Sidekiq::Stats.new
+      queue_stats = Sidekiq::Stats::Queues.new
 
       content_type :json
       Sidekiq.dump_json(
-        stats.queues
+        queue_stats.lengths
       )
     end
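For reference, illustrative payloads for the two endpoints; the field names come from the code above, the values are invented for the example:

    # GET /stats (values illustrative)
    {
      "sidekiq" => {
        "processed" => 125_000, "failed" => 42,
        "busy" => 5, "processes" => 2, "enqueued" => 17,
        "scheduled" => 3, "retries" => 1, "dead" => 0,
        "default_latency" => 0.25
      },
      "redis" => { "connected_clients" => "5" }  # whichever INFO keys REDIS_KEYS selects
    }

    # GET /stats/queues (values illustrative; largest queue first)
    { "default" => 17, "low" => 3 }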
@@ -77,8 +77,8 @@ class TestApi < Sidekiq::Test
 
   describe "queues" do
     it "is initially empty" do
-      s = Sidekiq::Stats.new
-      assert_equal 0, s.queues.size
+      s = Sidekiq::Stats::Queues.new
+      assert_equal 0, s.lengths.size
     end
 
     it "returns a hash of queue and size in order" do
@@ -90,9 +90,9 @@ class TestApi < Sidekiq::Test
         conn.sadd 'queues', 'bar'
       end
 
-      s = Sidekiq::Stats.new
-      assert_equal ({ "foo" => 1, "bar" => 3 }), s.queues
-      assert_equal "bar", s.queues.first.first
+      s = Sidekiq::Stats::Queues.new
+      assert_equal ({ "foo" => 1, "bar" => 3 }), s.lengths
+      assert_equal "bar", s.lengths.first.first
     end
   end
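The same behaviour can be exercised outside the test suite; a sketch that seeds Redis the way the test above does (queue names and the empty JSON payloads are illustrative):

    Sidekiq.redis do |conn|
      conn.rpush('queue:foo', '{}')              # one job on "foo"
      3.times { conn.rpush('queue:bar', '{}') }  # three jobs on "bar"
      conn.sadd('queues', 'foo')
      conn.sadd('queues', 'bar')
    end

    Sidekiq::Stats::Queues.new.lengths  # => { "bar" => 3, "foo" => 1 }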