mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Merge pull request #28726 from matthewd/transaction-locking
Add comprehensive locking around DB transactions
This commit is contained in:
commit
92ba89dbcc
3 changed files with 70 additions and 52 deletions
|
@ -149,57 +149,67 @@ module ActiveRecord
|
|||
end
|
||||
|
||||
def begin_transaction(options = {})
|
||||
run_commit_callbacks = !current_transaction.joinable?
|
||||
transaction =
|
||||
if @stack.empty?
|
||||
RealTransaction.new(@connection, options, run_commit_callbacks: run_commit_callbacks)
|
||||
else
|
||||
SavepointTransaction.new(@connection, "active_record_#{@stack.size}", options,
|
||||
run_commit_callbacks: run_commit_callbacks)
|
||||
end
|
||||
@connection.lock.synchronize do
|
||||
run_commit_callbacks = !current_transaction.joinable?
|
||||
transaction =
|
||||
if @stack.empty?
|
||||
RealTransaction.new(@connection, options, run_commit_callbacks: run_commit_callbacks)
|
||||
else
|
||||
SavepointTransaction.new(@connection, "active_record_#{@stack.size}", options,
|
||||
run_commit_callbacks: run_commit_callbacks)
|
||||
end
|
||||
|
||||
@stack.push(transaction)
|
||||
transaction
|
||||
@stack.push(transaction)
|
||||
transaction
|
||||
end
|
||||
end
|
||||
|
||||
def commit_transaction
|
||||
transaction = @stack.last
|
||||
@connection.lock.synchronize do
|
||||
transaction = @stack.last
|
||||
|
||||
begin
|
||||
transaction.before_commit_records
|
||||
ensure
|
||||
@stack.pop
|
||||
begin
|
||||
transaction.before_commit_records
|
||||
ensure
|
||||
@stack.pop
|
||||
end
|
||||
|
||||
transaction.commit
|
||||
transaction.commit_records
|
||||
end
|
||||
|
||||
transaction.commit
|
||||
transaction.commit_records
|
||||
end
|
||||
|
||||
def rollback_transaction(transaction = nil)
|
||||
transaction ||= @stack.pop
|
||||
transaction.rollback
|
||||
transaction.rollback_records
|
||||
@connection.lock.synchronize do
|
||||
transaction ||= @stack.pop
|
||||
transaction.rollback
|
||||
transaction.rollback_records
|
||||
end
|
||||
end
|
||||
|
||||
def within_new_transaction(options = {})
|
||||
transaction = begin_transaction options
|
||||
yield
|
||||
rescue Exception => error
|
||||
if transaction
|
||||
rollback_transaction
|
||||
after_failure_actions(transaction, error)
|
||||
end
|
||||
raise
|
||||
ensure
|
||||
unless error
|
||||
if Thread.current.status == "aborting"
|
||||
rollback_transaction if transaction
|
||||
else
|
||||
begin
|
||||
commit_transaction
|
||||
rescue Exception
|
||||
rollback_transaction(transaction) unless transaction.state.completed?
|
||||
raise
|
||||
@connection.lock.synchronize do
|
||||
begin
|
||||
transaction = begin_transaction options
|
||||
yield
|
||||
rescue Exception => error
|
||||
if transaction
|
||||
rollback_transaction
|
||||
after_failure_actions(transaction, error)
|
||||
end
|
||||
raise
|
||||
ensure
|
||||
unless error
|
||||
if Thread.current.status == "aborting"
|
||||
rollback_transaction if transaction
|
||||
else
|
||||
begin
|
||||
commit_transaction
|
||||
rescue Exception
|
||||
rollback_transaction(transaction) unless transaction.state.completed?
|
||||
raise
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -74,7 +74,7 @@ module ActiveRecord
|
|||
SIMPLE_INT = /\A\d+\z/
|
||||
|
||||
attr_accessor :visitor, :pool
|
||||
attr_reader :schema_cache, :owner, :logger, :prepared_statements
|
||||
attr_reader :schema_cache, :owner, :logger, :prepared_statements, :lock
|
||||
alias :in_use? :owner
|
||||
|
||||
def self.type_cast_config_to_integer(config)
|
||||
|
|
|
@ -239,7 +239,9 @@ module ActiveRecord
|
|||
|
||||
# Is this connection alive and ready for queries?
|
||||
def active?
|
||||
@connection.query "SELECT 1"
|
||||
@lock.synchronize do
|
||||
@connection.query "SELECT 1"
|
||||
end
|
||||
true
|
||||
rescue PG::Error
|
||||
false
|
||||
|
@ -247,26 +249,32 @@ module ActiveRecord
|
|||
|
||||
# Close then reopen the connection.
|
||||
def reconnect!
|
||||
super
|
||||
@connection.reset
|
||||
configure_connection
|
||||
@lock.synchronize do
|
||||
super
|
||||
@connection.reset
|
||||
configure_connection
|
||||
end
|
||||
end
|
||||
|
||||
def reset!
|
||||
clear_cache!
|
||||
reset_transaction
|
||||
unless @connection.transaction_status == ::PG::PQTRANS_IDLE
|
||||
@connection.query "ROLLBACK"
|
||||
@lock.synchronize do
|
||||
clear_cache!
|
||||
reset_transaction
|
||||
unless @connection.transaction_status == ::PG::PQTRANS_IDLE
|
||||
@connection.query "ROLLBACK"
|
||||
end
|
||||
@connection.query "DISCARD ALL"
|
||||
configure_connection
|
||||
end
|
||||
@connection.query "DISCARD ALL"
|
||||
configure_connection
|
||||
end
|
||||
|
||||
# Disconnects from the database if already connected. Otherwise, this
|
||||
# method does nothing.
|
||||
def disconnect!
|
||||
super
|
||||
@connection.close rescue nil
|
||||
@lock.synchronize do
|
||||
super
|
||||
@connection.close rescue nil
|
||||
end
|
||||
end
|
||||
|
||||
def native_database_types #:nodoc:
|
||||
|
|
Loading…
Reference in a new issue