diff --git a/activerecord/CHANGELOG.md b/activerecord/CHANGELOG.md index 9f6d4b2eda..2f6731ebf6 100644 --- a/activerecord/CHANGELOG.md +++ b/activerecord/CHANGELOG.md @@ -1,3 +1,11 @@ +* Make `ActiveRecord::ConnectionPool` Fiber-safe + + When `ActiveSupport::IsolatedExecutionState.isolation_level` is set to `:fiber`, + the connection pool now supports multiple Fibers from the same Thread checking + out connections from the pool. + + *Alex Matchneer* + * Add `update_attribute!` to `ActiveRecord::Persistence` Similar to `update_attribute`, but raises `ActiveRecord::RecordNotSaved` when a `before_*` callback throws `:abort`. diff --git a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb index 43043b3ec1..e2ab70eec5 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb @@ -166,7 +166,7 @@ module ActiveRecord def lock_thread=(lock_thread) if lock_thread - @lock_thread = Thread.current + @lock_thread = ActiveSupport::IsolatedExecutionState.context else @lock_thread = nil end @@ -197,7 +197,7 @@ module ActiveRecord # This method only works for connections that have been obtained through # #connection or #with_connection methods, connections obtained through # #checkout will not be automatically released. - def release_connection(owner_thread = Thread.current) + def release_connection(owner_thread = ActiveSupport::IsolatedExecutionState.context) if conn = @thread_cached_conns.delete(connection_cache_key(owner_thread)) checkin conn end @@ -208,7 +208,7 @@ module ActiveRecord # exists checkout a connection, yield it to the block, and checkin the # connection when finished. def with_connection - unless conn = @thread_cached_conns[connection_cache_key(Thread.current)] + unless conn = @thread_cached_conns[connection_cache_key(ActiveSupport::IsolatedExecutionState.context)] conn = connection fresh_connection = true end @@ -510,7 +510,7 @@ module ActiveRecord end def current_thread - @lock_thread || Thread.current + @lock_thread || ActiveSupport::IsolatedExecutionState.context end # Take control of all existing connections so a "group" action such as @@ -527,13 +527,13 @@ module ActiveRecord def attempt_to_checkout_all_existing_connections(raise_on_acquisition_timeout = true) collected_conns = synchronize do # account for our own connections - @connections.select { |conn| conn.owner == Thread.current } + @connections.select { |conn| conn.owner == ActiveSupport::IsolatedExecutionState.context } end newly_checked_out = [] timeout_time = Process.clock_gettime(Process::CLOCK_MONOTONIC) + (@checkout_timeout * 2) - @available.with_a_bias_for(Thread.current) do + @available.with_a_bias_for(ActiveSupport::IsolatedExecutionState.context) do loop do synchronize do return if collected_conns.size == @connections.size && @now_connecting == 0 @@ -580,7 +580,7 @@ module ActiveRecord thread_report = [] @connections.each do |conn| - unless conn.owner == Thread.current + unless conn.owner == ActiveSupport::IsolatedExecutionState.context thread_report << "#{conn} is owned by #{conn.owner}" end end diff --git a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb index c55e12121f..428717c48c 100644 --- a/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb +++ b/activerecord/lib/active_record/connection_adapters/abstract_adapter.rb @@ -224,16 +224,16 @@ module ActiveRecord def lease if in_use? msg = +"Cannot lease connection, " - if @owner == Thread.current + if @owner == ActiveSupport::IsolatedExecutionState.context msg << "it is already leased by the current thread." else msg << "it is already in use by a different thread: #{@owner}. " \ - "Current thread: #{Thread.current}." + "Current thread: #{ActiveSupport::IsolatedExecutionState.context}." end raise ActiveRecordError, msg end - @owner = Thread.current + @owner = ActiveSupport::IsolatedExecutionState.context end def connection_class # :nodoc: @@ -264,10 +264,10 @@ module ActiveRecord # this method must only be called while holding connection pool's mutex def expire if in_use? - if @owner != Thread.current + if @owner != ActiveSupport::IsolatedExecutionState.context raise ActiveRecordError, "Cannot expire connection, " \ "it is owned by a different thread: #{@owner}. " \ - "Current thread: #{Thread.current}." + "Current thread: #{ActiveSupport::IsolatedExecutionState.context}." end @idle_since = Process.clock_gettime(Process::CLOCK_MONOTONIC) @@ -280,10 +280,10 @@ module ActiveRecord # this method must only be called while holding connection pool's mutex (and a desire for segfaults) def steal! # :nodoc: if in_use? - if @owner != Thread.current + if @owner != ActiveSupport::IsolatedExecutionState.context pool.send :remove_connection_from_thread_cache, self, @owner - @owner = Thread.current + @owner = ActiveSupport::IsolatedExecutionState.context end else raise ActiveRecordError, "Cannot steal connection, it is not currently leased." diff --git a/activerecord/test/cases/connection_pool_test.rb b/activerecord/test/cases/connection_pool_test.rb index 37f1eb756f..8c0078f149 100644 --- a/activerecord/test/cases/connection_pool_test.rb +++ b/activerecord/test/cases/connection_pool_test.rb @@ -5,11 +5,11 @@ require "concurrent/atomic/count_down_latch" module ActiveRecord module ConnectionAdapters - class ConnectionPoolTest < ActiveRecord::TestCase + module ConnectionPoolTests attr_reader :pool def setup - super + @previous_isolation_level = ActiveSupport::IsolatedExecutionState.isolation_level # Keep a duplicate pool so we do not bother others @db_config = ActiveRecord::Base.connection_pool.db_config @@ -27,8 +27,10 @@ module ActiveRecord end end - teardown do + def teardown + super @pool.disconnect! + ActiveSupport::IsolatedExecutionState.isolation_level = @previous_isolation_level end def active_connections(pool) @@ -48,7 +50,7 @@ module ActiveRecord def test_released_connection_moves_between_threads thread_conn = nil - Thread.new { + new_thread { pool.with_connection do |conn| thread_conn = conn end @@ -56,7 +58,7 @@ module ActiveRecord assert thread_conn - Thread.new { + new_thread { pool.with_connection do |conn| assert_equal thread_conn, conn end @@ -69,7 +71,7 @@ module ActiveRecord main_thread = pool.connection assert_equal 1, active_connections(pool).size - Thread.new { + new_thread { pool.with_connection do |conn| assert conn assert_equal 2, active_connections(pool).size @@ -102,11 +104,12 @@ module ActiveRecord end def test_full_pool_blocks + skip_fiber_testing cs = @pool.size.times.map { @pool.checkout } - t = Thread.new { @pool.checkout } + t = new_thread { @pool.checkout } # make sure our thread is in the timeout section - Thread.pass until @pool.num_waiting_in_queue == 1 + pass_to(t) until @pool.num_waiting_in_queue == 1 connection = cs.first connection.close @@ -114,6 +117,7 @@ module ActiveRecord end def test_full_pool_blocking_shares_load_interlock + skip_fiber_testing @pool.instance_variable_set(:@size, 1) load_interlock_latch = Concurrent::CountDownLatch.new @@ -122,7 +126,7 @@ module ActiveRecord able_to_get_connection = false able_to_load = false - thread_with_load_interlock = Thread.new do + thread_with_load_interlock = new_thread do ActiveSupport::Dependencies.interlock.running do load_interlock_latch.count_down connection_latch.wait @@ -133,7 +137,7 @@ module ActiveRecord end end - thread_with_last_connection = Thread.new do + thread_with_last_connection = new_thread do @pool.with_connection do connection_latch.count_down load_interlock_latch.wait @@ -152,11 +156,12 @@ module ActiveRecord end def test_removing_releases_latch + skip_fiber_testing cs = @pool.size.times.map { @pool.checkout } - t = Thread.new { @pool.checkout } + t = new_thread { @pool.checkout } # make sure our thread is in the timeout section - Thread.pass until @pool.num_waiting_in_queue == 1 + pass_to(t) until @pool.num_waiting_in_queue == 1 connection = cs.first @pool.remove connection @@ -179,13 +184,13 @@ module ActiveRecord def test_reap_inactive ready = Concurrent::CountDownLatch.new @pool.checkout - child = Thread.new do + child = new_thread do @pool.checkout @pool.checkout ready.count_down - Thread.stop + stop_thread end - ready.wait + pass_to(child) until ready.wait(0) assert_equal 3, active_connections(@pool).size @@ -273,7 +278,7 @@ module ActiveRecord idle_conn = @pool.checkout recent_conn = @pool.checkout active_conn = @pool.checkout - _dead_conn = Thread.new { @pool.checkout }.join + _dead_conn = new_thread { @pool.checkout }.join @pool.checkin idle_conn @pool.checkin recent_conn @@ -327,7 +332,7 @@ module ActiveRecord assert_not_nil main_connection threads = [] 4.times do |i| - threads << Thread.new(i) do + threads << new_thread(i) do thread_connection = pool.connection assert_not_nil thread_connection thread_connection.close @@ -336,7 +341,7 @@ module ActiveRecord threads.each(&:join) - Thread.new do + new_thread do assert pool.connection pool.connection.close end.join @@ -364,6 +369,8 @@ module ActiveRecord # available connections slowly, ensuring the wakeup order is # correct in this case. def test_checkout_fairness + skip_fiber_testing + @pool.instance_variable_set(:@size, 10) expected = (1..@pool.size).to_a.freeze # check out all connections so our threads start out waiting @@ -373,7 +380,7 @@ module ActiveRecord errors = [] threads = expected.map do |i| - t = Thread.new { + t = new_thread { begin @pool.checkout # never checked back in mutex.synchronize { order << i } @@ -381,7 +388,7 @@ module ActiveRecord mutex.synchronize { errors << e } end } - Thread.pass until @pool.num_waiting_in_queue == i + pass_to(t) until @pool.num_waiting_in_queue == i t end @@ -402,6 +409,8 @@ module ActiveRecord # group1 threads, and the fact that only group1 and no group2 # threads acquired a connection is enforced. def test_checkout_fairness_by_group + skip_fiber_testing + @pool.instance_variable_set(:@size, 10) # take all the connections conns = (1..10).map { @pool.checkout } @@ -410,7 +419,7 @@ module ActiveRecord errors = [] make_thread = proc do |i| - t = Thread.new { + t = new_thread { begin @pool.checkout # never checked back in mutex.synchronize { successes << i } @@ -418,7 +427,7 @@ module ActiveRecord mutex.synchronize { errors << e } end } - Thread.pass until @pool.num_waiting_in_queue == i + pass_to(t) until @pool.num_waiting_in_queue == i t end @@ -491,19 +500,16 @@ module ActiveRecord end end - class ConnectionTestModel < ActiveRecord::Base - self.abstract_class = true - end - def test_connection_notification_is_called payloads = [] subscription = ActiveSupport::Notifications.subscribe("!connection.active_record") do |name, started, finished, unique_id, payload| payloads << payload end - ConnectionTestModel.establish_connection :arunit + + @connection_test_model_class.establish_connection :arunit assert_equal [:config, :shard, :spec_name], payloads[0].keys.sort - assert_equal "ActiveRecord::ConnectionAdapters::ConnectionPoolTest::ConnectionTestModel", payloads[0][:spec_name] + assert_equal @connection_test_model_class.name, payloads[0][:spec_name] assert_equal ActiveRecord::Base.default_shard, payloads[0][:shard] ensure ActiveSupport::Notifications.unsubscribe(subscription) if subscription @@ -514,10 +520,10 @@ module ActiveRecord subscription = ActiveSupport::Notifications.subscribe("!connection.active_record") do |name, started, finished, unique_id, payload| payloads << payload end - ConnectionTestModel.connects_to shards: { shard_two: { writing: :arunit } } + @connection_test_model_class.connects_to shards: { shard_two: { writing: :arunit } } assert_equal [:config, :shard, :spec_name], payloads[0].keys.sort - assert_equal "ActiveRecord::ConnectionAdapters::ConnectionPoolTest::ConnectionTestModel", payloads[0][:spec_name] + assert_equal @connection_test_model_class.name, payloads[0][:spec_name] assert_equal :shard_two, payloads[0][:shard] ensure ActiveSupport::Notifications.unsubscribe(subscription) if subscription @@ -538,6 +544,7 @@ module ActiveRecord end def test_concurrent_connection_establishment + skip_fiber_testing assert_operator @pool.connections.size, :<=, 1 all_threads_in_new_connection = Concurrent::CountDownLatch.new(@pool.size - @pool.connections.size) @@ -553,7 +560,7 @@ module ActiveRecord connecting_threads = [] @pool.size.times do - connecting_threads << Thread.new { @pool.checkout } + connecting_threads << new_thread { @pool.checkout } end begin @@ -579,7 +586,7 @@ module ActiveRecord [:disconnect, :clear_reloadable_connections].each do |group_action_method| @pool.with_connection do |connection| assert_raises(ExclusiveConnectionTimeoutError) do - Thread.new { @pool.public_send(group_action_method) }.join + new_thread { @pool.public_send(group_action_method) }.join end end end @@ -588,10 +595,11 @@ module ActiveRecord end def test_disconnect_and_clear_reloadable_connections_attempt_to_wait_for_threads_to_return_their_conns + skip_fiber_testing [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method| thread = timed_join_result = nil @pool.with_connection do |connection| - thread = Thread.new { @pool.send(group_action_method) } + thread = new_thread { @pool.send(group_action_method) } # give the other `thread` some time to get stuck in `group_action_method` timed_join_result = thread.join(0.3) @@ -611,7 +619,7 @@ module ActiveRecord @pool.checkout_timeout = 0.001 # no need to delay test suite by waiting the whole full default timeout [:disconnect!, :clear_reloadable_connections!].each do |group_action_method| @pool.with_connection do |connection| - Thread.new { @pool.send(group_action_method) }.join + new_thread { @pool.send(group_action_method) }.join # assert connection has been forcefully taken away from us assert_not_predicate @pool, :active_connection? @@ -622,6 +630,7 @@ module ActiveRecord end def test_disconnect_and_clear_reloadable_connections_are_able_to_preempt_other_waiting_threads + skip_fiber_testing with_single_connection_pool do |pool| [:disconnect, :disconnect!, :clear_reloadable_connections, :clear_reloadable_connections!].each do |group_action_method| conn = pool.connection # drain the only available connection @@ -629,23 +638,23 @@ module ActiveRecord begin # create a first_thread and let it get into the FIFO queue first - first_thread = Thread.new do + first_thread = new_thread do pool.with_connection { second_thread_done.wait } end # wait for first_thread to get in queue - Thread.pass until pool.num_waiting_in_queue == 1 + pass_to(first_thread) until pool.num_waiting_in_queue == 1 # create a different, later thread, that will attempt to do a "group action", # but because of the group action semantics it should be able to preempt the # first_thread when a connection is made available - second_thread = Thread.new do + second_thread = new_thread do pool.send(group_action_method) second_thread_done.set end # wait for second_thread to get in queue - Thread.pass until pool.num_waiting_in_queue == 2 + pass_to(second_thread) until pool.num_waiting_in_queue == 2 # return the only available connection pool.checkin(conn) @@ -662,8 +671,8 @@ module ActiveRecord if failed second_thread_done.set - first_thread.join(2) - second_thread.join(2) + first_thread&.join(2) + second_thread&.join(2) end first_thread.join(10) || raise("first_thread got stuck") @@ -674,18 +683,19 @@ module ActiveRecord end def test_clear_reloadable_connections_creates_new_connections_for_waiting_threads_if_necessary + skip_fiber_testing with_single_connection_pool do |pool| conn = pool.connection # drain the only available connection def conn.requires_reloading? # make sure it gets removed from the pool by clear_reloadable_connections true end - stuck_thread = Thread.new do + stuck_thread = new_thread do pool.with_connection { } end # wait for stuck_thread to get in queue - Thread.pass until pool.num_waiting_in_queue == 1 + pass_to(stuck_thread) until pool.num_waiting_in_queue == 1 pool.clear_reloadable_connections @@ -698,6 +708,7 @@ module ActiveRecord end def test_connection_pool_stat + Thread.report_on_exception, original_report_on_exception = false, Thread.report_on_exception with_single_connection_pool do |pool| pool.with_connection do |connection| stats = pool.stat @@ -707,13 +718,17 @@ module ActiveRecord stats = pool.stat assert_equal({ size: 1, connections: 1, busy: 0, dead: 0, idle: 1, waiting: 0, checkout_timeout: 5 }, stats) - Thread.new do - pool.checkout - Thread.current.kill - end.join + assert_raise(ThreadError) do + new_thread do + pool.checkout + raise ThreadError + end.join + end stats = pool.stat assert_equal({ size: 1, connections: 1, busy: 0, dead: 1, idle: 0, waiting: 0, checkout_timeout: 5 }, stats) + ensure + Thread.report_on_exception = original_report_on_exception end end @@ -729,7 +744,7 @@ module ActiveRecord # the pool is thread-safe. connections.each_index do |idx| if connections[idx] == conn2 - Thread.new do + new_thread do @pool.remove(conn2) end.join end @@ -772,5 +787,76 @@ module ActiveRecord pool.disconnect! if pool end end + + class ConnectionPoolThreadTest < ActiveRecord::TestCase + include ConnectionPoolTests + + class ThreadConnectionTestModel < ActiveRecord::Base + self.abstract_class = true + end + + def setup + super + ActiveSupport::IsolatedExecutionState.isolation_level = :thread + @connection_test_model_class = ThreadConnectionTestModel + end + + private + def new_thread(...) + Thread.new(...) + end + + def stop_thread + Thread.stop + end + + def pass_to(_thread) + Thread.pass + end + + def skip_fiber_testing; end + end + + class ConnectionPoolFiberTest < ActiveRecord::TestCase + include ConnectionPoolTests + + class FiberConnectionTestModel < ActiveRecord::Base + self.abstract_class = true + end + + class ThreadlikeFiber < Fiber + def join(timeout = nil) + now = Time.now + resume while alive? && (!timeout || Time.now - now < timeout) + end + + def terminate + nil + end + end + + def setup + super + ActiveSupport::IsolatedExecutionState.isolation_level = :fiber + @connection_test_model_class = FiberConnectionTestModel + end + + private + def new_thread(*args, &block) + ThreadlikeFiber.new(*args, &block) + end + + def stop_thread + Fiber.yield + end + + def pass_to(fiber) + fiber.resume + end + + def skip_fiber_testing + skip "Can't test isolation_level=fiber without a Ruby 3.1+ Fiber Scheduler" + end + end end end diff --git a/activesupport/lib/active_support/isolated_execution_state.rb b/activesupport/lib/active_support/isolated_execution_state.rb index 7e14b9b4f6..896d89ad34 100644 --- a/activesupport/lib/active_support/isolated_execution_state.rb +++ b/activesupport/lib/active_support/isolated_execution_state.rb @@ -4,25 +4,30 @@ require "fiber" module ActiveSupport module IsolatedExecutionState # :nodoc: - @isolation_level = :thread + @isolation_level = nil Thread.attr_accessor :active_support_execution_state Fiber.attr_accessor :active_support_execution_state class << self - attr_reader :isolation_level + attr_reader :isolation_level, :scope def isolation_level=(level) + return if level == @isolation_level + unless %i(thread fiber).include?(level) raise ArgumentError, "isolation_level must be `:thread` or `:fiber`, got: `#{level.inspect}`" end - if level != isolation_level - clear - singleton_class.alias_method(:current, "current_#{level}") - singleton_class.send(:private, :current) - @isolation_level = level - end + clear if @isolation_level + + @scope = + case level + when :thread; Thread + when :fiber; Fiber + end + + @isolation_level = level end def unique_id @@ -30,27 +35,27 @@ module ActiveSupport end def [](key) - current[key] + state[key] end def []=(key, value) - current[key] = value + state[key] = value end def clear - current.clear + state.clear + end + + def context + scope.current end private - def current_thread - Thread.current.active_support_execution_state ||= {} + def state + context.active_support_execution_state ||= {} end - - def current_fiber - Fiber.current.active_support_execution_state ||= {} - end - - alias_method :current, :current_thread end + + self.isolation_level = :thread end end