diff --git a/lib/sidekiq/limit_fetch.rb b/lib/sidekiq/limit_fetch.rb index 1610fb0..88acba7 100644 --- a/lib/sidekiq/limit_fetch.rb +++ b/lib/sidekiq/limit_fetch.rb @@ -14,6 +14,8 @@ module Sidekiq::LimitFetch require_relative 'extensions/queue' require_relative 'extensions/manager' + TIMEOUT = Sidekiq::BasicFetch::TIMEOUT + extend self def new(_) @@ -39,14 +41,21 @@ module Sidekiq::LimitFetch def redis_retryable yield rescue Redis::BaseConnectionError - sleep 1 + sleep TIMEOUT retry + rescue Redis::CommandError => error + # If Redis was restarted and is still loading its snapshot, + # then we should treat this as a temporary connection error too. + if error.message =~ /^LOADING/ + sleep TIMEOUT + retry + else + raise + end end private - TIMEOUT = Sidekiq::BasicFetch::TIMEOUT - def redis_brpop(queues) if queues.empty? sleep TIMEOUT # there are no queues to handle, so lets sleep diff --git a/lib/sidekiq/limit_fetch/queues.rb b/lib/sidekiq/limit_fetch/queues.rb index 1f9d235..c794bad 100644 --- a/lib/sidekiq/limit_fetch/queues.rb +++ b/lib/sidekiq/limit_fetch/queues.rb @@ -20,15 +20,17 @@ module Sidekiq::LimitFetch::Queues end def acquire - selector.acquire(ordered_queues, namespace) - .tap {|it| save it } - .map {|it| "queue:#{it}" } + queues = saved + queues ||= Sidekiq::LimitFetch.redis_retryable do + selector.acquire(ordered_queues, namespace) + end + save queues + queues.map { |it| "queue:#{it}" } end def release_except(full_name) queues = restore queues.delete full_name[/queue:(.*)/, 1] if full_name - Sidekiq::LimitFetch.redis_retryable do selector.release queues, namespace end @@ -141,13 +143,17 @@ module Sidekiq::LimitFetch::Queues Sidekiq::LimitFetch::Global::Selector end + def saved + Thread.current[THREAD_KEY] + end + def save(queues) Thread.current[THREAD_KEY] = queues end def restore - Thread.current[THREAD_KEY] || [] + saved || [] ensure - Thread.current[THREAD_KEY] = nil + save nil end end diff --git a/spec/sidekiq/limit_fetch/queues_spec.rb b/spec/sidekiq/limit_fetch/queues_spec.rb index d592a89..3ba67b3 100644 --- a/spec/sidekiq/limit_fetch/queues_spec.rb +++ b/spec/sidekiq/limit_fetch/queues_spec.rb @@ -15,61 +15,72 @@ RSpec.describe Sidekiq::LimitFetch::Queues do before { subject.start options } + def in_thread(&block) + thr = Thread.new(&block) + thr.join + end + it 'should acquire queues' do - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue2'].probed).to eq 1 end it 'should acquire dynamically blocking queues' do - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue2'].probed).to eq 1 Sidekiq::Queue['queue1'].block - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 2 expect(Sidekiq::Queue['queue2'].probed).to eq 1 end it 'should block except given queues' do Sidekiq::Queue['queue1'].block_except 'queue2' - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue2'].probed).to eq 1 Sidekiq::Queue['queue1'].block_except 'queue404' - subject.acquire + in_thread { subject.acquire } expect(Sidekiq::Queue['queue1'].probed).to eq 2 expect(Sidekiq::Queue['queue2'].probed).to eq 1 end it 'should release queues' do - subject.acquire - subject.release_except nil + in_thread { + subject.acquire + subject.release_except nil + } expect(Sidekiq::Queue['queue1'].probed).to eq 0 expect(Sidekiq::Queue['queue2'].probed).to eq 0 end it 'should release queues except selected' do - subject.acquire - subject.release_except 'queue:queue1' + in_thread { + subject.acquire + subject.release_except 'queue:queue1' + } expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue2'].probed).to eq 0 end it 'should release when no queues was acquired' do queues.each {|name| Sidekiq::Queue[name].pause } - subject.acquire - expect { subject.release_except nil }.not_to raise_exception + in_thread { + subject.acquire + expect { subject.release_except nil }.not_to raise_exception + } end context 'blocking' do let(:blocking) { %w(queue1) } it 'should acquire blocking queues' do - 3.times { subject.acquire } + 3.times { in_thread { subject.acquire } } expect(Sidekiq::Queue['queue1'].probed).to eq 3 expect(Sidekiq::Queue['queue2'].probed).to eq 1 end diff --git a/spec/sidekiq/limit_fetch_spec.rb b/spec/sidekiq/limit_fetch_spec.rb index b1db291..11cedcd 100644 --- a/spec/sidekiq/limit_fetch_spec.rb +++ b/spec/sidekiq/limit_fetch_spec.rb @@ -6,7 +6,7 @@ RSpec.describe Sidekiq::LimitFetch do let(:limits) {{ 'queue1' => 1, 'queue2' => 2 }} before do - subject::Queues.start options + subject::Queues.start options Sidekiq.redis do |it| it.del 'queue:queue1'