mirror of https://github.com/mperham/sidekiq.git
Rewrite ProcessSet#each to use pipelining to avoid lots of roundtrips in large Sidekiq installations.
parent 50c4c35bde
commit e01675980d
1 changed file with 12 additions and 3 deletions
@@ -447,13 +447,22 @@ module Sidekiq
       procs = Sidekiq.redis { |conn| conn.smembers('processes') }

       to_prune = []
+      sorted = procs.sort
       Sidekiq.redis do |conn|
-        procs.sort.each do |key|
-          info, busy, at_s = conn.hmget(key, 'info', 'busy', 'beat')
+        # We're making a tradeoff here between consuming more memory instead of
+        # making more roundtrips to Redis, but if you have hundreds or thousands of workers,
+        # you'll be happier this way
+        result = conn.pipelined do
+          sorted.each do |key|
+            conn.hmget(key, 'info', 'busy', 'beat')
+          end
+        end
+
+        result.each_with_index do |(info, busy, at_s), i|
           # the hash named key has an expiry of 60 seconds.
           # if it's not found, that means the process has not reported
           # in to Redis and probably died.
-          (to_prune << key; next) if info.nil?
+          (to_prune << sorted[i]; next) if info.nil?
           hash = Sidekiq.load_json(info)
           yield hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f)
         end
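For readers unfamiliar with the pattern, the sketch below shows the roundtrip savings in isolation: redis-rb's pipelined call buffers commands client-side, flushes them in a single network exchange, and returns the replies as one array in command order. That positional ordering is exactly why the commit hoists sorted out of the block and maps result[i] back to sorted[i]. This is a minimal standalone sketch, not Sidekiq code: the key names, seeded data, and benchmark are illustrative, and it assumes a redis-rb version whose pipelined block yields a pipeline object.

require 'redis'
require 'benchmark'

redis = Redis.new

# Seed some fake process hashes so there is something to read back.
# (Illustrative keys only; Sidekiq's real keys live in the 'processes' set.)
keys = (1..500).map { |i| "proc-#{i}" }
keys.each { |k| redis.hmset(k, 'info', '{}', 'busy', '0', 'beat', Time.now.to_f.to_s) }

# Naive version: one network roundtrip per key.
naive = Benchmark.realtime do
  keys.each { |k| redis.hmget(k, 'info', 'busy', 'beat') }
end

# Pipelined version: commands are buffered and sent together; the replies
# come back as a single array, in the order the commands were queued.
pipelined = Benchmark.realtime do
  redis.pipelined do |pipe|
    keys.each { |k| pipe.hmget(k, 'info', 'busy', 'beat') }
  end
end

puts format('naive: %.4fs  pipelined: %.4fs', naive, pipelined)

keys.each { |k| redis.del(k) }

Because the pipeline's return value is keyed only by position, any per-reply bookkeeping (in the commit, pruning dead process keys) has to index back into the ordered key list, which is what the each_with_index / sorted[i] pairing in the diff above does.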