diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 0da92bcf..12020767 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -10,27 +10,34 @@ module Sidekiq include Celluloid include Sidekiq::Util + TIMEOUT = 1 + def initialize(mgr, queues) @cmd = queues.map { |q| "queue:#{q}" } @mgr = mgr # One second timeout for blpop. - # We can't block forever or else we can't shut down - # properly. - @cmd << 1 + @cmd << TIMEOUT end + # Fetching is straightforward: the Manager makes a fetch + # request for each idle processor when Sidekiq starts and + # then issues a new fetch request every time a Processor + # finishes a message. + # + # Because we have to shut down cleanly, we can't block + # forever and we can't loop forever. Instead we reschedule + # a new fetch if the current fetch turned up nothing. def fetch watchdog('Fetcher#fetch died') do msg = nil Sidekiq.redis do |conn| - a = Time.now (queue, msg) = conn.blpop *@cmd - p [Time.now - a, queue, msg] + puts 'All quiet' unless msg @mgr.assign! msg, queue.gsub(/\Aqueue:/, '') if msg end - after(0) { fetch } unless msg + after(0) { fetch } if !msg end end diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 549712be..76ff29f3 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -29,44 +29,46 @@ module Sidekiq @done = false @busy = [] + @fetcher = Fetcher.new(current_actor, options[:queues]) @ready = @count.times.map { Processor.new_link(current_actor) } - @fetcher = Sidekiq::Fetcher.new(current_actor, options[:queues]) end def stop(options={}) - shutdown = options[:shutdown] - timeout = options[:timeout] + watchdog('Manager#stop died') do + shutdown = options[:shutdown] + timeout = options[:timeout] - @done = true + @done = true - @fetcher.terminate if @fetcher.alive? - @ready.each { |x| x.terminate if x.alive? } - @ready.clear + @fetcher.terminate if @fetcher.alive? + @ready.each { |x| x.terminate if x.alive? } + @ready.clear - redis do |conn| - workers = conn.smembers('workers') - workers.each do |name| - conn.srem('workers', name) if name =~ /:#{process_id}-/ - end - end - - if shutdown - if @busy.empty? - # after(0) needed to avoid deadlock in Celluoid after USR1 + TERM - return after(0) { signal(:shutdown) } - else - logger.info { "Pausing #{timeout} seconds to allow workers to finish..." } + redis do |conn| + workers = conn.smembers('workers') + workers.each do |name| + conn.srem('workers', name) if name =~ /:#{process_id}-/ + end end - after(timeout) do - @busy.each { |x| x.terminate if x.alive? } - signal(:shutdown) + if shutdown + if @busy.empty? + # after(0) needed to avoid deadlock in Celluoid after USR1 + TERM + return after(0) { signal(:shutdown) } + else + logger.info { "Pausing #{timeout} seconds to allow workers to finish..." } + end + + after(timeout) do + @busy.each { |x| x.terminate if x.alive? } + signal(:shutdown) + end end end end def start - dispatch + @ready.each { dispatch } end def when_done(&blk) @@ -105,7 +107,6 @@ module Sidekiq processor = @ready.pop @busy << processor processor.process!(MultiJson.decode(msg), queue) - dispatch end end @@ -116,7 +117,7 @@ module Sidekiq # This is a safety check to ensure we haven't leaked # processors somehow. raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty? - return if @ready.empty? + raise "No ready processor!?" if @ready.empty? @fetcher.fetch! end