1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

merge master

This commit is contained in:
Mike Perham 2014-02-24 20:06:48 -08:00
commit b56d97ef72
2 changed files with 47 additions and 19 deletions

View file

@ -24,6 +24,8 @@ Please see [Upgrading.md](Upgrading.md) for upgrade notes.
-----------
- Auto-prune jobs older than one hour from the Workers page [#1508]
- Fix issue where a job could be lost when an exception occurs updating
Redis stats before the job executes [#1511]
2.17.6
-----------

View file

@ -74,36 +74,44 @@ module Sidekiq
end
def stats(worker, msg, queue)
redis do |conn|
conn.multi do
conn.sadd('workers', identity)
hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash))
# Do not conflate errors from the job with errors caused by updating
# stats so calling code can react appropriately
retry_and_suppress_exceptions do
hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i })
redis do |conn|
conn.multi do
conn.sadd('workers', identity)
conn.setex("worker:#{identity}", EXPIRY, hash)
end
end
end
begin
yield
rescue Exception
redis do |conn|
failed = "stat:failed:#{Time.now.utc.to_date}"
result = conn.multi do
conn.incrby("stat:failed", 1)
conn.incrby(failed, 1)
retry_and_suppress_exceptions do
redis do |conn|
failed = "stat:failed:#{Time.now.utc.to_date}"
result = conn.multi do
conn.incrby("stat:failed", 1)
conn.incrby(failed, 1)
end
conn.expire(failed, STATS_TIMEOUT) if result.last == 1
end
conn.expire(failed, STATS_TIMEOUT) if result.last == 1
end
raise
ensure
redis do |conn|
processed = "stat:processed:#{Time.now.utc.to_date}"
result = conn.multi do
conn.srem("workers", identity)
conn.del("worker:#{identity}")
conn.incrby("stat:processed", 1)
conn.incrby(processed, 1)
retry_and_suppress_exceptions do
redis do |conn|
processed = "stat:processed:#{Time.now.utc.to_date}"
result = conn.multi do
conn.srem("workers", identity)
conn.del("worker:#{identity}")
conn.incrby("stat:processed", 1)
conn.incrby(processed, 1)
end
conn.expire(processed, STATS_TIMEOUT) if result.last == 1
end
conn.expire(processed, STATS_TIMEOUT) if result.last == 1
end
end
end
@ -117,5 +125,23 @@ module Sidekiq
def cloned(ary)
Marshal.load(Marshal.dump(ary))
end
# If an exception occurs in the block passed to this method, that block will be retried up to max_retries times.
# All exceptions will be swallowed and logged.
def retry_and_suppress_exceptions(max_retries = 2)
retry_count = 0
begin
yield
rescue => e
retry_count += 1
if retry_count <= max_retries
Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"}
sleep(1)
retry
else
Sidekiq.logger.info {"Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}"}
end
end
end
end
end