From 9f7ac9eaf4c705c58c67b61a82dd8c62533e660a Mon Sep 17 00:00:00 2001 From: brainopia Date: Thu, 20 Jun 2013 22:46:21 +0400 Subject: [PATCH] Pace yourself fetcher only needs to poll redis when there is a potential queue which can be queried additionally to currently polling queues --- {benchmarks => bench}/compare.rb | 0 lib/sidekiq/extensions/queue.rb | 4 +++- lib/sidekiq/limit_fetch.rb | 16 +++++++++++++--- lib/sidekiq/limit_fetch/global/semaphore.rb | 18 ++++++++++++++++++ lib/sidekiq/limit_fetch/singleton.rb | 4 ++++ 5 files changed, 38 insertions(+), 4 deletions(-) rename {benchmarks => bench}/compare.rb (100%) diff --git a/benchmarks/compare.rb b/bench/compare.rb similarity index 100% rename from benchmarks/compare.rb rename to bench/compare.rb diff --git a/lib/sidekiq/extensions/queue.rb b/lib/sidekiq/extensions/queue.rb index 3f6fb90..92d9eb8 100644 --- a/lib/sidekiq/extensions/queue.rb +++ b/lib/sidekiq/extensions/queue.rb @@ -1,6 +1,7 @@ module Sidekiq class Queue extend LimitFetch::Singleton, Forwardable + attr_reader :rname def_delegators :lock, :limit, :limit=, @@ -10,7 +11,8 @@ module Sidekiq :paused?, :blocking?, :unblocked, :block_except, :probed, :busy, - :increase_busy, :decrease_busy + :increase_busy, :decrease_busy, + :local_busy? def lock @lock ||= LimitFetch::Global::Semaphore.new name diff --git a/lib/sidekiq/limit_fetch.rb b/lib/sidekiq/limit_fetch.rb index 01cba46..5a9adc9 100644 --- a/lib/sidekiq/limit_fetch.rb +++ b/lib/sidekiq/limit_fetch.rb @@ -15,8 +15,8 @@ class Sidekiq::LimitFetch include Redis Sidekiq.options[:fetch] = self - def self.bulk_requeue(jobs) - Sidekiq::BasicFetch.bulk_requeue jobs + def self.bulk_requeue(*args) + Sidekiq::BasicFetch.bulk_requeue *args end def initialize(options) @@ -39,6 +39,16 @@ class Sidekiq::LimitFetch def redis_brpop(*args) return if args.size < 2 - nonblocking_redis {|it| it.brpop *args } + query = -> redis { redis.brpop *args } + + if busy_local_queues.any? {|queue| not args.include? queue.rname } + nonblocking_redis(&query) + else + redis(&query) + end + end + + def busy_local_queues + Sidekiq::Queue.instances.select(&:local_busy?) end end diff --git a/lib/sidekiq/limit_fetch/global/semaphore.rb b/lib/sidekiq/limit_fetch/global/semaphore.rb index 72c8966..5cbbe15 100644 --- a/lib/sidekiq/limit_fetch/global/semaphore.rb +++ b/lib/sidekiq/limit_fetch/global/semaphore.rb @@ -4,8 +4,12 @@ module Sidekiq::LimitFetch::Global PREFIX = 'limit_fetch' + attr_reader :local_busy + def initialize(name) @name = name + @lock = Mutex.new + @local_busy = 0 end def limit @@ -30,10 +34,12 @@ module Sidekiq::LimitFetch::Global end def increase_busy + increase_local_busy redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid } end def decrease_busy + decrease_local_busy redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid } end @@ -69,5 +75,17 @@ module Sidekiq::LimitFetch::Global def blocking? redis {|it| it.get "#{PREFIX}:block:#@name" } end + + def increase_local_busy + @lock.synchronize { @local_busy += 1 } + end + + def decrease_local_busy + @lock.synchronize { @local_busy -= 1 } + end + + def local_busy? + @local_busy > 0 + end end end diff --git a/lib/sidekiq/limit_fetch/singleton.rb b/lib/sidekiq/limit_fetch/singleton.rb index 39ff164..69bf609 100644 --- a/lib/sidekiq/limit_fetch/singleton.rb +++ b/lib/sidekiq/limit_fetch/singleton.rb @@ -8,4 +8,8 @@ module Sidekiq::LimitFetch::Singleton end alias [] new + + def instances + @instances.values + end end