Merge branch 'da-refactor-broadcast-message' into 'master'
Refactor BroadcastMessage to use Gitlab::JsonCache See merge request gitlab-org/gitlab-ce!23934
This commit is contained in:
commit
8f2911ca0b
|
@ -22,49 +22,30 @@ class BroadcastMessage < ActiveRecord::Base
|
|||
after_commit :flush_redis_cache
|
||||
|
||||
def self.current
|
||||
raw_messages = Rails.cache.fetch(CACHE_KEY, expires_in: cache_expires_in) do
|
||||
messages = cache.fetch(CACHE_KEY, as: BroadcastMessage, expires_in: cache_expires_in) do
|
||||
remove_legacy_cache_key
|
||||
current_and_future_messages.to_json
|
||||
current_and_future_messages
|
||||
end
|
||||
|
||||
messages = decode_messages(raw_messages)
|
||||
|
||||
return [] unless messages&.present?
|
||||
|
||||
now_or_future = messages.select(&:now_or_future?)
|
||||
|
||||
# If there are cached entries but none are to be displayed we'll purge the
|
||||
# cache so we don't keep running this code all the time.
|
||||
Rails.cache.delete(CACHE_KEY) if now_or_future.empty?
|
||||
cache.expire(CACHE_KEY) if now_or_future.empty?
|
||||
|
||||
now_or_future.select(&:now?)
|
||||
end
|
||||
|
||||
def self.decode_messages(raw_messages)
|
||||
return unless raw_messages&.present?
|
||||
|
||||
message_list = ActiveSupport::JSON.decode(raw_messages)
|
||||
|
||||
return unless message_list.is_a?(Array)
|
||||
|
||||
valid_attr = BroadcastMessage.attribute_names
|
||||
|
||||
message_list.map do |raw|
|
||||
BroadcastMessage.new(raw) if valid_cache_entry?(raw, valid_attr)
|
||||
end.compact
|
||||
rescue ActiveSupport::JSON.parse_error
|
||||
end
|
||||
|
||||
def self.valid_cache_entry?(raw, valid_attr)
|
||||
return false unless raw.is_a?(Hash)
|
||||
|
||||
(raw.keys - valid_attr).empty?
|
||||
end
|
||||
|
||||
def self.current_and_future_messages
|
||||
where('ends_at > :now', now: Time.zone.now).order_id_asc
|
||||
end
|
||||
|
||||
def self.cache
|
||||
Gitlab::JsonCache.new(cache_key_with_version: false)
|
||||
end
|
||||
|
||||
def self.cache_expires_in
|
||||
nil
|
||||
end
|
||||
|
@ -74,7 +55,7 @@ class BroadcastMessage < ActiveRecord::Base
|
|||
# environment a one-shot migration would not work because the cache
|
||||
# would be repopulated by a node that has not been upgraded.
|
||||
def self.remove_legacy_cache_key
|
||||
Rails.cache.delete(LEGACY_CACHE_KEY)
|
||||
cache.expire(LEGACY_CACHE_KEY)
|
||||
end
|
||||
|
||||
def active?
|
||||
|
@ -102,7 +83,7 @@ class BroadcastMessage < ActiveRecord::Base
|
|||
end
|
||||
|
||||
def flush_redis_cache
|
||||
Rails.cache.delete(CACHE_KEY)
|
||||
self.class.cache.expire(CACHE_KEY)
|
||||
self.class.remove_legacy_cache_key
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,87 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
module Gitlab
|
||||
class JsonCache
|
||||
attr_reader :backend, :cache_key_with_version, :namespace
|
||||
|
||||
def initialize(options = {})
|
||||
@backend = options.fetch(:backend, Rails.cache)
|
||||
@namespace = options.fetch(:namespace, nil)
|
||||
@cache_key_with_version = options.fetch(:cache_key_with_version, true)
|
||||
end
|
||||
|
||||
def active?
|
||||
if backend.respond_to?(:active?)
|
||||
backend.active?
|
||||
else
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
def cache_key(key)
|
||||
expanded_cache_key = [namespace, key].compact
|
||||
|
||||
if cache_key_with_version
|
||||
expanded_cache_key << Rails.version
|
||||
end
|
||||
|
||||
expanded_cache_key.join(':')
|
||||
end
|
||||
|
||||
def expire(key)
|
||||
backend.delete(cache_key(key))
|
||||
end
|
||||
|
||||
def read(key, klass = nil)
|
||||
value = backend.read(cache_key(key))
|
||||
value = parse_value(value, klass) if value
|
||||
value
|
||||
end
|
||||
|
||||
def write(key, value, options = nil)
|
||||
backend.write(cache_key(key), value.to_json, options)
|
||||
end
|
||||
|
||||
def fetch(key, options = {}, &block)
|
||||
klass = options.delete(:as)
|
||||
value = read(key, klass)
|
||||
|
||||
return value unless value.nil?
|
||||
|
||||
value = yield
|
||||
|
||||
write(key, value, options)
|
||||
|
||||
value
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def parse_value(raw, klass)
|
||||
value = ActiveSupport::JSON.decode(raw)
|
||||
|
||||
case value
|
||||
when Hash then parse_entry(value, klass)
|
||||
when Array then parse_entries(value, klass)
|
||||
else
|
||||
value
|
||||
end
|
||||
rescue ActiveSupport::JSON.parse_error
|
||||
nil
|
||||
end
|
||||
|
||||
def parse_entry(raw, klass)
|
||||
klass.new(raw) if valid_entry?(raw, klass)
|
||||
end
|
||||
|
||||
def valid_entry?(raw, klass)
|
||||
return false unless klass && raw.is_a?(Hash)
|
||||
|
||||
(raw.keys - klass.attribute_names).empty?
|
||||
end
|
||||
|
||||
def parse_entries(values, klass)
|
||||
values.map { |value| parse_entry(value, klass) }.compact
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,401 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require 'spec_helper'
|
||||
|
||||
describe Gitlab::JsonCache do
|
||||
let(:backend) { double('backend').as_null_object }
|
||||
let(:namespace) { 'geo' }
|
||||
let(:key) { 'foo' }
|
||||
let(:expanded_key) { "#{namespace}:#{key}:#{Rails.version}" }
|
||||
let(:broadcast_message) { create(:broadcast_message) }
|
||||
|
||||
subject(:cache) { described_class.new(namespace: namespace, backend: backend) }
|
||||
|
||||
describe '#active?' do
|
||||
context 'when backend respond to active? method' do
|
||||
it 'delegates to the underlying cache implementation' do
|
||||
backend = double('backend', active?: false)
|
||||
|
||||
cache = described_class.new(namespace: namespace, backend: backend)
|
||||
|
||||
expect(cache.active?).to eq(false)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when backend does not respond to active? method' do
|
||||
it 'returns true' do
|
||||
backend = double('backend')
|
||||
|
||||
cache = described_class.new(namespace: namespace, backend: backend)
|
||||
|
||||
expect(cache.active?).to eq(true)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#cache_key' do
|
||||
context 'when namespace is not defined' do
|
||||
it 'expands out the key with Rails version' do
|
||||
cache = described_class.new(cache_key_with_version: true)
|
||||
|
||||
cache_key = cache.cache_key(key)
|
||||
|
||||
expect(cache_key).to eq("#{key}:#{Rails.version}")
|
||||
end
|
||||
end
|
||||
|
||||
context 'when cache_key_with_version is true' do
|
||||
it 'expands out the key with namespace and Rails version' do
|
||||
cache = described_class.new(namespace: namespace, cache_key_with_version: true)
|
||||
|
||||
cache_key = cache.cache_key(key)
|
||||
|
||||
expect(cache_key).to eq("#{namespace}:#{key}:#{Rails.version}")
|
||||
end
|
||||
end
|
||||
|
||||
context 'when cache_key_with_version is false' do
|
||||
it 'expands out the key with namespace' do
|
||||
cache = described_class.new(namespace: namespace, cache_key_with_version: false)
|
||||
|
||||
cache_key = cache.cache_key(key)
|
||||
|
||||
expect(cache_key).to eq("#{namespace}:#{key}")
|
||||
end
|
||||
end
|
||||
|
||||
context 'when namespace is nil, and cache_key_with_version is false' do
|
||||
it 'returns the key' do
|
||||
cache = described_class.new(namespace: nil, cache_key_with_version: false)
|
||||
|
||||
cache_key = cache.cache_key(key)
|
||||
|
||||
expect(cache_key).to eq(key)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#expire' do
|
||||
it 'expires the given key from the cache' do
|
||||
cache.expire(key)
|
||||
|
||||
expect(backend).to have_received(:delete).with(expanded_key)
|
||||
end
|
||||
end
|
||||
|
||||
describe '#read' do
|
||||
it 'reads the given key from the cache' do
|
||||
cache.read(key)
|
||||
|
||||
expect(backend).to have_received(:read).with(expanded_key)
|
||||
end
|
||||
|
||||
it 'returns the cached value when there is data in the cache with the given key' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return("true")
|
||||
|
||||
expect(cache.read(key)).to eq(true)
|
||||
end
|
||||
|
||||
it 'returns nil when there is no data in the cache with the given key' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return(nil)
|
||||
|
||||
expect(cache.read(key)).to be_nil
|
||||
end
|
||||
|
||||
context 'when the cached value is a hash' do
|
||||
it 'parses the cached value' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return(broadcast_message.to_json)
|
||||
|
||||
expect(cache.read(key, BroadcastMessage)).to eq(broadcast_message)
|
||||
end
|
||||
|
||||
it 'returns nil when klass is nil' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return(broadcast_message.to_json)
|
||||
|
||||
expect(cache.read(key)).to be_nil
|
||||
end
|
||||
|
||||
it 'gracefully handles bad cached entry' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return('{')
|
||||
|
||||
expect(cache.read(key, BroadcastMessage)).to be_nil
|
||||
end
|
||||
|
||||
it 'gracefully handles an empty hash' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return('{}')
|
||||
|
||||
expect(cache.read(key, BroadcastMessage)).to be_a(BroadcastMessage)
|
||||
end
|
||||
|
||||
it 'gracefully handles unknown attributes' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return(broadcast_message.attributes.merge(unknown_attribute: 1).to_json)
|
||||
|
||||
expect(cache.read(key, BroadcastMessage)).to be_nil
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the cached value is an array' do
|
||||
it 'parses the cached value' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return([broadcast_message].to_json)
|
||||
|
||||
expect(cache.read(key, BroadcastMessage)).to eq([broadcast_message])
|
||||
end
|
||||
|
||||
it 'returns an empty array when klass is nil' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return([broadcast_message].to_json)
|
||||
|
||||
expect(cache.read(key)).to eq([])
|
||||
end
|
||||
|
||||
it 'gracefully handles bad cached entry' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return('[')
|
||||
|
||||
expect(cache.read(key, BroadcastMessage)).to be_nil
|
||||
end
|
||||
|
||||
it 'gracefully handles an empty array' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return('[]')
|
||||
|
||||
expect(cache.read(key, BroadcastMessage)).to eq([])
|
||||
end
|
||||
|
||||
it 'gracefully handles unknown attributes' do
|
||||
allow(backend).to receive(:read)
|
||||
.with(expanded_key)
|
||||
.and_return([{ unknown_attribute: 1 }, broadcast_message.attributes].to_json)
|
||||
|
||||
expect(cache.read(key, BroadcastMessage)).to eq([broadcast_message])
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe '#write' do
|
||||
it 'writes value to the cache with the given key' do
|
||||
cache.write(key, true)
|
||||
|
||||
expect(backend).to have_received(:write).with(expanded_key, "true", nil)
|
||||
end
|
||||
|
||||
it 'writes a string containing a JSON representation of the value to the cache' do
|
||||
cache.write(key, broadcast_message)
|
||||
|
||||
expect(backend).to have_received(:write)
|
||||
.with(expanded_key, broadcast_message.to_json, nil)
|
||||
end
|
||||
|
||||
it 'passes options the underlying cache implementation' do
|
||||
cache.write(key, true, expires_in: 15.seconds)
|
||||
|
||||
expect(backend).to have_received(:write)
|
||||
.with(expanded_key, "true", expires_in: 15.seconds)
|
||||
end
|
||||
|
||||
it 'passes options the underlying cache implementation when options is empty' do
|
||||
cache.write(key, true, {})
|
||||
|
||||
expect(backend).to have_received(:write)
|
||||
.with(expanded_key, "true", {})
|
||||
end
|
||||
|
||||
it 'passes options the underlying cache implementation when options is nil' do
|
||||
cache.write(key, true, nil)
|
||||
|
||||
expect(backend).to have_received(:write)
|
||||
.with(expanded_key, "true", nil)
|
||||
end
|
||||
end
|
||||
|
||||
describe '#fetch', :use_clean_rails_memory_store_caching do
|
||||
let(:backend) { Rails.cache }
|
||||
|
||||
it 'requires a block' do
|
||||
expect { cache.fetch(key) }.to raise_error(LocalJumpError)
|
||||
end
|
||||
|
||||
it 'passes options the underlying cache implementation' do
|
||||
expect(backend).to receive(:write)
|
||||
.with(expanded_key, "true", expires_in: 15.seconds)
|
||||
|
||||
cache.fetch(key, expires_in: 15.seconds) { true }
|
||||
end
|
||||
|
||||
context 'when the given key does not exist in the cache' do
|
||||
context 'when the result of the block is truthy' do
|
||||
it 'returns the result of the block' do
|
||||
result = cache.fetch(key) { true }
|
||||
|
||||
expect(result).to eq(true)
|
||||
end
|
||||
|
||||
it 'caches the value' do
|
||||
expect(backend).to receive(:write).with(expanded_key, "true", {})
|
||||
|
||||
cache.fetch(key) { true }
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the result of the block is false' do
|
||||
it 'returns the result of the block' do
|
||||
result = cache.fetch(key) { false }
|
||||
|
||||
expect(result).to eq(false)
|
||||
end
|
||||
|
||||
it 'caches the value' do
|
||||
expect(backend).to receive(:write).with(expanded_key, "false", {})
|
||||
|
||||
cache.fetch(key) { false }
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the result of the block is nil' do
|
||||
it 'returns the result of the block' do
|
||||
result = cache.fetch(key) { nil }
|
||||
|
||||
expect(result).to eq(nil)
|
||||
end
|
||||
|
||||
it 'caches the value' do
|
||||
expect(backend).to receive(:write).with(expanded_key, "null", {})
|
||||
|
||||
cache.fetch(key) { nil }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the given key exists in the cache' do
|
||||
context 'when the cached value is a hash' do
|
||||
before do
|
||||
backend.write(expanded_key, broadcast_message.to_json)
|
||||
end
|
||||
|
||||
it 'parses the cached value' do
|
||||
result = cache.fetch(key, as: BroadcastMessage) { 'block result' }
|
||||
|
||||
expect(result).to eq(broadcast_message)
|
||||
end
|
||||
|
||||
it "returns the result of the block when 'as' option is nil" do
|
||||
result = cache.fetch(key, as: nil) { 'block result' }
|
||||
|
||||
expect(result).to eq('block result')
|
||||
end
|
||||
|
||||
it "returns the result of the block when 'as' option is not informed" do
|
||||
result = cache.fetch(key) { 'block result' }
|
||||
|
||||
expect(result).to eq('block result')
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the cached value is a array' do
|
||||
before do
|
||||
backend.write(expanded_key, [broadcast_message].to_json)
|
||||
end
|
||||
|
||||
it 'parses the cached value' do
|
||||
result = cache.fetch(key, as: BroadcastMessage) { 'block result' }
|
||||
|
||||
expect(result).to eq([broadcast_message])
|
||||
end
|
||||
|
||||
it "returns an empty array when 'as' option is nil" do
|
||||
result = cache.fetch(key, as: nil) { 'block result' }
|
||||
|
||||
expect(result).to eq([])
|
||||
end
|
||||
|
||||
it "returns an empty array when 'as' option is not informed" do
|
||||
result = cache.fetch(key) { 'block result' }
|
||||
|
||||
expect(result).to eq([])
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the cached value is true' do
|
||||
before do
|
||||
backend.write(expanded_key, "true")
|
||||
end
|
||||
|
||||
it 'returns the cached value' do
|
||||
result = cache.fetch(key) { 'block result' }
|
||||
|
||||
expect(result).to eq(true)
|
||||
end
|
||||
|
||||
it 'does not execute the block' do
|
||||
expect { |block| cache.fetch(key, &block) }.not_to yield_control
|
||||
end
|
||||
|
||||
it 'does not write to the cache' do
|
||||
expect(backend).not_to receive(:write)
|
||||
|
||||
cache.fetch(key) { 'block result' }
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the cached value is false' do
|
||||
before do
|
||||
backend.write(expanded_key, "false")
|
||||
end
|
||||
|
||||
it 'returns the cached value' do
|
||||
result = cache.fetch(key) { 'block result' }
|
||||
|
||||
expect(result).to eq(false)
|
||||
end
|
||||
|
||||
it 'does not execute the block' do
|
||||
expect { |block| cache.fetch(key, &block) }.not_to yield_control
|
||||
end
|
||||
|
||||
it 'does not write to the cache' do
|
||||
expect(backend).not_to receive(:write)
|
||||
|
||||
cache.fetch(key) { 'block result' }
|
||||
end
|
||||
end
|
||||
|
||||
context 'when the cached value is nil' do
|
||||
before do
|
||||
backend.write(expanded_key, "null")
|
||||
end
|
||||
|
||||
it 'returns the result of the block' do
|
||||
result = cache.fetch(key) { 'block result' }
|
||||
|
||||
expect(result).to eq('block result')
|
||||
end
|
||||
|
||||
it 'writes the result of the block to the cache' do
|
||||
expect(backend).to receive(:write)
|
||||
.with(expanded_key, 'block result'.to_json, {})
|
||||
|
||||
cache.fetch(key) { 'block result' }
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -49,7 +49,7 @@ describe BroadcastMessage do
|
|||
it 'caches the output of the query' do
|
||||
create(:broadcast_message)
|
||||
|
||||
expect(described_class).to receive(:where).and_call_original.once
|
||||
expect(described_class).to receive(:current_and_future_messages).and_call_original.once
|
||||
|
||||
described_class.current
|
||||
|
||||
|
@ -93,27 +93,6 @@ describe BroadcastMessage do
|
|||
expect(Rails.cache).to receive(:delete).with(described_class::LEGACY_CACHE_KEY)
|
||||
expect(described_class.current.length).to eq(0)
|
||||
end
|
||||
|
||||
it 'gracefully handles bad cache entry' do
|
||||
allow(described_class).to receive(:current_and_future_messages).and_return('{')
|
||||
|
||||
expect(described_class.current).to be_empty
|
||||
end
|
||||
|
||||
it 'gracefully handles an empty hash' do
|
||||
allow(described_class).to receive(:current_and_future_messages).and_return('{}')
|
||||
|
||||
expect(described_class.current).to be_empty
|
||||
end
|
||||
|
||||
it 'gracefully handles unknown attributes' do
|
||||
message = create(:broadcast_message)
|
||||
|
||||
allow(described_class).to receive(:current_and_future_messages)
|
||||
.and_return([{ bad_attr: 1 }, message])
|
||||
|
||||
expect(described_class.current).to eq([message])
|
||||
end
|
||||
end
|
||||
|
||||
describe '#active?' do
|
||||
|
|
Loading…
Reference in New Issue