1
0
Fork 0
mirror of https://github.com/deanpcmad/sidekiq-limit_fetch.git synced 2022-11-09 13:54:36 -05:00

Correctly support redis namespace

This commit is contained in:
brainopia 2013-06-18 15:23:12 +04:00
parent 2801e06ba9
commit dd11381537
8 changed files with 40 additions and 21 deletions

View file

@ -2,4 +2,11 @@ require 'bundler/gem_tasks'
require 'rspec/core/rake_task'
RSpec::Core::RakeTask.new
task default: :spec
task :default do
rspec = Rake::Task[:spec]
rspec.invoke
ENV['namespace'] = 'namespace'
rspec.reenable
rspec.invoke
end

View file

@ -21,7 +21,7 @@ class Sidekiq::LimitFetch
def initialize(options)
Global::Monitor.start!
@queues = Queues.new options
@queues = Queues.new options.merge(namespace: determine_namespace)
end
def retrieve_work
@ -39,6 +39,6 @@ class Sidekiq::LimitFetch
def redis_brpop(*args)
return if args.size < 2
redis {|it| it.brpop *args }
nonblocking_redis {|it| it.brpop *args }
end
end

View file

@ -2,11 +2,11 @@ module Sidekiq::LimitFetch::Global
module Selector
extend self
def acquire(queues)
def acquire(queues, namespace)
redis_eval :acquire, [namespace, uuid, queues]
end
def release(queues)
def release(queues, namespace)
redis_eval :release, [namespace, uuid, queues]
end
@ -22,13 +22,6 @@ module Sidekiq::LimitFetch::Global
private
def namespace
@namespace ||= begin
namespace = Sidekiq.options[:namespace]
namespace + ':' if namespace
end
end
def redis_eval(script_name, args)
Sidekiq.redis do |it|
begin

View file

@ -1,7 +1,6 @@
module Sidekiq::LimitFetch::Global
class Semaphore
extend Forwardable
def_delegator Sidekiq, :redis
include Sidekiq::LimitFetch::Redis
PREFIX = 'limit_fetch'
@ -19,7 +18,7 @@ module Sidekiq::LimitFetch::Global
end
def acquire
Selector.acquire([@name]).size > 0
Selector.acquire([@name], determine_namespace).size > 0
end
def release

View file

@ -3,7 +3,9 @@ class Sidekiq::LimitFetch
THREAD_KEY = :acquired_queues
def initialize(options)
@queues = options[:queues]
@queues = options[:queues]
@namespace = options[:namespace]
options[:strict] ? strict_order! : weighted_order!
set_limits options[:limits]
@ -11,7 +13,7 @@ class Sidekiq::LimitFetch
end
def acquire
selector.acquire(ordered_queues)
selector.acquire(ordered_queues, @namespace)
.tap {|it| save it }
.map {|it| "queue:#{it}" }
end
@ -19,7 +21,7 @@ class Sidekiq::LimitFetch
def release_except(full_name)
queues = restore
queues.delete full_name[/queue:(.*)/, 1] if full_name
selector.release queues
selector.release queues, @namespace
end
private

View file

@ -1,13 +1,27 @@
module Sidekiq::LimitFetch::Redis
extend self
# prevent blocking of fetcher
# more bullet-proof and faster (O_O)
# than using Celluloid::IO
def redis
Sidekiq.redis do |redis|
def nonblocking_redis
redis do |redis|
begin
Celluloid::Future.new { yield redis }.value
end
end
rescue Celluloid::Task::TerminatedError
end
def redis
Sidekiq.redis {|it| yield it }
end
def determine_namespace
redis do |it|
if it.respond_to?(:namespace) and it.namespace
it.namespace + ':'
end
end
end
end

View file

@ -12,7 +12,8 @@ describe Sidekiq::LimitFetch::Queues do
{ queues: queues,
limits: limits,
strict: strict,
blocking: blocking }
blocking: blocking,
namespace: Sidekiq::LimitFetch::Redis.determine_namespace }
end
it 'should acquire queues' do

View file

@ -2,6 +2,9 @@ require 'sidekiq/limit_fetch'
require 'celluloid/autostart'
require 'sidekiq/fetch'
Sidekiq.logger = nil
Sidekiq.redis = { namespace: ENV['namespace'] }
RSpec.configure do |config|
config.before :each do
Sidekiq.redis do |it|