mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Refactor sidekiqctl so it can be tested easily
This commit is contained in:
parent
20f4cdb4ff
commit
4d88319435
3 changed files with 235 additions and 235 deletions
239
bin/sidekiqctl
239
bin/sidekiqctl
|
@ -2,236 +2,19 @@
|
|||
|
||||
require 'fileutils'
|
||||
require 'sidekiq/api'
|
||||
|
||||
class Sidekiqctl
|
||||
DEFAULT_KILL_TIMEOUT = 10
|
||||
CMD = File.basename($0)
|
||||
|
||||
attr_reader :stage, :pidfile, :kill_timeout
|
||||
|
||||
def self.print_usage
|
||||
puts "#{CMD} - control Sidekiq from the command line."
|
||||
puts
|
||||
puts "Usage: #{CMD} quiet <pidfile> <kill_timeout>"
|
||||
puts " #{CMD} stop <pidfile> <kill_timeout>"
|
||||
puts " #{CMD} status <section>"
|
||||
puts
|
||||
puts " <pidfile> is path to a pidfile"
|
||||
puts " <kill_timeout> is number of seconds to wait until Sidekiq exits"
|
||||
puts " (default: #{Sidekiqctl::DEFAULT_KILL_TIMEOUT}), after which Sidekiq will be KILL'd"
|
||||
puts
|
||||
puts " <section> (optional) view a specific section of the status output"
|
||||
puts " Valid sections are: #{Sidekiqctl::Status::VALID_SECTIONS.join(', ')}"
|
||||
puts
|
||||
puts "Be sure to set the kill_timeout LONGER than Sidekiq's -t timeout. If you want"
|
||||
puts "to wait 60 seconds for jobs to finish, use `sidekiq -t 60` and `sidekiqctl stop"
|
||||
puts " path_to_pidfile 61`"
|
||||
puts
|
||||
end
|
||||
|
||||
def initialize(stage, pidfile, timeout)
|
||||
@stage = stage
|
||||
@pidfile = pidfile
|
||||
@kill_timeout = timeout
|
||||
|
||||
done('No pidfile given', :error) if !pidfile
|
||||
done("Pidfile #{pidfile} does not exist", :warn) if !File.exist?(pidfile)
|
||||
done('Invalid pidfile content', :error) if pid == 0
|
||||
|
||||
fetch_process
|
||||
|
||||
begin
|
||||
send(stage)
|
||||
rescue NoMethodError
|
||||
done "Invalid command: #{stage}", :error
|
||||
end
|
||||
end
|
||||
|
||||
def fetch_process
|
||||
Process.kill(0, pid)
|
||||
rescue Errno::ESRCH
|
||||
done "Process doesn't exist", :error
|
||||
# We were not allowed to send a signal, but the process must have existed
|
||||
# when Process.kill() was called.
|
||||
rescue Errno::EPERM
|
||||
return pid
|
||||
end
|
||||
|
||||
def done(msg, error = nil)
|
||||
puts msg
|
||||
exit(exit_signal(error))
|
||||
end
|
||||
|
||||
def exit_signal(error)
|
||||
(error == :error) ? 1 : 0
|
||||
end
|
||||
|
||||
def pid
|
||||
@pid ||= File.read(pidfile).to_i
|
||||
end
|
||||
|
||||
def quiet
|
||||
`kill -TSTP #{pid}`
|
||||
end
|
||||
|
||||
def stop
|
||||
`kill -TERM #{pid}`
|
||||
kill_timeout.times do
|
||||
begin
|
||||
Process.kill(0, pid)
|
||||
rescue Errno::ESRCH
|
||||
FileUtils.rm_f pidfile
|
||||
done 'Sidekiq shut down gracefully.'
|
||||
rescue Errno::EPERM
|
||||
done 'Not permitted to shut down Sidekiq.'
|
||||
end
|
||||
sleep 1
|
||||
end
|
||||
`kill -9 #{pid}`
|
||||
FileUtils.rm_f pidfile
|
||||
done 'Sidekiq shut down forcefully.'
|
||||
end
|
||||
alias_method :shutdown, :stop
|
||||
|
||||
class Status
|
||||
VALID_SECTIONS = %w[all version overview processes queues]
|
||||
def display(section = nil)
|
||||
section ||= 'all'
|
||||
unless VALID_SECTIONS.include? section
|
||||
puts "I don't know how to check the status of '#{section}'!"
|
||||
puts "Try one of these: #{VALID_SECTIONS.join(', ')}"
|
||||
return
|
||||
end
|
||||
send(section)
|
||||
rescue StandardError => e
|
||||
puts "Couldn't get status: #{e}"
|
||||
end
|
||||
|
||||
def all
|
||||
version
|
||||
puts
|
||||
overview
|
||||
puts
|
||||
processes
|
||||
puts
|
||||
queues
|
||||
end
|
||||
|
||||
def version
|
||||
puts "Sidekiq #{Sidekiq::VERSION}"
|
||||
puts Time.now
|
||||
end
|
||||
|
||||
def overview
|
||||
puts '---- Overview ----'
|
||||
puts " Processed: #{delimit stats.processed}"
|
||||
puts " Failed: #{delimit stats.failed}"
|
||||
puts " Busy: #{delimit stats.workers_size}"
|
||||
puts " Enqueued: #{delimit stats.enqueued}"
|
||||
puts " Retries: #{delimit stats.retry_size}"
|
||||
puts " Scheduled: #{delimit stats.scheduled_size}"
|
||||
puts " Dead: #{delimit stats.dead_size}"
|
||||
end
|
||||
|
||||
def processes
|
||||
puts "---- Processes (#{process_set.size}) ----"
|
||||
process_set.each_with_index do |process, index|
|
||||
puts "#{process['identity']} #{tags_for(process)}"
|
||||
puts " Started: #{Time.at(process['started_at'])} (#{time_ago(process['started_at'])})"
|
||||
puts " Threads: #{process['concurrency']} (#{process['busy']} busy)"
|
||||
puts " Queues: #{split_multiline(process['queues'].sort, pad: 11)}"
|
||||
puts '' unless (index+1) == process_set.size
|
||||
end
|
||||
end
|
||||
|
||||
COL_PAD = 2
|
||||
def queues
|
||||
puts "---- Queues (#{queue_data.size}) ----"
|
||||
columns = {
|
||||
name: [:ljust, (['name'] + queue_data.map(&:name)).map(&:length).max + COL_PAD],
|
||||
size: [:rjust, (['size'] + queue_data.map(&:size)).map(&:length).max + COL_PAD],
|
||||
latency: [:rjust, (['latency'] + queue_data.map(&:latency)).map(&:length).max + COL_PAD]
|
||||
}
|
||||
columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) }
|
||||
puts
|
||||
queue_data.each do |q|
|
||||
columns.each do |col, (dir, width)|
|
||||
print q.send(col).public_send(dir, width)
|
||||
end
|
||||
puts
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def delimit(number)
|
||||
number.to_s.reverse.scan(/.{1,3}/).join(',').reverse
|
||||
end
|
||||
|
||||
def split_multiline(values, opts = {})
|
||||
return 'none' unless values
|
||||
pad = opts[:pad] || 0
|
||||
max_length = opts[:max_length] || (80 - pad)
|
||||
out = []
|
||||
line = ''
|
||||
values.each do |value|
|
||||
if (line.length + value.length) > max_length
|
||||
out << line
|
||||
line = ' ' * pad
|
||||
end
|
||||
line << value + ', '
|
||||
end
|
||||
out << line[0..-3]
|
||||
out.join("\n")
|
||||
end
|
||||
|
||||
def tags_for(process)
|
||||
tags = [
|
||||
process['tag'],
|
||||
process['labels'],
|
||||
(process['quiet'] == 'true' ? 'quiet' : nil)
|
||||
].flatten.compact
|
||||
tags.any? ? "[#{tags.join('] [')}]" : nil
|
||||
end
|
||||
|
||||
def time_ago(timestamp)
|
||||
seconds = Time.now - Time.at(timestamp)
|
||||
return 'just now' if seconds < 60
|
||||
return 'a minute ago' if seconds < 120
|
||||
return "#{seconds.floor / 60} minutes ago" if seconds < 3600
|
||||
return 'an hour ago' if seconds < 7200
|
||||
"#{seconds.floor / 60 / 60} hours ago"
|
||||
end
|
||||
|
||||
QUEUE_STRUCT = Struct.new(:name, :size, :latency)
|
||||
def queue_data
|
||||
@queue_data ||= Sidekiq::Queue.all.map do |q|
|
||||
QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf('%#.2f', q.latency))
|
||||
end
|
||||
end
|
||||
|
||||
def process_set
|
||||
@process_set ||= Sidekiq::ProcessSet.new
|
||||
end
|
||||
|
||||
def stats
|
||||
@stats ||= Sidekiq::Stats.new
|
||||
end
|
||||
end
|
||||
end
|
||||
require 'sidekiq/ctl'
|
||||
|
||||
if ARGV[0] == 'status'
|
||||
Sidekiqctl::Status.new.display(ARGV[1])
|
||||
exit
|
||||
end
|
||||
|
||||
if ARGV.length < 2
|
||||
Sidekiqctl.print_usage
|
||||
Sidekiq::Ctl::Status.new.display(ARGV[1])
|
||||
else
|
||||
stage = ARGV[0]
|
||||
pidfile = ARGV[1]
|
||||
timeout = ARGV[2].to_i
|
||||
timeout = Sidekiqctl::DEFAULT_KILL_TIMEOUT if timeout == 0
|
||||
if ARGV.length < 2
|
||||
Sidekiq::Ctl.print_usage
|
||||
else
|
||||
stage = ARGV[0]
|
||||
pidfile = ARGV[1]
|
||||
timeout = ARGV[2].to_i
|
||||
timeout = Sidekiq::Ctl::DEFAULT_KILL_TIMEOUT if timeout == 0
|
||||
|
||||
Sidekiqctl.new(stage, pidfile, timeout)
|
||||
Sidekiq::Ctl.new(stage, pidfile, timeout)
|
||||
end
|
||||
end
|
||||
|
|
221
lib/sidekiq/ctl.rb
Normal file
221
lib/sidekiq/ctl.rb
Normal file
|
@ -0,0 +1,221 @@
|
|||
#!/usr/bin/env ruby
|
||||
|
||||
require 'fileutils'
|
||||
require 'sidekiq/api'
|
||||
|
||||
class Sidekiq::Ctl
|
||||
DEFAULT_KILL_TIMEOUT = 10
|
||||
CMD = File.basename($0)
|
||||
|
||||
attr_reader :stage, :pidfile, :kill_timeout
|
||||
|
||||
def self.print_usage
|
||||
puts "#{CMD} - control Sidekiq from the command line."
|
||||
puts
|
||||
puts "Usage: #{CMD} quiet <pidfile> <kill_timeout>"
|
||||
puts " #{CMD} stop <pidfile> <kill_timeout>"
|
||||
puts " #{CMD} status <section>"
|
||||
puts
|
||||
puts " <pidfile> is path to a pidfile"
|
||||
puts " <kill_timeout> is number of seconds to wait until Sidekiq exits"
|
||||
puts " (default: #{Sidekiqctl::DEFAULT_KILL_TIMEOUT}), after which Sidekiq will be KILL'd"
|
||||
puts
|
||||
puts " <section> (optional) view a specific section of the status output"
|
||||
puts " Valid sections are: #{Sidekiqctl::Status::VALID_SECTIONS.join(', ')}"
|
||||
puts
|
||||
puts "Be sure to set the kill_timeout LONGER than Sidekiq's -t timeout. If you want"
|
||||
puts "to wait 60 seconds for jobs to finish, use `sidekiq -t 60` and `sidekiqctl stop"
|
||||
puts " path_to_pidfile 61`"
|
||||
puts
|
||||
end
|
||||
|
||||
def initialize(stage, pidfile, timeout)
|
||||
@stage = stage
|
||||
@pidfile = pidfile
|
||||
@kill_timeout = timeout
|
||||
|
||||
done('No pidfile given', :error) if !pidfile
|
||||
done("Pidfile #{pidfile} does not exist", :warn) if !File.exist?(pidfile)
|
||||
done('Invalid pidfile content', :error) if pid == 0
|
||||
|
||||
fetch_process
|
||||
|
||||
begin
|
||||
send(stage)
|
||||
rescue NoMethodError
|
||||
done "Invalid command: #{stage}", :error
|
||||
end
|
||||
end
|
||||
|
||||
def fetch_process
|
||||
Process.kill(0, pid)
|
||||
rescue Errno::ESRCH
|
||||
done "Process doesn't exist", :error
|
||||
# We were not allowed to send a signal, but the process must have existed
|
||||
# when Process.kill() was called.
|
||||
rescue Errno::EPERM
|
||||
return pid
|
||||
end
|
||||
|
||||
def done(msg, error = nil)
|
||||
puts msg
|
||||
exit(exit_signal(error))
|
||||
end
|
||||
|
||||
def exit_signal(error)
|
||||
(error == :error) ? 1 : 0
|
||||
end
|
||||
|
||||
def pid
|
||||
@pid ||= File.read(pidfile).to_i
|
||||
end
|
||||
|
||||
def quiet
|
||||
`kill -TSTP #{pid}`
|
||||
end
|
||||
|
||||
def stop
|
||||
`kill -TERM #{pid}`
|
||||
kill_timeout.times do
|
||||
begin
|
||||
Process.kill(0, pid)
|
||||
rescue Errno::ESRCH
|
||||
FileUtils.rm_f pidfile
|
||||
done 'Sidekiq shut down gracefully.'
|
||||
rescue Errno::EPERM
|
||||
done 'Not permitted to shut down Sidekiq.'
|
||||
end
|
||||
sleep 1
|
||||
end
|
||||
`kill -9 #{pid}`
|
||||
FileUtils.rm_f pidfile
|
||||
done 'Sidekiq shut down forcefully.'
|
||||
end
|
||||
alias_method :shutdown, :stop
|
||||
|
||||
class Status
|
||||
VALID_SECTIONS = %w[all version overview processes queues]
|
||||
def display(section = nil)
|
||||
section ||= 'all'
|
||||
unless VALID_SECTIONS.include? section
|
||||
puts "I don't know how to check the status of '#{section}'!"
|
||||
puts "Try one of these: #{VALID_SECTIONS.join(', ')}"
|
||||
return
|
||||
end
|
||||
send(section)
|
||||
rescue StandardError => e
|
||||
puts "Couldn't get status: #{e}"
|
||||
end
|
||||
|
||||
def all
|
||||
version
|
||||
puts
|
||||
overview
|
||||
puts
|
||||
processes
|
||||
puts
|
||||
queues
|
||||
end
|
||||
|
||||
def version
|
||||
puts "Sidekiq #{Sidekiq::VERSION}"
|
||||
puts Time.now
|
||||
end
|
||||
|
||||
def overview
|
||||
puts '---- Overview ----'
|
||||
puts " Processed: #{delimit stats.processed}"
|
||||
puts " Failed: #{delimit stats.failed}"
|
||||
puts " Busy: #{delimit stats.workers_size}"
|
||||
puts " Enqueued: #{delimit stats.enqueued}"
|
||||
puts " Retries: #{delimit stats.retry_size}"
|
||||
puts " Scheduled: #{delimit stats.scheduled_size}"
|
||||
puts " Dead: #{delimit stats.dead_size}"
|
||||
end
|
||||
|
||||
def processes
|
||||
puts "---- Processes (#{process_set.size}) ----"
|
||||
process_set.each_with_index do |process, index|
|
||||
puts "#{process['identity']} #{tags_for(process)}"
|
||||
puts " Started: #{Time.at(process['started_at'])} (#{time_ago(process['started_at'])})"
|
||||
puts " Threads: #{process['concurrency']} (#{process['busy']} busy)"
|
||||
puts " Queues: #{split_multiline(process['queues'].sort, pad: 11)}"
|
||||
puts '' unless (index+1) == process_set.size
|
||||
end
|
||||
end
|
||||
|
||||
COL_PAD = 2
|
||||
def queues
|
||||
puts "---- Queues (#{queue_data.size}) ----"
|
||||
columns = {
|
||||
name: [:ljust, (['name'] + queue_data.map(&:name)).map(&:length).max + COL_PAD],
|
||||
size: [:rjust, (['size'] + queue_data.map(&:size)).map(&:length).max + COL_PAD],
|
||||
latency: [:rjust, (['latency'] + queue_data.map(&:latency)).map(&:length).max + COL_PAD]
|
||||
}
|
||||
columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) }
|
||||
puts
|
||||
queue_data.each do |q|
|
||||
columns.each do |col, (dir, width)|
|
||||
print q.send(col).public_send(dir, width)
|
||||
end
|
||||
puts
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def delimit(number)
|
||||
number.to_s.reverse.scan(/.{1,3}/).join(',').reverse
|
||||
end
|
||||
|
||||
def split_multiline(values, opts = {})
|
||||
return 'none' unless values
|
||||
pad = opts[:pad] || 0
|
||||
max_length = opts[:max_length] || (80 - pad)
|
||||
out = []
|
||||
line = ''
|
||||
values.each do |value|
|
||||
if (line.length + value.length) > max_length
|
||||
out << line
|
||||
line = ' ' * pad
|
||||
end
|
||||
line << value + ', '
|
||||
end
|
||||
out << line[0..-3]
|
||||
out.join("\n")
|
||||
end
|
||||
|
||||
def tags_for(process)
|
||||
tags = [
|
||||
process['tag'],
|
||||
process['labels'],
|
||||
(process['quiet'] == 'true' ? 'quiet' : nil)
|
||||
].flatten.compact
|
||||
tags.any? ? "[#{tags.join('] [')}]" : nil
|
||||
end
|
||||
|
||||
def time_ago(timestamp)
|
||||
seconds = Time.now - Time.at(timestamp)
|
||||
return 'just now' if seconds < 60
|
||||
return 'a minute ago' if seconds < 120
|
||||
return "#{seconds.floor / 60} minutes ago" if seconds < 3600
|
||||
return 'an hour ago' if seconds < 7200
|
||||
"#{seconds.floor / 60 / 60} hours ago"
|
||||
end
|
||||
|
||||
QUEUE_STRUCT = Struct.new(:name, :size, :latency)
|
||||
def queue_data
|
||||
@queue_data ||= Sidekiq::Queue.all.map do |q|
|
||||
QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf('%#.2f', q.latency))
|
||||
end
|
||||
end
|
||||
|
||||
def process_set
|
||||
@process_set ||= Sidekiq::ProcessSet.new
|
||||
end
|
||||
|
||||
def stats
|
||||
@stats ||= Sidekiq::Stats.new
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,5 +1,6 @@
|
|||
# frozen_string_literal: true
|
||||
require_relative 'helper'
|
||||
require 'sidekiq/ctl'
|
||||
|
||||
def capture_stdout
|
||||
$stdout = StringIO.new
|
||||
|
@ -9,18 +10,13 @@ ensure
|
|||
$stdout = STDOUT
|
||||
end
|
||||
|
||||
capture_stdout do
|
||||
ARGV = %w[status]
|
||||
load 'bin/sidekiqctl'
|
||||
end
|
||||
|
||||
def output(section = 'all')
|
||||
capture_stdout do
|
||||
Sidekiqctl::Status.new.display(section)
|
||||
Sidekiq::Ctl::Status.new.display(section)
|
||||
end
|
||||
end
|
||||
|
||||
describe Sidekiqctl do
|
||||
describe Sidekiq::Ctl do
|
||||
describe 'status' do
|
||||
describe 'version' do
|
||||
it 'displays the current Sidekiq version' do
|
||||
|
|
Loading…
Add table
Reference in a new issue