diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md index 913164fbc5..1e03c27535 100644 --- a/activejob/CHANGELOG.md +++ b/activejob/CHANGELOG.md @@ -1,3 +1,10 @@ +* Tune the async adapter for low-footprint dev/test usage. Use a single + thread pool for all queues and limit to 0 to #CPU total threads, down from + 2 to 10*#CPU per queue. + + *Jeremy Daer* + + ## Rails 5.0.0.beta3 (February 24, 2016) ## * Change the default adapter from inline to async. It's a better default as tests will then not mistakenly diff --git a/activejob/lib/active_job.rb b/activejob/lib/active_job.rb index f7e05f7cd3..4b9065cf50 100644 --- a/activejob/lib/active_job.rb +++ b/activejob/lib/active_job.rb @@ -32,7 +32,6 @@ module ActiveJob autoload :Base autoload :QueueAdapters autoload :ConfiguredJob - autoload :AsyncJob autoload :TestCase autoload :TestHelper end diff --git a/activejob/lib/active_job/async_job.rb b/activejob/lib/active_job/async_job.rb deleted file mode 100644 index 417ace21d2..0000000000 --- a/activejob/lib/active_job/async_job.rb +++ /dev/null @@ -1,77 +0,0 @@ -require 'concurrent/map' -require 'concurrent/scheduled_task' -require 'concurrent/executor/thread_pool_executor' -require 'concurrent/utility/processor_counter' - -module ActiveJob - # == Active Job Async Job - # - # When enqueuing jobs with Async Job each job will be executed asynchronously - # on a +concurrent-ruby+ thread pool. All job data is retained in memory. - # Because job data is not saved to a persistent datastore there is no - # additional infrastructure needed and jobs process quickly. The lack of - # persistence, however, means that all unprocessed jobs will be lost on - # application restart. Therefore in-memory queue adapters are unsuitable for - # most production environments but are excellent for development and testing. - # - # Read more about Concurrent Ruby {here}[https://github.com/ruby-concurrency/concurrent-ruby]. - # - # To use Async Job set the queue_adapter config to +:async+. - # - # Rails.application.config.active_job.queue_adapter = :async - # - # Async Job supports job queues specified with +queue_as+. Queues are created - # automatically as needed and each has its own thread pool. - class AsyncJob - - DEFAULT_EXECUTOR_OPTIONS = { - min_threads: [2, Concurrent.processor_count].max, - max_threads: Concurrent.processor_count * 10, - auto_terminate: true, - idletime: 60, # 1 minute - max_queue: 0, # unlimited - fallback_policy: :caller_runs # shouldn't matter -- 0 max queue - }.freeze - - QUEUES = Concurrent::Map.new do |hash, queue_name| #:nodoc: - hash.compute_if_absent(queue_name) { ActiveJob::AsyncJob.create_thread_pool } - end - - class << self - # Forces jobs to process immediately when testing the Active Job gem. - # This should only be called from within unit tests. - def perform_immediately! #:nodoc: - @perform_immediately = true - end - - # Allows jobs to run asynchronously when testing the Active Job gem. - # This should only be called from within unit tests. - def perform_asynchronously! #:nodoc: - @perform_immediately = false - end - - def create_thread_pool #:nodoc: - if @perform_immediately - Concurrent::ImmediateExecutor.new - else - Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS) - end - end - - def enqueue(job_data, queue: 'default') #:nodoc: - QUEUES[queue].post(job_data) { |job| ActiveJob::Base.execute(job) } - end - - def enqueue_at(job_data, timestamp, queue: 'default') #:nodoc: - delay = timestamp - Time.current.to_f - if delay > 0 - Concurrent::ScheduledTask.execute(delay, args: [job_data], executor: QUEUES[queue]) do |job| - ActiveJob::Base.execute(job) - end - else - enqueue(job_data, queue: queue) - end - end - end - end -end diff --git a/activejob/lib/active_job/queue_adapters/async_adapter.rb b/activejob/lib/active_job/queue_adapters/async_adapter.rb index 3d3c749883..922bc4afce 100644 --- a/activejob/lib/active_job/queue_adapters/async_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/async_adapter.rb @@ -1,22 +1,113 @@ -require 'active_job/async_job' +require 'securerandom' +require 'concurrent/scheduled_task' +require 'concurrent/executor/thread_pool_executor' +require 'concurrent/utility/processor_counter' module ActiveJob module QueueAdapters # == Active Job Async adapter # - # When enqueuing jobs with the Async adapter the job will be executed - # asynchronously using {AsyncJob}[http://api.rubyonrails.org/classes/ActiveJob/AsyncJob.html]. + # The Async adapter runs jobs with an in-process thread pool. # - # To use +AsyncJob+ set the queue_adapter config to +:async+. + # This is the default queue adapter. It's well-suited for dev/test since + # it doesn't need an external infrastructure, but it's a poor fit for + # production since it drops pending jobs on restart. # - # Rails.application.config.active_job.queue_adapter = :async + # To use this adapter, set queue adapter to +:async+: + # + # config.active_job.queue_adapter = :async + # + # To configure the adapter's thread pool, instantiate the adapter and + # pass your own config: + # + # config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \ + # min_threads: 1, + # max_threads: 2 * Concurrent.processor_count, + # idletime: 600.seconds + # + # The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute + # jobs. Since jobs share a single thread pool, long-running jobs will block + # short-lived jobs. Fine for dev/test; bad for production. class AsyncAdapter + # See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options. + def initialize(**executor_options) + @scheduler = Scheduler.new(**executor_options) + end + def enqueue(job) #:nodoc: - ActiveJob::AsyncJob.enqueue(job.serialize, queue: job.queue_name) + @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name end def enqueue_at(job, timestamp) #:nodoc: - ActiveJob::AsyncJob.enqueue_at(job.serialize, timestamp, queue: job.queue_name) + @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name + end + + # Gracefully stop processing jobs. Finishes in-progress work and handles + # any new jobs following the executor's fallback policy (`caller_runs`). + # Waits for termination by default. Pass `wait: false` to continue. + def shutdown(wait: true) #:nodoc: + @scheduler.shutdown wait: wait + end + + # Used for our test suite. + def immediate=(immediate) #:nodoc: + @scheduler.immediate = immediate + end + + # Note that we don't actually need to serialize the jobs since we're + # performing them in-process, but we do so anyway for parity with other + # adapters and deployment environments. Otherwise, serialization bugs + # may creep in undetected. + class JobWrapper #:nodoc: + def initialize(job) + job.provider_job_id = SecureRandom.uuid + @job_data = job.serialize + end + + def perform + Base.execute @job_data + end + end + + class Scheduler #:nodoc: + DEFAULT_EXECUTOR_OPTIONS = { + min_threads: 0, + max_threads: Concurrent.processor_count, + auto_terminate: true, + idletime: 60, # 1 minute + max_queue: 0, # unlimited + fallback_policy: :caller_runs # shouldn't matter -- 0 max queue + }.freeze + + attr_accessor :immediate + + def initialize(**options) + self.immediate = false + @immediate_executor = Concurrent::ImmediateExecutor.new + @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options)) + end + + def enqueue(job, queue_name:) + executor.post(job, &:perform) + end + + def enqueue_at(job, timestamp, queue_name:) + delay = timestamp - Time.current.to_f + if delay > 0 + Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform) + else + enqueue(job, queue_name: queue_name) + end + end + + def shutdown(wait: true) + @async_executor.shutdown + @async_executor.wait_for_termination if wait + end + + def executor + immediate ? @immediate_executor : @async_executor + end end end end diff --git a/activejob/test/adapters/async.rb b/activejob/test/adapters/async.rb index 5fcfb89566..08eb9658cd 100644 --- a/activejob/test/adapters/async.rb +++ b/activejob/test/adapters/async.rb @@ -1,4 +1,2 @@ -require 'active_job/async_job' - ActiveJob::Base.queue_adapter = :async -ActiveJob::AsyncJob.perform_immediately! +ActiveJob::Base.queue_adapter.immediate = true diff --git a/activejob/test/cases/async_job_test.rb b/activejob/test/cases/async_job_test.rb deleted file mode 100644 index 2642cfc608..0000000000 --- a/activejob/test/cases/async_job_test.rb +++ /dev/null @@ -1,42 +0,0 @@ -require 'helper' -require 'jobs/hello_job' -require 'jobs/queue_as_job' - -class AsyncJobTest < ActiveSupport::TestCase - def using_async_adapter? - ActiveJob::Base.queue_adapter.is_a? ActiveJob::QueueAdapters::AsyncAdapter - end - - setup do - ActiveJob::AsyncJob.perform_asynchronously! - end - - teardown do - ActiveJob::AsyncJob::QUEUES.clear - ActiveJob::AsyncJob.perform_immediately! - end - - test "#create_thread_pool returns a thread_pool" do - thread_pool = ActiveJob::AsyncJob.create_thread_pool - assert thread_pool.is_a? Concurrent::ExecutorService - assert_not thread_pool.is_a? Concurrent::ImmediateExecutor - end - - test "#create_thread_pool returns an ImmediateExecutor after #perform_immediately! is called" do - ActiveJob::AsyncJob.perform_immediately! - thread_pool = ActiveJob::AsyncJob.create_thread_pool - assert thread_pool.is_a? Concurrent::ImmediateExecutor - end - - test "enqueuing without specifying a queue uses the default queue" do - skip unless using_async_adapter? - HelloJob.perform_later - assert ActiveJob::AsyncJob::QUEUES.key? 'default' - end - - test "enqueuing to a queue that does not exist creates the queue" do - skip unless using_async_adapter? - QueueAsJob.perform_later - assert ActiveJob::AsyncJob::QUEUES.key? QueueAsJob::MY_QUEUE.to_s - end -end diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb index d8425c9706..40f27500a5 100644 --- a/activejob/test/integration/queuing_test.rb +++ b/activejob/test/integration/queuing_test.rb @@ -57,13 +57,13 @@ class QueuingTest < ActiveSupport::TestCase end test 'should supply a provider_job_id when available for immediate jobs' do - skip unless adapter_is?(:delayed_job, :sidekiq, :qu, :que, :queue_classic) + skip unless adapter_is?(:async, :delayed_job, :sidekiq, :qu, :que, :queue_classic) test_job = TestJob.perform_later @id assert test_job.provider_job_id, 'Provider job id should be set by provider' end test 'should supply a provider_job_id when available for delayed jobs' do - skip unless adapter_is?(:delayed_job, :sidekiq, :que, :queue_classic) + skip unless adapter_is?(:async, :delayed_job, :sidekiq, :que, :queue_classic) delayed_test_job = TestJob.set(wait: 1.minute).perform_later @id assert delayed_test_job.provider_job_id, 'Provider job id should by set for delayed jobs by provider' end diff --git a/activejob/test/support/integration/adapters/async.rb b/activejob/test/support/integration/adapters/async.rb index 42beb12b1f..44ab98437a 100644 --- a/activejob/test/support/integration/adapters/async.rb +++ b/activejob/test/support/integration/adapters/async.rb @@ -1,9 +1,10 @@ module AsyncJobsManager def setup ActiveJob::Base.queue_adapter = :async + ActiveJob::Base.queue_adapter.immediate = false end def clear_jobs - ActiveJob::AsyncJob::QUEUES.clear + ActiveJob::Base.queue_adapter.shutdown end end