mirror of
https://github.com/deanpcmad/sidekiq-limit_fetch.git
synced 2022-11-09 13:54:36 -05:00
Pace yourself
fetcher only needs to poll redis when there is a potential queue which can be queried additionally to currently polling queues
This commit is contained in:
parent
e664e97819
commit
9f7ac9eaf4
5 changed files with 38 additions and 4 deletions
|
@ -1,6 +1,7 @@
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
class Queue
|
class Queue
|
||||||
extend LimitFetch::Singleton, Forwardable
|
extend LimitFetch::Singleton, Forwardable
|
||||||
|
attr_reader :rname
|
||||||
|
|
||||||
def_delegators :lock,
|
def_delegators :lock,
|
||||||
:limit, :limit=,
|
:limit, :limit=,
|
||||||
|
@ -10,7 +11,8 @@ module Sidekiq
|
||||||
:paused?, :blocking?,
|
:paused?, :blocking?,
|
||||||
:unblocked, :block_except,
|
:unblocked, :block_except,
|
||||||
:probed, :busy,
|
:probed, :busy,
|
||||||
:increase_busy, :decrease_busy
|
:increase_busy, :decrease_busy,
|
||||||
|
:local_busy?
|
||||||
|
|
||||||
def lock
|
def lock
|
||||||
@lock ||= LimitFetch::Global::Semaphore.new name
|
@lock ||= LimitFetch::Global::Semaphore.new name
|
||||||
|
|
|
@ -15,8 +15,8 @@ class Sidekiq::LimitFetch
|
||||||
include Redis
|
include Redis
|
||||||
Sidekiq.options[:fetch] = self
|
Sidekiq.options[:fetch] = self
|
||||||
|
|
||||||
def self.bulk_requeue(jobs)
|
def self.bulk_requeue(*args)
|
||||||
Sidekiq::BasicFetch.bulk_requeue jobs
|
Sidekiq::BasicFetch.bulk_requeue *args
|
||||||
end
|
end
|
||||||
|
|
||||||
def initialize(options)
|
def initialize(options)
|
||||||
|
@ -39,6 +39,16 @@ class Sidekiq::LimitFetch
|
||||||
|
|
||||||
def redis_brpop(*args)
|
def redis_brpop(*args)
|
||||||
return if args.size < 2
|
return if args.size < 2
|
||||||
nonblocking_redis {|it| it.brpop *args }
|
query = -> redis { redis.brpop *args }
|
||||||
|
|
||||||
|
if busy_local_queues.any? {|queue| not args.include? queue.rname }
|
||||||
|
nonblocking_redis(&query)
|
||||||
|
else
|
||||||
|
redis(&query)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def busy_local_queues
|
||||||
|
Sidekiq::Queue.instances.select(&:local_busy?)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -4,8 +4,12 @@ module Sidekiq::LimitFetch::Global
|
||||||
|
|
||||||
PREFIX = 'limit_fetch'
|
PREFIX = 'limit_fetch'
|
||||||
|
|
||||||
|
attr_reader :local_busy
|
||||||
|
|
||||||
def initialize(name)
|
def initialize(name)
|
||||||
@name = name
|
@name = name
|
||||||
|
@lock = Mutex.new
|
||||||
|
@local_busy = 0
|
||||||
end
|
end
|
||||||
|
|
||||||
def limit
|
def limit
|
||||||
|
@ -30,10 +34,12 @@ module Sidekiq::LimitFetch::Global
|
||||||
end
|
end
|
||||||
|
|
||||||
def increase_busy
|
def increase_busy
|
||||||
|
increase_local_busy
|
||||||
redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid }
|
redis {|it| it.rpush "#{PREFIX}:busy:#@name", Selector.uuid }
|
||||||
end
|
end
|
||||||
|
|
||||||
def decrease_busy
|
def decrease_busy
|
||||||
|
decrease_local_busy
|
||||||
redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid }
|
redis {|it| it.lrem "#{PREFIX}:busy:#@name", 1, Selector.uuid }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -69,5 +75,17 @@ module Sidekiq::LimitFetch::Global
|
||||||
def blocking?
|
def blocking?
|
||||||
redis {|it| it.get "#{PREFIX}:block:#@name" }
|
redis {|it| it.get "#{PREFIX}:block:#@name" }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def increase_local_busy
|
||||||
|
@lock.synchronize { @local_busy += 1 }
|
||||||
|
end
|
||||||
|
|
||||||
|
def decrease_local_busy
|
||||||
|
@lock.synchronize { @local_busy -= 1 }
|
||||||
|
end
|
||||||
|
|
||||||
|
def local_busy?
|
||||||
|
@local_busy > 0
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -8,4 +8,8 @@ module Sidekiq::LimitFetch::Singleton
|
||||||
end
|
end
|
||||||
|
|
||||||
alias [] new
|
alias [] new
|
||||||
|
|
||||||
|
def instances
|
||||||
|
@instances.values
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Reference in a new issue