diff --git a/lib/sidekiq/limit_fetch/global/monitor.rb b/lib/sidekiq/limit_fetch/global/monitor.rb index 1ab0179..b0b05c5 100644 --- a/lib/sidekiq/limit_fetch/global/monitor.rb +++ b/lib/sidekiq/limit_fetch/global/monitor.rb @@ -31,16 +31,21 @@ module Sidekiq::LimitFetch::Global def invalidate_old_processes Sidekiq.redis do |it| - it.smembers(PROCESS_SET).each do |process| - next if it.get heartbeat_key process + processes = it.smembers PROCESS_SET + processes.each do |process| + unless it.get heartbeat_key process + processes.delete process + it.srem PROCESS_SET, process + end + end - Sidekiq::Queue.instances.map(&:name).uniq.each do |queue| + Sidekiq::Queue.instances.map(&:name).uniq.each do |queue| + locks = it.lrange "limit_fetch:probed:#{queue}", 0, -1 + (locks.uniq - processes).each do |dead_process| %w(limit_fetch:probed: limit_fetch:busy:).each do |prefix| - it.lrem prefix + queue, 0, process + it.lrem prefix + queue, 0, dead_process end end - - it.srem PROCESS_SET, process end end end diff --git a/spec/sidekiq/limit_fetch/global/monitor_spec.rb b/spec/sidekiq/limit_fetch/global/monitor_spec.rb index ddfc65e..bf5dff0 100644 --- a/spec/sidekiq/limit_fetch/global/monitor_spec.rb +++ b/spec/sidekiq/limit_fetch/global/monitor_spec.rb @@ -1,5 +1,7 @@ require 'spec_helper' +Thread.abort_on_exception = true + describe Sidekiq::LimitFetch::Global::Monitor do let(:monitor) { described_class.start! ttl, timeout } let(:ttl) { 1 } @@ -26,5 +28,15 @@ describe Sidekiq::LimitFetch::Global::Monitor do sleep 2*ttl queue.probed.should == 0 end + + it 'should remove invalid locks' do + 2.times { queue.acquire } + described_class.stub :update_heartbeat + Sidekiq.redis do |it| + it.del Sidekiq::LimitFetch::Global::Monitor::PROCESS_SET + end + sleep 2*ttl + queue.probed.should == 0 + end end end