1
0
Fork 0
mirror of https://github.com/deanpcmad/sidekiq-limit_fetch.git synced 2022-11-09 13:54:36 -05:00

Initial implementation

This commit is contained in:
brainopia 2013-01-11 14:56:41 +04:00
commit f850430076
10 changed files with 184 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
Gemfile.lock
pkg/

2
Gemfile Normal file
View file

@ -0,0 +1,2 @@
source :rubygems
gemspec

22
LICENSE.txt Normal file
View file

@ -0,0 +1,22 @@
Copyright (c) 2013 brainopia
MIT License
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

22
README.md Normal file
View file

@ -0,0 +1,22 @@
## Description
Sidekig strategy to restrict number of workers
which are able to run specified queues simultaneously.
## Installation
Add this line to your application's Gemfile:
gem 'sidekiq-limit_fetch'
## Usage
Specify limits which you want to place on queues inside sidekiq.yml:
```yaml
:limits:
restricted_queue: 5
```
In this example, tasks for restricted queue will be run by at most 5
workers at the same time.

1
Rakefile Normal file
View file

@ -0,0 +1 @@
require "bundler/gem_tasks"

View file

@ -0,0 +1,58 @@
require 'sidekiq'
class Sidekiq::LimitFetch
require_relative 'limit_fetch/semaphore'
require_relative 'limit_fetch/queue'
require_relative 'limit_fetch/unit_of_work'
Sidekiq.options[:fetch] = self
def initialize(options)
prepare_queues options
options[:strict] ? define_strict_queues : define_weighted_queues
end
def available_queues
fetch_queues.select(&:acquire)
end
def retrieve_work
queues = available_queues
queue_name, message = Sidekiq.redis do |it|
it.brpop *queues.map(&:full_name), Sidekiq::Fetcher::TIMEOUT
end
if message
queue = queues.find {|it| it.name == queue_name }
queues.delete queue
UnitOfWork.new queue, message
end
ensure
queues.each(&:release) if queues
end
private
def prepare_queues(options)
cache = {}
limits = options[:limits] || {}
@queues = options[:queues].map do |name|
cache[name] ||= Queue.new name, limits[name]
end
end
def define_strict_queues
@queues.uniq!
def fetch_queues
@queues
end
end
def define_weighted_queues
def fetch_queues
@queues.shuffle.uniq
end
end
end

View file

@ -0,0 +1,14 @@
class Sidekiq::LimitFetch
class Queue
extend Forwardable
attr_reader :name, :full_name
def_delegators :@lock, :acquire, :release
def initialize(name, limit)
@name = name
@full_name = "queue:#{name}"
@lock = Semaphore.for limit
end
end
end

View file

@ -0,0 +1,28 @@
class Sidekiq::LimitFetch::Semaphore
Stub = Struct.new(:acquire, :release)
def self.for(limit)
limit ? new(limit) : stub
end
def self.stub
@stub ||= Stub.new(true, true)
end
def initialize(limit)
@lock = Mutex.new
@limit = limit
end
def acquire
@lock.synchronize do
@limit -= 1 if @limit > 0
end
end
def release
@lock.synchronize do
@limit += 1
end
end
end

View file

@ -0,0 +1,16 @@
Sidekiq::LimitFetch::UnitOfWork = Struct.new :queue_wrapper, :message do
extend Forwardable
def_delegator :queue_wrapper, :full_name, :queue
def_delegator :queue_wrapper, :name, :queue_name
def_delegator :queue_wrapper, :release
def acknowledge
release
end
def requeue
release
Sidekiq.redis {|it| it.rpush queue, message }
end
end

View file

@ -0,0 +1,19 @@
Gem::Specification.new do |gem|
gem.name = 'sidekiq-limit_fetch'
gem.version = '0.1'
gem.authors = 'brainopia'
gem.email = 'brainopia@evilmartians.com'
gem.summary = 'Sidekig strategy to support queue limits'
gem.homepage = 'https://github.com/brainopia/sidekiq-limit_fetch'
gem.description = <<-DESCRIPTION
Sidekig strategy to restrict number of workers
which are able to run specified queues simultaneously.
DESCRIPTION
gem.files = `git ls-files`.split($/)
gem.test_files = gem.files.grep %r{^spec/}
gem.require_paths = %w(lib)
gem.add_dependency 'sidekiq', '>= 2.6.3'
gem.add_development_dependency 'rspec'
end