2018-06-27 03:31:41 -04:00
|
|
|
# frozen_string_literal: true
|
|
|
|
|
2019-07-05 13:18:15 -04:00
|
|
|
require 'sidekiq/api'
|
|
|
|
|
2017-11-28 11:08:30 -05:00
|
|
|
# Turn Sidekiq::Worker itself into an ActiveSupport::Concern.
# NOTE(review): presumably required so that ApplicationWorker (also a concern)
# can `include Sidekiq::Worker` and have it applied to the final worker
# class rather than to the concern module -- confirm before changing.
Sidekiq::Worker.extend ActiveSupport::Concern
|
|
|
|
|
|
|
|
# Shared behaviour for Sidekiq workers.
#
# Including this concern mixes in Sidekiq::Worker, WorkerAttributes,
# WorkerContext and Gitlab::SidekiqVersioning::Worker, routes the worker to a
# queue through the global WorkerRouter, and adds helpers for structured
# logging and for scheduling jobs in bulk.
module ApplicationWorker
  extend ActiveSupport::Concern

  include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker
  include WorkerAttributes
  include WorkerContext
  include Gitlab::SidekiqVersioning::Worker

  # First segment of every key returned by #logging_extras.
  LOGGING_EXTRA_KEY = 'extra'

  # Delay, in seconds, used by .perform_async when execution is deferred.
  DEFAULT_DELAY_INTERVAL = 1

  included do
    # Route the worker to its queue now, and again whenever a class attribute
    # is set (the routing result is re-computed by set_queue each time).
    set_queue
    after_set_class_attribute { set_queue }

    # Builds a log payload for the running job.
    #
    # @param payload [Hash] caller-supplied fields; keys are stringified
    # @return [Hash] the payload merged with the current application context
    #   and the 'class', 'job_status', 'queue' and 'jid' fields. Context
    #   fields win when a key is present on both sides.
    def structured_payload(payload = {})
      context = Gitlab::ApplicationContext.current.merge(
        'class' => self.class.name,
        'job_status' => 'running',
        'queue' => self.class.queue,
        'jid' => jid
      )

      payload.stringify_keys.merge(context)
    end

    # Records a key/value pair on this worker instance; the collected pairs
    # are exposed through #logging_extras.
    #
    # @param key [Object] metadata name
    # @param value [Object] metadata value
    # @return [Object] the stored value
    def log_extra_metadata_on_done(key, value)
      @done_log_extra_metadata ||= {}
      @done_log_extra_metadata[key] = value
    end

    # Returns the metadata recorded via #log_extra_metadata_on_done with
    # namespaced keys, or an empty hash when nothing was recorded.
    #
    # @return [Hash{String => Object}]
    def logging_extras
      return {} unless @done_log_extra_metadata

      # Prefix keys with class name to avoid conflicts in Elasticsearch types.
      # Also prefix with "extra." so that we know to log these new fields.
      # The prefix is the same for every key, so build it only once.
      key_prefix = "#{LOGGING_EXTRA_KEY}.#{self.class.name.gsub("::", "_").underscore}"

      @done_log_extra_metadata.transform_keys { |k| "#{key_prefix}.#{k}" }
    end
  end

  class_methods do
    # Applies the same queue routing to subclasses of a worker as the
    # `included` hook applies to the worker itself.
    # NOTE(review): this hook does not call `super`.
    def inherited(subclass)
      subclass.set_queue
      subclass.after_set_class_attribute { subclass.set_queue }
    end

    # Enqueues the job, optionally deferring it by `delay_interval`.
    #
    # @param args [Array] job arguments
    def perform_async(*args)
      # Worker execution for workers with data_consistency set to :delayed or :sticky
      # will be delayed to give replication enough time to complete
      if utilizes_load_balancing_capabilities? && data_consistency_delayed_execution_feature_flag_enabled?
        perform_in(delay_interval, *args)
      else
        super
      end
    end

    # Asks the global WorkerRouter for this worker's queue and stores the
    # result in the `queue` Sidekiq option.
    def set_queue
      queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self)
      sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
    end

    # Sets or reads the queue namespace.
    #
    # @param new_namespace [String, Symbol, nil] when given, it is stored in
    #   the Sidekiq options and the queue is re-routed via set_queue
    # @return [String, nil] when called without an argument, the current
    #   namespace as a String, or nil if none is set
    def queue_namespace(new_namespace = nil)
      if new_namespace
        sidekiq_options queue_namespace: new_namespace

        set_queue
      else
        get_sidekiq_options['queue_namespace']&.to_s
      end
    end

    # @return [String] the `queue` Sidekiq option as a String
    def queue
      get_sidekiq_options['queue'].to_s
    end

    # Set/get which arguments can be logged and sent to Sentry.
    #
    # Numeric arguments are logged by default, so there is no need to
    # list those.
    #
    # Non-numeric arguments must be listed by position, as Sidekiq
    # cannot see argument names.
    #
    # @param args [Array<Integer>] argument positions; when empty the method
    #   acts as a getter
    # @return [Array] the positions just set, or the stored ones ([] if none)
    def loggable_arguments(*args)
      if args.any?
        @loggable_arguments = args
      else
        @loggable_arguments || []
      end
    end

    # @return [Integer] size of this worker's Sidekiq queue
    def queue_size
      Sidekiq::Queue.new(queue).size
    end

    # Enqueues one job per entry of `args_list` with a single bulk push.
    #
    # @param args_list [Array<Array>] one argument list per job
    def bulk_perform_async(args_list)
      Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
    end

    # Schedules one job per entry of `args_list` to run after `delay`.
    #
    # When both `batch_size` and `batch_delay` are given, `args_list` is
    # pushed in slices of `batch_size` jobs, and each slice after the first is
    # scheduled `batch_delay` seconds later than the previous one. When either
    # is missing, all jobs are pushed at once for the same time.
    #
    # @param delay [#to_i] seconds from now; must be positive after `to_i`
    # @param args_list [Array<Array>] one argument list per job
    # @param batch_size [#to_i, nil] jobs per batch; must be positive after `to_i`
    # @param batch_delay [#to_i, nil] seconds between consecutive batches
    # @raise [ArgumentError] if the schedule time is not in the future, or if
    #   batching is requested with a non-positive `batch_size`
    def bulk_perform_in(delay, args_list, batch_size: nil, batch_delay: nil)
      now = Time.now.to_i
      schedule = now + delay.to_i

      if schedule <= now
        raise ArgumentError, _('The schedule time must be in the future!')
      end

      if batch_size && batch_delay
        # Convert once instead of on every iteration.
        batch_size = batch_size.to_i
        batch_delay = batch_delay.to_i

        # each_slice would raise an opaque "invalid slice size" ArgumentError
        # here; fail with a message that names the offending argument.
        raise ArgumentError, 'batch_size should be greater than 0' unless batch_size > 0

        args_list.each_slice(batch_size).with_index do |args_batch, idx|
          batch_schedule = schedule + idx * batch_delay
          Sidekiq::Client.push_bulk('class' => self, 'args' => args_batch, 'at' => batch_schedule)
        end
      else
        Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
      end
    end

    protected

    # @return [Boolean] whether the :data_consistency_delayed_execution
    #   feature flag is enabled
    def data_consistency_delayed_execution_feature_flag_enabled?
      Feature.enabled?(:data_consistency_delayed_execution, default_enabled: :yaml)
    end

    # @return [ActiveSupport::Duration] DEFAULT_DELAY_INTERVAL expressed in seconds
    def delay_interval
      DEFAULT_DELAY_INTERVAL.seconds
    end
  end
end
|