
Refactor local cache to avoid serializing entries repeatedly

Since the `LocalCache` store needs to call `dup_value!` on write (to avoid
mutating the original value), we end up serializing each cache entry twice:
once for the local cache, and a second time for the actual backend. As a
result, write performance is quite bad.

So the idea here is that rather than storing `Entry` instances, the local
cache now stores whatever payload was sent to the real backend.

This means that we now serialize the `Entry` only once, and if the cache
store is configured with an optimized coder, it will be used for the local
cache too.
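
Concretely, each store's `read_entry`/`write_entry` pair is split into an
`Entry` layer and a serialized-payload layer
(`read_serialized_entry`/`write_serialized_entry`), and the local cache
strategy hooks into the payload layer. Below is a minimal, self-contained
sketch of that layering; the method names mirror the ones introduced here,
but the classes and bodies are illustrative toys, not the actual Rails
implementation:

```ruby
# Toy sketch of the two-layer store: serialize once, share the payload.
class ToyStore
  def initialize
    @backend = {}
  end

  # Entry layer: serialize exactly once, then hand the raw payload down.
  def write_entry(key, entry, **options)
    write_serialized_entry(key, serialize_entry(entry, **options), **options)
  end

  # Payload layer: operates on the already-serialized bytes.
  def write_serialized_entry(key, payload, **options)
    @backend[key] = payload
    true
  end

  def read_entry(key, **options)
    payload = read_serialized_entry(key, **options)
    deserialize_entry(payload) if payload
  end

  def read_serialized_entry(key, **options)
    @backend[key]
  end

  private
    def serialize_entry(entry, **)
      Marshal.dump(entry) # stands in for the store's configured coder
    end

    def deserialize_entry(payload)
      Marshal.load(payload)
    end
end

# The local cache hooks the payload layer, so it keeps the very bytes that
# were sent to the backend and never serializes anything on its own.
module ToyLocalCache
  def write_serialized_entry(key, payload, **options)
    stored = super
    local_cache[key] = payload if stored
    stored
  end

  def read_serialized_entry(key, **options)
    local_cache.fetch(key) { local_cache[key] = super }
  end

  def local_cache
    @local_cache ||= {}
  end
end

store = ToyStore.new
store.singleton_class.prepend(ToyLocalCache)
store.write_entry("user:1", { "name" => "test" })
p store.read_entry("user:1") # deserialized from the locally cached payload
```

Because the local cache keeps the exact payload that was handed to the
backend, a repeated read hands out a freshly deserialized object (no shared
mutable state) and a write serializes the entry a single time.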

Current Rails `main`:
```
fetch in rails 7.0.0.alpha
                         52.423  (± 1.9%) i/s -    265.000  in   5.058089s
write in rails 7.0.0.alpha
                         12.412  (± 0.0%) i/s -     62.000  in   5.005204s
```

Current Rails `main` with local cache disabled:
```
fetch in rails 7.0.0.alpha
                         52.047  (± 3.8%) i/s -    260.000  in   5.000138s
write in rails 7.0.0.alpha
                         25.513  (± 0.0%) i/s -    128.000  in   5.018942s
```

This branch:
```
fetch in rails 7.0.0.alpha
                         50.259  (± 4.0%) i/s -    255.000  in   5.085783s
write in rails 7.0.0.alpha
                         25.805  (± 0.0%) i/s -    130.000  in   5.039486s
```

So essentially, the local cache overhead on write has been eliminated.

Benchmark:
```
require "bundler/inline"

gemfile(true) do
  source "https://rubygems.org"
  gem "rails", github: "rails/rails", branch: "main"
  gem 'benchmark-ips'
  gem "mysql2"
  gem "pry"
end

require "active_record"
require "logger"
require 'benchmark/ips'

ActiveRecord::Base.establish_connection(adapter: "mysql2", database: 'test', host: 'localhost', user: 'root', password: '')
ActiveRecord::Base.logger = Logger.new(nil)

ActiveRecord::Schema.define do
  create_table :users, force: true do |t|
    t.string :name
    t.integer :phone
  end
end

class ApplicationRecord < ActiveRecord::Base
  self.abstract_class = true
end

class User < ApplicationRecord
end

1_000.times { |i| User.create(name: "test #{i}") }

cache = ActiveSupport::Cache::FileStore.new(ARGV[0] || '/tmp/rails-cache')
cache.clear
unless ENV['DISABLE_LOCAL_CACHE']
  ActiveSupport::Cache::Strategy::LocalCache::LocalCacheRegistry.set_cache_for(
    cache.middleware.local_cache_key,
    ActiveSupport::Cache::Strategy::LocalCache::LocalStore.new
  )
end

h = {}
User.last(Integer(ENV.fetch('SIZE', 1000))).each { |u| h[u.id] = u }

puts "== Benchmarking read_entry code and write_entry code in rails #{Rails.version}"

Benchmark.ips do |x|
  x.report("fetch in rails #{Rails.version}") {
    cache.fetch('key', compress: false) { h }
  }

  x.report("write in rails #{Rails.version}") {
    cache.write("key+#{Time.now}", h, compress: false)
  }
end
```
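The "Current Rails `main` with local cache disabled" numbers above come from running this same script with the `DISABLE_LOCAL_CACHE` environment variable set, which skips registering a `LocalStore` in the `LocalCacheRegistry`.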
Jean Boussier 2021-06-30 10:41:32 +02:00
parent c0911e9a36
commit 81b70f13ca
7 changed files with 139 additions and 212 deletions

@@ -72,19 +72,26 @@ module ActiveSupport
       private
         def read_entry(key, **options)
-          if File.exist?(key)
-            entry = deserialize_entry(File.binread(key))
+          if payload = read_serialized_entry(key, **options)
+            entry = deserialize_entry(payload)
             entry if entry.is_a?(Cache::Entry)
           end
-        rescue => e
-          logger.error("FileStoreError (#{e}): #{e.message}") if logger
+        end
+
+        def read_serialized_entry(key, **)
+          File.binread(key) if File.exist?(key)
+        rescue => error
+          logger.error("FileStoreError (#{error}): #{error.message}") if logger
           nil
         end
 
         def write_entry(key, entry, **options)
+          write_serialized_entry(key, serialize_entry(entry, **options), **options)
+        end
+
+        def write_serialized_entry(key, payload, **options)
           return false if options[:unless_exist] && File.exist?(key)
           ensure_cache_path(File.dirname(key))
-          payload = serialize_entry(entry, **options)
           File.atomic_write(key, cache_path) { |f| f.write(payload) }
           true
         end
@@ -95,9 +102,9 @@ module ActiveSupport
               File.delete(key)
               delete_empty_directories(File.dirname(key))
               true
-            rescue => e
+            rescue
               # Just in case the error was caused by another process deleting the file first.
-              raise e if File.exist?(key)
+              raise if File.exist?(key)
               false
             end
           end

@@ -25,27 +25,12 @@ module ActiveSupport
     # MemCacheStore implements the Strategy::LocalCache strategy which implements
     # an in-memory cache inside of a block.
     class MemCacheStore < Store
-      # Provide support for raw values in the local cache strategy.
-      module LocalCacheWithRaw # :nodoc:
-        private
-          def write_entry(key, entry, **options)
-            if options[:raw] && local_cache
-              raw_entry = Entry.new(entry.value.to_s)
-              raw_entry.expires_at = entry.expires_at
-              super(key, raw_entry, **options)
-            else
-              super
-            end
-          end
-      end
-
       # Advertise cache versioning support.
       def self.supports_cache_versioning?
         true
       end
 
       prepend Strategy::LocalCache
-      prepend LocalCacheWithRaw
 
       ESCAPE_KEY_CHARS = /[\x00-\x20%\x7F-\xFF]/n
@@ -188,15 +173,22 @@ module ActiveSupport
         # Read an entry from the cache.
         def read_entry(key, **options)
+          deserialize_entry(read_serialized_entry(key, **options), **options)
+        end
+
+        def read_serialized_entry(key, **options)
           rescue_error_with(nil) do
-            deserialize_entry(@data.with { |c| c.get(key, options) }, raw: options[:raw])
+            @data.with { |c| c.get(key, options) }
           end
         end
 
         # Write an entry to the cache.
         def write_entry(key, entry, **options)
+          write_serialized_entry(key, serialize_entry(entry, **options), **options)
+        end
+
+        def write_serialized_entry(key, payload, **options)
           method = options[:unless_exist] ? :add : :set
-          value = options[:raw] ? entry.value.to_s : serialize_entry(entry, **options)
           expires_in = options[:expires_in].to_i
           if options[:race_condition_ttl] && expires_in > 0 && !options[:raw]
             # Set the memcache expire a few minutes in the future to support race condition ttls on read
@@ -204,7 +196,7 @@ module ActiveSupport
           end
 
           rescue_error_with false do
             # The value "compress: false" prevents duplicate compression within Dalli.
-            @data.with { |c| c.send(method, key, value, expires_in, **options, compress: false) }
+            @data.with { |c| c.send(method, key, payload, expires_in, **options, compress: false) }
           end
         end
@@ -231,6 +223,14 @@ module ActiveSupport
           rescue_error_with(false) { @data.with { |c| c.delete(key) } }
         end
 
+        def serialize_entry(entry, raw: false, **options)
+          if raw
+            entry.value.to_s
+          else
+            super(entry, raw: raw, **options)
+          end
+        end
+
         # Memcache keys are binaries. So we need to force their encoding to binary
         # before applying the regular expression to ensure we are escaping all
         # characters properly.
@@ -244,7 +244,7 @@ module ActiveSupport
           key
         end
 
-        def deserialize_entry(payload, raw:)
+        def deserialize_entry(payload, raw: false, **)
           if payload && raw
             Entry.new(payload)
           else

@@ -33,10 +33,18 @@ module ActiveSupport
       end
 
       private
-        def read_entry(key, **options)
+        def read_entry(key, **s)
+          deserialize_entry(read_serialized_entry(key))
         end
 
-        def write_entry(key, entry, **options)
+        def read_serialized_entry(_key, **)
+        end
+
+        def write_entry(key, entry, **)
+          write_serialized_entry(key, serialize_entry(entry))
+        end
+
+        def write_serialized_entry(_key, _payload, **)
           true
         end

@@ -70,35 +70,7 @@ module ActiveSupport
         true
       end
 
-      # Support raw values in the local cache strategy.
-      module LocalCacheWithRaw # :nodoc:
-        private
-          def write_entry(key, entry, **options)
-            if options[:raw] && local_cache
-              raw_entry = Entry.new(serialize_entry(entry, raw: true))
-              raw_entry.expires_at = entry.expires_at
-              super(key, raw_entry, **options)
-            else
-              super
-            end
-          end
-
-          def write_multi_entries(entries, **options)
-            if options[:raw] && local_cache
-              raw_entries = entries.map do |key, entry|
-                raw_entry = Entry.new(serialize_entry(entry, raw: true))
-                raw_entry.expires_at = entry.expires_at
-              end.to_h
-              super(raw_entries, **options)
-            else
-              super
-            end
-          end
-      end
-
       prepend Strategy::LocalCache
-      prepend LocalCacheWithRaw
 
       class << self
         # Factory method to create a new Redis instance.
@@ -348,9 +320,12 @@ module ActiveSupport
         # Store provider interface:
         # Read an entry from the cache.
         def read_entry(key, **options)
+          deserialize_entry(read_serialized_entry(key, **options), **options)
+        end
+
+        def read_serialized_entry(key, raw: false, **options)
           failsafe :read_entry do
-            raw = options&.fetch(:raw, false)
-            deserialize_entry(redis.with { |c| c.get(key) }, raw: raw)
+            redis.with { |c| c.get(key) }
           end
         end
@@ -387,9 +362,11 @@ module ActiveSupport
         # Write an entry to the cache.
         #
         # Requires Redis 2.6.12+ for extended SET options.
-        def write_entry(key, entry, unless_exist: false, raw: false, expires_in: nil, race_condition_ttl: nil, **options)
-          serialized_entry = serialize_entry(entry, raw: raw, **options)
+        def write_entry(key, entry, raw: false, **options)
+          write_serialized_entry(key, serialize_entry(entry, raw: raw, **options), raw: raw, **options)
+        end
 
+        def write_serialized_entry(key, payload, raw: false, unless_exist: false, expires_in: nil, race_condition_ttl: nil, **options)
           # If race condition TTL is in use, ensure that cache entries
           # stick around a bit longer after they would have expired
           # so we can purposefully serve stale entries.
@@ -397,16 +374,14 @@ module ActiveSupport
            expires_in += 5.minutes
          end
 
-          failsafe :write_entry, returning: false do
-            if unless_exist || expires_in
-              modifiers = {}
-              modifiers[:nx] = unless_exist
-              modifiers[:px] = (1000 * expires_in.to_f).ceil if expires_in
+          modifiers = {}
+          if unless_exist || expires_in
+            modifiers[:nx] = unless_exist
+            modifiers[:px] = (1000 * expires_in.to_f).ceil if expires_in
+          end
 
-              redis.with { |c| c.set key, serialized_entry, **modifiers }
-            else
-              redis.with { |c| c.set key, serialized_entry }
-            end
+          failsafe :write_entry, returning: false do
+            redis.with { |c| c.set key, payload, **modifiers }
           end
         end
@@ -459,8 +434,8 @@ module ActiveSupport
           end
         end
 
-        def deserialize_entry(payload, raw:)
-          if payload && raw
+        def deserialize_entry(payload, raw: false, **)
+          if raw && !payload.nil?
             Entry.new(payload)
           else
             super(payload)

@@ -34,119 +34,34 @@ module ActiveSupport
         # Simple memory backed cache. This cache is not thread safe and is intended only
         # for serving as a temporary memory cache for a single thread.
-        class LocalStore < Store
-          class Entry # :nodoc:
-            class << self
-              def build(cache_entry)
-                return if cache_entry.nil?
-                return cache_entry if cache_entry.compressed?
-
-                value = cache_entry.value
-                if value.is_a?(String)
-                  DupableEntry.new(cache_entry)
-                elsif !value || value == true || value.is_a?(Numeric)
-                  new(cache_entry)
-                else
-                  MutableEntry.new(cache_entry)
-                end
-              end
-            end
-
-            attr_reader :value, :version
-            attr_accessor :expires_at
-
-            def initialize(cache_entry)
-              @value = cache_entry.value
-              @expires_at = cache_entry.expires_at
-              @version = cache_entry.version
-            end
-
-            def local?
-              true
-            end
-
-            def compressed?
-              false
-            end
-
-            def mismatched?(version)
-              @version && version && @version != version
-            end
-
-            def expired?
-              expires_at && expires_at <= Time.now.to_f
-            end
-
-            def marshal_dump
-              raise NotImplementedError, "LocalStore::Entry should never be serialized"
-            end
-          end
-
-          class DupableEntry < Entry # :nodoc:
-            def initialize(_cache_entry)
-              super
-              unless @value.frozen?
-                @value = @value.dup.freeze
-              end
-            end
-
-            def value
-              @value.dup
-            end
-          end
-
-          class MutableEntry < Entry # :nodoc:
-            def initialize(cache_entry)
-              @payload = Marshal.dump(cache_entry.value)
-              @expires_at = cache_entry.expires_at
-              @version = cache_entry.version
-            end
-
-            def value
-              Marshal.load(@payload)
-            end
-          end
-
+        class LocalStore
           def initialize
-            super
             @data = {}
           end
 
-          # Don't allow synchronizing since it isn't thread safe.
-          def synchronize # :nodoc:
-            yield
-          end
-
           def clear(options = nil)
             @data.clear
           end
 
-          def read_entry(key, **options)
+          def read_entry(key)
             @data[key]
           end
 
-          def read_multi_entries(keys, **options)
-            values = {}
-            keys.each do |name|
-              entry = read_entry(name, **options)
-              values[name] = entry.value if entry
-            end
-            values
+          def read_multi_entries(keys)
+            @data.slice(*keys)
           end
 
-          def write_entry(key, entry, **options)
-            @data[key] = Entry.build(entry)
+          def write_entry(key, entry)
+            @data[key] = entry
             true
           end
 
-          def delete_entry(key, **options)
+          def delete_entry(key)
             !!@data.delete(key)
           end
 
-          def fetch_entry(key, options = nil) # :nodoc:
-            @data.fetch(key) { @data[key] = Entry.build(yield) }
+          def fetch_entry(key) # :nodoc:
+            @data.fetch(key) { @data[key] = yield }
           end
         end
@@ -184,19 +99,19 @@ module ActiveSupport
         def increment(name, amount = 1, **options) # :nodoc:
           return super unless local_cache
           value = bypass_local_cache { super }
-          write_cache_value(name, value, **options)
+          write_cache_value(name, value, raw: true, **options)
           value
         end
 
         def decrement(name, amount = 1, **options) # :nodoc:
           return super unless local_cache
           value = bypass_local_cache { super }
-          write_cache_value(name, value, **options)
+          write_cache_value(name, value, raw: true, **options)
           value
         end
 
         private
-          def read_entry(key, **options)
+          def read_serialized_entry(key, raw: false, **options)
            if cache = local_cache
              hit = true
              entry = cache.fetch_entry(key) do
@@ -213,7 +128,7 @@ module ActiveSupport
           def read_multi_entries(keys, **options)
             return super unless local_cache
 
-            local_entries = local_cache.read_multi_entries(keys, **options)
+            local_entries = local_cache.read_multi_entries(keys)
             missed_keys = keys - local_entries.keys
 
             if missed_keys.any?
@@ -223,35 +138,27 @@ module ActiveSupport
             end
           end
 
-          def write_entry(key, entry, **options)
-            if options[:unless_exist]
-              local_cache.delete_entry(key, **options) if local_cache
+          def write_serialized_entry(key, payload, **)
+            if return_value = super
+              local_cache.write_entry(key, payload) if local_cache
             else
-              local_cache.write_entry(key, entry, **options) if local_cache
-            end
-
-            if entry.local?
-              super(key, new_entry(entry.value, options), **options)
-            else
-              super
+              local_cache.delete_entry(key) if local_cache
             end
+            return_value
           end
 
-          def delete_entry(key, **options)
-            local_cache.delete_entry(key, **options) if local_cache
+          def delete_entry(key, **)
+            local_cache.delete_entry(key) if local_cache
             super
           end
 
           def write_cache_value(name, value, **options)
             name = normalize_key(name, options)
             cache = local_cache
-            cache.mute do
-              if value
-                cache.write(name, value, options)
-              else
-                cache.delete(name, **options)
-              end
+            if value
+              cache.write_entry(name, serialize_entry(new_entry(value, **options), **options))
+            else
+              cache.delete_entry(name)
             end
           end

@@ -89,8 +89,8 @@ module LocalCacheBehavior
   def test_local_cache_fetch
     @cache.with_local_cache do
-      @cache.send(:local_cache).write "foo", "bar"
-      assert_equal "bar", @cache.send(:local_cache).fetch("foo")
+      @cache.send(:local_cache).write_entry "foo", "bar"
+      assert_equal "bar", @cache.send(:local_cache).fetch_entry("foo")
     end
   end
@@ -150,7 +150,10 @@ module LocalCacheBehavior
       @cache.write("foo", 1, raw: true)
       @peek.write("foo", 2, raw: true)
       @cache.increment("foo")
-      assert_equal 3, @cache.read("foo", raw: true)
+
+      expected = @peek.read("foo", raw: true)
+      assert_equal 3, Integer(expected)
+      assert_equal expected, @cache.read("foo", raw: true)
     end
   end
@@ -158,8 +161,11 @@ module LocalCacheBehavior
     @cache.with_local_cache do
       @cache.write("foo", 1, raw: true)
       @peek.write("foo", 3, raw: true)
       @cache.decrement("foo")
-      assert_equal 2, @cache.read("foo", raw: true)
+
+      expected = @peek.read("foo", raw: true)
+      assert_equal 2, Integer(expected)
+      assert_equal expected, @cache.read("foo", raw: true)
     end
   end

@@ -5,26 +5,26 @@ require "active_support/cache"
 require_relative "../behaviors"
 require "dalli"
 
-# Emulates a latency on Dalli's back-end for the key latency to facilitate
-# connection pool testing.
-class SlowDalliClient < Dalli::Client
-  def get(key, options = {})
-    if /latency/.match?(key)
-      sleep 3
-      super
-    else
-      super
+class MemCacheStoreTest < ActiveSupport::TestCase
+  # Emulates a latency on Dalli's back-end for the key latency to facilitate
+  # connection pool testing.
+  class SlowDalliClient < Dalli::Client
+    def get(key, options = {})
+      if /latency/.match?(key)
+        sleep 3
+        super
+      else
+        super
+      end
     end
   end
-end
 
-class UnavailableDalliServer < Dalli::Server
-  def alive?
-    false
+  class UnavailableDalliServer < Dalli::Server
+    def alive?
+      false
+    end
   end
-end
 
-class MemCacheStoreTest < ActiveSupport::TestCase
   begin
     servers = ENV["MEMCACHE_SERVERS"] || "localhost:11211"
     ss = Dalli::Client.new(servers).stats
@@ -268,6 +268,30 @@ class MemCacheStoreTest < ActiveSupport::TestCase
     end
   end
 
+  def test_initial_object_mutation_after_fetch
+    if ActiveSupport::Cache.format_version == 6.1
+      skip "Local cache mutation can't be prevented on legacy MemCacheStore"
+    else
+      super
+    end
+  end
+
+  def test_initial_object_mutation_after_write
+    if ActiveSupport::Cache.format_version == 6.1
+      skip "Local cache mutation can't be prevented on legacy MemCacheStore"
+    else
+      super
+    end
+  end
+
+  def test_local_cache_of_read_returns_a_copy_of_the_entry
+    if ActiveSupport::Cache.format_version == 6.1
+      skip "Local cache mutation can't be prevented on legacy MemCacheStore"
+    else
+      super
+    end
+  end
+
   private
     def random_string(length)
       (0...length).map { (65 + rand(26)).chr }.join
@@ -325,6 +349,11 @@ class OptimizedMemCacheStoreTest < MemCacheStoreTest
     super
   end
 
+  def teardown
+    super
+    ActiveSupport::Cache.format_version = @previous_format
+  end
+
   def test_forward_compatibility
     previous_format = ActiveSupport::Cache.format_version
     ActiveSupport::Cache.format_version = 6.1
@@ -344,9 +373,4 @@ class OptimizedMemCacheStoreTest < MemCacheStoreTest
     @cache.write("foo", "bar")
     assert_equal "bar", @old_store.read("foo")
   end
-
-  def teardown
-    super
-    ActiveSupport::Cache.format_version = @previous_format
-  end
 end