mirror of
https://github.com/deanpcmad/sidekiq-limit_fetch.git
synced 2022-11-09 13:54:36 -05:00
parent
75dd77d524
commit
287c013c96
2 changed files with 23 additions and 6 deletions
|
@ -31,16 +31,21 @@ module Sidekiq::LimitFetch::Global
|
|||
|
||||
def invalidate_old_processes
|
||||
Sidekiq.redis do |it|
|
||||
it.smembers(PROCESS_SET).each do |process|
|
||||
next if it.get heartbeat_key process
|
||||
processes = it.smembers PROCESS_SET
|
||||
processes.each do |process|
|
||||
unless it.get heartbeat_key process
|
||||
processes.delete process
|
||||
it.srem PROCESS_SET, process
|
||||
end
|
||||
end
|
||||
|
||||
Sidekiq::Queue.instances.map(&:name).uniq.each do |queue|
|
||||
Sidekiq::Queue.instances.map(&:name).uniq.each do |queue|
|
||||
locks = it.lrange "limit_fetch:probed:#{queue}", 0, -1
|
||||
(locks.uniq - processes).each do |dead_process|
|
||||
%w(limit_fetch:probed: limit_fetch:busy:).each do |prefix|
|
||||
it.lrem prefix + queue, 0, process
|
||||
it.lrem prefix + queue, 0, dead_process
|
||||
end
|
||||
end
|
||||
|
||||
it.srem PROCESS_SET, process
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
require 'spec_helper'
|
||||
|
||||
Thread.abort_on_exception = true
|
||||
|
||||
describe Sidekiq::LimitFetch::Global::Monitor do
|
||||
let(:monitor) { described_class.start! ttl, timeout }
|
||||
let(:ttl) { 1 }
|
||||
|
@ -26,5 +28,15 @@ describe Sidekiq::LimitFetch::Global::Monitor do
|
|||
sleep 2*ttl
|
||||
queue.probed.should == 0
|
||||
end
|
||||
|
||||
it 'should remove invalid locks' do
|
||||
2.times { queue.acquire }
|
||||
described_class.stub :update_heartbeat
|
||||
Sidekiq.redis do |it|
|
||||
it.del Sidekiq::LimitFetch::Global::Monitor::PROCESS_SET
|
||||
end
|
||||
sleep 2*ttl
|
||||
queue.probed.should == 0
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue