mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Refactor redis client adapter to be standalone and remove unnecessary namespacing
This commit is contained in:
parent
aaac999c6d
commit
f38cf3050c
4 changed files with 164 additions and 162 deletions
154
lib/sidekiq/redis_client_adapter.rb
Normal file
154
lib/sidekiq/redis_client_adapter.rb
Normal file
|
@ -0,0 +1,154 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require "connection_pool"
|
||||
require "redis_client"
|
||||
require "redis_client/decorator"
|
||||
require "uri"
|
||||
|
||||
module Sidekiq
|
||||
class RedisClientAdapter
|
||||
BaseError = RedisClient::Error
|
||||
CommandError = RedisClient::CommandError
|
||||
|
||||
module CompatMethods
|
||||
def info
|
||||
@client.call("INFO") { |i| i.lines(chomp: true).map { |l| l.split(":", 2) }.select { |l| l.size == 2 }.to_h }
|
||||
end
|
||||
|
||||
def evalsha(sha, keys, argv)
|
||||
@client.call("EVALSHA", sha, keys.size, *keys, *argv)
|
||||
end
|
||||
|
||||
def brpoplpush(*args)
|
||||
@client.blocking_call(false, "BRPOPLPUSH", *args)
|
||||
end
|
||||
|
||||
def brpop(*args)
|
||||
@client.blocking_call(false, "BRPOP", *args)
|
||||
end
|
||||
|
||||
def set(*args)
|
||||
@client.call("SET", *args) { |r| r == "OK" }
|
||||
end
|
||||
ruby2_keywords :set if respond_to?(:ruby2_keywords, true)
|
||||
|
||||
def sismember(*args)
|
||||
@client.call("SISMEMBER", *args) { |c| c > 0 }
|
||||
end
|
||||
|
||||
def exists?(key)
|
||||
@client.call("EXISTS", key) { |c| c > 0 }
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def method_missing(*args, &block)
|
||||
@client.call(*args, *block)
|
||||
end
|
||||
ruby2_keywords :method_missing if respond_to?(:ruby2_keywords, true)
|
||||
|
||||
def respond_to_missing?(name, include_private = false)
|
||||
super # Appease the linter. We can't tell what is a valid command.
|
||||
end
|
||||
end
|
||||
|
||||
CompatClient = RedisClient::Decorator.create(CompatMethods)
|
||||
|
||||
class CompatClient
|
||||
%i[scan sscan zscan hscan].each do |method|
|
||||
alias_method :"#{method}_each", method
|
||||
undef_method method
|
||||
end
|
||||
|
||||
def disconnect!
|
||||
@client.close
|
||||
end
|
||||
|
||||
def connection
|
||||
{id: @client.id}
|
||||
end
|
||||
|
||||
def redis
|
||||
self
|
||||
end
|
||||
|
||||
def _client
|
||||
@client
|
||||
end
|
||||
|
||||
def message
|
||||
yield nil, @queue.pop
|
||||
end
|
||||
|
||||
# NB: this method does not return
|
||||
def subscribe(chan)
|
||||
@queue = ::Queue.new
|
||||
|
||||
pubsub = @client.pubsub
|
||||
pubsub.call("subscribe", chan)
|
||||
|
||||
loop do
|
||||
evt = pubsub.next_event
|
||||
next if evt.nil?
|
||||
next unless evt[0] == "message" && evt[1] == chan
|
||||
|
||||
(_, _, msg) = evt
|
||||
@queue << msg
|
||||
yield self
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(options)
|
||||
opts = client_opts(options)
|
||||
@config = if opts.key?(:sentinels)
|
||||
RedisClient.sentinel(**opts)
|
||||
else
|
||||
RedisClient.config(**opts)
|
||||
end
|
||||
end
|
||||
|
||||
def new_client
|
||||
CompatClient.new(@config.new_client)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def client_opts(options)
|
||||
opts = options.dup
|
||||
|
||||
if opts[:namespace]
|
||||
Sidekiq.logger.error("Your Redis configuration uses the namespace '#{opts[:namespace]}' but this feature isn't supported by redis-client. " \
|
||||
"Either use the redis adapter or remove the namespace.")
|
||||
Kernel.exit(-127)
|
||||
end
|
||||
|
||||
opts.delete(:size)
|
||||
opts.delete(:pool_timeout)
|
||||
|
||||
if opts[:network_timeout]
|
||||
opts[:timeout] = opts[:network_timeout]
|
||||
opts.delete(:network_timeout)
|
||||
end
|
||||
|
||||
if opts[:driver]
|
||||
opts[:driver] = opts[:driver].to_sym
|
||||
end
|
||||
|
||||
opts[:name] = opts.delete(:master_name) if opts.key?(:master_name)
|
||||
opts[:role] = opts[:role].to_sym if opts.key?(:role)
|
||||
opts.delete(:url) if opts.key?(:sentinels)
|
||||
|
||||
# Issue #3303, redis-rb will silently retry an operation.
|
||||
# This can lead to duplicate jobs if Sidekiq::Client's LPUSH
|
||||
# is performed twice but I believe this is much, much rarer
|
||||
# than the reconnect silently fixing a problem; we keep it
|
||||
# on by default.
|
||||
opts[:reconnect_attempts] ||= 1
|
||||
|
||||
opts
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
Sidekiq::RedisConnection.adapter = Sidekiq::RedisClientAdapter
|
|
@ -6,8 +6,6 @@ require "uri"
|
|||
|
||||
module Sidekiq
|
||||
module RedisConnection
|
||||
autoload :RedisClientAdapter, "sidekiq/redis_connection/redis_client_adapter"
|
||||
|
||||
class RedisAdapter
|
||||
BaseError = Redis::BaseError
|
||||
CommandError = Redis::CommandError
|
||||
|
@ -66,16 +64,20 @@ module Sidekiq
|
|||
class << self
|
||||
attr_reader :adapter
|
||||
|
||||
# RedisConnection.adapter = :redis
|
||||
# RedisConnection.adapter = :redis_client
|
||||
def adapter=(adapter)
|
||||
raise "no" if adapter == self
|
||||
@adapter = case adapter
|
||||
when :redis_client
|
||||
RedisClientAdapter
|
||||
result = case adapter
|
||||
when :redis
|
||||
RedisAdapter
|
||||
else
|
||||
when Class
|
||||
adapter
|
||||
else
|
||||
require "sidekiq/#{adapter}_adapter"
|
||||
nil
|
||||
end
|
||||
@adapter = result if result
|
||||
end
|
||||
|
||||
def create(options = {})
|
||||
|
|
|
@ -1,154 +0,0 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
require "connection_pool"
|
||||
require "redis_client"
|
||||
require "redis_client/decorator"
|
||||
require "uri"
|
||||
|
||||
module Sidekiq
|
||||
module RedisConnection
|
||||
class RedisClientAdapter
|
||||
BaseError = RedisClient::Error
|
||||
CommandError = RedisClient::CommandError
|
||||
|
||||
module CompatMethods
|
||||
def info
|
||||
@client.call("INFO") { |i| i.lines(chomp: true).map { |l| l.split(":", 2) }.select { |l| l.size == 2 }.to_h }
|
||||
end
|
||||
|
||||
def evalsha(sha, keys, argv)
|
||||
@client.call("EVALSHA", sha, keys.size, *keys, *argv)
|
||||
end
|
||||
|
||||
def brpoplpush(*args)
|
||||
@client.blocking_call(false, "BRPOPLPUSH", *args)
|
||||
end
|
||||
|
||||
def brpop(*args)
|
||||
@client.blocking_call(false, "BRPOP", *args)
|
||||
end
|
||||
|
||||
def set(*args)
|
||||
@client.call("SET", *args) { |r| r == "OK" }
|
||||
end
|
||||
ruby2_keywords :set if respond_to?(:ruby2_keywords, true)
|
||||
|
||||
def sismember(*args)
|
||||
@client.call("SISMEMBER", *args) { |c| c > 0 }
|
||||
end
|
||||
|
||||
def exists?(key)
|
||||
@client.call("EXISTS", key) { |c| c > 0 }
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def method_missing(*args, &block)
|
||||
@client.call(*args, *block)
|
||||
end
|
||||
ruby2_keywords :method_missing if respond_to?(:ruby2_keywords, true)
|
||||
|
||||
def respond_to_missing?(name, include_private = false)
|
||||
super # Appease the linter. We can't tell what is a valid command.
|
||||
end
|
||||
end
|
||||
|
||||
CompatClient = RedisClient::Decorator.create(CompatMethods)
|
||||
|
||||
class CompatClient
|
||||
%i[scan sscan zscan hscan].each do |method|
|
||||
alias_method :"#{method}_each", method
|
||||
undef_method method
|
||||
end
|
||||
|
||||
def disconnect!
|
||||
@client.close
|
||||
end
|
||||
|
||||
def connection
|
||||
{id: @client.id}
|
||||
end
|
||||
|
||||
def redis
|
||||
self
|
||||
end
|
||||
|
||||
def _client
|
||||
@client
|
||||
end
|
||||
|
||||
def message
|
||||
yield nil, @queue.pop
|
||||
end
|
||||
|
||||
# NB: this method does not return
|
||||
def subscribe(chan)
|
||||
@queue = ::Queue.new
|
||||
|
||||
pubsub = @client.pubsub
|
||||
pubsub.call("subscribe", chan)
|
||||
|
||||
loop do
|
||||
evt = pubsub.next_event
|
||||
next if evt.nil?
|
||||
next unless evt[0] == "message" && evt[1] == chan
|
||||
|
||||
(_, _, msg) = evt
|
||||
@queue << msg
|
||||
yield self
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(options)
|
||||
opts = client_opts(options)
|
||||
@config = if opts.key?(:sentinels)
|
||||
RedisClient.sentinel(**opts)
|
||||
else
|
||||
RedisClient.config(**opts)
|
||||
end
|
||||
end
|
||||
|
||||
def new_client
|
||||
CompatClient.new(@config.new_client)
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def client_opts(options)
|
||||
opts = options.dup
|
||||
|
||||
if opts[:namespace]
|
||||
Sidekiq.logger.error("Your Redis configuration uses the namespace '#{opts[:namespace]}' but this feature isn't supported by redis-client. " \
|
||||
"Either use the redis adapter or remove the namespace.")
|
||||
Kernel.exit(-127)
|
||||
end
|
||||
|
||||
opts.delete(:size)
|
||||
opts.delete(:pool_timeout)
|
||||
|
||||
if opts[:network_timeout]
|
||||
opts[:timeout] = opts[:network_timeout]
|
||||
opts.delete(:network_timeout)
|
||||
end
|
||||
|
||||
if opts[:driver]
|
||||
opts[:driver] = opts[:driver].to_sym
|
||||
end
|
||||
|
||||
opts[:name] = opts.delete(:master_name) if opts.key?(:master_name)
|
||||
opts[:role] = opts[:role].to_sym if opts.key?(:role)
|
||||
opts.delete(:url) if opts.key?(:sentinels)
|
||||
|
||||
# Issue #3303, redis-rb will silently retry an operation.
|
||||
# This can lead to duplicate jobs if Sidekiq::Client's LPUSH
|
||||
# is performed twice but I believe this is much, much rarer
|
||||
# than the reconnect silently fixing a problem; we keep it
|
||||
# on by default.
|
||||
opts[:reconnect_attempts] ||= 1
|
||||
|
||||
opts
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -34,12 +34,12 @@ describe Sidekiq::RedisConnection do
|
|||
end
|
||||
|
||||
def self.redis_client?
|
||||
Sidekiq::RedisConnection.adapter == Sidekiq::RedisConnection::RedisClientAdapter
|
||||
Sidekiq::RedisConnection.adapter.name == "Sidekiq::RedisClientAdapter"
|
||||
end
|
||||
|
||||
if redis_client?
|
||||
def client_class
|
||||
Sidekiq::RedisConnection::RedisClientAdapter::CompatClient
|
||||
Sidekiq::RedisClientAdapter::CompatClient
|
||||
end
|
||||
else
|
||||
def client_class
|
||||
|
|
Loading…
Add table
Reference in a new issue