mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Allow Sidekiq retry options in ActiveJob
c685d6928c
has introduced an exception that is raised on an attempt to include Sidekiq::Worker to an ActiveJob::Base descendant. That was done in order to prevent options not supported by ActiveJob, specifically `queue`. See https://github.com/mperham/sidekiq/issues/2424 Support for `set` was added later on, including the setting of `retry` from ActiveJob.d6538b0b4f
This change bridges the gap by allowing ActiveJob::Base descendants to include `Sidekiq::Options` and use `retry` option of `sidekiq_options`, and also `sidekiq_retry_in`, and `sidekiq_retries_exhausted`.
This commit is contained in:
parent
0ad6cec09a
commit
fc575fbb7e
3 changed files with 131 additions and 89 deletions
|
@ -1,6 +1,7 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require "sidekiq/client"
|
||||
require "sidekiq/worker/options"
|
||||
|
||||
module Sidekiq
|
||||
##
|
||||
|
@ -24,12 +25,10 @@ module Sidekiq
|
|||
attr_accessor :jid
|
||||
|
||||
def self.included(base)
|
||||
raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
|
||||
raise ArgumentError, "You can only include Sidekiq::Worker::Options in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
|
||||
|
||||
base.include(Options)
|
||||
base.extend(ClassMethods)
|
||||
base.sidekiq_class_attribute :sidekiq_options_hash
|
||||
base.sidekiq_class_attribute :sidekiq_retry_in_block
|
||||
base.sidekiq_class_attribute :sidekiq_retries_exhausted_block
|
||||
end
|
||||
|
||||
def logger
|
||||
|
@ -71,8 +70,6 @@ module Sidekiq
|
|||
end
|
||||
|
||||
module ClassMethods
|
||||
ACCESSOR_MUTEX = Mutex.new
|
||||
|
||||
def delay(*args)
|
||||
raise ArgumentError, "Do not call .delay on a Sidekiq::Worker class, call .perform_async"
|
||||
end
|
||||
|
@ -123,20 +120,7 @@ module Sidekiq
|
|||
# In practice, any option is allowed. This is the main mechanism to configure the
|
||||
# options for a specific job.
|
||||
def sidekiq_options(opts = {})
|
||||
# stringify
|
||||
self.sidekiq_options_hash = get_sidekiq_options.merge(Hash[opts.map { |k, v| [k.to_s, v] }])
|
||||
end
|
||||
|
||||
def sidekiq_retry_in(&block)
|
||||
self.sidekiq_retry_in_block = block
|
||||
end
|
||||
|
||||
def sidekiq_retries_exhausted(&block)
|
||||
self.sidekiq_retries_exhausted_block = block
|
||||
end
|
||||
|
||||
def get_sidekiq_options # :nodoc:
|
||||
self.sidekiq_options_hash ||= Sidekiq.default_worker_options
|
||||
super
|
||||
end
|
||||
|
||||
def client_push(item) # :nodoc:
|
||||
|
@ -148,72 +132,6 @@ module Sidekiq
|
|||
|
||||
Sidekiq::Client.new(pool).push(item)
|
||||
end
|
||||
|
||||
def sidekiq_class_attribute(*attrs)
|
||||
instance_reader = true
|
||||
instance_writer = true
|
||||
|
||||
attrs.each do |name|
|
||||
synchronized_getter = "__synchronized_#{name}"
|
||||
|
||||
singleton_class.instance_eval do
|
||||
undef_method(name) if method_defined?(name) || private_method_defined?(name)
|
||||
end
|
||||
|
||||
define_singleton_method(synchronized_getter) { nil }
|
||||
singleton_class.class_eval do
|
||||
private(synchronized_getter)
|
||||
end
|
||||
|
||||
define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }
|
||||
|
||||
ivar = "@#{name}"
|
||||
|
||||
singleton_class.instance_eval do
|
||||
m = "#{name}="
|
||||
undef_method(m) if method_defined?(m) || private_method_defined?(m)
|
||||
end
|
||||
define_singleton_method("#{name}=") do |val|
|
||||
singleton_class.class_eval do
|
||||
ACCESSOR_MUTEX.synchronize do
|
||||
undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
|
||||
define_method(synchronized_getter) { val }
|
||||
end
|
||||
end
|
||||
|
||||
if singleton_class?
|
||||
class_eval do
|
||||
undef_method(name) if method_defined?(name) || private_method_defined?(name)
|
||||
define_method(name) do
|
||||
if instance_variable_defined? ivar
|
||||
instance_variable_get ivar
|
||||
else
|
||||
singleton_class.send name
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
val
|
||||
end
|
||||
|
||||
if instance_reader
|
||||
undef_method(name) if method_defined?(name) || private_method_defined?(name)
|
||||
define_method(name) do
|
||||
if instance_variable_defined?(ivar)
|
||||
instance_variable_get ivar
|
||||
else
|
||||
self.class.public_send name
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
if instance_writer
|
||||
m = "#{name}="
|
||||
undef_method(m) if method_defined?(m) || private_method_defined?(m)
|
||||
attr_writer name
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
112
lib/sidekiq/worker/options.rb
Normal file
112
lib/sidekiq/worker/options.rb
Normal file
|
@ -0,0 +1,112 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Sidekiq
|
||||
module Worker
|
||||
module Options
|
||||
def self.included(base)
|
||||
base.extend(ClassMethods)
|
||||
base.sidekiq_class_attribute :sidekiq_options_hash
|
||||
base.sidekiq_class_attribute :sidekiq_retry_in_block
|
||||
base.sidekiq_class_attribute :sidekiq_retries_exhausted_block
|
||||
end
|
||||
|
||||
module ClassMethods
|
||||
ACCESSOR_MUTEX = Mutex.new
|
||||
|
||||
##
|
||||
# Allows customization for this type of Worker.
|
||||
# Legal options:
|
||||
#
|
||||
# retry - enable the RetryJobs middleware for this Worker, *true* to use the default
|
||||
# or *Integer* count
|
||||
# backtrace - whether to save any error backtrace in the retry payload to display in web UI,
|
||||
# can be true, false or an integer number of lines to save, default *false*
|
||||
#
|
||||
# In practice, any option is allowed. This is the main mechanism to configure the
|
||||
# options for a specific job.
|
||||
def sidekiq_options(opts = {})
|
||||
opts = Hash[opts.map { |k, v| [k.to_s, v] }] # stringify
|
||||
self.sidekiq_options_hash = get_sidekiq_options.merge(Hash[opts.map { |k, v| [k.to_s, v] }])
|
||||
end
|
||||
|
||||
def sidekiq_retry_in(&block)
|
||||
self.sidekiq_retry_in_block = block
|
||||
end
|
||||
|
||||
def sidekiq_retries_exhausted(&block)
|
||||
self.sidekiq_retries_exhausted_block = block
|
||||
end
|
||||
|
||||
def get_sidekiq_options # :nodoc:
|
||||
self.sidekiq_options_hash ||= Sidekiq.default_worker_options
|
||||
end
|
||||
|
||||
def sidekiq_class_attribute(*attrs)
|
||||
instance_reader = true
|
||||
instance_writer = true
|
||||
|
||||
attrs.each do |name|
|
||||
synchronized_getter = "__synchronized_#{name}"
|
||||
|
||||
singleton_class.instance_eval do
|
||||
undef_method(name) if method_defined?(name) || private_method_defined?(name)
|
||||
end
|
||||
|
||||
define_singleton_method(synchronized_getter) { nil }
|
||||
singleton_class.class_eval do
|
||||
private(synchronized_getter)
|
||||
end
|
||||
|
||||
define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }
|
||||
|
||||
ivar = "@#{name}"
|
||||
|
||||
singleton_class.instance_eval do
|
||||
m = "#{name}="
|
||||
undef_method(m) if method_defined?(m) || private_method_defined?(m)
|
||||
end
|
||||
define_singleton_method("#{name}=") do |val|
|
||||
singleton_class.class_eval do
|
||||
ACCESSOR_MUTEX.synchronize do
|
||||
undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
|
||||
define_method(synchronized_getter) { val }
|
||||
end
|
||||
end
|
||||
|
||||
if singleton_class?
|
||||
class_eval do
|
||||
undef_method(name) if method_defined?(name) || private_method_defined?(name)
|
||||
define_method(name) do
|
||||
if instance_variable_defined? ivar
|
||||
instance_variable_get ivar
|
||||
else
|
||||
singleton_class.send name
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
val
|
||||
end
|
||||
|
||||
if instance_reader
|
||||
undef_method(name) if method_defined?(name) || private_method_defined?(name)
|
||||
define_method(name) do
|
||||
if instance_variable_defined?(ivar)
|
||||
instance_variable_get ivar
|
||||
else
|
||||
self.class.public_send name
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
if instance_writer
|
||||
m = "#{name}="
|
||||
undef_method(m) if method_defined?(m) || private_method_defined?(m)
|
||||
attr_writer name
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -5,9 +5,21 @@ require 'active_job'
|
|||
describe 'ActiveJob' do
|
||||
it 'does not allow Sidekiq::Worker in AJ::Base classes' do
|
||||
ex = assert_raises ArgumentError do
|
||||
c = Class.new(ActiveJob::Base)
|
||||
c.send(:include, Sidekiq::Worker)
|
||||
Class.new(ActiveJob::Base) do
|
||||
include Sidekiq::Worker
|
||||
end
|
||||
end
|
||||
assert_includes ex.message, "can only include Sidekiq::Worker::Options"
|
||||
end
|
||||
|
||||
it 'allows Sidekiq::Options in AJ::Base classes' do
|
||||
Class.new(ActiveJob::Base) do
|
||||
include Sidekiq::Worker::Options
|
||||
sidekiq_options retry: true
|
||||
sidekiq_retry_in { |count, _exception| count * 10 }
|
||||
sidekiq_retries_exhausted do |msg, _exception|
|
||||
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
|
||||
end
|
||||
end
|
||||
assert_includes ex.message, "cannot include"
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue