mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Merge pull request #1703 from mperham/quiet_api
Add API to remotely control Sidekiq processes
This commit is contained in:
commit
2e26b21c72
11 changed files with 161 additions and 45 deletions
18
Changes.md
18
Changes.md
|
@ -1,3 +1,21 @@
|
||||||
|
3.1.0
|
||||||
|
-----------
|
||||||
|
|
||||||
|
- New remote control feature: you can remotely trigger Sidekiq to quiet
|
||||||
|
or terminate remotely via API, without signals. This is most useful
|
||||||
|
on Heroku which does not support the USR1 'quiet' signal. Now you can
|
||||||
|
run a rake task like this at the start of your deploy to quiet your
|
||||||
|
set of Sidekiq processes.
|
||||||
|
```ruby
|
||||||
|
namespace :sidekiq do
|
||||||
|
task :quiet => :environment do
|
||||||
|
Sidekiq::ProcessSet.new.each(&:quiet!)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
```
|
||||||
|
- The Web UI can now quiet or stop all Sidekiq processes on the Busy tab.
|
||||||
|
|
||||||
|
|
||||||
3.0.2
|
3.0.2
|
||||||
-----------
|
-----------
|
||||||
|
|
||||||
|
|
|
@ -449,18 +449,8 @@ module Sidekiq
|
||||||
# right now. Each process send a heartbeat to Redis every 5 seconds
|
# right now. Each process send a heartbeat to Redis every 5 seconds
|
||||||
# so this set should be relatively accurate, barring network partitions.
|
# so this set should be relatively accurate, barring network partitions.
|
||||||
#
|
#
|
||||||
# Yields a hash of data which looks something like this:
|
# Yields a Sidekiq::Process.
|
||||||
#
|
#
|
||||||
# {
|
|
||||||
# 'hostname' => 'app-1.example.com',
|
|
||||||
# 'started_at' => <process start time>,
|
|
||||||
# 'pid' => 12345,
|
|
||||||
# 'tag' => 'myapp'
|
|
||||||
# 'concurrency' => 25,
|
|
||||||
# 'queues' => ['default', 'low'],
|
|
||||||
# 'busy' => 10,
|
|
||||||
# 'beat' => <last heartbeat>,
|
|
||||||
# }
|
|
||||||
|
|
||||||
class ProcessSet
|
class ProcessSet
|
||||||
include Enumerable
|
include Enumerable
|
||||||
|
@ -486,7 +476,7 @@ module Sidekiq
|
||||||
# in to Redis and probably died.
|
# in to Redis and probably died.
|
||||||
(to_prune << sorted[i]; next) if info.nil?
|
(to_prune << sorted[i]; next) if info.nil?
|
||||||
hash = Sidekiq.load_json(info)
|
hash = Sidekiq.load_json(info)
|
||||||
yield hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f)
|
yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f))
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -503,6 +493,53 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
#
|
||||||
|
# Sidekiq::Process has a set of attributes which look like this:
|
||||||
|
#
|
||||||
|
# {
|
||||||
|
# 'hostname' => 'app-1.example.com',
|
||||||
|
# 'started_at' => <process start time>,
|
||||||
|
# 'pid' => 12345,
|
||||||
|
# 'tag' => 'myapp'
|
||||||
|
# 'concurrency' => 25,
|
||||||
|
# 'queues' => ['default', 'low'],
|
||||||
|
# 'busy' => 10,
|
||||||
|
# 'beat' => <last heartbeat>,
|
||||||
|
# }
|
||||||
|
class Process
|
||||||
|
def initialize(hash)
|
||||||
|
@attribs = hash
|
||||||
|
end
|
||||||
|
|
||||||
|
def [](key)
|
||||||
|
@attribs[key]
|
||||||
|
end
|
||||||
|
|
||||||
|
def quiet!
|
||||||
|
signal('USR1')
|
||||||
|
end
|
||||||
|
|
||||||
|
def stop!
|
||||||
|
signal('TERM')
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def signal(sig)
|
||||||
|
key = "#{identity}-signals"
|
||||||
|
Sidekiq.redis do |c|
|
||||||
|
c.multi do
|
||||||
|
c.lpush(key, sig)
|
||||||
|
c.expire(key, 60)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def identity
|
||||||
|
@id ||= "#{self['hostname']}:#{self['pid']}"
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
##
|
##
|
||||||
# Programmatic access to the current active worker set.
|
# Programmatic access to the current active worker set.
|
||||||
#
|
#
|
||||||
|
|
|
@ -95,16 +95,6 @@ module Sidekiq
|
||||||
|
|
||||||
private
|
private
|
||||||
|
|
||||||
def fire_event(event)
|
|
||||||
Sidekiq.options[:lifecycle_events][event].each do |block|
|
|
||||||
begin
|
|
||||||
block.call
|
|
||||||
rescue => ex
|
|
||||||
handle_exception(ex, { :event => event })
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_signal(sig)
|
def handle_signal(sig)
|
||||||
Sidekiq.logger.debug "Got #{sig} signal"
|
Sidekiq.logger.debug "Got #{sig} signal"
|
||||||
case sig
|
case sig
|
||||||
|
@ -158,7 +148,7 @@ module Sidekiq
|
||||||
files_to_reopen << file unless file.closed?
|
files_to_reopen << file unless file.closed?
|
||||||
end
|
end
|
||||||
|
|
||||||
Process.daemon(true, true)
|
::Process.daemon(true, true)
|
||||||
|
|
||||||
files_to_reopen.each do |file|
|
files_to_reopen.each do |file|
|
||||||
begin
|
begin
|
||||||
|
@ -323,7 +313,7 @@ module Sidekiq
|
||||||
if path = options[:pidfile]
|
if path = options[:pidfile]
|
||||||
pidfile = File.expand_path(path)
|
pidfile = File.expand_path(path)
|
||||||
File.open(pidfile, 'w') do |f|
|
File.open(pidfile, 'w') do |f|
|
||||||
f.puts Process.pid
|
f.puts ::Process.pid
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -7,7 +7,7 @@ module Sidekiq
|
||||||
class Pretty < Logger::Formatter
|
class Pretty < Logger::Formatter
|
||||||
# Provide a call() method that returns the formatted message.
|
# Provide a call() method that returns the formatted message.
|
||||||
def call(severity, time, program_name, message)
|
def call(severity, time, program_name, message)
|
||||||
"#{time.utc.iso8601} #{Process.pid} TID-#{Thread.current.object_id.to_s(36)}#{context} #{severity}: #{message}\n"
|
"#{time.utc.iso8601} #{::Process.pid} TID-#{Thread.current.object_id.to_s(36)}#{context} #{severity}: #{message}\n"
|
||||||
end
|
end
|
||||||
|
|
||||||
def context
|
def context
|
||||||
|
|
|
@ -149,12 +149,15 @@ module Sidekiq
|
||||||
|
|
||||||
def ❤(key)
|
def ❤(key)
|
||||||
begin
|
begin
|
||||||
Sidekiq.redis do |conn|
|
_, _, msg = Sidekiq.redis do |conn|
|
||||||
conn.multi do
|
conn.multi do
|
||||||
conn.hmset(key, 'busy', @busy.size, 'beat', Time.now.to_f)
|
conn.hmset(key, 'busy', @busy.size, 'beat', Time.now.to_f)
|
||||||
conn.expire(key, 60)
|
conn.expire(key, 60)
|
||||||
|
conn.rpop("#{key}-signals")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
::Process.kill(msg, $$) if msg
|
||||||
rescue => e
|
rescue => e
|
||||||
# ignore all redis/network issues
|
# ignore all redis/network issues
|
||||||
logger.error("heartbeat: #{e.message}")
|
logger.error("heartbeat: #{e.message}")
|
||||||
|
|
|
@ -33,5 +33,16 @@ module Sidekiq
|
||||||
def identity
|
def identity
|
||||||
@@identity ||= "#{hostname}:#{$$}"
|
@@identity ||= "#{hostname}:#{$$}"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def fire_event(event)
|
||||||
|
Sidekiq.options[:lifecycle_events][event].each do |block|
|
||||||
|
begin
|
||||||
|
block.call
|
||||||
|
rescue => ex
|
||||||
|
handle_exception(ex, { :event => event })
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
VERSION = "3.0.2"
|
VERSION = "3.1.0"
|
||||||
end
|
end
|
||||||
|
|
|
@ -42,6 +42,20 @@ module Sidekiq
|
||||||
erb :busy
|
erb :busy
|
||||||
end
|
end
|
||||||
|
|
||||||
|
post "/busy" do
|
||||||
|
if params['hostname']
|
||||||
|
pro = Sidekiq::Process.new('hostname' => params["hostname"], 'pid' => params['pid'])
|
||||||
|
pro.quiet! if params[:quiet]
|
||||||
|
pro.stop! if params[:stop]
|
||||||
|
else
|
||||||
|
Sidekiq::ProcessSet.new.each do |pro|
|
||||||
|
pro.quiet! if params[:quiet]
|
||||||
|
pro.stop! if params[:stop]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
redirect "#{root_path}busy"
|
||||||
|
end
|
||||||
|
|
||||||
get "/queues" do
|
get "/queues" do
|
||||||
@queues = Sidekiq::Queue.all
|
@queues = Sidekiq::Queue.all
|
||||||
erb :queues
|
erb :queues
|
||||||
|
|
|
@ -356,7 +356,13 @@ class TestApi < Sidekiq::Test
|
||||||
ps = Sidekiq::ProcessSet.new.to_a
|
ps = Sidekiq::ProcessSet.new.to_a
|
||||||
assert_equal 1, ps.size
|
assert_equal 1, ps.size
|
||||||
data = ps.first
|
data = ps.first
|
||||||
assert_equal odata.merge('busy' => 10, 'beat' => time), data
|
assert_equal 10, data['busy']
|
||||||
|
assert_equal time, data['beat']
|
||||||
|
assert_equal 123, data['pid']
|
||||||
|
data.quiet!
|
||||||
|
data.stop!
|
||||||
|
assert_equal "TERM", Sidekiq.redis{|c| c.lpop("#{hostname}:123-signals") }
|
||||||
|
assert_equal "USR1", Sidekiq.redis{|c| c.lpop("#{hostname}:123-signals") }
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'can enumerate workers' do
|
it 'can enumerate workers' do
|
||||||
|
|
|
@ -29,22 +29,39 @@ class TestWeb < Sidekiq::Test
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'can display workers' do
|
describe 'busy' do
|
||||||
Sidekiq.redis do |conn|
|
|
||||||
conn.incr('busy')
|
|
||||||
conn.sadd('processes', 'foo:1234')
|
|
||||||
conn.hmset('foo:1234', 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f), 'at', Time.now.to_f, 'busy', 4)
|
|
||||||
identity = 'foo:1234:workers'
|
|
||||||
hash = {:queue => 'critical', :payload => { 'class' => WebWorker.name, 'args' => [1,'abc'] }, :run_at => Time.now.to_i }
|
|
||||||
conn.hmset(identity, 1001, Sidekiq.dump_json(hash))
|
|
||||||
end
|
|
||||||
assert_equal ['1001'], Sidekiq::Workers.new.map { |pid, tid, data| tid }
|
|
||||||
|
|
||||||
get '/busy'
|
it 'can display workers' do
|
||||||
assert_equal 200, last_response.status
|
Sidekiq.redis do |conn|
|
||||||
assert_match(/status-active/, last_response.body)
|
conn.incr('busy')
|
||||||
assert_match(/critical/, last_response.body)
|
conn.sadd('processes', 'foo:1234')
|
||||||
assert_match(/WebWorker/, last_response.body)
|
conn.hmset('foo:1234', 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f), 'at', Time.now.to_f, 'busy', 4)
|
||||||
|
identity = 'foo:1234:workers'
|
||||||
|
hash = {:queue => 'critical', :payload => { 'class' => WebWorker.name, 'args' => [1,'abc'] }, :run_at => Time.now.to_i }
|
||||||
|
conn.hmset(identity, 1001, Sidekiq.dump_json(hash))
|
||||||
|
end
|
||||||
|
assert_equal ['1001'], Sidekiq::Workers.new.map { |pid, tid, data| tid }
|
||||||
|
|
||||||
|
get '/busy'
|
||||||
|
assert_equal 200, last_response.status
|
||||||
|
assert_match(/status-active/, last_response.body)
|
||||||
|
assert_match(/critical/, last_response.body)
|
||||||
|
assert_match(/WebWorker/, last_response.body)
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'can quiet a process' do
|
||||||
|
assert_nil Sidekiq.redis { |c| c.lpop "host:pid-signals" }
|
||||||
|
post '/busy', 'quiet' => '1', 'hostname' => 'host', 'pid' => 'pid'
|
||||||
|
assert_equal 302, last_response.status
|
||||||
|
assert_equal 'USR1', Sidekiq.redis { |c| c.lpop "host:pid-signals" }
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'can stop a process' do
|
||||||
|
assert_nil Sidekiq.redis { |c| c.lpop "host:pid-signals" }
|
||||||
|
post '/busy', 'stop' => '1', 'hostname' => 'host', 'pid' => 'pid'
|
||||||
|
assert_equal 302, last_response.status
|
||||||
|
assert_equal 'TERM', Sidekiq.redis { |c| c.lpop "host:pid-signals" }
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'can display queues' do
|
it 'can display queues' do
|
||||||
|
|
|
@ -1,7 +1,16 @@
|
||||||
<div class="row header">
|
<div class="row header">
|
||||||
<div class="col-sm-7">
|
<div class="col-sm-8 pull-left">
|
||||||
<h3><%= t('Processes') %></h3>
|
<h3><%= t('Processes') %></h3>
|
||||||
</div>
|
</div>
|
||||||
|
<div class="col-sm-4 pull-right">
|
||||||
|
<form method="POST" style="margin-top: 20px; margin-bottom: 10px;">
|
||||||
|
<div class="btn-group pull-right">
|
||||||
|
<button class="btn btn-warn" type="submit" name="quiet" value="1"><%= t('Quiet All') %></button>
|
||||||
|
<button class="btn btn-danger" type="submit" name="stop" value="1"><%= t('Stop All') %></button>
|
||||||
|
</div>
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<table class="processes table table-hover table-bordered table-striped table-white">
|
<table class="processes table table-hover table-bordered table-striped table-white">
|
||||||
|
@ -10,13 +19,24 @@
|
||||||
<th><%= t('Started') %></th>
|
<th><%= t('Started') %></th>
|
||||||
<th><%= t('Threads') %></th>
|
<th><%= t('Threads') %></th>
|
||||||
<th><%= t('Busy') %></th>
|
<th><%= t('Busy') %></th>
|
||||||
|
<th> </th>
|
||||||
</thead>
|
</thead>
|
||||||
<% Sidekiq::ProcessSet.new.each_with_index do |process, index| %>
|
<% Sidekiq::ProcessSet.new.each_with_index do |process, index| %>
|
||||||
<tr>
|
<tr>
|
||||||
<td><%= process['hostname'] %>:<%= process['pid'] %></td>
|
<td><%= "#{process['hostname']}:#{process['pid']}" %></td>
|
||||||
<td><%= relative_time(Time.at(process['started_at'])) %></td>
|
<td><%= relative_time(Time.at(process['started_at'])) %></td>
|
||||||
<td><%= process['concurrency'] %></td>
|
<td><%= process['concurrency'] %></td>
|
||||||
<td><%= process['busy'] %></td>
|
<td><%= process['busy'] %></td>
|
||||||
|
<td>
|
||||||
|
<div class="btn-group pull-right">
|
||||||
|
<form method="POST">
|
||||||
|
<input type="hidden" name="hostname" value="<%= process['hostname'] %>"/>
|
||||||
|
<input type="hidden" name="pid" value="<%= process['pid'] %>"/>
|
||||||
|
<button class="btn btn-warn" type="submit" name="quiet" value="1"><%= t('Quiet') %></button>
|
||||||
|
<button class="btn btn-danger" type="submit" name="stop" value="1"><%= t('Stop') %></button>
|
||||||
|
</form>
|
||||||
|
</div>
|
||||||
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
<% end %>
|
<% end %>
|
||||||
</table>
|
</table>
|
||||||
|
|
Loading…
Add table
Reference in a new issue