diff --git a/Gemfile b/Gemfile index 74151814..bcae6f8c 100644 --- a/Gemfile +++ b/Gemfile @@ -7,7 +7,6 @@ gem "redis-namespace" gem "rails", "~> 6.0" gem "sqlite3", platforms: :ruby gem "activerecord-jdbcsqlite3-adapter", platforms: :jruby -gem "after_commit_everywhere" # mail dependencies gem "net-smtp", platforms: :mri, require: false diff --git a/lib/sidekiq/xaclient.rb b/lib/sidekiq/xaclient.rb deleted file mode 100644 index 7c3f6303..00000000 --- a/lib/sidekiq/xaclient.rb +++ /dev/null @@ -1,61 +0,0 @@ -require "sidekiq/client" - -begin - require "after_commit_everywhere" -rescue LoadError - Sidekiq.logger.error("You need to add after_commit_everywhere to your Gemfile for this to work") - exit(-127) -end - -module Sidekiq - module TransactionAwareClient - include AfterCommitEverywhere - - ## - # Control job push within ActiveRecord transactions. Jobs can specify an - # "xa" attribute to define the policy to use: - # - # * true or "commit" means enqueue the job after committing the current transaction. - # * "rollback" means enqueue this job only if the current transaction rolls back - # * nil or false means enqueue the job immediately, Sidekiq's default behavior - # - # If we are not in a transaction, behavior should be unchanged. - # If we ARE in a transaction, the return value of JID will not be available - # due to the asynchronous callback. - def push(item) - # Sidekiq::Job does not merge sidekiq_options so we need to fallback - policy = item.fetch("xa") { |key| - kl = item["class"] - kl.respond_to?(:get_sidekiq_options) ? kl.get_sidekiq_options[key] : nil - } - if policy && in_transaction? - if policy == "commit" || policy == true - after_commit { super } - return "after_commit" - elsif policy == "rollback" - after_rollback { super } - return "after_rollback" - end - end - - super - end - - ## - # We don't provide transactionality for push_bulk because we don't want - # to hold potentially hundreds of thousands of job records in memory due to - # a long running enqueue process. TODO: wdyt? - def push_bulk(items) - super - end - end -end - -## -# Use `Sidekiq.transactional_push!` in your sidekiq.rb initializer -module Sidekiq - def self.transactional_push!(policy: "commit") # TODO: is this knob really necessary? - Sidekiq::Client.prepend(Sidekiq::TransactionAwareClient) - default_job_options["xa"] = policy - end -end diff --git a/test/test_xa.rb b/test/test_xa.rb deleted file mode 100644 index 7d363a98..00000000 --- a/test/test_xa.rb +++ /dev/null @@ -1,122 +0,0 @@ -# frozen_string_literal: true - -require_relative "helper" -require "sidekiq/api" -require "sidekiq/rails" -require "sidekiq/xaclient" -Sidekiq.transactional_push! - -require_relative "./dummy/config/environment" - -class Schema < ActiveRecord::Migration["6.1"] - def change - create_table :posts do |t| - t.string :title - t.date :published_date - end - end -end - -class PostJob - include Sidekiq::Job - def perform - end -end - -class Post < ActiveRecord::Base - after_create :do_thing - - def do_thing - PostJob.perform_async - end -end - -unless Post.connection.tables.include? "posts" - Schema.new.change -end - -describe "XA" do - before do - Sidekiq.redis { |c| c.flushdb } - @app = Dummy::Application.new - Post.delete_all - # need to force this since we aren't booting a Rails app - # ActiveJob::Base.queue_adapter = :sidekiq - # ActiveJob::Base.logger = nil - # ActiveJob::Base.send(:include, ::Sidekiq::Worker::Options) unless ActiveJob::Base.respond_to?(:sidekiq_options) - end - - after do - Sidekiq.default_job_options.delete("xa") - end - - describe ActiveRecord do - it "pushes immediately by default" do - Sidekiq.default_job_options["xa"] = nil - q = Sidekiq::Queue.new - assert_equal 0, q.size - - @app.executor.wrap do - ActiveRecord::Base.transaction do - Post.create!(title: "Hello", published_date: Date.today) - end - end - assert_equal 1, q.size - assert_equal 1, Post.count - - @app.executor.wrap do - ActiveRecord::Base.transaction do - Post.create!(title: "Hello", published_date: Date.today) - raise ActiveRecord::Rollback - end - end - assert_equal 2, q.size - assert_equal 1, Post.count - end - - it "can defer push within active transactions" do - Sidekiq.default_job_options["xa"] = "commit" - q = Sidekiq::Queue.new - assert_equal 0, q.size - - @app.executor.wrap do - ActiveRecord::Base.transaction do - Post.create!(title: "Hello", published_date: Date.today) - end - end - assert_equal 1, q.size - assert_equal 1, Post.count - - @app.executor.wrap do - ActiveRecord::Base.transaction do - Post.create!(title: "Hello", published_date: Date.today) - raise ActiveRecord::Rollback - end - end - assert_equal 1, q.size - assert_equal 1, Post.count - end - - it "can push after rollback" do - Sidekiq.default_job_options["xa"] = "commit" - q = Sidekiq::Queue.new - assert_equal 0, q.size - - @app.executor.wrap do - ActiveRecord::Base.transaction do - PostJob.set(xa: "rollback").perform_async - end - end - assert_equal 0, q.size - - @app.executor.wrap do - ActiveRecord::Base.transaction do - PostJob.set(xa: "rollback").perform_async - raise ActiveRecord::Rollback - end - end - assert_equal 1, q.size - end - - end -end