1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Continue scheduling jobs when backlog jobs exist (#5056)

When there are scheduled jobs that are due, the scheduler thread will
keep going instead of waiting for the next poll.

Co-authored-by: Heinrich Lee Yu <heinrich@gitlab.com>
This commit is contained in:
Heinrich Lee Yu 2021-11-11 08:18:50 +08:00 committed by GitHub
parent acaee69ba4
commit a486ffa555
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 33 additions and 2 deletions

View file

@ -19,10 +19,11 @@ module Sidekiq
LUA
def initialize
@done = false
@lua_zpopbyscore_sha = nil
end
def enqueue_jobs(now = Time.now.to_f.to_s, sorted_sets = SETS)
def enqueue_jobs(sorted_sets = SETS)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
@ -31,7 +32,7 @@ module Sidekiq
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while (job = zpopbyscore(conn, keys: [sorted_set], argv: [now]))
while !@done && (job = zpopbyscore(conn, keys: [sorted_set], argv: [Time.now.to_f.to_s]))
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
@ -39,6 +40,10 @@ module Sidekiq
end
end
def terminate
@done = true
end
private
def zpopbyscore(conn, keys: nil, argv: nil)
@ -74,6 +79,8 @@ module Sidekiq
# Shut down this instance, will pause until the thread is dead.
def terminate
@done = true
@enq.terminate if @enq.respond_to?(:terminate)
if @thread
t = @thread
@thread = nil

View file

@ -79,6 +79,30 @@ describe Sidekiq::Scheduled do
end
end
it 'should not enqueue jobs when terminate has been called' do
created_time = Time.new(2013, 2, 3)
enqueued_time = Time.new(2013, 2, 4)
Time.stub(:now, created_time) do
@retry.schedule (enqueued_time - 60).to_f, @error_1.merge!('created_at' => created_time.to_f)
@scheduled.schedule (enqueued_time - 60).to_f, @future_1.merge!('created_at' => created_time.to_f)
end
Time.stub(:now, enqueued_time) do
@poller.terminate
@poller.enqueue
Sidekiq.redis do |conn|
%w(queue:queue_1 queue:queue_4).each do |queue_name|
assert_equal 0, conn.llen(queue_name)
end
end
assert_equal 1, @retry.size
assert_equal 1, @scheduled.size
end
end
def with_sidekiq_option(name, value)
_original, Sidekiq.options[name] = Sidekiq.options[name], value
begin