diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index e90fbd22..1c2f9325 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -32,15 +32,17 @@ module Sidekiq @done = false end - def run + # Start this Sidekiq instance. If an embedding process already + # has a heartbeat thread, caller can use `async_beat: false` + # and instead have thread call Launcher#heartbeat every N seconds. + def run(async_beat: true) Sidekiq.freeze! - @thread = safe_thread("heartbeat", &method(:start_heartbeat)) + @thread = safe_thread("heartbeat", &method(:start_heartbeat)) if async_beat @poller.start @managers.each(&:start) end # Stops this instance from processing any more jobs, - # def quiet return if @done @@ -71,18 +73,30 @@ module Sidekiq @done end + # If embedding Sidekiq, you can have the process heartbeat + # call this method to regularly heartbeat rather than creating + # a separate thread. + def heartbeat + ❤ + end + private unless $TESTING BEAT_PAUSE = 10 def start_heartbeat loop do - heartbeat + beat sleep BEAT_PAUSE end logger.info("Heartbeat stopping...") end + def beat + $0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(" ") unless @embedded + ❤ + end + def clear_heartbeat flush_stats @@ -99,12 +113,6 @@ module Sidekiq # best effort, ignore network errors end - def heartbeat - $0 = PROCTITLES.map { |proc| proc.call(self, to_data) }.compact.join(" ") unless @embedded - - ❤ - end - def flush_stats fails = Processor::FAILURE.reset procd = Processor::PROCESSED.reset diff --git a/test/helper.rb b/test/helper.rb index e35f3a98..181664f5 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -5,7 +5,7 @@ Bundler.require(:default, :test) require "minitest/pride" require "maxitest/autorun" -# require "maxitest/threads" +require "maxitest/threads" $TESTING = true # disable minitest/parallel threads diff --git a/test/launcher.rb b/test/launcher.rb index a4a3678e..de7ee3e4 100644 --- a/test/launcher.rb +++ b/test/launcher.rb @@ -4,10 +4,6 @@ require_relative "helper" require "sidekiq/launcher" describe Sidekiq::Launcher do - subject do - Sidekiq::Launcher.new(@config) - end - before do @config = reset! @config.default_capsule.concurrency = 3 @@ -23,13 +19,15 @@ describe Sidekiq::Launcher do end it "starts and stops" do - subject.run - subject.stop + l = Sidekiq::Launcher.new(@config) + l.run(async_beat: false) + l.stop end describe "heartbeat" do before do - @id = subject.identity + @launcher = Sidekiq::Launcher.new(@config) + @id = @launcher.identity Sidekiq::Processor::WORK_STATE.set("a", {"b" => 1}) @@ -41,123 +39,52 @@ describe Sidekiq::Launcher do $0 = @proctitle end - describe "#heartbeat" do - describe "run" do - it "sets sidekiq version, tag and the number of busy workers to proctitle" do - subject.heartbeat + it "stores process info in redis" do + @launcher.beat - assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0 - end + assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy]", $0 + workers, rtt = @config.redis { |c| c.hmget(@id, "busy", "rtt_us") } - it "stores process info in redis" do - subject.heartbeat + assert_equal "1", workers + refute_nil rtt + assert_in_delta 1000, rtt.to_i, 1000 - workers, rtt = @config.redis { |c| c.hmget(subject.identity, "busy", "rtt_us") } - - assert_equal "1", workers - refute_nil rtt - assert_in_delta 1000, rtt.to_i, 1000 - - expires = @config.redis { |c| c.pttl(subject.identity) } - - assert_in_delta 60000, expires, 500 - end - - describe "events" do - before do - @cnt = 0 - - @config.on(:heartbeat) do - @cnt += 1 - end - end - - it "fires start heartbeat event only once" do - assert_equal 0, @cnt - subject.heartbeat - assert_equal 1, @cnt - subject.heartbeat - assert_equal 1, @cnt - end - end - end - - describe "quiet" do - before do - subject.quiet - end - - it "sets stopping proctitle" do - subject.heartbeat - - assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0 - end - - it "stores process info in redis" do - subject.heartbeat - - info = @config.redis { |c| c.hmget(subject.identity, "busy") } - - assert_equal ["1"], info - - expires = @config.redis { |c| c.pttl(subject.identity) } - - assert_in_delta 60000, expires, 50 - end - end - - it "fires new heartbeat events" do - i = 0 - @config.on(:heartbeat) do - i += 1 - end - assert_equal 0, i - subject.heartbeat - assert_equal 1, i - subject.heartbeat - assert_equal 1, i - end - - describe "when manager is active" do - before do - Sidekiq::Launcher::PROCTITLES << proc { "xyz" } - subject.heartbeat - Sidekiq::Launcher::PROCTITLES.pop - end - - it "sets useful info to proctitle" do - assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0 - end - - it "stores process info in redis" do - info = @config.redis { |c| c.hmget(@id, "busy") } - assert_equal ["1"], info - expires = @config.redis { |c| c.pttl(@id) } - assert_in_delta 60000, expires, 500 - end - end + expires = @config.redis { |c| c.pttl(@id) } + assert_in_delta 60000, expires, 500 end - describe "when manager is stopped" do - before do - subject.quiet - subject.heartbeat - end + it "fires start heartbeat event only once" do + cnt = 0 - # after do - # puts system('redis-cli -n 15 keys "*" | while read LINE ; do TTL=`redis-cli -n 15 ttl "$LINE"`; if [ "$TTL" -eq -1 ]; then echo "$LINE"; fi; done;') - # end - - it "indicates stopping status in proctitle" do - assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0 + @config.on(:heartbeat) do + cnt += 1 end + assert_equal 0, cnt + @launcher.heartbeat + assert_equal 1, cnt + @launcher.heartbeat + assert_equal 1, cnt + end - it "stores process info in redis" do - info = @config.redis { |c| c.hmget(@id, "busy") } - assert_equal ["1"], info - expires = @config.redis { |c| c.pttl(@id) } - assert_in_delta 60000, expires, 50 - end + it "quiets" do + @launcher.quiet + @launcher.beat + + assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] stopping", $0 + + @launcher.beat + info = @config.redis { |c| c.hmget(@id, "busy") } + assert_equal ["1"], info + + expires = @config.redis { |c| c.pttl(@id) } + assert_in_delta 60000, expires, 50 + end + + it "allows arbitrary proctitle extensions" do + Sidekiq::Launcher::PROCTITLES << proc { "xyz" } + @launcher.beat + Sidekiq::Launcher::PROCTITLES.pop + assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0 end end end diff --git a/test/manager.rb b/test/manager.rb index 21f9018d..d6838afb 100644 --- a/test/manager.rb +++ b/test/manager.rb @@ -28,13 +28,10 @@ describe Sidekiq::Manager do mgr = new_manager init_size = mgr.workers.size processor = mgr.workers.first - begin - mgr.processor_result(processor, "ignored") + mgr.quiet + mgr.processor_result(processor, "ignored") - assert_equal init_size, mgr.workers.size - refute mgr.workers.include?(processor) - ensure - mgr.quiet - end + assert_equal init_size - 1, mgr.workers.size + refute mgr.workers.include?(processor) end end