From c779018af8700e4dc83ee57c493d1e1fe325c3ef Mon Sep 17 00:00:00 2001 From: Jake Mack Date: Fri, 27 Jul 2012 15:18:11 -0700 Subject: [PATCH 1/4] Implemented a strictly ordered queue switch --- lib/sidekiq/cli.rb | 13 ++++++++----- lib/sidekiq/fetch.rb | 10 ++++++---- lib/sidekiq/manager.rb | 2 +- test/test_cli.rb | 32 ++++++++++++++++++++++++++++++-- test/test_fetch.rb | 13 +++++++++++++ 5 files changed, 58 insertions(+), 12 deletions(-) create mode 100644 test/test_fetch.rb diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 7601467d..e1e31880 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -124,7 +124,6 @@ module Sidekiq def validate! options[:queues] << 'default' if options[:queues].empty? - options[:queues].shuffle! if !File.exist?(options[:require]) || (File.directory?(options[:require]) && !File.exist?("#{options[:require]}/config/application.rb")) @@ -141,9 +140,13 @@ module Sidekiq opts = {} @parser = OptionParser.new do |o| - o.on "-q", "--queue QUEUE,WEIGHT", "Queue to process, with optional weight" do |arg| - q, weight = arg.split(",") - parse_queues(opts, q, weight) + o.on "-q", "--queue QUEUE[,WEIGHT]...", "Queues to process with optional weights" do |arg| + queues_and_weights = arg.scan(/(\w+),?(\d*)/) + queues_and_weights.each {|queue_and_weight| parse_queues(opts, *queue_and_weight)} + end + + o.on "-s", "--strict", "Use strictly ordered queues (e.g. all jobs in higher priority queues are performed before any jobs in lower priority queues)" do + opts[:strict] = true end o.on "-v", "--verbose", "Print more verbose output" do @@ -208,7 +211,7 @@ module Sidekiq end def parse_queues(opts, q, weight) - (weight || 1).to_i.times do + [weight.to_i, 1].max.times do (opts[:queues] ||= []) << q end end diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index efa09883..cf277bae 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -12,10 +12,11 @@ module Sidekiq TIMEOUT = 1 - def initialize(mgr, queues) + def initialize(mgr, queues, strict) @mgr = mgr + @strictly_ordered_queues = strict @queues = queues.map { |q| "queue:#{q}" } - @unique_queues = @queues.uniq + @unique_ordered_queues = @queues.uniq.sort {|a, b| @queues.count(b) <=> @queues.count(a)} end # Fetching is straightforward: the Manager makes a fetch @@ -68,8 +69,9 @@ module Sidekiq # recreate the queue command each time we invoke Redis#blpop # to honor weights and avoid queue starvation. def queues_cmd - queues = @queues.sample(@unique_queues.size).uniq - queues.concat(@unique_queues - queues) + return @unique_ordered_queues.dup << TIMEOUT if @strictly_ordered_queues + queues = @queues.sample(@unique_ordered_queues.size).uniq + queues.concat(@unique_ordered_queues - queues) queues << TIMEOUT end end diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 74534be0..bdf75f99 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -27,7 +27,7 @@ module Sidekiq @in_progress = {} @done = false @busy = [] - @fetcher = Fetcher.new(current_actor, options[:queues]) + @fetcher = Fetcher.new(current_actor, options[:queues], !!options[:strict]) @ready = @count.times.map { Processor.new_link(current_actor) } procline end diff --git a/test/test_cli.rb b/test/test_cli.rb index 5a7fcb30..21da0088 100644 --- a/test/test_cli.rb +++ b/test/test_cli.rb @@ -40,14 +40,24 @@ class TestCli < MiniTest::Unit::TestCase assert_equal ['foo'], Sidekiq.options[:queues] end + it 'sets strictly ordered queues' do + @cli.parse(['sidekiq', '-s', '-r', './test/fake_env.rb']) + assert_equal true, Sidekiq.options[:strict] + end + it 'changes timeout' do @cli.parse(['sidekiq', '-t', '30', '-r', './test/fake_env.rb']) assert_equal 30, Sidekiq.options[:timeout] end - it 'handles multiple queues with weights' do + it 'handles multiple queues with weights with multiple switches' do @cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb']) - assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort + assert_equal %w(foo foo foo bar), Sidekiq.options[:queues] + end + + it 'handles multiple queues with weights with a single switch' do + @cli.parse(['sidekiq', '-q', 'bar,foo,3', '-r', './test/fake_env.rb']) + assert_equal %w(bar foo foo foo), Sidekiq.options[:queues] end it 'sets verbose' do @@ -163,6 +173,24 @@ class TestCli < MiniTest::Unit::TestCase assert_equal 3, Sidekiq.options[:queues].count { |q| q == 'seldom' } end end + + describe 'Sidekiq::CLI#parse_queues' do + describe 'when weight is present' do + it 'concatenates queue to opts[:queues] weight number of times' do + opts = {} + @cli.send :parse_queues, opts, 'often', 7 + assert_equal %w[often] * 7, opts[:queues] + end + end + + describe 'when weight is not present' do + it 'concatenates queue to opts[:queues] once' do + opts = {} + @cli.send :parse_queues, opts, 'once', nil + assert_equal %w[once], opts[:queues] + end + end + end end end diff --git a/test/test_fetch.rb b/test/test_fetch.rb new file mode 100644 index 00000000..714054d1 --- /dev/null +++ b/test/test_fetch.rb @@ -0,0 +1,13 @@ +require 'helper' +require 'sidekiq/fetch' + +class TestFetcher < MiniTest::Unit::TestCase + describe 'Fetcher#queues_cmd' do + describe 'when queues are strictly ordered' do + it 'returns the unique ordered queues properly based on priority and order they were passed in' do + fetcher = Sidekiq::Fetcher.new nil, %w[medium medium high high high low default], true + assert_equal (%w[queue:high queue:medium queue:low queue:default] << 1), fetcher._send_(:queues_cmd) + end + end + end +end From 05aa2b2552f50e14bf15dde565d34bc544559eb7 Mon Sep 17 00:00:00 2001 From: Jake Mack Date: Fri, 27 Jul 2012 16:03:01 -0700 Subject: [PATCH 2/4] Performance enhancing codes --- lib/sidekiq/fetch.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index cf277bae..fbb6935b 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -16,7 +16,7 @@ module Sidekiq @mgr = mgr @strictly_ordered_queues = strict @queues = queues.map { |q| "queue:#{q}" } - @unique_ordered_queues = @queues.uniq.sort {|a, b| @queues.count(b) <=> @queues.count(a)} + @unique_ordered_queues = @queues.group_by {|q| q}.sort_by { |k, v| -v.size }.map(&:first) end # Fetching is straightforward: the Manager makes a fetch From e6c306de15e822626cb660bdf5622fe9a4d6ecfb Mon Sep 17 00:00:00 2001 From: Jake Mack Date: Tue, 31 Jul 2012 13:58:50 -0700 Subject: [PATCH 3/4] Removed -s flag, simplified queue logic --- lib/sidekiq/cli.rb | 5 +---- lib/sidekiq/fetch.rb | 8 ++++---- test/helper.rb | 2 +- test/test_cli.rb | 11 ++++++++--- test/test_fetch.rb | 2 +- 5 files changed, 15 insertions(+), 13 deletions(-) diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index e1e31880..c56acec2 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -143,10 +143,7 @@ module Sidekiq o.on "-q", "--queue QUEUE[,WEIGHT]...", "Queues to process with optional weights" do |arg| queues_and_weights = arg.scan(/(\w+),?(\d*)/) queues_and_weights.each {|queue_and_weight| parse_queues(opts, *queue_and_weight)} - end - - o.on "-s", "--strict", "Use strictly ordered queues (e.g. all jobs in higher priority queues are performed before any jobs in lower priority queues)" do - opts[:strict] = true + opts[:strict] = queues_and_weights.collect(&:last).none? {|weight| weight != ''} end o.on "-v", "--verbose", "Print more verbose output" do diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index fbb6935b..37c7174b 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -16,7 +16,7 @@ module Sidekiq @mgr = mgr @strictly_ordered_queues = strict @queues = queues.map { |q| "queue:#{q}" } - @unique_ordered_queues = @queues.group_by {|q| q}.sort_by { |k, v| -v.size }.map(&:first) + @unique_queues = @queues.uniq end # Fetching is straightforward: the Manager makes a fetch @@ -69,9 +69,9 @@ module Sidekiq # recreate the queue command each time we invoke Redis#blpop # to honor weights and avoid queue starvation. def queues_cmd - return @unique_ordered_queues.dup << TIMEOUT if @strictly_ordered_queues - queues = @queues.sample(@unique_ordered_queues.size).uniq - queues.concat(@unique_ordered_queues - queues) + return @unique_queues.dup << TIMEOUT if @strictly_ordered_queues + queues = @queues.sample(@unique_queues.size).uniq + queues.concat(@unique_queues - queues) queues << TIMEOUT end end diff --git a/test/helper.rb b/test/helper.rb index af79aca8..4fa45c5c 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -13,4 +13,4 @@ require 'sidekiq/util' Sidekiq.logger.level = Logger::ERROR require 'sidekiq/redis_connection' -REDIS = Sidekiq::RedisConnection.create(:url => "redis://localhost/15", :namespace => 'testy') +REDIS = Sidekiq::RedisConnection.create(:url => "redis://localhost/1", :namespace => 'testy') diff --git a/test/test_cli.rb b/test/test_cli.rb index 21da0088..361ea3df 100644 --- a/test/test_cli.rb +++ b/test/test_cli.rb @@ -40,9 +40,14 @@ class TestCli < MiniTest::Unit::TestCase assert_equal ['foo'], Sidekiq.options[:queues] end - it 'sets strictly ordered queues' do - @cli.parse(['sidekiq', '-s', '-r', './test/fake_env.rb']) - assert_equal true, Sidekiq.options[:strict] + it 'sets strictly ordered queues if weights are not present' do + @cli.parse(['sidekiq', '-q', 'foo,bar', '-r', './test/fake_env.rb']) + assert_equal true, !!Sidekiq.options[:strict] + end + + it 'does not set strictly ordered queues if weights are present' do + @cli.parse(['sidekiq', '-q', 'foo,3', '-r', './test/fake_env.rb']) + assert_equal false, !!Sidekiq.options[:strict] end it 'changes timeout' do diff --git a/test/test_fetch.rb b/test/test_fetch.rb index 714054d1..dfbea6f4 100644 --- a/test/test_fetch.rb +++ b/test/test_fetch.rb @@ -5,7 +5,7 @@ class TestFetcher < MiniTest::Unit::TestCase describe 'Fetcher#queues_cmd' do describe 'when queues are strictly ordered' do it 'returns the unique ordered queues properly based on priority and order they were passed in' do - fetcher = Sidekiq::Fetcher.new nil, %w[medium medium high high high low default], true + fetcher = Sidekiq::Fetcher.new nil, %w[high medium low default], true assert_equal (%w[queue:high queue:medium queue:low queue:default] << 1), fetcher._send_(:queues_cmd) end end From 5941f53f3c5c3bf18dd6d24aa108f19ba4f5e2ed Mon Sep 17 00:00:00 2001 From: Jake Mack Date: Tue, 31 Jul 2012 14:03:42 -0700 Subject: [PATCH 4/4] Reverting change to test helper --- test/helper.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/helper.rb b/test/helper.rb index 4fa45c5c..af79aca8 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -13,4 +13,4 @@ require 'sidekiq/util' Sidekiq.logger.level = Logger::ERROR require 'sidekiq/redis_connection' -REDIS = Sidekiq::RedisConnection.create(:url => "redis://localhost/1", :namespace => 'testy') +REDIS = Sidekiq::RedisConnection.create(:url => "redis://localhost/15", :namespace => 'testy')