mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Refactor #1984
This commit is contained in:
parent
24c6615b07
commit
d54dc1c677
5 changed files with 34 additions and 86 deletions
|
@ -545,32 +545,53 @@ module Sidekiq
|
|||
class ProcessSet
|
||||
include Enumerable
|
||||
|
||||
def each(&block)
|
||||
procs = Sidekiq.redis { |conn| conn.smembers('processes') }
|
||||
def initialize(clean_plz=true)
|
||||
self.class.cleanup if clean_plz
|
||||
end
|
||||
|
||||
# Cleans up dead processes recorded in Redis.
|
||||
# Returns the number of processes cleaned.
|
||||
def self.cleanup
|
||||
count = 0
|
||||
Sidekiq.redis do |conn|
|
||||
procs = conn.smembers('processes').sort
|
||||
heartbeats = conn.pipelined do
|
||||
procs.each do |key|
|
||||
conn.hget(key, 'info')
|
||||
end
|
||||
end
|
||||
|
||||
# the hash named key has an expiry of 60 seconds.
|
||||
# if it's not found, that means the process has not reported
|
||||
# in to Redis and probably died.
|
||||
to_prune = []
|
||||
heartbeats.each_with_index do |beat, i|
|
||||
to_prune << procs[i] if beat.nil?
|
||||
end
|
||||
count = conn.srem('processes', to_prune) unless to_prune.empty?
|
||||
end
|
||||
count
|
||||
end
|
||||
|
||||
def each(&block)
|
||||
procs = Sidekiq.redis { |conn| conn.smembers('processes') }.sort
|
||||
|
||||
to_prune = []
|
||||
sorted = procs.sort
|
||||
Sidekiq.redis do |conn|
|
||||
# We're making a tradeoff here between consuming more memory instead of
|
||||
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
|
||||
# you'll be happier this way
|
||||
result = conn.pipelined do
|
||||
sorted.each do |key|
|
||||
procs.each do |key|
|
||||
conn.hmget(key, 'info', 'busy', 'beat')
|
||||
end
|
||||
end
|
||||
|
||||
result.each_with_index do |(info, busy, at_s), i|
|
||||
# the hash named key has an expiry of 60 seconds.
|
||||
# if it's not found, that means the process has not reported
|
||||
# in to Redis and probably died.
|
||||
(to_prune << sorted[i]; next) if info.nil?
|
||||
hash = Sidekiq.load_json(info)
|
||||
yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f))
|
||||
end
|
||||
end
|
||||
|
||||
Sidekiq.redis {|conn| conn.srem('processes', to_prune) } unless to_prune.empty?
|
||||
nil
|
||||
end
|
||||
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
require 'sidekiq'
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/actor'
|
||||
require 'sidekiq/api'
|
||||
|
||||
module Sidekiq
|
||||
module Scheduled
|
||||
|
@ -75,7 +76,7 @@ module Sidekiq
|
|||
# We only do this if poll_interval is unset (the default).
|
||||
def poll_interval
|
||||
Sidekiq.options[:poll_interval] ||= begin
|
||||
cleanup_dead_process_records
|
||||
Sidekiq::ProcessSet.cleanup
|
||||
pcount = Sidekiq.redis {|c| c.scard('processes') } || 1
|
||||
pcount * 15
|
||||
end
|
||||
|
|
|
@ -44,21 +44,5 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
# Cleans up dead processes recorded in Redis.
|
||||
def cleanup_dead_process_records
|
||||
Sidekiq.redis do |conn|
|
||||
procs = conn.smembers('processes').sort
|
||||
heartbeats = conn.pipelined do
|
||||
procs.each do |key|
|
||||
conn.hget(key, 'beat')
|
||||
end
|
||||
end
|
||||
|
||||
heartbeats.each_with_index do |beat, i|
|
||||
conn.srem('processes', procs[i]) if beat.nil?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -469,7 +469,7 @@ class TestApi < Sidekiq::Test
|
|||
end
|
||||
|
||||
ps = Sidekiq::ProcessSet.new
|
||||
assert_equal 3, ps.size
|
||||
assert_equal 1, ps.size
|
||||
assert_equal 1, ps.to_a.size
|
||||
end
|
||||
|
||||
|
|
|
@ -1,58 +0,0 @@
|
|||
require 'helper'
|
||||
require 'sidekiq/util'
|
||||
|
||||
class TestUtil < Sidekiq::Test
|
||||
class UtilClass
|
||||
include Sidekiq::Util
|
||||
end
|
||||
|
||||
describe 'util' do
|
||||
before do
|
||||
@orig_redis = Sidekiq.redis_pool
|
||||
Sidekiq.redis = REDIS
|
||||
Sidekiq.redis { |conn| conn.flushdb }
|
||||
end
|
||||
|
||||
after do
|
||||
Sidekiq.redis = @orig_redis
|
||||
end
|
||||
|
||||
# In real code that manages the hash sets for process keys
|
||||
# sets their expiration time to 60 seconds, so processes
|
||||
# who don't have a set under their name are considered 'dead'
|
||||
# because they haven't reported in
|
||||
describe '#cleanup_dead_process_records' do
|
||||
before do
|
||||
# Set up some live and dead processes
|
||||
@live_members = ['localhost-123', 'localhost-125']
|
||||
@dead_members = ['localhost-124']
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
conn.sadd('processes', @live_members + @dead_members)
|
||||
# Add Heartbeats for the live processes
|
||||
@live_members.each do |m|
|
||||
conn.hset(m, 'beat', Time.now.to_f)
|
||||
end
|
||||
end
|
||||
|
||||
@util = UtilClass.new
|
||||
end
|
||||
|
||||
after do
|
||||
Sidekiq.redis do |conn|
|
||||
conn.srem('processes', @live_members + @dead_members)
|
||||
@live_members.each do |m|
|
||||
conn.hdel(m, 'beat')
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
it "should remove dead process records" do
|
||||
assert_equal 3, Sidekiq.redis{ |r| r.scard('processes') }
|
||||
@util.cleanup_dead_process_records
|
||||
still_alive = Sidekiq.redis{|r| r.smembers('processes')}
|
||||
assert_equal still_alive.sort, @live_members.sort
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue