1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Manual testing fixes

This commit is contained in:
Mike Perham 2015-10-06 14:45:10 -07:00
parent 4781be562f
commit 3ca7499e84
5 changed files with 45 additions and 34 deletions

View file

@ -61,7 +61,7 @@ module Sidekiq
while !@done while !@done
get_one get_one
end end
Sidekiq.logger.debug("Fetcher shutting down") Sidekiq.logger.info("Fetcher exiting...")
end end
end end

View file

@ -53,7 +53,7 @@ module Sidekiq
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro. # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
Sidekiq::Fetcher.strategy.bulk_requeue([], @options) Sidekiq::Fetcher.strategy.bulk_requeue([], @options)
stop_heartbeat clear_heartbeat
end end
private unless $TESTING private unless $TESTING
@ -120,10 +120,13 @@ module Sidekiq
heartbeat(key, data, json) heartbeat(key, data, json)
sleep 5 sleep 5
end end
Sidekiq.logger.info("Heartbeat stopping...")
end end
def stop_heartbeat def clear_heartbeat
@done = true # Remove record from Redis since we are shutting down.
# Note we don't stop the heartbeat thread; if the process
# doesn't actually exit, it'll reappear in the Web UI.
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.pipelined do conn.pipelined do
conn.srem('processes', identity) conn.srem('processes', identity)

View file

@ -45,7 +45,10 @@ module Sidekiq
end end
def start def start
@ready.each { |x| x.start; dispatch } @ready.each do |x|
x.start
dispatch
end
end end
def quiet def quiet
@ -63,16 +66,16 @@ module Sidekiq
def stop(deadline) def stop(deadline)
quiet quiet
return shutdown if @in_progress.empty? return if @in_progress.empty?
logger.info { "Pausing to allow workers to finish..." } logger.info { "Pausing to allow workers to finish..." }
remaining = deadline - Time.now remaining = deadline - Time.now
while remaining > 0.5 while remaining > 0.5
return shutdown if @in_progress.empty? return if @in_progress.empty?
sleep 0.5 sleep 0.5
remaining = deadline - Time.now remaining = deadline - Time.now
end end
return shutdown if @in_progress.empty? return if @in_progress.empty?
hard_shutdown hard_shutdown
end end
@ -96,7 +99,9 @@ module Sidekiq
if @done if @done
#shutdown if @in_progress.empty? #shutdown if @in_progress.empty?
else else
@ready << Processor.new(self) p = Processor.new(self)
p.start
@ready << p
end end
end end
dispatch dispatch
@ -127,13 +132,25 @@ module Sidekiq
def hard_shutdown def hard_shutdown
# We've reached the timeout and we still have busy workers. # We've reached the timeout and we still have busy workers.
# They must die but their messages shall live on. # They must die but their jobs shall live on.
logger.warn { "Terminating #{@in_progress.size} busy worker threads" } cleanup = nil
logger.warn { "Work still in progress #{@in_progress.values.inspect}" } @plock.synchronize do
cleanup = @in_progress.dup
end
requeue if cleanup.size > 0
logger.warn { "Terminating #{cleanup.size} busy worker threads" }
logger.warn { "Work still in progress #{cleanup.values.inspect}" }
# Re-enqueue unfinished jobs
# NOTE: You may notice that we may push a job back to redis before
# the worker thread is terminated. This is ok because Sidekiq's
# contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice.
Sidekiq::Fetcher.strategy.bulk_requeue(cleanup.values, @options)
end
@in_progress.each do |processor, _| cleanup.each do |processor, _|
processor.kill processor.kill
end end
end end
@ -147,22 +164,5 @@ module Sidekiq
@fetcher.request_job @fetcher.request_job
end end
def shutdown
requeue
end
def requeue
# Re-enqueue unfinished jobs
# NOTE: You may notice that we may push a job back to redis before
# the worker thread is terminated. This is ok because Sidekiq's
# contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice.
jobs = nil
@plock.synchronize do
jobs = @in_progress.values
end
Sidekiq::Fetcher.strategy.bulk_requeue(jobs, @options) if jobs.size > 0
end
end end
end end

View file

@ -70,7 +70,6 @@ module Sidekiq
process(job) if job process(job) if job
end end
rescue Exception => ex rescue Exception => ex
Sidekiq.logger.warn(ex.message)
@mgr.processor_died(self, ex) @mgr.processor_died(self, ex)
end end
end end

View file

@ -62,12 +62,13 @@ module Sidekiq
def start def start
@thread ||= safe_thread("scheduler") do @thread ||= safe_thread("scheduler") do
@queue.pop(initial_wait) initial_wait
while !@done while !@done
enqueue enqueue
@queue.pop(random_poll_interval) wait
end end
Sidekiq.logger.info("Scheduler exiting...")
end end
end end
@ -84,6 +85,11 @@ module Sidekiq
private private
def wait
@queue.pop(random_poll_interval)
rescue Timeout::Error
end
# Calculates a random interval that is ±50% the desired average. # Calculates a random interval that is ±50% the desired average.
def random_poll_interval def random_poll_interval
poll_interval_average * rand + poll_interval_average.to_f / 2 poll_interval_average * rand + poll_interval_average.to_f / 2
@ -122,6 +128,9 @@ module Sidekiq
total = 0 total = 0
total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average] total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average]
total += (5 * rand) total += (5 * rand)
@queue.pop(total)
rescue Timeout::Error
end end
end end