From 7ec820e9cc9cf7bf7cedb6a6ad157a2b56b03436 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 24 Apr 2012 07:15:29 -0700 Subject: [PATCH] Add 24 hr expiry, only register workers while working [#156] Future work will use redis zsets to heartbeat Sidekiq processes --- Changes.md | 7 +++++++ lib/sidekiq/processor.rb | 8 ++++---- lib/sidekiq/util.rb | 2 ++ lib/sidekiq/version.rb | 2 +- lib/sidekiq/web.rb | 8 +++----- test/test_stats.rb | 11 ----------- test/test_web.rb | 6 +++--- 7 files changed, 20 insertions(+), 24 deletions(-) diff --git a/Changes.md b/Changes.md index 9610e546..e3456c02 100644 --- a/Changes.md +++ b/Changes.md @@ -1,3 +1,10 @@ +1.1.4 +----------- + +- Add 24 hr expiry for basic keys set in Redis, to avoid any possible leaking. +- Only register workers in Redis while working, to avoid lingering + workers [#156] + 1.1.3 ----------- diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 900b9059..60715aee 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -23,7 +23,6 @@ module Sidekiq def initialize(boss) @boss = boss - redis {|x| x.sadd('workers', self) } end def process(msg, queue) @@ -53,9 +52,10 @@ module Sidekiq def stats(worker, msg, queue) redis do |conn| conn.multi do - conn.set("worker:#{self}:started", Time.now.to_s) + conn.sadd('workers', self) + conn.setex("worker:#{self}:started", DEFAULT_EXPIRY, Time.now.to_s) hash = {:queue => queue, :payload => msg, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")} - conn.set("worker:#{self}", Sidekiq.dump_json(hash)) + conn.setex("worker:#{self}", DEFAULT_EXPIRY, Sidekiq.dump_json(hash)) end end @@ -69,13 +69,13 @@ module Sidekiq conn.multi do conn.incrby("stat:failed", 1) conn.del("stat:processed:#{self}") - conn.srem("workers", self) end end raise ensure redis do |conn| conn.multi do + conn.srem("workers", self) conn.del("worker:#{self}") conn.del("worker:#{self}:started") conn.incrby("stat:processed", 1) diff --git a/lib/sidekiq/util.rb b/lib/sidekiq/util.rb index b74819a5..702fa072 100644 --- a/lib/sidekiq/util.rb +++ b/lib/sidekiq/util.rb @@ -7,6 +7,8 @@ module Sidekiq # module Util + DEFAULT_EXPIRY = 24 * 60 * 60 + class Pretty < Logger::Formatter # Provide a call() method that returns the formatted message. def call(severity, time, program_name, message) diff --git a/lib/sidekiq/version.rb b/lib/sidekiq/version.rb index 01a29f1e..e7c7298f 100644 --- a/lib/sidekiq/version.rb +++ b/lib/sidekiq/version.rb @@ -1,3 +1,3 @@ module Sidekiq - VERSION = "1.1.3" + VERSION = "1.1.4" end diff --git a/lib/sidekiq/web.rb b/lib/sidekiq/web.rb index 9410d78c..5ab52e22 100644 --- a/lib/sidekiq/web.rb +++ b/lib/sidekiq/web.rb @@ -52,9 +52,8 @@ module Sidekiq Sidekiq.redis do |conn| conn.smembers('workers').map do |w| msg = conn.get("worker:#{w}") - msg = Sidekiq.load_json(msg) if msg - [w, msg] - end.sort { |x| x[1] ? -1 : 1 } + msg ? [w, Sidekiq.load_json(msg)] : nil + end.compact.sort { |x| x[1] ? -1 : 1 } end end end @@ -102,8 +101,7 @@ module Sidekiq end def current_status - return 'down' if workers.size == 0 - return 'idle' if workers.size > 0 && workers.map { |x| x[1] }.compact.size == 0 + return 'idle' if workers.size == 0 return 'active' end diff --git a/test/test_stats.rb b/test/test_stats.rb index bd1be3da..40634439 100644 --- a/test/test_stats.rb +++ b/test/test_stats.rb @@ -29,11 +29,6 @@ class TestStats < MiniTest::Unit::TestCase processor = Sidekiq::Processor.new(boss) boss.expect(:processor_done!, nil, [processor]) - # adds to the workers set upon initialize - set = conn.smembers('workers') - assert_equal 1, set.size - assert_match(/#{Regexp.escape(`hostname`.strip)}/, set.first) - assert_equal 0, conn.get('stat:failed').to_i assert_equal 0, conn.get('stat:processed').to_i assert_equal 0, conn.get("stat:processed:#{processor}").to_i @@ -42,9 +37,6 @@ class TestStats < MiniTest::Unit::TestCase processor.process(msg, 'xyzzy') processor.process(msg, 'xyzzy') - set = conn.smembers('workers') - assert_equal 1, set.size - assert_match(/#{Regexp.escape(`hostname`.strip)}/, set.first) assert_equal 0, conn.get('stat:failed').to_i assert_equal 3, conn.get('stat:processed').to_i assert_equal 3, conn.get("stat:processed:#{processor}").to_i @@ -61,15 +53,12 @@ class TestStats < MiniTest::Unit::TestCase assert_equal 0, conn.get('stat:processed').to_i processor = Sidekiq::Processor.new(boss) - assert_equal 1, conn.smembers('workers').size pstr = processor.to_s assert_raises RuntimeError do processor.process(msg, 'xyzzy') end - set = conn.smembers('workers') - assert_equal 0, set.size assert_equal 1, conn.get('stat:failed').to_i assert_equal 1, conn.get('stat:processed').to_i assert_equal nil, conn.get("stat:processed:#{pstr}") diff --git a/test/test_web.rb b/test/test_web.rb index 548b453a..a513df5f 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -27,14 +27,14 @@ class TestWeb < MiniTest::Unit::TestCase it 'shows active queues' do get '/' assert_equal 200, last_response.status - assert_match last_response.body, /Sidekiq is down/ + assert_match last_response.body, /Sidekiq is idle/ refute_match last_response.body, /default/ assert WebWorker.perform_async(1, 2) get '/' assert_equal 200, last_response.status - assert_match last_response.body, /Sidekiq is down/ + assert_match last_response.body, /Sidekiq is idle/ assert_match last_response.body, /default/ refute_match last_response.body, /foo/ @@ -42,7 +42,7 @@ class TestWeb < MiniTest::Unit::TestCase get '/' assert_equal 200, last_response.status - assert_match last_response.body, /Sidekiq is down/ + assert_match last_response.body, /Sidekiq is idle/ assert_match last_response.body, /default/ assert_match last_response.body, /foo/ end