mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Merge branch 'master' of github.com:mperham/sidekiq
This commit is contained in:
commit
e094800c9e
1 changed files with 10 additions and 28 deletions
|
@ -23,17 +23,12 @@ module Sidekiq
|
||||||
verbose options.inspect
|
verbose options.inspect
|
||||||
@count = options[:processor_count] || 25
|
@count = options[:processor_count] || 25
|
||||||
@queues = options[:queues]
|
@queues = options[:queues]
|
||||||
@queue_idx = 0
|
|
||||||
@queues_size = @queues.size
|
|
||||||
@redis = redis
|
@redis = redis
|
||||||
@done_callback = nil
|
@done_callback = nil
|
||||||
|
|
||||||
@done = false
|
@done = false
|
||||||
@busy = []
|
@busy = []
|
||||||
@ready = []
|
@ready = @count.times.map { Processor.new_link(current_actor) }
|
||||||
@count.times do
|
|
||||||
@ready << Processor.new_link(current_actor)
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def stop
|
def stop
|
||||||
|
@ -83,13 +78,12 @@ module Sidekiq
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def find_work(queue_idx)
|
def find_work(queue)
|
||||||
current_queue = @queues[queue_idx]
|
msg = @redis.lpop("queue:#{queue}")
|
||||||
msg = @redis.lpop("queue:#{current_queue}")
|
|
||||||
if msg
|
if msg
|
||||||
processor = @ready.pop
|
processor = @ready.pop
|
||||||
@busy << processor
|
@busy << processor
|
||||||
processor.process!(MultiJson.decode(msg), current_queue)
|
processor.process!(MultiJson.decode(msg), queue)
|
||||||
end
|
end
|
||||||
!!msg
|
!!msg
|
||||||
end
|
end
|
||||||
|
@ -98,26 +92,14 @@ module Sidekiq
|
||||||
watchdog("Fatal error in sidekiq, dispatch loop died") do
|
watchdog("Fatal error in sidekiq, dispatch loop died") do
|
||||||
return if stopped?
|
return if stopped?
|
||||||
|
|
||||||
# Our dispatch loop
|
# Dispatch loop
|
||||||
# Loop through the queues, looking for a message in each.
|
|
||||||
queue_idx = 0
|
|
||||||
found = false
|
|
||||||
loop do
|
loop do
|
||||||
# return so that we don't dispatch again until processor_done
|
break verbose('no processors') if @ready.empty?
|
||||||
break verbose('no processors') if @ready.size == 0
|
found = false
|
||||||
|
@ready.size.times do
|
||||||
found ||= find_work(queue_idx)
|
found ||= find_work(@queues.sample)
|
||||||
queue_idx += 1
|
|
||||||
|
|
||||||
# if we find no messages in any of the queues, we can break
|
|
||||||
# out of the loop. Otherwise we loop again.
|
|
||||||
lastq = (queue_idx % @queues.size == 0)
|
|
||||||
if lastq && !found
|
|
||||||
verbose('nothing to process'); break
|
|
||||||
elsif lastq
|
|
||||||
queue_idx = 0
|
|
||||||
found = false
|
|
||||||
end
|
end
|
||||||
|
break verbose('nothing to process') unless found
|
||||||
end
|
end
|
||||||
|
|
||||||
# This is the polling loop that ensures we check Redis every
|
# This is the polling loop that ensures we check Redis every
|
||||||
|
|
Loading…
Add table
Reference in a new issue