Allow to interrupt running jobs

This adds a middleware to track all threads
for running jobs.

This makes sidekiq to watch for redis-delivered notifications.

This makes be able to send notification to interrupt
running sidekiq jobs.

This does not take into account any native code,
as `Thread.raise` generates exception once the control gets
back to Ruby.

The separate measure should be taken to interrupt gRPC, shellouts,
or anything else that escapes Ruby.
This commit is contained in:
Kamil Trzciński 2019-08-14 17:56:37 +02:00 committed by Qingyu Zhao
parent ca622a3e13
commit 75e2302d01
4 changed files with 181 additions and 0 deletions

View File

@ -33,6 +33,7 @@ Sidekiq.configure_server do |config|
config.redis = queues_config_hash
config.server_middleware do |chain|
chain.add Gitlab::SidekiqMiddleware::JobsThreads unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS']
chain.add Gitlab::SidekiqMiddleware::Metrics if Settings.monitoring.sidekiq_exporter
chain.add Gitlab::SidekiqMiddleware::ArgumentsLogger if ENV['SIDEKIQ_LOG_ARGUMENTS'] && !enable_json_logs
chain.add Gitlab::SidekiqMiddleware::MemoryKiller if ENV['SIDEKIQ_MEMORY_KILLER_MAX_RSS']
@ -57,6 +58,8 @@ Sidekiq.configure_server do |config|
# Clear any connections that might have been obtained before starting
# Sidekiq (e.g. in an initializer).
ActiveRecord::Base.clear_all_connections!
Gitlab::SidekiqStatus::Monitor.instance.start unless ENV['DISABLE_SIDEKIQ_INTERRUPT_RUNNING_JOBS']
end
if enable_reliable_fetch?

View File

@ -0,0 +1,49 @@
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
class JobsThreads
@@jobs = {} # rubocop:disable Style/ClassVars
MUTEX = Mutex.new
def call(worker, job, queue)
jid = job['jid']
MUTEX.synchronize do
@@jobs[jid] = Thread.current
end
return if self.class.cancelled?(jid)
yield
ensure
MUTEX.synchronize do
@@jobs.delete(jid)
end
end
def self.interrupt(jid)
MUTEX.synchronize do
thread = @@jobs[jid]
break unless thread
thread.raise(Interrupt)
thread
end
end
def self.cancelled?(jid)
Sidekiq.redis {|c| c.exists("cancelled-#{jid}") }
end
def self.mark_job_as_cancelled(jid)
Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 86400, 1) }
"Marked job as cancelled(if Sidekiq retry within 24 hours, the job will be skipped as `processed`). Jid: #{jid}"
end
def self.jobs
@@jobs
end
end
end
end

View File

@ -0,0 +1,46 @@
# frozen_string_literal: true
module Gitlab
module SidekiqStatus
class Monitor < Daemon
include ::Gitlab::Utils::StrongMemoize
NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'.freeze
def start_working
Sidekiq.logger.info "Watching sidekiq monitor"
::Gitlab::Redis::SharedState.with do |redis|
redis.subscribe(NOTIFICATION_CHANNEL) do |on|
on.message do |channel, message|
Sidekiq.logger.info "Received #{message} on #{channel}..."
execute_job_cancel(message)
end
end
end
end
def self.cancel_job(jid)
Gitlab::Redis::SharedState.with do |redis|
redis.publish(NOTIFICATION_CHANNEL, jid)
"Notification sent. Job should be cancelled soon. Check log to confirm. Jid: #{jid}"
end
end
private
def execute_job_cancel(jid)
Gitlab::SidekiqMiddleware::JobsThreads.mark_job_as_cancelled(jid)
thread = Gitlab::SidekiqMiddleware::JobsThreads
.interrupt(jid)
if thread
Sidekiq.logger.info "Interrupted thread: #{thread} for #{jid}."
else
Sidekiq.logger.info "Did not find thread for #{jid}."
end
end
end
end
end

View File

@ -0,0 +1,83 @@
require 'spec_helper'
describe Gitlab::SidekiqMiddleware::JobsThreads do
subject { described_class.new }
let(:worker) { double(:worker, class: Chaos::SleepWorker) }
let(:jid) { '581f90fbd2f24deabcbde2f9' }
let(:job) { { 'jid' => jid } }
let(:jid_thread) { '684f90fbd2f24deabcbde2f9' }
let(:job_thread) { { 'jid' => jid_thread } }
let(:queue) { 'test_queue' }
let(:mark_job_as_cancelled) { Sidekiq.redis {|c| c.setex("cancelled-#{jid}", 2, 1) } }
def run_job
subject.call(worker, job, queue) do
sleep 2
"mock return from yield"
end
end
def run_job_thread
Thread.new do
subject.call(worker, job_thread, queue) do
sleep 3
"mock return from yield"
end
end
end
describe '.call' do
context 'by default' do
it 'return from yield' do
expect(run_job).to eq("mock return from yield")
end
end
context 'when job is marked as cancelled' do
before do
mark_job_as_cancelled
end
it 'return directly' do
expect(run_job).to be_nil
end
end
end
describe '.self.interrupt' do
before do
run_job_thread
sleep 1
end
it 'interrupt the job with correct jid' do
expect(described_class.jobs[jid_thread]).to receive(:raise).with(Interrupt)
expect(described_class.interrupt jid_thread).to eq(described_class.jobs[jid_thread])
end
it 'do nothing with wrong jid' do
expect(described_class.jobs[jid_thread]).not_to receive(:raise)
expect(described_class.interrupt 'wrong_jid').to be_nil
end
end
describe '.self.cancelled?' do
it 'return true when job is marked as cancelled' do
mark_job_as_cancelled
expect(described_class.cancelled? jid).to be true
end
it 'return false when job is not marked as cancelled' do
expect(described_class.cancelled? 'non-exists-jid').to be false
end
end
describe '.self.mark_job_as_cancelled' do
it 'set Redis key' do
described_class.mark_job_as_cancelled('jid_123')
expect(described_class.cancelled? 'jid_123').to be true
end
end
end