mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
systemd notification integration (#4489)
systemd notification support, fixes #4488
This commit is contained in:
parent
417f23ea12
commit
d4d8b209de
5 changed files with 228 additions and 0 deletions
|
@ -5,6 +5,8 @@
|
|||
HEAD
|
||||
---------
|
||||
|
||||
- Integrate with systemd's watchdog and notification features [#4488]
|
||||
See `Type=notify` in [systemd.service](https://www.freedesktop.org/software/systemd/man/systemd.service.html#Options)
|
||||
- Fix edge case where a job can be pushed without a queue.
|
||||
|
||||
6.0.5
|
||||
|
|
|
@ -258,3 +258,4 @@ module Sidekiq
|
|||
end
|
||||
|
||||
require "sidekiq/rails" if defined?(::Rails::Engine)
|
||||
require "sidekiq/systemd"
|
||||
|
|
149
lib/sidekiq/sd_notify.rb
Normal file
149
lib/sidekiq/sd_notify.rb
Normal file
|
@ -0,0 +1,149 @@
|
|||
# frozen_string_literal: true
|
||||
|
||||
# The MIT License
|
||||
#
|
||||
# Copyright (c) 2017, 2018, 2019, 2020 Agis Anastasopoulos
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
# this software and associated documentation files (the "Software"), to deal in
|
||||
# the Software without restriction, including without limitation the rights to
|
||||
# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
|
||||
# the Software, and to permit persons to whom the Software is furnished to do so,
|
||||
# subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in all
|
||||
# copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
|
||||
# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
|
||||
# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
|
||||
# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
||||
|
||||
# This is a copy of https://github.com/agis/ruby-sdnotify as of commit a7d52ee
|
||||
# The only changes made was "rehoming" it within the Sidekiq module to avoid
|
||||
# namespace collisions and applying standard's code formatting style.
|
||||
|
||||
require "socket"
|
||||
|
||||
# SdNotify is a pure-Ruby implementation of sd_notify(3). It can be used to
|
||||
# notify systemd about state changes. Methods of this package are no-op on
|
||||
# non-systemd systems (eg. Darwin).
|
||||
#
|
||||
# The API maps closely to the original implementation of sd_notify(3),
|
||||
# therefore be sure to check the official man pages prior to using SdNotify.
|
||||
#
|
||||
# @see https://www.freedesktop.org/software/systemd/man/sd_notify.html
|
||||
module Sidekiq
|
||||
module SdNotify
|
||||
# Exception raised when there's an error writing to the notification socket
|
||||
class NotifyError < RuntimeError; end
|
||||
|
||||
READY = "READY=1"
|
||||
RELOADING = "RELOADING=1"
|
||||
STOPPING = "STOPPING=1"
|
||||
STATUS = "STATUS="
|
||||
ERRNO = "ERRNO="
|
||||
MAINPID = "MAINPID="
|
||||
WATCHDOG = "WATCHDOG=1"
|
||||
FDSTORE = "FDSTORE=1"
|
||||
|
||||
def self.ready(unset_env = false)
|
||||
notify(READY, unset_env)
|
||||
end
|
||||
|
||||
def self.reloading(unset_env = false)
|
||||
notify(RELOADING, unset_env)
|
||||
end
|
||||
|
||||
def self.stopping(unset_env = false)
|
||||
notify(STOPPING, unset_env)
|
||||
end
|
||||
|
||||
# @param status [String] a custom status string that describes the current
|
||||
# state of the service
|
||||
def self.status(status, unset_env = false)
|
||||
notify("#{STATUS}#{status}", unset_env)
|
||||
end
|
||||
|
||||
# @param errno [Integer]
|
||||
def self.errno(errno, unset_env = false)
|
||||
notify("#{ERRNO}#{errno}", unset_env)
|
||||
end
|
||||
|
||||
# @param pid [Integer]
|
||||
def self.mainpid(pid, unset_env = false)
|
||||
notify("#{MAINPID}#{pid}", unset_env)
|
||||
end
|
||||
|
||||
def self.watchdog(unset_env = false)
|
||||
notify(WATCHDOG, unset_env)
|
||||
end
|
||||
|
||||
def self.fdstore(unset_env = false)
|
||||
notify(FDSTORE, unset_env)
|
||||
end
|
||||
|
||||
# @param [Boolean] true if the service manager expects watchdog keep-alive
|
||||
# notification messages to be sent from this process.
|
||||
#
|
||||
# If the $WATCHDOG_USEC environment variable is set,
|
||||
# and the $WATCHDOG_PID variable is unset or set to the PID of the current
|
||||
# process
|
||||
#
|
||||
# @note Unlike sd_watchdog_enabled(3), this method does not mutate the
|
||||
# environment.
|
||||
def self.watchdog?
|
||||
wd_usec = ENV["WATCHDOG_USEC"]
|
||||
wd_pid = ENV["WATCHDOG_PID"]
|
||||
|
||||
return false unless wd_usec
|
||||
|
||||
begin
|
||||
wd_usec = Integer(wd_usec)
|
||||
rescue
|
||||
return false
|
||||
end
|
||||
|
||||
return false if wd_usec <= 0
|
||||
return true if !wd_pid || wd_pid == $$.to_s
|
||||
|
||||
false
|
||||
end
|
||||
|
||||
# Notify systemd with the provided state, via the notification socket, if
|
||||
# any.
|
||||
#
|
||||
# Generally this method will be used indirectly through the other methods
|
||||
# of the library.
|
||||
#
|
||||
# @param state [String]
|
||||
# @param unset_env [Boolean]
|
||||
#
|
||||
# @return [Fixnum, nil] the number of bytes written to the notification
|
||||
# socket or nil if there was no socket to report to (eg. the program wasn't
|
||||
# started by systemd)
|
||||
#
|
||||
# @raise [NotifyError] if there was an error communicating with the systemd
|
||||
# socket
|
||||
#
|
||||
# @see https://www.freedesktop.org/software/systemd/man/sd_notify.html
|
||||
def self.notify(state, unset_env = false)
|
||||
sock = ENV["NOTIFY_SOCKET"]
|
||||
|
||||
return nil unless sock
|
||||
|
||||
ENV.delete("NOTIFY_SOCKET") if unset_env
|
||||
|
||||
begin
|
||||
Addrinfo.unix(sock, :DGRAM).connect do |s|
|
||||
s.close_on_exec = true
|
||||
s.write(state)
|
||||
end
|
||||
rescue => e
|
||||
raise NotifyError, "#{e.class}: #{e.message}", e.backtrace
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
36
lib/sidekiq/systemd.rb
Normal file
36
lib/sidekiq/systemd.rb
Normal file
|
@ -0,0 +1,36 @@
|
|||
#
|
||||
# Sidekiq's systemd integration allows Sidekiq to inform systemd:
|
||||
# 1. when it has successfully started
|
||||
# 2. when it is starting shutdown
|
||||
# 3. periodically for a liveness check with a watchdog thread
|
||||
#
|
||||
module Sidekiq
|
||||
def self.start_watchdog
|
||||
usec = Integer(ENV["WATCHDOG_USEC"])
|
||||
return Sidekiq.logger.error("systemd Watchdog too fast: " + usec) if usec < 1_000_000
|
||||
|
||||
sec_f = usec / 1_000_000.0
|
||||
# "It is recommended that a daemon sends a keep-alive notification message
|
||||
# to the service manager every half of the time returned here."
|
||||
ping_f = sec_f / 2
|
||||
Thread.new do
|
||||
loop do
|
||||
Sidekiq::SdNotify.watchdog
|
||||
sleep ping_f
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
if ENV["NOTIFY_SOCKET"]
|
||||
Sidekiq.configure_server do |config|
|
||||
require "sidekiq/sd_notify"
|
||||
config.on(:startup) do
|
||||
Sidekiq::SdNotify.ready
|
||||
end
|
||||
config.on(:terminate) do
|
||||
Sidekiq::SdNotify.stopping
|
||||
end
|
||||
Sidekiq.start_watchdog if Sidekiq::SdNotify.watchdog?
|
||||
end
|
||||
end
|
40
test/test_systemd.rb
Normal file
40
test/test_systemd.rb
Normal file
|
@ -0,0 +1,40 @@
|
|||
require_relative 'helper'
|
||||
require "sidekiq/sd_notify"
|
||||
require 'sidekiq/systemd'
|
||||
|
||||
class TestSystemd < Minitest::Test
|
||||
def setup
|
||||
::Dir::Tmpname.create("sidekiq_socket") do |sockaddr|
|
||||
@sockaddr = sockaddr
|
||||
@socket = Socket.new(:UNIX, :DGRAM, 0)
|
||||
socket_ai = Addrinfo.unix(sockaddr)
|
||||
@socket.bind(socket_ai)
|
||||
ENV["NOTIFY_SOCKET"] = sockaddr
|
||||
end
|
||||
end
|
||||
|
||||
def teardown
|
||||
@socket.close if @socket
|
||||
File.unlink(@sockaddr) if @sockaddr
|
||||
@socket = nil
|
||||
@sockaddr = nil
|
||||
end
|
||||
|
||||
def socket_message
|
||||
@socket.recvfrom(10)[0]
|
||||
end
|
||||
|
||||
def test_notify
|
||||
count = Sidekiq::SdNotify.ready
|
||||
assert_equal(socket_message, "READY=1")
|
||||
assert_equal(ENV["NOTIFY_SOCKET"], @sockaddr)
|
||||
assert_equal(count, 7)
|
||||
|
||||
count = Sidekiq::SdNotify.stopping
|
||||
assert_equal(socket_message, "STOPPING=1")
|
||||
assert_equal(ENV["NOTIFY_SOCKET"], @sockaddr)
|
||||
assert_equal(count, 10)
|
||||
|
||||
refute Sidekiq::SdNotify.watchdog?
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue