From 48546fdae97707116163e2ec91b3244b9984ec61 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 6 Oct 2015 12:43:01 -0700 Subject: [PATCH] WIP manager and launcher --- lib/sidekiq/cli.rb | 2 +- lib/sidekiq/launcher.rb | 117 ++++++++++++------- lib/sidekiq/manager.rb | 244 ++++++++++++++------------------------- lib/sidekiq/processor.rb | 30 ++--- test/helper.rb | 17 +++ test/test_actors.rb | 9 +- test/test_launcher.rb | 92 +++++++++++++++ test/test_manager.rb | 94 ++------------- test/test_middleware.rb | 6 +- test/test_processor.rb | 65 +++-------- 10 files changed, 327 insertions(+), 349 deletions(-) create mode 100644 test/test_launcher.rb diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 403aae6a..8ee0b87a 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -129,7 +129,7 @@ module Sidekiq raise Interrupt when 'USR1' Sidekiq.logger.info "Received USR1, no longer accepting new work" - launcher.manager.async.stop + launcher.quiet fire_event(:quiet, true) when 'USR2' if Sidekiq.options[:logfile] diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index f4e3550c..237f2ef9 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -1,4 +1,3 @@ -require 'sidekiq/actor' require 'sidekiq/manager' require 'sidekiq/fetch' require 'sidekiq/scheduled' @@ -9,62 +8,101 @@ module Sidekiq # If any of these actors die, the Sidekiq process exits # immediately. class Launcher - include Actor include Util - trap_exit :actor_died - - attr_reader :manager, :poller, :fetcher + attr_accessor :manager, :poller, :fetcher def initialize(options) - @condvar = Celluloid::Condition.new - @manager = Sidekiq::Manager.new_link(@condvar, options) - @poller = Sidekiq::Scheduled::Poller.new_link - @fetcher = Sidekiq::Fetcher.new_link(@manager, options) + @condvar = ::ConditionVariable.new + @manager = Sidekiq::Manager.new(@condvar, options) + @poller = Sidekiq::Scheduled::Poller.new + @fetcher = Sidekiq::Fetcher.new(@manager, options) @manager.fetcher = @fetcher @done = false @options = options end - def actor_died(actor, reason) - # https://github.com/mperham/sidekiq/issues/2057#issuecomment-66485477 - return if @done || !reason - - Sidekiq.logger.warn("Sidekiq died due to the following error, cannot recover, process exiting") - handle_exception(reason) - exit(1) - end - def run - watchdog('Launcher#run') do - manager.async.start - poller.async.poll(true) - - start_heartbeat - end + @thread = safe_thread("heartbeat", &method(:start_heartbeat)) + @fetcher.start + @poller.start + @manager.start end + # Stops this instance from processing any more jobs, + # + def quiet + @manager.quiet + @fetcher.terminate + @poller.terminate + end + + # Shuts down the process. This method does not + # return until all work is complete and cleaned up. + # It can take up to the timeout to complete. def stop - watchdog('Launcher#stop') do - @done = true - Sidekiq::Fetcher.done! - fetcher.terminate if fetcher.alive? - poller.terminate if poller.alive? + deadline = Time.now + @options[:timeout] - manager.async.stop(:shutdown => true, :timeout => @options[:timeout]) - @condvar.wait - manager.terminate + @manager.quiet + @fetcher.terminate + @poller.terminate - # Requeue everything in case there was a worker who grabbed work while stopped - # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. - Sidekiq::Fetcher.strategy.bulk_requeue([], @options) + @manager.stop(deadline) - stop_heartbeat - end + # Requeue everything in case there was a worker who grabbed work while stopped + # This call is a no-op in Sidekiq but necessary for Sidekiq Pro. + Sidekiq::Fetcher.strategy.bulk_requeue([], @options) + + stop_heartbeat end private + JVM_RESERVED_SIGNALS = ['USR1', 'USR2'] # Don't Process#kill if we get these signals via the API + + PROCTITLES = [ + proc { 'sidekiq'.freeze }, + proc { Sidekiq::VERSION }, + proc { |me, data| data['tag'] }, + proc { |me, data| "[#{me.manager.in_progress.size} of #{data['concurrency']} busy]" }, + proc { |me, data| "stopping" if me.manager.stopped? }, + ] + + def heartbeat(key, data, json) + while !@done + results = PROCTITLES.map {|x| x.(self, data) } + results.compact! + $0 = results.join(' ') + + ❤(key, json) + sleep 5 + end + end + + def ❤(key, json) + begin + _, _, _, msg = Sidekiq.redis do |conn| + conn.pipelined do + conn.sadd('processes', key) + conn.hmset(key, 'info', json, 'busy', manager.in_progress.size, 'beat', Time.now.to_f) + conn.expire(key, 60) + conn.rpop("#{key}-signals") + end + end + + return unless msg + + if JVM_RESERVED_SIGNALS.include?(msg) + Sidekiq::CLI.instance.handle_signal(msg) + else + ::Process.kill(msg, $$) + end + rescue => e + # ignore all redis/network issues + logger.error("heartbeat: #{e.message}") + end + end + def start_heartbeat key = identity data = { @@ -74,16 +112,17 @@ module Sidekiq 'tag' => @options[:tag] || '', 'concurrency' => @options[:concurrency], 'queues' => @options[:queues].uniq, - 'labels' => Sidekiq.options[:labels], + 'labels' => @options[:labels], 'identity' => identity, } # this data doesn't change so dump it to a string # now so we don't need to dump it every heartbeat. json = Sidekiq.dump_json(data) - manager.heartbeat(key, data, json) + heartbeat(key, data, json) end def stop_heartbeat + @done = true Sidekiq.redis do |conn| conn.pipelined do conn.srem('processes', identity) diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index a73f0403..9ef844bf 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -1,156 +1,121 @@ # encoding: utf-8 require 'sidekiq/util' -require 'sidekiq/actor' require 'sidekiq/processor' require 'sidekiq/fetch' module Sidekiq ## - # The main router in the system. This - # manages the processor state and accepts messages - # from Redis to be dispatched to an idle processor. + # The Manager is the central coordination point in Sidekiq, controlling + # the lifecycle of the Processors and feeding them jobs as necessary. + # + # Tasks: + # + # 1. start: Spin up Processors. Issue fetch requests for each. + # 2. processor_done: Handle job success, issue fetch request. + # 3. processor_died: Handle job failure, throw away Processor, issue fetch request. + # 4. quiet: shutdown idle Processors, ignore further fetch requests. + # 5. stop: hard stop the Processors by deadline. + # + # Note that only the last task requires a Thread since it has to monitor + # the shutdown process. The other tasks are performed by other threads. # class Manager include Util - include Actor - trap_exit :processor_died + attr_writer :fetcher + attr_reader :in_progress attr_reader :ready - attr_reader :busy - attr_accessor :fetcher SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1 - JVM_RESERVED_SIGNALS = ['USR1', 'USR2'] # Don't Process#kill if we get these signals via the API def initialize(condvar, options={}) logger.debug { options.inspect } @options = options @count = options[:concurrency] || 25 raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1 - @done_callback = nil @finished = condvar @in_progress = {} - @threads = {} @done = false - @busy = [] - @ready = @count.times.map do - p = Processor.new_link(current_actor) - p.proxy_id = p.object_id - p + @ready = Array.new(@count) do + Processor.new(self) end - end - - def stop(options={}) - watchdog('Manager#stop died') do - should_shutdown = options[:shutdown] - timeout = options[:timeout] - - @done = true - - logger.info { "Terminating #{@ready.size} quiet workers" } - @ready.each { |x| x.terminate if x.alive? } - @ready.clear - - return if clean_up_for_graceful_shutdown - - hard_shutdown_in timeout if should_shutdown - end - end - - def clean_up_for_graceful_shutdown - if @busy.empty? - shutdown - return true - end - - after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown } - false + @plock = Mutex.new end def start - @ready.each { dispatch } + @ready.each { |x| x.start; dispatch } end - def when_done(&blk) - @done_callback = blk + def quiet + return if @done + + @done = true + + logger.info { "Terminating quiet workers" } + + @plock.synchronize do + @ready.each { |x| x.terminate } + @ready.clear + end + end + + def stop(deadline) + quiet + return shutdown if @in_progress.empty? + + logger.info { "Pausing to allow workers to finish..." } + remaining = deadline - Time.now + while remaining > 0.5 + return shutdown if @in_progress.empty? + sleep 0.5 + remaining = deadline - Time.now + end + return shutdown if @in_progress.empty? + + hard_shutdown end def processor_done(processor) - watchdog('Manager#processor_done died') do - @done_callback.call(processor) if @done_callback - @in_progress.delete(processor.object_id) - @threads.delete(processor.object_id) - @busy.delete(processor) - if stopped? - processor.terminate if processor.alive? - shutdown if @busy.empty? + @plock.synchronize do + @in_progress.delete(processor) + if @done + processor.terminate + #shutdown if @in_progress.empty? else - @ready << processor if processor.alive? + @ready << processor end - dispatch end + dispatch end def processor_died(processor, reason) - watchdog("Manager#processor_died died") do - @in_progress.delete(processor.object_id) - @threads.delete(processor.object_id) - @busy.delete(processor) - - unless stopped? - p = Processor.new_link(current_actor) - p.proxy_id = p.object_id - @ready << p - dispatch + @plock.synchronize do + @in_progress.delete(processor) + if @done + #shutdown if @in_progress.empty? else - shutdown if @busy.empty? + @ready << Processor.new(self) end end + dispatch end def assign(work) - watchdog("Manager#assign died") do - if stopped? - # Race condition between Manager#stop if Fetcher - # is blocked on redis and gets a message after - # all the ready Processors have been stopped. - # Push the message back to redis. - work.requeue - else + if @done + # Race condition between Manager#stop if Fetcher + # is blocked on redis and gets a message after + # all the ready Processors have been stopped. + # Push the message back to redis. + work.requeue + else + processor = nil + @plock.synchronize do processor = @ready.pop - @in_progress[processor.object_id] = work - @busy << processor - processor.async.process(work) + @in_progress[processor] = work end - end - end - - # A hack worthy of Rube Goldberg. We need to be able - # to hard stop a working thread. But there's no way for us to - # get handle to the underlying thread performing work for a processor - # so we have it call us and tell us. - def real_thread(proxy_id, thr) - @threads[proxy_id] = thr - end - - PROCTITLES = [ - proc { 'sidekiq'.freeze }, - proc { Sidekiq::VERSION }, - proc { |mgr, data| data['tag'] }, - proc { |mgr, data| "[#{mgr.busy.size} of #{data['concurrency']} busy]" }, - proc { |mgr, data| "stopping" if mgr.stopped? }, - ] - - def heartbeat(key, data, json) - results = PROCTITLES.map {|x| x.(self, data) } - results.compact! - $0 = results.join(' ') - - ❤(key, json) - after(5) do - heartbeat(key, data, json) + processor.request_process(work) end end @@ -160,77 +125,44 @@ module Sidekiq private - def ❤(key, json) - begin - _, _, _, msg = Sidekiq.redis do |conn| - conn.multi do - conn.sadd('processes', key) - conn.hmset(key, 'info', json, 'busy', @busy.size, 'beat', Time.now.to_f) - conn.expire(key, 60) - conn.rpop("#{key}-signals") - end - end + def hard_shutdown + # We've reached the timeout and we still have busy workers. + # They must die but their messages shall live on. + logger.warn { "Terminating #{@in_progress.size} busy worker threads" } + logger.warn { "Work still in progress #{@in_progress.values.inspect}" } - return unless msg + requeue - if JVM_RESERVED_SIGNALS.include?(msg) - Sidekiq::CLI.instance.handle_signal(msg) - else - ::Process.kill(msg, $$) - end - rescue => e - # ignore all redis/network issues - logger.error("heartbeat: #{e.message}") - end - end - - def hard_shutdown_in(delay) - logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." } - - after(delay) do - watchdog("Manager#hard_shutdown_in died") do - # We've reached the timeout and we still have busy workers. - # They must die but their messages shall live on. - logger.warn { "Terminating #{@busy.size} busy worker threads" } - logger.warn { "Work still in progress #{@in_progress.values.inspect}" } - - requeue - - @busy.each do |processor| - if processor.alive? && t = @threads.delete(processor.object_id) - t.raise Shutdown - end - end - - @finished.signal - end + @in_progress.each do |processor, _| + processor.kill end end def dispatch - return if stopped? - # This is a safety check to ensure we haven't leaked - # processors somehow. - raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty? + return if @done + # This is a safety check to ensure we haven't leaked processors somehow. + raise "BUG: No processors, cannot continue!" if @ready.empty? && @in_progress.empty? raise "No ready processor!?" if @ready.empty? - @fetcher.async.fetch + @fetcher.request_job end def shutdown requeue - @finished.signal end def requeue - # Re-enqueue terminated jobs + # Re-enqueue unfinished jobs # NOTE: You may notice that we may push a job back to redis before # the worker thread is terminated. This is ok because Sidekiq's # contract says that jobs are run AT LEAST once. Process termination # is delayed until we're certain the jobs are back in Redis because # it is worse to lose a job than to run it twice. - Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values, @options) - @in_progress.clear + jobs = nil + @plock.synchronize do + jobs = @in_progress.values + end + Sidekiq::Fetcher.strategy.bulk_requeue(jobs, @options) if jobs.size > 0 end end end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 0263b205..34f088ce 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -33,37 +33,41 @@ module Sidekiq @mgr = mgr @done = false @work = ::Queue.new - @thread = safe_thread("processor", &method(:run)) end def terminate(wait=false) @done = true @work << nil + @thread.value if wait + end + + def kill(wait=false) # unlike the other actors, terminate does not wait # for the thread to finish because we don't know how # long the job will take to finish. Instead we # provide a `kill` method to call after the shutdown # timeout passes. - @thread.value if wait - end - - def kill(wait=false) @thread.raise ::Sidekiq::Shutdown @thread.value if wait end - def process(work) + def start + @thread ||= safe_thread("processor", &method(:run)) + end + + def request_process(work) raise ArgumentError, "Processor is shut down!" if @done + raise ArgumentError, "Processor has not started!" unless @thread @work << work end - private + private unless $TESTING def run begin while !@done job = @work.pop - go(job) if job + process(job) if job end rescue Exception => ex Sidekiq.logger.warn(ex.message) @@ -71,7 +75,7 @@ module Sidekiq end end - def go(work) + def process(work) msgstr = work.message queue = work.queue_name @@ -121,7 +125,7 @@ module Sidekiq retry_and_suppress_exceptions do hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i }) Sidekiq.redis do |conn| - conn.multi do + conn.pipelined do conn.hmset("#{identity}:workers", thread_identity, hash) conn.expire("#{identity}:workers", 14400) end @@ -135,7 +139,7 @@ module Sidekiq retry_and_suppress_exceptions do failed = "stat:failed:#{nowdate}" Sidekiq.redis do |conn| - conn.multi do + conn.pipelined do conn.incrby("stat:failed".freeze, 1) conn.incrby(failed, 1) conn.expire(failed, STATS_TIMEOUT) @@ -147,7 +151,7 @@ module Sidekiq retry_and_suppress_exceptions do processed = "stat:processed:#{nowdate}" Sidekiq.redis do |conn| - conn.multi do + conn.pipelined do conn.hdel("#{identity}:workers", thread_identity) conn.incrby("stat:processed".freeze, 1) conn.incrby(processed, 1) @@ -174,7 +178,7 @@ module Sidekiq rescue => e retry_count += 1 if retry_count <= max_retries - Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"} + Sidekiq.logger.info {"Suppressing and retrying error: #{e.inspect}"} pause_for_recovery(retry_count) retry else diff --git a/test/helper.rb b/test/helper.rb index 5e029872..16db1df6 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -9,6 +9,23 @@ if ENV["COVERAGE"] end ENV['RACK_ENV'] = ENV['RAILS_ENV'] = 'test' +trap 'USR1' do + threads = Thread.list + + puts + puts "=" * 80 + puts "Received USR1 signal; printing all #{threads.count} thread backtraces." + + threads.each do |thr| + description = thr == Thread.main ? "Main thread" : thr.inspect + puts + puts "#{description} backtrace: " + puts thr.backtrace.join("\n") + end + + puts "=" * 80 +end + begin require 'pry-byebug' rescue LoadError diff --git a/test/test_actors.rb b/test/test_actors.rb index fa61643f..bbae85d2 100644 --- a/test/test_actors.rb +++ b/test/test_actors.rb @@ -94,13 +94,14 @@ class TestActors < Sidekiq::Test mgr = Mgr.new p = Sidekiq::Processor.new(mgr) + p.start SomeWorker.perform_async(0) job = Sidekiq.redis { |c| c.lpop("queue:default") } uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) a = $count mgr.mutex.synchronize do - p.process(uow) + p.request_process(uow) mgr.cond.wait(mgr.mutex) end b = $count @@ -115,13 +116,14 @@ class TestActors < Sidekiq::Test mgr = Mgr.new p = Sidekiq::Processor.new(mgr) + p.start SomeWorker.perform_async("boom") job = Sidekiq.redis { |c| c.lpop("queue:default") } uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) a = $count mgr.mutex.synchronize do - p.process(uow) + p.request_process(uow) mgr.cond.wait(mgr.mutex) end b = $count @@ -140,7 +142,8 @@ class TestActors < Sidekiq::Test job = Sidekiq.redis { |c| c.lpop("queue:default") } uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) a = $count - p.process(uow) + p.start + p.request_process(uow) sleep(0.02) p.terminate p.kill(true) diff --git a/test/test_launcher.rb b/test/test_launcher.rb new file mode 100644 index 00000000..eb2356b9 --- /dev/null +++ b/test/test_launcher.rb @@ -0,0 +1,92 @@ +require_relative 'helper' +require 'sidekiq/launcher' + +class TestLauncher < Sidekiq::Test + + describe 'launcher' do + before do + Sidekiq.redis {|c| c.flushdb } + end + + def new_manager(opts) + condvar = Minitest::Mock.new + condvar.expect(:signal, nil, []) + Sidekiq::Manager.new(condvar, opts) + end + + describe 'heartbeat' do + before do + uow = Object.new + + @processor = Minitest::Mock.new + @processor.expect(:request_process, nil, [uow]) + @processor.expect(:hash, 1234, []) + + @mgr = new_manager(options) + @launcher = Sidekiq::Launcher.new(options) + @launcher.manager = @mgr + + @mgr.ready << @processor + @mgr.assign(uow) + + @processor.verify + @proctitle = $0 + end + + after do + $0 = @proctitle + end + + describe 'when manager is active' do + before do + Sidekiq::Launcher::PROCTITLES << proc { "xyz" } + @launcher.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data)) + Sidekiq::Launcher::PROCTITLES.pop + end + + it 'sets useful info to proctitle' do + assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0 + end + + it 'stores process info in redis' do + info = Sidekiq.redis { |c| c.hmget('identity', 'busy') } + assert_equal ["1"], info + expires = Sidekiq.redis { |c| c.pttl('identity') } + assert_in_delta 60000, expires, 500 + end + end + + describe 'when manager is stopped' do + before do + @processor.expect(:terminate, []) + + @launcher.quiet + @launcher.manager.processor_done(@processor) + @launcher.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data)) + + @processor.verify + end + + it 'indicates stopping status in proctitle' do + assert_equal "sidekiq #{Sidekiq::VERSION} myapp [0 of 3 busy] stopping", $0 + end + + it 'stores process info in redis' do + info = Sidekiq.redis { |c| c.hmget('identity', 'busy') } + assert_equal ["0"], info + expires = Sidekiq.redis { |c| c.pttl('identity') } + assert_in_delta 60000, expires, 50 + end + end + end + + def options + { :concurrency => 3, :queues => ['default'] } + end + + def heartbeat_data + { 'concurrency' => 3, 'tag' => 'myapp' } + end + end + +end diff --git a/test/test_manager.rb b/test/test_manager.rb index 11def61e..c5683cec 100644 --- a/test/test_manager.rb +++ b/test/test_manager.rb @@ -17,19 +17,19 @@ class TestManager < Sidekiq::Test it 'creates N processor instances' do mgr = new_manager(options) assert_equal options[:concurrency], mgr.ready.size - assert_equal [], mgr.busy + assert_equal({}, mgr.in_progress) end it 'assigns work to a processor' do uow = Object.new processor = Minitest::Mock.new - processor.expect(:async, processor, []) processor.expect(:process, nil, [uow]) + processor.expect(:hash, 1234, []) mgr = new_manager(options) mgr.ready << processor mgr.assign(uow) - assert_equal 1, mgr.busy.size + assert_equal 1, mgr.in_progress.size processor.verify end @@ -40,7 +40,7 @@ class TestManager < Sidekiq::Test mgr = new_manager(options) mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []}) - mgr.stop + mgr.quiet mgr.assign(uow) uow.verify end @@ -48,40 +48,38 @@ class TestManager < Sidekiq::Test it 'shuts down the system' do mgr = new_manager(options) mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []}) - mgr.stop + mgr.stop(Time.now) - assert mgr.busy.empty? + assert mgr.in_progress.empty? assert mgr.ready.empty? end it 'returns finished processors to the ready pool' do fetcher = MiniTest::Mock.new - fetcher.expect :async, fetcher, [] - fetcher.expect :fetch, nil, [] + fetcher.expect :request_job, nil, [] mgr = new_manager(options) mgr.fetcher = fetcher init_size = mgr.ready.size processor = mgr.ready.pop - mgr.busy << processor + mgr.in_progress[processor] = 'abc' mgr.processor_done(processor) - assert_equal 0, mgr.busy.size + assert_equal 0, mgr.in_progress.size assert_equal init_size, mgr.ready.size fetcher.verify end it 'throws away dead processors' do fetcher = MiniTest::Mock.new - fetcher.expect :async, fetcher, [] - fetcher.expect :fetch, nil, [] + fetcher.expect :request_job, nil, [] mgr = new_manager(options) mgr.fetcher = fetcher init_size = mgr.ready.size processor = mgr.ready.pop - mgr.busy << processor + mgr.in_progress[processor] = 'abc' mgr.processor_died(processor, 'ignored') - assert_equal 0, mgr.busy.size + assert_equal 0, mgr.in_progress.size assert_equal init_size, mgr.ready.size refute mgr.ready.include?(processor) fetcher.verify @@ -92,77 +90,9 @@ class TestManager < Sidekiq::Test assert_raises(ArgumentError) { new_manager(concurrency: -1) } end - describe 'heartbeat' do - before do - uow = Object.new - - @processor = Minitest::Mock.new - @processor.expect(:async, @processor, []) - @processor.expect(:process, nil, [uow]) - - @mgr = new_manager(options) - @mgr.ready << @processor - @mgr.assign(uow) - - @processor.verify - @proctitle = $0 - end - - after do - $0 = @proctitle - end - - describe 'when manager is active' do - before do - Sidekiq::Manager::PROCTITLES << proc { "xyz" } - @mgr.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data)) - Sidekiq::Manager::PROCTITLES.pop - end - - it 'sets useful info to proctitle' do - assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0 - end - - it 'stores process info in redis' do - info = Sidekiq.redis { |c| c.hmget('identity', 'busy') } - assert_equal ["1"], info - expires = Sidekiq.redis { |c| c.pttl('identity') } - assert_in_delta 60000, expires, 500 - end - end - - describe 'when manager is stopped' do - before do - @processor.expect(:alive?, []) - @processor.expect(:terminate, []) - - @mgr.stop - @mgr.processor_done(@processor) - @mgr.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data)) - - @processor.verify - end - - it 'indicates stopping status in proctitle' do - assert_equal "sidekiq #{Sidekiq::VERSION} myapp [0 of 3 busy] stopping", $0 - end - - it 'stores process info in redis' do - info = Sidekiq.redis { |c| c.hmget('identity', 'busy') } - assert_equal ["0"], info - expires = Sidekiq.redis { |c| c.pttl('identity') } - assert_in_delta 60000, expires, 50 - end - end - end - def options { :concurrency => 3, :queues => ['default'] } end - def heartbeat_data - { 'concurrency' => 3, 'tag' => 'myapp' } - end end - end diff --git a/test/test_middleware.rb b/test/test_middleware.rb index 74179e5e..5ae03705 100644 --- a/test/test_middleware.rb +++ b/test/test_middleware.rb @@ -80,11 +80,7 @@ class TestMiddleware < Sidekiq::Test boss = Minitest::Mock.new processor = Sidekiq::Processor.new(boss) - actor = Minitest::Mock.new - actor.expect(:processor_done, nil, [processor]) - actor.expect(:real_thread, nil, [nil, Thread]) - boss.expect(:async, actor, []) - boss.expect(:async, actor, []) + boss.expect(:processor_done, nil, [processor]) processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg)) assert_equal %w(2 before 3 before 1 before work_performed 1 after 3 after 2 after), $recorder.flatten end diff --git a/test/test_processor.rb b/test/test_processor.rb index ab1fe64b..f73f09d0 100644 --- a/test/test_processor.rb +++ b/test/test_processor.rb @@ -1,15 +1,17 @@ require_relative 'helper' +require 'sidekiq/fetch' +require 'sidekiq/cli' require 'sidekiq/processor' class TestProcessor < Sidekiq::Test TestException = Class.new(StandardError) TEST_EXCEPTION = TestException.new("kerboom!") - describe 'with mock setup' do + describe 'processor' do before do $invokes = 0 - @boss = Minitest::Mock.new - @processor = ::Sidekiq::Processor.new(@boss) + @mgr = Minitest::Mock.new + @processor = ::Sidekiq::Processor.new(@mgr) end class MockWorker @@ -27,13 +29,9 @@ class TestProcessor < Sidekiq::Test it 'processes as expected' do msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) - actor = Minitest::Mock.new - actor.expect(:processor_done, nil, [@processor]) - actor.expect(:real_thread, nil, [nil, Thread]) - @boss.expect(:async, actor, []) - @boss.expect(:async, actor, []) + @mgr.expect(:processor_done, nil, [Sidekiq::Processor]) @processor.process(work(msg)) - @boss.verify + @mgr.verify assert_equal 1, $invokes end @@ -43,46 +41,26 @@ class TestProcessor < Sidekiq::Test @processor.execute_job(worker, [1, 2, 3]) end - it 'passes exceptions to ExceptionHandler' do - actor = Minitest::Mock.new - actor.expect(:real_thread, nil, [nil, Thread]) - @boss.expect(:async, actor, []) - msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) - begin - @processor.process(work(msg)) - flunk "Expected #process to raise exception" - rescue TestException - end - - assert_equal 0, $invokes - end - it 're-raises exceptions after handling' do msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) re_raise = false - actor = Minitest::Mock.new - actor.expect(:real_thread, nil, [nil, Thread]) - @boss.expect(:async, actor, []) begin @processor.process(work(msg)) + flunk "Expected exception" rescue TestException re_raise = true end + assert_equal 0, $invokes assert re_raise, "does not re-raise exceptions after handling" end it 'does not modify original arguments' do msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] } msgstr = Sidekiq.dump_json(msg) - processor = ::Sidekiq::Processor.new(@boss) - actor = Minitest::Mock.new - actor.expect(:processor_done, nil, [processor]) - actor.expect(:real_thread, nil, [nil, Thread]) - @boss.expect(:async, actor, []) - @boss.expect(:async, actor, []) - processor.process(work(msgstr)) + @mgr.expect(:processor_done, nil, [@processor]) + @processor.process(work(msgstr)) assert_equal [['myarg']], msg['args'] end @@ -106,17 +84,13 @@ class TestProcessor < Sidekiq::Test let(:skip_job) { false } let(:worker_args) { ['myarg'] } let(:work) { MiniTest::Mock.new } - let(:actor) { Minitest::Mock.new } before do - work.expect(:queue_name, 'queues:default') + work.expect(:queue_name, 'queue:default') work.expect(:message, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args })) Sidekiq.server_middleware do |chain| chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job end - - actor.expect(:real_thread, nil, [nil, Thread]) - @boss.expect(:async, actor, []) end after do @@ -156,8 +130,7 @@ class TestProcessor < Sidekiq::Test it 'acks the job' do work.expect(:acknowledge, nil) - @boss.expect(:async, actor, []) - actor.expect(:processor_done, nil, [@processor]) + @mgr.expect(:processor_done, nil, [@processor]) @processor.process(work) end end @@ -178,8 +151,7 @@ class TestProcessor < Sidekiq::Test describe 'everything goes well' do it 'acks the job' do work.expect(:acknowledge, nil) - @boss.expect(:async, actor, []) - actor.expect(:processor_done, nil, [@processor]) + @mgr.expect(:processor_done, nil, [@processor]) @processor.process(work) end end @@ -195,11 +167,7 @@ class TestProcessor < Sidekiq::Test def successful_job msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) - actor = Minitest::Mock.new - actor.expect(:real_thread, nil, [nil, Thread]) - actor.expect(:processor_done, nil, [@processor]) - @boss.expect(:async, actor, []) - @boss.expect(:async, actor, []) + @mgr.expect(:processor_done, nil, [@processor]) @processor.process(work(msg)) end @@ -215,9 +183,6 @@ class TestProcessor < Sidekiq::Test let(:failed_today_key) { "stat:failed:#{Time.now.utc.strftime("%Y-%m-%d")}" } def failed_job - actor = Minitest::Mock.new - actor.expect(:real_thread, nil, [nil, Thread]) - @boss.expect(:async, actor, []) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) begin @processor.process(work(msg))