From 404069ab83f9f57f8ac30f849d6a4556ab0ff24f Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Thu, 27 Feb 2014 21:15:08 -0800 Subject: [PATCH] Add Sidekiq::Workers#prune API to remove orphaned records --- Changes.md | 1 + lib/sidekiq/api.rb | 37 ++++++++++++++++++++++++++++++++++--- lib/sidekiq/web_helpers.rb | 31 ++----------------------------- test/test_api.rb | 16 ++++++++++++++++ web/views/_workers.erb | 2 +- 5 files changed, 54 insertions(+), 33 deletions(-) diff --git a/Changes.md b/Changes.md index 0b10795a..db2bf616 100644 --- a/Changes.md +++ b/Changes.md @@ -2,6 +2,7 @@ ----------- - Auto-prune jobs older than one hour from the Workers page [#1508] +- Add Sidekiq::Workers#prune which can perform the auto-pruning. - Fix issue where a job could be lost when an exception occurs updating Redis stats before the job executes [#1511] diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 460b1b69..1f8a8ac8 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -424,9 +424,10 @@ module Sidekiq Sidekiq.redis do |conn| workers = conn.smembers("workers") workers.each do |w| - msg, time = conn.mget("worker:#{w}", "worker:#{w}:started") - next unless msg - block.call(w, Sidekiq.load_json(msg), time) + json = conn.get("worker:#{w}") + next unless json + msg = Sidekiq.load_json(json) + block.call(w, msg, Time.at(msg['run_at']).to_s) end end end @@ -436,6 +437,36 @@ module Sidekiq conn.scard("workers") end.to_i end + + # Prune old worker entries from the Busy set. Worker entries + # can be orphaned if Sidekiq hard crashes while processing jobs. + # Default is to delete worker entries older than one hour. + # + # Returns the number of records removed. + def prune(older_than=60*60) + to_rem = [] + Sidekiq.redis do |conn| + conn.smembers('workers').each do |w| + msg = conn.get("worker:#{w}") + if !msg + to_rem << w + else + m = Sidekiq.load_json(msg) + run_at = Time.at(m['run_at']) + # prune jobs older than one hour + if run_at < (Time.now - older_than) + to_rem << w + else + end + end + end + end + + if to_rem.size > 0 + Sidekiq.redis { |conn| conn.srem('workers', to_rem) } + end + to_rem.size + end end end diff --git a/lib/sidekiq/web_helpers.rb b/lib/sidekiq/web_helpers.rb index 3acecf2b..36f02102 100644 --- a/lib/sidekiq/web_helpers.rb +++ b/lib/sidekiq/web_helpers.rb @@ -47,38 +47,11 @@ module Sidekiq end end - MAX_JOB_DURATION = 60*60 - def workers @workers ||= begin - to_rem = [] - workers = Sidekiq.redis do |conn| - conn.smembers('workers').map do |w| - msg = conn.get("worker:#{w}") - if !msg - to_rem << w - nil - else - m = Sidekiq.load_json(msg) - run_at = Time.at(m['run_at']) - # prune jobs older than one hour - if run_at < (Time.now - MAX_JOB_DURATION) - to_rem << w - nil - else - [w, m, run_at] - end - end - end.compact.sort { |x| x[1] ? -1 : 1 } + Sidekiq::Workers.new.tap do |w| + w.prune end - - # Detect and clear out any orphaned worker records. - # These can be left in Redis if Sidekiq crashes hard - # while processing jobs. - if to_rem.size > 0 - Sidekiq.redis { |conn| conn.srem('workers', to_rem) } - end - workers end end diff --git a/test/test_api.rb b/test/test_api.rb index c08e3031..e8ec16ce 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -362,6 +362,22 @@ class TestApi < Sidekiq::Test assert_equal 'default', y['queue'] assert_equal Time.now.year, DateTime.parse(z).year end + + s = '12346' + data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) }) + Sidekiq.redis do |c| + c.multi do + c.sadd('workers', s) + c.set("worker:#{s}", data) + c.set("worker:#{s}:started", Time.now.to_s) + c.sadd('workers', '123457') + end + end + + assert_equal 3, w.size + count = w.prune + assert_equal 1, w.size + assert_equal 2, count end it 'can reschedule jobs' do diff --git a/web/views/_workers.erb b/web/views/_workers.erb index d5896690..2e35d6b3 100644 --- a/web/views/_workers.erb +++ b/web/views/_workers.erb @@ -16,7 +16,7 @@
<%= display_args(msg['payload']['args']) %>
- <%= relative_time(run_at) %> + <%= relative_time(run_at.is_a?(String) ? DateTime.parse(run_at) : run_at) %> <% end %>