mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Refactor options to be globally available for things like
connection pool sizing.
This commit is contained in:
parent
8fbbd02f64
commit
8ee8137caa
9 changed files with 109 additions and 80 deletions
|
@ -4,19 +4,38 @@ require 'sidekiq/worker'
|
|||
require 'sidekiq/rails' if defined?(::Rails)
|
||||
require 'sidekiq/redis_connection'
|
||||
|
||||
require 'sidekiq/extensions/action_mailer'
|
||||
require 'sidekiq/extensions/active_record'
|
||||
|
||||
require 'sidekiq/middleware/chain'
|
||||
require 'sidekiq/middleware/server/active_record'
|
||||
require 'sidekiq/middleware/server/airbrake'
|
||||
require 'sidekiq/middleware/server/unique_jobs'
|
||||
require 'sidekiq/middleware/server/failure_jobs'
|
||||
require 'sidekiq/middleware/client/resque_web_compatibility'
|
||||
require 'sidekiq/middleware/client/unique_jobs'
|
||||
require 'sidekiq/extensions/action_mailer' if defined?(::ActionMailer)
|
||||
require 'sidekiq/extensions/active_record' if defined?(::ActiveRecord)
|
||||
|
||||
module Sidekiq
|
||||
|
||||
DEFAULTS = {
|
||||
:queues => [],
|
||||
:concurrency => 25,
|
||||
:require => '.',
|
||||
:environment => nil,
|
||||
}
|
||||
|
||||
def self.options
|
||||
@options ||= DEFAULTS.dup
|
||||
end
|
||||
|
||||
def self.options=(opts)
|
||||
@options = opts
|
||||
end
|
||||
|
||||
##
|
||||
# Configuration for Sidekiq, use like:
|
||||
#
|
||||
# Sidekiq.configure do |config|
|
||||
# config.server_middleware do |chain|
|
||||
# chain.add MyServerHook
|
||||
# end
|
||||
# end
|
||||
def self.configure
|
||||
yield self
|
||||
end
|
||||
|
||||
def self.redis
|
||||
@redis ||= Sidekiq::RedisConnection.create
|
||||
end
|
||||
|
@ -26,25 +45,13 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def self.client_middleware
|
||||
@client_chain ||= begin
|
||||
m = Middleware::Chain.new
|
||||
m.add Middleware::Client::UniqueJobs
|
||||
m.add Middleware::Client::ResqueWebCompatibility
|
||||
m
|
||||
end
|
||||
@client_chain ||= Client.default_middleware
|
||||
yield @client_chain if block_given?
|
||||
@client_chain
|
||||
end
|
||||
|
||||
def self.server_middleware
|
||||
@server_chain ||= begin
|
||||
m = Middleware::Chain.new
|
||||
m.add Middleware::Server::Airbrake
|
||||
m.add Middleware::Server::UniqueJobs
|
||||
m.add Middleware::Server::ActiveRecord
|
||||
m
|
||||
end
|
||||
|
||||
@server_chain ||= Processor.default_middleware
|
||||
yield @server_chain if block_given?
|
||||
@server_chain
|
||||
end
|
||||
|
|
|
@ -9,8 +9,9 @@ trap 'TERM' do
|
|||
Thread.main.raise Interrupt
|
||||
end
|
||||
|
||||
require 'yaml'
|
||||
require 'optparse'
|
||||
require 'sidekiq/version'
|
||||
require 'sidekiq'
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/redis_connection'
|
||||
require 'sidekiq/manager'
|
||||
|
@ -20,7 +21,7 @@ module Sidekiq
|
|||
include Util
|
||||
|
||||
# Used for CLI testing
|
||||
attr_accessor :options, :code
|
||||
attr_accessor :code
|
||||
|
||||
def initialize
|
||||
@code = nil
|
||||
|
@ -29,17 +30,11 @@ module Sidekiq
|
|||
def parse(args=ARGV)
|
||||
Sidekiq::Util.logger
|
||||
|
||||
@options = {
|
||||
:queues => [],
|
||||
:concurrency => 25,
|
||||
:require => '.',
|
||||
:environment => nil,
|
||||
}
|
||||
cli = parse_options(args)
|
||||
config = parse_config(cli)
|
||||
@options.merge!(config.merge(cli))
|
||||
options.merge!(config.merge(cli))
|
||||
|
||||
set_logger_level_to_debug if @options[:verbose]
|
||||
set_logger_level_to_debug if options[:verbose]
|
||||
|
||||
write_pid
|
||||
validate!
|
||||
|
@ -47,8 +42,8 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def run
|
||||
Sidekiq.redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace])
|
||||
manager = Sidekiq::Manager.new(@options)
|
||||
Sidekiq.redis ||= RedisConnection.create(:url => options[:server], :namespace => options[:namespace])
|
||||
manager = Sidekiq::Manager.new(options)
|
||||
begin
|
||||
logger.info 'Starting processing, hit Ctrl-C to stop'
|
||||
manager.start!
|
||||
|
@ -69,29 +64,33 @@ module Sidekiq
|
|||
exit(code)
|
||||
end
|
||||
|
||||
def options
|
||||
Sidekiq.options
|
||||
end
|
||||
|
||||
def detected_environment
|
||||
@options[:environment] ||= ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
|
||||
options[:environment] ||= ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
|
||||
end
|
||||
|
||||
def boot_system
|
||||
ENV['RACK_ENV'] = ENV['RAILS_ENV'] = detected_environment
|
||||
|
||||
raise ArgumentError, "#{@options[:require]} does not exist" unless File.exist?(@options[:require])
|
||||
raise ArgumentError, "#{options[:require]} does not exist" unless File.exist?(options[:require])
|
||||
|
||||
if File.directory?(@options[:require])
|
||||
require File.expand_path("#{@options[:require]}/config/environment.rb")
|
||||
if File.directory?(options[:require])
|
||||
require File.expand_path("#{options[:require]}/config/environment.rb")
|
||||
::Rails.application.eager_load!
|
||||
else
|
||||
require @options[:require]
|
||||
require options[:require]
|
||||
end
|
||||
end
|
||||
|
||||
def validate!
|
||||
@options[:queues] << 'default' if @options[:queues].empty?
|
||||
@options[:queues].shuffle!
|
||||
options[:queues] << 'default' if options[:queues].empty?
|
||||
options[:queues].shuffle!
|
||||
|
||||
if !File.exist?(@options[:require]) ||
|
||||
(File.directory?(@options[:require]) && !File.exist?("#{@options[:require]}/config/application.rb"))
|
||||
if !File.exist?(options[:require]) ||
|
||||
(File.directory?(options[:require]) && !File.exist?("#{options[:require]}/config/application.rb"))
|
||||
logger.info "=================================================================="
|
||||
logger.info " Please point sidekiq to a Rails 3 application or a Ruby file "
|
||||
logger.info " to load your worker classes with -r [DIR|FILE]."
|
||||
|
@ -107,7 +106,7 @@ module Sidekiq
|
|||
@parser = OptionParser.new do |o|
|
||||
o.on "-q", "--queue QUEUE,WEIGHT", "Queue to process, with optional weight" do |arg|
|
||||
(q, weight) = arg.split(",")
|
||||
parse_queues(q, weight)
|
||||
parse_queues(opts, q, weight)
|
||||
end
|
||||
|
||||
o.on "-v", "--verbose", "Print more verbose output" do
|
||||
|
@ -153,7 +152,7 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def write_pid
|
||||
if path = @options[:pidfile]
|
||||
if path = options[:pidfile]
|
||||
File.open(path, 'w') do |f|
|
||||
f.puts Process.pid
|
||||
end
|
||||
|
@ -163,19 +162,16 @@ module Sidekiq
|
|||
def parse_config(cli)
|
||||
opts = {}
|
||||
if cli[:config_file] && File.exist?(cli[:config_file])
|
||||
require 'yaml'
|
||||
opts = YAML.load_file cli[:config_file]
|
||||
queues = opts.delete(:queues) || []
|
||||
if @options[:queues].empty?
|
||||
queues.each { |pair| parse_queues(*pair) }
|
||||
end
|
||||
queues.each { |pair| parse_queues(opts, *pair) }
|
||||
end
|
||||
opts
|
||||
end
|
||||
|
||||
def parse_queues(q, weight)
|
||||
def parse_queues(opts, q, weight)
|
||||
(weight || 1).to_i.times do
|
||||
@options[:queues] << q
|
||||
(opts[:queues] ||= []) << q
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -13,6 +13,13 @@ module Sidekiq
|
|||
raise "Sidekiq::Client.middleware is now Sidekiq.client_middleware"
|
||||
end
|
||||
|
||||
def self.default_middleware
|
||||
Middleware::Chain.new do |m|
|
||||
m.add Middleware::Client::UniqueJobs
|
||||
m.add Middleware::Client::ResqueWebCompatibility
|
||||
end
|
||||
end
|
||||
|
||||
def self.registered_workers
|
||||
Sidekiq.redis.smembers('workers')
|
||||
end
|
||||
|
|
|
@ -45,6 +45,7 @@ module Sidekiq
|
|||
|
||||
def initialize
|
||||
@entries = []
|
||||
yield self if block_given?
|
||||
end
|
||||
|
||||
def remove(klass)
|
||||
|
|
|
@ -1,6 +1,11 @@
|
|||
require 'celluloid'
|
||||
require 'sidekiq/util'
|
||||
|
||||
require 'sidekiq/middleware/server/active_record'
|
||||
require 'sidekiq/middleware/server/airbrake'
|
||||
require 'sidekiq/middleware/server/unique_jobs'
|
||||
require 'sidekiq/middleware/server/failure_jobs'
|
||||
|
||||
module Sidekiq
|
||||
class Processor
|
||||
include Util
|
||||
|
@ -10,6 +15,14 @@ module Sidekiq
|
|||
raise "Sidekiq::Processor.middleware is now Sidekiq.server_middleware"
|
||||
end
|
||||
|
||||
def self.default_middleware
|
||||
Middleware::Chain.new do |m|
|
||||
m.add Middleware::Server::Airbrake
|
||||
m.add Middleware::Server::UniqueJobs
|
||||
m.add Middleware::Server::ActiveRecord
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(boss)
|
||||
@boss = boss
|
||||
redis.sadd('workers', self)
|
||||
|
|
|
@ -6,7 +6,7 @@ module Sidekiq
|
|||
def self.create(options={})
|
||||
url = options[:url] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0'
|
||||
client = build_client(url, options[:namespace])
|
||||
return ConnectionPool.new(:timeout => 1, :size => (options[:size] || 15)) { client } unless options[:use_pool] == false
|
||||
return ConnectionPool.new(:timeout => 1, :size => Sidekiq.options[:concurrency] || 25) { client } unless options[:use_pool] == false
|
||||
client
|
||||
end
|
||||
|
||||
|
|
|
@ -1 +1,3 @@
|
|||
Sidekiq.redis = Sidekiq::RedisConnection.create(:namespace => 'resque', :size => 5)
|
||||
Sidekiq.configure do |config|
|
||||
config.redis = Sidekiq::RedisConnection.create(:namespace => 'resque', :size => 5)
|
||||
end
|
||||
|
|
|
@ -22,17 +22,17 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
|
||||
it 'changes concurrency' do
|
||||
@cli.parse(['sidekiq', '-c', '60', '-r', './test/fake_env.rb'])
|
||||
assert_equal 60, @cli.options[:concurrency]
|
||||
assert_equal 60, Sidekiq.options[:concurrency]
|
||||
end
|
||||
|
||||
it 'changes queues' do
|
||||
@cli.parse(['sidekiq', '-q', 'foo', '-r', './test/fake_env.rb'])
|
||||
assert_equal ['foo'], @cli.options[:queues]
|
||||
assert_equal ['foo'], Sidekiq.options[:queues]
|
||||
end
|
||||
|
||||
it 'handles weights' do
|
||||
@cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb'])
|
||||
assert_equal %w(bar foo foo foo), @cli.options[:queues].sort
|
||||
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort
|
||||
end
|
||||
|
||||
describe 'with pidfile' do
|
||||
|
@ -49,7 +49,7 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'sets pidfile path' do
|
||||
assert_equal @tmp_path, @cli.options[:pidfile]
|
||||
assert_equal @tmp_path, Sidekiq.options[:pidfile]
|
||||
end
|
||||
|
||||
it 'writes pidfile' do
|
||||
|
@ -63,36 +63,36 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'takes a path' do
|
||||
assert_equal './test/config.yml', @cli.options[:config_file]
|
||||
assert_equal './test/config.yml', Sidekiq.options[:config_file]
|
||||
end
|
||||
|
||||
it 'sets verbose' do
|
||||
refute @cli.options[:verbose]
|
||||
refute Sidekiq.options[:verbose]
|
||||
end
|
||||
|
||||
|
||||
it 'sets namespace' do
|
||||
assert_equal "test_namespace", @cli.options[:namespace]
|
||||
assert_equal "test_namespace", Sidekiq.options[:namespace]
|
||||
end
|
||||
|
||||
it 'sets require file' do
|
||||
assert_equal './test/fake_env.rb', @cli.options[:require]
|
||||
assert_equal './test/fake_env.rb', Sidekiq.options[:require]
|
||||
end
|
||||
|
||||
it 'sets environment' do
|
||||
assert_equal 'xzibit', @cli.options[:environment]
|
||||
assert_equal 'xzibit', Sidekiq.options[:environment]
|
||||
end
|
||||
|
||||
it 'sets concurrency' do
|
||||
assert_equal 50, @cli.options[:concurrency]
|
||||
assert_equal 50, Sidekiq.options[:concurrency]
|
||||
end
|
||||
|
||||
it 'sets pid file' do
|
||||
assert_equal '/tmp/sidekiq-config-test.pid', @cli.options[:pidfile]
|
||||
assert_equal '/tmp/sidekiq-config-test.pid', Sidekiq.options[:pidfile]
|
||||
end
|
||||
|
||||
it 'sets queues' do
|
||||
assert_equal 2, @cli.options[:queues].select{ |q| q == 'often' }.length
|
||||
assert_equal 1, @cli.options[:queues].select{ |q| q == 'seldom' }.length
|
||||
assert_equal 2, Sidekiq.options[:queues].select{ |q| q == 'often' }.length
|
||||
assert_equal 1, Sidekiq.options[:queues].select{ |q| q == 'seldom' }.length
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -125,28 +125,28 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'uses concurrency flag' do
|
||||
assert_equal 100, @cli.options[:concurrency]
|
||||
assert_equal 100, Sidekiq.options[:concurrency]
|
||||
end
|
||||
|
||||
|
||||
it 'uses namespace flag' do
|
||||
assert_equal "sweet_story_bro", @cli.options[:namespace]
|
||||
assert_equal "sweet_story_bro", Sidekiq.options[:namespace]
|
||||
end
|
||||
|
||||
it 'uses require file flag' do
|
||||
assert_equal @tmp_lib_path, @cli.options[:require]
|
||||
assert_equal @tmp_lib_path, Sidekiq.options[:require]
|
||||
end
|
||||
|
||||
it 'uses environment flag' do
|
||||
assert_equal 'snoop', @cli.options[:environment]
|
||||
assert_equal 'snoop', Sidekiq.options[:environment]
|
||||
end
|
||||
|
||||
it 'uses pidfile flag' do
|
||||
assert_equal @tmp_path, @cli.options[:pidfile]
|
||||
assert_equal @tmp_path, Sidekiq.options[:pidfile]
|
||||
end
|
||||
|
||||
it 'sets queues' do
|
||||
assert_equal 7, @cli.options[:queues].select{ |q| q == 'often' }.length
|
||||
assert_equal 3, @cli.options[:queues].select{ |q| q == 'seldom' }.length
|
||||
assert_equal 7, Sidekiq.options[:queues].select{ |q| q == 'often' }.length
|
||||
assert_equal 3, Sidekiq.options[:queues].select{ |q| q == 'seldom' }.length
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -1,7 +1,10 @@
|
|||
require 'helper'
|
||||
require 'sidekiq'
|
||||
require 'active_record'
|
||||
require 'action_mailer'
|
||||
require 'sidekiq'
|
||||
require 'sidekiq/extensions/action_mailer'
|
||||
require 'sidekiq/extensions/active_record'
|
||||
|
||||
|
||||
class TestExtensions < MiniTest::Unit::TestCase
|
||||
describe 'sidekiq extensions' do
|
||||
|
@ -16,7 +19,7 @@ class TestExtensions < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
it 'allowed delayed exection of ActiveRecord class methods' do
|
||||
it 'allows delayed exection of ActiveRecord class methods' do
|
||||
@redis.expect(:rpush, @redis, ['queue:default', "{\"class\":\"Sidekiq::Extensions::DelayedModel\",\"args\":[\"---\\n- !ruby/class 'TestExtensions::MyModel'\\n- :long_class_method\\n- []\\n\"]}"])
|
||||
MyModel.delay.long_class_method
|
||||
@redis.verify
|
||||
|
@ -32,7 +35,7 @@ class TestExtensions < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
it 'allowed delayed delivery of ActionMailer mails' do
|
||||
it 'allows delayed delivery of ActionMailer mails' do
|
||||
@redis.expect(:rpush, @redis, ['queue:default', "{\"class\":\"Sidekiq::Extensions::DelayedMailer\",\"args\":[\"---\\n- !ruby/class 'TestExtensions::UserMailer'\\n- :greetings\\n- - 1\\n - 2\\n\"]}"])
|
||||
UserMailer.delay.greetings(1, 2)
|
||||
@redis.verify
|
||||
|
|
Loading…
Add table
Reference in a new issue