mirror of
https://github.com/deanpcmad/sidekiq-limit_fetch.git
synced 2022-11-09 13:54:36 -05:00
Add heartbeat monitor
This commit is contained in:
parent
d7a4fc5dcc
commit
071da02398
4 changed files with 89 additions and 4 deletions
|
@ -9,6 +9,7 @@ class Sidekiq::LimitFetch
|
|||
require_relative 'limit_fetch/local/selector'
|
||||
require_relative 'limit_fetch/global/semaphore'
|
||||
require_relative 'limit_fetch/global/selector'
|
||||
require_relative 'limit_fetch/global/monitor'
|
||||
require_relative 'extensions/queue'
|
||||
|
||||
Sidekiq.options[:fetch] = self
|
||||
|
@ -18,6 +19,7 @@ class Sidekiq::LimitFetch
|
|||
end
|
||||
|
||||
def initialize(options)
|
||||
Global::Monitor.start! if options[:global]
|
||||
@queues = Queues.new options
|
||||
end
|
||||
|
||||
|
|
53
lib/sidekiq/limit_fetch/global/monitor.rb
Normal file
53
lib/sidekiq/limit_fetch/global/monitor.rb
Normal file
|
@ -0,0 +1,53 @@
|
|||
module Sidekiq::LimitFetch::Global
|
||||
module Monitor
|
||||
extend self
|
||||
|
||||
HEARTBEAT_NAMESPACE = 'heartbeat:'
|
||||
PROCESSOR_NAMESPACE = 'processor:'
|
||||
|
||||
HEARTBEAT_TTL = 400
|
||||
REFRESH_TIMEOUT = 180
|
||||
|
||||
def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
|
||||
Thread.new do
|
||||
loop do
|
||||
update_heartbeat ttl
|
||||
invalidate_old_processors
|
||||
sleep timeout
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def update_heartbeat(ttl)
|
||||
Sidekiq.redis do |it|
|
||||
it.set processor_key, true
|
||||
it.set heartbeat_key, true
|
||||
it.expire heartbeat_key, ttl
|
||||
end
|
||||
end
|
||||
|
||||
def invalidate_old_processors
|
||||
Sidekiq.redis do |it|
|
||||
it.keys(PROCESSOR_NAMESPACE + '*').each do |processor|
|
||||
processor.sub! PROCESSOR_NAMESPACE, ''
|
||||
next if it.get heartbeat_key processor
|
||||
|
||||
it.del processor_key processor
|
||||
it.keys('limit_fetch:busy:*').each do |queue|
|
||||
it.lrem queue, 0, processor
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def heartbeat_key(processor=Selector.uuid)
|
||||
HEARTBEAT_NAMESPACE + processor
|
||||
end
|
||||
|
||||
def processor_key(processor=Selector.uuid)
|
||||
PROCESSOR_NAMESPACE + processor
|
||||
end
|
||||
end
|
||||
end
|
|
@ -10,6 +10,10 @@ module Sidekiq::LimitFetch::Global
|
|||
redis_eval :release, [namespace, uuid, queues]
|
||||
end
|
||||
|
||||
def uuid
|
||||
@uuid ||= SecureRandom.uuid
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def namespace
|
||||
|
@ -19,10 +23,6 @@ module Sidekiq::LimitFetch::Global
|
|||
end
|
||||
end
|
||||
|
||||
def uuid
|
||||
@uuid ||= SecureRandom.uuid
|
||||
end
|
||||
|
||||
def redis_eval(script_name, args)
|
||||
Sidekiq.redis do |it|
|
||||
begin
|
||||
|
|
30
spec/sidekiq/limit_fetch/global/monitor_spec.rb
Normal file
30
spec/sidekiq/limit_fetch/global/monitor_spec.rb
Normal file
|
@ -0,0 +1,30 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe Sidekiq::LimitFetch::Global::Monitor do
|
||||
let(:global) { true }
|
||||
let(:monitor) { described_class.start! ttl, timeout }
|
||||
let(:ttl) { 2 }
|
||||
let(:queue) { Sidekiq::Queue[name] }
|
||||
let(:name) { 'default' }
|
||||
|
||||
before :each do
|
||||
monitor
|
||||
end
|
||||
|
||||
after :each do
|
||||
monitor.kill
|
||||
end
|
||||
|
||||
context 'old locks' do
|
||||
let(:timeout) { 100 }
|
||||
|
||||
it 'should remove invalidated old locks' do
|
||||
2.times { queue.acquire }
|
||||
described_class.send(:invalidate_old_processors)
|
||||
queue.busy.should == 2
|
||||
sleep 2
|
||||
described_class.send(:invalidate_old_processors)
|
||||
queue.busy.should == 0
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Add table
Reference in a new issue