mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Move proctitle from cli to launcher (#4067)
This commit is contained in:
parent
aed633ac01
commit
6ac6ed5060
3 changed files with 102 additions and 96 deletions
|
@ -16,14 +16,6 @@ module Sidekiq
|
|||
include Util
|
||||
include Singleton unless $TESTING
|
||||
|
||||
PROCTITLES = [
|
||||
proc { 'sidekiq' },
|
||||
proc { Sidekiq::VERSION },
|
||||
proc { |me, data| data['tag'] },
|
||||
proc { |me, data| "[#{Processor::WORKER_STATE.size} of #{data['concurrency']} busy]" },
|
||||
proc { |me, data| "stopping" if me.stopping? },
|
||||
]
|
||||
|
||||
attr_accessor :launcher
|
||||
attr_accessor :environment
|
||||
|
||||
|
|
|
@ -11,9 +11,17 @@ module Sidekiq
|
|||
class Launcher
|
||||
include Util
|
||||
|
||||
attr_accessor :manager, :poller, :fetcher
|
||||
STATS_TTL = 5*365*24*60*60 # 5 years
|
||||
|
||||
STATS_TTL = 5*365*24*60*60
|
||||
PROCTITLES = [
|
||||
proc { 'sidekiq' },
|
||||
proc { Sidekiq::VERSION },
|
||||
proc { |me, data| data['tag'] },
|
||||
proc { |me, data| "[#{Processor::WORKER_STATE.size} of #{data['concurrency']} busy]" },
|
||||
proc { |me, data| "stopping" if me.stopping? },
|
||||
]
|
||||
|
||||
attr_accessor :manager, :poller, :fetcher
|
||||
|
||||
def initialize(options)
|
||||
@manager = Sidekiq::Manager.new(options)
|
||||
|
@ -62,10 +70,30 @@ module Sidekiq
|
|||
|
||||
private unless $TESTING
|
||||
|
||||
def start_heartbeat
|
||||
while true
|
||||
heartbeat
|
||||
sleep 5
|
||||
end
|
||||
Sidekiq.logger.info("Heartbeat stopping...")
|
||||
end
|
||||
|
||||
def clear_heartbeat
|
||||
# Remove record from Redis since we are shutting down.
|
||||
# Note we don't stop the heartbeat thread; if the process
|
||||
# doesn't actually exit, it'll reappear in the Web UI.
|
||||
Sidekiq.redis do |conn|
|
||||
conn.pipelined do
|
||||
conn.srem('processes', identity)
|
||||
conn.del("#{identity}:workers")
|
||||
end
|
||||
end
|
||||
rescue
|
||||
# best effort, ignore network errors
|
||||
end
|
||||
|
||||
def heartbeat
|
||||
results = Sidekiq::CLI::PROCTITLES.map {|x| x.(self, to_data) }
|
||||
results.compact!
|
||||
$0 = results.join(' ')
|
||||
$0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(' ')
|
||||
|
||||
❤
|
||||
end
|
||||
|
@ -73,6 +101,7 @@ module Sidekiq
|
|||
def ❤
|
||||
key = identity
|
||||
fails = procd = 0
|
||||
|
||||
begin
|
||||
fails = Processor::FAILURE.reset
|
||||
procd = Processor::PROCESSED.reset
|
||||
|
@ -80,6 +109,7 @@ module Sidekiq
|
|||
|
||||
workers_key = "#{key}:workers"
|
||||
nowdate = Time.now.utc.strftime("%Y-%m-%d")
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
conn.incrby("stat:processed", procd)
|
||||
|
@ -97,16 +127,19 @@ module Sidekiq
|
|||
conn.expire(workers_key, 60)
|
||||
end
|
||||
end
|
||||
|
||||
fails = procd = 0
|
||||
|
||||
_, exists, _, _, msg = Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
res = conn.multi do
|
||||
conn.sadd('processes', key)
|
||||
conn.exists(key)
|
||||
conn.hmset(key, 'info', to_json, 'busy', curstate.size, 'beat', Time.now.to_f, 'quiet', @done)
|
||||
conn.expire(key, 60)
|
||||
conn.rpop("#{key}-signals")
|
||||
end
|
||||
|
||||
res
|
||||
end
|
||||
|
||||
# first heartbeat or recovering from an outage and need to reestablish our heartbeat
|
||||
|
@ -124,14 +157,6 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def start_heartbeat
|
||||
while true
|
||||
heartbeat
|
||||
sleep 5
|
||||
end
|
||||
Sidekiq.logger.info("Heartbeat stopping...")
|
||||
end
|
||||
|
||||
def to_data
|
||||
@data ||= begin
|
||||
{
|
||||
|
@ -154,20 +179,5 @@ module Sidekiq
|
|||
Sidekiq.dump_json(to_data)
|
||||
end
|
||||
end
|
||||
|
||||
def clear_heartbeat
|
||||
# Remove record from Redis since we are shutting down.
|
||||
# Note we don't stop the heartbeat thread; if the process
|
||||
# doesn't actually exit, it'll reappear in the Web UI.
|
||||
Sidekiq.redis do |conn|
|
||||
conn.pipelined do
|
||||
conn.srem('processes', identity)
|
||||
conn.del("#{identity}:workers")
|
||||
end
|
||||
end
|
||||
rescue
|
||||
# best effort, ignore network errors
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,93 +1,97 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require_relative 'helper'
|
||||
require 'sidekiq/launcher'
|
||||
require 'sidekiq/cli'
|
||||
|
||||
class TestLauncher < Minitest::Test
|
||||
describe Sidekiq::Launcher do
|
||||
subject { Sidekiq::Launcher.new(options) }
|
||||
|
||||
let(:options) do
|
||||
{
|
||||
concurrency: 3,
|
||||
queues: ['default'],
|
||||
tag: 'myapp'
|
||||
}
|
||||
end
|
||||
|
||||
describe 'launcher' do
|
||||
before do
|
||||
Sidekiq.redis {|c| c.flushdb }
|
||||
Sidekiq.redis { |c| c.flushdb }
|
||||
Sidekiq::Processor::WORKER_STATE.set('tid', { queue: 'queue', payload: 'job_hash', run_at: Time.now.to_i })
|
||||
@proctitle = $0
|
||||
end
|
||||
|
||||
def new_manager(opts)
|
||||
Sidekiq::Manager.new(opts)
|
||||
after do
|
||||
Sidekiq::Processor::WORKER_STATE.clear
|
||||
$0 = @proctitle
|
||||
end
|
||||
|
||||
describe 'heartbeat' do
|
||||
before do
|
||||
@mgr = new_manager(options)
|
||||
@launcher = Sidekiq::Launcher.new(options)
|
||||
@launcher.manager = @mgr
|
||||
@id = @launcher.identity
|
||||
describe '#heartbeat' do
|
||||
describe 'run' do
|
||||
it 'sets sidekiq version, tag and the number of busy workers to proctitle' do
|
||||
subject.heartbeat
|
||||
|
||||
Sidekiq::Processor::WORKER_STATE.set('a', {'b' => 1})
|
||||
|
||||
@proctitle = $0
|
||||
end
|
||||
|
||||
after do
|
||||
Sidekiq::Processor::WORKER_STATE.clear
|
||||
$0 = @proctitle
|
||||
end
|
||||
|
||||
it 'fires new heartbeat events' do
|
||||
i = 0
|
||||
Sidekiq.on(:heartbeat) do
|
||||
i += 1
|
||||
end
|
||||
assert_equal 0, i
|
||||
@launcher.heartbeat
|
||||
assert_equal 1, i
|
||||
@launcher.heartbeat
|
||||
assert_equal 1, i
|
||||
end
|
||||
|
||||
describe 'when manager is active' do
|
||||
before do
|
||||
Sidekiq::CLI::PROCTITLES << proc { "xyz" }
|
||||
@launcher.heartbeat
|
||||
Sidekiq::CLI::PROCTITLES.pop
|
||||
end
|
||||
|
||||
it 'sets useful info to proctitle' do
|
||||
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0
|
||||
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0
|
||||
end
|
||||
|
||||
it 'stores process info in redis' do
|
||||
info = Sidekiq.redis { |c| c.hmget(@id, 'busy') }
|
||||
assert_equal ["1"], info
|
||||
expires = Sidekiq.redis { |c| c.pttl(@id) }
|
||||
subject.heartbeat
|
||||
|
||||
workers = Sidekiq.redis { |c| c.hmget(subject.identity, 'busy') }
|
||||
|
||||
assert_equal ["1"], workers
|
||||
|
||||
expires = Sidekiq.redis { |c| c.pttl(subject.identity) }
|
||||
|
||||
assert_in_delta 60000, expires, 500
|
||||
end
|
||||
|
||||
describe 'events' do
|
||||
before do
|
||||
@cnt = 0
|
||||
|
||||
Sidekiq.on(:heartbeat) do
|
||||
@cnt += 1
|
||||
end
|
||||
end
|
||||
|
||||
it 'fires start heartbeat event only once' do
|
||||
assert_equal 0, @cnt
|
||||
|
||||
subject.heartbeat
|
||||
|
||||
assert_equal 1, @cnt
|
||||
|
||||
subject.heartbeat
|
||||
|
||||
assert_equal 1, @cnt
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe 'when manager is stopped' do
|
||||
describe 'quite' do
|
||||
before do
|
||||
@launcher.quiet
|
||||
@launcher.heartbeat
|
||||
subject.quiet
|
||||
end
|
||||
|
||||
#after do
|
||||
#puts system('redis-cli -n 15 keys "*" | while read LINE ; do TTL=`redis-cli -n 15 ttl "$LINE"`; if [ "$TTL" -eq -1 ]; then echo "$LINE"; fi; done;')
|
||||
#end
|
||||
it 'sets stopping proctitle' do
|
||||
subject.heartbeat
|
||||
|
||||
it 'indicates stopping status in proctitle' do
|
||||
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0
|
||||
end
|
||||
|
||||
it 'stores process info in redis' do
|
||||
info = Sidekiq.redis { |c| c.hmget(@id, 'busy') }
|
||||
subject.heartbeat
|
||||
|
||||
info = Sidekiq.redis { |c| c.hmget(subject.identity, 'busy') }
|
||||
|
||||
assert_equal ["1"], info
|
||||
expires = Sidekiq.redis { |c| c.pttl(@id) }
|
||||
|
||||
expires = Sidekiq.redis { |c| c.pttl(subject.identity) }
|
||||
|
||||
assert_in_delta 60000, expires, 50
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def options
|
||||
{ :concurrency => 3, :queues => ['default'], :tag => 'myapp' }
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue