Mirror of https://github.com/mperham/sidekiq.git, synced 2022-11-09 13:52:34 -05:00
Use bulk_requeue instead of terminate.
commit 381cca1df8
parent 693df6afbd
3 changed files with 20 additions and 20 deletions
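In effect, the commit funnels every shutdown path in the Manager through a single new method instead of scattered after(0) { signal(:shutdown) } calls. A simplified restatement of that method, with comments added here for orientation (the comments are editorial, not part of the diff):

    # Sketch of Manager#terminate as introduced below; comments are editorial.
    def terminate
      # 1. Hand every in-flight message back to the fetcher, which forwards to
      #    the configured fetch strategy so the jobs land back in Redis.
      @fetcher.bulk_requeue(@in_progress.values)
      # 2. Nothing is in flight any more.
      @in_progress.clear
      # 3. Celluloid: schedule the :shutdown signal that completes shutdown.
      after(0) { signal(:shutdown) }
    end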
@@ -47,8 +47,8 @@ module Sidekiq
       end
     end

-    def clean_up
-      @strategy.clean_up
+    def bulk_requeue(in_progress)
+      @strategy.bulk_requeue(in_progress.values)
     end

     def handle_fetch_exception(ex)
@@ -94,10 +94,7 @@ module Sidekiq
       UnitOfWork.new(*work) if work
     end

-    def clean_up
-    end
-
-    def self.bulk_requeue(inprogress)
+    def bulk_requeue(inprogress)
       Sidekiq.logger.debug { "Re-queueing terminated jobs" }
       jobs_to_requeue = {}
       inprogress.each do |unit_of_work|
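The hunk above is cut off by the diff context in the middle of the requeue loop. For orientation only, here is a hedged sketch of how a bulk_requeue of this shape is typically finished: group the raw messages by originating queue, then push each one back onto that queue with RPUSH. The accessor and key names (queue_name, message, the "queue:..." key) are assumptions drawn from BasicFetch's UnitOfWork, not lines from this diff.

    # Hedged reconstruction, not part of this commit.
    def bulk_requeue(inprogress)
      Sidekiq.logger.debug { "Re-queueing terminated jobs" }
      jobs_to_requeue = {}
      inprogress.each do |unit_of_work|
        # Assumed accessors: the queue the work was popped from and its raw JSON payload.
        (jobs_to_requeue[unit_of_work.queue_name] ||= []) << unit_of_work.message
      end

      Sidekiq.redis do |conn|
        jobs_to_requeue.each do |queue, messages|
          # RPUSH returns each message to the tail of its original queue.
          messages.each { |msg| conn.rpush("queue:#{queue}", msg) }
        end
      end
    end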
@@ -57,8 +57,7 @@ module Sidekiq

     def clean_up_for_graceful_shutdown
       if @busy.empty?
-        @fetcher.clean_up
-        after(0) { signal(:shutdown) }
+        terminate
         return true
       end

@@ -82,7 +81,7 @@ module Sidekiq
       @busy.delete(processor)
       if stopped?
         processor.terminate if processor.alive?
-        signal(:shutdown) if @busy.empty?
+        terminate if @busy.empty?
       else
         @ready << processor if processor.alive?
       end
@@ -102,7 +101,7 @@ module Sidekiq
         @ready << p
         dispatch
       else
-        signal(:shutdown) if @busy.empty?
+        terminate if @busy.empty?
       end
     end
   end
@@ -164,14 +163,6 @@ module Sidekiq
         # They must die but their messages shall live on.
         logger.info("Still waiting for #{@busy.size} busy workers")

-        # Re-enqueue terminated jobs
-        # NOTE: You may notice that we may push a job back to redis before
-        # the worker thread is terminated. This is ok because Sidekiq's
-        # contract says that jobs are run AT LEAST once. Process termination
-        # is delayed until we're certain the jobs are back in Redis because
-        # it is worse to lose a job than to run it twice.
-        Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values)
-
         logger.warn { "Terminating #{@busy.size} busy worker threads" }
         @busy.each do |processor|
           if processor.alive? && t = @threads.delete(processor.object_id)
@@ -179,7 +170,7 @@ module Sidekiq
           end
         end

-        after(0) { signal(:shutdown) }
+        terminate
       end
     end
   end
@@ -197,5 +188,17 @@ module Sidekiq
     def stopped?
       @done
     end
+
+    def terminate
+      # Re-enqueue terminated jobs
+      # NOTE: You may notice that we may push a job back to redis before
+      # the worker thread is terminated. This is ok because Sidekiq's
+      # contract says that jobs are run AT LEAST once. Process termination
+      # is delayed until we're certain the jobs are back in Redis because
+      # it is worse to lose a job than to run it twice.
+      @fetcher.bulk_requeue(@in_progress.values)
+      @in_progress.clear
+      after(0) { signal(:shutdown) }
+    end
   end
 end
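The comment block that moves into terminate states the operative contract: a message can be pushed back to Redis while the worker thread that was running it is still being killed, so the same job may execute a second time after a restart. Job code is expected to tolerate that. A minimal, purely hypothetical worker illustrating the usual defence (the Order model and its methods are invented for this example):

    # Hypothetical worker, not from the Sidekiq codebase: written so that a
    # re-run after bulk_requeue is harmless.
    class ChargeWorker
      include Sidekiq::Worker

      def perform(order_id)
        order = Order.find(order_id)
        # Idempotency guard: if the first run already charged the order before
        # the process was terminated, the requeued run is a no-op.
        return if order.charged?
        order.charge!
      end
    end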
@@ -36,7 +36,7 @@ class TestFetcher < Sidekiq::Test
     assert_equal 0, q1.size
     assert_equal 0, q2.size
     uow = Sidekiq::BasicFetch::UnitOfWork
-    Sidekiq::BasicFetch.bulk_requeue([uow.new('fuzzy:queue:foo', 'bob'), uow.new('fuzzy:queue:foo', 'bar'), uow.new('fuzzy:queue:bar', 'widget')])
+    Sidekiq::BasicFetch.new({:queues => []}).bulk_requeue([uow.new('fuzzy:queue:foo', 'bob'), uow.new('fuzzy:queue:foo', 'bar'), uow.new('fuzzy:queue:bar', 'widget')])
     assert_equal 2, q1.size
     assert_equal 1, q2.size
   end
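The test tracks the API change in the fetch hunks above: bulk_requeue is no longer a class method on the strategy, so the caller first builds a BasicFetch instance (here with an empty :queues option) and calls it on that. Standalone, the same call would look roughly like this (the payload string is made up; the test uses simple placeholder strings):

    uow   = Sidekiq::BasicFetch::UnitOfWork
    fetch = Sidekiq::BasicFetch.new(:queues => [])
    # Pushes the raw message back onto the Redis list backing queue "foo".
    fetch.bulk_requeue([uow.new('fuzzy:queue:foo', '{"class":"HardWorker","args":[]}')])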