mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Implemented a strictly ordered queue switch
This commit is contained in:
parent
f235f67701
commit
c779018af8
5 changed files with 58 additions and 12 deletions
|
@ -124,7 +124,6 @@ module Sidekiq
|
|||
|
||||
def validate!
|
||||
options[:queues] << 'default' if options[:queues].empty?
|
||||
options[:queues].shuffle!
|
||||
|
||||
if !File.exist?(options[:require]) ||
|
||||
(File.directory?(options[:require]) && !File.exist?("#{options[:require]}/config/application.rb"))
|
||||
|
@ -141,9 +140,13 @@ module Sidekiq
|
|||
opts = {}
|
||||
|
||||
@parser = OptionParser.new do |o|
|
||||
o.on "-q", "--queue QUEUE,WEIGHT", "Queue to process, with optional weight" do |arg|
|
||||
q, weight = arg.split(",")
|
||||
parse_queues(opts, q, weight)
|
||||
o.on "-q", "--queue QUEUE[,WEIGHT]...", "Queues to process with optional weights" do |arg|
|
||||
queues_and_weights = arg.scan(/(\w+),?(\d*)/)
|
||||
queues_and_weights.each {|queue_and_weight| parse_queues(opts, *queue_and_weight)}
|
||||
end
|
||||
|
||||
o.on "-s", "--strict", "Use strictly ordered queues (e.g. all jobs in higher priority queues are performed before any jobs in lower priority queues)" do
|
||||
opts[:strict] = true
|
||||
end
|
||||
|
||||
o.on "-v", "--verbose", "Print more verbose output" do
|
||||
|
@ -208,7 +211,7 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def parse_queues(opts, q, weight)
|
||||
(weight || 1).to_i.times do
|
||||
[weight.to_i, 1].max.times do
|
||||
(opts[:queues] ||= []) << q
|
||||
end
|
||||
end
|
||||
|
|
|
@ -12,10 +12,11 @@ module Sidekiq
|
|||
|
||||
TIMEOUT = 1
|
||||
|
||||
def initialize(mgr, queues)
|
||||
def initialize(mgr, queues, strict)
|
||||
@mgr = mgr
|
||||
@strictly_ordered_queues = strict
|
||||
@queues = queues.map { |q| "queue:#{q}" }
|
||||
@unique_queues = @queues.uniq
|
||||
@unique_ordered_queues = @queues.uniq.sort {|a, b| @queues.count(b) <=> @queues.count(a)}
|
||||
end
|
||||
|
||||
# Fetching is straightforward: the Manager makes a fetch
|
||||
|
@ -68,8 +69,9 @@ module Sidekiq
|
|||
# recreate the queue command each time we invoke Redis#blpop
|
||||
# to honor weights and avoid queue starvation.
|
||||
def queues_cmd
|
||||
queues = @queues.sample(@unique_queues.size).uniq
|
||||
queues.concat(@unique_queues - queues)
|
||||
return @unique_ordered_queues.dup << TIMEOUT if @strictly_ordered_queues
|
||||
queues = @queues.sample(@unique_ordered_queues.size).uniq
|
||||
queues.concat(@unique_ordered_queues - queues)
|
||||
queues << TIMEOUT
|
||||
end
|
||||
end
|
||||
|
|
|
@ -27,7 +27,7 @@ module Sidekiq
|
|||
@in_progress = {}
|
||||
@done = false
|
||||
@busy = []
|
||||
@fetcher = Fetcher.new(current_actor, options[:queues])
|
||||
@fetcher = Fetcher.new(current_actor, options[:queues], !!options[:strict])
|
||||
@ready = @count.times.map { Processor.new_link(current_actor) }
|
||||
procline
|
||||
end
|
||||
|
|
|
@ -40,14 +40,24 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
assert_equal ['foo'], Sidekiq.options[:queues]
|
||||
end
|
||||
|
||||
it 'sets strictly ordered queues' do
|
||||
@cli.parse(['sidekiq', '-s', '-r', './test/fake_env.rb'])
|
||||
assert_equal true, Sidekiq.options[:strict]
|
||||
end
|
||||
|
||||
it 'changes timeout' do
|
||||
@cli.parse(['sidekiq', '-t', '30', '-r', './test/fake_env.rb'])
|
||||
assert_equal 30, Sidekiq.options[:timeout]
|
||||
end
|
||||
|
||||
it 'handles multiple queues with weights' do
|
||||
it 'handles multiple queues with weights with multiple switches' do
|
||||
@cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb'])
|
||||
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort
|
||||
assert_equal %w(foo foo foo bar), Sidekiq.options[:queues]
|
||||
end
|
||||
|
||||
it 'handles multiple queues with weights with a single switch' do
|
||||
@cli.parse(['sidekiq', '-q', 'bar,foo,3', '-r', './test/fake_env.rb'])
|
||||
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues]
|
||||
end
|
||||
|
||||
it 'sets verbose' do
|
||||
|
@ -163,6 +173,24 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
assert_equal 3, Sidekiq.options[:queues].count { |q| q == 'seldom' }
|
||||
end
|
||||
end
|
||||
|
||||
describe 'Sidekiq::CLI#parse_queues' do
|
||||
describe 'when weight is present' do
|
||||
it 'concatenates queue to opts[:queues] weight number of times' do
|
||||
opts = {}
|
||||
@cli.send :parse_queues, opts, 'often', 7
|
||||
assert_equal %w[often] * 7, opts[:queues]
|
||||
end
|
||||
end
|
||||
|
||||
describe 'when weight is not present' do
|
||||
it 'concatenates queue to opts[:queues] once' do
|
||||
opts = {}
|
||||
@cli.send :parse_queues, opts, 'once', nil
|
||||
assert_equal %w[once], opts[:queues]
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
13
test/test_fetch.rb
Normal file
13
test/test_fetch.rb
Normal file
|
@ -0,0 +1,13 @@
|
|||
require 'helper'
|
||||
require 'sidekiq/fetch'
|
||||
|
||||
class TestFetcher < MiniTest::Unit::TestCase
|
||||
describe 'Fetcher#queues_cmd' do
|
||||
describe 'when queues are strictly ordered' do
|
||||
it 'returns the unique ordered queues properly based on priority and order they were passed in' do
|
||||
fetcher = Sidekiq::Fetcher.new nil, %w[medium medium high high high low default], true
|
||||
assert_equal (%w[queue:high queue:medium queue:low queue:default] << 1), fetcher._send_(:queues_cmd)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Add table
Reference in a new issue