1
0
Fork 0
mirror of https://github.com/deanpcmad/sidekiq-limit_fetch.git synced 2022-11-09 13:54:36 -05:00

Add global mode

This commit is contained in:
brainopia 2013-01-24 16:03:28 +04:00
parent b20e96fbae
commit 4aab89b288
17 changed files with 566 additions and 251 deletions

View file

@ -21,7 +21,7 @@ Specify limits which you want to place on queues inside sidekiq.yml:
Or set it dynamically in your code: Or set it dynamically in your code:
```ruby ```ruby
Sidekiq::Queue.new('queue_name1').limit = 5 Sidekiq::Queue['queue_name1'].limit = 5
Sidekiq::Queue['queue_name2'].limit = 10 Sidekiq::Queue['queue_name2'].limit = 10
``` ```
@ -43,7 +43,11 @@ will be preserved.
Limits are applied per process. In case you have several worker Limits are applied per process. In case you have several worker
processes and want to have global locks between them, you'll need to processes and want to have global locks between them, you'll need to
wait just a bit more since support for global locks is underway. enable global mode by setting global option, eg:
```yaml
:global: true
```
Sponsored by [Evil Martians]. Sponsored by [Evil Martians].
[Evil Martians]: http://evilmartians.com/ [Evil Martians]: http://evilmartians.com/

View file

@ -8,12 +8,12 @@ module Sidekiq
:pause, :continue, :pause, :continue,
:busy :busy
def full_name def lock
@rname @lock ||= mode::Semaphore.new name
end end
def lock def mode
@lock ||= LimitFetch::Semaphore.new Sidekiq.options[:global] ? LimitFetch::Global : LimitFetch::Local
end end
end end
end end

View file

@ -2,10 +2,14 @@ require 'sidekiq'
require 'sidekiq/fetch' require 'sidekiq/fetch'
class Sidekiq::LimitFetch class Sidekiq::LimitFetch
require_relative 'limit_fetch/semaphore'
require_relative 'limit_fetch/unit_of_work' require_relative 'limit_fetch/unit_of_work'
require_relative 'limit_fetch/singleton' require_relative 'limit_fetch/singleton'
require_relative 'limit_fetch/queue' require_relative 'limit_fetch/queues'
require_relative 'limit_fetch/local/semaphore'
require_relative 'limit_fetch/local/selector'
require_relative 'limit_fetch/global/semaphore'
require_relative 'limit_fetch/global/selector'
require_relative 'extensions/queue'
Sidekiq.options[:fetch] = self Sidekiq.options[:fetch] = self
@ -14,55 +18,23 @@ class Sidekiq::LimitFetch
end end
def initialize(options) def initialize(options)
prepare_queues options @queues = Queues.new options
options[:strict] ? define_strict_queues : define_weighted_queues
end
def available_queues
fetch_queues.select(&:acquire)
end end
def retrieve_work def retrieve_work
queues = available_queues queue, message = fetch_message
UnitOfWork.new queue, message if message
if queues.empty?
sleep Sidekiq::Fetcher::TIMEOUT
return
end
queue_name, message = Sidekiq.redis do |it|
it.brpop *queues.map(&:full_name), Sidekiq::Fetcher::TIMEOUT
end
if message
queue = queues.delete queues.find {|it| it.full_name == queue_name }
UnitOfWork.new queue, message
end
ensure
queues.each(&:release) if queues
end end
private private
def prepare_queues(options) def fetch_message
limits = options[:limits] || {} queue, _ = redis_blpop *@queues.acquire, Sidekiq::Fetcher::TIMEOUT
@queues = options[:queues].map do |name| ensure
Sidekiq::Queue.new(name).tap do |it| @queues.release_except queue
it.limit = limits[name] if limits[name]
end
end
end end
def define_strict_queues def redis_blpop(*args)
@queues.uniq! Sidekiq.redis {|it| it.blpop *args }
def fetch_queues
@queues
end
end
def define_weighted_queues
def fetch_queues
@queues.shuffle.uniq
end
end end
end end

View file

@ -0,0 +1,92 @@
module Sidekiq::LimitFetch::Global
module Selector
extend self
def acquire(queues)
redis_eval :acquire, [namespace, uuid, queues]
end
def release(queues)
redis_eval :release, [namespace, uuid, queues]
end
private
def namespace
@namespace ||= begin
namespace = Sidekiq.options[:namespace]
namespace + ':' if namespace
end
end
def uuid
@uuid ||= SecureRandom.uuid
end
def redis_eval(script_name, args)
Sidekiq.redis do |it|
begin
it.evalsha send("redis_#{script_name}_sha"), argv: args
rescue Redis::CommandError => error
raise unless error.message.include? 'NOSCRIPT'
it.eval send("redis_#{script_name}_script"), argv: args
end
end
end
def redis_acquire_sha
@acquire_sha ||= Digest::SHA1.hexdigest redis_acquire_script
end
def redis_release_sha
@release_sha ||= Digest::SHA1.hexdigest redis_release_script
end
def redis_acquire_script
<<-LUA
local namespace = table.remove(ARGV, 1)..'limit_fetch:'
local worker_name = table.remove(ARGV, 1)
local queues = ARGV
local available = {}
for _, queue in ipairs(queues) do
local busy_key = namespace..'busy:'..queue
local pause_key = namespace..'pause:'..queue
local paused = redis.call('get', pause_key)
if not paused then
local limit_key = namespace..'limit:'..queue
local queue_limit = tonumber(redis.call('get', limit_key))
if queue_limit then
local queue_locks = redis.call('llen', busy_key)
if queue_limit > queue_locks then
redis.call('rpush', busy_key, worker_name)
table.insert(available, queue)
end
else
redis.call('rpush', busy_key, worker_name)
table.insert(available, queue)
end
end
end
return available
LUA
end
def redis_release_script
<<-LUA
local namespace = table.remove(ARGV, 1)..'limit_fetch:'
local worker_name = table.remove(ARGV, 1)
local queues = ARGV
for _, queue in ipairs(queues) do
local busy_key = namespace..'busy:'..queue
redis.call('lrem', busy_key, 1, worker_name)
end
LUA
end
end
end

View file

@ -0,0 +1,38 @@
module Sidekiq::LimitFetch::Global
class Semaphore
PREFIX = 'limit_fetch'
def initialize(name)
@name = name
end
def limit
value = Sidekiq.redis {|it| it.get "#{PREFIX}:limit:#@name" }
value.to_i if value
end
def limit=(value)
Sidekiq.redis {|it| it.set "#{PREFIX}:limit:#@name", value }
end
def acquire
Selector.acquire([@name]).size > 0
end
def release
Selector.release [@name]
end
def busy
Sidekiq.redis {|it| it.llen "#{PREFIX}:busy:#@name" }
end
def pause
Sidekiq.redis {|it| it.set "#{PREFIX}:pause:#@name", true }
end
def continue
Sidekiq.redis {|it| it.del "#{PREFIX}:pause:#@name" }
end
end
end

View file

@ -0,0 +1,19 @@
module Sidekiq::LimitFetch::Local
module Selector
extend self
def acquire(names)
queues(names).select(&:acquire).map(&:name)
end
def release(names)
queues(names).each(&:release)
end
private
def queues(names)
names.map {|name| Sidekiq::Queue[name] }
end
end
end

View file

@ -0,0 +1,39 @@
module Sidekiq::LimitFetch::Local
class Semaphore
attr_reader :limit, :busy
def initialize(name)
@name = name
@lock = Mutex.new
@busy = 0
@paused = false
end
def limit=(value)
@lock.synchronize do
@limit = value
end
end
def acquire
return if @paused
@lock.synchronize do
@busy += 1 if not @limit or @limit > @busy
end
end
def release
@lock.synchronize do
@busy -= 1
end
end
def pause
@paused = true
end
def continue
@paused = false
end
end
end

View file

@ -0,0 +1,57 @@
class Sidekiq::LimitFetch
class Queues
THREAD_KEY = :acquired_queues
attr_reader :selector
def initialize(options)
@queues = options[:queues]
options[:strict] ? strict_order! : weighted_order!
set_selector options[:global]
set_limits options[:limits]
end
def acquire
@selector.acquire(ordered_queues)
.tap {|it| save it }
.map {|it| "queue:#{it}" }
end
def release_except(full_name)
@selector.release restore.delete_if {|name| full_name.to_s.include? name }
end
private
def set_selector(global)
@selector = global ? Global::Selector : Local::Selector
end
def set_limits(limits)
ordered_queues.each do |name|
Sidekiq::Queue[name].tap do |it|
it.limit = (limits || {})[name]
end
end
end
def strict_order!
@queues.uniq!
def ordered_queues; @queues end
end
def weighted_order!
def ordered_queues; @queues.shuffle.uniq end
end
def save(queues)
Thread.current[THREAD_KEY] = queues
end
def restore
Thread.current[THREAD_KEY]
ensure
Thread.current[THREAD_KEY] = nil
end
end
end

View file

@ -1,36 +0,0 @@
class Sidekiq::LimitFetch::Semaphore
attr_reader :limit, :busy
def initialize
@lock = Mutex.new
@busy = 0
@paused = false
end
def limit=(value)
@lock.synchronize do
@limit = value
end
end
def acquire
return if @paused
@lock.synchronize do
@busy += 1 if not @limit or @limit > @busy
end
end
def release
@lock.synchronize do
@busy -= 1
end
end
def pause
@paused = true
end
def continue
@paused = false
end
end

View file

@ -1,16 +1,12 @@
Sidekiq::LimitFetch::UnitOfWork = Struct.new :queue_wrapper, :message do module Sidekiq
extend Forwardable class LimitFetch::UnitOfWork < BasicFetch::UnitOfWork
def acknowledge
Queue[queue_name].release
end
def_delegator :queue_wrapper, :full_name, :queue def requeue
def_delegator :queue_wrapper, :name, :queue_name super
def_delegator :queue_wrapper, :release acknowledge
end
def acknowledge
release
end
def requeue
release
Sidekiq.redis {|it| it.rpush queue, message }
end end
end end

View file

@ -1,81 +0,0 @@
require 'spec_helper'
describe Sidekiq::LimitFetch do
before :each do
Sidekiq.redis do |it|
it.del 'queue:example1'
it.rpush 'queue:example1', 'task'
it.expire 'queue:example1', 30
end
end
def queues(fetcher)
fetcher.available_queues.map(&:full_name)
end
def new_fetcher(options={})
described_class.new options.merge queues: %w(example1 example1 example2 example2)
end
it 'should retrieve weighted queues' do
fetcher = new_fetcher
queues(fetcher).should =~ %w(queue:example1 queue:example2)
end
it 'should retrieve strictly ordered queues' do
fetcher = new_fetcher strict: true
queues(fetcher).should == %w(queue:example1 queue:example2)
end
it 'should retrieve only available queues' do
fetcher = new_fetcher strict: true, limits: { 'example1' => 2 }
queues = -> { fetcher.available_queues }
queues1 = queues.call
queues2 = queues.call
queues1.should have(2).items
queues2.should have(2).items
queues.call.should have(1).items
queues1.each(&:release)
queues.call.should have(2).items
queues.call.should have(1).items
queues2.each(&:release)
queues.call.should have(2).items
queues.call.should have(1).items
end
it 'should acquire lock on queue for excecution' do
fetcher = new_fetcher limits: { 'example1' => 1, 'example2' => 1 }
work = fetcher.retrieve_work
work.message.should == 'task'
work.queue.should == 'queue:example1'
work.queue_name.should == 'example1'
queues = fetcher.available_queues
queues.should have(1).item
queues.each(&:release)
work.requeue
work = fetcher.retrieve_work
work.message.should == 'task'
work.acknowledge
fetcher.available_queues.should have(2).items
end
it 'should set queue limits on the fly' do
Sidekiq::Queue['example1'].limit = 1
Sidekiq::Queue['example2'].limit = 2
fetcher = new_fetcher
fetcher.available_queues.should have(2).item
fetcher.available_queues.should have(1).item
fetcher.available_queues.should have(0).item
Sidekiq::Queue['example1'].limit = 2
fetcher.available_queues.should have(1).item
end
end

View file

@ -0,0 +1,85 @@
require 'spec_helper'
describe Sidekiq::Queue do
context 'singleton' do
shared_examples :constructor do
it 'with default name' do
new_object = -> { described_class.send constructor }
new_object.call.should == new_object.call
end
it 'with given name' do
new_object = ->(name) { described_class.send constructor, name }
new_object.call('name').should == new_object.call('name')
end
end
context '.new' do
let(:constructor) { :new }
it_behaves_like :constructor
end
context '.[]' do
let(:constructor) { :[] }
it_behaves_like :constructor
end
context '#lock' do
let(:name) { 'example' }
let(:queue) { Sidekiq::Queue[name] }
shared_examples_for :lock do
it 'should be available' do
queue.acquire.should be
end
it 'should be pausable' do
queue.pause
queue.acquire.should_not be
end
it 'should be continuable' do
queue.pause
queue.continue
queue.acquire.should be
end
it 'should be limitable' do
queue.limit = 1
queue.acquire.should be
queue.acquire.should_not be
end
it 'should be resizable' do
queue.limit = 0
queue.acquire.should_not be
queue.limit = nil
queue.acquire.should be
end
it 'should be countable' do
queue.limit = 3
5.times { queue.acquire }
queue.busy.should == 3
end
it 'should be releasable' do
queue.acquire
queue.busy.should == 1
queue.release
queue.busy.should == 0
end
end
context 'global' do
before(:all) { Sidekiq.options[:global] = true }
it_behaves_like :lock
end
context 'local' do
before(:all) { Sidekiq.options[:global] = false }
it_behaves_like :lock
end
end
end
end

View file

@ -1,72 +0,0 @@
require 'spec_helper'
describe Sidekiq::Queue do
context 'singleton' do
shared_examples :constructor do
it 'with default name' do
new_object = -> { described_class.send constructor }
new_object.call.should == new_object.call
end
it 'with given name' do
new_object = ->(name) { described_class.send constructor, name }
new_object.call('name').should == new_object.call('name')
end
end
context '.new' do
let(:constructor) { :new }
it_behaves_like :constructor
end
context '.[]' do
let(:constructor) { :[] }
it_behaves_like :constructor
end
context '#acquire' do
let(:queue) { Sidekiq::Queue['example'] }
it 'should be available' do
queue.acquire.should be
end
it 'should be pausable' do
queue.pause
queue.acquire.should_not be
end
it 'should be continuable' do
queue.pause
queue.continue
queue.acquire.should be
end
it 'should be limitable' do
queue.limit = 1
queue.acquire.should be
queue.acquire.should_not be
end
it 'should be resizable' do
queue.limit = 0
queue.acquire.should_not be
queue.limit = nil
queue.acquire.should be
end
it 'should be countable' do
queue.limit = 3
5.times { queue.acquire }
queue.busy.should == 3
end
it 'should be releasable' do
queue.acquire
queue.busy.should == 1
queue.release
queue.busy.should == 0
end
end
end
end

View file

@ -0,0 +1,75 @@
require 'spec_helper'
describe Sidekiq::LimitFetch::Queues do
subject { described_class.new options }
let(:queues) { %w[queue1 queue2] }
let(:limits) {{ 'queue1' => 3 }}
let(:strict) { true }
let(:global) { false }
let(:options) do
{ queues: queues, limits: limits, strict: strict, global: global }
end
after(:each ) do
Thread.current[:available_queues] = nil
end
shared_examples_for :selector do
it 'should acquire queues' do
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 1
end
it 'should release queues' do
subject.acquire
subject.release_except nil
Sidekiq::Queue['queue1'].busy.should == 0
Sidekiq::Queue['queue2'].busy.should == 0
end
it 'should release queues except selected' do
subject.acquire
subject.release_except 'queue:queue1'
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 0
end
end
context 'without global flag' do
it_should_behave_like :selector
it 'without global flag should be local' do
subject.selector.should == Sidekiq::LimitFetch::Local::Selector
end
end
context 'with global flag' do
let(:global) { true }
it_should_behave_like :selector
it 'should use global selector' do
subject.selector.should == Sidekiq::LimitFetch::Global::Selector
end
end
it 'should set limits' do
subject
Sidekiq::Queue['queue1'].limit.should == 3
Sidekiq::Queue['queue2'].limit.should_not be
end
context 'without strict flag' do
let(:strict) { false }
it 'should retrieve weighted queues' do
subject.ordered_queues.should =~ %w(queue1 queue2)
end
end
it 'with strict flag should retrieve strictly ordered queues' do
subject.ordered_queues.should == %w(queue1 queue2)
end
end

View file

@ -0,0 +1,68 @@
require 'spec_helper'
describe 'semaphore' do
shared_examples_for :semaphore do
it 'should have no limit by default' do
subject.limit.should_not be
end
it 'should set limit' do
subject.limit = 4
subject.limit.should == 4
end
it 'should acquire and count active tasks' do
3.times { subject.acquire }
subject.busy.should == 3
end
it 'should acquire tasks with regard to limit' do
subject.limit = 4
6.times { subject.acquire }
subject.busy.should == 4
end
it 'should release active tasks' do
6.times { subject.acquire }
3.times { subject.release }
subject.busy.should == 3
end
it 'should pause tasks' do
3.times { subject.acquire }
subject.pause
2.times { subject.acquire }
subject.busy.should == 3
2.times { subject.release }
subject.busy.should == 1
end
it 'should unpause tasks' do
subject.pause
3.times { subject.acquire }
subject.continue
2.times { subject.acquire }
subject.busy.should == 2
end
end
let(:name) { 'default' }
context 'local' do
subject { Sidekiq::LimitFetch::Local::Semaphore.new name }
it_behaves_like :semaphore
end
context 'global' do
subject { Sidekiq::LimitFetch::Global::Semaphore.new name }
it_behaves_like :semaphore
after :each do
Sidekiq.redis do |it|
it.del "limit_fetch:limit:#{name}"
it.del "limit_fetch:busy:#{name}"
it.del "limit_fetch:pause:#{name}"
end
end
end
end

View file

@ -0,0 +1,47 @@
require 'spec_helper'
describe Sidekiq::LimitFetch do
before :each do
Sidekiq.redis do |it|
it.del 'queue:queue1'
it.rpush 'queue:queue1', 'task1'
it.rpush 'queue:queue1', 'task2'
it.expire 'queue:queue1', 30
end
end
subject { described_class.new options }
let(:options) {{ queues: queues, limits: limits, global: global }}
let(:queues) { %w(queue1 queue1 queue2 queue2) }
let(:limits) {{ 'queue1' => 1, 'queue2' => 2 }}
shared_examples_for :strategy do
it 'should acquire lock on queue for execution' do
work = subject.retrieve_work
work.queue_name.should == 'queue1'
work.message.should == 'task1'
subject.retrieve_work.should_not be
work.requeue
work = subject.retrieve_work
work.message.should == 'task2'
subject.retrieve_work.should_not be
work.acknowledge
work = subject.retrieve_work
work.message.should == 'task1'
end
end
context 'global' do
let(:global) { true }
it_behaves_like :strategy
end
context 'local' do
let(:global) { false }
it_behaves_like :strategy
end
end

View file

@ -3,5 +3,17 @@ require 'sidekiq/limit_fetch'
RSpec.configure do |config| RSpec.configure do |config|
config.before :each do config.before :each do
Sidekiq::Queue.instance_variable_set :@instances, {} Sidekiq::Queue.instance_variable_set :@instances, {}
Sidekiq.options[:global] = defined?(global) ? global : nil
Sidekiq.redis do |it|
clean_redis = ->(queue) do
it.del "limit_fetch:limit:#{queue}"
it.del "limit_fetch:busy:#{queue}"
it.del "limit_fetch:pause:#{queue}"
end
clean_redis.call(name) if defined?(name)
queues.each(&clean_redis) if defined?(queues) and queues.is_a? Array
end
end end
end end