mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Add Sidekiq::Workers#prune API to remove orphaned records
This commit is contained in:
parent
08d277cd37
commit
404069ab83
5 changed files with 54 additions and 33 deletions
|
@ -2,6 +2,7 @@
|
||||||
-----------
|
-----------
|
||||||
|
|
||||||
- Auto-prune jobs older than one hour from the Workers page [#1508]
|
- Auto-prune jobs older than one hour from the Workers page [#1508]
|
||||||
|
- Add Sidekiq::Workers#prune which can perform the auto-pruning.
|
||||||
- Fix issue where a job could be lost when an exception occurs updating
|
- Fix issue where a job could be lost when an exception occurs updating
|
||||||
Redis stats before the job executes [#1511]
|
Redis stats before the job executes [#1511]
|
||||||
|
|
||||||
|
|
|
@ -424,9 +424,10 @@ module Sidekiq
|
||||||
Sidekiq.redis do |conn|
|
Sidekiq.redis do |conn|
|
||||||
workers = conn.smembers("workers")
|
workers = conn.smembers("workers")
|
||||||
workers.each do |w|
|
workers.each do |w|
|
||||||
msg, time = conn.mget("worker:#{w}", "worker:#{w}:started")
|
json = conn.get("worker:#{w}")
|
||||||
next unless msg
|
next unless json
|
||||||
block.call(w, Sidekiq.load_json(msg), time)
|
msg = Sidekiq.load_json(json)
|
||||||
|
block.call(w, msg, Time.at(msg['run_at']).to_s)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -436,6 +437,36 @@ module Sidekiq
|
||||||
conn.scard("workers")
|
conn.scard("workers")
|
||||||
end.to_i
|
end.to_i
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Prune old worker entries from the Busy set. Worker entries
|
||||||
|
# can be orphaned if Sidekiq hard crashes while processing jobs.
|
||||||
|
# Default is to delete worker entries older than one hour.
|
||||||
|
#
|
||||||
|
# Returns the number of records removed.
|
||||||
|
def prune(older_than=60*60)
|
||||||
|
to_rem = []
|
||||||
|
Sidekiq.redis do |conn|
|
||||||
|
conn.smembers('workers').each do |w|
|
||||||
|
msg = conn.get("worker:#{w}")
|
||||||
|
if !msg
|
||||||
|
to_rem << w
|
||||||
|
else
|
||||||
|
m = Sidekiq.load_json(msg)
|
||||||
|
run_at = Time.at(m['run_at'])
|
||||||
|
# prune jobs older than one hour
|
||||||
|
if run_at < (Time.now - older_than)
|
||||||
|
to_rem << w
|
||||||
|
else
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
if to_rem.size > 0
|
||||||
|
Sidekiq.redis { |conn| conn.srem('workers', to_rem) }
|
||||||
|
end
|
||||||
|
to_rem.size
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -47,38 +47,11 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
MAX_JOB_DURATION = 60*60
|
|
||||||
|
|
||||||
def workers
|
def workers
|
||||||
@workers ||= begin
|
@workers ||= begin
|
||||||
to_rem = []
|
Sidekiq::Workers.new.tap do |w|
|
||||||
workers = Sidekiq.redis do |conn|
|
w.prune
|
||||||
conn.smembers('workers').map do |w|
|
|
||||||
msg = conn.get("worker:#{w}")
|
|
||||||
if !msg
|
|
||||||
to_rem << w
|
|
||||||
nil
|
|
||||||
else
|
|
||||||
m = Sidekiq.load_json(msg)
|
|
||||||
run_at = Time.at(m['run_at'])
|
|
||||||
# prune jobs older than one hour
|
|
||||||
if run_at < (Time.now - MAX_JOB_DURATION)
|
|
||||||
to_rem << w
|
|
||||||
nil
|
|
||||||
else
|
|
||||||
[w, m, run_at]
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end.compact.sort { |x| x[1] ? -1 : 1 }
|
|
||||||
end
|
end
|
||||||
|
|
||||||
# Detect and clear out any orphaned worker records.
|
|
||||||
# These can be left in Redis if Sidekiq crashes hard
|
|
||||||
# while processing jobs.
|
|
||||||
if to_rem.size > 0
|
|
||||||
Sidekiq.redis { |conn| conn.srem('workers', to_rem) }
|
|
||||||
end
|
|
||||||
workers
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -362,6 +362,22 @@ class TestApi < Sidekiq::Test
|
||||||
assert_equal 'default', y['queue']
|
assert_equal 'default', y['queue']
|
||||||
assert_equal Time.now.year, DateTime.parse(z).year
|
assert_equal Time.now.year, DateTime.parse(z).year
|
||||||
end
|
end
|
||||||
|
|
||||||
|
s = '12346'
|
||||||
|
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) })
|
||||||
|
Sidekiq.redis do |c|
|
||||||
|
c.multi do
|
||||||
|
c.sadd('workers', s)
|
||||||
|
c.set("worker:#{s}", data)
|
||||||
|
c.set("worker:#{s}:started", Time.now.to_s)
|
||||||
|
c.sadd('workers', '123457')
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_equal 3, w.size
|
||||||
|
count = w.prune
|
||||||
|
assert_equal 1, w.size
|
||||||
|
assert_equal 2, count
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'can reschedule jobs' do
|
it 'can reschedule jobs' do
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
<td>
|
<td>
|
||||||
<div class="args"><%= display_args(msg['payload']['args']) %></div>
|
<div class="args"><%= display_args(msg['payload']['args']) %></div>
|
||||||
</td>
|
</td>
|
||||||
<td><%= relative_time(run_at) %></td>
|
<td><%= relative_time(run_at.is_a?(String) ? DateTime.parse(run_at) : run_at) %></td>
|
||||||
</tr>
|
</tr>
|
||||||
<% end %>
|
<% end %>
|
||||||
</table>
|
</table>
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue