1
0
Fork 0
mirror of https://github.com/deanpcmad/sidekiq-limit_fetch.git synced 2022-11-09 13:54:36 -05:00

Release support for early adopters of sidekiq 4.0

This commit is contained in:
brainopia 2015-10-15 02:36:10 +03:00
parent 070a72a8c4
commit 206e022294
17 changed files with 155 additions and 213 deletions

1
.rspec Normal file
View file

@ -0,0 +1 @@
--require spec_helper --color

View file

@ -0,0 +1,16 @@
class Sidekiq::Manager
module InitLimitFetch
def initialize(options={})
options[:fetch] = Sidekiq::LimitFetch
super
end
def start
Sidekiq::LimitFetch::Queues.start options
Global::Monitor.start!
super
end
end
prepend InitLimitFetch
end

View file

@ -1,6 +1,6 @@
module Sidekiq module Sidekiq
class Queue class Queue
extend LimitFetch::Singleton, Forwardable extend LimitFetch::Instances, Forwardable
attr_reader :rname attr_reader :rname
def_delegators :lock, def_delegators :lock,

View file

@ -1,63 +1,39 @@
require 'sidekiq'
require 'sidekiq/fetch'
require 'sidekiq/util'
require 'sidekiq/api'
require 'forwardable' require 'forwardable'
require 'sidekiq'
require 'sidekiq/manager'
require 'sidekiq/api'
class Sidekiq::LimitFetch module Sidekiq::LimitFetch
autoload :UnitOfWork, 'sidekiq/limit_fetch/unit_of_work' autoload :UnitOfWork, 'sidekiq/limit_fetch/unit_of_work'
require_relative 'limit_fetch/redis' require_relative 'limit_fetch/instances'
require_relative 'limit_fetch/singleton'
require_relative 'limit_fetch/queues' require_relative 'limit_fetch/queues'
require_relative 'limit_fetch/global/semaphore' require_relative 'limit_fetch/global/semaphore'
require_relative 'limit_fetch/global/selector' require_relative 'limit_fetch/global/selector'
require_relative 'limit_fetch/global/monitor' require_relative 'limit_fetch/global/monitor'
require_relative 'extensions/queue' require_relative 'extensions/queue'
require_relative 'extensions/manager'
include Redis extend self
Sidekiq.options[:fetch] = self
TIMEOUT = \ def new(_)
if Sidekiq::VERSION < '4.0.0' self
Sidekiq::Fetcher::TIMEOUT
else
Sidekiq::BasicFetch::TIMEOUT
end
def self.bulk_requeue(*args)
Sidekiq::BasicFetch.bulk_requeue *args
end
def initialize(options)
@queues = Queues.new options.merge(namespace: determine_namespace)
end end
def retrieve_work def retrieve_work
queue, message = fetch_message queue, message = redis_brpop *Queues.acquire, Sidekiq::BasicFetch::TIMEOUT
Queues.release_except queue
UnitOfWork.new queue, message if message UnitOfWork.new queue, message if message
end end
def bulk_requeue(*args)
Sidekiq::BasicFetch.bulk_requeue(*args)
end
private private
def fetch_message
queue, _ = redis_brpop *@queues.acquire, TIMEOUT
ensure
@queues.release_except queue
end
def redis_brpop(*args) def redis_brpop(*args)
return if args.size < 2 return if args.size < 2
query = -> redis { redis.brpop *args } Sidekiq.redis {|it| it.brpop *args }
if busy_local_queues.any? {|queue| not args.include? queue.rname }
nonblocking_redis(&query)
else
redis(&query)
end
end
def busy_local_queues
Sidekiq::Queue.instances.select(&:local_busy?)
end end
end end

View file

@ -1,6 +1,5 @@
module Sidekiq::LimitFetch::Global module Sidekiq::LimitFetch::Global
module Monitor module Monitor
include Sidekiq::LimitFetch::Redis
extend self extend self
HEARTBEAT_PREFIX = 'limit:heartbeat:' HEARTBEAT_PREFIX = 'limit:heartbeat:'
@ -8,10 +7,10 @@ module Sidekiq::LimitFetch::Global
HEARTBEAT_TTL = 20 HEARTBEAT_TTL = 20
REFRESH_TIMEOUT = 5 REFRESH_TIMEOUT = 5
def start!(queues, ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT) def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
Thread.new do Thread.new do
loop do loop do
add_dynamic queues if queues.dynamic? add_dynamic_queues
update_heartbeat ttl update_heartbeat ttl
invalidate_old_processes invalidate_old_processes
sleep timeout sleep timeout
@ -20,23 +19,24 @@ module Sidekiq::LimitFetch::Global
end end
def all_processes def all_processes
redis {|it| it.smembers PROCESS_SET } Sidekiq.redis {|it| it.smembers PROCESS_SET }
end end
def old_processes def old_processes
all_processes.reject do |process| all_processes.reject do |process|
redis {|it| it.get heartbeat_key process } Sidekiq.redis {|it| it.get heartbeat_key process }
end end
end end
def remove_old_processes! def remove_old_processes!
redis do |it| Sidekiq.redis do |it|
old_processes.each {|process| it.srem PROCESS_SET, process } old_processes.each {|process| it.srem PROCESS_SET, process }
end end
end end
def add_dynamic(queues) def add_dynamic_queues
queues.add Sidekiq::Queue.all.map(&:name) queues = Sidekiq::LimitFetch::Queues
queues.add Sidekiq::Queue.all.map(&:name) if queues.dynamic?
end end
private private

View file

@ -1,7 +1,5 @@
module Sidekiq::LimitFetch::Global module Sidekiq::LimitFetch::Global
class Semaphore class Semaphore
include Sidekiq::LimitFetch::Redis
PREFIX = 'limit_fetch' PREFIX = 'limit_fetch'
attr_reader :local_busy attr_reader :local_busy
@ -45,7 +43,7 @@ module Sidekiq::LimitFetch::Global
end end
def acquire def acquire
Selector.acquire([@name], determine_namespace).size > 0 Selector.acquire([@name], namespace).size > 0
end end
def release def release
@ -163,5 +161,15 @@ module Sidekiq::LimitFetch::Global
it.lrem "#{PREFIX}:busy:#@name", 0, process it.lrem "#{PREFIX}:busy:#@name", 0, process
end end
end end
private
def redis(&block)
Sidekiq.redis(&block)
end
def namespace
Sidekiq::LimitFetch::Queues.namespace
end
end end
end end

View file

@ -1,4 +1,4 @@
module Sidekiq::LimitFetch::Singleton module Sidekiq::LimitFetch::Instances
def self.extended(klass) def self.extended(klass)
klass.instance_variable_set :@instances, {} klass.instance_variable_set :@instances, {}
end end

View file

@ -1,10 +1,10 @@
class Sidekiq::LimitFetch module Sidekiq::LimitFetch::Queues
class Queues extend self
THREAD_KEY = :acquired_queues THREAD_KEY = :acquired_queues
def initialize(options) def start(options)
@queues = options[:queues] @queues = options[:queues]
@namespace = options[:namespace]
@dynamic = options[:dynamic] @dynamic = options[:dynamic]
options[:strict] ? strict_order! : weighted_order! options[:strict] ? strict_order! : weighted_order!
@ -15,7 +15,7 @@ class Sidekiq::LimitFetch
end end
def acquire def acquire
selector.acquire(ordered_queues, @namespace) selector.acquire(ordered_queues, namespace)
.tap {|it| save it } .tap {|it| save it }
.map {|it| "queue:#{it}" } .map {|it| "queue:#{it}" }
end end
@ -23,7 +23,7 @@ class Sidekiq::LimitFetch
def release_except(full_name) def release_except(full_name)
queues = restore queues = restore
queues.delete full_name[/queue:(.*)/, 1] if full_name queues.delete full_name[/queue:(.*)/, 1] if full_name
selector.release queues, @namespace selector.release queues, namespace
end end
def dynamic? def dynamic?
@ -45,10 +45,20 @@ class Sidekiq::LimitFetch
def ordered_queues; @queues.shuffle.uniq end def ordered_queues; @queues.shuffle.uniq end
end end
def namespace
@namespace ||= Sidekiq.redis do |it|
if it.respond_to?(:namespace) and it.namespace
it.namespace + ':'
else
''
end
end
end
private private
def selector def selector
Global::Selector Sidekiq::LimitFetch::Global::Selector
end end
def set(limit_type, limits) def set(limit_type, limits)
@ -84,5 +94,4 @@ class Sidekiq::LimitFetch
def each_queue def each_queue
@queues.uniq.each {|it| yield Sidekiq::Queue[it] } @queues.uniq.each {|it| yield Sidekiq::Queue[it] }
end end
end
end end

View file

@ -1,35 +0,0 @@
module Sidekiq::LimitFetch::Redis
extend self
def nonblocking_redis
redis do |redis|
# Celluloid 0.16 broke this method + yield redis
if Sidekiq::VERSION >= '4.0.0' || Celluloid::VERSION.to_f >= 0.16
yield redis
else
# prevent blocking of fetcher
# more bullet-proof and faster (O_O)
# than using Celluloid::IO
#
# https://github.com/brainopia/sidekiq-limit_fetch/issues/41
# explanation of why Future#value is beneficial here
begin
Celluloid::Future.new { yield redis }.value
rescue Celluloid::Task::TerminatedError
end
end
end
end
def redis
Sidekiq.redis {|it| yield it }
end
def determine_namespace
redis do |it|
if it.respond_to?(:namespace) and it.namespace
it.namespace + ':'
end
end
end
end

View file

@ -1,14 +0,0 @@
module Sidekiq
class Manager
def start
queues = Sidekiq::LimitFetch::Queues.new options.merge(namespace: Sidekiq::LimitFetch::Redis.determine_namespace)
Sidekiq::LimitFetch::Global::Monitor.start! queues
@workers.each do |x|
x.start
end
end
end
end

View file

@ -1,6 +1,6 @@
Gem::Specification.new do |gem| Gem::Specification.new do |gem|
gem.name = 'sidekiq-limit_fetch' gem.name = 'sidekiq-limit_fetch'
gem.version = '2.4.2' gem.version = '3.0.0'
gem.license = 'MIT' gem.license = 'MIT'
gem.authors = 'brainopia' gem.authors = 'brainopia'
gem.email = 'brainopia@evilmartians.com' gem.email = 'brainopia@evilmartians.com'
@ -13,9 +13,9 @@ Gem::Specification.new do |gem|
gem.files = `git ls-files`.split($/) gem.files = `git ls-files`.split($/)
gem.test_files = gem.files.grep %r{^spec/} gem.test_files = gem.files.grep %r{^spec/}
gem.require_paths = %w(lib) gem.require_paths = 'lib'
gem.add_dependency 'sidekiq', '>= 2.6.5' gem.add_dependency 'sidekiq', '>= 4'
gem.add_development_dependency 'rspec', '~> 3.2.0' gem.add_development_dependency 'rspec'
gem.add_development_dependency 'rake' gem.add_development_dependency 'rake'
end end

View file

@ -1,5 +1,3 @@
require 'spec_helper'
RSpec.describe Sidekiq::Queue do RSpec.describe Sidekiq::Queue do
context 'singleton' do context 'singleton' do
shared_examples :constructor do shared_examples :constructor do

View file

@ -1,21 +1,11 @@
require 'spec_helper'
Thread.abort_on_exception = true
RSpec.describe Sidekiq::LimitFetch::Global::Monitor do RSpec.describe Sidekiq::LimitFetch::Global::Monitor do
let(:queues) { double dynamic?: false } let(:monitor) { described_class.start! ttl, timeout }
let(:monitor) { described_class.start! queues, ttl, timeout }
let(:ttl) { 1 } let(:ttl) { 1 }
let(:queue) { Sidekiq::Queue[name] } let(:queue) { Sidekiq::Queue[name] }
let(:name) { 'default' } let(:name) { 'default' }
before :each do before { monitor }
monitor after { monitor.kill }
end
after :each do
monitor.kill
end
context 'old locks' do context 'old locks' do
let(:timeout) { 0.5 } let(:timeout) { 0.5 }

View file

@ -1,8 +1,4 @@
require 'spec_helper'
RSpec.describe Sidekiq::LimitFetch::Queues do RSpec.describe Sidekiq::LimitFetch::Queues do
subject { described_class.new options }
let(:queues) { %w[queue1 queue2] } let(:queues) { %w[queue1 queue2] }
let(:limits) {{ 'queue1' => 3 }} let(:limits) {{ 'queue1' => 3 }}
let(:strict) { true } let(:strict) { true }
@ -14,10 +10,11 @@ RSpec.describe Sidekiq::LimitFetch::Queues do
limits: limits, limits: limits,
strict: strict, strict: strict,
blocking: blocking, blocking: blocking,
process_limits: process_limits, process_limits: process_limits }
namespace: Sidekiq::LimitFetch::Redis.determine_namespace }
end end
before { subject.start options }
it 'should acquire queues' do it 'should acquire queues' do
subject.acquire subject.acquire
expect(Sidekiq::Queue['queue1'].probed).to eq 1 expect(Sidekiq::Queue['queue1'].probed).to eq 1

View file

@ -1,5 +1,3 @@
require 'spec_helper'
RSpec.describe 'semaphore' do RSpec.describe 'semaphore' do
let(:name) { 'default' } let(:name) { 'default' }
subject { Sidekiq::LimitFetch::Global::Semaphore.new name } subject { Sidekiq::LimitFetch::Global::Semaphore.new name }

View file

@ -1,7 +1,7 @@
require 'spec_helper' Thread.abort_on_exception = true
RSpec.describe Sidekiq::LimitFetch do RSpec.describe Sidekiq::LimitFetch do
before :each do before do
Sidekiq.redis do |it| Sidekiq.redis do |it|
it.del 'queue:queue1' it.del 'queue:queue1'
it.lpush 'queue:queue1', 'task1' it.lpush 'queue:queue1', 'task1'
@ -10,11 +10,12 @@ RSpec.describe Sidekiq::LimitFetch do
end end
end end
subject { described_class.new options }
let(:options) {{ queues: queues, limits: limits }} let(:options) {{ queues: queues, limits: limits }}
let(:queues) { %w(queue1 queue1 queue2 queue2) } let(:queues) { %w(queue1 queue1 queue2 queue2) }
let(:limits) {{ 'queue1' => 1, 'queue2' => 2 }} let(:limits) {{ 'queue1' => 1, 'queue2' => 2 }}
before { subject::Queues.start options }
it 'should acquire lock on queue for execution' do it 'should acquire lock on queue for execution' do
work = subject.retrieve_work work = subject.retrieve_work
expect(work.queue_name).to eq 'queue1' expect(work.queue_name).to eq 'queue1'

View file

@ -1,6 +1,3 @@
require 'sidekiq/version'
require 'celluloid/autostart' if Sidekiq::VERSION < '4.0.0'
require 'sidekiq/fetch'
require 'sidekiq/limit_fetch' require 'sidekiq/limit_fetch'
Sidekiq.logger = nil Sidekiq.logger = nil
@ -10,7 +7,7 @@ RSpec.configure do |config|
config.order = :random config.order = :random
config.disable_monkey_patching! config.disable_monkey_patching!
config.raise_errors_for_deprecations! config.raise_errors_for_deprecations!
config.before :each do config.before do
Sidekiq::Queue.reset_instances! Sidekiq::Queue.reset_instances!
Sidekiq.redis do |it| Sidekiq.redis do |it|
clean_redis = ->(queue) do clean_redis = ->(queue) do