mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Rewrite heartbeat so we can dynamically update tags/labels, #2867
This commit is contained in:
parent
b06c1ec7f2
commit
2367bc19c1
2 changed files with 40 additions and 34 deletions
|
@ -63,15 +63,16 @@ module Sidekiq
|
|||
|
||||
JVM_RESERVED_SIGNALS = ['USR1', 'USR2'] # Don't Process#kill if we get these signals via the API
|
||||
|
||||
def heartbeat(k, data, json)
|
||||
results = Sidekiq::CLI::PROCTITLES.map {|x| x.(self, data) }
|
||||
def heartbeat
|
||||
results = Sidekiq::CLI::PROCTITLES.map {|x| x.(self, to_data) }
|
||||
results.compact!
|
||||
$0 = results.join(' ')
|
||||
|
||||
❤(k, json)
|
||||
❤
|
||||
end
|
||||
|
||||
def ❤(key, json)
|
||||
def ❤
|
||||
key = identity
|
||||
fails = procd = 0
|
||||
begin
|
||||
Processor::FAILURE.update {|curr| fails = curr; 0 }
|
||||
|
@ -98,7 +99,7 @@ module Sidekiq
|
|||
conn.multi do
|
||||
conn.sadd('processes', key)
|
||||
conn.exists(key)
|
||||
conn.hmset(key, 'info', json, 'busy', Processor::WORKER_STATE.size, 'beat', Time.now.to_f, 'quiet', @done)
|
||||
conn.hmset(key, 'info', to_json, 'busy', Processor::WORKER_STATE.size, 'beat', Time.now.to_f, 'quiet', @done)
|
||||
conn.expire(key, 60)
|
||||
conn.rpop("#{key}-signals")
|
||||
end
|
||||
|
@ -124,28 +125,36 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def start_heartbeat
|
||||
k = identity
|
||||
data = {
|
||||
'hostname' => hostname,
|
||||
'started_at' => Time.now.to_f,
|
||||
'pid' => $$,
|
||||
'tag' => @options[:tag] || '',
|
||||
'concurrency' => @options[:concurrency],
|
||||
'queues' => @options[:queues].uniq,
|
||||
'labels' => @options[:labels],
|
||||
'identity' => k,
|
||||
}
|
||||
# this data doesn't change so dump it to a string
|
||||
# now so we don't need to dump it every heartbeat.
|
||||
json = Sidekiq.dump_json(data)
|
||||
|
||||
while true
|
||||
heartbeat(k, data, json)
|
||||
heartbeat
|
||||
sleep 5
|
||||
end
|
||||
Sidekiq.logger.info("Heartbeat stopping...")
|
||||
end
|
||||
|
||||
def to_data
|
||||
@data ||= begin
|
||||
{
|
||||
'hostname' => hostname,
|
||||
'started_at' => Time.now.to_f,
|
||||
'pid' => $$,
|
||||
'tag' => @options[:tag] || '',
|
||||
'concurrency' => @options[:concurrency],
|
||||
'queues' => @options[:queues].uniq,
|
||||
'labels' => @options[:labels],
|
||||
'identity' => identity,
|
||||
}
|
||||
end
|
||||
end
|
||||
|
||||
def to_json
|
||||
@json ||= begin
|
||||
# this data changes infrequently so dump it to a string
|
||||
# now so we don't need to dump it every heartbeat.
|
||||
Sidekiq.dump_json(to_data)
|
||||
end
|
||||
end
|
||||
|
||||
def clear_heartbeat
|
||||
# Remove record from Redis since we are shutting down.
|
||||
# Note we don't stop the heartbeat thread; if the process
|
||||
|
|
|
@ -18,6 +18,7 @@ class TestLauncher < Sidekiq::Test
|
|||
@mgr = new_manager(options)
|
||||
@launcher = Sidekiq::Launcher.new(options)
|
||||
@launcher.manager = @mgr
|
||||
@id = @launcher.identity
|
||||
|
||||
Sidekiq::Processor::WORKER_STATE['a'] = {'b' => 1}
|
||||
|
||||
|
@ -35,16 +36,16 @@ class TestLauncher < Sidekiq::Test
|
|||
i += 1
|
||||
end
|
||||
assert_equal 0, i
|
||||
@launcher.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
|
||||
@launcher.heartbeat
|
||||
assert_equal 1, i
|
||||
@launcher.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
|
||||
@launcher.heartbeat
|
||||
assert_equal 1, i
|
||||
end
|
||||
|
||||
describe 'when manager is active' do
|
||||
before do
|
||||
Sidekiq::CLI::PROCTITLES << proc { "xyz" }
|
||||
@launcher.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
|
||||
@launcher.heartbeat
|
||||
Sidekiq::CLI::PROCTITLES.pop
|
||||
end
|
||||
|
||||
|
@ -53,9 +54,9 @@ class TestLauncher < Sidekiq::Test
|
|||
end
|
||||
|
||||
it 'stores process info in redis' do
|
||||
info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
|
||||
info = Sidekiq.redis { |c| c.hmget(@id, 'busy') }
|
||||
assert_equal ["1"], info
|
||||
expires = Sidekiq.redis { |c| c.pttl('identity') }
|
||||
expires = Sidekiq.redis { |c| c.pttl(@id) }
|
||||
assert_in_delta 60000, expires, 500
|
||||
end
|
||||
end
|
||||
|
@ -63,7 +64,7 @@ class TestLauncher < Sidekiq::Test
|
|||
describe 'when manager is stopped' do
|
||||
before do
|
||||
@launcher.quiet
|
||||
@launcher.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
|
||||
@launcher.heartbeat
|
||||
end
|
||||
|
||||
#after do
|
||||
|
@ -75,21 +76,17 @@ class TestLauncher < Sidekiq::Test
|
|||
end
|
||||
|
||||
it 'stores process info in redis' do
|
||||
info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
|
||||
info = Sidekiq.redis { |c| c.hmget(@id, 'busy') }
|
||||
assert_equal ["1"], info
|
||||
expires = Sidekiq.redis { |c| c.pttl('identity') }
|
||||
expires = Sidekiq.redis { |c| c.pttl(@id) }
|
||||
assert_in_delta 60000, expires, 50
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def options
|
||||
{ :concurrency => 3, :queues => ['default'] }
|
||||
{ :concurrency => 3, :queues => ['default'], :tag => 'myapp' }
|
||||
end
|
||||
|
||||
def heartbeat_data
|
||||
{ 'concurrency' => 3, 'tag' => 'myapp' }
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue