From b7996f4564044593c2cf2faccd7ba7c72a85bbf1 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Wed, 24 Jul 2019 10:52:54 -0700 Subject: [PATCH] Better ActiveJob integration with Sidekiq options This change allows ActiveJobs to directly use sidekiq_options to configure various Sidekiq internals/features, #4213 --- lib/sidekiq/rails.rb | 32 +++++++--- lib/sidekiq/worker.rb | 113 +++++++++++++++++++++++++++++++++- lib/sidekiq/worker/options.rb | 112 --------------------------------- test/test_rails.rb | 32 ++++++++-- 4 files changed, 162 insertions(+), 127 deletions(-) delete mode 100644 lib/sidekiq/worker/options.rb diff --git a/lib/sidekiq/rails.rb b/lib/sidekiq/rails.rb index ff4f669e..a1c10e53 100644 --- a/lib/sidekiq/rails.rb +++ b/lib/sidekiq/rails.rb @@ -1,15 +1,33 @@ # frozen_string_literal: true +require "sidekiq/worker" + module Sidekiq class Rails < ::Rails::Engine + # By including the Options module, we allow AJs to directly control sidekiq features + # via the *sidekiq_options* class method and, for instance, not use AJ's retry system. + # AJ retries don't show up in the Sidekiq UI Retries tab, save any error data, can't be + # manually retried, don't automatically die, etc. + # + # class SomeJob < ActiveJob::Base + # queue_as :default + # sidekiq_options retry: 3, backtrace: 10 + # def perform + # end + # end + initializer "sidekiq.active_job_integration" do + ActiveSupport.on_load(:active_job) do + include ::Sidekiq::Worker::Options unless respond_to?(:sidekiq_options) + end + end + + # This hook happens after all initializers are run, just before returning + # from config/environment.rb back to sidekiq/cli.rb. + # We have to add the reloader after initialize to see if cache_classes has + # been turned on. + # + # None of this matters on the client-side, only within the Sidekiq process itself. config.after_initialize do - # This hook happens after all initializers are run, just before returning - # from config/environment.rb back to sidekiq/cli.rb. - # We have to add the reloader after initialize to see if cache_classes has - # been turned on. - # - # None of this matters on the client-side, only within the Sidekiq process itself. - # Sidekiq.configure_server do |_| Sidekiq.options[:reloader] = Sidekiq::Rails::Reloader.new end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index d80eba8f..0b8d1353 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -1,7 +1,6 @@ # frozen_string_literal: true require "sidekiq/client" -require "sidekiq/worker/options" module Sidekiq ## @@ -22,10 +21,120 @@ module Sidekiq # # Note that perform_async is a class method, perform is an instance method. module Worker + ## + # The Options module is extracted so we can include it in ActiveJob::Base + # and allow native AJs to configure Sidekiq features/internals. + module Options + def self.included(base) + base.extend(ClassMethods) + base.sidekiq_class_attribute :sidekiq_options_hash + base.sidekiq_class_attribute :sidekiq_retry_in_block + base.sidekiq_class_attribute :sidekiq_retries_exhausted_block + end + + module ClassMethods + ACCESSOR_MUTEX = Mutex.new + + ## + # Allows customization for this type of Worker. + # Legal options: + # + # retry - enable the RetryJobs middleware for this Worker, *true* to use the default + # or *Integer* count + # backtrace - whether to save any error backtrace in the retry payload to display in web UI, + # can be true, false or an integer number of lines to save, default *false* + # + # In practice, any option is allowed. This is the main mechanism to configure the + # options for a specific job. + def sidekiq_options(opts = {}) + opts = Hash[opts.map { |k, v| [k.to_s, v] }] # stringify + self.sidekiq_options_hash = get_sidekiq_options.merge(Hash[opts.map { |k, v| [k.to_s, v] }]) + end + + def sidekiq_retry_in(&block) + self.sidekiq_retry_in_block = block + end + + def sidekiq_retries_exhausted(&block) + self.sidekiq_retries_exhausted_block = block + end + + def get_sidekiq_options # :nodoc: + self.sidekiq_options_hash ||= Sidekiq.default_worker_options + end + + def sidekiq_class_attribute(*attrs) + instance_reader = true + instance_writer = true + + attrs.each do |name| + synchronized_getter = "__synchronized_#{name}" + + singleton_class.instance_eval do + undef_method(name) if method_defined?(name) || private_method_defined?(name) + end + + define_singleton_method(synchronized_getter) { nil } + singleton_class.class_eval do + private(synchronized_getter) + end + + define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } } + + ivar = "@#{name}" + + singleton_class.instance_eval do + m = "#{name}=" + undef_method(m) if method_defined?(m) || private_method_defined?(m) + end + define_singleton_method("#{name}=") do |val| + singleton_class.class_eval do + ACCESSOR_MUTEX.synchronize do + undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter) + define_method(synchronized_getter) { val } + end + end + + if singleton_class? + class_eval do + undef_method(name) if method_defined?(name) || private_method_defined?(name) + define_method(name) do + if instance_variable_defined? ivar + instance_variable_get ivar + else + singleton_class.send name + end + end + end + end + val + end + + if instance_reader + undef_method(name) if method_defined?(name) || private_method_defined?(name) + define_method(name) do + if instance_variable_defined?(ivar) + instance_variable_get ivar + else + self.class.public_send name + end + end + end + + if instance_writer + m = "#{name}=" + undef_method(m) if method_defined?(m) || private_method_defined?(m) + attr_writer name + end + end + end + end + end + attr_accessor :jid def self.included(base) - raise ArgumentError, "You can only include Sidekiq::Worker::Options in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" } + raise ArgumentError, "Sidekiq::Worker cannot be included in an ActiveJob: #{base.name}" if base.ancestors.any? { |c| c.name == "ActiveJob::Base" } base.include(Options) base.extend(ClassMethods) diff --git a/lib/sidekiq/worker/options.rb b/lib/sidekiq/worker/options.rb deleted file mode 100644 index 0325795f..00000000 --- a/lib/sidekiq/worker/options.rb +++ /dev/null @@ -1,112 +0,0 @@ -# frozen_string_literal: true - -module Sidekiq - module Worker - module Options - def self.included(base) - base.extend(ClassMethods) - base.sidekiq_class_attribute :sidekiq_options_hash - base.sidekiq_class_attribute :sidekiq_retry_in_block - base.sidekiq_class_attribute :sidekiq_retries_exhausted_block - end - - module ClassMethods - ACCESSOR_MUTEX = Mutex.new - - ## - # Allows customization for this type of Worker. - # Legal options: - # - # retry - enable the RetryJobs middleware for this Worker, *true* to use the default - # or *Integer* count - # backtrace - whether to save any error backtrace in the retry payload to display in web UI, - # can be true, false or an integer number of lines to save, default *false* - # - # In practice, any option is allowed. This is the main mechanism to configure the - # options for a specific job. - def sidekiq_options(opts = {}) - opts = Hash[opts.map { |k, v| [k.to_s, v] }] # stringify - self.sidekiq_options_hash = get_sidekiq_options.merge(Hash[opts.map { |k, v| [k.to_s, v] }]) - end - - def sidekiq_retry_in(&block) - self.sidekiq_retry_in_block = block - end - - def sidekiq_retries_exhausted(&block) - self.sidekiq_retries_exhausted_block = block - end - - def get_sidekiq_options # :nodoc: - self.sidekiq_options_hash ||= Sidekiq.default_worker_options - end - - def sidekiq_class_attribute(*attrs) - instance_reader = true - instance_writer = true - - attrs.each do |name| - synchronized_getter = "__synchronized_#{name}" - - singleton_class.instance_eval do - undef_method(name) if method_defined?(name) || private_method_defined?(name) - end - - define_singleton_method(synchronized_getter) { nil } - singleton_class.class_eval do - private(synchronized_getter) - end - - define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } } - - ivar = "@#{name}" - - singleton_class.instance_eval do - m = "#{name}=" - undef_method(m) if method_defined?(m) || private_method_defined?(m) - end - define_singleton_method("#{name}=") do |val| - singleton_class.class_eval do - ACCESSOR_MUTEX.synchronize do - undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter) - define_method(synchronized_getter) { val } - end - end - - if singleton_class? - class_eval do - undef_method(name) if method_defined?(name) || private_method_defined?(name) - define_method(name) do - if instance_variable_defined? ivar - instance_variable_get ivar - else - singleton_class.send name - end - end - end - end - val - end - - if instance_reader - undef_method(name) if method_defined?(name) || private_method_defined?(name) - define_method(name) do - if instance_variable_defined?(ivar) - instance_variable_get ivar - else - self.class.public_send name - end - end - end - - if instance_writer - m = "#{name}=" - undef_method(m) if method_defined?(m) || private_method_defined?(m) - attr_writer name - end - end - end - end - end - end -end diff --git a/test/test_rails.rb b/test/test_rails.rb index 5535fbfe..56e8ca3f 100644 --- a/test/test_rails.rb +++ b/test/test_rails.rb @@ -1,25 +1,45 @@ # frozen_string_literal: true require_relative 'helper' -require 'active_job' +require 'sidekiq/rails' +require 'sidekiq/api' describe 'ActiveJob' do + before do + Sidekiq.redis {|c| c.flushdb } + # need to force this since we aren't booting a Rails app + ActiveJob::Base.queue_adapter = :sidekiq + ActiveJob::Base.logger = nil + ActiveJob::Base.send(:include, ::Sidekiq::Worker::Options) unless ActiveJob::Base.respond_to?(:sidekiq_options) + end + it 'does not allow Sidekiq::Worker in AJ::Base classes' do ex = assert_raises ArgumentError do Class.new(ActiveJob::Base) do include Sidekiq::Worker end end - assert_includes ex.message, "can only include Sidekiq::Worker::Options" + assert_includes ex.message, "Sidekiq::Worker cannot be included" end - it 'allows Sidekiq::Options in AJ::Base classes' do - Class.new(ActiveJob::Base) do - include Sidekiq::Worker::Options - sidekiq_options retry: true + it 'loads Sidekiq::Worker::Options in AJ::Base classes' do + aj = Class.new(ActiveJob::Base) do + queue_as :bar + sidekiq_options retry: 4, queue: 'foo' sidekiq_retry_in { |count, _exception| count * 10 } sidekiq_retries_exhausted do |msg, _exception| Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}" end end + + assert_equal 4, aj.get_sidekiq_options["retry"] + + # When using ActiveJobs, you cannot set the queue with sidekiq_options, you must use + # queue_as or set(queue: ...). This is to avoid duplicate ways of doing the same thing. + instance = aj.perform_later(1, 2, 3) + q = Sidekiq::Queue.new("foo") + assert_equal 0, q.size + q = Sidekiq::Queue.new("bar") + assert_equal 1, q.size + assert_equal 24, instance.provider_job_id.size end end