mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Implement USR1 - stop accepting new work, GH-69
This commit is contained in:
parent
16556c6cf9
commit
be1ef5736e
7 changed files with 74 additions and 51 deletions
|
@ -1,3 +1,12 @@
|
||||||
|
HEAD
|
||||||
|
-----------
|
||||||
|
|
||||||
|
- TERM shutdown timeout is now configurable, defaults to 5 seconds. (mperham)
|
||||||
|
- USR1 signal now stops Sidekiq from accepting new work,
|
||||||
|
capistrano sends USR1 at start of deploy and TERM at end of deploy
|
||||||
|
giving workers the maximum amount of time to finish. (mperham)
|
||||||
|
- New Sidekiq::Web rack application available (mperham)
|
||||||
|
|
||||||
0.8.0
|
0.8.0
|
||||||
-----------
|
-----------
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,7 @@ rescue LoadError
|
||||||
end
|
end
|
||||||
|
|
||||||
begin
|
begin
|
||||||
cli = Sidekiq::CLI.new
|
cli = Sidekiq::CLI.instance
|
||||||
cli.parse
|
cli.parse
|
||||||
cli.run
|
cli.run
|
||||||
rescue => e
|
rescue => e
|
||||||
|
|
|
@ -14,6 +14,7 @@ module Sidekiq
|
||||||
:concurrency => 25,
|
:concurrency => 25,
|
||||||
:require => '.',
|
:require => '.',
|
||||||
:environment => nil,
|
:environment => nil,
|
||||||
|
:timeout => 5,
|
||||||
}
|
}
|
||||||
|
|
||||||
def self.options
|
def self.options
|
||||||
|
|
|
@ -1,16 +1,19 @@
|
||||||
Capistrano::Configuration.instance.load do
|
Capistrano::Configuration.instance.load do
|
||||||
|
before "deploy", "sidekiq:quiet"
|
||||||
after "deploy", "sidekiq:restart"
|
after "deploy", "sidekiq:restart"
|
||||||
|
|
||||||
|
_cset(:sidekiq_timeout) { 5 }
|
||||||
|
|
||||||
namespace :sidekiq do
|
namespace :sidekiq do
|
||||||
|
|
||||||
desc "Force stop sidekiq"
|
desc "Quiet sidekiq (stop accepting new work)"
|
||||||
task :kill do
|
task :quiet do
|
||||||
run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid` && sleep 5 && kill -9 `cat tmp/pids/sidekiq.pid`"
|
run "cd #{current_path} && kill -USR1 `cat tmp/pids/sidekiq.pid`"
|
||||||
end
|
end
|
||||||
|
|
||||||
desc "Stop sidekiq"
|
desc "Stop sidekiq"
|
||||||
task :stop do
|
task :stop do
|
||||||
run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid`"
|
run "cd #{current_path} && kill `cat tmp/pids/sidekiq.pid` && sleep #{fetch :sidekiq_timeout} && kill -9 `cat tmp/pids/sidekiq.pid` && rm tmp/pids/sidekiq.pid"
|
||||||
end
|
end
|
||||||
|
|
||||||
desc "Start sidekiq"
|
desc "Start sidekiq"
|
||||||
|
|
|
@ -9,7 +9,14 @@ trap 'TERM' do
|
||||||
Thread.main.raise Interrupt
|
Thread.main.raise Interrupt
|
||||||
end
|
end
|
||||||
|
|
||||||
|
trap 'USR1' do
|
||||||
|
Sidekiq::Util.logger.info "Received USR1, no longer accepting new work"
|
||||||
|
mgr = Sidekiq::CLI.instance.manager
|
||||||
|
mgr.stop! if mgr
|
||||||
|
end
|
||||||
|
|
||||||
require 'yaml'
|
require 'yaml'
|
||||||
|
require 'singleton'
|
||||||
require 'optparse'
|
require 'optparse'
|
||||||
require 'sidekiq'
|
require 'sidekiq'
|
||||||
require 'sidekiq/util'
|
require 'sidekiq/util'
|
||||||
|
@ -18,40 +25,40 @@ require 'sidekiq/manager'
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
class CLI
|
class CLI
|
||||||
include Util
|
include Util
|
||||||
|
include Singleton
|
||||||
|
|
||||||
# Used for CLI testing
|
# Used for CLI testing
|
||||||
attr_accessor :code
|
attr_accessor :code, :manager
|
||||||
|
|
||||||
def initialize
|
def initialize
|
||||||
@code = nil
|
@code = nil
|
||||||
|
@manager = nil
|
||||||
end
|
end
|
||||||
|
|
||||||
def parse(args=ARGV)
|
def parse(args=ARGV)
|
||||||
|
@code = nil
|
||||||
Sidekiq::Util.logger
|
Sidekiq::Util.logger
|
||||||
|
|
||||||
cli = parse_options(args)
|
cli = parse_options(args)
|
||||||
config = parse_config(cli)
|
config = parse_config(cli)
|
||||||
options.merge!(config.merge(cli))
|
options.merge!(config.merge(cli))
|
||||||
|
|
||||||
set_logger_level_to_debug if options[:verbose]
|
Sidekiq::Util.logger.level = Logger::DEBUG if options[:verbose]
|
||||||
|
|
||||||
write_pid
|
|
||||||
validate!
|
validate!
|
||||||
|
write_pid
|
||||||
boot_system
|
boot_system
|
||||||
end
|
end
|
||||||
|
|
||||||
def run
|
def run
|
||||||
manager = Sidekiq::Manager.new(options)
|
@manager = Sidekiq::Manager.new(options)
|
||||||
begin
|
begin
|
||||||
logger.info 'Starting processing, hit Ctrl-C to stop'
|
logger.info 'Starting processing, hit Ctrl-C to stop'
|
||||||
manager.start!
|
manager.start!
|
||||||
# HACK need to determine how to pause main thread while
|
|
||||||
# waiting for signals.
|
|
||||||
sleep
|
sleep
|
||||||
rescue Interrupt
|
rescue Interrupt
|
||||||
# TODO Need clean shutdown support from Celluloid
|
|
||||||
logger.info 'Shutting down'
|
logger.info 'Shutting down'
|
||||||
manager.stop!
|
manager.stop!(:shutdown => true, :timeout => options[:timeout])
|
||||||
manager.wait(:shutdown)
|
manager.wait(:shutdown)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -111,18 +118,14 @@ module Sidekiq
|
||||||
set_logger_level_to_debug
|
set_logger_level_to_debug
|
||||||
end
|
end
|
||||||
|
|
||||||
o.on "-n", "--namespace NAMESPACE", "namespace worker queues are under (DEPRECATED)" do |arg|
|
|
||||||
puts "Namespace support has been removed, see https://github.com/mperham/sidekiq/issues/61"
|
|
||||||
end
|
|
||||||
|
|
||||||
o.on "-s", "--server LOCATION", "Where to find Redis (DEPRECATED)" do |arg|
|
|
||||||
puts "Server location support has been removed, see https://github.com/mperham/sidekiq/issues/61"
|
|
||||||
end
|
|
||||||
|
|
||||||
o.on '-e', '--environment ENV', "Application environment" do |arg|
|
o.on '-e', '--environment ENV', "Application environment" do |arg|
|
||||||
opts[:environment] = arg
|
opts[:environment] = arg
|
||||||
end
|
end
|
||||||
|
|
||||||
|
o.on '-t', '--timeout NUM', "Shutdown timeout" do |arg|
|
||||||
|
opts[:timeout] = arg.to_i
|
||||||
|
end
|
||||||
|
|
||||||
o.on '-r', '--require [PATH|DIR]', "Location of Rails application with workers or file to require" do |arg|
|
o.on '-r', '--require [PATH|DIR]', "Location of Rails application with workers or file to require" do |arg|
|
||||||
opts[:require] = arg
|
opts[:require] = arg
|
||||||
end
|
end
|
||||||
|
@ -173,9 +176,5 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def set_logger_level_to_debug
|
|
||||||
Sidekiq::Util.logger.level = Logger::DEBUG
|
|
||||||
end
|
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -30,7 +30,10 @@ module Sidekiq
|
||||||
@ready = @count.times.map { Processor.new_link(current_actor) }
|
@ready = @count.times.map { Processor.new_link(current_actor) }
|
||||||
end
|
end
|
||||||
|
|
||||||
def stop
|
def stop(options={})
|
||||||
|
shutdown = options[:shutdown]
|
||||||
|
timeout = options[:timeout]
|
||||||
|
|
||||||
@done = true
|
@done = true
|
||||||
@ready.each(&:terminate)
|
@ready.each(&:terminate)
|
||||||
@ready.clear
|
@ready.clear
|
||||||
|
@ -42,12 +45,15 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
if shutdown
|
||||||
if @busy.empty?
|
if @busy.empty?
|
||||||
return signal(:shutdown)
|
# after(0) needed to avoid deadlock in Celluoid after USR1 + TERM
|
||||||
|
return after(0) { signal(:shutdown) }
|
||||||
|
else
|
||||||
|
logger.info { "Pausing #{timeout} seconds to allow workers to finish..." }
|
||||||
end
|
end
|
||||||
|
|
||||||
logger.info("Pausing 5 seconds to allow workers to finish...")
|
after(timeout) do
|
||||||
after(5) do
|
|
||||||
@busy.each(&:terminate)
|
@busy.each(&:terminate)
|
||||||
redis.with_connection do |conn|
|
redis.with_connection do |conn|
|
||||||
conn.multi do
|
conn.multi do
|
||||||
|
@ -59,6 +65,7 @@ module Sidekiq
|
||||||
signal(:shutdown)
|
signal(:shutdown)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
end
|
||||||
|
|
||||||
def start
|
def start
|
||||||
dispatch(true)
|
dispatch(true)
|
||||||
|
|
|
@ -2,10 +2,20 @@ require 'helper'
|
||||||
require 'sidekiq/cli'
|
require 'sidekiq/cli'
|
||||||
require 'tempfile'
|
require 'tempfile'
|
||||||
|
|
||||||
|
cli = Sidekiq::CLI.instance
|
||||||
|
def cli.die(code)
|
||||||
|
@code = code
|
||||||
|
end
|
||||||
|
|
||||||
|
def cli.valid?
|
||||||
|
!@code
|
||||||
|
end
|
||||||
|
|
||||||
class TestCli < MiniTest::Unit::TestCase
|
class TestCli < MiniTest::Unit::TestCase
|
||||||
describe 'with cli' do
|
describe 'with cli' do
|
||||||
|
|
||||||
before do
|
before do
|
||||||
@cli = new_cli
|
@cli = Sidekiq::CLI.instance
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'blows up with an invalid require' do
|
it 'blows up with an invalid require' do
|
||||||
|
@ -14,7 +24,7 @@ class TestCli < MiniTest::Unit::TestCase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'blows up with invalid Ruby' do
|
it 'requires the specified Ruby code' do
|
||||||
@cli.parse(['sidekiq', '-r', './test/fake_env.rb'])
|
@cli.parse(['sidekiq', '-r', './test/fake_env.rb'])
|
||||||
assert($LOADED_FEATURES.any? { |x| x =~ /fake_env/ })
|
assert($LOADED_FEATURES.any? { |x| x =~ /fake_env/ })
|
||||||
assert @cli.valid?
|
assert @cli.valid?
|
||||||
|
@ -30,6 +40,11 @@ class TestCli < MiniTest::Unit::TestCase
|
||||||
assert_equal ['foo'], Sidekiq.options[:queues]
|
assert_equal ['foo'], Sidekiq.options[:queues]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it 'changes timeout' do
|
||||||
|
@cli.parse(['sidekiq', '-t', '30', '-r', './test/fake_env.rb'])
|
||||||
|
assert_equal 30, Sidekiq.options[:timeout]
|
||||||
|
end
|
||||||
|
|
||||||
it 'handles weights' do
|
it 'handles weights' do
|
||||||
@cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb'])
|
@cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb'])
|
||||||
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort
|
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort
|
||||||
|
@ -140,17 +155,6 @@ class TestCli < MiniTest::Unit::TestCase
|
||||||
assert_equal 3, Sidekiq.options[:queues].select{ |q| q == 'seldom' }.length
|
assert_equal 3, Sidekiq.options[:queues].select{ |q| q == 'seldom' }.length
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def new_cli
|
|
||||||
cli = Sidekiq::CLI.new
|
|
||||||
def cli.die(code)
|
|
||||||
@code = code
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def cli.valid?
|
|
||||||
!@code
|
|
||||||
end
|
|
||||||
cli
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
Loading…
Add table
Reference in a new issue