diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index dfe1eef3..c710d348 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -58,7 +58,7 @@ module Sidekiq def queues Sidekiq.redis do |conn| - queues = conn.sscan_each("queues").to_a + queues = conn.sscan("queues").to_a lengths = conn.pipelined { |pipeline| queues.each do |queue| @@ -115,11 +115,11 @@ module Sidekiq # @api private def fetch_stats_slow! processes = Sidekiq.redis { |conn| - conn.sscan_each("processes").to_a + conn.sscan("processes").to_a } queues = Sidekiq.redis { |conn| - conn.sscan_each("queues").to_a + conn.sscan("queues").to_a } pipe2_res = Sidekiq.redis { |conn| @@ -223,7 +223,7 @@ module Sidekiq # # @return [Array] def self.all - Sidekiq.redis { |c| c.sscan_each("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) } + Sidekiq.redis { |c| c.sscan("queues").to_a }.sort.map { |q| Sidekiq::Queue.new(q) } end attr_reader :name @@ -601,7 +601,7 @@ module Sidekiq match = "*#{match}*" unless match.include?("*") @pool.with do |conn| - conn.zscan_each(name, match: match, count: count) do |entry, score| + conn.zscan(name, match: match, count: count) do |entry, score| yield SortedEntry.new(self, score, entry) end end @@ -691,7 +691,7 @@ module Sidekiq # @return [SortedEntry] the record or nil def find_job(jid) @pool.with do |conn| - conn.zscan_each(name, match: "*#{jid}*", count: 100) do |entry, score| + conn.zscan(name, match: "*#{jid}*", count: 100) do |entry, score| job = JSON.parse(entry) matched = job["jid"] == jid return SortedEntry.new(self, score, entry) if matched @@ -832,7 +832,7 @@ module Sidekiq return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) } count = 0 Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a.sort + procs = conn.sscan("processes").to_a.sort heartbeats = conn.pipelined { |pipeline| procs.each do |key| pipeline.hget(key, "info") @@ -852,7 +852,7 @@ module Sidekiq def each result = Sidekiq.redis { |conn| - procs = conn.sscan_each("processes").to_a.sort + procs = conn.sscan("processes").to_a.sort # We're making a tradeoff here between consuming more memory instead of # making more roundtrips to Redis, but if you have hundreds or thousands of workers, @@ -1027,13 +1027,13 @@ module Sidekiq def each(&block) results = [] Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a + procs = conn.sscan("processes").to_a procs.sort.each do |key| valid, workers = conn.pipelined { |pipeline| - pipeline.exists?(key) + pipeline.exists(key) pipeline.hgetall("#{key}:work") } - next unless valid + next unless valid > 0 workers.each_pair do |tid, json| hsh = Sidekiq.load_json(json) p = hsh["payload"] @@ -1055,7 +1055,7 @@ module Sidekiq # which can easily get out of sync with crashy processes. def size Sidekiq.redis do |conn| - procs = conn.sscan_each("processes").to_a + procs = conn.sscan("processes").to_a if procs.empty? 0 else diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 9de9cd47..f297e4ef 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -47,7 +47,7 @@ module Sidekiq # :nodoc: return nil end - queue, job = redis { |conn| conn.brpop(*qs) } + queue, job = redis { |conn| conn.blocking_call(false, "brpop", *qs) } UnitOfWork.new(queue, job, config) if queue end diff --git a/lib/sidekiq/launcher.rb b/lib/sidekiq/launcher.rb index 036dedd4..de108e3c 100644 --- a/lib/sidekiq/launcher.rb +++ b/lib/sidekiq/launcher.rb @@ -154,7 +154,7 @@ module Sidekiq _, exists, _, _, msg = redis { |conn| conn.multi { |transaction| transaction.sadd("processes", key) - transaction.exists?(key) + transaction.exists(key) transaction.hmset(key, "info", to_json, "busy", curstate.size, "beat", Time.now.to_f, @@ -167,7 +167,7 @@ module Sidekiq } # first heartbeat or recovering from an outage and need to reestablish our heartbeat - fire_event(:heartbeat) unless exists + fire_event(:heartbeat) unless exists > 0 fire_event(:beat, oneshot: false) return unless msg diff --git a/lib/sidekiq/redis_client_adapter.rb b/lib/sidekiq/redis_client_adapter.rb index 8059c88b..85151a42 100644 --- a/lib/sidekiq/redis_client_adapter.rb +++ b/lib/sidekiq/redis_client_adapter.rb @@ -17,29 +17,10 @@ module Sidekiq @client.call("EVALSHA", sha, keys.size, *keys, *argv) end - def brpoplpush(*args) - @client.blocking_call(false, "BRPOPLPUSH", *args) - end - - def brpop(*args) - @client.blocking_call(false, "BRPOP", *args) - end - - def set(*args) - @client.call("SET", *args) { |r| r == "OK" } - end - ruby2_keywords :set if respond_to?(:ruby2_keywords, true) - - def sismember(*args) - @client.call("SISMEMBER", *args) { |c| c > 0 } - end - - def exists?(key) - @client.call("EXISTS", key) { |c| c > 0 } - end - private + # this allows us to use methods like `conn.hmset(...)` instead of having to use + # redis-client's native `conn.call("hmset", ...)` def method_missing(*args, &block) @client.call(*args, *block) end @@ -53,11 +34,6 @@ module Sidekiq CompatClient = RedisClient::Decorator.create(CompatMethods) class CompatClient - %i[scan sscan zscan hscan].each do |method| - alias_method :"#{method}_each", method - undef_method method - end - # underscore methods are not official API def _client @client diff --git a/test/api.rb b/test/api.rb index f9eb6e89..085af3c4 100644 --- a/test/api.rb +++ b/test/api.rb @@ -414,7 +414,7 @@ describe "API" do Sidekiq.redis do |conn| refute conn.smembers("queues").include?("foo") - refute conn.exists?("queue:foo") + refute(conn.exists("queue:foo") > 0) end end diff --git a/test/web.rb b/test/web.rb index 00aac627..e3cf7850 100644 --- a/test/web.rb +++ b/test/web.rb @@ -163,7 +163,7 @@ describe Sidekiq::Web do @config.redis do |conn| refute conn.smembers("queues").include?("foo") - refute conn.exists?("queue:foo") + refute(conn.exists("queue:foo") > 0) end end