192 lines
4.8 KiB
Ruby
192 lines
4.8 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
module Gitlab
|
|
module SidekiqDaemon
|
|
class Monitor < Daemon
|
|
include ::Gitlab::Utils::StrongMemoize
|
|
|
|
NOTIFICATION_CHANNEL = 'sidekiq:cancel:notifications'
|
|
CANCEL_DEADLINE = 24.hours.seconds
|
|
RECONNECT_TIME = 3.seconds
|
|
|
|
# We use exception derived from `Exception`
|
|
# to consider this as an very low-level exception
|
|
# that should not be caught by application
|
|
CancelledError = Class.new(Exception) # rubocop:disable Lint/InheritException
|
|
|
|
attr_reader :jobs
|
|
attr_reader :jobs_mutex
|
|
|
|
def initialize
|
|
super
|
|
|
|
@jobs = {}
|
|
@jobs_mutex = Mutex.new
|
|
end
|
|
|
|
def within_job(worker_class, jid, queue)
|
|
jobs_mutex.synchronize do
|
|
jobs[jid] = { worker_class: worker_class, thread: Thread.current, started_at: Gitlab::Metrics::System.monotonic_time }
|
|
end
|
|
|
|
if cancelled?(jid)
|
|
Sidekiq.logger.warn(
|
|
class: self.class.to_s,
|
|
action: 'run',
|
|
queue: queue,
|
|
jid: jid,
|
|
canceled: true
|
|
)
|
|
raise CancelledError
|
|
end
|
|
|
|
yield
|
|
ensure
|
|
jobs_mutex.synchronize do
|
|
jobs.delete(jid)
|
|
end
|
|
end
|
|
|
|
def self.cancel_job(jid)
|
|
payload = {
|
|
action: 'cancel',
|
|
jid: jid
|
|
}.to_json
|
|
|
|
::Gitlab::Redis::SharedState.with do |redis|
|
|
redis.setex(cancel_job_key(jid), CANCEL_DEADLINE, 1)
|
|
redis.publish(NOTIFICATION_CHANNEL, payload)
|
|
end
|
|
end
|
|
|
|
private
|
|
|
|
def start_working
|
|
return unless notification_channel_enabled?
|
|
|
|
begin
|
|
Sidekiq.logger.info(
|
|
class: self.class.to_s,
|
|
action: 'start',
|
|
message: 'Starting Monitor Daemon'
|
|
)
|
|
|
|
while enabled?
|
|
process_messages
|
|
sleep(RECONNECT_TIME)
|
|
end
|
|
|
|
ensure
|
|
Sidekiq.logger.warn(
|
|
class: self.class.to_s,
|
|
action: 'stop',
|
|
message: 'Stopping Monitor Daemon'
|
|
)
|
|
end
|
|
end
|
|
|
|
def stop_working
|
|
thread.raise(Interrupt) if thread.alive?
|
|
end
|
|
|
|
def process_messages
|
|
::Gitlab::Redis::SharedState.with do |redis|
|
|
redis.subscribe(NOTIFICATION_CHANNEL) do |on|
|
|
on.message do |channel, message|
|
|
process_message(message)
|
|
end
|
|
end
|
|
end
|
|
rescue Exception => e # rubocop:disable Lint/RescueException
|
|
Sidekiq.logger.warn(
|
|
class: self.class.to_s,
|
|
action: 'exception',
|
|
message: e.message
|
|
)
|
|
|
|
# we re-raise system exceptions
|
|
raise e unless e.is_a?(StandardError)
|
|
end
|
|
|
|
def process_message(message)
|
|
Sidekiq.logger.info(
|
|
class: self.class.to_s,
|
|
channel: NOTIFICATION_CHANNEL,
|
|
message: 'Received payload on channel',
|
|
payload: message
|
|
)
|
|
|
|
message = safe_parse(message)
|
|
return unless message
|
|
|
|
case message['action']
|
|
when 'cancel'
|
|
process_job_cancel(message['jid'])
|
|
else
|
|
# unknown message
|
|
end
|
|
end
|
|
|
|
def safe_parse(message)
|
|
JSON.parse(message)
|
|
rescue JSON::ParserError
|
|
end
|
|
|
|
def process_job_cancel(jid)
|
|
return unless jid
|
|
|
|
# try to find thread without lock
|
|
return unless find_thread_unsafe(jid)
|
|
|
|
Thread.new do
|
|
# try to find a thread, but with guaranteed
|
|
# that handle for thread corresponds to actually
|
|
# running job
|
|
find_thread_with_lock(jid) do |thread|
|
|
Sidekiq.logger.warn(
|
|
class: self.class.to_s,
|
|
action: 'cancel',
|
|
message: 'Canceling thread with CancelledError',
|
|
jid: jid,
|
|
thread_id: thread.object_id
|
|
)
|
|
|
|
thread&.raise(CancelledError)
|
|
end
|
|
end
|
|
end
|
|
|
|
# This method needs to be thread-safe
|
|
# This is why it passes thread in block,
|
|
# to ensure that we do process this thread
|
|
def find_thread_unsafe(jid)
|
|
jobs.dig(jid, :thread)
|
|
end
|
|
|
|
def find_thread_with_lock(jid)
|
|
# don't try to lock if we cannot find the thread
|
|
return unless find_thread_unsafe(jid)
|
|
|
|
jobs_mutex.synchronize do
|
|
find_thread_unsafe(jid).tap do |thread|
|
|
yield(thread) if thread
|
|
end
|
|
end
|
|
end
|
|
|
|
def cancelled?(jid)
|
|
::Gitlab::Redis::SharedState.with do |redis|
|
|
redis.exists(self.class.cancel_job_key(jid))
|
|
end
|
|
end
|
|
|
|
def self.cancel_job_key(jid)
|
|
"sidekiq:cancel:#{jid}"
|
|
end
|
|
|
|
def notification_channel_enabled?
|
|
ENV.fetch("SIDEKIQ_MONITOR_WORKER", 0).to_i.nonzero?
|
|
end
|
|
end
|
|
end
|
|
end
|