mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Added method to Util module to clean up process records that don't have
a matching heartbeat record, indicating that they're no longer alive. Added call to this new util method in the Schedule::Poller.poll_interval method because the number of live processes is used as a multiplier for the default wait interval. Since the value for poll_interval is memoized this call to 'cleanup_dead_process_records' should only be called once at startup.
This commit is contained in:
parent
5f90010ecb
commit
5590669023
3 changed files with 75 additions and 0 deletions
|
@ -75,6 +75,7 @@ module Sidekiq
|
|||
# We only do this if poll_interval is unset (the default).
|
||||
def poll_interval
|
||||
Sidekiq.options[:poll_interval] ||= begin
|
||||
cleanup_dead_process_records
|
||||
pcount = Sidekiq.redis {|c| c.scard('processes') } || 1
|
||||
pcount * 15
|
||||
end
|
||||
|
|
|
@ -44,5 +44,21 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
# Cleans up dead processes recorded in Redis.
|
||||
def cleanup_dead_process_records
|
||||
Sidekiq.redis do |conn|
|
||||
procs = conn.smembers('processes').sort
|
||||
heartbeats = conn.pipelined do
|
||||
procs.each do |key|
|
||||
conn.hget(key, 'beat')
|
||||
end
|
||||
end
|
||||
|
||||
heartbeats.each_with_index do |beat, i|
|
||||
conn.srem('processes', procs[i]) if beat.nil?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
58
test/test_util.rb
Normal file
58
test/test_util.rb
Normal file
|
@ -0,0 +1,58 @@
|
|||
require 'helper'
|
||||
require 'sidekiq/util'
|
||||
|
||||
class TestUtil < Sidekiq::Test
|
||||
class UtilClass
|
||||
include Sidekiq::Util
|
||||
end
|
||||
|
||||
describe 'util' do
|
||||
before do
|
||||
@orig_redis = Sidekiq.redis_pool
|
||||
Sidekiq.redis = REDIS
|
||||
Sidekiq.redis { |conn| conn.flushdb }
|
||||
end
|
||||
|
||||
after do
|
||||
Sidekiq.redis = @orig_redis
|
||||
end
|
||||
|
||||
# In real code that manages the hash sets for process keys
|
||||
# sets their expiration time to 60 seconds, so processes
|
||||
# who don't have a set under their name are considered 'dead'
|
||||
# because they haven't reported in
|
||||
describe '#cleanup_dead_process_records' do
|
||||
before do
|
||||
# Set up some live and dead processes
|
||||
@live_members = ['localhost-123', 'localhost-125']
|
||||
@dead_members = ['localhost-124']
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
conn.sadd('processes', @live_members + @dead_members)
|
||||
# Add Heartbeats for the live processes
|
||||
@live_members.each do |m|
|
||||
conn.hset(m, 'beat', Time.now.to_f)
|
||||
end
|
||||
end
|
||||
|
||||
@util = UtilClass.new
|
||||
end
|
||||
|
||||
after do
|
||||
Sidekiq.redis do |conn|
|
||||
conn.srem('processes', @live_members + @dead_members)
|
||||
@live_members.each do |m|
|
||||
conn.hdel(m, 'beat')
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
it "should remove dead process records" do
|
||||
assert_equal 3, Sidekiq.redis{ |r| r.scard('processes') }
|
||||
@util.cleanup_dead_process_records
|
||||
still_alive = Sidekiq.redis{|r| r.smembers('processes')}
|
||||
assert_equal still_alive.sort, @live_members.sort
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Add table
Reference in a new issue