diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 7601467d..e1e31880 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -124,7 +124,6 @@ module Sidekiq def validate! options[:queues] << 'default' if options[:queues].empty? - options[:queues].shuffle! if !File.exist?(options[:require]) || (File.directory?(options[:require]) && !File.exist?("#{options[:require]}/config/application.rb")) @@ -141,9 +140,13 @@ module Sidekiq opts = {} @parser = OptionParser.new do |o| - o.on "-q", "--queue QUEUE,WEIGHT", "Queue to process, with optional weight" do |arg| - q, weight = arg.split(",") - parse_queues(opts, q, weight) + o.on "-q", "--queue QUEUE[,WEIGHT]...", "Queues to process with optional weights" do |arg| + queues_and_weights = arg.scan(/(\w+),?(\d*)/) + queues_and_weights.each {|queue_and_weight| parse_queues(opts, *queue_and_weight)} + end + + o.on "-s", "--strict", "Use strictly ordered queues (e.g. all jobs in higher priority queues are performed before any jobs in lower priority queues)" do + opts[:strict] = true end o.on "-v", "--verbose", "Print more verbose output" do @@ -208,7 +211,7 @@ module Sidekiq end def parse_queues(opts, q, weight) - (weight || 1).to_i.times do + [weight.to_i, 1].max.times do (opts[:queues] ||= []) << q end end diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index efa09883..cf277bae 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -12,10 +12,11 @@ module Sidekiq TIMEOUT = 1 - def initialize(mgr, queues) + def initialize(mgr, queues, strict) @mgr = mgr + @strictly_ordered_queues = strict @queues = queues.map { |q| "queue:#{q}" } - @unique_queues = @queues.uniq + @unique_ordered_queues = @queues.uniq.sort {|a, b| @queues.count(b) <=> @queues.count(a)} end # Fetching is straightforward: the Manager makes a fetch @@ -68,8 +69,9 @@ module Sidekiq # recreate the queue command each time we invoke Redis#blpop # to honor weights and avoid queue starvation. def queues_cmd - queues = @queues.sample(@unique_queues.size).uniq - queues.concat(@unique_queues - queues) + return @unique_ordered_queues.dup << TIMEOUT if @strictly_ordered_queues + queues = @queues.sample(@unique_ordered_queues.size).uniq + queues.concat(@unique_ordered_queues - queues) queues << TIMEOUT end end diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 74534be0..bdf75f99 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -27,7 +27,7 @@ module Sidekiq @in_progress = {} @done = false @busy = [] - @fetcher = Fetcher.new(current_actor, options[:queues]) + @fetcher = Fetcher.new(current_actor, options[:queues], !!options[:strict]) @ready = @count.times.map { Processor.new_link(current_actor) } procline end diff --git a/test/test_cli.rb b/test/test_cli.rb index 5a7fcb30..21da0088 100644 --- a/test/test_cli.rb +++ b/test/test_cli.rb @@ -40,14 +40,24 @@ class TestCli < MiniTest::Unit::TestCase assert_equal ['foo'], Sidekiq.options[:queues] end + it 'sets strictly ordered queues' do + @cli.parse(['sidekiq', '-s', '-r', './test/fake_env.rb']) + assert_equal true, Sidekiq.options[:strict] + end + it 'changes timeout' do @cli.parse(['sidekiq', '-t', '30', '-r', './test/fake_env.rb']) assert_equal 30, Sidekiq.options[:timeout] end - it 'handles multiple queues with weights' do + it 'handles multiple queues with weights with multiple switches' do @cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb']) - assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort + assert_equal %w(foo foo foo bar), Sidekiq.options[:queues] + end + + it 'handles multiple queues with weights with a single switch' do + @cli.parse(['sidekiq', '-q', 'bar,foo,3', '-r', './test/fake_env.rb']) + assert_equal %w(bar foo foo foo), Sidekiq.options[:queues] end it 'sets verbose' do @@ -163,6 +173,24 @@ class TestCli < MiniTest::Unit::TestCase assert_equal 3, Sidekiq.options[:queues].count { |q| q == 'seldom' } end end + + describe 'Sidekiq::CLI#parse_queues' do + describe 'when weight is present' do + it 'concatenates queue to opts[:queues] weight number of times' do + opts = {} + @cli.send :parse_queues, opts, 'often', 7 + assert_equal %w[often] * 7, opts[:queues] + end + end + + describe 'when weight is not present' do + it 'concatenates queue to opts[:queues] once' do + opts = {} + @cli.send :parse_queues, opts, 'once', nil + assert_equal %w[once], opts[:queues] + end + end + end end end diff --git a/test/test_fetch.rb b/test/test_fetch.rb new file mode 100644 index 00000000..714054d1 --- /dev/null +++ b/test/test_fetch.rb @@ -0,0 +1,13 @@ +require 'helper' +require 'sidekiq/fetch' + +class TestFetcher < MiniTest::Unit::TestCase + describe 'Fetcher#queues_cmd' do + describe 'when queues are strictly ordered' do + it 'returns the unique ordered queues properly based on priority and order they were passed in' do + fetcher = Sidekiq::Fetcher.new nil, %w[medium medium high high high low default], true + assert_equal (%w[queue:high queue:medium queue:low queue:default] << 1), fetcher._send_(:queues_cmd) + end + end + end +end