1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Remove much of the redis-client compat layer

This commit is contained in:
Mike Perham 2022-09-01 14:51:16 -07:00
parent 58ef385518
commit 0395eea2a4
No known key found for this signature in database
6 changed files with 19 additions and 43 deletions

View file

@ -58,7 +58,7 @@ module Sidekiq
def queues def queues
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
queues = conn.sscan_each("queues").to_a queues = conn.sscan("queues").to_a
lengths = conn.pipelined { |pipeline| lengths = conn.pipelined { |pipeline|
queues.each do |queue| queues.each do |queue|
@ -115,11 +115,11 @@ module Sidekiq
# @api private # @api private
def fetch_stats_slow! def fetch_stats_slow!
processes = Sidekiq.redis { |conn| processes = Sidekiq.redis { |conn|
conn.sscan_each("processes").to_a conn.sscan("processes").to_a
} }
queues = Sidekiq.redis { |conn| queues = Sidekiq.redis { |conn|
conn.sscan_each("queues").to_a conn.sscan("queues").to_a
} }
pipe2_res = Sidekiq.redis { |conn| pipe2_res = Sidekiq.redis { |conn|
@ -223,7 +223,7 @@ module Sidekiq
# #
# @return [Array<Sidekiq::Queue>] # @return [Array<Sidekiq::Queue>]
def self.all def self.all
Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) } Sidekiq.redis { |c| c.sscan("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) }
end end
attr_reader :name attr_reader :name
@ -601,7 +601,7 @@ module Sidekiq
match = "*#{match}*" unless match.include?("*") match = "*#{match}*" unless match.include?("*")
@pool.with do |conn| @pool.with do |conn|
conn.zscan_each(name, match: match, count: count) do |entry, score| conn.zscan(name, match: match, count: count) do |entry, score|
yield SortedEntry.new(self, score, entry) yield SortedEntry.new(self, score, entry)
end end
end end
@ -691,7 +691,7 @@ module Sidekiq
# @return [SortedEntry] the record or nil # @return [SortedEntry] the record or nil
def find_job(jid) def find_job(jid)
@pool.with do |conn| @pool.with do |conn|
conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score| conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score|
job = JSON.parse(entry) job = JSON.parse(entry)
matched = job["jid"] == jid matched = job["jid"] == jid
return SortedEntry.new(self, score, entry) if matched return SortedEntry.new(self, score, entry) if matched
@ -832,7 +832,7 @@ module Sidekiq
return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) } return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) }
count = 0 count = 0
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a.sort procs = conn.sscan("processes").to_a.sort
heartbeats = conn.pipelined { |pipeline| heartbeats = conn.pipelined { |pipeline|
procs.each do |key| procs.each do |key|
pipeline.hget(key, "info") pipeline.hget(key, "info")
@ -852,7 +852,7 @@ module Sidekiq
def each def each
result = Sidekiq.redis { |conn| result = Sidekiq.redis { |conn|
procs = conn.sscan_each("processes").to_a.sort procs = conn.sscan("processes").to_a.sort
# We're making a tradeoff here between consuming more memory instead of # We're making a tradeoff here between consuming more memory instead of
# making more roundtrips to Redis, but if you have hundreds or thousands of workers, # making more roundtrips to Redis, but if you have hundreds or thousands of workers,
@ -1027,13 +1027,13 @@ module Sidekiq
def each(&block) def each(&block)
results = [] results = []
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a procs = conn.sscan("processes").to_a
procs.sort.each do |key| procs.sort.each do |key|
valid, workers = conn.pipelined { |pipeline| valid, workers = conn.pipelined { |pipeline|
pipeline.exists?(key) pipeline.exists(key)
pipeline.hgetall("#{key}:work") pipeline.hgetall("#{key}:work")
} }
next unless valid next unless valid > 0
workers.each_pair do |tid, json| workers.each_pair do |tid, json|
hsh = Sidekiq.load_json(json) hsh = Sidekiq.load_json(json)
p = hsh["payload"] p = hsh["payload"]
@ -1055,7 +1055,7 @@ module Sidekiq
# which can easily get out of sync with crashy processes. # which can easily get out of sync with crashy processes.
def size def size
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
procs = conn.sscan_each("processes").to_a procs = conn.sscan("processes").to_a
if procs.empty? if procs.empty?
0 0
else else

View file

@ -47,7 +47,7 @@ module Sidekiq # :nodoc:
return nil return nil
end end
queue, job = redis { |conn| conn.brpop(*qs) } queue, job = redis { |conn| conn.blocking_call(false, "brpop", *qs) }
UnitOfWork.new(queue, job, config) if queue UnitOfWork.new(queue, job, config) if queue
end end

View file

@ -154,7 +154,7 @@ module Sidekiq
_, exists, _, _, msg = redis { |conn| _, exists, _, _, msg = redis { |conn|
conn.multi { |transaction| conn.multi { |transaction|
transaction.sadd("processes", key) transaction.sadd("processes", key)
transaction.exists?(key) transaction.exists(key)
transaction.hmset(key, "info", to_json, transaction.hmset(key, "info", to_json,
"busy", curstate.size, "busy", curstate.size,
"beat", Time.now.to_f, "beat", Time.now.to_f,
@ -167,7 +167,7 @@ module Sidekiq
} }
# first heartbeat or recovering from an outage and need to reestablish our heartbeat # first heartbeat or recovering from an outage and need to reestablish our heartbeat
fire_event(:heartbeat) unless exists fire_event(:heartbeat) unless exists > 0
fire_event(:beat, oneshot: false) fire_event(:beat, oneshot: false)
return unless msg return unless msg

View file

@ -17,29 +17,10 @@ module Sidekiq
@client.call("EVALSHA", sha, keys.size, *keys, *argv) @client.call("EVALSHA", sha, keys.size, *keys, *argv)
end end
def brpoplpush(*args)
@client.blocking_call(false, "BRPOPLPUSH", *args)
end
def brpop(*args)
@client.blocking_call(false, "BRPOP", *args)
end
def set(*args)
@client.call("SET", *args) { |r| r == "OK" }
end
ruby2_keywords :set if respond_to?(:ruby2_keywords, true)
def sismember(*args)
@client.call("SISMEMBER", *args) { |c| c > 0 }
end
def exists?(key)
@client.call("EXISTS", key) { |c| c > 0 }
end
private private
# this allows us to use methods like `conn.hmset(...)` instead of having to use
# redis-client's native `conn.call("hmset", ...)`
def method_missing(*args, &block) def method_missing(*args, &block)
@client.call(*args, *block) @client.call(*args, *block)
end end
@ -53,11 +34,6 @@ module Sidekiq
CompatClient = RedisClient::Decorator.create(CompatMethods) CompatClient = RedisClient::Decorator.create(CompatMethods)
class CompatClient class CompatClient
%i[scan sscan zscan hscan].each do |method|
alias_method :"#{method}_each", method
undef_method method
end
# underscore methods are not official API # underscore methods are not official API
def _client def _client
@client @client

View file

@ -414,7 +414,7 @@ describe "API" do
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
refute conn.smembers("queues").include?("foo") refute conn.smembers("queues").include?("foo")
refute conn.exists?("queue:foo") refute(conn.exists("queue:foo") > 0)
end end
end end

View file

@ -163,7 +163,7 @@ describe Sidekiq::Web do
@config.redis do |conn| @config.redis do |conn|
refute conn.smembers("queues").include?("foo") refute conn.smembers("queues").include?("foo")
refute conn.exists?("queue:foo") refute(conn.exists("queue:foo") > 0)
end end
end end