diff --git a/Changes.md b/Changes.md index 44c85f8e..33faf212 100644 --- a/Changes.md +++ b/Changes.md @@ -24,6 +24,8 @@ Please see [Upgrading.md](Upgrading.md) for upgrade notes. ----------- - Auto-prune jobs older than one hour from the Workers page [#1508] +- Fix issue where a job could be lost when an exception occurs updating + Redis stats before the job executes [#1511] 2.17.6 ----------- diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 2a445cea..4c1240ed 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -74,36 +74,44 @@ module Sidekiq end def stats(worker, msg, queue) - redis do |conn| - conn.multi do - conn.sadd('workers', identity) - hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i } - conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash)) + # Do not conflate errors from the job with errors caused by updating + # stats so calling code can react appropriately + retry_and_suppress_exceptions do + hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i }) + redis do |conn| + conn.multi do + conn.sadd('workers', identity) + conn.setex("worker:#{identity}", EXPIRY, hash) + end end end begin yield rescue Exception - redis do |conn| - failed = "stat:failed:#{Time.now.utc.to_date}" - result = conn.multi do - conn.incrby("stat:failed", 1) - conn.incrby(failed, 1) + retry_and_suppress_exceptions do + redis do |conn| + failed = "stat:failed:#{Time.now.utc.to_date}" + result = conn.multi do + conn.incrby("stat:failed", 1) + conn.incrby(failed, 1) + end + conn.expire(failed, STATS_TIMEOUT) if result.last == 1 end - conn.expire(failed, STATS_TIMEOUT) if result.last == 1 end raise ensure - redis do |conn| - processed = "stat:processed:#{Time.now.utc.to_date}" - result = conn.multi do - conn.srem("workers", identity) - conn.del("worker:#{identity}") - conn.incrby("stat:processed", 1) - conn.incrby(processed, 1) + retry_and_suppress_exceptions do + redis do |conn| + processed = "stat:processed:#{Time.now.utc.to_date}" + result = conn.multi do + conn.srem("workers", identity) + conn.del("worker:#{identity}") + conn.incrby("stat:processed", 1) + conn.incrby(processed, 1) + end + conn.expire(processed, STATS_TIMEOUT) if result.last == 1 end - conn.expire(processed, STATS_TIMEOUT) if result.last == 1 end end end @@ -117,5 +125,23 @@ module Sidekiq def cloned(ary) Marshal.load(Marshal.dump(ary)) end + + # If an exception occurs in the block passed to this method, that block will be retried up to max_retries times. + # All exceptions will be swallowed and logged. + def retry_and_suppress_exceptions(max_retries = 2) + retry_count = 0 + begin + yield + rescue => e + retry_count += 1 + if retry_count <= max_retries + Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"} + sleep(1) + retry + else + Sidekiq.logger.info {"Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}"} + end + end + end end end