mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Active Job: smaller footprint for the dev/test async adapter
Use one shared worker pool for all queues with 0-#CPU workers rather than separate pools per queue with 2-10*#CPU workers each.
This commit is contained in:
parent
b04d07337f
commit
a66780bfff
8 changed files with 110 additions and 133 deletions
|
@ -1,3 +1,10 @@
|
||||||
|
* Tune the async adapter for low-footprint dev/test usage. Use a single
|
||||||
|
thread pool for all queues and limit to 0 to #CPU total threads, down from
|
||||||
|
2 to 10*#CPU per queue.
|
||||||
|
|
||||||
|
*Jeremy Daer*
|
||||||
|
|
||||||
|
|
||||||
## Rails 5.0.0.beta3 (February 24, 2016) ##
|
## Rails 5.0.0.beta3 (February 24, 2016) ##
|
||||||
|
|
||||||
* Change the default adapter from inline to async. It's a better default as tests will then not mistakenly
|
* Change the default adapter from inline to async. It's a better default as tests will then not mistakenly
|
||||||
|
|
|
@ -32,7 +32,6 @@ module ActiveJob
|
||||||
autoload :Base
|
autoload :Base
|
||||||
autoload :QueueAdapters
|
autoload :QueueAdapters
|
||||||
autoload :ConfiguredJob
|
autoload :ConfiguredJob
|
||||||
autoload :AsyncJob
|
|
||||||
autoload :TestCase
|
autoload :TestCase
|
||||||
autoload :TestHelper
|
autoload :TestHelper
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,77 +0,0 @@
|
||||||
require 'concurrent/map'
|
|
||||||
require 'concurrent/scheduled_task'
|
|
||||||
require 'concurrent/executor/thread_pool_executor'
|
|
||||||
require 'concurrent/utility/processor_counter'
|
|
||||||
|
|
||||||
module ActiveJob
|
|
||||||
# == Active Job Async Job
|
|
||||||
#
|
|
||||||
# When enqueuing jobs with Async Job each job will be executed asynchronously
|
|
||||||
# on a +concurrent-ruby+ thread pool. All job data is retained in memory.
|
|
||||||
# Because job data is not saved to a persistent datastore there is no
|
|
||||||
# additional infrastructure needed and jobs process quickly. The lack of
|
|
||||||
# persistence, however, means that all unprocessed jobs will be lost on
|
|
||||||
# application restart. Therefore in-memory queue adapters are unsuitable for
|
|
||||||
# most production environments but are excellent for development and testing.
|
|
||||||
#
|
|
||||||
# Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby].
|
|
||||||
#
|
|
||||||
# To use Async Job set the queue_adapter config to +:async+.
|
|
||||||
#
|
|
||||||
# Rails.application.config.active_job.queue_adapter = :async
|
|
||||||
#
|
|
||||||
# Async Job supports job queues specified with +queue_as+. Queues are created
|
|
||||||
# automatically as needed and each has its own thread pool.
|
|
||||||
class AsyncJob
|
|
||||||
|
|
||||||
DEFAULT_EXECUTOR_OPTIONS = {
|
|
||||||
min_threads: [2, Concurrent.processor_count].max,
|
|
||||||
max_threads: Concurrent.processor_count * 10,
|
|
||||||
auto_terminate: true,
|
|
||||||
idletime: 60, # 1 minute
|
|
||||||
max_queue: 0, # unlimited
|
|
||||||
fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
|
|
||||||
}.freeze
|
|
||||||
|
|
||||||
QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc:
|
|
||||||
hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool }
|
|
||||||
end
|
|
||||||
|
|
||||||
class << self
|
|
||||||
# Forces jobs to process immediately when testing the Active Job gem.
|
|
||||||
# This should only be called from within unit tests.
|
|
||||||
def perform_immediately! #:nodoc:
|
|
||||||
@perform_immediately = true
|
|
||||||
end
|
|
||||||
|
|
||||||
# Allows jobs to run asynchronously when testing the Active Job gem.
|
|
||||||
# This should only be called from within unit tests.
|
|
||||||
def perform_asynchronously! #:nodoc:
|
|
||||||
@perform_immediately = false
|
|
||||||
end
|
|
||||||
|
|
||||||
def create_thread_pool #:nodoc:
|
|
||||||
if @perform_immediately
|
|
||||||
Concurrent::ImmediateExecutor.new
|
|
||||||
else
|
|
||||||
Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def enqueue(job_data, queue: 'default') #:nodoc:
|
|
||||||
QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) }
|
|
||||||
end
|
|
||||||
|
|
||||||
def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc:
|
|
||||||
delay = timestamp - Time.current.to_f
|
|
||||||
if delay > 0
|
|
||||||
Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job|
|
|
||||||
ActiveJob::Base.execute(job)
|
|
||||||
end
|
|
||||||
else
|
|
||||||
enqueue(job_data, queue: queue)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
|
@ -1,22 +1,113 @@
|
||||||
require 'active_job/async_job'
|
require 'securerandom'
|
||||||
|
require 'concurrent/scheduled_task'
|
||||||
|
require 'concurrent/executor/thread_pool_executor'
|
||||||
|
require 'concurrent/utility/processor_counter'
|
||||||
|
|
||||||
module ActiveJob
|
module ActiveJob
|
||||||
module QueueAdapters
|
module QueueAdapters
|
||||||
# == Active Job Async adapter
|
# == Active Job Async adapter
|
||||||
#
|
#
|
||||||
# When enqueuing jobs with the Async adapter the job will be executed
|
# The Async adapter runs jobs with an in-process thread pool.
|
||||||
# asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html].
|
|
||||||
#
|
#
|
||||||
# To use +AsyncJob+ set the queue_adapter config to +:async+.
|
# This is the default queue adapter. It's well-suited for dev/test since
|
||||||
|
# it doesn't need an external infrastructure, but it's a poor fit for
|
||||||
|
# production since it drops pending jobs on restart.
|
||||||
#
|
#
|
||||||
# Rails.application.config.active_job.queue_adapter = :async
|
# To use this adapter, set queue adapter to +:async+:
|
||||||
|
#
|
||||||
|
# config.active_job.queue_adapter = :async
|
||||||
|
#
|
||||||
|
# To configure the adapter's thread pool, instantiate the adapter and
|
||||||
|
# pass your own config:
|
||||||
|
#
|
||||||
|
# config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
|
||||||
|
# min_threads: 1,
|
||||||
|
# max_threads: 2 * Concurrent.processor_count,
|
||||||
|
# idletime: 600.seconds
|
||||||
|
#
|
||||||
|
# The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
|
||||||
|
# jobs. Since jobs share a single thread pool, long-running jobs will block
|
||||||
|
# short-lived jobs. Fine for dev/test; bad for production.
|
||||||
class AsyncAdapter
|
class AsyncAdapter
|
||||||
|
# See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options.
|
||||||
|
def initialize(**executor_options)
|
||||||
|
@scheduler = Scheduler.new(**executor_options)
|
||||||
|
end
|
||||||
|
|
||||||
def enqueue(job) #:nodoc:
|
def enqueue(job) #:nodoc:
|
||||||
ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name)
|
@scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
|
||||||
end
|
end
|
||||||
|
|
||||||
def enqueue_at(job, timestamp) #:nodoc:
|
def enqueue_at(job, timestamp) #:nodoc:
|
||||||
ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name)
|
@scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
|
||||||
|
end
|
||||||
|
|
||||||
|
# Gracefully stop processing jobs. Finishes in-progress work and handles
|
||||||
|
# any new jobs following the executor's fallback policy (`caller_runs`).
|
||||||
|
# Waits for termination by default. Pass `wait: false` to continue.
|
||||||
|
def shutdown(wait: true) #:nodoc:
|
||||||
|
@scheduler.shutdown wait: wait
|
||||||
|
end
|
||||||
|
|
||||||
|
# Used for our test suite.
|
||||||
|
def immediate=(immediate) #:nodoc:
|
||||||
|
@scheduler.immediate = immediate
|
||||||
|
end
|
||||||
|
|
||||||
|
# Note that we don't actually need to serialize the jobs since we're
|
||||||
|
# performing them in-process, but we do so anyway for parity with other
|
||||||
|
# adapters and deployment environments. Otherwise, serialization bugs
|
||||||
|
# may creep in undetected.
|
||||||
|
class JobWrapper #:nodoc:
|
||||||
|
def initialize(job)
|
||||||
|
job.provider_job_id = SecureRandom.uuid
|
||||||
|
@job_data = job.serialize
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform
|
||||||
|
Base.execute @job_data
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
class Scheduler #:nodoc:
|
||||||
|
DEFAULT_EXECUTOR_OPTIONS = {
|
||||||
|
min_threads: 0,
|
||||||
|
max_threads: Concurrent.processor_count,
|
||||||
|
auto_terminate: true,
|
||||||
|
idletime: 60, # 1 minute
|
||||||
|
max_queue: 0, # unlimited
|
||||||
|
fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
|
||||||
|
}.freeze
|
||||||
|
|
||||||
|
attr_accessor :immediate
|
||||||
|
|
||||||
|
def initialize(**options)
|
||||||
|
self.immediate = false
|
||||||
|
@immediate_executor = Concurrent::ImmediateExecutor.new
|
||||||
|
@async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
|
||||||
|
end
|
||||||
|
|
||||||
|
def enqueue(job, queue_name:)
|
||||||
|
executor.post(job, &:perform)
|
||||||
|
end
|
||||||
|
|
||||||
|
def enqueue_at(job, timestamp, queue_name:)
|
||||||
|
delay = timestamp - Time.current.to_f
|
||||||
|
if delay > 0
|
||||||
|
Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
|
||||||
|
else
|
||||||
|
enqueue(job, queue_name: queue_name)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def shutdown(wait: true)
|
||||||
|
@async_executor.shutdown
|
||||||
|
@async_executor.wait_for_termination if wait
|
||||||
|
end
|
||||||
|
|
||||||
|
def executor
|
||||||
|
immediate ? @immediate_executor : @async_executor
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,4 +1,2 @@
|
||||||
require 'active_job/async_job'
|
|
||||||
|
|
||||||
ActiveJob::Base.queue_adapter = :async
|
ActiveJob::Base.queue_adapter = :async
|
||||||
ActiveJob::AsyncJob.perform_immediately!
|
ActiveJob::Base.queue_adapter.immediate = true
|
||||||
|
|
|
@ -1,42 +0,0 @@
|
||||||
require 'helper'
|
|
||||||
require 'jobs/hello_job'
|
|
||||||
require 'jobs/queue_as_job'
|
|
||||||
|
|
||||||
class AsyncJobTest < ActiveSupport::TestCase
|
|
||||||
def using_async_adapter?
|
|
||||||
ActiveJob::Base.queue_adapter.is_a? ActiveJob::QueueAdapters::AsyncAdapter
|
|
||||||
end
|
|
||||||
|
|
||||||
setup do
|
|
||||||
ActiveJob::AsyncJob.perform_asynchronously!
|
|
||||||
end
|
|
||||||
|
|
||||||
teardown do
|
|
||||||
ActiveJob::AsyncJob::QUEUES.clear
|
|
||||||
ActiveJob::AsyncJob.perform_immediately!
|
|
||||||
end
|
|
||||||
|
|
||||||
test "#create_thread_pool returns a thread_pool" do
|
|
||||||
thread_pool = ActiveJob::AsyncJob.create_thread_pool
|
|
||||||
assert thread_pool.is_a? Concurrent::ExecutorService
|
|
||||||
assert_not thread_pool.is_a? Concurrent::ImmediateExecutor
|
|
||||||
end
|
|
||||||
|
|
||||||
test "#create_thread_pool returns an ImmediateExecutor after #perform_immediately! is called" do
|
|
||||||
ActiveJob::AsyncJob.perform_immediately!
|
|
||||||
thread_pool = ActiveJob::AsyncJob.create_thread_pool
|
|
||||||
assert thread_pool.is_a? Concurrent::ImmediateExecutor
|
|
||||||
end
|
|
||||||
|
|
||||||
test "enqueuing without specifying a queue uses the default queue" do
|
|
||||||
skip unless using_async_adapter?
|
|
||||||
HelloJob.perform_later
|
|
||||||
assert ActiveJob::AsyncJob::QUEUES.key? 'default'
|
|
||||||
end
|
|
||||||
|
|
||||||
test "enqueuing to a queue that does not exist creates the queue" do
|
|
||||||
skip unless using_async_adapter?
|
|
||||||
QueueAsJob.perform_later
|
|
||||||
assert ActiveJob::AsyncJob::QUEUES.key? QueueAsJob::MY_QUEUE.to_s
|
|
||||||
end
|
|
||||||
end
|
|
|
@ -57,13 +57,13 @@ class QueuingTest < ActiveSupport::TestCase
|
||||||
end
|
end
|
||||||
|
|
||||||
test 'should supply a provider_job_id when available for immediate jobs' do
|
test 'should supply a provider_job_id when available for immediate jobs' do
|
||||||
skip unless adapter_is?(:delayed_job, :sidekiq, :qu, :que, :queue_classic)
|
skip unless adapter_is?(:async, :delayed_job, :sidekiq, :qu, :que, :queue_classic)
|
||||||
test_job = TestJob.perform_later @id
|
test_job = TestJob.perform_later @id
|
||||||
assert test_job.provider_job_id, 'Provider job id should be set by provider'
|
assert test_job.provider_job_id, 'Provider job id should be set by provider'
|
||||||
end
|
end
|
||||||
|
|
||||||
test 'should supply a provider_job_id when available for delayed jobs' do
|
test 'should supply a provider_job_id when available for delayed jobs' do
|
||||||
skip unless adapter_is?(:delayed_job, :sidekiq, :que, :queue_classic)
|
skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic)
|
||||||
delayed_test_job = TestJob.set(wait: 1.minute).perform_later @id
|
delayed_test_job = TestJob.set(wait: 1.minute).perform_later @id
|
||||||
assert delayed_test_job.provider_job_id, 'Provider job id should by set for delayed jobs by provider'
|
assert delayed_test_job.provider_job_id, 'Provider job id should by set for delayed jobs by provider'
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
module AsyncJobsManager
|
module AsyncJobsManager
|
||||||
def setup
|
def setup
|
||||||
ActiveJob::Base.queue_adapter = :async
|
ActiveJob::Base.queue_adapter = :async
|
||||||
|
ActiveJob::Base.queue_adapter.immediate = false
|
||||||
end
|
end
|
||||||
|
|
||||||
def clear_jobs
|
def clear_jobs
|
||||||
ActiveJob::AsyncJob::QUEUES.clear
|
ActiveJob::Base.queue_adapter.shutdown
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue