diff --git a/Changes.md b/Changes.md index 986c4341..2252ecd2 100644 --- a/Changes.md +++ b/Changes.md @@ -5,6 +5,7 @@ HEAD --------- +- Add `sidekiqctl status` command [#4003, dzunk] - Update elapsed time calculatons to use monotonic clock [#3999] - Fix a few issues with mobile Web UI styling [#3973, navied] - Jobs with `retry: false` now go through the global `death_handlers`, diff --git a/bin/sidekiqctl b/bin/sidekiqctl index ad7dfee5..297b3496 100755 --- a/bin/sidekiqctl +++ b/bin/sidekiqctl @@ -1,20 +1,27 @@ #!/usr/bin/env ruby require 'fileutils' +require 'sidekiq/api' class Sidekiqctl DEFAULT_KILL_TIMEOUT = 10 + CMD = File.basename($0) attr_reader :stage, :pidfile, :kill_timeout def self.print_usage - puts "#{File.basename($0)} - stop a Sidekiq process from the command line." + puts "#{CMD} - control Sidekiq from the command line." + puts + puts "Usage: #{CMD} quiet " + puts " #{CMD} stop " + puts " #{CMD} status
" puts - puts "Usage: #{File.basename($0)} " - puts " where is either 'quiet' or 'stop'" puts " is path to a pidfile" puts " is number of seconds to wait until Sidekiq exits" puts " (default: #{Sidekiqctl::DEFAULT_KILL_TIMEOUT}), after which Sidekiq will be KILL'd" + puts + puts "
(optional) view a specific section of the status output" + puts " Valid sections are: #{Sidekiqctl::Status::VALID_SECTIONS.join(', ')}" puts puts "Be sure to set the kill_timeout LONGER than Sidekiq's -t timeout. If you want" puts "to wait 60 seconds for jobs to finish, use `sidekiq -t 60` and `sidekiqctl stop" @@ -85,6 +92,137 @@ class Sidekiqctl done 'Sidekiq shut down forcefully.' end alias_method :shutdown, :stop + + class Status + VALID_SECTIONS = %w[all version overview processes queues] + def display(section = nil) + section ||= 'all' + unless VALID_SECTIONS.include? section + puts "I don't know how to check the status of '#{section}'!" + puts "Try one of these: #{VALID_SECTIONS.join(', ')}" + return + end + send(section) + rescue StandardError => e + puts "Couldn't get status: #{e}" + end + + def all + version + puts + overview + puts + processes + puts + queues + end + + def version + puts "Sidekiq #{Sidekiq::VERSION}" + puts Time.now + end + + def overview + puts '---- Overview ----' + puts " Processed: #{delimit stats.processed}" + puts " Failed: #{delimit stats.failed}" + puts " Busy: #{delimit stats.workers_size}" + puts " Enqueued: #{delimit stats.enqueued}" + puts " Retries: #{delimit stats.retry_size}" + puts " Scheduled: #{delimit stats.scheduled_size}" + puts " Dead: #{delimit stats.dead_size}" + end + + def processes + puts "---- Processes (#{process_set.size}) ----" + process_set.each_with_index do |process, index| + puts "#{process['identity']} #{tags_for(process)}" + puts " Started: #{Time.at(process['started_at'])} (#{time_ago(process['started_at'])})" + puts " Threads: #{process['concurrency']} (#{process['busy']} busy)" + puts " Queues: #{split_multiline(process['queues'].sort, pad: 11)}" + puts '' unless (index+1) == process_set.size + end + end + + COL_PAD = 2 + def queues + puts "---- Queues (#{queue_data.size}) ----" + columns = { + name: [:ljust, (['name'] + queue_data.map(&:name)).map(&:length).max + COL_PAD], + size: [:rjust, (['size'] + queue_data.map(&:size)).map(&:length).max + COL_PAD], + latency: [:rjust, (['latency'] + queue_data.map(&:latency)).map(&:length).max + COL_PAD] + } + columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) } + puts + queue_data.each do |q| + columns.each do |col, (dir, width)| + print q.send(col).public_send(dir, width) + end + puts + end + end + + private + + def delimit(number) + number.to_s.reverse.scan(/.{1,3}/).join(',').reverse + end + + def split_multiline(values, opts = {}) + return 'none' unless values + pad = opts[:pad] || 0 + max_length = opts[:max_length] || (80 - pad) + out = [] + line = '' + values.each do |value| + if (line.length + value.length) > max_length + out << line + line = ' ' * pad + end + line << value + ', ' + end + out << line[0..-3] + out.join("\n") + end + + def tags_for(process) + tags = [ + process['tag'], + process['labels'], + (process['quiet'] == 'true' ? 'quiet' : nil) + ].flatten.compact + tags.any? ? "[#{tags.join('] [')}]" : nil + end + + def time_ago(timestamp) + seconds = Time.now - Time.at(timestamp) + return 'just now' if seconds < 60 + return 'a minute ago' if seconds < 120 + return "#{seconds.floor / 60} minutes ago" if seconds < 3600 + return 'an hour ago' if seconds < 7200 + "#{seconds.floor / 60 / 60} hours ago" + end + + QUEUE_STRUCT = Struct.new(:name, :size, :latency) + def queue_data + @queue_data ||= Sidekiq::Queue.all.map do |q| + QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf('%#.2f', q.latency)) + end + end + + def process_set + @process_set ||= Sidekiq::ProcessSet.new + end + + def stats + @stats ||= Sidekiq::Stats.new + end + end +end + +if ARGV[0] == 'status' + Sidekiqctl::Status.new.display(ARGV[1]) + exit end if ARGV.length < 2 diff --git a/test/test_sidekiqctl.rb b/test/test_sidekiqctl.rb new file mode 100644 index 00000000..3aa66242 --- /dev/null +++ b/test/test_sidekiqctl.rb @@ -0,0 +1,105 @@ +# frozen_string_literal: true +require_relative 'helper' + +def capture_stdout + $stdout = StringIO.new + yield + $stdout.string.chomp +ensure + $stdout = STDOUT +end + +capture_stdout do + ARGV = %w[status] + load 'bin/sidekiqctl' +end + +def output(section = 'all') + capture_stdout do + Sidekiqctl::Status.new.display(section) + end +end + +class TestSidekiqctl < Sidekiq::Test + describe 'sidekiqctl status' do + describe 'version' do + it 'displays the current Sidekiq version' do + assert_includes output, "Sidekiq #{Sidekiq::VERSION}" + end + + it 'displays the current time' do + Time.stub(:now, Time.at(0)) do + assert_includes output, Time.at(0).to_s + end + end + end + + describe 'overview' do + it 'has a heading' do + assert_includes output, '---- Overview ----' + end + + it 'displays the correct output' do + mock_stats = OpenStruct.new( + processed: 420710, + failed: 12, + workers_size: 34, + enqueued: 56, + retry_size: 78, + scheduled_size: 90, + dead_size: 666 + ) + Sidekiq::Stats.stub(:new, mock_stats) do + assert_includes output, 'Processed: 420,710' + assert_includes output, 'Failed: 12' + assert_includes output, 'Busy: 34' + assert_includes output, 'Enqueued: 56' + assert_includes output, 'Retries: 78' + assert_includes output, 'Scheduled: 90' + assert_includes output, 'Dead: 666' + end + end + end + + describe 'processes' do + it 'has a heading' do + assert_includes output, '---- Processes (0) ----' + end + + it 'displays the correct output' do + mock_processes = [{ + 'identity' => 'foobar', + 'tag' => 'baz', + 'started_at' => Time.now, + 'concurrency' => 5, + 'busy' => 2, + 'queues' => %w[low medium high] + }] + Sidekiq::ProcessSet.stub(:new, mock_processes) do + assert_includes output, 'foobar [baz]' + assert_includes output, "Started: #{mock_processes.first['started_at']} (just now)" + assert_includes output, 'Threads: 5 (2 busy)' + assert_includes output, 'Queues: high, low, medium' + end + end + end + + describe 'queues' do + it 'has a heading' do + assert_includes output, '---- Queues (0) ----' + end + + it 'displays the correct output' do + mock_queues = [ + OpenStruct.new(name: 'foobar', size: 12, latency: 12.3456), + OpenStruct.new(name: 'a_long_queue_name', size: 234, latency: 567.89999) + ] + Sidekiq::Queue.stub(:all, mock_queues) do + assert_includes output, 'NAME SIZE LATENCY' + assert_includes output, 'foobar 12 12.35' + assert_includes output, 'a_long_queue_name 234 567.90' + end + end + end + end +end