mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Rejigger raw_push for Pro's atomic batch fix
This commit is contained in:
parent
d8a50c6947
commit
ac69c7ee5e
1 changed files with 17 additions and 14 deletions
|
@ -176,23 +176,26 @@ module Sidekiq
|
|||
private
|
||||
|
||||
def raw_push(payloads)
|
||||
pushed = false
|
||||
@redis_pool.with do |conn|
|
||||
if payloads.first['at']
|
||||
pushed = conn.zadd('schedule', payloads.map do |hash|
|
||||
at = hash.delete('at').to_s
|
||||
[at, Sidekiq.dump_json(hash)]
|
||||
end)
|
||||
else
|
||||
q = payloads.first['queue']
|
||||
to_push = payloads.map { |entry| Sidekiq.dump_json(entry) }
|
||||
_, pushed = conn.multi do
|
||||
conn.sadd('queues', q)
|
||||
conn.lpush("queue:#{q}", to_push)
|
||||
end
|
||||
conn.multi do
|
||||
atomic_push(conn, payloads)
|
||||
end
|
||||
end
|
||||
pushed
|
||||
true
|
||||
end
|
||||
|
||||
def atomic_push(conn, payloads)
|
||||
if payloads.first['at']
|
||||
conn.zadd('schedule', payloads.map do |hash|
|
||||
at = hash.delete('at').to_s
|
||||
[at, Sidekiq.dump_json(hash)]
|
||||
end)
|
||||
else
|
||||
q = payloads.first['queue']
|
||||
to_push = payloads.map { |entry| Sidekiq.dump_json(entry) }
|
||||
conn.sadd('queues', q)
|
||||
conn.lpush("queue:#{q}", to_push)
|
||||
end
|
||||
end
|
||||
|
||||
def process_single(worker_class, item)
|
||||
|
|
Loading…
Reference in a new issue