From ee7e181bd66db20b2186302e8b55f442fbd0eddf Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Sun, 2 Mar 2014 21:18:26 -0800 Subject: [PATCH] Rejigger heartbeat data to be a stable hash, not a time-based sorted set. --- Changes.md | 2 ++ lib/sidekiq/api.rb | 38 ++++++++++++++++++++++---------------- lib/sidekiq/launcher.rb | 3 ++- lib/sidekiq/manager.rb | 11 +++++------ test/test_api.rb | 18 +++++++++++++++++- 5 files changed, 48 insertions(+), 24 deletions(-) diff --git a/Changes.md b/Changes.md index 719cf697..53cae137 100644 --- a/Changes.md +++ b/Changes.md @@ -9,6 +9,8 @@ Please see [Upgrading.md](Upgrading.md) for more comprehensive upgrade notes. job queue. These jobs must be retried manually or they will expire after 6 months or 10,000 jobs. The Web UI contains a "Dead" tab exposing these jobs. +- **Process Heartbeat** - each Sidekiq process will ping Redis every 5 + seconds to give an accurate summary of the Sidekiq population at work. - **Remove official support for Ruby 1.9** Things still might work but I no longer actively test on it. - **Remove built-in support for Redis-to-Go**. diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index fd41b768..065dd032 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -419,22 +419,30 @@ module Sidekiq end end - class ProcessSet < SortedSet - def initialize - super 'processes' - end + ## + # Enumerates the set of Sidekiq processes which are actively working + # right now. Each process send a heartbeat to Redis every 5 seconds + # so this set should be relatively accurate, barring network partitions. + class ProcessSet + include Enumerable def each(&block) - now = Time.now.to_f - _, procs = Sidekiq.redis do |conn| - conn.multi do - conn.zremrangebyscore('processes', '-inf', now - 5.1) - conn.zrange name, 0, -1 + procs = Sidekiq.redis { |conn| conn.hgetall('processes') } + cutoff = Time.now.to_f - 5 + + to_prune = [] + procs.map {|name, data| Sidekiq.load_json(data) }. + sort_by {|x| x['key'] }. + each do |pdata| + if pdata['at'] < cutoff + to_prune << pdata['key']; next + else + yield pdata end end - procs.each do |proc_data| - yield Sidekiq.load_json(proc_data) - end + + Sidekiq.redis {|conn| conn.hdel('processes', *to_prune) } unless to_prune.empty? + nil end end @@ -459,12 +467,10 @@ module Sidekiq include Enumerable def each(&block) - live = ProcessSet.new.map {|x| "#{x['hostname']}:#{x['process_id']}-" } - p live + live = ProcessSet.new.map {|x| /\A#{x['hostname']}:#{x['process_id']}-/ } msgs = Sidekiq.redis do |conn| workers = conn.smembers("workers") - p workers - to_rem = workers.delete_if {|w| !live.any? {|identity| w =~ /\A#{identity}/ } } + to_rem = workers.delete_if {|w| !live.any? {|identity| w =~ identity } } conn.srem('workers', *to_rem) unless to_rem.empty? workers.empty? ? {} : conn.mapped_mget(*workers.map {|w| "worker:#{w}" }) diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index ee173032..d93367bb 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -56,10 +56,11 @@ module Sidekiq def start_heartbeat(tag) manager.heartbeat({ + 'key' => "#{hostname}:#{$$}", 'hostname' => hostname, 'pid' => $$, 'process_id' => process_id, - 'tag' => tag, + 'tag' => tag.strip, 'concurrency' => @options[:concurrency], 'queues' => @options[:queues].uniq, }) diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index a70b29ec..cbc2aa75 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -143,12 +143,11 @@ module Sidekiq private def ❤(data) - now = Time.now.to_f - proc_data = Sidekiq.dump_json(data.merge('busy' => @busy.size, 'at' => now)) - Sidekiq.redis do |conn| - conn.multi do - conn.zadd("processes", proc_data, now) - conn.zremrangebyscore('processes', '-inf', now - 5.1) + watchdog('heartbeat') do + now = Time.now.to_f + proc_data = Sidekiq.dump_json(data.merge('busy' => @busy.size, 'at' => now)) + Sidekiq.redis do |conn| + conn.hset("processes", data['key'], proc_data) end end end diff --git a/test/test_api.rb b/test/test_api.rb index 1e5a1615..4a6c8a47 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -342,6 +342,20 @@ class TestApi < Sidekiq::Test assert_equal 0, r.size end + it 'can enumerate processes' do + odata = { 'pid' => 123, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:123", 'at' => Time.now.to_f - 5 } + pdata = { 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:#{$$}", 'at' => Time.now.to_f } + Sidekiq.redis do |conn| + conn.hset('processes', odata['key'], Sidekiq.dump_json(odata)) + conn.hset('processes', pdata['key'], Sidekiq.dump_json(pdata)) + end + + ps = Sidekiq::ProcessSet.new.to_a + assert_equal 1, ps.size + data = ps.first + assert_equal pdata, data + end + it 'can enumerate workers' do w = Sidekiq::Workers.new assert_equal 0, w.size @@ -349,9 +363,11 @@ class TestApi < Sidekiq::Test assert false end + pdata = { 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:#{$$}", 'at' => Time.now.to_f } Sidekiq.redis do |conn| - conn.zadd('processes', Time.now.to_f, Sidekiq.dump_json({ 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id })) + conn.hset('processes', pdata['key'], Sidekiq.dump_json(pdata)) end + s = "worker:#{hostname}:#{process_id}-12345" data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => Time.now.to_i }) Sidekiq.redis do |c|