diff --git a/activerecord/CHANGELOG b/activerecord/CHANGELOG index dc18983ff6..8a7b638f18 100644 --- a/activerecord/CHANGELOG +++ b/activerecord/CHANGELOG @@ -1,5 +1,18 @@ *SVN* +* Row locking. Provide a locking clause with the :lock finder option or true for the default "FOR UPDATE". [Shugo Maeda] + # Obtain an exclusive lock on person 1 so we can safely increment visits. + Person.transaction do + # select * from people where id=1 for update + person = Person.find(1, :lock => true) + person.visits += 1 + person.save! + end + +* PostgreSQL: introduce allow_concurrency option which determines whether to use blocking or asynchronous #execute. Adapters with blocking #execute will deadlock Ruby threads. The default value is ActiveRecord::Base.allow_concurrency. [Jeremy Kemper] + +* Use a per-thread (rather than global) transaction mutex so you may execute concurrent transactions on separate connections. [Jeremy Kemper] + * Change AR::Base#to_param to return a String instead of a Fixnum. Closes #5320. [Nicholas Seckar] * Use explicit delegation instead of method aliasing for AR::Base.to_param -> AR::Base.id. #5299 (skaes@web.de) diff --git a/activerecord/lib/active_record/base.rb b/activerecord/lib/active_record/base.rb index fe528f514c..55befcd29f 100755 --- a/activerecord/lib/active_record/base.rb +++ b/activerecord/lib/active_record/base.rb @@ -365,6 +365,8 @@ module ActiveRecord #:nodoc: # * :select: By default, this is * as in SELECT * FROM, but can be changed if you for example want to do a join, but not # include the joined columns. # * :readonly: Mark the returned records read-only so they cannot be saved or updated. + # * :lock: An SQL fragment like "FOR UPDATE" or "LOCK IN SHARE MODE". + # :lock => true gives connection's default exclusive lock, usually "FOR UPDATE". # # Examples for find by id: # Person.find(1) # returns the object for ID = 1 @@ -384,6 +386,17 @@ module ActiveRecord #:nodoc: # Person.find(:all, :offset => 10, :limit => 10) # Person.find(:all, :include => [ :account, :friends ]) # Person.find(:all, :group => "category") + # + # Example for find with a lock. Imagine two concurrent transactions: + # each will read person.visits == 2, add 1 to it, and save, resulting + # in two saves of person.visits = 3. By locking the row, the second + # transaction has to wait until the first is finished; we get the + # expected person.visits == 4. + # Person.transaction do + # person = Person.find(1, :lock => true) + # person.visits += 1 + # person.save! + # end def find(*args) options = extract_options_from_args!(args) validate_find_options(options) @@ -850,7 +863,7 @@ module ActiveRecord #:nodoc: method_scoping.assert_valid_keys([ :find, :create ]) if f = method_scoping[:find] - f.assert_valid_keys([ :conditions, :joins, :select, :include, :from, :offset, :limit, :order, :readonly ]) + f.assert_valid_keys([ :conditions, :joins, :select, :include, :from, :offset, :limit, :order, :readonly, :lock ]) f[:readonly] = true if !f[:joins].blank? && !f.has_key?(:readonly) end @@ -1028,6 +1041,7 @@ module ActiveRecord #:nodoc: add_order!(sql, options[:order]) add_limit!(sql, options, scope) + add_lock!(sql, options, scope) sql end @@ -1061,13 +1075,18 @@ module ActiveRecord #:nodoc: # The optional scope argument is for the current :find scope. def add_limit!(sql, options, scope = :auto) scope = scope(:find) if :auto == scope - if scope - options[:limit] ||= scope[:limit] - options[:offset] ||= scope[:offset] - end + options = options.reverse_merge(:limit => scope[:limit], :offset => scope[:offset]) if scope connection.add_limit_offset!(sql, options) end + # The optional scope argument is for the current :find scope. + # The :lock option has precedence over a scoped :lock. + def add_lock!(sql, options, scope = :auto) + scope = scope(:find) if :auto == :scope + options = options.reverse_merge(:lock => scope[:lock]) if scope + connection.add_lock!(sql, options) + end + # The optional scope argument is for the current :find scope. def add_joins!(sql, options, scope = :auto) scope = scope(:find) if :auto == scope @@ -1361,12 +1380,12 @@ module ActiveRecord #:nodoc: end VALID_FIND_OPTIONS = [ :conditions, :include, :joins, :limit, :offset, - :order, :select, :readonly, :group, :from ] - + :order, :select, :readonly, :group, :from, :lock ] + def validate_find_options(options) #:nodoc: options.assert_valid_keys(VALID_FIND_OPTIONS) end - + def set_readonly_option!(options) #:nodoc: # Inherit :readonly from finder scope if set. Otherwise, # if :joins is not blank then :readonly defaults to true. @@ -2025,4 +2044,4 @@ module ActiveRecord #:nodoc: value end end -end \ No newline at end of file +end diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb index 2cdc8af688..ae59176fa3 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_specification.rb @@ -248,7 +248,8 @@ module ActiveRecord if spec.kind_of?(ActiveRecord::ConnectionAdapters::AbstractAdapter) active_connections[name] = spec elsif spec.kind_of?(ConnectionSpecification) - self.connection = self.send(spec.adapter_method, spec.config) + config = spec.config.reverse_merge(:allow_concurrency => @@allow_concurrency) + self.connection = self.send(spec.adapter_method, config) elsif spec.nil? raise ConnectionNotEstablished else diff --git a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb index c45454eacc..d91b919116 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/database_statements.rb @@ -91,6 +91,17 @@ module ActiveRecord end end + # Appends a locking clause to a SQL statement. *Modifies the +sql+ parameter*. + # # SELECT * FROM suppliers FOR UPDATE + # add_lock! 'SELECT * FROM suppliers', :lock => true + # add_lock! 'SELECT * FROM suppliers', :lock => ' FOR UPDATE' + def add_lock!(sql, options) + case lock = options[:lock] + when true: sql << ' FOR UPDATE' + when String: sql << " #{lock}" + end + end + def default_sequence_name(table, column) nil end diff --git a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb index 594d5c8f43..f46217d4f6 100644 --- a/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/postgresql_adapter.rb @@ -46,6 +46,7 @@ module ActiveRecord # * :schema_search_path -- An optional schema search path for the connection given as a string of comma-separated schema names. This is backward-compatible with the :schema_order option. # * :encoding -- An optional client encoding that is using in a SET client_encoding TO call on connection. # * :min_messages -- An optional client min messages that is using in a SET client_min_messages TO call on connection. + # * :allow_concurrency -- If true, use async query methods so Ruby threads don't deadlock; otherwise, use blocking query methods. class PostgreSQLAdapter < AbstractAdapter def adapter_name 'PostgreSQL' @@ -54,6 +55,7 @@ module ActiveRecord def initialize(connection, logger, config = {}) super(connection, logger) @config = config + @async = config[:allow_concurrency] configure_connection end @@ -67,7 +69,7 @@ module ActiveRecord end # postgres-pr raises a NoMethodError when querying if no conn is available rescue PGError, NoMethodError - false + false end # Close then reopen the connection. @@ -78,7 +80,7 @@ module ActiveRecord configure_connection end end - + def disconnect! # Both postgres and postgres-pr respond to :close @connection.close rescue nil @@ -99,11 +101,11 @@ module ActiveRecord :boolean => { :name => "boolean" } } end - + def supports_migrations? true - end - + end + def table_alias_length 63 end @@ -141,11 +143,23 @@ module ActiveRecord end def query(sql, name = nil) #:nodoc: - log(sql, name) { @connection.query(sql) } + log(sql, name) do + if @async + @connection.async_query(sql) + else + @connection.query(sql) + end + end end def execute(sql, name = nil) #:nodoc: - log(sql, name) { @connection.exec(sql) } + log(sql, name) do + if @async + @connection.async_exec(sql) + else + @connection.exec(sql) + end + end end def update(sql, name = nil) #:nodoc: @@ -162,7 +176,7 @@ module ActiveRecord def commit_db_transaction #:nodoc: execute "COMMIT" end - + def rollback_db_transaction #:nodoc: execute "ROLLBACK" end @@ -261,7 +275,7 @@ module ActiveRecord def pk_and_sequence_for(table) # First try looking for a sequence with a dependency on the # given table's primary key. - result = execute(<<-end_sql, 'PK and serial sequence')[0] + result = query(<<-end_sql, 'PK and serial sequence')[0] SELECT attr.attname, name.nspname, seq.relname FROM pg_class seq, pg_attribute attr, @@ -284,7 +298,7 @@ module ActiveRecord # Support the 7.x and 8.0 nextval('foo'::text) as well as # the 8.1+ nextval('foo'::regclass). # TODO: assumes sequence is in same schema as table. - result = execute(<<-end_sql, 'PK and custom sequence')[0] + result = query(<<-end_sql, 'PK and custom sequence')[0] SELECT attr.attname, name.nspname, split_part(def.adsrc, '\\\'', 2) FROM pg_class t JOIN pg_namespace name ON (t.relnamespace = name.oid) @@ -305,7 +319,7 @@ module ActiveRecord def rename_table(name, new_name) execute "ALTER TABLE #{name} RENAME TO #{new_name}" end - + def add_column(table_name, column_name, type, options = {}) execute("ALTER TABLE #{table_name} ADD #{column_name} #{type_to_sql(type, options[:limit])}") execute("ALTER TABLE #{table_name} ALTER #{column_name} SET NOT NULL") if options[:null] == false @@ -325,12 +339,12 @@ module ActiveRecord commit_db_transaction end change_column_default(table_name, column_name, options[:default]) unless options[:default].nil? - end + end def change_column_default(table_name, column_name, default) #:nodoc: execute "ALTER TABLE #{table_name} ALTER COLUMN #{column_name} SET DEFAULT '#{default}'" end - + def rename_column(table_name, column_name, new_column_name) #:nodoc: execute "ALTER TABLE #{table_name} RENAME COLUMN #{column_name} TO #{new_column_name}" end @@ -379,7 +393,7 @@ module ActiveRecord hashed_row = {} row.each_index do |cel_index| column = row[cel_index] - + case res.type(cel_index) when BYTEA_COLUMN_TYPE_OID column = unescape_bytea(column) @@ -392,6 +406,7 @@ module ActiveRecord rows << hashed_row end end + res.clear return rows end @@ -442,7 +457,7 @@ module ActiveRecord end unescape_bytea(s) end - + # Query a table's column names, default values, and types. # # The underlying query is roughly: @@ -482,7 +497,7 @@ module ActiveRecord when /^real|^money/i then 'float' when /^interval/i then 'string' # geometric types (the line type is currently not implemented in postgresql) - when /^(?:point|lseg|box|"?path"?|polygon|circle)/i then 'string' + when /^(?:point|lseg|box|"?path"?|polygon|circle)/i then 'string' when /^bytea/i then 'binary' else field_type # Pass through standard types. end @@ -492,16 +507,16 @@ module ActiveRecord # Boolean types return "t" if value =~ /true/i return "f" if value =~ /false/i - + # Char/String/Bytea type values return $1 if value =~ /^'(.*)'::(bpchar|text|character varying|bytea)$/ - + # Numeric values return value if value =~ /^-?[0-9]+(\.[0-9]*)?/ # Fixed dates / times return $1 if value =~ /^'(.+)'::(date|timestamp)/ - + # Anything else is blank, some user type, or some function # and we can't know the value of that, so return nil. return nil diff --git a/activerecord/lib/active_record/connection_adapters/sqlite_adapter.rb b/activerecord/lib/active_record/connection_adapters/sqlite_adapter.rb index 7b7232ea13..fec30fb021 100644 --- a/activerecord/lib/active_record/connection_adapters/sqlite_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/sqlite_adapter.rb @@ -184,6 +184,12 @@ module ActiveRecord end + # SELECT ... FOR UPDATE is redundant since the table is locked. + def add_lock!(sql, options) #:nodoc: + sql + end + + # SCHEMA STATEMENTS ======================================== def tables(name = nil) #:nodoc: diff --git a/activerecord/lib/active_record/fixtures.rb b/activerecord/lib/active_record/fixtures.rb index 4e4102d17a..3180d91c46 100755 --- a/activerecord/lib/active_record/fixtures.rb +++ b/activerecord/lib/active_record/fixtures.rb @@ -519,7 +519,7 @@ module Test #:nodoc: load_fixtures @@already_loaded_fixtures[self.class] = @loaded_fixtures end - ActiveRecord::Base.lock_mutex + ActiveRecord::Base.send :increment_open_transactions ActiveRecord::Base.connection.begin_db_transaction # Load fixtures for every test. @@ -538,7 +538,7 @@ module Test #:nodoc: # Rollback changes. if use_transactional_fixtures? ActiveRecord::Base.connection.rollback_db_transaction - ActiveRecord::Base.unlock_mutex + ActiveRecord::Base.send :decrement_open_transactions end ActiveRecord::Base.verify_active_connections! end diff --git a/activerecord/lib/active_record/transactions.rb b/activerecord/lib/active_record/transactions.rb index c222e18097..5e82fd2d8e 100644 --- a/activerecord/lib/active_record/transactions.rb +++ b/activerecord/lib/active_record/transactions.rb @@ -4,8 +4,6 @@ require 'thread' module ActiveRecord module Transactions # :nodoc: - TRANSACTION_MUTEX = Mutex.new - class TransactionError < ActiveRecordError # :nodoc: end @@ -79,8 +77,8 @@ module ActiveRecord module ClassMethods def transaction(*objects, &block) previous_handler = trap('TERM') { raise TransactionError, "Transaction aborted" } - lock_mutex - + increment_open_transactions + begin objects.each { |o| o.extend(Transaction::Simple) } objects.each { |o| o.start_transaction } @@ -93,22 +91,21 @@ module ActiveRecord objects.each { |o| o.abort_transaction } raise ensure - unlock_mutex + decrement_open_transactions trap('TERM', previous_handler) end end - - def lock_mutex#:nodoc: - Thread.current['open_transactions'] ||= 0 - TRANSACTION_MUTEX.lock if Thread.current['open_transactions'] == 0 - Thread.current['start_db_transaction'] = (Thread.current['open_transactions'] == 0) - Thread.current['open_transactions'] += 1 - end - - def unlock_mutex#:nodoc: - Thread.current['open_transactions'] -= 1 - TRANSACTION_MUTEX.unlock if Thread.current['open_transactions'] == 0 - end + + private + def increment_open_transactions #:nodoc: + open = Thread.current['open_transactions'] ||= 0 + Thread.current['start_db_transaction'] = open.zero? + Thread.current['open_transactions'] = open + 1 + end + + def decrement_open_transactions #:nodoc: + Thread.current['open_transactions'] -= 1 + end end def transaction(*objects, &block) diff --git a/activerecord/test/locking_test.rb b/activerecord/test/locking_test.rb index 105f19f2bc..bacc7b8ae0 100644 --- a/activerecord/test/locking_test.rb +++ b/activerecord/test/locking_test.rb @@ -2,16 +2,16 @@ require 'abstract_unit' require 'fixtures/person' require 'fixtures/legacy_thing' -class LockingTest < Test::Unit::TestCase +class OptimisticLockingTest < Test::Unit::TestCase fixtures :people, :legacy_things def test_lock_existing p1 = Person.find(1) p2 = Person.find(1) - + p1.first_name = "Michael" p1.save - + assert_raises(ActiveRecord::StaleObjectError) { p2.first_name = "should fail" p2.save @@ -24,13 +24,13 @@ class LockingTest < Test::Unit::TestCase assert_equal p1.id, p2.id p1.first_name = "Anika" p1.save - + assert_raises(ActiveRecord::StaleObjectError) { p2.first_name = "should fail" p2.save } end - + def test_lock_column_name_existing t1 = LegacyThing.find(1) t2 = LegacyThing.find(1) @@ -41,6 +41,85 @@ class LockingTest < Test::Unit::TestCase t2.tps_report_number = 300 t2.save } - end - + end +end + + +# TODO: test against the generated SQL since testing locking behavior itself +# is so cumbersome. Will deadlock Ruby threads if the underlying db.execute +# blocks, so separate script called by Kernel#system is needed. +# (See exec vs. async_exec in the PostgreSQL adapter.) +class PessimisticLockingTest < Test::Unit::TestCase + self.use_transactional_fixtures = false + fixtures :people + + def setup + @allow_concurrency = ActiveRecord::Base.allow_concurrency + ActiveRecord::Base.allow_concurrency = true + end + + def teardown + ActiveRecord::Base.allow_concurrency = @allow_concurrency + end + + # Test that the adapter doesn't blow up on add_lock! + def test_sane_find_with_lock + assert_nothing_raised do + Person.transaction do + Person.find 1, :lock => true + end + end + end + + # Test no-blowup for scoped lock. + def test_sane_find_with_lock + assert_nothing_raised do + Person.transaction do + Person.with_scope(:find => { :lock => true }) do + Person.find 1 + end + end + end + end + + if current_adapter?(:PostgreSQLAdapter) + def test_no_locks_no_wait + first, second = duel { Person.find 1 } + assert first.end > second.end + end + + def test_second_lock_waits + first, second = duel { Person.find 1, :lock => true } + assert second.end > first.end + end + + protected + def duel(zzz = 0.2) + t0, t1, t2, t3 = nil, nil, nil, nil + + a = Thread.new do + t0 = Time.now + Person.transaction do + yield + sleep zzz # block thread 2 for zzz seconds + end + t1 = Time.now + end + + b = Thread.new do + sleep zzz / 2.0 # ensure thread 1 tx starts first + t2 = Time.now + Person.transaction { yield } + t3 = Time.now + end + + a.join + b.join + + assert t1 > t0 + zzz + assert t2 > t0 + assert t3 > t2 + [t0.to_f..t1.to_f, t2.to_f..t3.to_f] + end + end end diff --git a/activerecord/test/threaded_connections_test.rb b/activerecord/test/threaded_connections_test.rb index a812ec642c..aaa56b3bfe 100644 --- a/activerecord/test/threaded_connections_test.rb +++ b/activerecord/test/threaded_connections_test.rb @@ -1,5 +1,6 @@ require 'abstract_unit' require 'fixtures/topic' +require 'fixtures/reply' unless %w(FrontBase).include? ActiveRecord::Base.connection.adapter_name class ThreadedConnectionsTest < Test::Unit::TestCase diff --git a/activerecord/test/transactions_test.rb b/activerecord/test/transactions_test.rb index 421da4d7a6..a8584cfd56 100644 --- a/activerecord/test/transactions_test.rb +++ b/activerecord/test/transactions_test.rb @@ -5,12 +5,9 @@ require 'fixtures/developer' class TransactionTest < Test::Unit::TestCase self.use_transactional_fixtures = false - fixtures :topics, :developers def setup - # sqlite does not seem to return these in the right order, so we sort them - # explicitly for sqlite's sake. sqlite3 does fine. @first, @second = Topic.find(1, 2).sort_by { |t| t.id } end @@ -137,75 +134,6 @@ class TransactionTest < Test::Unit::TestCase assert !Topic.find(2).approved?, "Second should have been unapproved" end - # This will cause transactions to overlap and fail unless they are - # performed on separate database connections. - def test_transaction_per_thread - assert_nothing_raised do - threads = (1..20).map do - Thread.new do - Topic.transaction do - topic = Topic.find(:first) - topic.approved = !topic.approved? - topic.save! - topic.approved = !topic.approved? - topic.save! - end - end - end - - threads.each { |t| t.join } - end - end - - # Test for dirty reads among simultaneous transactions. - def test_transaction_isolation__read_committed - # Should be invariant. - original_salary = Developer.find(1).salary - temporary_salary = 200000 - - assert_nothing_raised do - threads = (1..20).map do - Thread.new do - Developer.transaction do - # Expect original salary. - dev = Developer.find(1) - assert_equal original_salary, dev.salary - - dev.salary = temporary_salary - dev.save! - - # Expect temporary salary. - dev = Developer.find(1) - assert_equal temporary_salary, dev.salary - - dev.salary = original_salary - dev.save! - - # Expect original salary. - dev = Developer.find(1) - assert_equal original_salary, dev.salary - end - end - end - - # Keep our eyes peeled. - threads << Thread.new do - 10.times do - sleep 0.05 - Developer.transaction do - # Always expect original salary. - assert_equal original_salary, Developer.find(1).salary - end - end - end - - threads.each { |t| t.join } - end - - assert_equal original_salary, Developer.find(1).salary - end - - private def add_exception_raising_after_save_callback_to_topic Topic.class_eval { def after_save() raise "Make the transaction rollback" end } @@ -215,3 +143,86 @@ class TransactionTest < Test::Unit::TestCase Topic.class_eval { remove_method :after_save } end end + +if current_adapter?(:PostgreSQLAdapter) + class ConcurrentTransactionTest < TransactionTest + def setup + @allow_concurrency = ActiveRecord::Base.allow_concurrency + ActiveRecord::Base.allow_concurrency = true + super + end + + def teardown + super + ActiveRecord::Base.allow_concurrency = @allow_concurrency + end + + # This will cause transactions to overlap and fail unless they are performed on + # separate database connections. + def test_transaction_per_thread + assert_nothing_raised do + threads = (1..3).map do + Thread.new do + Topic.transaction do + topic = Topic.find(1) + topic.approved = !topic.approved? + topic.save! + topic.approved = !topic.approved? + topic.save! + end + end + end + + threads.each { |t| t.join } + end + end + + # Test for dirty reads among simultaneous transactions. + def test_transaction_isolation__read_committed + # Should be invariant. + original_salary = Developer.find(1).salary + temporary_salary = 200000 + + assert_nothing_raised do + threads = (1..3).map do + Thread.new do + Developer.transaction do + # Expect original salary. + dev = Developer.find(1) + assert_equal original_salary, dev.salary + + dev.salary = temporary_salary + dev.save! + + # Expect temporary salary. + dev = Developer.find(1) + assert_equal temporary_salary, dev.salary + + dev.salary = original_salary + dev.save! + + # Expect original salary. + dev = Developer.find(1) + assert_equal original_salary, dev.salary + end + end + end + + # Keep our eyes peeled. + threads << Thread.new do + 10.times do + sleep 0.05 + Developer.transaction do + # Always expect original salary. + assert_equal original_salary, Developer.find(1).salary + end + end + end + + threads.each { |t| t.join } + end + + assert_equal original_salary, Developer.find(1).salary + end + end +end