diff --git a/lib/sidekiq/limit_fetch.rb b/lib/sidekiq/limit_fetch.rb index 5c7c69c..fbb1b5a 100644 --- a/lib/sidekiq/limit_fetch.rb +++ b/lib/sidekiq/limit_fetch.rb @@ -9,6 +9,7 @@ class Sidekiq::LimitFetch require_relative 'limit_fetch/local/selector' require_relative 'limit_fetch/global/semaphore' require_relative 'limit_fetch/global/selector' + require_relative 'limit_fetch/global/monitor' require_relative 'extensions/queue' Sidekiq.options[:fetch] = self @@ -18,6 +19,7 @@ class Sidekiq::LimitFetch end def initialize(options) + Global::Monitor.start! if options[:global] @queues = Queues.new options end diff --git a/lib/sidekiq/limit_fetch/global/monitor.rb b/lib/sidekiq/limit_fetch/global/monitor.rb new file mode 100644 index 0000000..cbc4c7c --- /dev/null +++ b/lib/sidekiq/limit_fetch/global/monitor.rb @@ -0,0 +1,53 @@ +module Sidekiq::LimitFetch::Global + module Monitor + extend self + + HEARTBEAT_NAMESPACE = 'heartbeat:' + PROCESSOR_NAMESPACE = 'processor:' + + HEARTBEAT_TTL = 400 + REFRESH_TIMEOUT = 180 + + def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT) + Thread.new do + loop do + update_heartbeat ttl + invalidate_old_processors + sleep timeout + end + end + end + + private + + def update_heartbeat(ttl) + Sidekiq.redis do |it| + it.set processor_key, true + it.set heartbeat_key, true + it.expire heartbeat_key, ttl + end + end + + def invalidate_old_processors + Sidekiq.redis do |it| + it.keys(PROCESSOR_NAMESPACE + '*').each do |processor| + processor.sub! PROCESSOR_NAMESPACE, '' + next if it.get heartbeat_key processor + + it.del processor_key processor + it.keys('limit_fetch:busy:*').each do |queue| + it.lrem queue, 0, processor + end + end + end + end + + def heartbeat_key(processor=Selector.uuid) + HEARTBEAT_NAMESPACE + processor + end + + def processor_key(processor=Selector.uuid) + PROCESSOR_NAMESPACE + processor + end + end +end diff --git a/lib/sidekiq/limit_fetch/global/selector.rb b/lib/sidekiq/limit_fetch/global/selector.rb index 6ade2f9..f8496f1 100644 --- a/lib/sidekiq/limit_fetch/global/selector.rb +++ b/lib/sidekiq/limit_fetch/global/selector.rb @@ -10,6 +10,10 @@ module Sidekiq::LimitFetch::Global redis_eval :release, [namespace, uuid, queues] end + def uuid + @uuid ||= SecureRandom.uuid + end + private def namespace @@ -19,10 +23,6 @@ module Sidekiq::LimitFetch::Global end end - def uuid - @uuid ||= SecureRandom.uuid - end - def redis_eval(script_name, args) Sidekiq.redis do |it| begin diff --git a/spec/sidekiq/limit_fetch/global/monitor_spec.rb b/spec/sidekiq/limit_fetch/global/monitor_spec.rb new file mode 100644 index 0000000..0d4a97d --- /dev/null +++ b/spec/sidekiq/limit_fetch/global/monitor_spec.rb @@ -0,0 +1,30 @@ +require 'spec_helper' + +describe Sidekiq::LimitFetch::Global::Monitor do + let(:global) { true } + let(:monitor) { described_class.start! ttl, timeout } + let(:ttl) { 2 } + let(:queue) { Sidekiq::Queue[name] } + let(:name) { 'default' } + + before :each do + monitor + end + + after :each do + monitor.kill + end + + context 'old locks' do + let(:timeout) { 100 } + + it 'should remove invalidated old locks' do + 2.times { queue.acquire } + described_class.send(:invalidate_old_processors) + queue.busy.should == 2 + sleep 2 + described_class.send(:invalidate_old_processors) + queue.busy.should == 0 + end + end +end