diff --git a/Changes.md b/Changes.md index 28eeadf3..0393fa0c 100644 --- a/Changes.md +++ b/Changes.md @@ -1,3 +1,12 @@ +HEAD +----------- + +- TERM shutdown timeout is now configurable, defaults to 5 seconds. (mperham) +- USR1 signal now stops Sidekiq from accepting new work, + capistrano sends USR1 at start of deploy and TERM at end of deploy + giving workers the maximum amount of time to finish. (mperham) +- New Sidekiq::Web rack application available (mperham) + 0.8.0 ----------- diff --git a/bin/sidekiq b/bin/sidekiq index 07bb3de5..321d968c 100755 --- a/bin/sidekiq +++ b/bin/sidekiq @@ -10,7 +10,7 @@ rescue LoadError end begin - cli = Sidekiq::CLI.new + cli = Sidekiq::CLI.instance cli.parse cli.run rescue => e diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index 9e5a5773..899d8ffb 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -14,6 +14,7 @@ module Sidekiq :concurrency => 25, :require => '.', :environment => nil, + :timeout => 5, } def self.options diff --git a/lib/sidekiq/capistrano.rb b/lib/sidekiq/capistrano.rb index 582202c4..a54de306 100644 --- a/lib/sidekiq/capistrano.rb +++ b/lib/sidekiq/capistrano.rb @@ -1,16 +1,19 @@ Capistrano::Configuration.instance.load do + before "deploy", "sidekiq:quiet" after "deploy", "sidekiq:restart" + _cset(:sidekiq_timeout) { 5 } + namespace :sidekiq do - desc "Force stop sidekiq" - task :kill do - run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid` && sleep 5 && kill -9 `cat tmp/pids/sidekiq.pid`" + desc "Quiet sidekiq (stop accepting new work)" + task :quiet do + run "cd #{current_path} && kill -USR1 `cat tmp/pids/sidekiq.pid`" end desc "Stop sidekiq" task :stop do - run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid`" + run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid` && sleep #{fetch :sidekiq_timeout} && kill -9 `cat tmp/pids/sidekiq.pid` && rm tmp/pids/sidekiq.pid" end desc "Start sidekiq" diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 28aedf61..578cecf8 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -9,7 +9,14 @@ trap 'TERM' do Thread.main.raise Interrupt end +trap 'USR1' do + Sidekiq::Util.logger.info "Received USR1, no longer accepting new work" + mgr = Sidekiq::CLI.instance.manager + mgr.stop! if mgr +end + require 'yaml' +require 'singleton' require 'optparse' require 'sidekiq' require 'sidekiq/util' @@ -18,40 +25,40 @@ require 'sidekiq/manager' module Sidekiq class CLI include Util + include Singleton # Used for CLI testing - attr_accessor :code + attr_accessor :code, :manager def initialize @code = nil + @manager = nil end def parse(args=ARGV) + @code = nil Sidekiq::Util.logger cli = parse_options(args) config = parse_config(cli) options.merge!(config.merge(cli)) - set_logger_level_to_debug if options[:verbose] + Sidekiq::Util.logger.level = Logger::DEBUG if options[:verbose] - write_pid validate! + write_pid boot_system end def run - manager = Sidekiq::Manager.new(options) + @manager = Sidekiq::Manager.new(options) begin logger.info 'Starting processing, hit Ctrl-C to stop' manager.start! - # HACK need to determine how to pause main thread while - # waiting for signals. sleep rescue Interrupt - # TODO Need clean shutdown support from Celluloid logger.info 'Shutting down' - manager.stop! + manager.stop!(:shutdown => true, :timeout => options[:timeout]) manager.wait(:shutdown) end end @@ -111,18 +118,14 @@ module Sidekiq set_logger_level_to_debug end - o.on "-n", "--namespace NAMESPACE", "namespace worker queues are under (DEPRECATED)" do |arg| - puts "Namespace support has been removed, see https://github.com/mperham/sidekiq/issues/61" - end - - o.on "-s", "--server LOCATION", "Where to find Redis (DEPRECATED)" do |arg| - puts "Server location support has been removed, see https://github.com/mperham/sidekiq/issues/61" - end - o.on '-e', '--environment ENV', "Application environment" do |arg| opts[:environment] = arg end + o.on '-t', '--timeout NUM', "Shutdown timeout" do |arg| + opts[:timeout] = arg.to_i + end + o.on '-r', '--require [PATH|DIR]', "Location of Rails application with workers or file to require" do |arg| opts[:require] = arg end @@ -173,9 +176,5 @@ module Sidekiq end end - def set_logger_level_to_debug - Sidekiq::Util.logger.level = Logger::DEBUG - end - end end diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 6e6bd506..8ec00328 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -30,7 +30,10 @@ module Sidekiq @ready = @count.times.map { Processor.new_link(current_actor) } end - def stop + def stop(options={}) + shutdown = options[:shutdown] + timeout = options[:timeout] + @done = true @ready.each(&:terminate) @ready.clear @@ -42,21 +45,25 @@ module Sidekiq end end - if @busy.empty? - return signal(:shutdown) - end + if shutdown + if @busy.empty? + # after(0) needed to avoid deadlock in Celluoid after USR1 + TERM + return after(0) { signal(:shutdown) } + else + logger.info { "Pausing #{timeout} seconds to allow workers to finish..." } + end - logger.info("Pausing 5 seconds to allow workers to finish...") - after(5) do - @busy.each(&:terminate) - redis.with_connection do |conn| - conn.multi do - @busy.each do |busy| - conn.lpush("queue:#{busy.queue}", busy.msg) + after(timeout) do + @busy.each(&:terminate) + redis.with_connection do |conn| + conn.multi do + @busy.each do |busy| + conn.lpush("queue:#{busy.queue}", busy.msg) + end end end + signal(:shutdown) end - signal(:shutdown) end end diff --git a/test/test_cli.rb b/test/test_cli.rb index 5ce8ec54..b2cdf31e 100644 --- a/test/test_cli.rb +++ b/test/test_cli.rb @@ -2,10 +2,20 @@ require 'helper' require 'sidekiq/cli' require 'tempfile' +cli = Sidekiq::CLI.instance +def cli.die(code) + @code = code +end + +def cli.valid? + !@code +end + class TestCli < MiniTest::Unit::TestCase describe 'with cli' do + before do - @cli = new_cli + @cli = Sidekiq::CLI.instance end it 'blows up with an invalid require' do @@ -14,7 +24,7 @@ class TestCli < MiniTest::Unit::TestCase end end - it 'blows up with invalid Ruby' do + it 'requires the specified Ruby code' do @cli.parse(['sidekiq', '-r', './test/fake_env.rb']) assert($LOADED_FEATURES.any? { |x| x =~ /fake_env/ }) assert @cli.valid? @@ -30,6 +40,11 @@ class TestCli < MiniTest::Unit::TestCase assert_equal ['foo'], Sidekiq.options[:queues] end + it 'changes timeout' do + @cli.parse(['sidekiq', '-t', '30', '-r', './test/fake_env.rb']) + assert_equal 30, Sidekiq.options[:timeout] + end + it 'handles weights' do @cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb']) assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort @@ -140,17 +155,6 @@ class TestCli < MiniTest::Unit::TestCase assert_equal 3, Sidekiq.options[:queues].select{ |q| q == 'seldom' }.length end end - - def new_cli - cli = Sidekiq::CLI.new - def cli.die(code) - @code = code - end - - def cli.valid? - !@code - end - cli - end end + end