mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Refactor fetch, X-treme termination!
This commit is contained in:
parent
4d14c2f7fa
commit
da01d927a6
2 changed files with 10 additions and 9 deletions
|
@ -30,15 +30,16 @@ module Sidekiq
|
||||||
# a new fetch if the current fetch turned up nothing.
|
# a new fetch if the current fetch turned up nothing.
|
||||||
def fetch
|
def fetch
|
||||||
watchdog('Fetcher#fetch died') do
|
watchdog('Fetcher#fetch died') do
|
||||||
|
queue = nil
|
||||||
msg = nil
|
msg = nil
|
||||||
Sidekiq.redis do |conn|
|
Sidekiq.redis { |conn| (queue, msg) = conn.blpop(*@cmd) }
|
||||||
(queue, msg) = conn.blpop *@cmd
|
|
||||||
puts 'All quiet' unless msg
|
|
||||||
@mgr.assign! msg, queue.gsub(/\Aqueue:/, '') if msg
|
|
||||||
end
|
|
||||||
after(0) { fetch } if !msg
|
|
||||||
|
|
||||||
|
if msg
|
||||||
|
@mgr.assign!(msg, queue.gsub(/\Aqueue:/, ''))
|
||||||
|
else
|
||||||
|
puts 'All quiet'
|
||||||
|
after(0) { fetch }
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -40,8 +40,8 @@ module Sidekiq
|
||||||
|
|
||||||
@done = true
|
@done = true
|
||||||
|
|
||||||
@fetcher.terminate if @fetcher.alive?
|
@fetcher.terminate! if @fetcher.alive?
|
||||||
@ready.each { |x| x.terminate if x.alive? }
|
@ready.each { |x| x.terminate! if x.alive? }
|
||||||
@ready.clear
|
@ready.clear
|
||||||
|
|
||||||
redis do |conn|
|
redis do |conn|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue