Mirror of https://github.com/mperham/sidekiq.git, synced 2022-11-09 13:52:34 -05:00
Rejigger heartbeat data to be a stable hash, not a time-based sorted set.
parent bbe245e4ff
commit ee7e181bd6
5 changed files with 48 additions and 24 deletions
```diff
@@ -9,6 +9,8 @@ Please see [Upgrading.md](Upgrading.md) for more comprehensive upgrade notes.
   job queue. These jobs must be retried manually or they will expire
   after 6 months or 10,000 jobs. The Web UI contains a "Dead" tab
   exposing these jobs.
+- **Process Heartbeat** - each Sidekiq process will ping Redis every 5
+  seconds to give an accurate summary of the Sidekiq population at work.
 - **Remove official support for Ruby 1.9** Things still might work but
   I no longer actively test on it.
 - **Remove built-in support for Redis-to-Go**.
```
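The changelog bullet above is the user-facing promise; under the hood, each beat becomes one field in a Redis hash. A minimal sketch of reading that population summary back, assuming a local Redis and the `redis` gem (field names follow the payload built in `start_heartbeat` below):

```ruby
require 'redis'
require 'json'

redis = Redis.new
# 'processes' is the hash this commit introduces; one field per process.
redis.hgetall('processes').each do |key, raw|
  info = JSON.parse(raw)
  age  = Time.now.to_f - info['at']
  puts "#{key}: #{info['busy']} busy, heartbeat #{age.round(1)}s ago"
end
```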
```diff
@@ -419,22 +419,30 @@ module Sidekiq
     end
   end
 
-  class ProcessSet < SortedSet
-    def initialize
-      super 'processes'
-    end
+  ##
+  # Enumerates the set of Sidekiq processes which are actively working
+  # right now.  Each process sends a heartbeat to Redis every 5 seconds
+  # so this set should be relatively accurate, barring network partitions.
+  class ProcessSet
+    include Enumerable
 
     def each(&block)
-      now = Time.now.to_f
-      _, procs = Sidekiq.redis do |conn|
-        conn.multi do
-          conn.zremrangebyscore('processes', '-inf', now - 5.1)
-          conn.zrange name, 0, -1
-        end
-      end
+      procs = Sidekiq.redis { |conn| conn.hgetall('processes') }
+      cutoff = Time.now.to_f - 5
+
+      to_prune = []
+      procs.map {|name, data| Sidekiq.load_json(data) }.
+        sort_by {|x| x['key'] }.
+        each do |pdata|
+          if pdata['at'] < cutoff
+            to_prune << pdata['key']; next
+          else
+            yield pdata
+          end
+        end
-      procs.each do |proc_data|
-        yield Sidekiq.load_json(proc_data)
-      end
+
+      Sidekiq.redis {|conn| conn.hdel('processes', *to_prune) } unless to_prune.empty?
       nil
     end
   end
```
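The reworked `ProcessSet` moves pruning to the reader: entries whose `at` timestamp falls behind the five-second cutoff are skipped and then removed with HDEL. A hypothetical usage sketch, with field names taken from the heartbeat payload:

```ruby
require 'sidekiq/api'

# Yields only processes that have beaten within ~5 seconds; stale
# entries are HDEL'd from the hash as a side effect of enumeration.
Sidekiq::ProcessSet.new.each do |process|
  puts "#{process['key']} busy=#{process['busy']} concurrency=#{process['concurrency']}"
end
```

Sorting by `'key'` also gives a stable, host-ordered listing, where the old sorted set ordered entries by heartbeat time.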
```diff
@@ -459,12 +467,10 @@ module Sidekiq
     include Enumerable
 
     def each(&block)
-      live = ProcessSet.new.map {|x| "#{x['hostname']}:#{x['process_id']}-" }
-      p live
+      live = ProcessSet.new.map {|x| /\A#{x['hostname']}:#{x['process_id']}-/ }
       msgs = Sidekiq.redis do |conn|
         workers = conn.smembers("workers")
-        p workers
-        to_rem = workers.delete_if {|w| !live.any? {|identity| w =~ /\A#{identity}/ } }
+        to_rem = workers.delete_if {|w| !live.any? {|identity| w =~ identity } }
         conn.srem('workers', *to_rem) unless to_rem.empty?
 
         workers.empty? ? {} : conn.mapped_mget(*workers.map {|w| "worker:#{w}" })
```
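`Workers#each` now precompiles one anchored regex per live process instead of re-interpolating `/\A#{identity}/` on every comparison inside the loop, and drops the stray `p` debugging output. Roughly, with illustrative identities:

```ruby
# Illustrative values only, not real process identities.
live    = [/\Aapp-01:4242-/]
workers = ['app-01:4242-70123-default', 'app-02:9999-70999-default']

# Worker entries with no live prefix are stale and get SREM'd from 'workers'.
stale = workers.reject { |w| live.any? { |re| w =~ re } }
# stale => ["app-02:9999-70999-default"]
```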
```diff
@@ -56,10 +56,11 @@ module Sidekiq
 
     def start_heartbeat(tag)
       manager.heartbeat({
+        'key' => "#{hostname}:#{$$}",
         'hostname' => hostname,
         'pid' => $$,
         'process_id' => process_id,
-        'tag' => tag,
+        'tag' => tag.strip,
         'concurrency' => @options[:concurrency],
         'queues' => @options[:queues].uniq,
       })
```
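The new `'key'` field is the linchpin of the hash scheme: `hostname:pid` is stable for the life of a process, so successive beats overwrite one field instead of piling up. A small sketch with example values, again assuming the `redis` gem:

```ruby
require 'redis'
require 'json'

redis = Redis.new
key = 'app-01:4242'  # "#{hostname}:#{$$}", as built in start_heartbeat
3.times do |i|
  redis.hset('processes', key, JSON.generate('at' => Time.now.to_f, 'beat' => i))
end
redis.hlen('processes')  # => 1: each beat overwrites in place
```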
```diff
@@ -143,12 +143,11 @@ module Sidekiq
     private
 
     def ❤(data)
-      now = Time.now.to_f
-      proc_data = Sidekiq.dump_json(data.merge('busy' => @busy.size, 'at' => now))
-      Sidekiq.redis do |conn|
-        conn.multi do
-          conn.zadd("processes", proc_data, now)
-          conn.zremrangebyscore('processes', '-inf', now - 5.1)
-        end
-      end
+      watchdog('heartbeat') do
+        now = Time.now.to_f
+        proc_data = Sidekiq.dump_json(data.merge('busy' => @busy.size, 'at' => now))
+        Sidekiq.redis do |conn|
+          conn.hset("processes", data['key'], proc_data)
+        end
+      end
     end
```
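For contrast, the deleted sorted-set scheme saw every beat as a brand-new member (the payload differs each time through `'at'`), so each write also had to trim old members by score inside a MULTI. A sketch of the removed approach, under the same assumptions as above:

```ruby
require 'redis'
require 'json'

redis = Redis.new
now = Time.now.to_f
payload = JSON.generate('key' => 'app-01:4242', 'at' => now)

redis.multi do |tx|
  tx.zadd('processes', now, payload)                    # a new member every beat
  tx.zremrangebyscore('processes', '-inf', now - 5.1)   # so every beat also trims
end
```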
```diff
@@ -342,6 +342,20 @@ class TestApi < Sidekiq::Test
       assert_equal 0, r.size
     end
 
+    it 'can enumerate processes' do
+      odata = { 'pid' => 123, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:123", 'at' => Time.now.to_f - 5 }
+      pdata = { 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:#{$$}", 'at' => Time.now.to_f }
+      Sidekiq.redis do |conn|
+        conn.hset('processes', odata['key'], Sidekiq.dump_json(odata))
+        conn.hset('processes', pdata['key'], Sidekiq.dump_json(pdata))
+      end
+
+      ps = Sidekiq::ProcessSet.new.to_a
+      assert_equal 1, ps.size
+      data = ps.first
+      assert_equal pdata, data
+    end
+
     it 'can enumerate workers' do
       w = Sidekiq::Workers.new
       assert_equal 0, w.size
```
```diff
@@ -349,9 +363,11 @@ class TestApi < Sidekiq::Test
         assert false
       end
 
+      pdata = { 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:#{$$}", 'at' => Time.now.to_f }
       Sidekiq.redis do |conn|
-        conn.zadd('processes', Time.now.to_f, Sidekiq.dump_json({ 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id }))
+        conn.hset('processes', pdata['key'], Sidekiq.dump_json(pdata))
       end
 
       s = "worker:#{hostname}:#{process_id}-12345"
       data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => Time.now.to_i })
       Sidekiq.redis do |c|
```