diff --git a/Changes.md b/Changes.md index f16a0aa6..36e21656 100644 --- a/Changes.md +++ b/Changes.md @@ -1,6 +1,7 @@ -master (unreleased) +HEAD ----------- +- Messages for terminated workers are now automatically requeued (mperham) - Add support for Exceptional error reporting (bensie) 0.7.0 diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index cbebfe2e..19cf2859 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -49,7 +49,13 @@ module Sidekiq logger.info("Pausing 5 seconds to allow workers to finish...") after(5) do @busy.each(&:terminate) - #@busy.each(&:requeue) + redis.with_connection do |conn| + conn.multi do + @busy.each do |busy| + conn.lpush("queue:#{busy.queue}", busy.msg) + end + end + end signal(:shutdown) end end @@ -66,6 +72,7 @@ module Sidekiq watchdog('sidekiq processor_done crashed!') do @done_callback.call(processor) if @done_callback @busy.delete(processor) + processor.msg = processor.queue = nil if stopped? processor.terminate if processor.alive? else @@ -91,6 +98,8 @@ module Sidekiq if msg processor = @ready.pop @busy << processor + processor.msg = msg + processor.queue = queue processor.process!(MultiJson.decode(msg), queue) end !!msg diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 4657ed06..4d0ebebe 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -23,6 +23,8 @@ module Sidekiq end end + attr_accessor :msg, :queue + def initialize(boss) @boss = boss redis.sadd('workers', self)