1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Better ActiveJob integration with Sidekiq options

This change allows ActiveJobs to directly use sidekiq_options to configure various Sidekiq internals/features, #4213
This commit is contained in:
Mike Perham 2019-07-24 10:52:54 -07:00
parent fc575fbb7e
commit b7996f4564
4 changed files with 162 additions and 127 deletions

View file

@ -1,15 +1,33 @@
# frozen_string_literal: true
require "sidekiq/worker"
module Sidekiq
class Rails < ::Rails::Engine
# By including the Options module, we allow AJs to directly control sidekiq features
# via the *sidekiq_options* class method and, for instance, not use AJ's retry system.
# AJ retries don't show up in the Sidekiq UI Retries tab, save any error data, can't be
# manually retried, don't automatically die, etc.
#
# class SomeJob < ActiveJob::Base
# queue_as :default
# sidekiq_options retry: 3, backtrace: 10
# def perform
# end
# end
initializer "sidekiq.active_job_integration" do
ActiveSupport.on_load(:active_job) do
include ::Sidekiq::Worker::Options unless respond_to?(:sidekiq_options)
end
end
# This hook happens after all initializers are run, just before returning
# from config/environment.rb back to sidekiq/cli.rb.
# We have to add the reloader after initialize to see if cache_classes has
# been turned on.
#
# None of this matters on the client-side, only within the Sidekiq process itself.
config.after_initialize do
# This hook happens after all initializers are run, just before returning
# from config/environment.rb back to sidekiq/cli.rb.
# We have to add the reloader after initialize to see if cache_classes has
# been turned on.
#
# None of this matters on the client-side, only within the Sidekiq process itself.
#
Sidekiq.configure_server do |_|
Sidekiq.options[:reloader] = Sidekiq::Rails::Reloader.new
end

View file

@ -1,7 +1,6 @@
# frozen_string_literal: true
require "sidekiq/client"
require "sidekiq/worker/options"
module Sidekiq
##
@ -22,10 +21,120 @@ module Sidekiq
#
# Note that perform_async is a class method, perform is an instance method.
module Worker
##
# The Options module is extracted so we can include it in ActiveJob::Base
# and allow native AJs to configure Sidekiq features/internals.
module Options
def self.included(base)
base.extend(ClassMethods)
base.sidekiq_class_attribute :sidekiq_options_hash
base.sidekiq_class_attribute :sidekiq_retry_in_block
base.sidekiq_class_attribute :sidekiq_retries_exhausted_block
end
module ClassMethods
ACCESSOR_MUTEX = Mutex.new
##
# Allows customization for this type of Worker.
# Legal options:
#
# retry - enable the RetryJobs middleware for this Worker, *true* to use the default
# or *Integer* count
# backtrace - whether to save any error backtrace in the retry payload to display in web UI,
# can be true, false or an integer number of lines to save, default *false*
#
# In practice, any option is allowed. This is the main mechanism to configure the
# options for a specific job.
def sidekiq_options(opts = {})
opts = Hash[opts.map { |k, v| [k.to_s, v] }] # stringify
self.sidekiq_options_hash = get_sidekiq_options.merge(Hash[opts.map { |k, v| [k.to_s, v] }])
end
def sidekiq_retry_in(&block)
self.sidekiq_retry_in_block = block
end
def sidekiq_retries_exhausted(&block)
self.sidekiq_retries_exhausted_block = block
end
def get_sidekiq_options # :nodoc:
self.sidekiq_options_hash ||= Sidekiq.default_worker_options
end
def sidekiq_class_attribute(*attrs)
instance_reader = true
instance_writer = true
attrs.each do |name|
synchronized_getter = "__synchronized_#{name}"
singleton_class.instance_eval do
undef_method(name) if method_defined?(name) || private_method_defined?(name)
end
define_singleton_method(synchronized_getter) { nil }
singleton_class.class_eval do
private(synchronized_getter)
end
define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }
ivar = "@#{name}"
singleton_class.instance_eval do
m = "#{name}="
undef_method(m) if method_defined?(m) || private_method_defined?(m)
end
define_singleton_method("#{name}=") do |val|
singleton_class.class_eval do
ACCESSOR_MUTEX.synchronize do
undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
define_method(synchronized_getter) { val }
end
end
if singleton_class?
class_eval do
undef_method(name) if method_defined?(name) || private_method_defined?(name)
define_method(name) do
if instance_variable_defined? ivar
instance_variable_get ivar
else
singleton_class.send name
end
end
end
end
val
end
if instance_reader
undef_method(name) if method_defined?(name) || private_method_defined?(name)
define_method(name) do
if instance_variable_defined?(ivar)
instance_variable_get ivar
else
self.class.public_send name
end
end
end
if instance_writer
m = "#{name}="
undef_method(m) if method_defined?(m) || private_method_defined?(m)
attr_writer name
end
end
end
end
end
attr_accessor :jid
def self.included(base)
raise ArgumentError, "You can only include Sidekiq::Worker::Options in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" }
base.include(Options)
base.extend(ClassMethods)

View file

@ -1,112 +0,0 @@
# frozen_string_literal: true
module Sidekiq
module Worker
module Options
def self.included(base)
base.extend(ClassMethods)
base.sidekiq_class_attribute :sidekiq_options_hash
base.sidekiq_class_attribute :sidekiq_retry_in_block
base.sidekiq_class_attribute :sidekiq_retries_exhausted_block
end
module ClassMethods
ACCESSOR_MUTEX = Mutex.new
##
# Allows customization for this type of Worker.
# Legal options:
#
# retry - enable the RetryJobs middleware for this Worker, *true* to use the default
# or *Integer* count
# backtrace - whether to save any error backtrace in the retry payload to display in web UI,
# can be true, false or an integer number of lines to save, default *false*
#
# In practice, any option is allowed. This is the main mechanism to configure the
# options for a specific job.
def sidekiq_options(opts = {})
opts = Hash[opts.map { |k, v| [k.to_s, v] }] # stringify
self.sidekiq_options_hash = get_sidekiq_options.merge(Hash[opts.map { |k, v| [k.to_s, v] }])
end
def sidekiq_retry_in(&block)
self.sidekiq_retry_in_block = block
end
def sidekiq_retries_exhausted(&block)
self.sidekiq_retries_exhausted_block = block
end
def get_sidekiq_options # :nodoc:
self.sidekiq_options_hash ||= Sidekiq.default_worker_options
end
def sidekiq_class_attribute(*attrs)
instance_reader = true
instance_writer = true
attrs.each do |name|
synchronized_getter = "__synchronized_#{name}"
singleton_class.instance_eval do
undef_method(name) if method_defined?(name) || private_method_defined?(name)
end
define_singleton_method(synchronized_getter) { nil }
singleton_class.class_eval do
private(synchronized_getter)
end
define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }
ivar = "@#{name}"
singleton_class.instance_eval do
m = "#{name}="
undef_method(m) if method_defined?(m) || private_method_defined?(m)
end
define_singleton_method("#{name}=") do |val|
singleton_class.class_eval do
ACCESSOR_MUTEX.synchronize do
undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
define_method(synchronized_getter) { val }
end
end
if singleton_class?
class_eval do
undef_method(name) if method_defined?(name) || private_method_defined?(name)
define_method(name) do
if instance_variable_defined? ivar
instance_variable_get ivar
else
singleton_class.send name
end
end
end
end
val
end
if instance_reader
undef_method(name) if method_defined?(name) || private_method_defined?(name)
define_method(name) do
if instance_variable_defined?(ivar)
instance_variable_get ivar
else
self.class.public_send name
end
end
end
if instance_writer
m = "#{name}="
undef_method(m) if method_defined?(m) || private_method_defined?(m)
attr_writer name
end
end
end
end
end
end
end

View file

@ -1,25 +1,45 @@
# frozen_string_literal: true
require_relative 'helper'
require 'active_job'
require 'sidekiq/rails'
require 'sidekiq/api'
describe 'ActiveJob' do
before do
Sidekiq.redis {|c| c.flushdb }
# need to force this since we aren't booting a Rails app
ActiveJob::Base.queue_adapter = :sidekiq
ActiveJob::Base.logger = nil
ActiveJob::Base.send(:include, ::Sidekiq::Worker::Options) unless ActiveJob::Base.respond_to?(:sidekiq_options)
end
it 'does not allow Sidekiq::Worker in AJ::Base classes' do
ex = assert_raises ArgumentError do
Class.new(ActiveJob::Base) do
include Sidekiq::Worker
end
end
assert_includes ex.message, "can only include Sidekiq::Worker::Options"
assert_includes ex.message, "Sidekiq::Worker cannot be included"
end
it 'allows Sidekiq::Options in AJ::Base classes' do
Class.new(ActiveJob::Base) do
include Sidekiq::Worker::Options
sidekiq_options retry: true
it 'loads Sidekiq::Worker::Options in AJ::Base classes' do
aj = Class.new(ActiveJob::Base) do
queue_as :bar
sidekiq_options retry: 4, queue: 'foo'
sidekiq_retry_in { |count, _exception| count * 10 }
sidekiq_retries_exhausted do |msg, _exception|
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
end
end
assert_equal 4, aj.get_sidekiq_options["retry"]
# When using ActiveJobs, you cannot set the queue with sidekiq_options, you must use
# queue_as or set(queue: ...). This is to avoid duplicate ways of doing the same thing.
instance = aj.perform_later(1, 2, 3)
q = Sidekiq::Queue.new("foo")
assert_equal 0, q.size
q = Sidekiq::Queue.new("bar")
assert_equal 1, q.size
assert_equal 24, instance.provider_job_id.size
end
end