diff --git a/lib/sidekiq/testing.rb b/lib/sidekiq/testing.rb index cfc25324..8d80b96f 100644 --- a/lib/sidekiq/testing.rb +++ b/lib/sidekiq/testing.rb @@ -68,7 +68,7 @@ module Sidekiq def raw_push(payloads) if Sidekiq::Testing.fake? payloads.each do |job| - job['class'].constantize.jobs << Sidekiq.load_json(Sidekiq.dump_json(job)) + Queues.jobs[job['class'].constantize.sidekiq_options["queue"]] << Sidekiq.load_json(Sidekiq.dump_json(job)) end true elsif Sidekiq::Testing.inline? @@ -85,6 +85,64 @@ module Sidekiq end end + module Queues + ## + # The Queues class is only for testing the fake queue implementation. + # The data is structured as a hash with queue name as hash key and array + # of job data as the value. + # + # { + # "default"=>[ + # { + # "class"=>"TestTesting::QueueWorker", + # "args"=>[1, 2], + # "retry"=>true, + # "queue"=>"default", + # "jid"=>"abc5b065c5c4b27fc1102833", + # "created_at"=>1447445554.419934 + # } + # ] + # } + # + # Example: + # + # require 'sidekiq/testing' + # + # assert_equal 0, Sidekiq::Queues["default"].size + # HardWorker.perform_async(:something) + # assert_equal 1, Sidekiq::Queues["default"].size + # assert_equal :something, Sidekiq::Queues["default"].first['args'][0] + # + # You can also clear and drain all workers' jobs: + # + # assert_equal 0, Sidekiq::Queues["default"].size + # HardWorker.perform_async(:something) + # Sidekiq::Queues.clear_all + # assert_equal 0, Sidekiq::Queues["default"].size + # + # This can be useful to make sure jobs don't linger between tests: + # + # RSpec.configure do |config| + # config.before(:each) do + # Sidekiq::Queues.clear_all + # end + # end + # + class << self + def [](queue) + jobs[queue] + end + + def jobs + @jobs ||= Hash.new { |hash, key| hash[key] = [] } + end + + def clear_all + jobs.clear + end + end + end + module Worker ## # The Sidekiq testing infrastructure overrides perform_async @@ -143,28 +201,36 @@ module Sidekiq # module ClassMethods + # Queue for this worker + def queue + self.sidekiq_options["queue"] + end + # Jobs queued for this worker def jobs - Worker.jobs[self] + Queues.jobs[queue].select { |job| job["class"] == self.to_s } end # Clear all jobs for this worker def clear - jobs.clear + Queues.jobs[queue].clear end # Drain and run all jobs for this worker def drain - while job = jobs.shift do - process_job(job) + while jobs.any? + next_job = jobs.first + Queues.jobs[queue].delete_if { |job| job["jid"] == next_job["jid"] } + process_job(next_job) end end # Pop out a single job and perform it def perform_one raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty? - job = jobs.shift - process_job(job) + next_job = jobs.first + Queues.jobs[queue].delete_if { |job| job["jid"] == next_job["jid"] } + process_job(next_job) end def process_job(job) @@ -183,18 +249,22 @@ module Sidekiq class << self def jobs # :nodoc: - @jobs ||= Hash.new { |hash, key| hash[key] = [] } + Queues.jobs.values.flatten end # Clear all queued jobs across all workers def clear_all - jobs.clear + Queues.clear_all end # Drain all queued jobs across all workers def drain_all - until jobs.values.all?(&:empty?) do - jobs.keys.each(&:drain) + while jobs.any? + worker_classes = jobs.map { |job| job["class"] } + + worker_classes.each do |worker_class| + worker_class.constantize.drain + end end end end diff --git a/test/test_testing_fake.rb b/test/test_testing_fake.rb index 4084ea14..70c0ebf4 100644 --- a/test/test_testing_fake.rb +++ b/test/test_testing_fake.rb @@ -54,6 +54,7 @@ class TestTesting < Sidekiq::Test after do Sidekiq::Testing.disable! + Sidekiq::Queues.clear_all end it 'stubs the async call' do @@ -263,6 +264,58 @@ class TestTesting < Sidekiq::Test it 'can execute a job' do DirectWorker.execute_job(DirectWorker.new, [2, 3]) end + end + describe 'queue testing' do + before do + require 'sidekiq/testing' + Sidekiq::Testing.fake! + end + + after do + Sidekiq::Testing.disable! + Sidekiq::Queues.clear_all + end + + class QueueWorker + include Sidekiq::Worker + def perform(a, b) + a + b + end + end + + class AltQueueWorker + include Sidekiq::Worker + sidekiq_options queue: "alt" + def perform(a, b) + a + b + end + end + + it 'finds enqueued jobs' do + assert_equal 0, Sidekiq::Queues["default"].size + + QueueWorker.perform_async(1, 2) + QueueWorker.perform_async(1, 2) + AltQueueWorker.perform_async(1, 2) + + assert_equal 2, Sidekiq::Queues["default"].size + assert_equal [1, 2], Sidekiq::Queues["default"].first["args"] + + assert_equal 1, Sidekiq::Queues["alt"].size + end + + it 'clears out all queues' do + assert_equal 0, Sidekiq::Queues["default"].size + + QueueWorker.perform_async(1, 2) + QueueWorker.perform_async(1, 2) + AltQueueWorker.perform_async(1, 2) + + Sidekiq::Queues.clear_all + + assert_equal 0, Sidekiq::Queues["default"].size + assert_equal 0, Sidekiq::Queues["alt"].size + end end end