From cfae522adf8a62808bb6bda0ab47155408152f51 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 26 Mar 2013 22:56:49 -0700 Subject: [PATCH] Raise within any workers which haven't finished within the hard timeout, fixes #377 --- lib/sidekiq/cli.rb | 5 +++++ lib/sidekiq/manager.rb | 3 ++- lib/sidekiq/middleware/server/retry_jobs.rb | 2 ++ lib/sidekiq/processor.rb | 9 +++++++++ 4 files changed, 18 insertions(+), 1 deletion(-) diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index d4849b9f..6e2ff78b 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -9,6 +9,11 @@ require 'sidekiq' require 'sidekiq/util' module Sidekiq + # Used to raise in workers that have not finished within the + # hard timeout limit. This is needed to rollback db transactions, + # otherwise Ruby's Thread#kill will commit. See #377. + class Shutdown < RuntimeError; end + class CLI include Util include Singleton diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 378d9348..501054f9 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -138,7 +138,8 @@ module Sidekiq logger.debug { "Terminating worker threads" } @busy.each do |processor| - processor.terminate if processor.alive? + t = processor.bare_object.actual_work_thread + t.raise Shutdown if processor.alive? end after(0) { signal(:shutdown) } diff --git a/lib/sidekiq/middleware/server/retry_jobs.rb b/lib/sidekiq/middleware/server/retry_jobs.rb index e413caf1..af17c841 100644 --- a/lib/sidekiq/middleware/server/retry_jobs.rb +++ b/lib/sidekiq/middleware/server/retry_jobs.rb @@ -48,6 +48,8 @@ module Sidekiq def call(worker, msg, queue) yield + rescue Sidekiq::Shutdown + raise rescue Exception => e raise e unless msg['retry'] max_retry_attempts = retry_attempts_from(msg['retry'], DEFAULT_MAX_RETRY_ATTEMPTS) diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index bd3f75a7..3febd082 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -26,6 +26,11 @@ module Sidekiq end end + # store the actual working thread so we + # can later kill if it necessary during + # hard shutdown. + attr_accessor :actual_work_thread + def initialize(boss) @boss = boss end @@ -34,6 +39,7 @@ module Sidekiq msgstr = work.message queue = work.queue_name defer do + @actual_work_thread = Thread.current begin msg = Sidekiq.load_json(msgstr) klass = msg['class'].constantize @@ -45,6 +51,9 @@ module Sidekiq worker.perform(*cloned(msg['args'])) end end + rescue Sidekiq::Shutdown + # Had to force kill this job because it didn't finish + # within the timeout. rescue Exception => ex handle_exception(ex, msg || { :message => msgstr }) raise