mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Simpler fetch logic/design
This commit is contained in:
parent
5fdc297e10
commit
4d14c2f7fa
2 changed files with 40 additions and 32 deletions
|
@ -10,27 +10,34 @@ module Sidekiq
|
|||
include Celluloid
|
||||
include Sidekiq::Util
|
||||
|
||||
TIMEOUT = 1
|
||||
|
||||
def initialize(mgr, queues)
|
||||
@cmd = queues.map { |q| "queue:#{q}" }
|
||||
@mgr = mgr
|
||||
|
||||
# One second timeout for blpop.
|
||||
# We can't block forever or else we can't shut down
|
||||
# properly.
|
||||
@cmd << 1
|
||||
@cmd << TIMEOUT
|
||||
end
|
||||
|
||||
# Fetching is straightforward: the Manager makes a fetch
|
||||
# request for each idle processor when Sidekiq starts and
|
||||
# then issues a new fetch request every time a Processor
|
||||
# finishes a message.
|
||||
#
|
||||
# Because we have to shut down cleanly, we can't block
|
||||
# forever and we can't loop forever. Instead we reschedule
|
||||
# a new fetch if the current fetch turned up nothing.
|
||||
def fetch
|
||||
watchdog('Fetcher#fetch died') do
|
||||
|
||||
msg = nil
|
||||
Sidekiq.redis do |conn|
|
||||
a = Time.now
|
||||
(queue, msg) = conn.blpop *@cmd
|
||||
p [Time.now - a, queue, msg]
|
||||
puts 'All quiet' unless msg
|
||||
@mgr.assign! msg, queue.gsub(/\Aqueue:/, '') if msg
|
||||
end
|
||||
after(0) { fetch } unless msg
|
||||
after(0) { fetch } if !msg
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -29,44 +29,46 @@ module Sidekiq
|
|||
|
||||
@done = false
|
||||
@busy = []
|
||||
@fetcher = Fetcher.new(current_actor, options[:queues])
|
||||
@ready = @count.times.map { Processor.new_link(current_actor) }
|
||||
@fetcher = Sidekiq::Fetcher.new(current_actor, options[:queues])
|
||||
end
|
||||
|
||||
def stop(options={})
|
||||
shutdown = options[:shutdown]
|
||||
timeout = options[:timeout]
|
||||
watchdog('Manager#stop died') do
|
||||
shutdown = options[:shutdown]
|
||||
timeout = options[:timeout]
|
||||
|
||||
@done = true
|
||||
@done = true
|
||||
|
||||
@fetcher.terminate if @fetcher.alive?
|
||||
@ready.each { |x| x.terminate if x.alive? }
|
||||
@ready.clear
|
||||
@fetcher.terminate if @fetcher.alive?
|
||||
@ready.each { |x| x.terminate if x.alive? }
|
||||
@ready.clear
|
||||
|
||||
redis do |conn|
|
||||
workers = conn.smembers('workers')
|
||||
workers.each do |name|
|
||||
conn.srem('workers', name) if name =~ /:#{process_id}-/
|
||||
end
|
||||
end
|
||||
|
||||
if shutdown
|
||||
if @busy.empty?
|
||||
# after(0) needed to avoid deadlock in Celluoid after USR1 + TERM
|
||||
return after(0) { signal(:shutdown) }
|
||||
else
|
||||
logger.info { "Pausing #{timeout} seconds to allow workers to finish..." }
|
||||
redis do |conn|
|
||||
workers = conn.smembers('workers')
|
||||
workers.each do |name|
|
||||
conn.srem('workers', name) if name =~ /:#{process_id}-/
|
||||
end
|
||||
end
|
||||
|
||||
after(timeout) do
|
||||
@busy.each { |x| x.terminate if x.alive? }
|
||||
signal(:shutdown)
|
||||
if shutdown
|
||||
if @busy.empty?
|
||||
# after(0) needed to avoid deadlock in Celluoid after USR1 + TERM
|
||||
return after(0) { signal(:shutdown) }
|
||||
else
|
||||
logger.info { "Pausing #{timeout} seconds to allow workers to finish..." }
|
||||
end
|
||||
|
||||
after(timeout) do
|
||||
@busy.each { |x| x.terminate if x.alive? }
|
||||
signal(:shutdown)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def start
|
||||
dispatch
|
||||
@ready.each { dispatch }
|
||||
end
|
||||
|
||||
def when_done(&blk)
|
||||
|
@ -105,7 +107,6 @@ module Sidekiq
|
|||
processor = @ready.pop
|
||||
@busy << processor
|
||||
processor.process!(MultiJson.decode(msg), queue)
|
||||
dispatch
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -116,7 +117,7 @@ module Sidekiq
|
|||
# This is a safety check to ensure we haven't leaked
|
||||
# processors somehow.
|
||||
raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?
|
||||
return if @ready.empty?
|
||||
raise "No ready processor!?" if @ready.empty?
|
||||
|
||||
@fetcher.fetch!
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue