Merge branch '40396-remove-unneeded-restart-code' into 'master'
Remove an obsolete workaround for GRPC unavailable errors.

See merge request gitlab-org/gitlab-ce!25645
commit 3b5ecd35ec
6 changed files with 133 additions and 247 deletions
config/initializers/sidekiq.rb

@@ -22,7 +22,7 @@ Sidekiq.configure_server do |config|
   config.server_middleware do |chain|
     chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs
-    chain.add Gitlab::SidekiqMiddleware::Shutdown
+    chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
     chain.add Gitlab::SidekiqMiddleware::RequestStoreMiddleware unless ENV['SIDEKIQ_REQUEST_STORE'] == '0'
     chain.add Gitlab::SidekiqMiddleware::BatchLoader
     chain.add Gitlab::SidekiqMiddleware::CorrelationLogger
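The hunk above swaps the Shutdown middleware for the new MemoryKiller, and only registers it when SIDEKIQ_MEMORY_KILLER_MAX_RSS is set. For context, here is a minimal sketch of the Sidekiq server-middleware contract these chain.add calls rely on; the ExampleMiddleware class and its log message are illustrative only and are not part of this merge request.

# Minimal sketch of the Sidekiq server middleware contract (illustrative only).
require 'sidekiq'

class ExampleMiddleware
  # Sidekiq invokes #call around every job; `yield` runs the rest of the
  # chain and, eventually, the worker's #perform method.
  def call(worker, job, queue)
    started = Time.now
    yield
  ensure
    Sidekiq.logger.info(
      "#{worker.class} on #{queue} took #{Time.now - started}s (JID-#{job['jid']})"
    )
  end
end

Sidekiq.configure_server do |config|
  config.server_middleware do |chain|
    chain.add ExampleMiddleware
  end
end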
lib/gitlab/gitaly_client.rb

@@ -164,8 +164,6 @@ module Gitlab
       kwargs = yield(kwargs) if block_given?

       stub(service, storage).__send__(rpc, request, kwargs) # rubocop:disable GitlabSecurity/PublicSend
-    rescue GRPC::Unavailable => ex
-      handle_grpc_unavailable!(ex)
     ensure
       duration = Gitlab::Metrics::System.monotonic_time - start

@@ -178,27 +176,6 @@ module Gitlab
       add_call_details(feature: "#{service}##{rpc}", duration: duration, request: request_hash, rpc: rpc)
     end

-    def self.handle_grpc_unavailable!(ex)
-      status = ex.to_status
-      raise ex unless status.details == 'Endpoint read failed'
-
-      # There is a bug in grpc 1.8.x that causes a client process to get stuck
-      # always raising '14:Endpoint read failed'. The only thing that we can
-      # do to recover is to restart the process.
-      #
-      # See https://gitlab.com/gitlab-org/gitaly/issues/1029
-
-      if Sidekiq.server?
-        raise Gitlab::SidekiqMiddleware::Shutdown::WantShutdown.new(ex.to_s)
-      else
-        # SIGQUIT requests a Unicorn worker to shut down gracefully after the current request.
-        Process.kill('QUIT', Process.pid)
-      end
-
-      raise ex
-    end
-    private_class_method :handle_grpc_unavailable!
-
     def self.current_transaction_labels
       Gitlab::Metrics::Transaction.current&.labels || {}
     end
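The removed handle_grpc_unavailable! reacted to the 'Endpoint read failed' symptom of the grpc 1.8.x bug by restarting the process: raising WantShutdown under Sidekiq, or sending SIGQUIT to the Unicorn worker. With the workaround gone, GRPC::Unavailable simply propagates to the caller. Below is a hedged sketch of caller-side handling; the argument order of Gitlab::GitalyClient.call and the storage/service/rpc names are assumptions inferred from the hunk context, and `request` stands in for a previously built Gitaly protobuf request.

# Hedged sketch: without the workaround, a stuck or unreachable Gitaly node
# surfaces as a plain GRPC error to the caller. Names and argument order
# here are assumptions, not taken from this merge request.
begin
  response = Gitlab::GitalyClient.call('default', :commit_service, :find_commit, request)
rescue GRPC::Unavailable => ex
  # ex.to_status.details carries the transport-level message, e.g. the
  # 'Endpoint read failed' string the deleted code matched on.
  Rails.logger.error("Gitaly unavailable: #{ex.to_status.details}")
  raise
end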
lib/gitlab/sidekiq_middleware/memory_killer.rb (new file, 69 additions)

@@ -0,0 +1,69 @@
# frozen_string_literal: true

module Gitlab
  module SidekiqMiddleware
    class MemoryKiller
      # Default the RSS limit to 0, meaning the MemoryKiller is disabled
      MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i
      # Give Sidekiq 15 minutes of grace time after exceeding the RSS limit
      GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i
      # Wait 30 seconds for running jobs to finish during graceful shutdown
      SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i

      # Create a mutex used to ensure there will be only one thread waiting to
      # shut Sidekiq down
      MUTEX = Mutex.new

      def call(worker, job, queue)
        yield

        current_rss = get_rss

        return unless MAX_RSS > 0 && current_rss > MAX_RSS

        Thread.new do
          # Return if another thread is already waiting to shut Sidekiq down
          next unless MUTEX.try_lock

          Sidekiq.logger.warn "Sidekiq worker PID-#{pid} current RSS #{current_rss}"\
            " exceeds maximum RSS #{MAX_RSS} after finishing job #{worker.class} JID-#{job['jid']}"
          Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later"

          # Wait `GRACE_TIME` to give the memory intensive job time to finish.
          # Then, tell Sidekiq to stop fetching new jobs.
          wait_and_signal(GRACE_TIME, 'SIGSTP', 'stop fetching new jobs')

          # Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish.
          # Then, tell Sidekiq to gracefully shut down by giving jobs a few more
          # moments to finish, killing and requeuing them if they didn't, and
          # then terminating itself.
          wait_and_signal(SHUTDOWN_WAIT, 'SIGTERM', 'gracefully shut down')

          # Wait for Sidekiq to shutdown gracefully, and kill it if it didn't.
          wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die')
        end
      end

      private

      def get_rss
        output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s)
        return 0 unless status.zero?

        output.to_i
      end

      def wait_and_signal(time, signal, explanation)
        Sidekiq.logger.warn "waiting #{time} seconds before sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
        sleep(time)

        Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
        Process.kill(signal, pid)
      end

      def pid
        Process.pid
      end
    end
  end
end
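The MemoryKiller compares MAX_RSS against the output of `ps -o rss=`, which typically reports the resident set size in kilobytes, so SIDEKIQ_MEMORY_KILLER_MAX_RSS is effectively a kilobyte threshold. Below is a standalone approximation of that RSS check, using Open3 from the standard library instead of the internal Gitlab::Popen helper; the unit note and the helper swap are assumptions for illustration, not part of the merge request.

# Standalone approximation of the middleware's RSS check, using Open3
# in place of the internal Gitlab::Popen helper.
require 'open3'

pid = Process.pid
output, status = Open3.capture2('ps', '-o', 'rss=', '-p', pid.to_s)
current_rss = status.success? ? output.to_i : 0

# Treat a missing or zero limit as "disabled", like MAX_RSS defaulting to 0.
max_rss = ENV.fetch('SIDEKIQ_MEMORY_KILLER_MAX_RSS', '0').to_i
if max_rss > 0 && current_rss > max_rss
  puts "RSS #{current_rss} KB exceeds limit #{max_rss} KB"
else
  puts "RSS #{current_rss} KB within limit (#{max_rss.zero? ? 'disabled' : "#{max_rss} KB"})"
end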
lib/gitlab/sidekiq_middleware/shutdown.rb (deleted file)

@@ -1,135 +0,0 @@
# frozen_string_literal: true

require 'mutex_m'

module Gitlab
  module SidekiqMiddleware
    class Shutdown
      extend Mutex_m

      # Default the RSS limit to 0, meaning the MemoryKiller is disabled
      MAX_RSS = (ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS'] || 0).to_s.to_i
      # Give Sidekiq 15 minutes of grace time after exceeding the RSS limit
      GRACE_TIME = (ENV['SIDEKIQ_MEMORY_KILLER_GRACE_TIME'] || 15 * 60).to_s.to_i
      # Wait 30 seconds for running jobs to finish during graceful shutdown
      SHUTDOWN_WAIT = (ENV['SIDEKIQ_MEMORY_KILLER_SHUTDOWN_WAIT'] || 30).to_s.to_i

      # This exception can be used to request that the middleware start shutting down Sidekiq
      WantShutdown = Class.new(StandardError)

      ShutdownWithoutRaise = Class.new(WantShutdown)
      private_constant :ShutdownWithoutRaise

      # For testing only, to avoid race conditions (?) in Rspec mocks.
      attr_reader :trace

      # We store the shutdown thread in a class variable to ensure that there
      # can be only one shutdown thread in the process.
      def self.create_shutdown_thread
        mu_synchronize do
          break unless @shutdown_thread.nil?

          @shutdown_thread = Thread.new { yield }
        end
      end

      # For testing only: so we can wait for the shutdown thread to finish.
      def self.shutdown_thread
        mu_synchronize { @shutdown_thread }
      end

      # For testing only: so that we can reset the global state before each test.
      def self.clear_shutdown_thread
        mu_synchronize { @shutdown_thread = nil }
      end

      def initialize
        @trace = Queue.new if Rails.env.test?
      end

      def call(worker, job, queue)
        shutdown_exception = nil

        begin
          yield
          check_rss!
        rescue WantShutdown => ex
          shutdown_exception = ex
        end

        return unless shutdown_exception

        self.class.create_shutdown_thread do
          do_shutdown(worker, job, shutdown_exception)
        end

        raise shutdown_exception unless shutdown_exception.is_a?(ShutdownWithoutRaise)
      end

      private

      def do_shutdown(worker, job, shutdown_exception)
        Sidekiq.logger.warn "Sidekiq worker PID-#{pid} shutting down because of #{shutdown_exception} after job "\
          "#{worker.class} JID-#{job['jid']}"
        Sidekiq.logger.warn "Sidekiq worker PID-#{pid} will stop fetching new jobs in #{GRACE_TIME} seconds, and will be shut down #{SHUTDOWN_WAIT} seconds later"

        # Wait `GRACE_TIME` to give the memory intensive job time to finish.
        # Then, tell Sidekiq to stop fetching new jobs.
        wait_and_signal(GRACE_TIME, 'SIGTSTP', 'stop fetching new jobs')

        # Wait `SHUTDOWN_WAIT` to give already fetched jobs time to finish.
        # Then, tell Sidekiq to gracefully shut down by giving jobs a few more
        # moments to finish, killing and requeuing them if they didn't, and
        # then terminating itself.
        wait_and_signal(SHUTDOWN_WAIT, 'SIGTERM', 'gracefully shut down')

        # Wait for Sidekiq to shutdown gracefully, and kill it if it didn't.
        wait_and_signal(Sidekiq.options[:timeout] + 2, 'SIGKILL', 'die')
      end

      def check_rss!
        return unless MAX_RSS > 0

        current_rss = get_rss
        return unless current_rss > MAX_RSS

        raise ShutdownWithoutRaise.new("current RSS #{current_rss} exceeds maximum RSS #{MAX_RSS}")
      end

      def get_rss
        output, status = Gitlab::Popen.popen(%W(ps -o rss= -p #{pid}), Rails.root.to_s)
        return 0 unless status.zero?

        output.to_i
      end

      def wait_and_signal(time, signal, explanation)
        Sidekiq.logger.warn "waiting #{time} seconds before sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
        sleep(time)

        Sidekiq.logger.warn "sending Sidekiq worker PID-#{pid} #{signal} (#{explanation})"
        kill(signal, pid)
      end

      def pid
        Process.pid
      end

      def sleep(time)
        if Rails.env.test?
          @trace << [:sleep, time]
        else
          Kernel.sleep(time)
        end
      end

      def kill(signal, pid)
        if Rails.env.test?
          @trace << [:kill, signal, pid]
        else
          Process.kill(signal, pid)
        end
      end
    end
  end
end
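The deleted Shutdown middleware drove the same TSTP/TERM/KILL sequence, but it was triggered by a WantShutdown exception raised after the job and ran inside a single shutdown thread guarded through Mutex_m, which is also what the removed GitalyClient workaround hooked into. Below is a condensed sketch of that "at most one shutdown thread per process" pattern, written with a plain Mutex rather than the Mutex_m mixin; the class and method names are invented for illustration.

# Condensed sketch of the single-shutdown-thread pattern the deleted class
# used, with a plain Mutex instead of Mutex_m. Names are illustrative.
class OneShotThread
  MUTEX = Mutex.new
  @thread = nil

  # Start the block in a background thread, but only once per process;
  # later calls while a thread already exists are ignored.
  def self.start
    MUTEX.synchronize do
      break if @thread

      @thread = Thread.new { yield }
    end
  end

  def self.join
    @thread&.join
  end
end

OneShotThread.start { puts 'shutting down once' }
OneShotThread.start { puts 'never runs' }
OneShotThread.join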
spec/lib/gitlab/sidekiq_middleware/memory_killer_spec.rb (new file, 63 additions)

@@ -0,0 +1,63 @@
require 'spec_helper'

describe Gitlab::SidekiqMiddleware::MemoryKiller do
  subject { described_class.new }
  let(:pid) { 999 }

  let(:worker) { double(:worker, class: 'TestWorker') }
  let(:job) { { 'jid' => 123 } }
  let(:queue) { 'test_queue' }

  def run
    thread = subject.call(worker, job, queue) { nil }
    thread&.join
  end

  before do
    allow(subject).to receive(:get_rss).and_return(10.kilobytes)
    allow(subject).to receive(:pid).and_return(pid)
  end

  context 'when MAX_RSS is set to 0' do
    before do
      stub_const("#{described_class}::MAX_RSS", 0)
    end

    it 'does nothing' do
      expect(subject).not_to receive(:sleep)

      run
    end
  end

  context 'when MAX_RSS is exceeded' do
    before do
      stub_const("#{described_class}::MAX_RSS", 5.kilobytes)
    end

    it 'sends the STP, TERM and KILL signals at expected times' do
      expect(subject).to receive(:sleep).with(15 * 60).ordered
      expect(Process).to receive(:kill).with('SIGSTP', pid).ordered

      expect(subject).to receive(:sleep).with(30).ordered
      expect(Process).to receive(:kill).with('SIGTERM', pid).ordered

      expect(subject).to receive(:sleep).with(10).ordered
      expect(Process).to receive(:kill).with('SIGKILL', pid).ordered

      run
    end
  end

  context 'when MAX_RSS is not exceeded' do
    before do
      stub_const("#{described_class}::MAX_RSS", 15.kilobytes)
    end

    it 'does nothing' do
      expect(subject).not_to receive(:sleep)

      run
    end
  end
end
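The spec pins MAX_RSS with stub_const and uses message expectations on sleep and Process.kill so the shutdown sequence can be asserted without actually waiting. Below is a minimal standalone illustration of that stub_const plus message-expectation technique; the Throttle class and its LIMIT constant are invented for the example.

# Invented class and spec, used only to demonstrate the testing technique.
require 'rspec/autorun'

class Throttle
  LIMIT = 100

  def push(amount)
    sleep(1) if amount > LIMIT
  end
end

RSpec.describe Throttle do
  it 'sleeps once the stubbed limit is exceeded' do
    # Lower the constant for this example only, as the spec above does with MAX_RSS.
    stub_const('Throttle::LIMIT', 5)

    throttle = described_class.new
    # Intercept the sleep so the test does not actually wait.
    expect(throttle).to receive(:sleep).with(1)

    throttle.push(10)
  end
end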
spec/lib/gitlab/sidekiq_middleware/shutdown_spec.rb (deleted file)

@@ -1,88 +0,0 @@
require 'spec_helper'

describe Gitlab::SidekiqMiddleware::Shutdown do
  subject { described_class.new }

  let(:pid) { Process.pid }
  let(:worker) { double(:worker, class: 'TestWorker') }
  let(:job) { { 'jid' => 123 } }
  let(:queue) { 'test_queue' }
  let(:block) { proc { nil } }

  def run
    subject.call(worker, job, queue) { block.call }
    described_class.shutdown_thread&.join
  end

  def pop_trace
    subject.trace.pop(true)
  end

  before do
    allow(subject).to receive(:get_rss).and_return(10.kilobytes)
    described_class.clear_shutdown_thread
  end

  context 'when MAX_RSS is set to 0' do
    before do
      stub_const("#{described_class}::MAX_RSS", 0)
    end

    it 'does nothing' do
      expect(subject).not_to receive(:sleep)

      run
    end
  end

  def expect_shutdown_sequence
    expect(pop_trace).to eq([:sleep, 15 * 60])
    expect(pop_trace).to eq([:kill, 'SIGTSTP', pid])

    expect(pop_trace).to eq([:sleep, 30])
    expect(pop_trace).to eq([:kill, 'SIGTERM', pid])

    expect(pop_trace).to eq([:sleep, 10])
    expect(pop_trace).to eq([:kill, 'SIGKILL', pid])
  end

  context 'when MAX_RSS is exceeded' do
    before do
      stub_const("#{described_class}::MAX_RSS", 5.kilobytes)
    end

    it 'sends the TSTP, TERM and KILL signals at expected times' do
      run

      expect_shutdown_sequence
    end
  end

  context 'when MAX_RSS is not exceeded' do
    before do
      stub_const("#{described_class}::MAX_RSS", 15.kilobytes)
    end

    it 'does nothing' do
      expect(subject).not_to receive(:sleep)

      run
    end
  end

  context 'when WantShutdown is raised' do
    let(:block) { proc { raise described_class::WantShutdown } }

    it 'starts the shutdown sequence and re-raises the exception' do
      expect { run }.to raise_exception(described_class::WantShutdown)

      # We can't expect 'run' to have joined on the shutdown thread, because
      # it hit an exception.
      shutdown_thread = described_class.shutdown_thread
      expect(shutdown_thread).not_to be_nil
      shutdown_thread.join

      expect_shutdown_sequence
    end
  end
end
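The deleted spec never slept or signalled for real: in the test environment the middleware pushed [:sleep, time] and [:kill, signal, pid] tuples onto a Queue exposed through the trace reader, and the examples popped them off in order. Below is a condensed, self-contained illustration of that trace-queue technique; all names are invented for the example.

# Condensed illustration of the trace-queue technique used by the deleted
# spec: side effects are recorded instead of executed, then asserted in order.
class FakeShutdown
  attr_reader :trace

  def initialize
    @trace = Queue.new
  end

  def sleep(time)
    @trace << [:sleep, time]
  end

  def kill(signal, pid)
    @trace << [:kill, signal, pid]
  end

  def run(pid)
    sleep(30)
    kill('SIGTERM', pid)
  end
end

shutdown = FakeShutdown.new
shutdown.run(999)
raise 'unexpected trace' unless shutdown.trace.pop(true) == [:sleep, 30]
raise 'unexpected trace' unless shutdown.trace.pop(true) == [:kill, 'SIGTERM', 999]
puts 'trace matches'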