# Mirror of https://github.com/rails/rails.git
# (synced 2022-11-09 12:12:34 -05:00; 103 lines, 2.7 KiB, Ruby)
require 'delegate'
require 'thread'
module ActiveSupport
  # A Queue that simply inherits from STDLIB's Queue. When this
  # queue is used, Rails automatically starts a job runner in a
  # background thread.
  #
  # Jobs pushed onto the queue are expected to respond to +run+. The
  # consumer that executes them is built lazily by +consumer+ (or can be
  # injected with +consumer=+); this class itself never starts it.
  class Queue < ::Queue
    attr_writer :consumer

    # consumer_options - Hash handed to ThreadedQueueConsumer.new when the
    #                    default consumer is built (it reads +:logger+).
    def initialize(consumer_options = {})
      # Explicit empty parens: ::Queue#initialize must not receive the
      # options hash.
      super()
      @consumer_options = consumer_options
    end

    # Returns the consumer for this queue, creating a
    # ThreadedQueueConsumer on first use unless one was assigned.
    def consumer
      @consumer ||= ThreadedQueueConsumer.new(self, @consumer_options)
    end

    # Drain the queue by delegating to the consumer's +drain+. This method
    # may not be available on production queues.
    #
    # NOTE(review): the default ThreadedQueueConsumer#drain runs the jobs in
    # the calling thread; they only run on another thread if the assigned
    # consumer implements +drain+ that way.
    def drain
      consumer.drain
    end
  end

  # A queue that runs each job as soon as it is enqueued: every +push+
  # (and its aliases) is immediately followed by a +drain+.
  class SynchronousQueue < Queue
    # Enqueues +job+, drains the queue, and returns whatever the inherited
    # +push+ returned.
    def push(job)
      super.tap { drain }
    end
    # Re-declare the aliases so they point at the override above rather
    # than at ::Queue#push.
    alias << push
    alias enq push
  end

  # In test mode, the Rails queue provides a +jobs+ method to make
  # assertions about the queue's contents and a +drain+ method to drain
  # the queue and run the jobs.
  #
  # NOTE(review): with the default consumer, +drain+ runs the jobs in the
  # calling thread, not in a separate one.
  class TestQueue < Queue
    # Get a list of the jobs on this queue without consuming them. This
    # method may not be available on production queues.
    #
    # Older pure-Ruby Queue implementations keep their elements in +@que+;
    # when that ivar exists it is copied directly. Otherwise the queue is
    # emptied with non-blocking pops and refilled in the same order. That
    # fallback is not atomic, so it should only be used while no other
    # thread is pushing to or popping from the queue.
    def jobs
      return @que.dup if instance_variable_defined?(:@que)

      snapshot = []
      begin
        loop { snapshot << pop(true) }
      rescue ThreadError
        # Raised by the non-blocking pop once the queue is empty.
      end

      # Refill through ::Queue#push directly so the jobs are not
      # marshalled a second time by the override below.
      raw_push = ::Queue.instance_method(:push).bind(self)
      snapshot.each { |job| raw_push.call(job) }
      snapshot
    end

    # Marshal and unmarshal job before pushing it onto the queue. This will
    # raise an exception on any attempts in tests to push jobs that can't (or
    # shouldn't) be marshalled. The round-trip only ever sees objects created
    # in this process, so the Marshal.load here is not handling external data.
    def push(job)
      super Marshal.load(Marshal.dump(job))
    end
    # Without these, the inherited << and enq would still call ::Queue#push
    # and skip the marshalling check.
    alias << push
    alias enq push
  end

  # The threaded consumer will run jobs in a background thread in
  # development mode or in a VM where running jobs on a thread in
  # production mode makes sense.
  #
  # +shutdown+ pushes a nil onto the queue and joins the thread, so every
  # job enqueued before the nil is executed before +shutdown+ returns.
  class ThreadedQueueConsumer
    # queue   - the queue to pop jobs from.
    # options - Hash; +:logger+ (optional) receives job errors via +error+.
    def initialize(queue, options = {})
      @queue = queue
      @logger = options[:logger]
    end

    # Starts the background thread that runs +consume+. Calling it again
    # while that thread is alive is a no-op, so an earlier thread is never
    # orphaned. Returns self.
    def start
      @thread = Thread.new { consume } unless @thread && @thread.alive?
      self
    end

    # Stops the background thread after it has worked through the jobs
    # already queued. Does nothing if +start+ was never called, and does
    # not enqueue the nil sentinel when the thread is no longer running.
    def shutdown
      return unless @thread

      @queue.push nil if @thread.alive?
      @thread.join
    end

    # Runs queued jobs in the calling thread until the queue reports empty.
    def drain
      run(@queue.pop) until @queue.empty?
    end

    # Blocks on the queue and runs each job; a nil (or false) entry ends
    # the loop.
    def consume
      while (job = @queue.pop)
        run job
      end
    end

    # Runs a single job, routing anything it raises to +handle_exception+.
    #
    # NOTE(review): Exception (not StandardError) is rescued on purpose so
    # that any failure in a job is reported through +handle_exception+;
    # confirm that also intercepting SystemExit/SignalException is intended.
    def run(job)
      job.run
    rescue Exception => exception
      handle_exception job, exception
    end

    # Re-raises +exception+ when no logger was configured; otherwise logs
    # its message and backtrace. The backtrace is nil for an exception that
    # was never raised, hence the Array() wrapper.
    def handle_exception(job, exception)
      raise exception unless @logger

      @logger.error "Job Error: #{exception.message}\n#{Array(exception.backtrace).join("\n")}"
    end
  end
end