mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
3464cd5c28
All indentation was normalized by rubocop auto-correct at 80e66cc4d9
.
But comments was still kept absolute position. This commit aligns
comments with method definitions for consistency.
225 lines
7.4 KiB
Ruby
225 lines
7.4 KiB
Ruby
require "thread"
|
|
require "monitor"
|
|
|
|
module ActiveSupport
|
|
module Concurrency
|
|
# A share/exclusive lock, otherwise known as a read/write lock.
|
|
#
|
|
# https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock
|
|
class ShareLock
|
|
include MonitorMixin
|
|
|
|
# We track Thread objects, instead of just using counters, because
|
|
# we need exclusive locks to be reentrant, and we need to be able
|
|
# to upgrade share locks to exclusive.
|
|
|
|
def raw_state # :nodoc:
|
|
synchronize do
|
|
threads = @sleeping.keys | @sharing.keys | @waiting.keys
|
|
threads |= [@exclusive_thread] if @exclusive_thread
|
|
|
|
data = {}
|
|
|
|
threads.each do |thread|
|
|
purpose, compatible = @waiting[thread]
|
|
|
|
data[thread] = {
|
|
thread: thread,
|
|
sharing: @sharing[thread],
|
|
exclusive: @exclusive_thread == thread,
|
|
purpose: purpose,
|
|
compatible: compatible,
|
|
waiting: !!@waiting[thread],
|
|
sleeper: @sleeping[thread],
|
|
}
|
|
end
|
|
|
|
# NB: Yields while holding our *internal* synchronize lock,
|
|
# which is supposed to be used only for a few instructions at
|
|
# a time. This allows the caller to inspect additional state
|
|
# without things changing out from underneath, but would have
|
|
# disastrous effects upon normal operation. Fortunately, this
|
|
# method is only intended to be called when things have
|
|
# already gone wrong.
|
|
yield data
|
|
end
|
|
end
|
|
|
|
def initialize
|
|
super()
|
|
|
|
@cv = new_cond
|
|
|
|
@sharing = Hash.new(0)
|
|
@waiting = {}
|
|
@sleeping = {}
|
|
@exclusive_thread = nil
|
|
@exclusive_depth = 0
|
|
end
|
|
|
|
# Returns false if +no_wait+ is set and the lock is not
|
|
# immediately available. Otherwise, returns true after the lock
|
|
# has been acquired.
|
|
#
|
|
# +purpose+ and +compatible+ work together; while this thread is
|
|
# waiting for the exclusive lock, it will yield its share (if any)
|
|
# to any other attempt whose +purpose+ appears in this attempt's
|
|
# +compatible+ list. This allows a "loose" upgrade, which, being
|
|
# less strict, prevents some classes of deadlocks.
|
|
#
|
|
# For many resources, loose upgrades are sufficient: if a thread
|
|
# is awaiting a lock, it is not running any other code. With
|
|
# +purpose+ matching, it is possible to yield only to other
|
|
# threads whose activity will not interfere.
|
|
def start_exclusive(purpose: nil, compatible: [], no_wait: false)
|
|
synchronize do
|
|
unless @exclusive_thread == Thread.current
|
|
if busy_for_exclusive?(purpose)
|
|
return false if no_wait
|
|
|
|
yield_shares(purpose: purpose, compatible: compatible, block_share: true) do
|
|
wait_for(:start_exclusive) { busy_for_exclusive?(purpose) }
|
|
end
|
|
end
|
|
@exclusive_thread = Thread.current
|
|
end
|
|
@exclusive_depth += 1
|
|
|
|
true
|
|
end
|
|
end
|
|
|
|
# Relinquish the exclusive lock. Must only be called by the thread
|
|
# that called start_exclusive (and currently holds the lock).
|
|
def stop_exclusive(compatible: [])
|
|
synchronize do
|
|
raise "invalid unlock" if @exclusive_thread != Thread.current
|
|
|
|
@exclusive_depth -= 1
|
|
if @exclusive_depth == 0
|
|
@exclusive_thread = nil
|
|
|
|
if eligible_waiters?(compatible)
|
|
yield_shares(compatible: compatible, block_share: true) do
|
|
wait_for(:stop_exclusive) { @exclusive_thread || eligible_waiters?(compatible) }
|
|
end
|
|
end
|
|
@cv.broadcast
|
|
end
|
|
end
|
|
end
|
|
|
|
def start_sharing
|
|
synchronize do
|
|
if @sharing[Thread.current] > 0 || @exclusive_thread == Thread.current
|
|
# We already hold a lock; nothing to wait for
|
|
elsif @waiting[Thread.current]
|
|
# We're nested inside a +yield_shares+ call: we'll resume as
|
|
# soon as there isn't an exclusive lock in our way
|
|
wait_for(:start_sharing) { @exclusive_thread }
|
|
else
|
|
# This is an initial / outermost share call: any outstanding
|
|
# requests for an exclusive lock get to go first
|
|
wait_for(:start_sharing) { busy_for_sharing?(false) }
|
|
end
|
|
@sharing[Thread.current] += 1
|
|
end
|
|
end
|
|
|
|
def stop_sharing
|
|
synchronize do
|
|
if @sharing[Thread.current] > 1
|
|
@sharing[Thread.current] -= 1
|
|
else
|
|
@sharing.delete Thread.current
|
|
@cv.broadcast
|
|
end
|
|
end
|
|
end
|
|
|
|
# Execute the supplied block while holding the Exclusive lock. If
|
|
# +no_wait+ is set and the lock is not immediately available,
|
|
# returns +nil+ without yielding. Otherwise, returns the result of
|
|
# the block.
|
|
#
|
|
# See +start_exclusive+ for other options.
|
|
def exclusive(purpose: nil, compatible: [], after_compatible: [], no_wait: false)
|
|
if start_exclusive(purpose: purpose, compatible: compatible, no_wait: no_wait)
|
|
begin
|
|
yield
|
|
ensure
|
|
stop_exclusive(compatible: after_compatible)
|
|
end
|
|
end
|
|
end
|
|
|
|
# Execute the supplied block while holding the Share lock.
|
|
def sharing
|
|
start_sharing
|
|
begin
|
|
yield
|
|
ensure
|
|
stop_sharing
|
|
end
|
|
end
|
|
|
|
# Temporarily give up all held Share locks while executing the
|
|
# supplied block, allowing any +compatible+ exclusive lock request
|
|
# to proceed.
|
|
def yield_shares(purpose: nil, compatible: [], block_share: false)
|
|
loose_shares = previous_wait = nil
|
|
synchronize do
|
|
if loose_shares = @sharing.delete(Thread.current)
|
|
if previous_wait = @waiting[Thread.current]
|
|
purpose = nil unless purpose == previous_wait[0]
|
|
compatible &= previous_wait[1]
|
|
end
|
|
compatible |= [false] unless block_share
|
|
@waiting[Thread.current] = [purpose, compatible]
|
|
end
|
|
|
|
@cv.broadcast
|
|
end
|
|
|
|
begin
|
|
yield
|
|
ensure
|
|
synchronize do
|
|
wait_for(:yield_shares) { @exclusive_thread && @exclusive_thread != Thread.current }
|
|
|
|
if previous_wait
|
|
@waiting[Thread.current] = previous_wait
|
|
else
|
|
@waiting.delete Thread.current
|
|
end
|
|
@sharing[Thread.current] = loose_shares if loose_shares
|
|
end
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
# Must be called within synchronize
|
|
def busy_for_exclusive?(purpose)
|
|
busy_for_sharing?(purpose) ||
|
|
@sharing.size > (@sharing[Thread.current] > 0 ? 1 : 0)
|
|
end
|
|
|
|
def busy_for_sharing?(purpose)
|
|
(@exclusive_thread && @exclusive_thread != Thread.current) ||
|
|
@waiting.any? { |t, (_, c)| t != Thread.current && !c.include?(purpose) }
|
|
end
|
|
|
|
def eligible_waiters?(compatible)
|
|
@waiting.any? { |t, (p, _)| compatible.include?(p) && @waiting.all? { |t2, (_, c2)| t == t2 || c2.include?(p) } }
|
|
end
|
|
|
|
def wait_for(method)
|
|
@sleeping[Thread.current] = method
|
|
@cv.wait_while { yield }
|
|
ensure
|
|
@sleeping.delete Thread.current
|
|
end
|
|
end
|
|
end
|
|
end
|