From d54dc1c677fbdd3d6697566f3478bb66f147f0cb Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Mon, 6 Oct 2014 08:53:06 -0700 Subject: [PATCH] Refactor #1984 --- lib/sidekiq/api.rb | 41 +++++++++++++++++++++------- lib/sidekiq/scheduled.rb | 3 ++- lib/sidekiq/util.rb | 16 ----------- test/test_api.rb | 2 +- test/test_util.rb | 58 ---------------------------------------- 5 files changed, 34 insertions(+), 86 deletions(-) delete mode 100644 test/test_util.rb diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index fc7736ba..6f15e267 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -545,32 +545,53 @@ module Sidekiq class ProcessSet include Enumerable - def each(&block) - procs = Sidekiq.redis { |conn| conn.smembers('processes') } + def initialize(clean_plz=true) + self.class.cleanup if clean_plz + end + + # Cleans up dead processes recorded in Redis. + # Returns the number of processes cleaned. + def self.cleanup + count = 0 + Sidekiq.redis do |conn| + procs = conn.smembers('processes').sort + heartbeats = conn.pipelined do + procs.each do |key| + conn.hget(key, 'info') + end + end + + # the hash named key has an expiry of 60 seconds. + # if it's not found, that means the process has not reported + # in to Redis and probably died. + to_prune = [] + heartbeats.each_with_index do |beat, i| + to_prune << procs[i] if beat.nil? + end + count = conn.srem('processes', to_prune) unless to_prune.empty? + end + count + end + + def each(&block) + procs = Sidekiq.redis { |conn| conn.smembers('processes') }.sort - to_prune = [] - sorted = procs.sort Sidekiq.redis do |conn| # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, # you'll be happier this way result = conn.pipelined do - sorted.each do |key| + procs.each do |key| conn.hmget(key, 'info', 'busy', 'beat') end end result.each_with_index do |(info, busy, at_s), i| - # the hash named key has an expiry of 60 seconds. - # if it's not found, that means the process has not reported - # in to Redis and probably died. - (to_prune << sorted[i]; next) if info.nil? hash = Sidekiq.load_json(info) yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f)) end end - Sidekiq.redis {|conn| conn.srem('processes', to_prune) } unless to_prune.empty? nil end diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 39e9b435..5bec7056 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -1,6 +1,7 @@ require 'sidekiq' require 'sidekiq/util' require 'sidekiq/actor' +require 'sidekiq/api' module Sidekiq module Scheduled @@ -75,7 +76,7 @@ module Sidekiq # We only do this if poll_interval is unset (the default). def poll_interval Sidekiq.options[:poll_interval] ||= begin - cleanup_dead_process_records + Sidekiq::ProcessSet.cleanup pcount = Sidekiq.redis {|c| c.scard('processes') } || 1 pcount * 15 end diff --git a/lib/sidekiq/util.rb b/lib/sidekiq/util.rb index 6bd9bb79..1b3d3f94 100644 --- a/lib/sidekiq/util.rb +++ b/lib/sidekiq/util.rb @@ -44,21 +44,5 @@ module Sidekiq end end - # Cleans up dead processes recorded in Redis. - def cleanup_dead_process_records - Sidekiq.redis do |conn| - procs = conn.smembers('processes').sort - heartbeats = conn.pipelined do - procs.each do |key| - conn.hget(key, 'beat') - end - end - - heartbeats.each_with_index do |beat, i| - conn.srem('processes', procs[i]) if beat.nil? - end - end - end - end end diff --git a/test/test_api.rb b/test/test_api.rb index 95598ff6..38685f01 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -469,7 +469,7 @@ class TestApi < Sidekiq::Test end ps = Sidekiq::ProcessSet.new - assert_equal 3, ps.size + assert_equal 1, ps.size assert_equal 1, ps.to_a.size end diff --git a/test/test_util.rb b/test/test_util.rb deleted file mode 100644 index 2339525a..00000000 --- a/test/test_util.rb +++ /dev/null @@ -1,58 +0,0 @@ -require 'helper' -require 'sidekiq/util' - -class TestUtil < Sidekiq::Test - class UtilClass - include Sidekiq::Util - end - - describe 'util' do - before do - @orig_redis = Sidekiq.redis_pool - Sidekiq.redis = REDIS - Sidekiq.redis { |conn| conn.flushdb } - end - - after do - Sidekiq.redis = @orig_redis - end - - # In real code that manages the hash sets for process keys - # sets their expiration time to 60 seconds, so processes - # who don't have a set under their name are considered 'dead' - # because they haven't reported in - describe '#cleanup_dead_process_records' do - before do - # Set up some live and dead processes - @live_members = ['localhost-123', 'localhost-125'] - @dead_members = ['localhost-124'] - - Sidekiq.redis do |conn| - conn.sadd('processes', @live_members + @dead_members) - # Add Heartbeats for the live processes - @live_members.each do |m| - conn.hset(m, 'beat', Time.now.to_f) - end - end - - @util = UtilClass.new - end - - after do - Sidekiq.redis do |conn| - conn.srem('processes', @live_members + @dead_members) - @live_members.each do |m| - conn.hdel(m, 'beat') - end - end - end - - it "should remove dead process records" do - assert_equal 3, Sidekiq.redis{ |r| r.scard('processes') } - @util.cleanup_dead_process_records - still_alive = Sidekiq.redis{|r| r.smembers('processes')} - assert_equal still_alive.sort, @live_members.sort - end - end - end -end