From 0a58b7f4f6921e32a8fccc2b49820398f5e23555 Mon Sep 17 00:00:00 2001 From: Jonathan Hyman Date: Mon, 24 Feb 2014 14:58:35 -0500 Subject: [PATCH 1/7] Swallow (and attempt to retry) Redis timeout errors when updating stats. This is so code which calls #stats does not conflate job failures with stats failures. --- lib/sidekiq/processor.rb | 67 +++++++++++++++++++++++++++------------- 1 file changed, 46 insertions(+), 21 deletions(-) diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 88e6a373..e3dda591 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -92,38 +92,45 @@ module Sidekiq end def stats(worker, msg, queue) - redis do |conn| - conn.multi do - conn.sadd('workers', identity) - conn.setex("worker:#{identity}:started", EXPIRY, Time.now.to_s) - hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i } - conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash)) + # Do not conflate errors from the job with errors caused by updating stats so calling code can react appropriately + retry_and_suppress_redis_timeouts do + redis do |conn| + conn.multi do + conn.sadd('workers', identity) + conn.setex("worker:#{identity}:started", EXPIRY, Time.now.to_s) + hash = {:queue => queue, :payload => msg, :run_at => Time.now.to_i } + conn.setex("worker:#{identity}", EXPIRY, Sidekiq.dump_json(hash)) + end end end begin yield rescue Exception - redis do |conn| - failed = "stat:failed:#{Time.now.utc.to_date}" - result = conn.multi do - conn.incrby("stat:failed", 1) - conn.incrby(failed, 1) + retry_and_suppress_redis_timeouts do + redis do |conn| + failed = "stat:failed:#{Time.now.utc.to_date}" + result = conn.multi do + conn.incrby("stat:failed", 1) + conn.incrby(failed, 1) + end + conn.expire(failed, STATS_TIMEOUT) if result.last == 1 end - conn.expire(failed, STATS_TIMEOUT) if result.last == 1 end raise ensure - redis do |conn| - processed = "stat:processed:#{Time.now.utc.to_date}" - result = conn.multi do - conn.srem("workers", identity) - conn.del("worker:#{identity}") - conn.del("worker:#{identity}:started") - conn.incrby("stat:processed", 1) - conn.incrby(processed, 1) + retry_and_suppress_redis_timeouts do + redis do |conn| + processed = "stat:processed:#{Time.now.utc.to_date}" + result = conn.multi do + conn.srem("workers", identity) + conn.del("worker:#{identity}") + conn.del("worker:#{identity}:started") + conn.incrby("stat:processed", 1) + conn.incrby(processed, 1) + end + conn.expire(processed, STATS_TIMEOUT) if result.last == 1 end - conn.expire(processed, STATS_TIMEOUT) if result.last == 1 end end end @@ -137,5 +144,23 @@ module Sidekiq def cloned(ary) Marshal.load(Marshal.dump(ary)) end + + # If there is a Redis::TimeoutError, the block passed to this method will be retried up to max_retries times. + # All exceptions will be swallowed and logged. + def retry_and_suppress_redis_timeouts(max_retries = 2) + retry_count = 0 + begin + yield + rescue Redis::TimeoutError + retry_count += 1 + if retry_count <= max_retries + retry + else + Sidekiq.logger.info {"Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}"} + end + rescue StandardError => e + Sidekiq.logger.info {"Suppressing error #{e.inspect}"} + end + end end end From f761c674a4bd935a3993adbb8e9e9ef39eea5540 Mon Sep 17 00:00:00 2001 From: Jonathan Hyman Date: Mon, 24 Feb 2014 16:08:57 -0500 Subject: [PATCH 2/7] Retry all exceptions. --- lib/sidekiq/processor.rb | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index e3dda591..f2e2c021 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -93,7 +93,7 @@ module Sidekiq def stats(worker, msg, queue) # Do not conflate errors from the job with errors caused by updating stats so calling code can react appropriately - retry_and_suppress_redis_timeouts do + retry_and_suppress_exceptions do redis do |conn| conn.multi do conn.sadd('workers', identity) @@ -107,7 +107,7 @@ module Sidekiq begin yield rescue Exception - retry_and_suppress_redis_timeouts do + retry_and_suppress_exceptions do redis do |conn| failed = "stat:failed:#{Time.now.utc.to_date}" result = conn.multi do @@ -119,7 +119,7 @@ module Sidekiq end raise ensure - retry_and_suppress_redis_timeouts do + retry_and_suppress_exceptions do redis do |conn| processed = "stat:processed:#{Time.now.utc.to_date}" result = conn.multi do @@ -147,19 +147,18 @@ module Sidekiq # If there is a Redis::TimeoutError, the block passed to this method will be retried up to max_retries times. # All exceptions will be swallowed and logged. - def retry_and_suppress_redis_timeouts(max_retries = 2) + def retry_and_suppress_exceptions(max_retries = 2) retry_count = 0 begin yield - rescue Redis::TimeoutError + rescue StandardError retry_count += 1 if retry_count <= max_retries + Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"} retry else Sidekiq.logger.info {"Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}"} end - rescue StandardError => e - Sidekiq.logger.info {"Suppressing error #{e.inspect}"} end end end From a39a32b8e8aa741bc7e10ad17962fb2da6ee29e6 Mon Sep 17 00:00:00 2001 From: Jonathan Hyman Date: Mon, 24 Feb 2014 16:09:29 -0500 Subject: [PATCH 3/7] Update comment. --- lib/sidekiq/processor.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index f2e2c021..91464ac7 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -145,7 +145,7 @@ module Sidekiq Marshal.load(Marshal.dump(ary)) end - # If there is a Redis::TimeoutError, the block passed to this method will be retried up to max_retries times. + # If an exception, the block passed to this method will be retried up to max_retries times. # All exceptions will be swallowed and logged. def retry_and_suppress_exceptions(max_retries = 2) retry_count = 0 From fda80127d0736a4a37d86a8d8dfd778efb50e24b Mon Sep 17 00:00:00 2001 From: Jonathan Hyman Date: Mon, 24 Feb 2014 16:10:07 -0500 Subject: [PATCH 4/7] Update comment. --- lib/sidekiq/processor.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 91464ac7..0bb52802 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -145,7 +145,7 @@ module Sidekiq Marshal.load(Marshal.dump(ary)) end - # If an exception, the block passed to this method will be retried up to max_retries times. + # If an exception occurs in the block passed to this method, that block will be retried up to max_retries times. # All exceptions will be swallowed and logged. def retry_and_suppress_exceptions(max_retries = 2) retry_count = 0 From a32fdd5c4dabb64caeb6595272365737a094500e Mon Sep 17 00:00:00 2001 From: Jonathan Hyman Date: Mon, 24 Feb 2014 16:10:58 -0500 Subject: [PATCH 5/7] Sleep before retrying. --- lib/sidekiq/processor.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 0bb52802..6a41aa02 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -151,10 +151,11 @@ module Sidekiq retry_count = 0 begin yield - rescue StandardError + rescue => e retry_count += 1 if retry_count <= max_retries Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"} + sleep(1) retry else Sidekiq.logger.info {"Exhausted #{max_retries} retries due to Redis timeouts: #{e.inspect}"} From cf641cbb290684ce83b642837d150243c4c18d71 Mon Sep 17 00:00:00 2001 From: Jonathan Hyman Date: Mon, 24 Feb 2014 21:43:11 -0500 Subject: [PATCH 6/7] Adds a changelog entry. --- Changes.md | 1 + 1 file changed, 1 insertion(+) diff --git a/Changes.md b/Changes.md index c8cf07a3..9617e750 100644 --- a/Changes.md +++ b/Changes.md @@ -2,6 +2,7 @@ ----------- - Auto-prune jobs older than one hour from the Workers page [#1508] +- Fix ReliableFetch issue where a job could be lost when an exception occurs updating Redis stats before the job executes [#1511] 2.17.6 ----------- From 08d277cd37aaa09d31fffbafc2b284cf36c989f6 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Mon, 24 Feb 2014 20:01:10 -0800 Subject: [PATCH 7/7] fix note --- Changes.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/Changes.md b/Changes.md index 9617e750..0b10795a 100644 --- a/Changes.md +++ b/Changes.md @@ -2,7 +2,8 @@ ----------- - Auto-prune jobs older than one hour from the Workers page [#1508] -- Fix ReliableFetch issue where a job could be lost when an exception occurs updating Redis stats before the job executes [#1511] +- Fix issue where a job could be lost when an exception occurs updating + Redis stats before the job executes [#1511] 2.17.6 -----------