From 3ca7499e844d9e9b5398b138cc19fbf453f1ad62 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 6 Oct 2015 14:45:10 -0700 Subject: [PATCH] Manual testing fixes --- lib/sidekiq/fetch.rb | 2 +- lib/sidekiq/launcher.rb | 9 ++++--- lib/sidekiq/manager.rb | 54 ++++++++++++++++++++-------------------- lib/sidekiq/processor.rb | 1 - lib/sidekiq/scheduled.rb | 13 ++++++++-- 5 files changed, 45 insertions(+), 34 deletions(-) diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 1659cc39..26b11324 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -61,7 +61,7 @@ module Sidekiq while !@done get_one end - Sidekiq.logger.debug("Fetcher shutting down") + Sidekiq.logger.info("Fetcher exiting...") end end diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index 42c7c7dc..deceda87 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -53,7 +53,7 @@ module Sidekiq # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. Sidekiq::Fetcher.strategy.bulk_requeue([], @options) - stop_heartbeat + clear_heartbeat end private unless $TESTING @@ -120,10 +120,13 @@ module Sidekiq heartbeat(key, data, json) sleep 5 end + Sidekiq.logger.info("Heartbeat stopping...") end - def stop_heartbeat - @done = true + def clear_heartbeat + # Remove record from Redis since we are shutting down. + # Note we don't stop the heartbeat thread; if the process + # doesn't actually exit, it'll reappear in the Web UI. Sidekiq.redis do |conn| conn.pipelined do conn.srem('processes', identity) diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 9ef844bf..79bb4e3e 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -45,7 +45,10 @@ module Sidekiq end def start - @ready.each { |x| x.start; dispatch } + @ready.each do |x| + x.start + dispatch + end end def quiet @@ -63,16 +66,16 @@ module Sidekiq def stop(deadline) quiet - return shutdown if @in_progress.empty? + return if @in_progress.empty? logger.info { "Pausing to allow workers to finish..." } remaining = deadline - Time.now while remaining > 0.5 - return shutdown if @in_progress.empty? + return if @in_progress.empty? sleep 0.5 remaining = deadline - Time.now end - return shutdown if @in_progress.empty? + return if @in_progress.empty? hard_shutdown end @@ -96,7 +99,9 @@ module Sidekiq if @done #shutdown if @in_progress.empty? else - @ready << Processor.new(self) + p = Processor.new(self) + p.start + @ready << p end end dispatch @@ -127,13 +132,25 @@ module Sidekiq def hard_shutdown # We've reached the timeout and we still have busy workers. - # They must die but their messages shall live on. - logger.warn { "Terminating #{@in_progress.size} busy worker threads" } - logger.warn { "Work still in progress #{@in_progress.values.inspect}" } + # They must die but their jobs shall live on. + cleanup = nil + @plock.synchronize do + cleanup = @in_progress.dup + end - requeue + if cleanup.size > 0 + logger.warn { "Terminating #{cleanup.size} busy worker threads" } + logger.warn { "Work still in progress #{cleanup.values.inspect}" } + # Re-enqueue unfinished jobs + # NOTE: You may notice that we may push a job back to redis before + # the worker thread is terminated. This is ok because Sidekiq's + # contract says that jobs are run AT LEAST once. Process termination + # is delayed until we're certain the jobs are back in Redis because + # it is worse to lose a job than to run it twice. + Sidekiq::Fetcher.strategy.bulk_requeue(cleanup.values, @options) + end - @in_progress.each do |processor, _| + cleanup.each do |processor, _| processor.kill end end @@ -147,22 +164,5 @@ module Sidekiq @fetcher.request_job end - def shutdown - requeue - end - - def requeue - # Re-enqueue unfinished jobs - # NOTE: You may notice that we may push a job back to redis before - # the worker thread is terminated. This is ok because Sidekiq's - # contract says that jobs are run AT LEAST once. Process termination - # is delayed until we're certain the jobs are back in Redis because - # it is worse to lose a job than to run it twice. - jobs = nil - @plock.synchronize do - jobs = @in_progress.values - end - Sidekiq::Fetcher.strategy.bulk_requeue(jobs, @options) if jobs.size > 0 - end end end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 34f088ce..bf0a5fe0 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -70,7 +70,6 @@ module Sidekiq process(job) if job end rescue Exception => ex - Sidekiq.logger.warn(ex.message) @mgr.processor_died(self, ex) end end diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 1e7f3c57..5a578925 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -62,12 +62,13 @@ module Sidekiq def start @thread ||= safe_thread("scheduler") do - @queue.pop(initial_wait) + initial_wait while !@done enqueue - @queue.pop(random_poll_interval) + wait end + Sidekiq.logger.info("Scheduler exiting...") end end @@ -84,6 +85,11 @@ module Sidekiq private + def wait + @queue.pop(random_poll_interval) + rescue Timeout::Error + end + # Calculates a random interval that is ±50% the desired average. def random_poll_interval poll_interval_average * rand + poll_interval_average.to_f / 2 @@ -122,6 +128,9 @@ module Sidekiq total = 0 total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average] total += (5 * rand) + + @queue.pop(total) + rescue Timeout::Error end end