mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Fix deprecated uses of Redis#pipelined
and Redis#multi
(#5139)
Context: https://github.com/redis/redis-rb/pull/1059 The following is deprecated ```ruby redis.pipelined do redis.get(key) end ``` And should be rewritten as: ```ruby redis.pipelined do |pipeline| pipeline.get(key) end ``` Functionally it makes no difference. Co-authored-by: Jean Boussier <jean.boussier@gmail.com>
This commit is contained in:
parent
34d081f4ed
commit
c792342753
8 changed files with 84 additions and 84 deletions
|
@ -54,14 +54,14 @@ module Sidekiq
|
|||
# O(1) redis calls
|
||||
def fetch_stats_fast!
|
||||
pipe1_res = Sidekiq.redis { |conn|
|
||||
conn.pipelined do
|
||||
conn.get("stat:processed")
|
||||
conn.get("stat:failed")
|
||||
conn.zcard("schedule")
|
||||
conn.zcard("retry")
|
||||
conn.zcard("dead")
|
||||
conn.scard("processes")
|
||||
conn.lrange("queue:default", -1, -1)
|
||||
conn.pipelined do |pipeline|
|
||||
pipeline.get("stat:processed")
|
||||
pipeline.get("stat:failed")
|
||||
pipeline.zcard("schedule")
|
||||
pipeline.zcard("retry")
|
||||
pipeline.zcard("dead")
|
||||
pipeline.scard("processes")
|
||||
pipeline.lrange("queue:default", -1, -1)
|
||||
end
|
||||
}
|
||||
|
||||
|
@ -101,9 +101,9 @@ module Sidekiq
|
|||
}
|
||||
|
||||
pipe2_res = Sidekiq.redis { |conn|
|
||||
conn.pipelined do
|
||||
processes.each { |key| conn.hget(key, "busy") }
|
||||
queues.each { |queue| conn.llen("queue:#{queue}") }
|
||||
conn.pipelined do |pipeline|
|
||||
processes.each { |key| pipeline.hget(key, "busy") }
|
||||
queues.each { |queue| pipeline.llen("queue:#{queue}") }
|
||||
end
|
||||
}
|
||||
|
||||
|
@ -147,9 +147,9 @@ module Sidekiq
|
|||
Sidekiq.redis do |conn|
|
||||
queues = conn.sscan_each("queues").to_a
|
||||
|
||||
lengths = conn.pipelined {
|
||||
lengths = conn.pipelined { |pipeline|
|
||||
queues.each do |queue|
|
||||
conn.llen("queue:#{queue}")
|
||||
pipeline.llen("queue:#{queue}")
|
||||
end
|
||||
}
|
||||
|
||||
|
@ -287,9 +287,9 @@ module Sidekiq
|
|||
|
||||
def clear
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
conn.unlink(@rname)
|
||||
conn.srem("queues", name)
|
||||
conn.multi do |transaction|
|
||||
transaction.unlink(@rname)
|
||||
transaction.srem("queues", name)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -519,9 +519,9 @@ module Sidekiq
|
|||
|
||||
def remove_job
|
||||
Sidekiq.redis do |conn|
|
||||
results = conn.multi {
|
||||
conn.zrangebyscore(parent.name, score, score)
|
||||
conn.zremrangebyscore(parent.name, score, score)
|
||||
results = conn.multi { |transaction|
|
||||
transaction.zrangebyscore(parent.name, score, score)
|
||||
transaction.zremrangebyscore(parent.name, score, score)
|
||||
}.first
|
||||
|
||||
if results.size == 1
|
||||
|
@ -542,9 +542,9 @@ module Sidekiq
|
|||
yield msg if msg
|
||||
|
||||
# push the rest back onto the sorted set
|
||||
conn.multi do
|
||||
conn.multi do |transaction|
|
||||
nonmatched.each do |message|
|
||||
conn.zadd(parent.name, score.to_f.to_s, message)
|
||||
transaction.zadd(parent.name, score.to_f.to_s, message)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -731,10 +731,10 @@ module Sidekiq
|
|||
def kill(message, opts = {})
|
||||
now = Time.now.to_f
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
conn.zadd(name, now.to_s, message)
|
||||
conn.zremrangebyscore(name, "-inf", now - self.class.timeout)
|
||||
conn.zremrangebyrank(name, 0, - self.class.max_jobs)
|
||||
conn.multi do |transaction|
|
||||
transaction.zadd(name, now.to_s, message)
|
||||
transaction.zremrangebyscore(name, "-inf", now - self.class.timeout)
|
||||
transaction.zremrangebyrank(name, 0, - self.class.max_jobs)
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -782,9 +782,9 @@ module Sidekiq
|
|||
count = 0
|
||||
Sidekiq.redis do |conn|
|
||||
procs = conn.sscan_each("processes").to_a.sort
|
||||
heartbeats = conn.pipelined {
|
||||
heartbeats = conn.pipelined { |pipeline|
|
||||
procs.each do |key|
|
||||
conn.hget(key, "info")
|
||||
pipeline.hget(key, "info")
|
||||
end
|
||||
}
|
||||
|
||||
|
@ -806,9 +806,9 @@ module Sidekiq
|
|||
# We're making a tradeoff here between consuming more memory instead of
|
||||
# making more roundtrips to Redis, but if you have hundreds or thousands of workers,
|
||||
# you'll be happier this way
|
||||
conn.pipelined do
|
||||
conn.pipelined do |pipeline|
|
||||
procs.each do |key|
|
||||
conn.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us")
|
||||
pipeline.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us")
|
||||
end
|
||||
end
|
||||
}
|
||||
|
@ -922,9 +922,9 @@ module Sidekiq
|
|||
def signal(sig)
|
||||
key = "#{identity}-signals"
|
||||
Sidekiq.redis do |c|
|
||||
c.multi do
|
||||
c.lpush(key, sig)
|
||||
c.expire(key, 60)
|
||||
c.multi do |transaction|
|
||||
transaction.lpush(key, sig)
|
||||
transaction.expire(key, 60)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -958,9 +958,9 @@ module Sidekiq
|
|||
Sidekiq.redis do |conn|
|
||||
procs = conn.sscan_each("processes").to_a
|
||||
procs.sort.each do |key|
|
||||
valid, workers = conn.pipelined {
|
||||
conn.exists?(key)
|
||||
conn.hgetall("#{key}:workers")
|
||||
valid, workers = conn.pipelined { |pipeline|
|
||||
pipeline.exists?(key)
|
||||
pipeline.hgetall("#{key}:workers")
|
||||
}
|
||||
next unless valid
|
||||
workers.each_pair do |tid, json|
|
||||
|
@ -988,9 +988,9 @@ module Sidekiq
|
|||
if procs.empty?
|
||||
0
|
||||
else
|
||||
conn.pipelined {
|
||||
conn.pipelined { |pipeline|
|
||||
procs.each do |key|
|
||||
conn.hget(key, "busy")
|
||||
pipeline.hget(key, "busy")
|
||||
end
|
||||
}.sum(&:to_i)
|
||||
end
|
||||
|
|
|
@ -189,8 +189,8 @@ module Sidekiq
|
|||
|
||||
def raw_push(payloads)
|
||||
@redis_pool.with do |conn|
|
||||
conn.pipelined do
|
||||
atomic_push(conn, payloads)
|
||||
conn.pipelined do |pipeline|
|
||||
atomic_push(pipeline, payloads)
|
||||
end
|
||||
end
|
||||
true
|
||||
|
|
|
@ -59,9 +59,9 @@ module Sidekiq
|
|||
end
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
conn.pipelined do
|
||||
conn.pipelined do |pipeline|
|
||||
jobs_to_requeue.each do |queue, jobs|
|
||||
conn.rpush(queue, jobs)
|
||||
pipeline.rpush(queue, jobs)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -84,9 +84,9 @@ module Sidekiq
|
|||
# Note we don't stop the heartbeat thread; if the process
|
||||
# doesn't actually exit, it'll reappear in the Web UI.
|
||||
Sidekiq.redis do |conn|
|
||||
conn.pipelined do
|
||||
conn.srem("processes", identity)
|
||||
conn.unlink("#{identity}:workers")
|
||||
conn.pipelined do |pipeline|
|
||||
pipeline.srem("processes", identity)
|
||||
pipeline.unlink("#{identity}:workers")
|
||||
end
|
||||
end
|
||||
rescue
|
||||
|
@ -107,14 +107,14 @@ module Sidekiq
|
|||
nowdate = Time.now.utc.strftime("%Y-%m-%d")
|
||||
begin
|
||||
Sidekiq.redis do |conn|
|
||||
conn.pipelined do
|
||||
conn.incrby("stat:processed", procd)
|
||||
conn.incrby("stat:processed:#{nowdate}", procd)
|
||||
conn.expire("stat:processed:#{nowdate}", STATS_TTL)
|
||||
conn.pipelined do |pipeline|
|
||||
pipeline.incrby("stat:processed", procd)
|
||||
pipeline.incrby("stat:processed:#{nowdate}", procd)
|
||||
pipeline.expire("stat:processed:#{nowdate}", STATS_TTL)
|
||||
|
||||
conn.incrby("stat:failed", fails)
|
||||
conn.incrby("stat:failed:#{nowdate}", fails)
|
||||
conn.expire("stat:failed:#{nowdate}", STATS_TTL)
|
||||
pipeline.incrby("stat:failed", fails)
|
||||
pipeline.incrby("stat:failed:#{nowdate}", fails)
|
||||
pipeline.expire("stat:failed:#{nowdate}", STATS_TTL)
|
||||
end
|
||||
end
|
||||
rescue => ex
|
||||
|
@ -138,16 +138,16 @@ module Sidekiq
|
|||
nowdate = Time.now.utc.strftime("%Y-%m-%d")
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
conn.incrby("stat:processed", procd)
|
||||
conn.incrby("stat:processed:#{nowdate}", procd)
|
||||
conn.expire("stat:processed:#{nowdate}", STATS_TTL)
|
||||
conn.multi do |transaction|
|
||||
transaction.incrby("stat:processed", procd)
|
||||
transaction.incrby("stat:processed:#{nowdate}", procd)
|
||||
transaction.expire("stat:processed:#{nowdate}", STATS_TTL)
|
||||
|
||||
conn.incrby("stat:failed", fails)
|
||||
conn.incrby("stat:failed:#{nowdate}", fails)
|
||||
conn.expire("stat:failed:#{nowdate}", STATS_TTL)
|
||||
transaction.incrby("stat:failed", fails)
|
||||
transaction.incrby("stat:failed:#{nowdate}", fails)
|
||||
transaction.expire("stat:failed:#{nowdate}", STATS_TTL)
|
||||
|
||||
conn.unlink(workers_key)
|
||||
transaction.unlink(workers_key)
|
||||
curstate.each_pair do |tid, hash|
|
||||
conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
|
||||
end
|
||||
|
@ -161,17 +161,17 @@ module Sidekiq
|
|||
kb = memory_usage(::Process.pid)
|
||||
|
||||
_, exists, _, _, msg = Sidekiq.redis { |conn|
|
||||
conn.multi {
|
||||
conn.sadd("processes", key)
|
||||
conn.exists?(key)
|
||||
conn.hmset(key, "info", to_json,
|
||||
conn.multi { |transaction|
|
||||
transaction.sadd("processes", key)
|
||||
transaction.exists?(key)
|
||||
transaction.hmset(key, "info", to_json,
|
||||
"busy", curstate.size,
|
||||
"beat", Time.now.to_f,
|
||||
"rtt_us", rtt,
|
||||
"quiet", @done,
|
||||
"rss", kb)
|
||||
conn.expire(key, 60)
|
||||
conn.rpop("#{key}-signals")
|
||||
transaction.expire(key, 60)
|
||||
transaction.rpop("#{key}-signals")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -16,22 +16,22 @@ module Sidekiq
|
|||
|
||||
case type
|
||||
when "zset"
|
||||
total_size, items = conn.multi {
|
||||
conn.zcard(key)
|
||||
total_size, items = conn.multi { |transaction|
|
||||
transaction.zcard(key)
|
||||
if rev
|
||||
conn.zrevrange(key, starting, ending, with_scores: true)
|
||||
transaction.zrevrange(key, starting, ending, with_scores: true)
|
||||
else
|
||||
conn.zrange(key, starting, ending, with_scores: true)
|
||||
transaction.zrange(key, starting, ending, with_scores: true)
|
||||
end
|
||||
}
|
||||
[current_page, total_size, items]
|
||||
when "list"
|
||||
total_size, items = conn.multi {
|
||||
total_size, items = conn.multi { |transaction|
|
||||
conn.llen(key)
|
||||
if rev
|
||||
conn.lrange(key, -ending - 1, -starting - 1)
|
||||
transaction.lrange(key, -ending - 1, -starting - 1)
|
||||
else
|
||||
conn.lrange(key, starting, ending)
|
||||
transaction.lrange(key, starting, ending)
|
||||
end
|
||||
}
|
||||
items.reverse! if rev
|
||||
|
|
|
@ -568,10 +568,10 @@ describe 'API' do
|
|||
|
||||
time = Time.now.to_f
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
conn.sadd('processes', odata['key'])
|
||||
conn.hmset(odata['key'], 'info', Sidekiq.dump_json(odata), 'busy', 10, 'beat', time)
|
||||
conn.sadd('processes', 'fake:pid')
|
||||
conn.multi do |transaction|
|
||||
transaction.sadd('processes', odata['key'])
|
||||
transaction.hmset(odata['key'], 'info', Sidekiq.dump_json(odata), 'busy', 10, 'beat', time)
|
||||
transaction.sadd('processes', 'fake:pid')
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -621,9 +621,9 @@ describe 'API' do
|
|||
s = "#{key}:workers"
|
||||
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) })
|
||||
Sidekiq.redis do |c|
|
||||
c.multi do
|
||||
c.hmset(s, '5678', data)
|
||||
c.hmset("b#{s}", '5678', data)
|
||||
c.multi do |transaction|
|
||||
transaction.hmset(s, '5678', data)
|
||||
transaction.hmset("b#{s}", '5678', data)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -415,7 +415,7 @@ describe Sidekiq::Client do
|
|||
|
||||
it 'allows Resque helpers to point to different Redi' do
|
||||
conn = MiniTest::Mock.new
|
||||
conn.expect(:pipelined, []) { |*args, &block| block.call }
|
||||
conn.expect(:pipelined, []) { |*args, &block| block.call(conn) }
|
||||
conn.expect(:zadd, 1, [String, Array])
|
||||
DWorker.sidekiq_options('pool' => ConnectionPool.new(size: 1) { conn })
|
||||
Sidekiq::Client.enqueue_in(10, DWorker, 3)
|
||||
|
|
|
@ -718,10 +718,10 @@ describe Sidekiq::Web do
|
|||
key = "#{hostname}:#{$$}"
|
||||
msg = "{\"queue\":\"default\",\"payload\":{\"retry\":true,\"queue\":\"default\",\"timeout\":20,\"backtrace\":5,\"class\":\"HardWorker\",\"args\":[\"bob\",10,5],\"jid\":\"2b5ad2b016f5e063a1c62872\"},\"run_at\":1361208995}"
|
||||
Sidekiq.redis do |conn|
|
||||
conn.multi do
|
||||
conn.sadd("processes", key)
|
||||
conn.hmset(key, 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f, "queues" => []), 'at', Time.now.to_f, 'busy', 4)
|
||||
conn.hmset("#{key}:workers", Time.now.to_f, msg)
|
||||
conn.multi do |transaction|
|
||||
transaction.sadd("processes", key)
|
||||
transaction.hmset(key, 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f, "queues" => []), 'at', Time.now.to_f, 'busy', 4)
|
||||
transaction.hmset("#{key}:workers", Time.now.to_f, msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue