Speedup iterating over `WorkSet` (#5559)

This commit is contained in:
fatkodima 2022-10-04 00:51:35 +03:00 committed by GitHub
parent d424e45328
commit 7037533e8f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 19 additions and 12 deletions

View File

@ -1103,24 +1103,31 @@ module Sidekiq
def each(&block)
results = []
procs = nil
all_works = nil
Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a
procs.sort.each do |key|
valid, workers = conn.pipelined { |pipeline|
pipeline.exists?(key)
procs = conn.sscan_each("processes").to_a.sort
all_works = conn.pipelined do |pipeline|
procs.each do |key|
pipeline.hgetall("#{key}:work")
}
next unless valid
workers.each_pair do |tid, json|
hsh = Sidekiq.load_json(json)
p = hsh["payload"]
# avoid breaking API, this is a side effect of the JSON optimization in #4316
hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
results << [key, tid, hsh]
end
end
end
procs.zip(all_works).each do |key, workers|
workers.each_pair do |tid, json|
next if json.empty?
hsh = Sidekiq.load_json(json)
p = hsh["payload"]
# avoid breaking API, this is a side effect of the JSON optimization in #4316
hsh["payload"] = Sidekiq.load_json(p) if p.is_a?(String)
results << [key, tid, hsh]
end
end
results.sort_by { |(_, _, hsh)| hsh["run_at"] }.each(&block)
end