mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Add 24 hr expiry, only register workers while working [#156]
Future work will use redis zsets to heartbeat Sidekiq processes
This commit is contained in:
parent
3b1955d1d7
commit
7ec820e9cc
7 changed files with 20 additions and 24 deletions
|
@ -1,3 +1,10 @@
|
||||||
|
1.1.4
|
||||||
|
-----------
|
||||||
|
|
||||||
|
- Add 24 hr expiry for basic keys set in Redis, to avoid any possible leaking.
|
||||||
|
- Only register workers in Redis while working, to avoid lingering
|
||||||
|
workers [#156]
|
||||||
|
|
||||||
1.1.3
|
1.1.3
|
||||||
-----------
|
-----------
|
||||||
|
|
||||||
|
|
|
@ -23,7 +23,6 @@ module Sidekiq
|
||||||
|
|
||||||
def initialize(boss)
|
def initialize(boss)
|
||||||
@boss = boss
|
@boss = boss
|
||||||
redis {|x| x.sadd('workers', self) }
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def process(msg, queue)
|
def process(msg, queue)
|
||||||
|
@ -53,9 +52,10 @@ module Sidekiq
|
||||||
def stats(worker, msg, queue)
|
def stats(worker, msg, queue)
|
||||||
redis do |conn|
|
redis do |conn|
|
||||||
conn.multi do
|
conn.multi do
|
||||||
conn.set("worker:#{self}:started", Time.now.to_s)
|
conn.sadd('workers', self)
|
||||||
|
conn.setex("worker:#{self}:started", DEFAULT_EXPIRY, Time.now.to_s)
|
||||||
hash = {:queue => queue, :payload => msg, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")}
|
hash = {:queue => queue, :payload => msg, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")}
|
||||||
conn.set("worker:#{self}", Sidekiq.dump_json(hash))
|
conn.setex("worker:#{self}", DEFAULT_EXPIRY, Sidekiq.dump_json(hash))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -69,13 +69,13 @@ module Sidekiq
|
||||||
conn.multi do
|
conn.multi do
|
||||||
conn.incrby("stat:failed", 1)
|
conn.incrby("stat:failed", 1)
|
||||||
conn.del("stat:processed:#{self}")
|
conn.del("stat:processed:#{self}")
|
||||||
conn.srem("workers", self)
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
raise
|
raise
|
||||||
ensure
|
ensure
|
||||||
redis do |conn|
|
redis do |conn|
|
||||||
conn.multi do
|
conn.multi do
|
||||||
|
conn.srem("workers", self)
|
||||||
conn.del("worker:#{self}")
|
conn.del("worker:#{self}")
|
||||||
conn.del("worker:#{self}:started")
|
conn.del("worker:#{self}:started")
|
||||||
conn.incrby("stat:processed", 1)
|
conn.incrby("stat:processed", 1)
|
||||||
|
|
|
@ -7,6 +7,8 @@ module Sidekiq
|
||||||
#
|
#
|
||||||
module Util
|
module Util
|
||||||
|
|
||||||
|
DEFAULT_EXPIRY = 24 * 60 * 60
|
||||||
|
|
||||||
class Pretty < Logger::Formatter
|
class Pretty < Logger::Formatter
|
||||||
# Provide a call() method that returns the formatted message.
|
# Provide a call() method that returns the formatted message.
|
||||||
def call(severity, time, program_name, message)
|
def call(severity, time, program_name, message)
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
VERSION = "1.1.3"
|
VERSION = "1.1.4"
|
||||||
end
|
end
|
||||||
|
|
|
@ -52,9 +52,8 @@ module Sidekiq
|
||||||
Sidekiq.redis do |conn|
|
Sidekiq.redis do |conn|
|
||||||
conn.smembers('workers').map do |w|
|
conn.smembers('workers').map do |w|
|
||||||
msg = conn.get("worker:#{w}")
|
msg = conn.get("worker:#{w}")
|
||||||
msg = Sidekiq.load_json(msg) if msg
|
msg ? [w, Sidekiq.load_json(msg)] : nil
|
||||||
[w, msg]
|
end.compact.sort { |x| x[1] ? -1 : 1 }
|
||||||
end.sort { |x| x[1] ? -1 : 1 }
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -102,8 +101,7 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def current_status
|
def current_status
|
||||||
return 'down' if workers.size == 0
|
return 'idle' if workers.size == 0
|
||||||
return 'idle' if workers.size > 0 && workers.map { |x| x[1] }.compact.size == 0
|
|
||||||
return 'active'
|
return 'active'
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -29,11 +29,6 @@ class TestStats < MiniTest::Unit::TestCase
|
||||||
processor = Sidekiq::Processor.new(boss)
|
processor = Sidekiq::Processor.new(boss)
|
||||||
boss.expect(:processor_done!, nil, [processor])
|
boss.expect(:processor_done!, nil, [processor])
|
||||||
|
|
||||||
# adds to the workers set upon initialize
|
|
||||||
set = conn.smembers('workers')
|
|
||||||
assert_equal 1, set.size
|
|
||||||
assert_match(/#{Regexp.escape(`hostname`.strip)}/, set.first)
|
|
||||||
|
|
||||||
assert_equal 0, conn.get('stat:failed').to_i
|
assert_equal 0, conn.get('stat:failed').to_i
|
||||||
assert_equal 0, conn.get('stat:processed').to_i
|
assert_equal 0, conn.get('stat:processed').to_i
|
||||||
assert_equal 0, conn.get("stat:processed:#{processor}").to_i
|
assert_equal 0, conn.get("stat:processed:#{processor}").to_i
|
||||||
|
@ -42,9 +37,6 @@ class TestStats < MiniTest::Unit::TestCase
|
||||||
processor.process(msg, 'xyzzy')
|
processor.process(msg, 'xyzzy')
|
||||||
processor.process(msg, 'xyzzy')
|
processor.process(msg, 'xyzzy')
|
||||||
|
|
||||||
set = conn.smembers('workers')
|
|
||||||
assert_equal 1, set.size
|
|
||||||
assert_match(/#{Regexp.escape(`hostname`.strip)}/, set.first)
|
|
||||||
assert_equal 0, conn.get('stat:failed').to_i
|
assert_equal 0, conn.get('stat:failed').to_i
|
||||||
assert_equal 3, conn.get('stat:processed').to_i
|
assert_equal 3, conn.get('stat:processed').to_i
|
||||||
assert_equal 3, conn.get("stat:processed:#{processor}").to_i
|
assert_equal 3, conn.get("stat:processed:#{processor}").to_i
|
||||||
|
@ -61,15 +53,12 @@ class TestStats < MiniTest::Unit::TestCase
|
||||||
assert_equal 0, conn.get('stat:processed').to_i
|
assert_equal 0, conn.get('stat:processed').to_i
|
||||||
|
|
||||||
processor = Sidekiq::Processor.new(boss)
|
processor = Sidekiq::Processor.new(boss)
|
||||||
assert_equal 1, conn.smembers('workers').size
|
|
||||||
|
|
||||||
pstr = processor.to_s
|
pstr = processor.to_s
|
||||||
assert_raises RuntimeError do
|
assert_raises RuntimeError do
|
||||||
processor.process(msg, 'xyzzy')
|
processor.process(msg, 'xyzzy')
|
||||||
end
|
end
|
||||||
|
|
||||||
set = conn.smembers('workers')
|
|
||||||
assert_equal 0, set.size
|
|
||||||
assert_equal 1, conn.get('stat:failed').to_i
|
assert_equal 1, conn.get('stat:failed').to_i
|
||||||
assert_equal 1, conn.get('stat:processed').to_i
|
assert_equal 1, conn.get('stat:processed').to_i
|
||||||
assert_equal nil, conn.get("stat:processed:#{pstr}")
|
assert_equal nil, conn.get("stat:processed:#{pstr}")
|
||||||
|
|
|
@ -27,14 +27,14 @@ class TestWeb < MiniTest::Unit::TestCase
|
||||||
it 'shows active queues' do
|
it 'shows active queues' do
|
||||||
get '/'
|
get '/'
|
||||||
assert_equal 200, last_response.status
|
assert_equal 200, last_response.status
|
||||||
assert_match last_response.body, /Sidekiq is down/
|
assert_match last_response.body, /Sidekiq is idle/
|
||||||
refute_match last_response.body, /default/
|
refute_match last_response.body, /default/
|
||||||
|
|
||||||
assert WebWorker.perform_async(1, 2)
|
assert WebWorker.perform_async(1, 2)
|
||||||
|
|
||||||
get '/'
|
get '/'
|
||||||
assert_equal 200, last_response.status
|
assert_equal 200, last_response.status
|
||||||
assert_match last_response.body, /Sidekiq is down/
|
assert_match last_response.body, /Sidekiq is idle/
|
||||||
assert_match last_response.body, /default/
|
assert_match last_response.body, /default/
|
||||||
refute_match last_response.body, /foo/
|
refute_match last_response.body, /foo/
|
||||||
|
|
||||||
|
@ -42,7 +42,7 @@ class TestWeb < MiniTest::Unit::TestCase
|
||||||
|
|
||||||
get '/'
|
get '/'
|
||||||
assert_equal 200, last_response.status
|
assert_equal 200, last_response.status
|
||||||
assert_match last_response.body, /Sidekiq is down/
|
assert_match last_response.body, /Sidekiq is idle/
|
||||||
assert_match last_response.body, /default/
|
assert_match last_response.body, /default/
|
||||||
assert_match last_response.body, /foo/
|
assert_match last_response.body, /foo/
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Reference in a new issue