mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Move ActiveJob::LogSubscriber into its own file
This commit is contained in:
parent
d41e282a94
commit
e8b3a46d2e
4 changed files with 114 additions and 108 deletions
|
@ -8,6 +8,7 @@ require "active_job/enqueuing"
|
||||||
require "active_job/execution"
|
require "active_job/execution"
|
||||||
require "active_job/callbacks"
|
require "active_job/callbacks"
|
||||||
require "active_job/exceptions"
|
require "active_job/exceptions"
|
||||||
|
require "active_job/log_subscriber"
|
||||||
require "active_job/logging"
|
require "active_job/logging"
|
||||||
require "active_job/timezones"
|
require "active_job/timezones"
|
||||||
require "active_job/translation"
|
require "active_job/translation"
|
||||||
|
|
111
activejob/lib/active_job/log_subscriber.rb
Normal file
111
activejob/lib/active_job/log_subscriber.rb
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
# frozen_string_literal: true
|
||||||
|
|
||||||
|
require "active_support/log_subscriber"
|
||||||
|
|
||||||
|
module ActiveJob
|
||||||
|
class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
|
||||||
|
def enqueue(event)
|
||||||
|
info do
|
||||||
|
job = event.payload[:job]
|
||||||
|
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def enqueue_at(event)
|
||||||
|
info do
|
||||||
|
job = event.payload[:job]
|
||||||
|
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform_start(event)
|
||||||
|
info do
|
||||||
|
job = event.payload[:job]
|
||||||
|
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(event)
|
||||||
|
job = event.payload[:job]
|
||||||
|
ex = event.payload[:exception_object]
|
||||||
|
if ex
|
||||||
|
error do
|
||||||
|
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
|
||||||
|
end
|
||||||
|
else
|
||||||
|
info do
|
||||||
|
"Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def enqueue_retry(event)
|
||||||
|
job = event.payload[:job]
|
||||||
|
ex = event.payload[:error]
|
||||||
|
wait = event.payload[:wait]
|
||||||
|
|
||||||
|
info do
|
||||||
|
if ex
|
||||||
|
"Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}."
|
||||||
|
else
|
||||||
|
"Retrying #{job.class} in #{wait.to_i} seconds."
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def retry_stopped(event)
|
||||||
|
job = event.payload[:job]
|
||||||
|
ex = event.payload[:error]
|
||||||
|
|
||||||
|
error do
|
||||||
|
"Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts."
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def discard(event)
|
||||||
|
job = event.payload[:job]
|
||||||
|
ex = event.payload[:error]
|
||||||
|
|
||||||
|
error do
|
||||||
|
"Discarded #{job.class} due to a #{ex.class}."
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
def queue_name(event)
|
||||||
|
event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
|
||||||
|
end
|
||||||
|
|
||||||
|
def args_info(job)
|
||||||
|
if job.arguments.any?
|
||||||
|
" with arguments: " +
|
||||||
|
job.arguments.map { |arg| format(arg).inspect }.join(", ")
|
||||||
|
else
|
||||||
|
""
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def format(arg)
|
||||||
|
case arg
|
||||||
|
when Hash
|
||||||
|
arg.transform_values { |value| format(value) }
|
||||||
|
when Array
|
||||||
|
arg.map { |value| format(value) }
|
||||||
|
when GlobalID::Identification
|
||||||
|
arg.to_global_id rescue arg
|
||||||
|
else
|
||||||
|
arg
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def scheduled_at(event)
|
||||||
|
Time.at(event.payload[:job].scheduled_at).utc
|
||||||
|
end
|
||||||
|
|
||||||
|
def logger
|
||||||
|
ActiveJob::Base.logger
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
ActiveJob::LogSubscriber.attach_to :active_job
|
|
@ -51,111 +51,5 @@ module ActiveJob
|
||||||
def logger_tagged_by_active_job?
|
def logger_tagged_by_active_job?
|
||||||
logger.formatter.current_tags.include?("ActiveJob")
|
logger.formatter.current_tags.include?("ActiveJob")
|
||||||
end
|
end
|
||||||
|
|
||||||
class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
|
|
||||||
def enqueue(event)
|
|
||||||
info do
|
|
||||||
job = event.payload[:job]
|
|
||||||
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def enqueue_at(event)
|
|
||||||
info do
|
|
||||||
job = event.payload[:job]
|
|
||||||
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def perform_start(event)
|
|
||||||
info do
|
|
||||||
job = event.payload[:job]
|
|
||||||
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def perform(event)
|
|
||||||
job = event.payload[:job]
|
|
||||||
ex = event.payload[:exception_object]
|
|
||||||
if ex
|
|
||||||
error do
|
|
||||||
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
|
|
||||||
end
|
|
||||||
else
|
|
||||||
info do
|
|
||||||
"Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def enqueue_retry(event)
|
|
||||||
job = event.payload[:job]
|
|
||||||
ex = event.payload[:error]
|
|
||||||
wait = event.payload[:wait]
|
|
||||||
|
|
||||||
info do
|
|
||||||
if ex
|
|
||||||
"Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}."
|
|
||||||
else
|
|
||||||
"Retrying #{job.class} in #{wait.to_i} seconds."
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def retry_stopped(event)
|
|
||||||
job = event.payload[:job]
|
|
||||||
ex = event.payload[:error]
|
|
||||||
|
|
||||||
error do
|
|
||||||
"Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts."
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def discard(event)
|
|
||||||
job = event.payload[:job]
|
|
||||||
ex = event.payload[:error]
|
|
||||||
|
|
||||||
error do
|
|
||||||
"Discarded #{job.class} due to a #{ex.class}."
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
private
|
|
||||||
def queue_name(event)
|
|
||||||
event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
|
|
||||||
end
|
|
||||||
|
|
||||||
def args_info(job)
|
|
||||||
if job.arguments.any?
|
|
||||||
" with arguments: " +
|
|
||||||
job.arguments.map { |arg| format(arg).inspect }.join(", ")
|
|
||||||
else
|
|
||||||
""
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def format(arg)
|
|
||||||
case arg
|
|
||||||
when Hash
|
|
||||||
arg.transform_values { |value| format(value) }
|
|
||||||
when Array
|
|
||||||
arg.map { |value| format(value) }
|
|
||||||
when GlobalID::Identification
|
|
||||||
arg.to_global_id rescue arg
|
|
||||||
else
|
|
||||||
arg
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def scheduled_at(event)
|
|
||||||
Time.at(event.payload[:job].scheduled_at).utc
|
|
||||||
end
|
|
||||||
|
|
||||||
def logger
|
|
||||||
ActiveJob::Base.logger
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
ActiveJob::Logging::LogSubscriber.attach_to :active_job
|
|
||||||
|
|
|
@ -34,12 +34,12 @@ class LoggingTest < ActiveSupport::TestCase
|
||||||
@old_logger = ActiveJob::Base.logger
|
@old_logger = ActiveJob::Base.logger
|
||||||
@logger = ActiveSupport::TaggedLogging.new(TestLogger.new)
|
@logger = ActiveSupport::TaggedLogging.new(TestLogger.new)
|
||||||
set_logger @logger
|
set_logger @logger
|
||||||
ActiveJob::Logging::LogSubscriber.attach_to :active_job
|
ActiveJob::LogSubscriber.attach_to :active_job
|
||||||
end
|
end
|
||||||
|
|
||||||
def teardown
|
def teardown
|
||||||
super
|
super
|
||||||
ActiveJob::Logging::LogSubscriber.log_subscribers.pop
|
ActiveJob::LogSubscriber.log_subscribers.pop
|
||||||
set_logger @old_logger
|
set_logger @old_logger
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue