From c03680fa4d9d82f2881e8b6c8e4a3c00b2cd5cf1 Mon Sep 17 00:00:00 2001 From: Adam Niedzielski Date: Fri, 6 May 2022 19:10:36 +0200 Subject: [PATCH] Add transaction-aware client (#5291) * Add transaction-aware client * Trigger CI build --- Gemfile | 1 + lib/sidekiq/transaction_aware_client.rb | 38 +++++++ lib/sidekiq/worker.rb | 3 +- test/test_transaction_aware_client.rb | 137 ++++++++++++++++++++++++ 4 files changed, 178 insertions(+), 1 deletion(-) create mode 100644 lib/sidekiq/transaction_aware_client.rb create mode 100644 test/test_transaction_aware_client.rb diff --git a/Gemfile b/Gemfile index bcae6f8c..74151814 100644 --- a/Gemfile +++ b/Gemfile @@ -7,6 +7,7 @@ gem "redis-namespace" gem "rails", "~> 6.0" gem "sqlite3", platforms: :ruby gem "activerecord-jdbcsqlite3-adapter", platforms: :jruby +gem "after_commit_everywhere" # mail dependencies gem "net-smtp", platforms: :mri, require: false diff --git a/lib/sidekiq/transaction_aware_client.rb b/lib/sidekiq/transaction_aware_client.rb new file mode 100644 index 00000000..67c21fa4 --- /dev/null +++ b/lib/sidekiq/transaction_aware_client.rb @@ -0,0 +1,38 @@ +# frozen_string_literal: true + +begin + require "after_commit_everywhere" +rescue LoadError + Sidekiq.logger.error("You need to add after_commit_everywhere to your Gemfile for this to work") + exit(-127) +end + +require "sidekiq/client" + +module Sidekiq + class TransactionAwareClient + def initialize(redis_pool) + @redis_client = Client.new(redis_pool) + end + + def push(item) + AfterCommitEverywhere.after_commit { @redis_client.push(item) } + end + + ## + # We don't provide transactionality for push_bulk because we don't want + # to hold potentially hundreds of thousands of job records in memory due to + # a long running enqueue process. + def push_bulk(items) + @redis_client.push_bulk(items) + end + end +end + +## +# Use `Sidekiq.transactional_push!` in your sidekiq.rb initializer +module Sidekiq + def self.transactional_push! + default_job_options["client_class"] = Sidekiq::TransactionAwareClient + end +end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 122bbe16..10736fcb 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -359,7 +359,8 @@ module Sidekiq def build_client # :nodoc: pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool - Sidekiq::Client.new(pool) + client_class = get_sidekiq_options["client_class"] || Sidekiq::Client + client_class.new(pool) end end end diff --git a/test/test_transaction_aware_client.rb b/test/test_transaction_aware_client.rb new file mode 100644 index 00000000..64049884 --- /dev/null +++ b/test/test_transaction_aware_client.rb @@ -0,0 +1,137 @@ +# frozen_string_literal: true + +require_relative "helper" +require "sidekiq/api" +require "sidekiq/rails" +require "sidekiq/transaction_aware_client" + +require_relative "./dummy/config/environment" + +class Schema < ActiveRecord::Migration["6.1"] + def change + create_table :posts do |t| + t.string :title + t.date :published_date + end + end +end + +class PostJob + include Sidekiq::Job + def perform + end +end + +class AlwaysDeferredJob + include Sidekiq::Job + sidekiq_options client_class: Sidekiq::TransactionAwareClient + + def perform + end +end + +class AlwaysPushedJob + include Sidekiq::Job + sidekiq_options client_class: Sidekiq::Client + + def perform + end +end + +class Post < ActiveRecord::Base + after_create :do_thing + + def do_thing + PostJob.perform_async + end +end + +unless Post.connection.tables.include? "posts" + Schema.new.change +end + +describe Sidekiq::TransactionAwareClient do + before do + Sidekiq.redis { |c| c.flushdb } + @app = Dummy::Application.new + Post.delete_all + end + + after do + Sidekiq.default_job_options.delete("client_class") + end + + describe ActiveRecord do + it "pushes immediately by default" do + q = Sidekiq::Queue.new + assert_equal 0, q.size + + @app.executor.wrap do + ActiveRecord::Base.transaction do + Post.create!(title: "Hello", published_date: Date.today) + end + end + assert_equal 1, q.size + assert_equal 1, Post.count + + @app.executor.wrap do + ActiveRecord::Base.transaction do + Post.create!(title: "Hello", published_date: Date.today) + raise ActiveRecord::Rollback + end + end + assert_equal 2, q.size + assert_equal 1, Post.count + end + + it "can defer push within active transactions" do + Sidekiq.transactional_push! + q = Sidekiq::Queue.new + assert_equal 0, q.size + + @app.executor.wrap do + ActiveRecord::Base.transaction do + Post.create!(title: "Hello", published_date: Date.today) + end + end + assert_equal 1, q.size + assert_equal 1, Post.count + + @app.executor.wrap do + ActiveRecord::Base.transaction do + Post.create!(title: "Hello", published_date: Date.today) + raise ActiveRecord::Rollback + end + end + assert_equal 1, q.size + assert_equal 1, Post.count + end + + it "defers push when enabled on a per job basis" do + q = Sidekiq::Queue.new + assert_equal 0, q.size + + @app.executor.wrap do + ActiveRecord::Base.transaction do + AlwaysDeferredJob.perform_async + raise ActiveRecord::Rollback + end + end + assert_equal 0, q.size + end + + it "pushes immediately when disabled on a per job basis" do + Sidekiq.transactional_push! + q = Sidekiq::Queue.new + assert_equal 0, q.size + + @app.executor.wrap do + ActiveRecord::Base.transaction do + AlwaysPushedJob.perform_async + raise ActiveRecord::Rollback + end + end + assert_equal 1, q.size + end + end +end