From 3a17fda586ddacea018da632d0c036c9e88732da Mon Sep 17 00:00:00 2001 From: Alexey Chernenkov Date: Fri, 21 Dec 2018 01:39:57 +0500 Subject: [PATCH 1/4] Treat Redis LOADING error as a temporary connection error --- lib/sidekiq/limit_fetch.rb | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/lib/sidekiq/limit_fetch.rb b/lib/sidekiq/limit_fetch.rb index d91c301..9cd9fbf 100644 --- a/lib/sidekiq/limit_fetch.rb +++ b/lib/sidekiq/limit_fetch.rb @@ -35,6 +35,15 @@ module Sidekiq::LimitFetch rescue Redis::BaseConnectionError sleep 1 retry + rescue Redis::CommandError => error + # If Redis was restarted and is still loading its snapshot, + # then we should treat this as a temporary connection error too. + if error.message =~ /^LOADING/ + sleep 1 + retry + else + raise + end end private From 651878e92584ae81651fa33d9e2a61ba005ff0df Mon Sep 17 00:00:00 2001 From: Alexey Chernenkov Date: Fri, 21 Dec 2018 01:48:04 +0500 Subject: [PATCH 2/4] Use common TIMEOUT value for both #redis_retryable and #redis_brpop --- lib/sidekiq/limit_fetch.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/lib/sidekiq/limit_fetch.rb b/lib/sidekiq/limit_fetch.rb index 9cd9fbf..7393625 100644 --- a/lib/sidekiq/limit_fetch.rb +++ b/lib/sidekiq/limit_fetch.rb @@ -14,6 +14,8 @@ module Sidekiq::LimitFetch require_relative 'extensions/queue' require_relative 'extensions/manager' + TIMEOUT = Sidekiq::BasicFetch::TIMEOUT + extend self def new(_) @@ -33,13 +35,13 @@ module Sidekiq::LimitFetch def redis_retryable yield rescue Redis::BaseConnectionError - sleep 1 + sleep TIMEOUT retry rescue Redis::CommandError => error # If Redis was restarted and is still loading its snapshot, # then we should treat this as a temporary connection error too. if error.message =~ /^LOADING/ - sleep 1 + sleep TIMEOUT retry else raise @@ -48,8 +50,6 @@ module Sidekiq::LimitFetch private - TIMEOUT = Sidekiq::BasicFetch::TIMEOUT - def redis_brpop(queues) if queues.empty? sleep TIMEOUT # there are no queues to handle, so lets sleep From 66e7102bcd30b3155b62d3a715ad2e0b5215686b Mon Sep 17 00:00:00 2001 From: Alexey Chernenkov Date: Fri, 21 Dec 2018 02:13:05 +0500 Subject: [PATCH 3/4] Check if there is already acquired and non-released queues and return them --- lib/sidekiq/limit_fetch/queues.rb | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/sidekiq/limit_fetch/queues.rb b/lib/sidekiq/limit_fetch/queues.rb index 4f38bc4..453f94e 100644 --- a/lib/sidekiq/limit_fetch/queues.rb +++ b/lib/sidekiq/limit_fetch/queues.rb @@ -19,15 +19,17 @@ module Sidekiq::LimitFetch::Queues end def acquire - selector.acquire(ordered_queues, namespace) - .tap {|it| save it } - .map {|it| "queue:#{it}" } + queues = saved + queues ||= Sidekiq::LimitFetch.redis_retryable do + selector.acquire(ordered_queues, namespace) + end + save queues + queues.map { |it| "queue:#{it}" } end def release_except(full_name) queues = restore queues.delete full_name[/queue:(.*)/, 1] if full_name - Sidekiq::LimitFetch.redis_retryable do selector.release queues, namespace end @@ -112,13 +114,17 @@ module Sidekiq::LimitFetch::Queues Sidekiq::LimitFetch::Global::Selector end + def saved + Thread.current[THREAD_KEY] + end + def save(queues) Thread.current[THREAD_KEY] = queues end def restore - Thread.current[THREAD_KEY] || [] + saved || [] ensure - Thread.current[THREAD_KEY] = nil + save nil end end From d051077d754861a5acc4b262192236e087950ff0 Mon Sep 17 00:00:00 2001 From: Alexey Chernenkov Date: Fri, 21 Dec 2018 14:44:22 +0500 Subject: [PATCH 4/4] Fix tests: should acquire queues in a separate thread --- spec/sidekiq/limit_fetch/queues_spec.rb | 35 ++++++++++++++++--------- spec/sidekiq/limit_fetch_spec.rb | 2 +- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/spec/sidekiq/limit_fetch/queues_spec.rb b/spec/sidekiq/limit_fetch/queues_spec.rb index d592a89..3ba67b3 100644 --- a/spec/sidekiq/limit_fetch/queues_spec.rb +++ b/spec/sidekiq/limit_fetch/queues_spec.rb @@ -15,61 +15,72 @@ RSpec.describe Sidekiq::LimitFetch::Queues do before { subject.start options } + def in_thread(&block) + thr = Thread.new(&block) + thr.join + end + it 'should acquire queues' do - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue2'].probed).to eq 1 end it 'should acquire dynamically blocking queues' do - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue2'].probed).to eq 1 Sidekiq::Queue['queue1'].block - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 2 expect(Sidekiq::Queue['queue2'].probed).to eq 1 end it 'should block except given queues' do Sidekiq::Queue['queue1'].block_except 'queue2' - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue2'].probed).to eq 1 Sidekiq::Queue['queue1'].block_except 'queue404' - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 2 expect(Sidekiq::Queue['queue2'].probed).to eq 1 end it 'should release queues' do - subject.acquire - subject.release_except nil + in_thread { + subject.acquire + subject.release_except nil + } expect(Sidekiq::Queue['queue1'].probed).to eq 0 expect(Sidekiq::Queue['queue2'].probed).to eq 0 end it 'should release queues except selected' do - subject.acquire - subject.release_except 'queue:queue1' + in_thread { + subject.acquire + subject.release_except 'queue:queue1' + } expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue2'].probed).to eq 0 end it 'should release when no queues was acquired' do queues.each {|name| Sidekiq::Queue[name].pause } - subject.acquire - expect { subject.release_except nil }.not_to raise_exception + in_thread { + subject.acquire + expect { subject.release_except nil }.not_to raise_exception + } end context 'blocking' do let(:blocking) { %w(queue1) } it 'should acquire blocking queues' do - 3.times { subject.acquire } + 3.times { in_thread { subject.acquire } } expect(Sidekiq::Queue['queue1'].probed).to eq 3 expect(Sidekiq::Queue['queue2'].probed).to eq 1 end diff --git a/spec/sidekiq/limit_fetch_spec.rb b/spec/sidekiq/limit_fetch_spec.rb index b1db291..11cedcd 100644 --- a/spec/sidekiq/limit_fetch_spec.rb +++ b/spec/sidekiq/limit_fetch_spec.rb @@ -6,7 +6,7 @@ RSpec.describe Sidekiq::LimitFetch do let(:limits) {{ 'queue1' => 1, 'queue2' => 2 }} before do - subject::Queues.start options + subject::Queues.start options Sidekiq.redis do |it| it.del 'queue:queue1'