1
0
Fork 0
mirror of https://github.com/deanpcmad/sidekiq-limit_fetch.git synced 2022-11-09 13:54:36 -05:00

Separate probed and busy indicators for queue

This commit is contained in:
brainopia 2013-05-22 11:42:56 +04:00
parent afb7763561
commit f1e83a3384
12 changed files with 87 additions and 57 deletions

View file

@ -3,20 +3,17 @@ module Sidekiq
extend LimitFetch::Singleton, Forwardable extend LimitFetch::Singleton, Forwardable
def_delegators :lock, def_delegators :lock,
:limit, :limit=, :limit, :limit=,
:acquire, :release, :acquire, :release,
:pause, :unpause, :pause, :unpause,
:block, :unblock, :block, :unblock,
:paused?, :blocking?, :paused?, :blocking?,
:unblocked, :block_except, :unblocked, :block_except,
:busy :probed, :busy,
:increase_busy, :decrease_busy
def lock def lock
@lock ||= mode::Semaphore.new name @lock ||= LimitFetch::Global::Semaphore.new name
end
def mode
Sidekiq.options[:local] ? LimitFetch::Local : LimitFetch::Global
end end
end end
end end

View file

@ -37,8 +37,10 @@ module Sidekiq::LimitFetch::Global
next if it.get heartbeat_key processor next if it.get heartbeat_key processor
it.del processor_key processor it.del processor_key processor
it.keys('limit_fetch:busy:*').each do |queue| %w(limit_fetch:probed:* limit_fetch:busy:*).each do |pattern|
it.lrem queue, 0, processor it.keys(pattern).each do |queue|
it.lrem queue, 0, processor
end
end end
end end
end end

View file

@ -60,7 +60,7 @@ module Sidekiq::LimitFetch::Global
for _, queue in ipairs(queues) do for _, queue in ipairs(queues) do
if not blocking_mode or unblocked[queue] then if not blocking_mode or unblocked[queue] then
local busy_key = namespace..'busy:'..queue local probed_key = namespace..'probed:'..queue
local pause_key = namespace..'pause:'..queue local pause_key = namespace..'pause:'..queue
local paused = redis.call('get', pause_key) local paused = redis.call('get', pause_key)
@ -72,7 +72,7 @@ module Sidekiq::LimitFetch::Global
local can_block = redis.call('get', block_key) local can_block = redis.call('get', block_key)
if can_block or queue_limit then if can_block or queue_limit then
queue_locks = redis.call('llen', busy_key) queue_locks = redis.call('llen', probed_key)
end end
blocking_mode = can_block and queue_locks > 0 blocking_mode = can_block and queue_locks > 0
@ -84,7 +84,7 @@ module Sidekiq::LimitFetch::Global
end end
if not queue_limit or queue_limit > queue_locks then if not queue_limit or queue_limit > queue_locks then
redis.call('rpush', busy_key, worker_name) redis.call('rpush', probed_key, worker_name)
table.insert(available, queue) table.insert(available, queue)
end end
end end
@ -102,8 +102,8 @@ module Sidekiq::LimitFetch::Global
local queues = ARGV local queues = ARGV
for _, queue in ipairs(queues) do for _, queue in ipairs(queues) do
local busy_key = namespace..'busy:'..queue local probed_key = namespace..'probed:'..queue
redis.call('lrem', busy_key, 1, worker_name) redis.call('lrem', probed_key, 1, worker_name)
end end
LUA LUA
end end

View file

@ -23,13 +23,25 @@ module Sidekiq::LimitFetch::Global
end end
def release def release
Selector.release [@name] redis {|it| it.lrem "#{PREFIX}:probed:#@name", 1, Selector.uuid }
end end
def busy def busy
redis {|it| it.llen "#{PREFIX}:busy:#@name" } redis {|it| it.llen "#{PREFIX}:busy:#@name" }
end end
def increase_busy
redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid }
end
def decrease_busy
redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid }
end
def probed
redis {|it| it.llen "#{PREFIX}:probed:#@name" }
end
def pause def pause
redis {|it| it.set "#{PREFIX}:pause:#@name", true } redis {|it| it.set "#{PREFIX}:pause:#@name", true }
end end

View file

@ -1,19 +1,17 @@
class Sidekiq::LimitFetch class Sidekiq::LimitFetch
class Queues class Queues
THREAD_KEY = :acquired_queues THREAD_KEY = :acquired_queues
attr_reader :selector
def initialize(options) def initialize(options)
@queues = options[:queues] @queues = options[:queues]
options[:strict] ? strict_order! : weighted_order! options[:strict] ? strict_order! : weighted_order!
set_selector options[:local]
set_limits options[:limits] set_limits options[:limits]
set_blocks options[:blocking] set_blocks options[:blocking]
end end
def acquire def acquire
@selector.acquire(ordered_queues) selector.acquire(ordered_queues)
.tap {|it| save it } .tap {|it| save it }
.map {|it| "queue:#{it}" } .map {|it| "queue:#{it}" }
end end
@ -21,13 +19,13 @@ class Sidekiq::LimitFetch
def release_except(full_name) def release_except(full_name)
queues = restore queues = restore
queues.delete full_name[/queue:(.*)/, 1] if full_name queues.delete full_name[/queue:(.*)/, 1] if full_name
@selector.release queues selector.release queues
end end
private private
def set_selector(local) def selector
@selector = local ? Local::Selector : Global::Selector Global::Selector
end end
def set_limits(limits) def set_limits(limits)

View file

@ -1,6 +1,12 @@
module Sidekiq module Sidekiq
class LimitFetch::UnitOfWork < BasicFetch::UnitOfWork class LimitFetch::UnitOfWork < BasicFetch::UnitOfWork
def initialize(queue, message)
super
Queue[queue_name].increase_busy
end
def acknowledge def acknowledge
Queue[queue_name].decrease_busy
Queue[queue_name].release Queue[queue_name].release
end end

View file

@ -59,14 +59,14 @@ describe Sidekiq::Queue do
it 'should be countable' do it 'should be countable' do
queue.limit = 3 queue.limit = 3
5.times { queue.acquire } 5.times { queue.acquire }
queue.busy.should == 3 queue.probed.should == 3
end end
it 'should be releasable' do it 'should be releasable' do
queue.acquire queue.acquire
queue.busy.should == 1 queue.probed.should == 1
queue.release queue.release
queue.busy.should == 0 queue.probed.should == 0
end end
it 'should tell if paused' do it 'should tell if paused' do

View file

@ -20,11 +20,11 @@ describe Sidekiq::LimitFetch::Global::Monitor do
it 'should remove invalidated old locks' do it 'should remove invalidated old locks' do
2.times { queue.acquire } 2.times { queue.acquire }
sleep 2*ttl sleep 2*ttl
queue.busy.should == 2 queue.probed.should == 2
described_class.stub :update_heartbeat described_class.stub :update_heartbeat
sleep 2*ttl sleep 2*ttl
queue.busy.should == 0 queue.probed.should == 0
end end
end end
end end

View file

@ -17,46 +17,46 @@ describe Sidekiq::LimitFetch::Queues do
it 'should acquire queues' do it 'should acquire queues' do
subject.acquire subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1 Sidekiq::Queue['queue1'].probed.should == 1
Sidekiq::Queue['queue2'].busy.should == 1 Sidekiq::Queue['queue2'].probed.should == 1
end end
it 'should acquire dynamically blocking queues' do it 'should acquire dynamically blocking queues' do
subject.acquire subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1 Sidekiq::Queue['queue1'].probed.should == 1
Sidekiq::Queue['queue2'].busy.should == 1 Sidekiq::Queue['queue2'].probed.should == 1
Sidekiq::Queue['queue1'].block Sidekiq::Queue['queue1'].block
subject.acquire subject.acquire
Sidekiq::Queue['queue1'].busy.should == 2 Sidekiq::Queue['queue1'].probed.should == 2
Sidekiq::Queue['queue2'].busy.should == 1 Sidekiq::Queue['queue2'].probed.should == 1
end end
it 'should block except given queues' do it 'should block except given queues' do
Sidekiq::Queue['queue1'].block_except 'queue2' Sidekiq::Queue['queue1'].block_except 'queue2'
subject.acquire subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1 Sidekiq::Queue['queue1'].probed.should == 1
Sidekiq::Queue['queue2'].busy.should == 1 Sidekiq::Queue['queue2'].probed.should == 1
Sidekiq::Queue['queue1'].block_except 'queue404' Sidekiq::Queue['queue1'].block_except 'queue404'
subject.acquire subject.acquire
Sidekiq::Queue['queue1'].busy.should == 2 Sidekiq::Queue['queue1'].probed.should == 2
Sidekiq::Queue['queue2'].busy.should == 1 Sidekiq::Queue['queue2'].probed.should == 1
end end
it 'should release queues' do it 'should release queues' do
subject.acquire subject.acquire
subject.release_except nil subject.release_except nil
Sidekiq::Queue['queue1'].busy.should == 0 Sidekiq::Queue['queue1'].probed.should == 0
Sidekiq::Queue['queue2'].busy.should == 0 Sidekiq::Queue['queue2'].probed.should == 0
end end
it 'should release queues except selected' do it 'should release queues except selected' do
subject.acquire subject.acquire
subject.release_except 'queue:queue1' subject.release_except 'queue:queue1'
Sidekiq::Queue['queue1'].busy.should == 1 Sidekiq::Queue['queue1'].probed.should == 1
Sidekiq::Queue['queue2'].busy.should == 0 Sidekiq::Queue['queue2'].probed.should == 0
end end
it 'should release when no queues was acquired' do it 'should release when no queues was acquired' do
@ -70,8 +70,8 @@ describe Sidekiq::LimitFetch::Queues do
it 'should acquire blocking queues' do it 'should acquire blocking queues' do
3.times { subject.acquire } 3.times { subject.acquire }
Sidekiq::Queue['queue1'].busy.should == 3 Sidekiq::Queue['queue1'].probed.should == 3
Sidekiq::Queue['queue2'].busy.should == 1 Sidekiq::Queue['queue2'].probed.should == 1
end end
end end

View file

@ -15,28 +15,28 @@ describe 'semaphore' do
it 'should acquire and count active tasks' do it 'should acquire and count active tasks' do
3.times { subject.acquire } 3.times { subject.acquire }
subject.busy.should == 3 subject.probed.should == 3
end end
it 'should acquire tasks with regard to limit' do it 'should acquire tasks with regard to limit' do
subject.limit = 4 subject.limit = 4
6.times { subject.acquire } 6.times { subject.acquire }
subject.busy.should == 4 subject.probed.should == 4
end end
it 'should release active tasks' do it 'should release active tasks' do
6.times { subject.acquire } 6.times { subject.acquire }
3.times { subject.release } 3.times { subject.release }
subject.busy.should == 3 subject.probed.should == 3
end end
it 'should pause tasks' do it 'should pause tasks' do
3.times { subject.acquire } 3.times { subject.acquire }
subject.pause subject.pause
2.times { subject.acquire } 2.times { subject.acquire }
subject.busy.should == 3 subject.probed.should == 3
2.times { subject.release } 2.times { subject.release }
subject.busy.should == 1 subject.probed.should == 1
end end
it 'should unpause tasks' do it 'should unpause tasks' do
@ -44,6 +44,6 @@ describe 'semaphore' do
3.times { subject.acquire } 3.times { subject.acquire }
subject.unpause subject.unpause
2.times { subject.acquire } 2.times { subject.acquire }
subject.busy.should == 2 subject.probed.should == 2
end end
end end

View file

@ -20,15 +20,27 @@ describe Sidekiq::LimitFetch do
work.queue_name.should == 'queue1' work.queue_name.should == 'queue1'
work.message.should == 'task1' work.message.should == 'task1'
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 0
subject.retrieve_work.should_not be subject.retrieve_work.should_not be
work.requeue work.requeue
Sidekiq::Queue['queue1'].busy.should == 0
Sidekiq::Queue['queue2'].busy.should == 0
work = subject.retrieve_work work = subject.retrieve_work
work.message.should == 'task2' work.message.should == 'task2'
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 0
subject.retrieve_work.should_not be subject.retrieve_work.should_not be
work.acknowledge work.acknowledge
Sidekiq::Queue['queue1'].busy.should == 0
Sidekiq::Queue['queue2'].busy.should == 0
work = subject.retrieve_work work = subject.retrieve_work
work.message.should == 'task1' work.message.should == 'task1'
end end

View file

@ -6,10 +6,13 @@ RSpec.configure do |config|
config.before :each do config.before :each do
Sidekiq.redis do |it| Sidekiq.redis do |it|
clean_redis = ->(queue) do clean_redis = ->(queue) do
it.del "limit_fetch:limit:#{queue}" it.pipelined do
it.del "limit_fetch:busy:#{queue}" it.del "limit_fetch:limit:#{queue}"
it.del "limit_fetch:pause:#{queue}" it.del "limit_fetch:busy:#{queue}"
it.del "limit_fetch:block:#{queue}" it.del "limit_fetch:probed:#{queue}"
it.del "limit_fetch:pause:#{queue}"
it.del "limit_fetch:block:#{queue}"
end
end end
clean_redis.call(name) if defined?(name) clean_redis.call(name) if defined?(name)