Get scheduled jobs in batches before pushing into specific queues

fatkodima 2019-09-17 21:31:25 +03:00 committed by Mike Perham
parent 537a8e7ce8
commit 9b75467b33
2 changed files with 21 additions and 13 deletions


@@ -5,6 +5,13 @@
HEAD
---------
- Get scheduled jobs in batches before pushing into specific queues.
This will decrease the enqueueing time of scheduled jobs by about a third. [fatkodima, #4273]
```
ScheduledSet with 10,000 jobs
Before: 56.6 seconds
After: 39.2 seconds
```
- Compress error backtraces before pushing into Redis. If you are
storing error backtraces, this will halve the size of your RetrySet
in Redis. [fatkodima, #4272]
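The compression itself happens inside Sidekiq's retry handling, but the underlying technique is simple: serialize the backtrace, deflate it, and store the encoded result instead of the raw array of strings. A minimal sketch of that idea, using hypothetical `compress_backtrace`/`uncompress_backtrace` helper names rather than Sidekiq's actual internals:
```ruby
require "zlib"
require "base64"
require "json"

# Hypothetical helpers: pack a backtrace (an array of strings) into a compact
# string suitable for storage in a Redis hash field, and unpack it again.
def compress_backtrace(backtrace)
  Base64.encode64(Zlib::Deflate.deflate(JSON.dump(backtrace)))
end

def uncompress_backtrace(packed)
  JSON.parse(Zlib::Inflate.inflate(Base64.decode64(packed)))
end

trace = caller # any ordinary Ruby backtrace
packed = compress_backtrace(trace)
puts "raw: #{JSON.dump(trace).bytesize} bytes, packed: #{packed.bytesize} bytes"
raise "roundtrip failed" unless uncompress_backtrace(packed) == trace
```
Deflate does well on backtraces because the lines are highly repetitive (shared load paths and gem names), which is why the RetrySet roughly halves in size.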
@@ -15,7 +22,7 @@ After: 129 MB
```
- Support display of ActiveJob 6.0 payloads in the Web UI [#4263]
- Add `SortedSet#scan` for pattern-based scanning. For large sets this API will be **MUCH** faster
-than standard iteration using each.
+than standard iteration using each. [fatkodima, #4262]
```ruby
Sidekiq::DeadSet.new.scan("UnreliableApi") do |job|
  job.retry
end
```
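For contrast, the pre-`scan` approach has to pull every entry to the client and match it in Ruby. A rough sketch of that slow path, assuming the entry's raw JSON payload is exposed via `value` on the yielded job:
```ruby
# Slow path: iterate the entire DeadSet client-side. Every entry is fetched
# from Redis and deserialized before we can even check whether it matches.
Sidekiq::DeadSet.new.each do |job|
  job.retry if job.value.include?("UnreliableApi")
end
```
With `scan`, the glob match is performed by Redis while it walks the sorted set, so non-matching payloads never have to be transferred or parsed, which is where the large speedup on big sets comes from.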

@@ -14,18 +14,19 @@ module Sidekiq
        # Just check Redis for the set of jobs with a timestamp before now.
        Sidekiq.redis do |conn|
          sorted_sets.each do |sorted_set|
-            # Get the next item in the queue if it's score (time to execute) is <= now.
-            # We need to go through the list one at a time to reduce the risk of something
-            # going wrong between the time jobs are popped from the scheduled queue and when
-            # they are pushed onto a work queue and losing the jobs.
-            while (job = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 1]).first)
-              # Pop item off the queue and add it to the work queue. If the job can't be popped from
-              # the queue, it's because another process already popped it so we can move on to the
-              # next one.
-              if conn.zrem(sorted_set, job)
-                Sidekiq::Client.push(Sidekiq.load_json(job))
-                Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
+            # Get next items in the queue with scores (time to execute) <= now.
+            until (jobs = conn.zrangebyscore(sorted_set, "-inf", now, limit: [0, 100])).empty?
+              # We need to go through the list one at a time to reduce the risk of something
+              # going wrong between the time jobs are popped from the scheduled queue and when
+              # they are pushed onto a work queue and losing the jobs.
+              jobs.each do |job|
+                # Pop item off the queue and add it to the work queue. If the job can't be popped from
+                # the queue, it's because another process already popped it so we can move on to the
+                # next one.
+                if conn.zrem(sorted_set, job)
+                  Sidekiq::Client.push(Sidekiq.load_json(job))
+                  Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
+                end
              end
            end
          end
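The batching win comes from issuing one ZRANGEBYSCORE round trip per 100 due jobs instead of one per job; the per-job ZREM and push are unchanged. A rough way to reproduce the ScheduledSet measurement quoted above, assuming Sidekiq 6.0, a local Redis, and a throwaway worker class (`NoopWorker` is illustrative, not part of Sidekiq):
```ruby
require "sidekiq"
require "sidekiq/scheduled"
require "benchmark"

# Throwaway job class so there is something to schedule; it never runs here.
class NoopWorker
  include Sidekiq::Worker
  def perform(*); end
end

# Seed 10,000 jobs into the "schedule" sorted set with timestamps in the past,
# so the poller considers all of them due.
10_000.times { |i| NoopWorker.perform_at(Time.now - 60, i) }

# Enq#enqueue_jobs is the method changed in this commit; it drains the
# schedule/retry sets into the real queues.
elapsed = Benchmark.realtime { Sidekiq::Scheduled::Enq.new.enqueue_jobs }
puts "drained 10,000 scheduled jobs in #{elapsed.round(1)}s"
```
Absolute timings will differ by machine and Redis setup; the before/after ratio is the interesting part.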