r4644@asus: jeremy | 2006-06-16 14:57:03 -0700

locking
 r4645@asus:  jeremy | 2006-06-17 12:41:30 -0700
 missing reply fixture
 r4646@asus:  jeremy | 2006-06-19 13:05:23 -0700
 Use a per-thread (rather than global) transaction mutex so you may execute concurrent transactions on separate connections.
 r4647@asus:  jeremy | 2006-06-19 13:07:23 -0700
 PostgreSQL: introduce allow_concurrency option which determines whether to use blocking or asynchronous #execute. Adapters with blocking #execute will deadlock Ruby threads. The default value is ActiveRecord::Base.allow_concurrency.
 r4648@asus:  jeremy | 2006-06-19 13:08:40 -0700
 Pass the default allow_concurrency when instantiating new connections.
 r4649@asus:  jeremy | 2006-06-19 13:11:12 -0700
 Break out concurrent transaction tests and run them for PostgreSQLAdapter only (need to fork or system('some_test_script') for the other adapters)
 r4650@asus:  jeremy | 2006-06-19 13:42:48 -0700
 Row locking. Provide a locking clause with the :lock finder option or true for the default "FOR UPDATE".
 r4661@asus:  jeremy | 2006-06-19 15:36:51 -0700
 excise the junk mutex


git-svn-id: http://svn-commit.rubyonrails.org/rails/trunk@4460 5ecf4fe2-1ee6-0310-87b1-e25e094e27de
This commit is contained in:
Jeremy Kemper 2006-06-19 22:48:51 +00:00
parent e5fc5aaffe
commit 15aa6e0552
11 changed files with 280 additions and 127 deletions

View File

@ -1,5 +1,18 @@
*SVN*
* Row locking. Provide a locking clause with the :lock finder option or true for the default "FOR UPDATE". [Shugo Maeda]
# Obtain an exclusive lock on person 1 so we can safely increment visits.
Person.transaction do
# select * from people where id=1 for update
person = Person.find(1, :lock => true)
person.visits += 1
person.save!
end
* PostgreSQL: introduce allow_concurrency option which determines whether to use blocking or asynchronous #execute. Adapters with blocking #execute will deadlock Ruby threads. The default value is ActiveRecord::Base.allow_concurrency. [Jeremy Kemper]
* Use a per-thread (rather than global) transaction mutex so you may execute concurrent transactions on separate connections. [Jeremy Kemper]
* Change AR::Base#to_param to return a String instead of a Fixnum. Closes #5320. [Nicholas Seckar]
* Use explicit delegation instead of method aliasing for AR::Base.to_param -> AR::Base.id. #5299 (skaes@web.de)

View File

@ -365,6 +365,8 @@ module ActiveRecord #:nodoc:
# * <tt>:select</tt>: By default, this is * as in SELECT * FROM, but can be changed if you for example want to do a join, but not
# include the joined columns.
# * <tt>:readonly</tt>: Mark the returned records read-only so they cannot be saved or updated.
# * <tt>:lock</tt>: An SQL fragment like "FOR UPDATE" or "LOCK IN SHARE MODE".
# :lock => true gives connection's default exclusive lock, usually "FOR UPDATE".
#
# Examples for find by id:
# Person.find(1) # returns the object for ID = 1
@ -384,6 +386,17 @@ module ActiveRecord #:nodoc:
# Person.find(:all, :offset => 10, :limit => 10)
# Person.find(:all, :include => [ :account, :friends ])
# Person.find(:all, :group => "category")
#
# Example for find with a lock. Imagine two concurrent transactions:
# each will read person.visits == 2, add 1 to it, and save, resulting
# in two saves of person.visits = 3. By locking the row, the second
# transaction has to wait until the first is finished; we get the
# expected person.visits == 4.
# Person.transaction do
# person = Person.find(1, :lock => true)
# person.visits += 1
# person.save!
# end
def find(*args)
options = extract_options_from_args!(args)
validate_find_options(options)
@ -850,7 +863,7 @@ module ActiveRecord #:nodoc:
method_scoping.assert_valid_keys([ :find, :create ])
if f = method_scoping[:find]
f.assert_valid_keys([ :conditions, :joins, :select, :include, :from, :offset, :limit, :order, :readonly ])
f.assert_valid_keys([ :conditions, :joins, :select, :include, :from, :offset, :limit, :order, :readonly, :lock ])
f[:readonly] = true if !f[:joins].blank? && !f.has_key?(:readonly)
end
@ -1028,6 +1041,7 @@ module ActiveRecord #:nodoc:
add_order!(sql, options[:order])
add_limit!(sql, options, scope)
add_lock!(sql, options, scope)
sql
end
@ -1061,13 +1075,18 @@ module ActiveRecord #:nodoc:
# The optional scope argument is for the current :find scope.
def add_limit!(sql, options, scope = :auto)
scope = scope(:find) if :auto == scope
if scope
options[:limit] ||= scope[:limit]
options[:offset] ||= scope[:offset]
end
options = options.reverse_merge(:limit => scope[:limit], :offset => scope[:offset]) if scope
connection.add_limit_offset!(sql, options)
end
# The optional scope argument is for the current :find scope.
# The :lock option has precedence over a scoped :lock.
def add_lock!(sql, options, scope = :auto)
scope = scope(:find) if :auto == :scope
options = options.reverse_merge(:lock => scope[:lock]) if scope
connection.add_lock!(sql, options)
end
# The optional scope argument is for the current :find scope.
def add_joins!(sql, options, scope = :auto)
scope = scope(:find) if :auto == scope
@ -1361,12 +1380,12 @@ module ActiveRecord #:nodoc:
end
VALID_FIND_OPTIONS = [ :conditions, :include, :joins, :limit, :offset,
:order, :select, :readonly, :group, :from ]
:order, :select, :readonly, :group, :from, :lock ]
def validate_find_options(options) #:nodoc:
options.assert_valid_keys(VALID_FIND_OPTIONS)
end
def set_readonly_option!(options) #:nodoc:
# Inherit :readonly from finder scope if set. Otherwise,
# if :joins is not blank then :readonly defaults to true.
@ -2025,4 +2044,4 @@ module ActiveRecord #:nodoc:
value
end
end
end
end

View File

@ -248,7 +248,8 @@ module ActiveRecord
if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter)
active_connections[name] = spec
elsif spec.kind_of?(ConnectionSpecification)
self.connection = self.send(spec.adapter_method, spec.config)
config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency)
self.connection = self.send(spec.adapter_method, config)
elsif spec.nil?
raise ConnectionNotEstablished
else

View File

@ -91,6 +91,17 @@ module ActiveRecord
end
end
# Appends a locking clause to a SQL statement. *Modifies the +sql+ parameter*.
# # SELECT * FROM suppliers FOR UPDATE
# add_lock! 'SELECT * FROM suppliers', :lock => true
# add_lock! 'SELECT * FROM suppliers', :lock => ' FOR UPDATE'
def add_lock!(sql, options)
case lock = options[:lock]
when true: sql << ' FOR UPDATE'
when String: sql << " #{lock}"
end
end
def default_sequence_name(table, column)
nil
end

View File

@ -46,6 +46,7 @@ module ActiveRecord
# * <tt>:schema_search_path</tt> -- An optional schema search path for the connection given as a string of comma-separated schema names. This is backward-compatible with the :schema_order option.
# * <tt>:encoding</tt> -- An optional client encoding that is using in a SET client_encoding TO <encoding> call on connection.
# * <tt>:min_messages</tt> -- An optional client min messages that is using in a SET client_min_messages TO <min_messages> call on connection.
# * <tt>:allow_concurrency</tt> -- If true, use async query methods so Ruby threads don't deadlock; otherwise, use blocking query methods.
class PostgreSQLAdapter < AbstractAdapter
def adapter_name
'PostgreSQL'
@ -54,6 +55,7 @@ module ActiveRecord
def initialize(connection, logger, config = {})
super(connection, logger)
@config = config
@async = config[:allow_concurrency]
configure_connection
end
@ -67,7 +69,7 @@ module ActiveRecord
end
# postgres-pr raises a NoMethodError when querying if no conn is available
rescue PGError, NoMethodError
false
false
end
# Close then reopen the connection.
@ -78,7 +80,7 @@ module ActiveRecord
configure_connection
end
end
def disconnect!
# Both postgres and postgres-pr respond to :close
@connection.close rescue nil
@ -99,11 +101,11 @@ module ActiveRecord
:boolean => { :name => "boolean" }
}
end
def supports_migrations?
true
end
end
def table_alias_length
63
end
@ -141,11 +143,23 @@ module ActiveRecord
end
def query(sql, name = nil) #:nodoc:
log(sql, name) { @connection.query(sql) }
log(sql, name) do
if @async
@connection.async_query(sql)
else
@connection.query(sql)
end
end
end
def execute(sql, name = nil) #:nodoc:
log(sql, name) { @connection.exec(sql) }
log(sql, name) do
if @async
@connection.async_exec(sql)
else
@connection.exec(sql)
end
end
end
def update(sql, name = nil) #:nodoc:
@ -162,7 +176,7 @@ module ActiveRecord
def commit_db_transaction #:nodoc:
execute "COMMIT"
end
def rollback_db_transaction #:nodoc:
execute "ROLLBACK"
end
@ -261,7 +275,7 @@ module ActiveRecord
def pk_and_sequence_for(table)
# First try looking for a sequence with a dependency on the
# given table's primary key.
result = execute(<<-end_sql, 'PK and serial sequence')[0]
result = query(<<-end_sql, 'PK and serial sequence')[0]
SELECT attr.attname, name.nspname, seq.relname
FROM pg_class seq,
pg_attribute attr,
@ -284,7 +298,7 @@ module ActiveRecord
# Support the 7.x and 8.0 nextval('foo'::text) as well as
# the 8.1+ nextval('foo'::regclass).
# TODO: assumes sequence is in same schema as table.
result = execute(<<-end_sql, 'PK and custom sequence')[0]
result = query(<<-end_sql, 'PK and custom sequence')[0]
SELECT attr.attname, name.nspname, split_part(def.adsrc, '\\\'', 2)
FROM pg_class t
JOIN pg_namespace name ON (t.relnamespace = name.oid)
@ -305,7 +319,7 @@ module ActiveRecord
def rename_table(name, new_name)
execute "ALTER TABLE #{name} RENAME TO #{new_name}"
end
def add_column(table_name, column_name, type, options = {})
execute("ALTER TABLE #{table_name} ADD #{column_name} #{type_to_sql(type, options[:limit])}")
execute("ALTER TABLE #{table_name} ALTER #{column_name} SET NOT NULL") if options[:null] == false
@ -325,12 +339,12 @@ module ActiveRecord
commit_db_transaction
end
change_column_default(table_name, column_name, options[:default]) unless options[:default].nil?
end
end
def change_column_default(table_name, column_name, default) #:nodoc:
execute "ALTER TABLE #{table_name} ALTER COLUMN #{column_name} SET DEFAULT '#{default}'"
end
def rename_column(table_name, column_name, new_column_name) #:nodoc:
execute "ALTER TABLE #{table_name} RENAME COLUMN #{column_name} TO #{new_column_name}"
end
@ -379,7 +393,7 @@ module ActiveRecord
hashed_row = {}
row.each_index do |cel_index|
column = row[cel_index]
case res.type(cel_index)
when BYTEA_COLUMN_TYPE_OID
column = unescape_bytea(column)
@ -392,6 +406,7 @@ module ActiveRecord
rows << hashed_row
end
end
res.clear
return rows
end
@ -442,7 +457,7 @@ module ActiveRecord
end
unescape_bytea(s)
end
# Query a table's column names, default values, and types.
#
# The underlying query is roughly:
@ -482,7 +497,7 @@ module ActiveRecord
when /^real|^money/i then 'float'
when /^interval/i then 'string'
# geometric types (the line type is currently not implemented in postgresql)
when /^(?:point|lseg|box|"?path"?|polygon|circle)/i then 'string'
when /^(?:point|lseg|box|"?path"?|polygon|circle)/i then 'string'
when /^bytea/i then 'binary'
else field_type # Pass through standard types.
end
@ -492,16 +507,16 @@ module ActiveRecord
# Boolean types
return "t" if value =~ /true/i
return "f" if value =~ /false/i
# Char/String/Bytea type values
return $1 if value =~ /^'(.*)'::(bpchar|text|character varying|bytea)$/
# Numeric values
return value if value =~ /^-?[0-9]+(\.[0-9]*)?/
# Fixed dates / times
return $1 if value =~ /^'(.+)'::(date|timestamp)/
# Anything else is blank, some user type, or some function
# and we can't know the value of that, so return nil.
return nil

View File

@ -184,6 +184,12 @@ module ActiveRecord
end
# SELECT ... FOR UPDATE is redundant since the table is locked.
def add_lock!(sql, options) #:nodoc:
sql
end
# SCHEMA STATEMENTS ========================================
def tables(name = nil) #:nodoc:

View File

@ -519,7 +519,7 @@ module Test #:nodoc:
load_fixtures
@@already_loaded_fixtures[self.class] = @loaded_fixtures
end
ActiveRecord::Base.lock_mutex
ActiveRecord::Base.send :increment_open_transactions
ActiveRecord::Base.connection.begin_db_transaction
# Load fixtures for every test.
@ -538,7 +538,7 @@ module Test #:nodoc:
# Rollback changes.
if use_transactional_fixtures?
ActiveRecord::Base.connection.rollback_db_transaction
ActiveRecord::Base.unlock_mutex
ActiveRecord::Base.send :decrement_open_transactions
end
ActiveRecord::Base.verify_active_connections!
end

View File

@ -4,8 +4,6 @@ require 'thread'
module ActiveRecord
module Transactions # :nodoc:
TRANSACTION_MUTEX = Mutex.new
class TransactionError < ActiveRecordError # :nodoc:
end
@ -79,8 +77,8 @@ module ActiveRecord
module ClassMethods
def transaction(*objects, &block)
previous_handler = trap('TERM') { raise TransactionError, "Transaction aborted" }
lock_mutex
increment_open_transactions
begin
objects.each { |o| o.extend(Transaction::Simple) }
objects.each { |o| o.start_transaction }
@ -93,22 +91,21 @@ module ActiveRecord
objects.each { |o| o.abort_transaction }
raise
ensure
unlock_mutex
decrement_open_transactions
trap('TERM', previous_handler)
end
end
def lock_mutex#:nodoc:
Thread.current['open_transactions'] ||= 0
TRANSACTION_MUTEX.lock if Thread.current['open_transactions'] == 0
Thread.current['start_db_transaction'] = (Thread.current['open_transactions'] == 0)
Thread.current['open_transactions'] += 1
end
def unlock_mutex#:nodoc:
Thread.current['open_transactions'] -= 1
TRANSACTION_MUTEX.unlock if Thread.current['open_transactions'] == 0
end
private
def increment_open_transactions #:nodoc:
open = Thread.current['open_transactions'] ||= 0
Thread.current['start_db_transaction'] = open.zero?
Thread.current['open_transactions'] = open + 1
end
def decrement_open_transactions #:nodoc:
Thread.current['open_transactions'] -= 1
end
end
def transaction(*objects, &block)

View File

@ -2,16 +2,16 @@ require 'abstract_unit'
require 'fixtures/person'
require 'fixtures/legacy_thing'
class LockingTest < Test::Unit::TestCase
class OptimisticLockingTest < Test::Unit::TestCase
fixtures :people, :legacy_things
def test_lock_existing
p1 = Person.find(1)
p2 = Person.find(1)
p1.first_name = "Michael"
p1.save
assert_raises(ActiveRecord::StaleObjectError) {
p2.first_name = "should fail"
p2.save
@ -24,13 +24,13 @@ class LockingTest < Test::Unit::TestCase
assert_equal p1.id, p2.id
p1.first_name = "Anika"
p1.save
assert_raises(ActiveRecord::StaleObjectError) {
p2.first_name = "should fail"
p2.save
}
end
def test_lock_column_name_existing
t1 = LegacyThing.find(1)
t2 = LegacyThing.find(1)
@ -41,6 +41,85 @@ class LockingTest < Test::Unit::TestCase
t2.tps_report_number = 300
t2.save
}
end
end
end
# TODO: test against the generated SQL since testing locking behavior itself
# is so cumbersome. Will deadlock Ruby threads if the underlying db.execute
# blocks, so separate script called by Kernel#system is needed.
# (See exec vs. async_exec in the PostgreSQL adapter.)
class PessimisticLockingTest < Test::Unit::TestCase
self.use_transactional_fixtures = false
fixtures :people
def setup
@allow_concurrency = ActiveRecord::Base.allow_concurrency
ActiveRecord::Base.allow_concurrency = true
end
def teardown
ActiveRecord::Base.allow_concurrency = @allow_concurrency
end
# Test that the adapter doesn't blow up on add_lock!
def test_sane_find_with_lock
assert_nothing_raised do
Person.transaction do
Person.find 1, :lock => true
end
end
end
# Test no-blowup for scoped lock.
def test_sane_find_with_lock
assert_nothing_raised do
Person.transaction do
Person.with_scope(:find => { :lock => true }) do
Person.find 1
end
end
end
end
if current_adapter?(:PostgreSQLAdapter)
def test_no_locks_no_wait
first, second = duel { Person.find 1 }
assert first.end > second.end
end
def test_second_lock_waits
first, second = duel { Person.find 1, :lock => true }
assert second.end > first.end
end
protected
def duel(zzz = 0.2)
t0, t1, t2, t3 = nil, nil, nil, nil
a = Thread.new do
t0 = Time.now
Person.transaction do
yield
sleep zzz # block thread 2 for zzz seconds
end
t1 = Time.now
end
b = Thread.new do
sleep zzz / 2.0 # ensure thread 1 tx starts first
t2 = Time.now
Person.transaction { yield }
t3 = Time.now
end
a.join
b.join
assert t1 > t0 + zzz
assert t2 > t0
assert t3 > t2
[t0.to_f..t1.to_f, t2.to_f..t3.to_f]
end
end
end

View File

@ -1,5 +1,6 @@
require 'abstract_unit'
require 'fixtures/topic'
require 'fixtures/reply'
unless %w(FrontBase).include? ActiveRecord::Base.connection.adapter_name
class ThreadedConnectionsTest < Test::Unit::TestCase

View File

@ -5,12 +5,9 @@ require 'fixtures/developer'
class TransactionTest < Test::Unit::TestCase
self.use_transactional_fixtures = false
fixtures :topics, :developers
def setup
# sqlite does not seem to return these in the right order, so we sort them
# explicitly for sqlite's sake. sqlite3 does fine.
@first, @second = Topic.find(1, 2).sort_by { |t| t.id }
end
@ -137,75 +134,6 @@ class TransactionTest < Test::Unit::TestCase
assert !Topic.find(2).approved?, "Second should have been unapproved"
end
# This will cause transactions to overlap and fail unless they are
# performed on separate database connections.
def test_transaction_per_thread
assert_nothing_raised do
threads = (1..20).map do
Thread.new do
Topic.transaction do
topic = Topic.find(:first)
topic.approved = !topic.approved?
topic.save!
topic.approved = !topic.approved?
topic.save!
end
end
end
threads.each { |t| t.join }
end
end
# Test for dirty reads among simultaneous transactions.
def test_transaction_isolation__read_committed
# Should be invariant.
original_salary = Developer.find(1).salary
temporary_salary = 200000
assert_nothing_raised do
threads = (1..20).map do
Thread.new do
Developer.transaction do
# Expect original salary.
dev = Developer.find(1)
assert_equal original_salary, dev.salary
dev.salary = temporary_salary
dev.save!
# Expect temporary salary.
dev = Developer.find(1)
assert_equal temporary_salary, dev.salary
dev.salary = original_salary
dev.save!
# Expect original salary.
dev = Developer.find(1)
assert_equal original_salary, dev.salary
end
end
end
# Keep our eyes peeled.
threads << Thread.new do
10.times do
sleep 0.05
Developer.transaction do
# Always expect original salary.
assert_equal original_salary, Developer.find(1).salary
end
end
end
threads.each { |t| t.join }
end
assert_equal original_salary, Developer.find(1).salary
end
private
def add_exception_raising_after_save_callback_to_topic
Topic.class_eval { def after_save() raise "Make the transaction rollback" end }
@ -215,3 +143,86 @@ class TransactionTest < Test::Unit::TestCase
Topic.class_eval { remove_method :after_save }
end
end
if current_adapter?(:PostgreSQLAdapter)
class ConcurrentTransactionTest < TransactionTest
def setup
@allow_concurrency = ActiveRecord::Base.allow_concurrency
ActiveRecord::Base.allow_concurrency = true
super
end
def teardown
super
ActiveRecord::Base.allow_concurrency = @allow_concurrency
end
# This will cause transactions to overlap and fail unless they are performed on
# separate database connections.
def test_transaction_per_thread
assert_nothing_raised do
threads = (1..3).map do
Thread.new do
Topic.transaction do
topic = Topic.find(1)
topic.approved = !topic.approved?
topic.save!
topic.approved = !topic.approved?
topic.save!
end
end
end
threads.each { |t| t.join }
end
end
# Test for dirty reads among simultaneous transactions.
def test_transaction_isolation__read_committed
# Should be invariant.
original_salary = Developer.find(1).salary
temporary_salary = 200000
assert_nothing_raised do
threads = (1..3).map do
Thread.new do
Developer.transaction do
# Expect original salary.
dev = Developer.find(1)
assert_equal original_salary, dev.salary
dev.salary = temporary_salary
dev.save!
# Expect temporary salary.
dev = Developer.find(1)
assert_equal temporary_salary, dev.salary
dev.salary = original_salary
dev.save!
# Expect original salary.
dev = Developer.find(1)
assert_equal original_salary, dev.salary
end
end
end
# Keep our eyes peeled.
threads << Thread.new do
10.times do
sleep 0.05
Developer.transaction do
# Always expect original salary.
assert_equal original_salary, Developer.find(1).salary
end
end
end
threads.each { |t| t.join }
end
assert_equal original_salary, Developer.find(1).salary
end
end
end