Fix lingering threads in tests

Also, allow embedding process to provide their own heartbeat thread.
This commit is contained in:
Mike Perham 2022-11-01 12:32:31 -07:00
parent b1fc7fa8be
commit 636fa9a06a
No known key found for this signature in database
4 changed files with 66 additions and 134 deletions

View File

@ -32,15 +32,17 @@ module Sidekiq
@done = false
end
def run
# Start this Sidekiq instance. If an embedding process already
# has a heartbeat thread, caller can use `async_beat: false`
# and instead have thread call Launcher#heartbeat every N seconds.
def run(async_beat: true)
Sidekiq.freeze!
@thread = safe_thread("heartbeat", &method(:start_heartbeat))
@thread = safe_thread("heartbeat", &method(:start_heartbeat)) if async_beat
@poller.start
@managers.each(&:start)
end
# Stops this instance from processing any more jobs,
#
def quiet
return if @done
@ -71,18 +73,30 @@ module Sidekiq
@done
end
# If embedding Sidekiq, you can have the process heartbeat
# call this method to regularly heartbeat rather than creating
# a separate thread.
def heartbeat
end
private unless $TESTING
BEAT_PAUSE = 10
def start_heartbeat
loop do
heartbeat
beat
sleep BEAT_PAUSE
end
logger.info("Heartbeat stopping...")
end
def beat
$0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(" ") unless @embedded
end
def clear_heartbeat
flush_stats
@ -99,12 +113,6 @@ module Sidekiq
# best effort, ignore network errors
end
def heartbeat
$0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(" ") unless @embedded
end
def flush_stats
fails = Processor::FAILURE.reset
procd = Processor::PROCESSED.reset

View File

@ -5,7 +5,7 @@ Bundler.require(:default, :test)
require "minitest/pride"
require "maxitest/autorun"
# require "maxitest/threads"
require "maxitest/threads"
$TESTING = true
# disable minitest/parallel threads

View File

@ -4,10 +4,6 @@ require_relative "helper"
require "sidekiq/launcher"
describe Sidekiq::Launcher do
subject do
Sidekiq::Launcher.new(@config)
end
before do
@config = reset!
@config.default_capsule.concurrency = 3
@ -23,13 +19,15 @@ describe Sidekiq::Launcher do
end
it "starts and stops" do
subject.run
subject.stop
l = Sidekiq::Launcher.new(@config)
l.run(async_beat: false)
l.stop
end
describe "heartbeat" do
before do
@id = subject.identity
@launcher = Sidekiq::Launcher.new(@config)
@id = @launcher.identity
Sidekiq::Processor::WORK_STATE.set("a", {"b" => 1})
@ -41,123 +39,52 @@ describe Sidekiq::Launcher do
$0 = @proctitle
end
describe "#heartbeat" do
describe "run" do
it "sets sidekiq version, tag and the number of busy workers to proctitle" do
subject.heartbeat
it "stores process info in redis" do
@launcher.beat
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0
end
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0
workers, rtt = @config.redis { |c| c.hmget(@id, "busy", "rtt_us") }
it "stores process info in redis" do
subject.heartbeat
assert_equal "1", workers
refute_nil rtt
assert_in_delta 1000, rtt.to_i, 1000
workers, rtt = @config.redis { |c| c.hmget(subject.identity, "busy", "rtt_us") }
assert_equal "1", workers
refute_nil rtt
assert_in_delta 1000, rtt.to_i, 1000
expires = @config.redis { |c| c.pttl(subject.identity) }
assert_in_delta 60000, expires, 500
end
describe "events" do
before do
@cnt = 0
@config.on(:heartbeat) do
@cnt += 1
end
end
it "fires start heartbeat event only once" do
assert_equal 0, @cnt
subject.heartbeat
assert_equal 1, @cnt
subject.heartbeat
assert_equal 1, @cnt
end
end
end
describe "quiet" do
before do
subject.quiet
end
it "sets stopping proctitle" do
subject.heartbeat
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0
end
it "stores process info in redis" do
subject.heartbeat
info = @config.redis { |c| c.hmget(subject.identity, "busy") }
assert_equal ["1"], info
expires = @config.redis { |c| c.pttl(subject.identity) }
assert_in_delta 60000, expires, 50
end
end
it "fires new heartbeat events" do
i = 0
@config.on(:heartbeat) do
i += 1
end
assert_equal 0, i
subject.heartbeat
assert_equal 1, i
subject.heartbeat
assert_equal 1, i
end
describe "when manager is active" do
before do
Sidekiq::Launcher::PROCTITLES << proc { "xyz" }
subject.heartbeat
Sidekiq::Launcher::PROCTITLES.pop
end
it "sets useful info to proctitle" do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0
end
it "stores process info in redis" do
info = @config.redis { |c| c.hmget(@id, "busy") }
assert_equal ["1"], info
expires = @config.redis { |c| c.pttl(@id) }
assert_in_delta 60000, expires, 500
end
end
expires = @config.redis { |c| c.pttl(@id) }
assert_in_delta 60000, expires, 500
end
describe "when manager is stopped" do
before do
subject.quiet
subject.heartbeat
end
it "fires start heartbeat event only once" do
cnt = 0
# after do
# puts system('redis-cli -n 15 keys "*" | while read LINE ; do TTL=`redis-cli -n 15 ttl "$LINE"`; if [ "$TTL" -eq -1 ]; then echo "$LINE"; fi; done;')
# end
it "indicates stopping status in proctitle" do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0
@config.on(:heartbeat) do
cnt += 1
end
assert_equal 0, cnt
@launcher.heartbeat
assert_equal 1, cnt
@launcher.heartbeat
assert_equal 1, cnt
end
it "stores process info in redis" do
info = @config.redis { |c| c.hmget(@id, "busy") }
assert_equal ["1"], info
expires = @config.redis { |c| c.pttl(@id) }
assert_in_delta 60000, expires, 50
end
it "quiets" do
@launcher.quiet
@launcher.beat
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0
@launcher.beat
info = @config.redis { |c| c.hmget(@id, "busy") }
assert_equal ["1"], info
expires = @config.redis { |c| c.pttl(@id) }
assert_in_delta 60000, expires, 50
end
it "allows arbitrary proctitle extensions" do
Sidekiq::Launcher::PROCTITLES << proc { "xyz" }
@launcher.beat
Sidekiq::Launcher::PROCTITLES.pop
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0
end
end
end

View File

@ -28,13 +28,10 @@ describe Sidekiq::Manager do
mgr = new_manager
init_size = mgr.workers.size
processor = mgr.workers.first
begin
mgr.processor_result(processor, "ignored")
mgr.quiet
mgr.processor_result(processor, "ignored")
assert_equal init_size, mgr.workers.size
refute mgr.workers.include?(processor)
ensure
mgr.quiet
end
assert_equal init_size - 1, mgr.workers.size
refute mgr.workers.include?(processor)
end
end