From f1e83a3384f42867b42ff8a8667fe47dc9b6124d Mon Sep 17 00:00:00 2001 From: brainopia Date: Wed, 22 May 2013 11:42:56 +0400 Subject: [PATCH] Separate probed and busy indicators for queue --- lib/sidekiq/extensions/queue.rb | 21 ++++++------ lib/sidekiq/limit_fetch/global/monitor.rb | 6 ++-- lib/sidekiq/limit_fetch/global/selector.rb | 10 +++--- lib/sidekiq/limit_fetch/global/semaphore.rb | 14 +++++++- lib/sidekiq/limit_fetch/queues.rb | 10 +++--- lib/sidekiq/limit_fetch/unit_of_work.rb | 6 ++++ spec/sidekiq/extensions/queue_spec.rb | 6 ++-- .../limit_fetch/global/monitor_spec.rb | 4 +-- spec/sidekiq/limit_fetch/queues_spec.rb | 32 +++++++++---------- spec/sidekiq/limit_fetch/semaphore_spec.rb | 12 +++---- spec/sidekiq/limit_fetch_spec.rb | 12 +++++++ spec/spec_helper.rb | 11 ++++--- 12 files changed, 87 insertions(+), 57 deletions(-) diff --git a/lib/sidekiq/extensions/queue.rb b/lib/sidekiq/extensions/queue.rb index 9ecde61..3f6fb90 100644 --- a/lib/sidekiq/extensions/queue.rb +++ b/lib/sidekiq/extensions/queue.rb @@ -3,20 +3,17 @@ module Sidekiq extend LimitFetch::Singleton, Forwardable def_delegators :lock, - :limit, :limit=, - :acquire, :release, - :pause, :unpause, - :block, :unblock, - :paused?, :blocking?, - :unblocked, :block_except, - :busy + :limit, :limit=, + :acquire, :release, + :pause, :unpause, + :block, :unblock, + :paused?, :blocking?, + :unblocked, :block_except, + :probed, :busy, + :increase_busy, :decrease_busy def lock - @lock ||= mode::Semaphore.new name - end - - def mode - Sidekiq.options[:local] ? LimitFetch::Local : LimitFetch::Global + @lock ||= LimitFetch::Global::Semaphore.new name end end end diff --git a/lib/sidekiq/limit_fetch/global/monitor.rb b/lib/sidekiq/limit_fetch/global/monitor.rb index 52deeb0..b11fd63 100644 --- a/lib/sidekiq/limit_fetch/global/monitor.rb +++ b/lib/sidekiq/limit_fetch/global/monitor.rb @@ -37,8 +37,10 @@ module Sidekiq::LimitFetch::Global next if it.get heartbeat_key processor it.del processor_key processor - it.keys('limit_fetch:busy:*').each do |queue| - it.lrem queue, 0, processor + %w(limit_fetch:probed:* limit_fetch:busy:*).each do |pattern| + it.keys(pattern).each do |queue| + it.lrem queue, 0, processor + end end end end diff --git a/lib/sidekiq/limit_fetch/global/selector.rb b/lib/sidekiq/limit_fetch/global/selector.rb index c42e32b..b84d177 100644 --- a/lib/sidekiq/limit_fetch/global/selector.rb +++ b/lib/sidekiq/limit_fetch/global/selector.rb @@ -60,7 +60,7 @@ module Sidekiq::LimitFetch::Global for _, queue in ipairs(queues) do if not blocking_mode or unblocked[queue] then - local busy_key = namespace..'busy:'..queue + local probed_key = namespace..'probed:'..queue local pause_key = namespace..'pause:'..queue local paused = redis.call('get', pause_key) @@ -72,7 +72,7 @@ module Sidekiq::LimitFetch::Global local can_block = redis.call('get', block_key) if can_block or queue_limit then - queue_locks = redis.call('llen', busy_key) + queue_locks = redis.call('llen', probed_key) end blocking_mode = can_block and queue_locks > 0 @@ -84,7 +84,7 @@ module Sidekiq::LimitFetch::Global end if not queue_limit or queue_limit > queue_locks then - redis.call('rpush', busy_key, worker_name) + redis.call('rpush', probed_key, worker_name) table.insert(available, queue) end end @@ -102,8 +102,8 @@ module Sidekiq::LimitFetch::Global local queues = ARGV for _, queue in ipairs(queues) do - local busy_key = namespace..'busy:'..queue - redis.call('lrem', busy_key, 1, worker_name) + local probed_key = namespace..'probed:'..queue + redis.call('lrem', probed_key, 1, worker_name) end LUA end diff --git a/lib/sidekiq/limit_fetch/global/semaphore.rb b/lib/sidekiq/limit_fetch/global/semaphore.rb index 3374204..bb91cd8 100644 --- a/lib/sidekiq/limit_fetch/global/semaphore.rb +++ b/lib/sidekiq/limit_fetch/global/semaphore.rb @@ -23,13 +23,25 @@ module Sidekiq::LimitFetch::Global end def release - Selector.release [@name] + redis {|it| it.lrem "#{PREFIX}:probed:#@name", 1, Selector.uuid } end def busy redis {|it| it.llen "#{PREFIX}:busy:#@name" } end + def increase_busy + redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid } + end + + def decrease_busy + redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid } + end + + def probed + redis {|it| it.llen "#{PREFIX}:probed:#@name" } + end + def pause redis {|it| it.set "#{PREFIX}:pause:#@name", true } end diff --git a/lib/sidekiq/limit_fetch/queues.rb b/lib/sidekiq/limit_fetch/queues.rb index 6d7ee8f..99ebdd7 100644 --- a/lib/sidekiq/limit_fetch/queues.rb +++ b/lib/sidekiq/limit_fetch/queues.rb @@ -1,19 +1,17 @@ class Sidekiq::LimitFetch class Queues THREAD_KEY = :acquired_queues - attr_reader :selector def initialize(options) @queues = options[:queues] options[:strict] ? strict_order! : weighted_order! - set_selector options[:local] set_limits options[:limits] set_blocks options[:blocking] end def acquire - @selector.acquire(ordered_queues) + selector.acquire(ordered_queues) .tap {|it| save it } .map {|it| "queue:#{it}" } end @@ -21,13 +19,13 @@ class Sidekiq::LimitFetch def release_except(full_name) queues = restore queues.delete full_name[/queue:(.*)/, 1] if full_name - @selector.release queues + selector.release queues end private - def set_selector(local) - @selector = local ? Local::Selector : Global::Selector + def selector + Global::Selector end def set_limits(limits) diff --git a/lib/sidekiq/limit_fetch/unit_of_work.rb b/lib/sidekiq/limit_fetch/unit_of_work.rb index 6851d1c..ef9ef04 100644 --- a/lib/sidekiq/limit_fetch/unit_of_work.rb +++ b/lib/sidekiq/limit_fetch/unit_of_work.rb @@ -1,6 +1,12 @@ module Sidekiq class LimitFetch::UnitOfWork < BasicFetch::UnitOfWork + def initialize(queue, message) + super + Queue[queue_name].increase_busy + end + def acknowledge + Queue[queue_name].decrease_busy Queue[queue_name].release end diff --git a/spec/sidekiq/extensions/queue_spec.rb b/spec/sidekiq/extensions/queue_spec.rb index 3c56897..538bd9b 100644 --- a/spec/sidekiq/extensions/queue_spec.rb +++ b/spec/sidekiq/extensions/queue_spec.rb @@ -59,14 +59,14 @@ describe Sidekiq::Queue do it 'should be countable' do queue.limit = 3 5.times { queue.acquire } - queue.busy.should == 3 + queue.probed.should == 3 end it 'should be releasable' do queue.acquire - queue.busy.should == 1 + queue.probed.should == 1 queue.release - queue.busy.should == 0 + queue.probed.should == 0 end it 'should tell if paused' do diff --git a/spec/sidekiq/limit_fetch/global/monitor_spec.rb b/spec/sidekiq/limit_fetch/global/monitor_spec.rb index ba7aeaa..ddfc65e 100644 --- a/spec/sidekiq/limit_fetch/global/monitor_spec.rb +++ b/spec/sidekiq/limit_fetch/global/monitor_spec.rb @@ -20,11 +20,11 @@ describe Sidekiq::LimitFetch::Global::Monitor do it 'should remove invalidated old locks' do 2.times { queue.acquire } sleep 2*ttl - queue.busy.should == 2 + queue.probed.should == 2 described_class.stub :update_heartbeat sleep 2*ttl - queue.busy.should == 0 + queue.probed.should == 0 end end end diff --git a/spec/sidekiq/limit_fetch/queues_spec.rb b/spec/sidekiq/limit_fetch/queues_spec.rb index a08229b..7c9b7fb 100644 --- a/spec/sidekiq/limit_fetch/queues_spec.rb +++ b/spec/sidekiq/limit_fetch/queues_spec.rb @@ -17,46 +17,46 @@ describe Sidekiq::LimitFetch::Queues do it 'should acquire queues' do subject.acquire - Sidekiq::Queue['queue1'].busy.should == 1 - Sidekiq::Queue['queue2'].busy.should == 1 + Sidekiq::Queue['queue1'].probed.should == 1 + Sidekiq::Queue['queue2'].probed.should == 1 end it 'should acquire dynamically blocking queues' do subject.acquire - Sidekiq::Queue['queue1'].busy.should == 1 - Sidekiq::Queue['queue2'].busy.should == 1 + Sidekiq::Queue['queue1'].probed.should == 1 + Sidekiq::Queue['queue2'].probed.should == 1 Sidekiq::Queue['queue1'].block subject.acquire - Sidekiq::Queue['queue1'].busy.should == 2 - Sidekiq::Queue['queue2'].busy.should == 1 + Sidekiq::Queue['queue1'].probed.should == 2 + Sidekiq::Queue['queue2'].probed.should == 1 end it 'should block except given queues' do Sidekiq::Queue['queue1'].block_except 'queue2' subject.acquire - Sidekiq::Queue['queue1'].busy.should == 1 - Sidekiq::Queue['queue2'].busy.should == 1 + Sidekiq::Queue['queue1'].probed.should == 1 + Sidekiq::Queue['queue2'].probed.should == 1 Sidekiq::Queue['queue1'].block_except 'queue404' subject.acquire - Sidekiq::Queue['queue1'].busy.should == 2 - Sidekiq::Queue['queue2'].busy.should == 1 + Sidekiq::Queue['queue1'].probed.should == 2 + Sidekiq::Queue['queue2'].probed.should == 1 end it 'should release queues' do subject.acquire subject.release_except nil - Sidekiq::Queue['queue1'].busy.should == 0 - Sidekiq::Queue['queue2'].busy.should == 0 + Sidekiq::Queue['queue1'].probed.should == 0 + Sidekiq::Queue['queue2'].probed.should == 0 end it 'should release queues except selected' do subject.acquire subject.release_except 'queue:queue1' - Sidekiq::Queue['queue1'].busy.should == 1 - Sidekiq::Queue['queue2'].busy.should == 0 + Sidekiq::Queue['queue1'].probed.should == 1 + Sidekiq::Queue['queue2'].probed.should == 0 end it 'should release when no queues was acquired' do @@ -70,8 +70,8 @@ describe Sidekiq::LimitFetch::Queues do it 'should acquire blocking queues' do 3.times { subject.acquire } - Sidekiq::Queue['queue1'].busy.should == 3 - Sidekiq::Queue['queue2'].busy.should == 1 + Sidekiq::Queue['queue1'].probed.should == 3 + Sidekiq::Queue['queue2'].probed.should == 1 end end diff --git a/spec/sidekiq/limit_fetch/semaphore_spec.rb b/spec/sidekiq/limit_fetch/semaphore_spec.rb index 7516acf..03e1e11 100644 --- a/spec/sidekiq/limit_fetch/semaphore_spec.rb +++ b/spec/sidekiq/limit_fetch/semaphore_spec.rb @@ -15,28 +15,28 @@ describe 'semaphore' do it 'should acquire and count active tasks' do 3.times { subject.acquire } - subject.busy.should == 3 + subject.probed.should == 3 end it 'should acquire tasks with regard to limit' do subject.limit = 4 6.times { subject.acquire } - subject.busy.should == 4 + subject.probed.should == 4 end it 'should release active tasks' do 6.times { subject.acquire } 3.times { subject.release } - subject.busy.should == 3 + subject.probed.should == 3 end it 'should pause tasks' do 3.times { subject.acquire } subject.pause 2.times { subject.acquire } - subject.busy.should == 3 + subject.probed.should == 3 2.times { subject.release } - subject.busy.should == 1 + subject.probed.should == 1 end it 'should unpause tasks' do @@ -44,6 +44,6 @@ describe 'semaphore' do 3.times { subject.acquire } subject.unpause 2.times { subject.acquire } - subject.busy.should == 2 + subject.probed.should == 2 end end diff --git a/spec/sidekiq/limit_fetch_spec.rb b/spec/sidekiq/limit_fetch_spec.rb index e886e80..d526d94 100644 --- a/spec/sidekiq/limit_fetch_spec.rb +++ b/spec/sidekiq/limit_fetch_spec.rb @@ -20,15 +20,27 @@ describe Sidekiq::LimitFetch do work.queue_name.should == 'queue1' work.message.should == 'task1' + Sidekiq::Queue['queue1'].busy.should == 1 + Sidekiq::Queue['queue2'].busy.should == 0 + subject.retrieve_work.should_not be work.requeue + Sidekiq::Queue['queue1'].busy.should == 0 + Sidekiq::Queue['queue2'].busy.should == 0 + work = subject.retrieve_work work.message.should == 'task2' + Sidekiq::Queue['queue1'].busy.should == 1 + Sidekiq::Queue['queue2'].busy.should == 0 + subject.retrieve_work.should_not be work.acknowledge + Sidekiq::Queue['queue1'].busy.should == 0 + Sidekiq::Queue['queue2'].busy.should == 0 + work = subject.retrieve_work work.message.should == 'task1' end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index b7aa3e2..d239831 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -6,10 +6,13 @@ RSpec.configure do |config| config.before :each do Sidekiq.redis do |it| clean_redis = ->(queue) do - it.del "limit_fetch:limit:#{queue}" - it.del "limit_fetch:busy:#{queue}" - it.del "limit_fetch:pause:#{queue}" - it.del "limit_fetch:block:#{queue}" + it.pipelined do + it.del "limit_fetch:limit:#{queue}" + it.del "limit_fetch:busy:#{queue}" + it.del "limit_fetch:probed:#{queue}" + it.del "limit_fetch:pause:#{queue}" + it.del "limit_fetch:block:#{queue}" + end end clean_redis.call(name) if defined?(name)