Add limits per process
This commit is contained in:
parent
5466d63e94
commit
24d673d8c6
13
README.md
13
README.md
|
@ -49,6 +49,19 @@ workers simultaneously.
|
|||
Ability to set limits dynamically allows you to resize worker
|
||||
distribution among queues any time you want.
|
||||
|
||||
### Limits per process
|
||||
|
||||
If you use multiple sidekiq processes then you can specify limits per process:
|
||||
|
||||
```
|
||||
:process_limits:
|
||||
queue_name: 2
|
||||
```
|
||||
|
||||
Or set it in your code: ```ruby
|
||||
Sidekiq::Queue['queue_name'].process_limit = 2
|
||||
```
|
||||
|
||||
### Busy workers by queue
|
||||
|
||||
You can see how many workers currently handling a queue:
|
||||
|
|
|
@ -5,6 +5,7 @@ module Sidekiq
|
|||
|
||||
def_delegators :lock,
|
||||
:limit, :limit=,
|
||||
:process_limit, :process_limit=,
|
||||
:acquire, :release,
|
||||
:pause, :unpause,
|
||||
:block, :unblock,
|
||||
|
|
|
@ -48,7 +48,8 @@ module Sidekiq::LimitFetch::Global
|
|||
local queues = ARGV
|
||||
local available = {}
|
||||
local unblocked = {}
|
||||
local queue_locks
|
||||
local locks
|
||||
local process_locks
|
||||
local blocking_mode
|
||||
|
||||
for _, queue in ipairs(queues) do
|
||||
|
@ -58,17 +59,30 @@ module Sidekiq::LimitFetch::Global
|
|||
local paused = redis.call('get', pause_key)
|
||||
|
||||
if not paused then
|
||||
local limit_key = namespace..'limit:'..queue
|
||||
local queue_limit = tonumber(redis.call('get', limit_key))
|
||||
local limit_key = namespace..'limit:'..queue
|
||||
local limit = tonumber(redis.call('get', limit_key))
|
||||
|
||||
local process_limit_key = namespace..'process_limit:'..queue
|
||||
local process_limit = tonumber(redis.call('get', process_limit_key))
|
||||
|
||||
local block_key = namespace..'block:'..queue
|
||||
local can_block = redis.call('get', block_key)
|
||||
|
||||
if can_block or queue_limit then
|
||||
queue_locks = redis.call('llen', probed_key)
|
||||
if can_block or limit then
|
||||
locks = redis.call('llen', probed_key)
|
||||
end
|
||||
|
||||
blocking_mode = can_block and queue_locks > 0
|
||||
if process_limit then
|
||||
local all_locks = redis.call('lrange', probed_key, 0, -1)
|
||||
process_locks = 0
|
||||
for _, process in ipairs(all_locks) do
|
||||
if process == worker_name then
|
||||
process_locks = process_locks + 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
blocking_mode = can_block and locks > 0
|
||||
|
||||
if blocking_mode and can_block ~= 'true' then
|
||||
for unblocked_queue in string.gmatch(can_block, "[^,]+") do
|
||||
|
@ -76,7 +90,8 @@ module Sidekiq::LimitFetch::Global
|
|||
end
|
||||
end
|
||||
|
||||
if not queue_limit or queue_limit > queue_locks then
|
||||
if (not limit or limit > locks) and
|
||||
(not process_limit or process_limit > process_locks) then
|
||||
redis.call('rpush', probed_key, worker_name)
|
||||
table.insert(available, queue)
|
||||
end
|
||||
|
|
|
@ -21,6 +21,15 @@ module Sidekiq::LimitFetch::Global
|
|||
redis {|it| it.set "#{PREFIX}:limit:#@name", value }
|
||||
end
|
||||
|
||||
def process_limit
|
||||
value = redis {|it| it.get "#{PREFIX}:process_limit:#@name" }
|
||||
value.to_i if value
|
||||
end
|
||||
|
||||
def process_limit=(value)
|
||||
redis {|it| it.set "#{PREFIX}:process_limit:#@name", value }
|
||||
end
|
||||
|
||||
def acquire
|
||||
Selector.acquire([@name], determine_namespace).size > 0
|
||||
end
|
||||
|
|
|
@ -24,6 +24,12 @@ describe 'semaphore' do
|
|||
subject.probed.should == 4
|
||||
end
|
||||
|
||||
it 'should acquire tasks with regard to process limit' do
|
||||
subject.process_limit = 4
|
||||
6.times { subject.acquire }
|
||||
subject.probed.should == 4
|
||||
end
|
||||
|
||||
it 'should release active tasks' do
|
||||
6.times { subject.acquire }
|
||||
3.times { subject.release }
|
||||
|
|
|
@ -11,6 +11,7 @@ RSpec.configure do |config|
|
|||
clean_redis = ->(queue) do
|
||||
it.pipelined do
|
||||
it.del "limit_fetch:limit:#{queue}"
|
||||
it.del "limit_fetch:process_limit:#{queue}"
|
||||
it.del "limit_fetch:busy:#{queue}"
|
||||
it.del "limit_fetch:probed:#{queue}"
|
||||
it.del "limit_fetch:pause:#{queue}"
|
||||
|
|
Loading…
Reference in New Issue