mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Missing git add :trollface:
This commit is contained in:
parent
adff4a706a
commit
602000be90
4 changed files with 288 additions and 0 deletions
65
railties/lib/rails/queueing.rb
Normal file
65
railties/lib/rails/queueing.rb
Normal file
|
@ -0,0 +1,65 @@
|
|||
module Rails
|
||||
module Queueing
|
||||
# In test mode, the Rails queue is backed by an Array so that assertions
|
||||
# can be made about its contents. The test queue provides a +contents+
|
||||
# method to make assertions about the queue's contents and a +drain+
|
||||
# method to drain the queue and run the jobs.
|
||||
#
|
||||
# Jobs are run in a separate thread to catch mistakes where code
|
||||
# assumes that the job is run in the same thread.
|
||||
class TestQueue
|
||||
attr_reader :contents
|
||||
|
||||
def initialize
|
||||
@contents = []
|
||||
end
|
||||
|
||||
def drain
|
||||
# run the jobs in a separate thread so assumptions of synchronous
|
||||
# jobs are caught in test mode.
|
||||
t = Thread.new do
|
||||
while job = @contents.pop
|
||||
job.run
|
||||
end
|
||||
end
|
||||
t.join
|
||||
end
|
||||
|
||||
# implement the Queue API
|
||||
def push(object)
|
||||
@contents << object
|
||||
end
|
||||
end
|
||||
|
||||
# The threaded consumer will run jobs in a background thread in
|
||||
# development mode or in a VM where running jobs on a thread in
|
||||
# production mode makes sense.
|
||||
#
|
||||
# When the process exits, the consumer pushes a nil onto the
|
||||
# queue and joins the thread, which will ensure that all jobs
|
||||
# are executed before the process finally dies.
|
||||
class ThreadedConsumer
|
||||
def self.start(queue)
|
||||
new(queue).start
|
||||
end
|
||||
|
||||
def initialize(queue)
|
||||
@queue = queue
|
||||
end
|
||||
|
||||
def start
|
||||
@thread = Thread.new do
|
||||
while job = @queue.pop
|
||||
job.run
|
||||
end
|
||||
end
|
||||
self
|
||||
end
|
||||
|
||||
def shutdown
|
||||
@queue.push nil
|
||||
@thread.join
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
114
railties/test/application/queue_test.rb
Normal file
114
railties/test/application/queue_test.rb
Normal file
|
@ -0,0 +1,114 @@
|
|||
require 'isolation/abstract_unit'
|
||||
require 'rack/test'
|
||||
|
||||
module ApplicationTests
|
||||
class GeneratorsTest < ActiveSupport::TestCase
|
||||
include ActiveSupport::Testing::Isolation
|
||||
|
||||
def setup
|
||||
build_app
|
||||
boot_rails
|
||||
end
|
||||
|
||||
def teardown
|
||||
teardown_app
|
||||
end
|
||||
|
||||
def app_const
|
||||
@app_const ||= Class.new(Rails::Application)
|
||||
end
|
||||
|
||||
test "the queue is a TestQueue in test mode" do
|
||||
app("test")
|
||||
assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue
|
||||
assert_kind_of Rails::Queueing::TestQueue, Rails.queue
|
||||
end
|
||||
|
||||
test "the queue is a Queue in development mode" do
|
||||
app("development")
|
||||
assert_kind_of Queue, Rails.application.queue
|
||||
assert_kind_of Queue, Rails.queue
|
||||
end
|
||||
|
||||
test "in development mode, an enqueued job will be processed in a separate thread" do
|
||||
app("development")
|
||||
current = Thread.current
|
||||
|
||||
job = Struct.new(:origin, :target).new(Thread.current)
|
||||
def job.run
|
||||
self.target = Thread.current
|
||||
end
|
||||
|
||||
Rails.queue.push job
|
||||
sleep 0.1
|
||||
|
||||
assert job.target, "The job was run"
|
||||
assert_not_equal job.origin, job.target
|
||||
end
|
||||
|
||||
test "in test mode, explicitly draining the queue will process it in a separate thread" do
|
||||
app("test")
|
||||
current = Thread.current
|
||||
|
||||
job = Struct.new(:origin, :target).new(Thread.current)
|
||||
def job.run
|
||||
self.target = Thread.current
|
||||
end
|
||||
|
||||
Rails.queue.push job
|
||||
Rails.queue.drain
|
||||
|
||||
assert job.target, "The job was run"
|
||||
assert_not_equal job.origin, job.target
|
||||
end
|
||||
|
||||
test "in test mode, the queue can be observed" do
|
||||
app("test")
|
||||
|
||||
job = Class.new(Struct.new(:id)) do
|
||||
def run
|
||||
end
|
||||
end
|
||||
|
||||
jobs = (1..10).map do |id|
|
||||
job.new(id)
|
||||
end
|
||||
|
||||
jobs.each do |job|
|
||||
Rails.queue.push job
|
||||
end
|
||||
|
||||
assert_equal jobs, Rails.queue.contents
|
||||
end
|
||||
|
||||
test "a custom queue implementation can be provided" do
|
||||
add_to_env_config "production", <<-RUBY
|
||||
require "my_queue"
|
||||
config.queue = MyQueue
|
||||
RUBY
|
||||
|
||||
app_file "lib/my_queue.rb", <<-RUBY
|
||||
class MyQueue
|
||||
def push(job)
|
||||
job.run
|
||||
end
|
||||
end
|
||||
RUBY
|
||||
|
||||
app("production")
|
||||
|
||||
assert_kind_of MyQueue, Rails.queue
|
||||
|
||||
job = Class.new(Struct.new(:id, :ran)) do
|
||||
def run
|
||||
self.ran = true
|
||||
end
|
||||
end
|
||||
|
||||
job1 = job.new(1)
|
||||
Rails.queue.push job1
|
||||
|
||||
assert_equal true, job1.ran
|
||||
end
|
||||
end
|
||||
end
|
44
railties/test/queueing/test_queue_test.rb
Normal file
44
railties/test/queueing/test_queue_test.rb
Normal file
|
@ -0,0 +1,44 @@
|
|||
require 'abstract_unit'
|
||||
require 'rails/queueing'
|
||||
|
||||
class TestQueueTest < ActiveSupport::TestCase
|
||||
class Job
|
||||
attr_reader :id
|
||||
def initialize(id, &block)
|
||||
@id = id
|
||||
@block = block
|
||||
end
|
||||
|
||||
def run
|
||||
@block.call if @block
|
||||
end
|
||||
end
|
||||
|
||||
def setup
|
||||
@queue = Rails::Queueing::TestQueue.new
|
||||
end
|
||||
|
||||
def test_contents
|
||||
assert_equal [], @queue.contents
|
||||
job = Job.new(1)
|
||||
@queue.push job
|
||||
assert_equal [job], @queue.contents
|
||||
end
|
||||
|
||||
def test_drain
|
||||
t = nil
|
||||
ran = false
|
||||
|
||||
job = Job.new(1) do
|
||||
ran = true
|
||||
t = Thread.current
|
||||
end
|
||||
|
||||
@queue.push job
|
||||
@queue.drain
|
||||
|
||||
assert_equal [], @queue.contents
|
||||
assert ran, "The job runs synchronously when the queue is drained"
|
||||
assert_not_equal t, Thread.current
|
||||
end
|
||||
end
|
65
railties/test/queueing/threaded_consumer_test.rb
Normal file
65
railties/test/queueing/threaded_consumer_test.rb
Normal file
|
@ -0,0 +1,65 @@
|
|||
require 'abstract_unit'
|
||||
require 'rails/queueing'
|
||||
|
||||
class TestThreadConsumer < ActiveSupport::TestCase
|
||||
class Job
|
||||
attr_reader :id
|
||||
def initialize(id, &block)
|
||||
@id = id
|
||||
@block = block
|
||||
end
|
||||
|
||||
def run
|
||||
@block.call if @block
|
||||
end
|
||||
end
|
||||
|
||||
def setup
|
||||
@queue = Queue.new
|
||||
@consumer = Rails::Queueing::ThreadedConsumer.start(@queue)
|
||||
end
|
||||
|
||||
def teardown
|
||||
@queue.push nil
|
||||
end
|
||||
|
||||
test "the jobs are executed" do
|
||||
ran = false
|
||||
|
||||
job = Job.new(1) do
|
||||
ran = true
|
||||
end
|
||||
|
||||
@queue.push job
|
||||
sleep 0.1
|
||||
assert_equal true, ran
|
||||
end
|
||||
|
||||
test "the jobs are not executed synchronously" do
|
||||
ran = false
|
||||
|
||||
job = Job.new(1) do
|
||||
sleep 0.1
|
||||
ran = true
|
||||
end
|
||||
|
||||
@queue.push job
|
||||
assert_equal false, ran
|
||||
end
|
||||
|
||||
test "shutting down the queue synchronously drains the jobs" do
|
||||
ran = false
|
||||
|
||||
job = Job.new(1) do
|
||||
sleep 0.1
|
||||
ran = true
|
||||
end
|
||||
|
||||
@queue.push job
|
||||
assert_equal false, ran
|
||||
|
||||
@consumer.shutdown
|
||||
|
||||
assert_equal true, ran
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue