
Merge pull request #1588 from mperham/feature/pipelines

Feature/pipelines
Mike Perham 2014-03-22 12:26:19 -07:00
commit dd30e18628


@@ -15,8 +15,13 @@ module Sidekiq
       all = %w(failed processed)
       stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
+      mset_args = []
+      stats.each do |stat|
+        mset_args << "stat:#{stat}"
+        mset_args << 0
+      end
       Sidekiq.redis do |conn|
-        stats.each { |stat| conn.set("stat:#{stat}", 0) }
+        conn.mset(*mset_args)
       end
     end
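For reference, the pattern this hunk adopts is Redis MSET: instead of one SET round trip per stat, the keys and zero values are interleaved into a single argument list and written with one command. A minimal standalone sketch with the redis-rb client, reusing the stat:* key names shown above (everything else here is illustrative, not Sidekiq code):

require 'redis'

conn = Redis.new
mset_args = []
%w(failed processed).each do |stat|
  mset_args << "stat:#{stat}"   # key
  mset_args << 0                # value
end
conn.mset(*mset_args)           # single MSET, one network round trip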
@@ -24,8 +29,16 @@ module Sidekiq
       Sidekiq.redis do |conn|
         queues = conn.smembers('queues')
+        lengths = conn.pipelined do
+          queues.each do |queue|
+            conn.llen("queue:#{queue}")
+          end
+        end
+        i = 0
         array_of_arrays = queues.inject({}) do |memo, queue|
-          memo[queue] = conn.llen("queue:#{queue}")
+          memo[queue] = lengths[i]
+          i += 1
           memo
         end.sort_by { |_, size| size }
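The queue-length hunk leans on redis-rb's pipelined block: commands issued inside the block are buffered and flushed in one round trip, and the block's return value is the array of replies in submission order, which is why lengths[i] can be matched back to queues by index. A rough sketch of that ordering guarantee (queue names are made up; note that newer redis-rb releases yield a separate pipeline object to call commands on instead of the connection):

require 'redis'

conn = Redis.new
queues = %w(default critical low)
lengths = conn.pipelined do
  queues.each { |q| conn.llen("queue:#{q}") }   # buffered, nothing sent yet
end
sizes = Hash[queues.zip(lengths)]               # replies align with inputs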
@@ -68,15 +81,19 @@ module Sidekiq
       def date_stat_hash(stat)
         i = 0
         stat_hash = {}
+        keys = []
+        dates = []
+        while i < @days_previous
+          date = @start_date - i
+          keys << "stat:#{stat}:#{date}"
+          dates << date
+          i += 1
+        end
         Sidekiq.redis do |conn|
-          while i < @days_previous
-            date = @start_date - i
-            value = conn.get("stat:#{stat}:#{date}")
-            stat_hash[date.to_s] = value ? value.to_i : 0
-            i += 1
+          conn.mget(keys).each_with_index do |value, i|
+            stat_hash[dates[i].to_s] = value ? value.to_i : 0
           end
         end
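The history hunk does the same for reads: the key names are collected up front and fetched with a single MGET, whose replies line up index-for-index with the keys, returning nil for days that have no counter yet (hence the value ? value.to_i : 0 guard). A small illustration with hard-coded dates (assumed values, not from the PR):

require 'redis'

conn = Redis.new
dates = %w(2014-03-20 2014-03-21 2014-03-22)
keys  = dates.map { |d| "stat:processed:#{d}" }
stat_hash = {}
conn.mget(keys).each_with_index do |value, i|
  stat_hash[dates[i]] = value ? value.to_i : 0  # nil => no data recorded that day
end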
@@ -447,13 +464,22 @@ module Sidekiq
       procs = Sidekiq.redis { |conn| conn.smembers('processes') }
       to_prune = []
+      sorted = procs.sort
       Sidekiq.redis do |conn|
-        procs.sort.each do |key|
-          info, busy, at_s = conn.hmget(key, 'info', 'busy', 'beat')
+        # We're making a tradeoff here between consuming more memory instead of
+        # making more roundtrips to Redis, but if you have hundreds or thousands of workers,
+        # you'll be happier this way
+        result = conn.pipelined do
+          sorted.each do |key|
+            conn.hmget(key, 'info', 'busy', 'beat')
+          end
+        end
+        result.each_with_index do |(info, busy, at_s), i|
          # the hash named key has an expiry of 60 seconds.
          # if it's not found, that means the process has not reported
          # in to Redis and probably died.
-          (to_prune << key; next) if info.nil?
+          (to_prune << sorted[i]; next) if info.nil?
           hash = Sidekiq.load_json(info)
           yield hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f)
         end
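This hunk applies the same pipeline idea to HMGET. Each HMGET reply is itself an array, so the result block can destructure it as |(info, busy, at_s), i| and use sorted[i] to recover which process key the reply belongs to, which keeps the to_prune bookkeeping working. A trimmed-down sketch of that destructuring, following the key and field names in the diff:

require 'redis'

conn = Redis.new
sorted = conn.smembers('processes').sort
replies = conn.pipelined do
  sorted.each { |key| conn.hmget(key, 'info', 'busy', 'beat') }
end
replies.each_with_index do |(info, busy, at_s), i|
  next if info.nil?               # hash expired: process likely died
  puts "#{sorted[i]}: busy=#{busy.to_i} beat=#{at_s.to_f}"
end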
@@ -498,7 +524,7 @@ module Sidekiq
       Sidekiq.redis do |conn|
         procs = conn.smembers('processes')
         procs.sort.each do |key|
-          valid, workers = conn.multi do
+          valid, workers = conn.pipelined do
             conn.exists(key)
             conn.hgetall("#{key}:workers")
           end
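The last two hunks replace multi with pipelined. In redis-rb, multi wraps the buffered commands in MULTI/EXEC so they execute atomically on the server, while pipelined only batches them onto one round trip; both return the replies as an array. These blocks only read, so atomicity buys nothing and the lighter pipeline suffices. A side-by-side sketch (the key name is a placeholder):

require 'redis'

conn = Redis.new
key = 'process-identity'                 # placeholder key

# Transactional: queued commands run atomically inside MULTI ... EXEC
valid, workers = conn.multi do
  conn.exists(key)
  conn.hgetall("#{key}:workers")
end

# Pipelined: same commands and reply shape, no transaction overhead
valid, workers = conn.pipelined do
  conn.exists(key)
  conn.hgetall("#{key}:workers")
end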
@@ -521,7 +547,7 @@ module Sidekiq
         procs = conn.smembers('processes')
         return 0 if procs.empty?
-        conn.multi do
+        conn.pipelined do
           procs.each do |key|
             conn.hget(key, 'busy')
           end
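For completeness, the replies from this final pipeline are the per-process 'busy' field values (strings, or nil for expired hashes), which the caller can fold into a single busy count. A hedged sketch of that aggregation step, written as an illustration rather than a quote of the surrounding file:

require 'redis'

conn = Redis.new
procs = conn.smembers('processes')
busy = conn.pipelined do
  procs.each { |key| conn.hget(key, 'busy') }
end.map(&:to_i).inject(0, :+)            # nil.to_i == 0 covers expired hashes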