mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Merge branch 'master' of github.com:mperham/sidekiq
This commit is contained in:
commit
dea48a3f94
3 changed files with 16 additions and 26 deletions
|
@ -140,13 +140,8 @@ module Sidekiq
|
|||
end
|
||||
}
|
||||
|
||||
i = 0
|
||||
array_of_arrays = queues.each_with_object({}) { |queue, memo|
|
||||
memo[queue] = lengths[i]
|
||||
i += 1
|
||||
}.sort_by { |_, size| size }
|
||||
|
||||
Hash[array_of_arrays.reverse]
|
||||
array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size }
|
||||
Hash[array_of_arrays]
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -168,18 +163,12 @@ module Sidekiq
|
|||
private
|
||||
|
||||
def date_stat_hash(stat)
|
||||
i = 0
|
||||
stat_hash = {}
|
||||
keys = []
|
||||
dates = []
|
||||
dates = @start_date.downto(@start_date - @days_previous + 1).map { |date|
|
||||
date.strftime("%Y-%m-%d")
|
||||
}
|
||||
|
||||
while i < @days_previous
|
||||
date = @start_date - i
|
||||
datestr = date.strftime("%Y-%m-%d")
|
||||
keys << "stat:#{stat}:#{datestr}"
|
||||
dates << datestr
|
||||
i += 1
|
||||
end
|
||||
keys = dates.map { |datestr| "stat:#{stat}:#{datestr}" }
|
||||
|
||||
begin
|
||||
Sidekiq.redis do |conn|
|
||||
|
@ -478,7 +467,7 @@ module Sidekiq
|
|||
|
||||
def reschedule(at)
|
||||
Sidekiq.redis do |conn|
|
||||
conn.zincrby(@parent.name, at - @score, Sidekiq.dump_json(@item))
|
||||
conn.zincrby(@parent.name, at.to_f - @score, Sidekiq.dump_json(@item))
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -523,7 +512,7 @@ module Sidekiq
|
|||
else
|
||||
# multiple jobs with the same score
|
||||
# find the one with the right JID and push it
|
||||
hash = results.group_by { |message|
|
||||
matched, nonmatched = results.partition { |message|
|
||||
if message.index(jid)
|
||||
msg = Sidekiq.load_json(message)
|
||||
msg["jid"] == jid
|
||||
|
@ -532,12 +521,12 @@ module Sidekiq
|
|||
end
|
||||
}
|
||||
|
||||
msg = hash.fetch(true, []).first
|
||||
msg = matched.first
|
||||
yield msg if msg
|
||||
|
||||
# push the rest back onto the sorted set
|
||||
conn.multi do
|
||||
hash.fetch(false, []).each do |message|
|
||||
nonmatched.each do |message|
|
||||
conn.zadd(parent.name, score.to_f.to_s, message)
|
||||
end
|
||||
end
|
||||
|
@ -785,10 +774,9 @@ module Sidekiq
|
|||
# the hash named key has an expiry of 60 seconds.
|
||||
# if it's not found, that means the process has not reported
|
||||
# in to Redis and probably died.
|
||||
to_prune = []
|
||||
heartbeats.each_with_index do |beat, i|
|
||||
to_prune << procs[i] if beat.nil?
|
||||
end
|
||||
to_prune = procs.select.with_index { |proc, i|
|
||||
heartbeats[i].nil?
|
||||
}
|
||||
count = conn.srem("processes", to_prune) unless to_prune.empty?
|
||||
end
|
||||
count
|
||||
|
|
|
@ -58,7 +58,7 @@ module Sidekiq
|
|||
# touch the connection pool so it is created before we
|
||||
# fire startup and start multithreading.
|
||||
ver = Sidekiq.redis_info["redis_version"]
|
||||
raise "You are using Redis v#{ver}, Sidekiq requires Redis v4.0.0 or greater" if ver < "4"
|
||||
raise "You are connecting to Redis v#{ver}, Sidekiq requires Redis v4.0.0 or greater" if ver < "4"
|
||||
|
||||
# Since the user can pass us a connection pool explicitly in the initializer, we
|
||||
# need to verify the size is large enough or else Sidekiq's performance is dramatically slowed.
|
||||
|
|
|
@ -613,11 +613,13 @@ describe 'API' do
|
|||
refute(retries.map { |r| r.score > (Time.now.to_f + 9) }.any?)
|
||||
|
||||
retries.each do |retri|
|
||||
retri.reschedule(Time.now + 15) if retri.jid == 'foo1'
|
||||
retri.reschedule(Time.now.to_f + 10) if retri.jid == 'foo2'
|
||||
end
|
||||
|
||||
assert_equal 2, retries.size
|
||||
assert(retries.map { |r| r.score > (Time.now.to_f + 9) }.any?)
|
||||
assert(retries.map { |r| r.score > (Time.now.to_f + 14) }.any?)
|
||||
end
|
||||
|
||||
it 'prunes processes which have died' do
|
||||
|
|
Loading…
Add table
Reference in a new issue