mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Make EventedFileUpdateChecker garbage collectable
The Listen gem dispatches "file changed" events from a separate thread. This thread holds a reference to the `changed` callback, and runs until the listener is stopped. Thus, objects that are part of the `changed` callback's scope cannot be garbage collected until the listener is stopped. This commit isolates the `changed` callback and associated scope in an `EventedFileUpdateChecker::Core` instance. This ensures that the `EventedFileUpdateChecker` instance can be garbage collected. Additionally, this commit adds a finalizer to the `EventedFileUpdateChecker` instance, which stops the listener and ensures that the `EventedFileUpdateChecker::Core` instance can be garbage collected.
This commit is contained in:
parent
6601acf268
commit
8e1e670adb
2 changed files with 72 additions and 39 deletions
|
@ -39,52 +39,31 @@ module ActiveSupport
|
|||
raise ArgumentError, "A block is required to initialize an EventedFileUpdateChecker"
|
||||
end
|
||||
|
||||
@ph = PathHelper.new
|
||||
@files = files.map { |f| @ph.xpath(f) }.to_set
|
||||
|
||||
@dirs = {}
|
||||
dirs.each do |dir, exts|
|
||||
@dirs[@ph.xpath(dir)] = Array(exts).map { |ext| @ph.normalize_extension(ext) }
|
||||
end
|
||||
|
||||
@block = block
|
||||
@updated = Concurrent::AtomicBoolean.new(false)
|
||||
@lcsp = @ph.longest_common_subpath(@dirs.keys)
|
||||
@pid = Process.pid
|
||||
@boot_mutex = Mutex.new
|
||||
|
||||
dtw = directories_to_watch
|
||||
@dtw, @missing = dtw.partition(&:exist?)
|
||||
|
||||
boot!
|
||||
@core = Core.new(files, dirs)
|
||||
ObjectSpace.define_finalizer(self, @core.finalizer)
|
||||
end
|
||||
|
||||
def updated?
|
||||
@boot_mutex.synchronize do
|
||||
@core.mutex.synchronize do
|
||||
if @pid != Process.pid
|
||||
boot!
|
||||
@core.start
|
||||
@pid = Process.pid
|
||||
@updated.make_true
|
||||
@core.updated.make_true
|
||||
end
|
||||
end
|
||||
|
||||
if @missing.any?(&:exist?)
|
||||
@boot_mutex.synchronize do
|
||||
appeared, @missing = @missing.partition(&:exist?)
|
||||
shutdown!
|
||||
|
||||
@dtw += appeared
|
||||
boot!
|
||||
|
||||
@updated.make_true
|
||||
end
|
||||
if @core.restart?
|
||||
@core.thread_safely(&:restart)
|
||||
@core.updated.make_true
|
||||
end
|
||||
|
||||
@updated.true?
|
||||
@core.updated.true?
|
||||
end
|
||||
|
||||
def execute
|
||||
@updated.make_false
|
||||
@core.updated.make_false
|
||||
@block.call
|
||||
end
|
||||
|
||||
|
@ -96,18 +75,58 @@ module ActiveSupport
|
|||
end
|
||||
end
|
||||
|
||||
private
|
||||
def boot!
|
||||
normalize_dirs!
|
||||
class Core
|
||||
attr_reader :updated, :mutex
|
||||
|
||||
def initialize(files, dirs)
|
||||
@ph = PathHelper.new
|
||||
@files = files.map { |file| @ph.xpath(file) }.to_set
|
||||
|
||||
@dirs = dirs.each_with_object({}) do |(dir, exts), hash|
|
||||
hash[@ph.xpath(dir)] = Array(exts).map { |ext| @ph.normalize_extension(ext) }.to_set
|
||||
end
|
||||
|
||||
@common_path = @ph.longest_common_subpath(@dirs.keys)
|
||||
|
||||
@dtw = directories_to_watch
|
||||
@missing = []
|
||||
|
||||
@updated = Concurrent::AtomicBoolean.new(false)
|
||||
@mutex = Mutex.new
|
||||
|
||||
start
|
||||
end
|
||||
|
||||
def finalizer
|
||||
proc { stop }
|
||||
end
|
||||
|
||||
def thread_safely
|
||||
@mutex.synchronize do
|
||||
yield self
|
||||
end
|
||||
end
|
||||
|
||||
def start
|
||||
normalize_dirs!
|
||||
@dtw, @missing = [*@dtw, *@missing].partition(&:exist?)
|
||||
@listener = @dtw.any? ? Listen.to(*@dtw, &method(:changed)) : nil
|
||||
@listener&.start
|
||||
end
|
||||
|
||||
def shutdown!
|
||||
def stop
|
||||
@listener&.stop
|
||||
end
|
||||
|
||||
def restart
|
||||
stop
|
||||
start
|
||||
end
|
||||
|
||||
def restart?
|
||||
@missing.any?(&:exist?)
|
||||
end
|
||||
|
||||
def normalize_dirs!
|
||||
@dirs.transform_keys! do |dir|
|
||||
dir.exist? ? dir.realpath : dir
|
||||
|
@ -115,7 +134,7 @@ module ActiveSupport
|
|||
end
|
||||
|
||||
def changed(modified, added, removed)
|
||||
unless updated?
|
||||
unless @updated.true?
|
||||
@updated.make_true if (modified + added + removed).any? { |f| watching?(f) }
|
||||
end
|
||||
end
|
||||
|
@ -135,7 +154,7 @@ module ActiveSupport
|
|||
|
||||
if matching && (matching.empty? || matching.include?(ext))
|
||||
break true
|
||||
elsif dir == @lcsp || dir.root?
|
||||
elsif dir == @common_path || dir.root?
|
||||
break false
|
||||
end
|
||||
end
|
||||
|
@ -154,6 +173,7 @@ module ActiveSupport
|
|||
|
||||
@ph.filter_out_descendants(dtw)
|
||||
end
|
||||
end
|
||||
|
||||
class PathHelper
|
||||
def xpath(path)
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
require_relative "abstract_unit"
|
||||
require "pathname"
|
||||
require "weakref"
|
||||
require_relative "file_update_checker_shared_tests"
|
||||
|
||||
class EventedFileUpdateCheckerTest < ActiveSupport::TestCase
|
||||
|
@ -77,6 +78,18 @@ class EventedFileUpdateCheckerTest < ActiveSupport::TestCase
|
|||
Process.wait(pid)
|
||||
end
|
||||
|
||||
test "can be garbage collected" do
|
||||
previous_threads = Thread.list
|
||||
checker_ref = WeakRef.new(ActiveSupport::EventedFileUpdateChecker.new([], tmpdir => ".rb") { })
|
||||
listener_threads = Thread.list - previous_threads
|
||||
|
||||
wait # Wait for listener thread to start processing events.
|
||||
GC.start
|
||||
|
||||
assert_not_predicate checker_ref, :weakref_alive?
|
||||
assert_empty Thread.list & listener_threads
|
||||
end
|
||||
|
||||
test "should detect changes through symlink" do
|
||||
actual_dir = File.join(tmpdir, "actual")
|
||||
linked_dir = File.join(tmpdir, "linked")
|
||||
|
|
Loading…
Reference in a new issue