mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Add transaction-aware client (#5291)
* Add transaction-aware client * Trigger CI build
This commit is contained in:
parent
0cf086ff6b
commit
c03680fa4d
4 changed files with 178 additions and 1 deletions
1
Gemfile
1
Gemfile
|
@ -7,6 +7,7 @@ gem "redis-namespace"
|
|||
gem "rails", "~> 6.0"
|
||||
gem "sqlite3", platforms: :ruby
|
||||
gem "activerecord-jdbcsqlite3-adapter", platforms: :jruby
|
||||
gem "after_commit_everywhere"
|
||||
|
||||
# mail dependencies
|
||||
gem "net-smtp", platforms: :mri, require: false
|
||||
|
|
38
lib/sidekiq/transaction_aware_client.rb
Normal file
38
lib/sidekiq/transaction_aware_client.rb
Normal file
|
@ -0,0 +1,38 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
begin
|
||||
require "after_commit_everywhere"
|
||||
rescue LoadError
|
||||
Sidekiq.logger.error("You need to add after_commit_everywhere to your Gemfile for this to work")
|
||||
exit(-127)
|
||||
end
|
||||
|
||||
require "sidekiq/client"
|
||||
|
||||
module Sidekiq
|
||||
class TransactionAwareClient
|
||||
def initialize(redis_pool)
|
||||
@redis_client = Client.new(redis_pool)
|
||||
end
|
||||
|
||||
def push(item)
|
||||
AfterCommitEverywhere.after_commit { @redis_client.push(item) }
|
||||
end
|
||||
|
||||
##
|
||||
# We don't provide transactionality for push_bulk because we don't want
|
||||
# to hold potentially hundreds of thousands of job records in memory due to
|
||||
# a long running enqueue process.
|
||||
def push_bulk(items)
|
||||
@redis_client.push_bulk(items)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
##
|
||||
# Use `Sidekiq.transactional_push!` in your sidekiq.rb initializer
|
||||
module Sidekiq
|
||||
def self.transactional_push!
|
||||
default_job_options["client_class"] = Sidekiq::TransactionAwareClient
|
||||
end
|
||||
end
|
|
@ -359,7 +359,8 @@ module Sidekiq
|
|||
|
||||
def build_client # :nodoc:
|
||||
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options["pool"] || Sidekiq.redis_pool
|
||||
Sidekiq::Client.new(pool)
|
||||
client_class = get_sidekiq_options["client_class"] || Sidekiq::Client
|
||||
client_class.new(pool)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
137
test/test_transaction_aware_client.rb
Normal file
137
test/test_transaction_aware_client.rb
Normal file
|
@ -0,0 +1,137 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require_relative "helper"
|
||||
require "sidekiq/api"
|
||||
require "sidekiq/rails"
|
||||
require "sidekiq/transaction_aware_client"
|
||||
|
||||
require_relative "./dummy/config/environment"
|
||||
|
||||
class Schema < ActiveRecord::Migration["6.1"]
|
||||
def change
|
||||
create_table :posts do |t|
|
||||
t.string :title
|
||||
t.date :published_date
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class PostJob
|
||||
include Sidekiq::Job
|
||||
def perform
|
||||
end
|
||||
end
|
||||
|
||||
class AlwaysDeferredJob
|
||||
include Sidekiq::Job
|
||||
sidekiq_options client_class: Sidekiq::TransactionAwareClient
|
||||
|
||||
def perform
|
||||
end
|
||||
end
|
||||
|
||||
class AlwaysPushedJob
|
||||
include Sidekiq::Job
|
||||
sidekiq_options client_class: Sidekiq::Client
|
||||
|
||||
def perform
|
||||
end
|
||||
end
|
||||
|
||||
class Post < ActiveRecord::Base
|
||||
after_create :do_thing
|
||||
|
||||
def do_thing
|
||||
PostJob.perform_async
|
||||
end
|
||||
end
|
||||
|
||||
unless Post.connection.tables.include? "posts"
|
||||
Schema.new.change
|
||||
end
|
||||
|
||||
describe Sidekiq::TransactionAwareClient do
|
||||
before do
|
||||
Sidekiq.redis { |c| c.flushdb }
|
||||
@app = Dummy::Application.new
|
||||
Post.delete_all
|
||||
end
|
||||
|
||||
after do
|
||||
Sidekiq.default_job_options.delete("client_class")
|
||||
end
|
||||
|
||||
describe ActiveRecord do
|
||||
it "pushes immediately by default" do
|
||||
q = Sidekiq::Queue.new
|
||||
assert_equal 0, q.size
|
||||
|
||||
@app.executor.wrap do
|
||||
ActiveRecord::Base.transaction do
|
||||
Post.create!(title: "Hello", published_date: Date.today)
|
||||
end
|
||||
end
|
||||
assert_equal 1, q.size
|
||||
assert_equal 1, Post.count
|
||||
|
||||
@app.executor.wrap do
|
||||
ActiveRecord::Base.transaction do
|
||||
Post.create!(title: "Hello", published_date: Date.today)
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
end
|
||||
assert_equal 2, q.size
|
||||
assert_equal 1, Post.count
|
||||
end
|
||||
|
||||
it "can defer push within active transactions" do
|
||||
Sidekiq.transactional_push!
|
||||
q = Sidekiq::Queue.new
|
||||
assert_equal 0, q.size
|
||||
|
||||
@app.executor.wrap do
|
||||
ActiveRecord::Base.transaction do
|
||||
Post.create!(title: "Hello", published_date: Date.today)
|
||||
end
|
||||
end
|
||||
assert_equal 1, q.size
|
||||
assert_equal 1, Post.count
|
||||
|
||||
@app.executor.wrap do
|
||||
ActiveRecord::Base.transaction do
|
||||
Post.create!(title: "Hello", published_date: Date.today)
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
end
|
||||
assert_equal 1, q.size
|
||||
assert_equal 1, Post.count
|
||||
end
|
||||
|
||||
it "defers push when enabled on a per job basis" do
|
||||
q = Sidekiq::Queue.new
|
||||
assert_equal 0, q.size
|
||||
|
||||
@app.executor.wrap do
|
||||
ActiveRecord::Base.transaction do
|
||||
AlwaysDeferredJob.perform_async
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
end
|
||||
assert_equal 0, q.size
|
||||
end
|
||||
|
||||
it "pushes immediately when disabled on a per job basis" do
|
||||
Sidekiq.transactional_push!
|
||||
q = Sidekiq::Queue.new
|
||||
assert_equal 0, q.size
|
||||
|
||||
@app.executor.wrap do
|
||||
ActiveRecord::Base.transaction do
|
||||
AlwaysPushedJob.perform_async
|
||||
raise ActiveRecord::Rollback
|
||||
end
|
||||
end
|
||||
assert_equal 1, q.size
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Add table
Reference in a new issue