From 4aab89b288b417cf0c2e6303c9ca59ea7efb5bd8 Mon Sep 17 00:00:00 2001 From: brainopia Date: Thu, 24 Jan 2013 16:03:28 +0400 Subject: [PATCH] Add global mode --- README.md | 8 +- .../{limit_fetch => extensions}/queue.rb | 8 +- lib/sidekiq/limit_fetch.rb | 58 +++--------- lib/sidekiq/limit_fetch/global/selector.rb | 92 +++++++++++++++++++ lib/sidekiq/limit_fetch/global/semaphore.rb | 38 ++++++++ lib/sidekiq/limit_fetch/local/selector.rb | 19 ++++ lib/sidekiq/limit_fetch/local/semaphore.rb | 39 ++++++++ lib/sidekiq/limit_fetch/queues.rb | 57 ++++++++++++ lib/sidekiq/limit_fetch/semaphore.rb | 36 -------- lib/sidekiq/limit_fetch/unit_of_work.rb | 22 ++--- spec/limit_fetch_spec.rb | 81 ---------------- spec/sidekiq/extensions/queue_spec.rb | 85 +++++++++++++++++ spec/sidekiq/limit_fetch/queue_spec.rb | 72 --------------- spec/sidekiq/limit_fetch/queues_spec.rb | 75 +++++++++++++++ spec/sidekiq/limit_fetch/semaphore_spec.rb | 68 ++++++++++++++ spec/sidekiq/limit_fetch_spec.rb | 47 ++++++++++ spec/spec_helper.rb | 12 +++ 17 files changed, 566 insertions(+), 251 deletions(-) rename lib/sidekiq/{limit_fetch => extensions}/queue.rb (64%) create mode 100644 lib/sidekiq/limit_fetch/global/selector.rb create mode 100644 lib/sidekiq/limit_fetch/global/semaphore.rb create mode 100644 lib/sidekiq/limit_fetch/local/selector.rb create mode 100644 lib/sidekiq/limit_fetch/local/semaphore.rb create mode 100644 lib/sidekiq/limit_fetch/queues.rb delete mode 100644 lib/sidekiq/limit_fetch/semaphore.rb delete mode 100644 spec/limit_fetch_spec.rb create mode 100644 spec/sidekiq/extensions/queue_spec.rb delete mode 100644 spec/sidekiq/limit_fetch/queue_spec.rb create mode 100644 spec/sidekiq/limit_fetch/queues_spec.rb create mode 100644 spec/sidekiq/limit_fetch/semaphore_spec.rb create mode 100644 spec/sidekiq/limit_fetch_spec.rb diff --git a/README.md b/README.md index 4794b39..87715f5 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Specify limits which you want to place on queues inside sidekiq.yml: Or set it dynamically in your code: ```ruby - Sidekiq::Queue.new('queue_name1').limit = 5 + Sidekiq::Queue['queue_name1'].limit = 5 Sidekiq::Queue['queue_name2'].limit = 10 ``` @@ -43,7 +43,11 @@ will be preserved. Limits are applied per process. In case you have several worker processes and want to have global locks between them, you'll need to -wait just a bit more since support for global locks is underway. +enable global mode by setting global option, eg: + +```yaml +:global: true +``` Sponsored by [Evil Martians]. [Evil Martians]: http://evilmartians.com/ diff --git a/lib/sidekiq/limit_fetch/queue.rb b/lib/sidekiq/extensions/queue.rb similarity index 64% rename from lib/sidekiq/limit_fetch/queue.rb rename to lib/sidekiq/extensions/queue.rb index dd44895..0452002 100644 --- a/lib/sidekiq/limit_fetch/queue.rb +++ b/lib/sidekiq/extensions/queue.rb @@ -8,12 +8,12 @@ module Sidekiq :pause, :continue, :busy - def full_name - @rname + def lock + @lock ||= mode::Semaphore.new name end - def lock - @lock ||= LimitFetch::Semaphore.new + def mode + Sidekiq.options[:global] ? LimitFetch::Global : LimitFetch::Local end end end diff --git a/lib/sidekiq/limit_fetch.rb b/lib/sidekiq/limit_fetch.rb index 281b5be..5c7c69c 100644 --- a/lib/sidekiq/limit_fetch.rb +++ b/lib/sidekiq/limit_fetch.rb @@ -2,10 +2,14 @@ require 'sidekiq' require 'sidekiq/fetch' class Sidekiq::LimitFetch - require_relative 'limit_fetch/semaphore' require_relative 'limit_fetch/unit_of_work' require_relative 'limit_fetch/singleton' - require_relative 'limit_fetch/queue' + require_relative 'limit_fetch/queues' + require_relative 'limit_fetch/local/semaphore' + require_relative 'limit_fetch/local/selector' + require_relative 'limit_fetch/global/semaphore' + require_relative 'limit_fetch/global/selector' + require_relative 'extensions/queue' Sidekiq.options[:fetch] = self @@ -14,55 +18,23 @@ class Sidekiq::LimitFetch end def initialize(options) - prepare_queues options - options[:strict] ? define_strict_queues : define_weighted_queues - end - - def available_queues - fetch_queues.select(&:acquire) + @queues = Queues.new options end def retrieve_work - queues = available_queues - - if queues.empty? - sleep Sidekiq::Fetcher::TIMEOUT - return - end - - queue_name, message = Sidekiq.redis do |it| - it.brpop *queues.map(&:full_name), Sidekiq::Fetcher::TIMEOUT - end - - if message - queue = queues.delete queues.find {|it| it.full_name == queue_name } - UnitOfWork.new queue, message - end - ensure - queues.each(&:release) if queues + queue, message = fetch_message + UnitOfWork.new queue, message if message end private - def prepare_queues(options) - limits = options[:limits] || {} - @queues = options[:queues].map do |name| - Sidekiq::Queue.new(name).tap do |it| - it.limit = limits[name] if limits[name] - end - end + def fetch_message + queue, _ = redis_blpop *@queues.acquire, Sidekiq::Fetcher::TIMEOUT + ensure + @queues.release_except queue end - def define_strict_queues - @queues.uniq! - def fetch_queues - @queues - end - end - - def define_weighted_queues - def fetch_queues - @queues.shuffle.uniq - end + def redis_blpop(*args) + Sidekiq.redis {|it| it.blpop *args } end end diff --git a/lib/sidekiq/limit_fetch/global/selector.rb b/lib/sidekiq/limit_fetch/global/selector.rb new file mode 100644 index 0000000..6ade2f9 --- /dev/null +++ b/lib/sidekiq/limit_fetch/global/selector.rb @@ -0,0 +1,92 @@ +module Sidekiq::LimitFetch::Global + module Selector + extend self + + def acquire(queues) + redis_eval :acquire, [namespace, uuid, queues] + end + + def release(queues) + redis_eval :release, [namespace, uuid, queues] + end + + private + + def namespace + @namespace ||= begin + namespace = Sidekiq.options[:namespace] + namespace + ':' if namespace + end + end + + def uuid + @uuid ||= SecureRandom.uuid + end + + def redis_eval(script_name, args) + Sidekiq.redis do |it| + begin + it.evalsha send("redis_#{script_name}_sha"), argv: args + rescue Redis::CommandError => error + raise unless error.message.include? 'NOSCRIPT' + it.eval send("redis_#{script_name}_script"), argv: args + end + end + end + + def redis_acquire_sha + @acquire_sha ||= Digest::SHA1.hexdigest redis_acquire_script + end + + def redis_release_sha + @release_sha ||= Digest::SHA1.hexdigest redis_release_script + end + + def redis_acquire_script + <<-LUA + local namespace = table.remove(ARGV, 1)..'limit_fetch:' + local worker_name = table.remove(ARGV, 1) + local queues = ARGV + local available = {} + + for _, queue in ipairs(queues) do + local busy_key = namespace..'busy:'..queue + local pause_key = namespace..'pause:'..queue + local paused = redis.call('get', pause_key) + + if not paused then + local limit_key = namespace..'limit:'..queue + local queue_limit = tonumber(redis.call('get', limit_key)) + + if queue_limit then + local queue_locks = redis.call('llen', busy_key) + + if queue_limit > queue_locks then + redis.call('rpush', busy_key, worker_name) + table.insert(available, queue) + end + else + redis.call('rpush', busy_key, worker_name) + table.insert(available, queue) + end + end + end + + return available + LUA + end + + def redis_release_script + <<-LUA + local namespace = table.remove(ARGV, 1)..'limit_fetch:' + local worker_name = table.remove(ARGV, 1) + local queues = ARGV + + for _, queue in ipairs(queues) do + local busy_key = namespace..'busy:'..queue + redis.call('lrem', busy_key, 1, worker_name) + end + LUA + end + end +end diff --git a/lib/sidekiq/limit_fetch/global/semaphore.rb b/lib/sidekiq/limit_fetch/global/semaphore.rb new file mode 100644 index 0000000..5c3e3be --- /dev/null +++ b/lib/sidekiq/limit_fetch/global/semaphore.rb @@ -0,0 +1,38 @@ +module Sidekiq::LimitFetch::Global + class Semaphore + PREFIX = 'limit_fetch' + + def initialize(name) + @name = name + end + + def limit + value = Sidekiq.redis {|it| it.get "#{PREFIX}:limit:#@name" } + value.to_i if value + end + + def limit=(value) + Sidekiq.redis {|it| it.set "#{PREFIX}:limit:#@name", value } + end + + def acquire + Selector.acquire([@name]).size > 0 + end + + def release + Selector.release [@name] + end + + def busy + Sidekiq.redis {|it| it.llen "#{PREFIX}:busy:#@name" } + end + + def pause + Sidekiq.redis {|it| it.set "#{PREFIX}:pause:#@name", true } + end + + def continue + Sidekiq.redis {|it| it.del "#{PREFIX}:pause:#@name" } + end + end +end diff --git a/lib/sidekiq/limit_fetch/local/selector.rb b/lib/sidekiq/limit_fetch/local/selector.rb new file mode 100644 index 0000000..28ae200 --- /dev/null +++ b/lib/sidekiq/limit_fetch/local/selector.rb @@ -0,0 +1,19 @@ +module Sidekiq::LimitFetch::Local + module Selector + extend self + + def acquire(names) + queues(names).select(&:acquire).map(&:name) + end + + def release(names) + queues(names).each(&:release) + end + + private + + def queues(names) + names.map {|name| Sidekiq::Queue[name] } + end + end +end diff --git a/lib/sidekiq/limit_fetch/local/semaphore.rb b/lib/sidekiq/limit_fetch/local/semaphore.rb new file mode 100644 index 0000000..d0b391c --- /dev/null +++ b/lib/sidekiq/limit_fetch/local/semaphore.rb @@ -0,0 +1,39 @@ +module Sidekiq::LimitFetch::Local + class Semaphore + attr_reader :limit, :busy + + def initialize(name) + @name = name + @lock = Mutex.new + @busy = 0 + @paused = false + end + + def limit=(value) + @lock.synchronize do + @limit = value + end + end + + def acquire + return if @paused + @lock.synchronize do + @busy += 1 if not @limit or @limit > @busy + end + end + + def release + @lock.synchronize do + @busy -= 1 + end + end + + def pause + @paused = true + end + + def continue + @paused = false + end + end +end diff --git a/lib/sidekiq/limit_fetch/queues.rb b/lib/sidekiq/limit_fetch/queues.rb new file mode 100644 index 0000000..5ba95dc --- /dev/null +++ b/lib/sidekiq/limit_fetch/queues.rb @@ -0,0 +1,57 @@ +class Sidekiq::LimitFetch + class Queues + THREAD_KEY = :acquired_queues + attr_reader :selector + + def initialize(options) + @queues = options[:queues] + options[:strict] ? strict_order! : weighted_order! + + set_selector options[:global] + set_limits options[:limits] + end + + def acquire + @selector.acquire(ordered_queues) + .tap {|it| save it } + .map {|it| "queue:#{it}" } + end + + def release_except(full_name) + @selector.release restore.delete_if {|name| full_name.to_s.include? name } + end + + private + + def set_selector(global) + @selector = global ? Global::Selector : Local::Selector + end + + def set_limits(limits) + ordered_queues.each do |name| + Sidekiq::Queue[name].tap do |it| + it.limit = (limits || {})[name] + end + end + end + + def strict_order! + @queues.uniq! + def ordered_queues; @queues end + end + + def weighted_order! + def ordered_queues; @queues.shuffle.uniq end + end + + def save(queues) + Thread.current[THREAD_KEY] = queues + end + + def restore + Thread.current[THREAD_KEY] + ensure + Thread.current[THREAD_KEY] = nil + end + end +end diff --git a/lib/sidekiq/limit_fetch/semaphore.rb b/lib/sidekiq/limit_fetch/semaphore.rb deleted file mode 100644 index 695bc2f..0000000 --- a/lib/sidekiq/limit_fetch/semaphore.rb +++ /dev/null @@ -1,36 +0,0 @@ -class Sidekiq::LimitFetch::Semaphore - attr_reader :limit, :busy - - def initialize - @lock = Mutex.new - @busy = 0 - @paused = false - end - - def limit=(value) - @lock.synchronize do - @limit = value - end - end - - def acquire - return if @paused - @lock.synchronize do - @busy += 1 if not @limit or @limit > @busy - end - end - - def release - @lock.synchronize do - @busy -= 1 - end - end - - def pause - @paused = true - end - - def continue - @paused = false - end -end diff --git a/lib/sidekiq/limit_fetch/unit_of_work.rb b/lib/sidekiq/limit_fetch/unit_of_work.rb index 42b6277..6851d1c 100644 --- a/lib/sidekiq/limit_fetch/unit_of_work.rb +++ b/lib/sidekiq/limit_fetch/unit_of_work.rb @@ -1,16 +1,12 @@ -Sidekiq::LimitFetch::UnitOfWork = Struct.new :queue_wrapper, :message do - extend Forwardable +module Sidekiq + class LimitFetch::UnitOfWork < BasicFetch::UnitOfWork + def acknowledge + Queue[queue_name].release + end - def_delegator :queue_wrapper, :full_name, :queue - def_delegator :queue_wrapper, :name, :queue_name - def_delegator :queue_wrapper, :release - - def acknowledge - release - end - - def requeue - release - Sidekiq.redis {|it| it.rpush queue, message } + def requeue + super + acknowledge + end end end diff --git a/spec/limit_fetch_spec.rb b/spec/limit_fetch_spec.rb deleted file mode 100644 index 5f8cdb1..0000000 --- a/spec/limit_fetch_spec.rb +++ /dev/null @@ -1,81 +0,0 @@ -require 'spec_helper' - -describe Sidekiq::LimitFetch do - before :each do - Sidekiq.redis do |it| - it.del 'queue:example1' - it.rpush 'queue:example1', 'task' - it.expire 'queue:example1', 30 - end - end - - def queues(fetcher) - fetcher.available_queues.map(&:full_name) - end - - def new_fetcher(options={}) - described_class.new options.merge queues: %w(example1 example1 example2 example2) - end - - it 'should retrieve weighted queues' do - fetcher = new_fetcher - queues(fetcher).should =~ %w(queue:example1 queue:example2) - end - - it 'should retrieve strictly ordered queues' do - fetcher = new_fetcher strict: true - queues(fetcher).should == %w(queue:example1 queue:example2) - end - - it 'should retrieve only available queues' do - fetcher = new_fetcher strict: true, limits: { 'example1' => 2 } - queues = -> { fetcher.available_queues } - - queues1 = queues.call - queues2 = queues.call - queues1.should have(2).items - queues2.should have(2).items - queues.call.should have(1).items - - queues1.each(&:release) - queues.call.should have(2).items - queues.call.should have(1).items - - queues2.each(&:release) - queues.call.should have(2).items - queues.call.should have(1).items - end - - it 'should acquire lock on queue for excecution' do - fetcher = new_fetcher limits: { 'example1' => 1, 'example2' => 1 } - work = fetcher.retrieve_work - work.message.should == 'task' - work.queue.should == 'queue:example1' - work.queue_name.should == 'example1' - - queues = fetcher.available_queues - queues.should have(1).item - queues.each(&:release) - - work.requeue - work = fetcher.retrieve_work - work.message.should == 'task' - work.acknowledge - - fetcher.available_queues.should have(2).items - end - - it 'should set queue limits on the fly' do - Sidekiq::Queue['example1'].limit = 1 - Sidekiq::Queue['example2'].limit = 2 - - fetcher = new_fetcher - - fetcher.available_queues.should have(2).item - fetcher.available_queues.should have(1).item - fetcher.available_queues.should have(0).item - - Sidekiq::Queue['example1'].limit = 2 - fetcher.available_queues.should have(1).item - end -end diff --git a/spec/sidekiq/extensions/queue_spec.rb b/spec/sidekiq/extensions/queue_spec.rb new file mode 100644 index 0000000..4695ee3 --- /dev/null +++ b/spec/sidekiq/extensions/queue_spec.rb @@ -0,0 +1,85 @@ +require 'spec_helper' + +describe Sidekiq::Queue do + context 'singleton' do + shared_examples :constructor do + it 'with default name' do + new_object = -> { described_class.send constructor } + new_object.call.should == new_object.call + end + + it 'with given name' do + new_object = ->(name) { described_class.send constructor, name } + new_object.call('name').should == new_object.call('name') + end + end + + context '.new' do + let(:constructor) { :new } + it_behaves_like :constructor + end + + context '.[]' do + let(:constructor) { :[] } + it_behaves_like :constructor + end + + context '#lock' do + let(:name) { 'example' } + let(:queue) { Sidekiq::Queue[name] } + + shared_examples_for :lock do + it 'should be available' do + queue.acquire.should be + end + + it 'should be pausable' do + queue.pause + queue.acquire.should_not be + end + + it 'should be continuable' do + queue.pause + queue.continue + queue.acquire.should be + end + + it 'should be limitable' do + queue.limit = 1 + queue.acquire.should be + queue.acquire.should_not be + end + + it 'should be resizable' do + queue.limit = 0 + queue.acquire.should_not be + queue.limit = nil + queue.acquire.should be + end + + it 'should be countable' do + queue.limit = 3 + 5.times { queue.acquire } + queue.busy.should == 3 + end + + it 'should be releasable' do + queue.acquire + queue.busy.should == 1 + queue.release + queue.busy.should == 0 + end + end + + context 'global' do + before(:all) { Sidekiq.options[:global] = true } + it_behaves_like :lock + end + + context 'local' do + before(:all) { Sidekiq.options[:global] = false } + it_behaves_like :lock + end + end + end +end diff --git a/spec/sidekiq/limit_fetch/queue_spec.rb b/spec/sidekiq/limit_fetch/queue_spec.rb deleted file mode 100644 index 826179b..0000000 --- a/spec/sidekiq/limit_fetch/queue_spec.rb +++ /dev/null @@ -1,72 +0,0 @@ -require 'spec_helper' - -describe Sidekiq::Queue do - context 'singleton' do - shared_examples :constructor do - it 'with default name' do - new_object = -> { described_class.send constructor } - new_object.call.should == new_object.call - end - - it 'with given name' do - new_object = ->(name) { described_class.send constructor, name } - new_object.call('name').should == new_object.call('name') - end - end - - context '.new' do - let(:constructor) { :new } - it_behaves_like :constructor - end - - context '.[]' do - let(:constructor) { :[] } - it_behaves_like :constructor - end - - context '#acquire' do - let(:queue) { Sidekiq::Queue['example'] } - - it 'should be available' do - queue.acquire.should be - end - - it 'should be pausable' do - queue.pause - queue.acquire.should_not be - end - - it 'should be continuable' do - queue.pause - queue.continue - queue.acquire.should be - end - - it 'should be limitable' do - queue.limit = 1 - queue.acquire.should be - queue.acquire.should_not be - end - - it 'should be resizable' do - queue.limit = 0 - queue.acquire.should_not be - queue.limit = nil - queue.acquire.should be - end - - it 'should be countable' do - queue.limit = 3 - 5.times { queue.acquire } - queue.busy.should == 3 - end - - it 'should be releasable' do - queue.acquire - queue.busy.should == 1 - queue.release - queue.busy.should == 0 - end - end - end -end diff --git a/spec/sidekiq/limit_fetch/queues_spec.rb b/spec/sidekiq/limit_fetch/queues_spec.rb new file mode 100644 index 0000000..5647f8f --- /dev/null +++ b/spec/sidekiq/limit_fetch/queues_spec.rb @@ -0,0 +1,75 @@ +require 'spec_helper' + +describe Sidekiq::LimitFetch::Queues do + subject { described_class.new options } + + let(:queues) { %w[queue1 queue2] } + let(:limits) {{ 'queue1' => 3 }} + let(:strict) { true } + let(:global) { false } + + let(:options) do + { queues: queues, limits: limits, strict: strict, global: global } + end + + after(:each ) do + Thread.current[:available_queues] = nil + end + + shared_examples_for :selector do + it 'should acquire queues' do + subject.acquire + Sidekiq::Queue['queue1'].busy.should == 1 + Sidekiq::Queue['queue2'].busy.should == 1 + end + + it 'should release queues' do + subject.acquire + subject.release_except nil + Sidekiq::Queue['queue1'].busy.should == 0 + Sidekiq::Queue['queue2'].busy.should == 0 + end + + it 'should release queues except selected' do + subject.acquire + subject.release_except 'queue:queue1' + Sidekiq::Queue['queue1'].busy.should == 1 + Sidekiq::Queue['queue2'].busy.should == 0 + end + end + + context 'without global flag' do + it_should_behave_like :selector + + it 'without global flag should be local' do + subject.selector.should == Sidekiq::LimitFetch::Local::Selector + end + end + + context 'with global flag' do + let(:global) { true } + it_should_behave_like :selector + + it 'should use global selector' do + subject.selector.should == Sidekiq::LimitFetch::Global::Selector + end + end + + it 'should set limits' do + subject + Sidekiq::Queue['queue1'].limit.should == 3 + Sidekiq::Queue['queue2'].limit.should_not be + end + + context 'without strict flag' do + let(:strict) { false } + + it 'should retrieve weighted queues' do + subject.ordered_queues.should =~ %w(queue1 queue2) + end + end + + it 'with strict flag should retrieve strictly ordered queues' do + subject.ordered_queues.should == %w(queue1 queue2) + end +end diff --git a/spec/sidekiq/limit_fetch/semaphore_spec.rb b/spec/sidekiq/limit_fetch/semaphore_spec.rb new file mode 100644 index 0000000..81a6bef --- /dev/null +++ b/spec/sidekiq/limit_fetch/semaphore_spec.rb @@ -0,0 +1,68 @@ +require 'spec_helper' + +describe 'semaphore' do + shared_examples_for :semaphore do + it 'should have no limit by default' do + subject.limit.should_not be + end + + it 'should set limit' do + subject.limit = 4 + subject.limit.should == 4 + end + + it 'should acquire and count active tasks' do + 3.times { subject.acquire } + subject.busy.should == 3 + end + + it 'should acquire tasks with regard to limit' do + subject.limit = 4 + 6.times { subject.acquire } + subject.busy.should == 4 + end + + it 'should release active tasks' do + 6.times { subject.acquire } + 3.times { subject.release } + subject.busy.should == 3 + end + + it 'should pause tasks' do + 3.times { subject.acquire } + subject.pause + 2.times { subject.acquire } + subject.busy.should == 3 + 2.times { subject.release } + subject.busy.should == 1 + end + + it 'should unpause tasks' do + subject.pause + 3.times { subject.acquire } + subject.continue + 2.times { subject.acquire } + subject.busy.should == 2 + end + end + + let(:name) { 'default' } + + context 'local' do + subject { Sidekiq::LimitFetch::Local::Semaphore.new name } + it_behaves_like :semaphore + end + + context 'global' do + subject { Sidekiq::LimitFetch::Global::Semaphore.new name } + it_behaves_like :semaphore + + after :each do + Sidekiq.redis do |it| + it.del "limit_fetch:limit:#{name}" + it.del "limit_fetch:busy:#{name}" + it.del "limit_fetch:pause:#{name}" + end + end + end +end diff --git a/spec/sidekiq/limit_fetch_spec.rb b/spec/sidekiq/limit_fetch_spec.rb new file mode 100644 index 0000000..465b4a6 --- /dev/null +++ b/spec/sidekiq/limit_fetch_spec.rb @@ -0,0 +1,47 @@ +require 'spec_helper' + +describe Sidekiq::LimitFetch do + before :each do + Sidekiq.redis do |it| + it.del 'queue:queue1' + it.rpush 'queue:queue1', 'task1' + it.rpush 'queue:queue1', 'task2' + it.expire 'queue:queue1', 30 + end + end + + subject { described_class.new options } + let(:options) {{ queues: queues, limits: limits, global: global }} + let(:queues) { %w(queue1 queue1 queue2 queue2) } + let(:limits) {{ 'queue1' => 1, 'queue2' => 2 }} + + shared_examples_for :strategy do + it 'should acquire lock on queue for execution' do + work = subject.retrieve_work + work.queue_name.should == 'queue1' + work.message.should == 'task1' + + subject.retrieve_work.should_not be + work.requeue + + work = subject.retrieve_work + work.message.should == 'task2' + + subject.retrieve_work.should_not be + work.acknowledge + + work = subject.retrieve_work + work.message.should == 'task1' + end + end + + context 'global' do + let(:global) { true } + it_behaves_like :strategy + end + + context 'local' do + let(:global) { false } + it_behaves_like :strategy + end +end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 65b89d9..15d2a78 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -3,5 +3,17 @@ require 'sidekiq/limit_fetch' RSpec.configure do |config| config.before :each do Sidekiq::Queue.instance_variable_set :@instances, {} + Sidekiq.options[:global] = defined?(global) ? global : nil + + Sidekiq.redis do |it| + clean_redis = ->(queue) do + it.del "limit_fetch:limit:#{queue}" + it.del "limit_fetch:busy:#{queue}" + it.del "limit_fetch:pause:#{queue}" + end + + clean_redis.call(name) if defined?(name) + queues.each(&clean_redis) if defined?(queues) and queues.is_a? Array + end end end