1
0
Fork 0
mirror of https://github.com/deanpcmad/sidekiq-limit_fetch.git synced 2022-11-09 13:54:36 -05:00

Remove local mode

This commit is contained in:
brainopia 2013-05-22 10:55:42 +04:00
parent 1117a36896
commit afb7763561
9 changed files with 151 additions and 327 deletions

View file

@ -6,8 +6,6 @@ class Sidekiq::LimitFetch
require_relative 'limit_fetch/singleton'
require_relative 'limit_fetch/queues'
require_relative 'limit_fetch/local/semaphore'
require_relative 'limit_fetch/local/selector'
require_relative 'limit_fetch/global/semaphore'
require_relative 'limit_fetch/global/selector'
require_relative 'limit_fetch/global/monitor'
@ -20,7 +18,7 @@ class Sidekiq::LimitFetch
end
def initialize(options)
Global::Monitor.start! unless options[:local]
Global::Monitor.start!
@queues = Queues.new options
end

View file

@ -1,31 +0,0 @@
module Sidekiq::LimitFetch::Local
module Selector
extend self
def acquire(names)
blocked = false
unblocked = []
queues(names).select {|queue|
next false if blocked and not unblocked.include?(queue.name)
if not queue.paused? and queue.blocking? and queue.busy > 0
blocked = true
unblocked = queue.unblocked || []
end
queue.acquire
}.map(&:name)
end
def release(names)
queues(names).each(&:release)
end
private
def queues(names)
names.map {|name| Sidekiq::Queue[name] }
end
end
end

View file

@ -1,61 +0,0 @@
module Sidekiq::LimitFetch::Local
class Semaphore
attr_reader :limit, :busy, :unblocked
def initialize(name)
@name = name
@lock = Mutex.new
@busy = 0
@paused = false
end
def limit=(value)
@lock.synchronize do
@limit = value
end
end
def acquire
return if @paused
@lock.synchronize do
@busy += 1 if not @limit or @limit > @busy
end
end
def release
@lock.synchronize do
@busy -= 1
end
end
def pause
@paused = true
end
def unpause
@paused = false
end
def paused?
@paused
end
def block
@block = true
end
def block_except(*queues)
raise ArgumentError if queues.empty?
@unblocked = queues
@block = true
end
def unblock
@block = false
end
def blocking?
@block
end
end
end

View file

@ -28,73 +28,61 @@ describe Sidekiq::Queue do
let(:name) { 'example' }
let(:queue) { Sidekiq::Queue[name] }
shared_examples_for :lock do
it 'should be available' do
queue.acquire.should be
end
it 'should be pausable' do
queue.pause
queue.acquire.should_not be
end
it 'should be continuable' do
queue.pause
queue.unpause
queue.acquire.should be
end
it 'should be limitable' do
queue.limit = 1
queue.acquire.should be
queue.acquire.should_not be
end
it 'should be resizable' do
queue.limit = 0
queue.acquire.should_not be
queue.limit = nil
queue.acquire.should be
end
it 'should be countable' do
queue.limit = 3
5.times { queue.acquire }
queue.busy.should == 3
end
it 'should be releasable' do
queue.acquire
queue.busy.should == 1
queue.release
queue.busy.should == 0
end
it 'should tell if paused' do
queue.should_not be_paused
queue.pause
queue.should be_paused
queue.unpause
queue.should_not be_paused
end
it 'should tell if blocking' do
queue.should_not be_blocking
queue.block
queue.should be_blocking
queue.unblock
queue.should_not be_blocking
end
it 'should be available' do
queue.acquire.should be
end
context 'global' do
before(:all) { Sidekiq.options[:local] = false }
it_behaves_like :lock
it 'should be pausable' do
queue.pause
queue.acquire.should_not be
end
context 'local' do
before(:all) { Sidekiq.options[:local] = true }
it_behaves_like :lock
it 'should be continuable' do
queue.pause
queue.unpause
queue.acquire.should be
end
it 'should be limitable' do
queue.limit = 1
queue.acquire.should be
queue.acquire.should_not be
end
it 'should be resizable' do
queue.limit = 0
queue.acquire.should_not be
queue.limit = nil
queue.acquire.should be
end
it 'should be countable' do
queue.limit = 3
5.times { queue.acquire }
queue.busy.should == 3
end
it 'should be releasable' do
queue.acquire
queue.busy.should == 1
queue.release
queue.busy.should == 0
end
it 'should tell if paused' do
queue.should_not be_paused
queue.pause
queue.should be_paused
queue.unpause
queue.should_not be_paused
end
it 'should tell if blocking' do
queue.should_not be_blocking
queue.block
queue.should be_blocking
queue.unblock
queue.should_not be_blocking
end
end
end

View file

@ -7,17 +7,6 @@ describe Sidekiq::LimitFetch::Global::Monitor do
let(:name) { 'default' }
before :each do
# namespaces = [
# described_class::PROCESSOR_NAMESPACE,
# described_class::HEARTBEAT_NAMESPACE
# ]
# Sidekiq.redis do |it|
# namespaces.flat_map {|namespace|
# it.keys(namespace + '*')
# }.each {|key| it.del key }
# end
monitor
end

View file

@ -6,97 +6,72 @@ describe Sidekiq::LimitFetch::Queues do
let(:queues) { %w[queue1 queue2] }
let(:limits) {{ 'queue1' => 3 }}
let(:strict) { true }
let(:local) {}
let(:blocking) {}
let(:options) do
{ queues: queues,
limits: limits,
strict: strict,
local: local,
blocking: blocking }
end
after(:each ) do
Thread.current[:available_queues] = nil
it 'should acquire queues' do
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 1
end
shared_examples_for :selector do
it 'should acquire queues' do
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 1
end
it 'should acquire dynamically blocking queues' do
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 1
it 'should acquire dynamically blocking queues' do
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 1
Sidekiq::Queue['queue1'].block
Sidekiq::Queue['queue1'].block
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 2
Sidekiq::Queue['queue2'].busy.should == 1
end
it 'should block except given queues' do
Sidekiq::Queue['queue1'].block_except 'queue2'
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 1
Sidekiq::Queue['queue1'].block_except 'queue404'
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 2
Sidekiq::Queue['queue2'].busy.should == 1
end
it 'should release queues' do
subject.acquire
subject.release_except nil
Sidekiq::Queue['queue1'].busy.should == 0
Sidekiq::Queue['queue2'].busy.should == 0
end
it 'should release queues except selected' do
subject.acquire
subject.release_except 'queue:queue1'
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 0
end
it 'should release when no queues was acquired' do
queues.each {|name| Sidekiq::Queue[name].pause }
subject.acquire
-> { subject.release_except nil }.should_not raise_exception
end
context 'blocking' do
let(:blocking) { %w(queue1) }
it 'should acquire blocking queues' do
3.times { subject.acquire }
Sidekiq::Queue['queue1'].busy.should == 3
Sidekiq::Queue['queue2'].busy.should == 1
end
end
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 2
Sidekiq::Queue['queue2'].busy.should == 1
end
context 'without local flag' do
it_should_behave_like :selector
it 'should block except given queues' do
Sidekiq::Queue['queue1'].block_except 'queue2'
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 1
it 'without local flag should be global' do
subject.selector.should == Sidekiq::LimitFetch::Global::Selector
end
Sidekiq::Queue['queue1'].block_except 'queue404'
subject.acquire
Sidekiq::Queue['queue1'].busy.should == 2
Sidekiq::Queue['queue2'].busy.should == 1
end
context 'with local flag' do
let(:local) { true }
it_should_behave_like :selector
it 'should release queues' do
subject.acquire
subject.release_except nil
Sidekiq::Queue['queue1'].busy.should == 0
Sidekiq::Queue['queue2'].busy.should == 0
end
it 'should use local selector' do
subject.selector.should == Sidekiq::LimitFetch::Local::Selector
it 'should release queues except selected' do
subject.acquire
subject.release_except 'queue:queue1'
Sidekiq::Queue['queue1'].busy.should == 1
Sidekiq::Queue['queue2'].busy.should == 0
end
it 'should release when no queues was acquired' do
queues.each {|name| Sidekiq::Queue[name].pause }
subject.acquire
-> { subject.release_except nil }.should_not raise_exception
end
context 'blocking' do
let(:blocking) { %w(queue1) }
it 'should acquire blocking queues' do
3.times { subject.acquire }
Sidekiq::Queue['queue1'].busy.should == 3
Sidekiq::Queue['queue2'].busy.should == 1
end
end

View file

@ -1,68 +1,49 @@
require 'spec_helper'
describe 'semaphore' do
shared_examples_for :semaphore do
it 'should have no limit by default' do
subject.limit.should_not be
end
it 'should set limit' do
subject.limit = 4
subject.limit.should == 4
end
it 'should acquire and count active tasks' do
3.times { subject.acquire }
subject.busy.should == 3
end
it 'should acquire tasks with regard to limit' do
subject.limit = 4
6.times { subject.acquire }
subject.busy.should == 4
end
it 'should release active tasks' do
6.times { subject.acquire }
3.times { subject.release }
subject.busy.should == 3
end
it 'should pause tasks' do
3.times { subject.acquire }
subject.pause
2.times { subject.acquire }
subject.busy.should == 3
2.times { subject.release }
subject.busy.should == 1
end
it 'should unpause tasks' do
subject.pause
3.times { subject.acquire }
subject.unpause
2.times { subject.acquire }
subject.busy.should == 2
end
end
let(:name) { 'default' }
subject { Sidekiq::LimitFetch::Global::Semaphore.new name }
context 'local' do
subject { Sidekiq::LimitFetch::Local::Semaphore.new name }
it_behaves_like :semaphore
it 'should have no limit by default' do
subject.limit.should_not be
end
context 'global' do
subject { Sidekiq::LimitFetch::Global::Semaphore.new name }
it_behaves_like :semaphore
it 'should set limit' do
subject.limit = 4
subject.limit.should == 4
end
after :each do
Sidekiq.redis do |it|
it.del "limit_fetch:limit:#{name}"
it.del "limit_fetch:busy:#{name}"
it.del "limit_fetch:pause:#{name}"
end
end
it 'should acquire and count active tasks' do
3.times { subject.acquire }
subject.busy.should == 3
end
it 'should acquire tasks with regard to limit' do
subject.limit = 4
6.times { subject.acquire }
subject.busy.should == 4
end
it 'should release active tasks' do
6.times { subject.acquire }
3.times { subject.release }
subject.busy.should == 3
end
it 'should pause tasks' do
3.times { subject.acquire }
subject.pause
2.times { subject.acquire }
subject.busy.should == 3
2.times { subject.release }
subject.busy.should == 1
end
it 'should unpause tasks' do
subject.pause
3.times { subject.acquire }
subject.unpause
2.times { subject.acquire }
subject.busy.should == 2
end
end

View file

@ -11,37 +11,25 @@ describe Sidekiq::LimitFetch do
end
subject { described_class.new options }
let(:options) {{ queues: queues, limits: limits, local: local }}
let(:options) {{ queues: queues, limits: limits }}
let(:queues) { %w(queue1 queue1 queue2 queue2) }
let(:limits) {{ 'queue1' => 1, 'queue2' => 2 }}
shared_examples_for :strategy do
it 'should acquire lock on queue for execution' do
work = subject.retrieve_work
work.queue_name.should == 'queue1'
work.message.should == 'task1'
it 'should acquire lock on queue for execution' do
work = subject.retrieve_work
work.queue_name.should == 'queue1'
work.message.should == 'task1'
subject.retrieve_work.should_not be
work.requeue
subject.retrieve_work.should_not be
work.requeue
work = subject.retrieve_work
work.message.should == 'task2'
work = subject.retrieve_work
work.message.should == 'task2'
subject.retrieve_work.should_not be
work.acknowledge
subject.retrieve_work.should_not be
work.acknowledge
work = subject.retrieve_work
work.message.should == 'task1'
end
end
context 'global' do
let(:local) { false }
it_behaves_like :strategy
end
context 'local' do
let(:local) { true }
it_behaves_like :strategy
work = subject.retrieve_work
work.message.should == 'task1'
end
end

View file

@ -4,9 +4,6 @@ require 'sidekiq/fetch'
RSpec.configure do |config|
config.before :each do
Sidekiq::Queue.instance_variable_set :@instances, {}
Sidekiq.options[:local] = defined?(local) ? local : nil
Sidekiq.redis do |it|
clean_redis = ->(queue) do
it.del "limit_fetch:limit:#{queue}"