diff --git a/.rspec b/.rspec new file mode 100644 index 0000000..32458fe --- /dev/null +++ b/.rspec @@ -0,0 +1 @@ +--require spec_helper --color diff --git a/lib/sidekiq/extensions/manager.rb b/lib/sidekiq/extensions/manager.rb new file mode 100644 index 0000000..02cb460 --- /dev/null +++ b/lib/sidekiq/extensions/manager.rb @@ -0,0 +1,16 @@ +class Sidekiq::Manager + module InitLimitFetch + def initialize(options={}) + options[:fetch] = Sidekiq::LimitFetch + super + end + + def start + Sidekiq::LimitFetch::Queues.start options + Global::Monitor.start! + super + end + end + + prepend InitLimitFetch +end diff --git a/lib/sidekiq/extensions/queue.rb b/lib/sidekiq/extensions/queue.rb index c99d6d5..ac34563 100644 --- a/lib/sidekiq/extensions/queue.rb +++ b/lib/sidekiq/extensions/queue.rb @@ -1,6 +1,6 @@ module Sidekiq class Queue - extend LimitFetch::Singleton, Forwardable + extend LimitFetch::Instances, Forwardable attr_reader :rname def_delegators :lock, diff --git a/lib/sidekiq/limit_fetch.rb b/lib/sidekiq/limit_fetch.rb index 6cc8343..9344692 100644 --- a/lib/sidekiq/limit_fetch.rb +++ b/lib/sidekiq/limit_fetch.rb @@ -1,63 +1,39 @@ -require 'sidekiq' -require 'sidekiq/fetch' -require 'sidekiq/util' -require 'sidekiq/api' require 'forwardable' +require 'sidekiq' +require 'sidekiq/manager' +require 'sidekiq/api' -class Sidekiq::LimitFetch +module Sidekiq::LimitFetch autoload :UnitOfWork, 'sidekiq/limit_fetch/unit_of_work' - require_relative 'limit_fetch/redis' - require_relative 'limit_fetch/singleton' + require_relative 'limit_fetch/instances' require_relative 'limit_fetch/queues' require_relative 'limit_fetch/global/semaphore' require_relative 'limit_fetch/global/selector' require_relative 'limit_fetch/global/monitor' require_relative 'extensions/queue' + require_relative 'extensions/manager' - include Redis - Sidekiq.options[:fetch] = self + extend self - TIMEOUT = \ - if Sidekiq::VERSION < '4.0.0' - Sidekiq::Fetcher::TIMEOUT - else - Sidekiq::BasicFetch::TIMEOUT - end - - def self.bulk_requeue(*args) - Sidekiq::BasicFetch.bulk_requeue *args - end - - def initialize(options) - @queues = Queues.new options.merge(namespace: determine_namespace) + def new(_) + self end def retrieve_work - queue, message = fetch_message + queue, message = redis_brpop *Queues.acquire, Sidekiq::BasicFetch::TIMEOUT + Queues.release_except queue UnitOfWork.new queue, message if message end + def bulk_requeue(*args) + Sidekiq::BasicFetch.bulk_requeue(*args) + end + private - def fetch_message - queue, _ = redis_brpop *@queues.acquire, TIMEOUT - ensure - @queues.release_except queue - end - def redis_brpop(*args) return if args.size < 2 - query = -> redis { redis.brpop *args } - - if busy_local_queues.any? {|queue| not args.include? queue.rname } - nonblocking_redis(&query) - else - redis(&query) - end - end - - def busy_local_queues - Sidekiq::Queue.instances.select(&:local_busy?) + Sidekiq.redis {|it| it.brpop *args } end end diff --git a/lib/sidekiq/limit_fetch/global/monitor.rb b/lib/sidekiq/limit_fetch/global/monitor.rb index da3e18d..9fa8946 100644 --- a/lib/sidekiq/limit_fetch/global/monitor.rb +++ b/lib/sidekiq/limit_fetch/global/monitor.rb @@ -1,6 +1,5 @@ module Sidekiq::LimitFetch::Global module Monitor - include Sidekiq::LimitFetch::Redis extend self HEARTBEAT_PREFIX = 'limit:heartbeat:' @@ -8,10 +7,10 @@ module Sidekiq::LimitFetch::Global HEARTBEAT_TTL = 20 REFRESH_TIMEOUT = 5 - def start!(queues, ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT) + def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT) Thread.new do loop do - add_dynamic queues if queues.dynamic? + add_dynamic_queues update_heartbeat ttl invalidate_old_processes sleep timeout @@ -20,23 +19,24 @@ module Sidekiq::LimitFetch::Global end def all_processes - redis {|it| it.smembers PROCESS_SET } + Sidekiq.redis {|it| it.smembers PROCESS_SET } end def old_processes all_processes.reject do |process| - redis {|it| it.get heartbeat_key process } + Sidekiq.redis {|it| it.get heartbeat_key process } end end def remove_old_processes! - redis do |it| + Sidekiq.redis do |it| old_processes.each {|process| it.srem PROCESS_SET, process } end end - def add_dynamic(queues) - queues.add Sidekiq::Queue.all.map(&:name) + def add_dynamic_queues + queues = Sidekiq::LimitFetch::Queues + queues.add Sidekiq::Queue.all.map(&:name) if queues.dynamic? end private diff --git a/lib/sidekiq/limit_fetch/global/semaphore.rb b/lib/sidekiq/limit_fetch/global/semaphore.rb index b18e48f..ad98149 100644 --- a/lib/sidekiq/limit_fetch/global/semaphore.rb +++ b/lib/sidekiq/limit_fetch/global/semaphore.rb @@ -1,7 +1,5 @@ module Sidekiq::LimitFetch::Global class Semaphore - include Sidekiq::LimitFetch::Redis - PREFIX = 'limit_fetch' attr_reader :local_busy @@ -45,7 +43,7 @@ module Sidekiq::LimitFetch::Global end def acquire - Selector.acquire([@name], determine_namespace).size > 0 + Selector.acquire([@name], namespace).size > 0 end def release @@ -163,5 +161,15 @@ module Sidekiq::LimitFetch::Global it.lrem "#{PREFIX}:busy:#@name", 0, process end end + + private + + def redis(&block) + Sidekiq.redis(&block) + end + + def namespace + Sidekiq::LimitFetch::Queues.namespace + end end end diff --git a/lib/sidekiq/limit_fetch/singleton.rb b/lib/sidekiq/limit_fetch/instances.rb similarity index 86% rename from lib/sidekiq/limit_fetch/singleton.rb rename to lib/sidekiq/limit_fetch/instances.rb index f6c2bed..ed664f1 100644 --- a/lib/sidekiq/limit_fetch/singleton.rb +++ b/lib/sidekiq/limit_fetch/instances.rb @@ -1,4 +1,4 @@ -module Sidekiq::LimitFetch::Singleton +module Sidekiq::LimitFetch::Instances def self.extended(klass) klass.instance_variable_set :@instances, {} end diff --git a/lib/sidekiq/limit_fetch/queues.rb b/lib/sidekiq/limit_fetch/queues.rb index 7365078..9ae7e68 100644 --- a/lib/sidekiq/limit_fetch/queues.rb +++ b/lib/sidekiq/limit_fetch/queues.rb @@ -1,88 +1,97 @@ -class Sidekiq::LimitFetch - class Queues - THREAD_KEY = :acquired_queues +module Sidekiq::LimitFetch::Queues + extend self - def initialize(options) - @queues = options[:queues] - @namespace = options[:namespace] - @dynamic = options[:dynamic] + THREAD_KEY = :acquired_queues - options[:strict] ? strict_order! : weighted_order! + def start(options) + @queues = options[:queues] + @dynamic = options[:dynamic] - set :process_limit, options[:process_limits] - set :limit, options[:limits] - set_blocks options[:blocking] - end + options[:strict] ? strict_order! : weighted_order! - def acquire - selector.acquire(ordered_queues, @namespace) - .tap {|it| save it } - .map {|it| "queue:#{it}" } - end + set :process_limit, options[:process_limits] + set :limit, options[:limits] + set_blocks options[:blocking] + end - def release_except(full_name) - queues = restore - queues.delete full_name[/queue:(.*)/, 1] if full_name - selector.release queues, @namespace - end + def acquire + selector.acquire(ordered_queues, namespace) + .tap {|it| save it } + .map {|it| "queue:#{it}" } + end - def dynamic? - @dynamic - end + def release_except(full_name) + queues = restore + queues.delete full_name[/queue:(.*)/, 1] if full_name + selector.release queues, namespace + end - def add(queues) - queues.each do |queue| - @queues.push queue unless @queues.include? queue - end - end + def dynamic? + @dynamic + end - def strict_order! - @queues.uniq! - def ordered_queues; @queues end - end - - def weighted_order! - def ordered_queues; @queues.shuffle.uniq end - end - - private - - def selector - Global::Selector - end - - def set(limit_type, limits) - limits ||= {} - each_queue do |queue| - limit = limits[queue.name.to_s] || limits[queue.name.to_sym] - queue.send "#{limit_type}=", limit unless queue.limit_changed? - end - end - - def set_blocks(blocks) - each_queue(&:unblock) - - blocks.to_a.each do |it| - if it.is_a? Array - it.each {|name| Sidekiq::Queue[name].block_except it } - else - Sidekiq::Queue[it].block - end - end - end - - def save(queues) - Thread.current[THREAD_KEY] = queues - end - - def restore - Thread.current[THREAD_KEY] || [] - ensure - Thread.current[THREAD_KEY] = nil - end - - def each_queue - @queues.uniq.each {|it| yield Sidekiq::Queue[it] } + def add(queues) + queues.each do |queue| + @queues.push queue unless @queues.include? queue end end + + def strict_order! + @queues.uniq! + def ordered_queues; @queues end + end + + def weighted_order! + def ordered_queues; @queues.shuffle.uniq end + end + + def namespace + @namespace ||= Sidekiq.redis do |it| + if it.respond_to?(:namespace) and it.namespace + it.namespace + ':' + else + '' + end + end + end + + private + + def selector + Sidekiq::LimitFetch::Global::Selector + end + + def set(limit_type, limits) + limits ||= {} + each_queue do |queue| + limit = limits[queue.name.to_s] || limits[queue.name.to_sym] + queue.send "#{limit_type}=", limit unless queue.limit_changed? + end + end + + def set_blocks(blocks) + each_queue(&:unblock) + + blocks.to_a.each do |it| + if it.is_a? Array + it.each {|name| Sidekiq::Queue[name].block_except it } + else + Sidekiq::Queue[it].block + end + end + end + + def save(queues) + Thread.current[THREAD_KEY] = queues + end + + def restore + Thread.current[THREAD_KEY] || [] + ensure + Thread.current[THREAD_KEY] = nil + end + + def each_queue + @queues.uniq.each {|it| yield Sidekiq::Queue[it] } + end end diff --git a/lib/sidekiq/limit_fetch/redis.rb b/lib/sidekiq/limit_fetch/redis.rb deleted file mode 100644 index 3bacfbf..0000000 --- a/lib/sidekiq/limit_fetch/redis.rb +++ /dev/null @@ -1,35 +0,0 @@ -module Sidekiq::LimitFetch::Redis - extend self - - def nonblocking_redis - redis do |redis| - # Celluloid 0.16 broke this method + yield redis - if Sidekiq::VERSION >= '4.0.0' || Celluloid::VERSION.to_f >= 0.16 - yield redis - else - # prevent blocking of fetcher - # more bullet-proof and faster (O_O) - # than using Celluloid::IO - # - # https://github.com/brainopia/sidekiq-limit_fetch/issues/41 - # explanation of why Future#value is beneficial here - begin - Celluloid::Future.new { yield redis }.value - rescue Celluloid::Task::TerminatedError - end - end - end - end - - def redis - Sidekiq.redis {|it| yield it } - end - - def determine_namespace - redis do |it| - if it.respond_to?(:namespace) and it.namespace - it.namespace + ':' - end - end - end -end diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb deleted file mode 100644 index aa4e61b..0000000 --- a/lib/sidekiq/manager.rb +++ /dev/null @@ -1,14 +0,0 @@ -module Sidekiq - class Manager - - def start - queues = Sidekiq::LimitFetch::Queues.new options.merge(namespace: Sidekiq::LimitFetch::Redis.determine_namespace) - Sidekiq::LimitFetch::Global::Monitor.start! queues - - @workers.each do |x| - x.start - end - end - - end -end diff --git a/sidekiq-limit_fetch.gemspec b/sidekiq-limit_fetch.gemspec index 3b12843..2492fc9 100644 --- a/sidekiq-limit_fetch.gemspec +++ b/sidekiq-limit_fetch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |gem| gem.name = 'sidekiq-limit_fetch' - gem.version = '2.4.2' + gem.version = '3.0.0' gem.license = 'MIT' gem.authors = 'brainopia' gem.email = 'brainopia@evilmartians.com' @@ -13,9 +13,9 @@ Gem::Specification.new do |gem| gem.files = `git ls-files`.split($/) gem.test_files = gem.files.grep %r{^spec/} - gem.require_paths = %w(lib) + gem.require_paths = 'lib' - gem.add_dependency 'sidekiq', '>= 2.6.5' - gem.add_development_dependency 'rspec', '~> 3.2.0' + gem.add_dependency 'sidekiq', '>= 4' + gem.add_development_dependency 'rspec' gem.add_development_dependency 'rake' end diff --git a/spec/sidekiq/extensions/queue_spec.rb b/spec/sidekiq/extensions/queue_spec.rb index 350d1c7..cd0af35 100644 --- a/spec/sidekiq/extensions/queue_spec.rb +++ b/spec/sidekiq/extensions/queue_spec.rb @@ -1,5 +1,3 @@ -require 'spec_helper' - RSpec.describe Sidekiq::Queue do context 'singleton' do shared_examples :constructor do diff --git a/spec/sidekiq/limit_fetch/global/monitor_spec.rb b/spec/sidekiq/limit_fetch/global/monitor_spec.rb index 797c999..94a8312 100644 --- a/spec/sidekiq/limit_fetch/global/monitor_spec.rb +++ b/spec/sidekiq/limit_fetch/global/monitor_spec.rb @@ -1,21 +1,11 @@ -require 'spec_helper' - -Thread.abort_on_exception = true - RSpec.describe Sidekiq::LimitFetch::Global::Monitor do - let(:queues) { double dynamic?: false } - let(:monitor) { described_class.start! queues, ttl, timeout } + let(:monitor) { described_class.start! ttl, timeout } let(:ttl) { 1 } let(:queue) { Sidekiq::Queue[name] } let(:name) { 'default' } - before :each do - monitor - end - - after :each do - monitor.kill - end + before { monitor } + after { monitor.kill } context 'old locks' do let(:timeout) { 0.5 } diff --git a/spec/sidekiq/limit_fetch/queues_spec.rb b/spec/sidekiq/limit_fetch/queues_spec.rb index de8c64d..d592a89 100644 --- a/spec/sidekiq/limit_fetch/queues_spec.rb +++ b/spec/sidekiq/limit_fetch/queues_spec.rb @@ -1,8 +1,4 @@ -require 'spec_helper' - RSpec.describe Sidekiq::LimitFetch::Queues do - subject { described_class.new options } - let(:queues) { %w[queue1 queue2] } let(:limits) {{ 'queue1' => 3 }} let(:strict) { true } @@ -14,10 +10,11 @@ RSpec.describe Sidekiq::LimitFetch::Queues do limits: limits, strict: strict, blocking: blocking, - process_limits: process_limits, - namespace: Sidekiq::LimitFetch::Redis.determine_namespace } + process_limits: process_limits } end + before { subject.start options } + it 'should acquire queues' do subject.acquire expect(Sidekiq::Queue['queue1'].probed).to eq 1 diff --git a/spec/sidekiq/limit_fetch/semaphore_spec.rb b/spec/sidekiq/limit_fetch/semaphore_spec.rb index 836d7f9..cb16fc9 100644 --- a/spec/sidekiq/limit_fetch/semaphore_spec.rb +++ b/spec/sidekiq/limit_fetch/semaphore_spec.rb @@ -1,5 +1,3 @@ -require 'spec_helper' - RSpec.describe 'semaphore' do let(:name) { 'default' } subject { Sidekiq::LimitFetch::Global::Semaphore.new name } diff --git a/spec/sidekiq/limit_fetch_spec.rb b/spec/sidekiq/limit_fetch_spec.rb index 0900169..f93afa2 100644 --- a/spec/sidekiq/limit_fetch_spec.rb +++ b/spec/sidekiq/limit_fetch_spec.rb @@ -1,7 +1,7 @@ -require 'spec_helper' +Thread.abort_on_exception = true RSpec.describe Sidekiq::LimitFetch do - before :each do + before do Sidekiq.redis do |it| it.del 'queue:queue1' it.lpush 'queue:queue1', 'task1' @@ -10,11 +10,12 @@ RSpec.describe Sidekiq::LimitFetch do end end - subject { described_class.new options } let(:options) {{ queues: queues, limits: limits }} let(:queues) { %w(queue1 queue1 queue2 queue2) } let(:limits) {{ 'queue1' => 1, 'queue2' => 2 }} + before { subject::Queues.start options } + it 'should acquire lock on queue for execution' do work = subject.retrieve_work expect(work.queue_name).to eq 'queue1' diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 9429d7d..87f63d7 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -1,6 +1,3 @@ -require 'sidekiq/version' -require 'celluloid/autostart' if Sidekiq::VERSION < '4.0.0' -require 'sidekiq/fetch' require 'sidekiq/limit_fetch' Sidekiq.logger = nil @@ -10,7 +7,7 @@ RSpec.configure do |config| config.order = :random config.disable_monkey_patching! config.raise_errors_for_deprecations! - config.before :each do + config.before do Sidekiq::Queue.reset_instances! Sidekiq.redis do |it| clean_redis = ->(queue) do