mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Implement USR1 - stop accepting new work, GH-69
This commit is contained in:
parent
16556c6cf9
commit
be1ef5736e
7 changed files with 74 additions and 51 deletions
|
@ -1,3 +1,12 @@
|
|||
HEAD
|
||||
-----------
|
||||
|
||||
- TERM shutdown timeout is now configurable, defaults to 5 seconds. (mperham)
|
||||
- USR1 signal now stops Sidekiq from accepting new work,
|
||||
capistrano sends USR1 at start of deploy and TERM at end of deploy
|
||||
giving workers the maximum amount of time to finish. (mperham)
|
||||
- New Sidekiq::Web rack application available (mperham)
|
||||
|
||||
0.8.0
|
||||
-----------
|
||||
|
||||
|
|
|
@ -10,7 +10,7 @@ rescue LoadError
|
|||
end
|
||||
|
||||
begin
|
||||
cli = Sidekiq::CLI.new
|
||||
cli = Sidekiq::CLI.instance
|
||||
cli.parse
|
||||
cli.run
|
||||
rescue => e
|
||||
|
|
|
@ -14,6 +14,7 @@ module Sidekiq
|
|||
:concurrency => 25,
|
||||
:require => '.',
|
||||
:environment => nil,
|
||||
:timeout => 5,
|
||||
}
|
||||
|
||||
def self.options
|
||||
|
|
|
@ -1,16 +1,19 @@
|
|||
Capistrano::Configuration.instance.load do
|
||||
before "deploy", "sidekiq:quiet"
|
||||
after "deploy", "sidekiq:restart"
|
||||
|
||||
_cset(:sidekiq_timeout) { 5 }
|
||||
|
||||
namespace :sidekiq do
|
||||
|
||||
desc "Force stop sidekiq"
|
||||
task :kill do
|
||||
run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid` && sleep 5 && kill -9 `cat tmp/pids/sidekiq.pid`"
|
||||
desc "Quiet sidekiq (stop accepting new work)"
|
||||
task :quiet do
|
||||
run "cd #{current_path} && kill -USR1 `cat tmp/pids/sidekiq.pid`"
|
||||
end
|
||||
|
||||
desc "Stop sidekiq"
|
||||
task :stop do
|
||||
run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid`"
|
||||
run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid` && sleep #{fetch :sidekiq_timeout} && kill -9 `cat tmp/pids/sidekiq.pid` && rm tmp/pids/sidekiq.pid"
|
||||
end
|
||||
|
||||
desc "Start sidekiq"
|
||||
|
|
|
@ -9,7 +9,14 @@ trap 'TERM' do
|
|||
Thread.main.raise Interrupt
|
||||
end
|
||||
|
||||
trap 'USR1' do
|
||||
Sidekiq::Util.logger.info "Received USR1, no longer accepting new work"
|
||||
mgr = Sidekiq::CLI.instance.manager
|
||||
mgr.stop! if mgr
|
||||
end
|
||||
|
||||
require 'yaml'
|
||||
require 'singleton'
|
||||
require 'optparse'
|
||||
require 'sidekiq'
|
||||
require 'sidekiq/util'
|
||||
|
@ -18,40 +25,40 @@ require 'sidekiq/manager'
|
|||
module Sidekiq
|
||||
class CLI
|
||||
include Util
|
||||
include Singleton
|
||||
|
||||
# Used for CLI testing
|
||||
attr_accessor :code
|
||||
attr_accessor :code, :manager
|
||||
|
||||
def initialize
|
||||
@code = nil
|
||||
@manager = nil
|
||||
end
|
||||
|
||||
def parse(args=ARGV)
|
||||
@code = nil
|
||||
Sidekiq::Util.logger
|
||||
|
||||
cli = parse_options(args)
|
||||
config = parse_config(cli)
|
||||
options.merge!(config.merge(cli))
|
||||
|
||||
set_logger_level_to_debug if options[:verbose]
|
||||
Sidekiq::Util.logger.level = Logger::DEBUG if options[:verbose]
|
||||
|
||||
write_pid
|
||||
validate!
|
||||
write_pid
|
||||
boot_system
|
||||
end
|
||||
|
||||
def run
|
||||
manager = Sidekiq::Manager.new(options)
|
||||
@manager = Sidekiq::Manager.new(options)
|
||||
begin
|
||||
logger.info 'Starting processing, hit Ctrl-C to stop'
|
||||
manager.start!
|
||||
# HACK need to determine how to pause main thread while
|
||||
# waiting for signals.
|
||||
sleep
|
||||
rescue Interrupt
|
||||
# TODO Need clean shutdown support from Celluloid
|
||||
logger.info 'Shutting down'
|
||||
manager.stop!
|
||||
manager.stop!(:shutdown => true, :timeout => options[:timeout])
|
||||
manager.wait(:shutdown)
|
||||
end
|
||||
end
|
||||
|
@ -111,18 +118,14 @@ module Sidekiq
|
|||
set_logger_level_to_debug
|
||||
end
|
||||
|
||||
o.on "-n", "--namespace NAMESPACE", "namespace worker queues are under (DEPRECATED)" do |arg|
|
||||
puts "Namespace support has been removed, see https://github.com/mperham/sidekiq/issues/61"
|
||||
end
|
||||
|
||||
o.on "-s", "--server LOCATION", "Where to find Redis (DEPRECATED)" do |arg|
|
||||
puts "Server location support has been removed, see https://github.com/mperham/sidekiq/issues/61"
|
||||
end
|
||||
|
||||
o.on '-e', '--environment ENV', "Application environment" do |arg|
|
||||
opts[:environment] = arg
|
||||
end
|
||||
|
||||
o.on '-t', '--timeout NUM', "Shutdown timeout" do |arg|
|
||||
opts[:timeout] = arg.to_i
|
||||
end
|
||||
|
||||
o.on '-r', '--require [PATH|DIR]', "Location of Rails application with workers or file to require" do |arg|
|
||||
opts[:require] = arg
|
||||
end
|
||||
|
@ -173,9 +176,5 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def set_logger_level_to_debug
|
||||
Sidekiq::Util.logger.level = Logger::DEBUG
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -30,7 +30,10 @@ module Sidekiq
|
|||
@ready = @count.times.map { Processor.new_link(current_actor) }
|
||||
end
|
||||
|
||||
def stop
|
||||
def stop(options={})
|
||||
shutdown = options[:shutdown]
|
||||
timeout = options[:timeout]
|
||||
|
||||
@done = true
|
||||
@ready.each(&:terminate)
|
||||
@ready.clear
|
||||
|
@ -42,21 +45,25 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
if @busy.empty?
|
||||
return signal(:shutdown)
|
||||
end
|
||||
if shutdown
|
||||
if @busy.empty?
|
||||
# after(0) needed to avoid deadlock in Celluoid after USR1 + TERM
|
||||
return after(0) { signal(:shutdown) }
|
||||
else
|
||||
logger.info { "Pausing #{timeout} seconds to allow workers to finish..." }
|
||||
end
|
||||
|
||||
logger.info("Pausing 5 seconds to allow workers to finish...")
|
||||
after(5) do
|
||||
@busy.each(&:terminate)
|
||||
redis.with_connection do |conn|
|
||||
conn.multi do
|
||||
@busy.each do |busy|
|
||||
conn.lpush("queue:#{busy.queue}", busy.msg)
|
||||
after(timeout) do
|
||||
@busy.each(&:terminate)
|
||||
redis.with_connection do |conn|
|
||||
conn.multi do
|
||||
@busy.each do |busy|
|
||||
conn.lpush("queue:#{busy.queue}", busy.msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
signal(:shutdown)
|
||||
end
|
||||
signal(:shutdown)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -2,10 +2,20 @@ require 'helper'
|
|||
require 'sidekiq/cli'
|
||||
require 'tempfile'
|
||||
|
||||
cli = Sidekiq::CLI.instance
|
||||
def cli.die(code)
|
||||
@code = code
|
||||
end
|
||||
|
||||
def cli.valid?
|
||||
!@code
|
||||
end
|
||||
|
||||
class TestCli < MiniTest::Unit::TestCase
|
||||
describe 'with cli' do
|
||||
|
||||
before do
|
||||
@cli = new_cli
|
||||
@cli = Sidekiq::CLI.instance
|
||||
end
|
||||
|
||||
it 'blows up with an invalid require' do
|
||||
|
@ -14,7 +24,7 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
it 'blows up with invalid Ruby' do
|
||||
it 'requires the specified Ruby code' do
|
||||
@cli.parse(['sidekiq', '-r', './test/fake_env.rb'])
|
||||
assert($LOADED_FEATURES.any? { |x| x =~ /fake_env/ })
|
||||
assert @cli.valid?
|
||||
|
@ -30,6 +40,11 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
assert_equal ['foo'], Sidekiq.options[:queues]
|
||||
end
|
||||
|
||||
it 'changes timeout' do
|
||||
@cli.parse(['sidekiq', '-t', '30', '-r', './test/fake_env.rb'])
|
||||
assert_equal 30, Sidekiq.options[:timeout]
|
||||
end
|
||||
|
||||
it 'handles weights' do
|
||||
@cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb'])
|
||||
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort
|
||||
|
@ -140,17 +155,6 @@ class TestCli < MiniTest::Unit::TestCase
|
|||
assert_equal 3, Sidekiq.options[:queues].select{ |q| q == 'seldom' }.length
|
||||
end
|
||||
end
|
||||
|
||||
def new_cli
|
||||
cli = Sidekiq::CLI.new
|
||||
def cli.die(code)
|
||||
@code = code
|
||||
end
|
||||
|
||||
def cli.valid?
|
||||
!@code
|
||||
end
|
||||
cli
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue