diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index ba6e3f50..031253f6 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -23,17 +23,12 @@ module Sidekiq verbose options.inspect @count = options[:processor_count] || 25 @queues = options[:queues] - @queue_idx = 0 - @queues_size = @queues.size @redis = redis @done_callback = nil @done = false @busy = [] - @ready = [] - @count.times do - @ready << Processor.new_link(current_actor) - end + @ready = @count.times.map { Processor.new_link(current_actor) } end def stop @@ -83,13 +78,12 @@ module Sidekiq private - def find_work(queue_idx) - current_queue = @queues[queue_idx] - msg = @redis.lpop("queue:#{current_queue}") + def find_work(queue) + msg = @redis.lpop("queue:#{queue}") if msg processor = @ready.pop @busy << processor - processor.process!(MultiJson.decode(msg), current_queue) + processor.process!(MultiJson.decode(msg), queue) end !!msg end @@ -98,26 +92,14 @@ module Sidekiq watchdog("Fatal error in sidekiq, dispatch loop died") do return if stopped? - # Our dispatch loop - # Loop through the queues, looking for a message in each. - queue_idx = 0 - found = false + # Dispatch loop loop do - # return so that we don't dispatch again until processor_done - break verbose('no processors') if @ready.size == 0 - - found ||= find_work(queue_idx) - queue_idx += 1 - - # if we find no messages in any of the queues, we can break - # out of the loop. Otherwise we loop again. - lastq = (queue_idx % @queues.size == 0) - if lastq && !found - verbose('nothing to process'); break - elsif lastq - queue_idx = 0 - found = false + break verbose('no processors') if @ready.empty? + found = false + @ready.size.times do + found ||= find_work(@queues.sample) end + break verbose('nothing to process') unless found end # This is the polling loop that ensures we check Redis every