mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
648da12519
Before #34953, when using the `:async` Active Job queue adapter, jobs enqueued in `db/seeds.rb`, such as Active Storage analysis jobs, would cause a hang (see #34939). Therefore, #34953 changed all jobs enqueued in `db/seeds.rb` to use the `:inline` queue adapter instead. (This behavior was later limited to only take effect when the `:async` adapter was configured, see #35905.) However, inline jobs in `db/seeds.rb` cleared `CurrentAttributes` values (see #37526). Therefore, #37568 changed the `:inline` adapter to wrap each job in its own thread, for isolation. However, wrapping a job in its own thread affects which database connection it uses. Thus inline jobs can no longer execute within the calling thread's database transaction, including seeing any uncommitted changes. Additionally, if the calling thread is not wrapped with the executor, the inline job thread (which is wrapped with the executor) can deadlock on the load interlock. And when testing (with `connection_pool.lock_thread = true`), the inline job thread can deadlock on one of the locks added by #28083. Therefore, this commit reverts the solutions of #34953 and #37568, and instead wraps evaluation of `db/seeds.rb` with the executor. This eliminates the original hang from #34939, which was also due to running multiple threads and not wrapping all of them with the executor. And, because nested calls to `executor.wrap` are ignored, any inline jobs in `db/seeds.rb` will not clear `CurrentAttributes` values. Alternative fix for #34939. Reverts #34953. Reverts #35905. Partially reverts #35896. Alternative fix for #37526. Reverts #37568. Fixes #40552.
148 lines
4.7 KiB
Ruby
148 lines
4.7 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require "helper"
|
|
require "jobs/logging_job"
|
|
require "jobs/hello_job"
|
|
require "jobs/provider_jid_job"
|
|
require "active_support/core_ext/numeric/time"
|
|
|
|
class QueuingTest < ActiveSupport::TestCase
|
|
test "should run jobs enqueued on a listening queue" do
|
|
TestJob.perform_later @id
|
|
wait_for_jobs_to_finish_for(5.seconds)
|
|
assert job_executed
|
|
end
|
|
|
|
test "should not run jobs queued on a non-listening queue" do
|
|
skip if adapter_is?(:inline, :async, :sucker_punch)
|
|
old_queue = TestJob.queue_name
|
|
|
|
begin
|
|
TestJob.queue_as :some_other_queue
|
|
TestJob.perform_later @id
|
|
wait_for_jobs_to_finish_for(2.seconds)
|
|
assert_not job_executed
|
|
ensure
|
|
TestJob.queue_name = old_queue
|
|
end
|
|
end
|
|
|
|
test "should supply a wrapped class name to Sidekiq" do
|
|
skip unless adapter_is?(:sidekiq)
|
|
Sidekiq::Testing.fake! do
|
|
::HelloJob.perform_later
|
|
hash = ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper.jobs.first
|
|
assert_equal "ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", hash["class"]
|
|
assert_equal "HelloJob", hash["wrapped"]
|
|
end
|
|
end
|
|
|
|
test "should access provider_job_id inside Sidekiq job" do
|
|
skip unless adapter_is?(:sidekiq)
|
|
Sidekiq::Testing.inline! do
|
|
job = ::ProviderJidJob.perform_later
|
|
assert_equal "Provider Job ID: #{job.provider_job_id}", JobBuffer.last_value
|
|
end
|
|
end
|
|
|
|
test "should supply a wrapped class name to DelayedJob" do
|
|
skip unless adapter_is?(:delayed_job)
|
|
::HelloJob.perform_later
|
|
job = Delayed::Job.first
|
|
assert_match(/HelloJob \[[0-9a-f-]+\] from DelayedJob\(default\) with arguments: \[\]/, job.name)
|
|
end
|
|
|
|
test "resque JobWrapper should have instance variable queue" do
|
|
skip unless adapter_is?(:resque)
|
|
job = ::HelloJob.set(wait: 5.seconds).perform_later
|
|
hash = Resque.decode(Resque.find_delayed_selection { true }[0])
|
|
assert_equal hash["queue"], job.queue_name
|
|
end
|
|
|
|
test "should not run job enqueued in the future" do
|
|
TestJob.set(wait: 10.minutes).perform_later @id
|
|
wait_for_jobs_to_finish_for(5.seconds)
|
|
assert_not job_executed
|
|
rescue NotImplementedError
|
|
skip
|
|
end
|
|
|
|
test "should run job enqueued in the future at the specified time" do
|
|
TestJob.set(wait: 5.seconds).perform_later @id
|
|
wait_for_jobs_to_finish_for(2.seconds)
|
|
assert_not job_executed
|
|
wait_for_jobs_to_finish_for(10.seconds)
|
|
assert job_executed
|
|
rescue NotImplementedError
|
|
skip
|
|
end
|
|
|
|
test "should supply a provider_job_id when available for immediate jobs" do
|
|
skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic)
|
|
test_job = TestJob.perform_later @id
|
|
assert test_job.provider_job_id, "Provider job id should be set by provider"
|
|
end
|
|
|
|
test "should supply a provider_job_id when available for delayed jobs" do
|
|
skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic)
|
|
delayed_test_job = TestJob.set(wait: 1.minute).perform_later @id
|
|
assert delayed_test_job.provider_job_id, "Provider job id should by set for delayed jobs by provider"
|
|
end
|
|
|
|
test "current locale is kept while running perform_later" do
|
|
skip if adapter_is?(:inline)
|
|
|
|
begin
|
|
I18n.available_locales = [:en, :de]
|
|
I18n.locale = :de
|
|
|
|
TestJob.perform_later @id
|
|
wait_for_jobs_to_finish_for(5.seconds)
|
|
assert job_executed
|
|
assert_equal "de", job_executed_in_locale
|
|
ensure
|
|
I18n.available_locales = [:en]
|
|
I18n.locale = :en
|
|
end
|
|
end
|
|
|
|
test "current timezone is kept while running perform_later" do
|
|
skip if adapter_is?(:inline)
|
|
|
|
begin
|
|
current_zone = Time.zone
|
|
Time.zone = "Hawaii"
|
|
|
|
TestJob.perform_later @id
|
|
wait_for_jobs_to_finish_for(5.seconds)
|
|
assert job_executed
|
|
assert_equal "Hawaii", job_executed_in_timezone
|
|
ensure
|
|
Time.zone = current_zone
|
|
end
|
|
end
|
|
|
|
test "should run job with higher priority first" do
|
|
skip unless adapter_is?(:delayed_job, :que)
|
|
|
|
wait_until = Time.now + 3.seconds
|
|
TestJob.set(wait_until: wait_until, priority: 20).perform_later "#{@id}.1"
|
|
TestJob.set(wait_until: wait_until, priority: 10).perform_later "#{@id}.2"
|
|
wait_for_jobs_to_finish_for(10.seconds)
|
|
assert job_executed "#{@id}.1"
|
|
assert job_executed "#{@id}.2"
|
|
assert job_executed_at("#{@id}.2") < job_executed_at("#{@id}.1")
|
|
end
|
|
|
|
test "should run job with higher priority first in Backburner" do
|
|
skip unless adapter_is?(:backburner)
|
|
|
|
jobs_manager.tube.pause(3)
|
|
TestJob.set(priority: 20).perform_later "#{@id}.1"
|
|
TestJob.set(priority: 10).perform_later "#{@id}.2"
|
|
wait_for_jobs_to_finish_for(10.seconds)
|
|
assert job_executed "#{@id}.1"
|
|
assert job_executed "#{@id}.2"
|
|
assert job_executed_at("#{@id}.2") < job_executed_at("#{@id}.1")
|
|
end
|
|
end
|