From f81f09dee406aedc1711c762c7761350c01b9516 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 13 May 2014 20:33:20 -0700 Subject: [PATCH 1/3] Refactor ProcessSet to yield a Sidekiq::Process --- lib/sidekiq/api.rb | 37 +++++++++++++++++++++++++------------ lib/sidekiq/cli.rb | 4 ++-- lib/sidekiq/logging.rb | 2 +- test/test_api.rb | 4 +++- 4 files changed, 31 insertions(+), 16 deletions(-) diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 5ec2c43e..d5bf22ed 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -449,18 +449,8 @@ module Sidekiq # right now. Each process send a heartbeat to Redis every 5 seconds # so this set should be relatively accurate, barring network partitions. # - # Yields a hash of data which looks something like this: + # Yields a Sidekiq::Process. # - # { - # 'hostname' => 'app-1.example.com', - # 'started_at' => , - # 'pid' => 12345, - # 'tag' => 'myapp' - # 'concurrency' => 25, - # 'queues' => ['default', 'low'], - # 'busy' => 10, - # 'beat' => , - # } class ProcessSet include Enumerable @@ -486,7 +476,7 @@ module Sidekiq # in to Redis and probably died. (to_prune << sorted[i]; next) if info.nil? hash = Sidekiq.load_json(info) - yield hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f) + yield Process.new(hash.merge('busy' => busy.to_i, 'beat' => at_s.to_f)) end end @@ -503,6 +493,29 @@ module Sidekiq end end + # + # Sidekiq::Process has a set of attributes which look like this: + # + # { + # 'hostname' => 'app-1.example.com', + # 'started_at' => , + # 'pid' => 12345, + # 'tag' => 'myapp' + # 'concurrency' => 25, + # 'queues' => ['default', 'low'], + # 'busy' => 10, + # 'beat' => , + # } + class Process + def initialize(hash) + @attribs = hash + end + + def [](key) + @attribs[key] + end + end + ## # Programmatic access to the current active worker set. # diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 241bba8d..4a26d4c7 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -158,7 +158,7 @@ module Sidekiq files_to_reopen << file unless file.closed? end - Process.daemon(true, true) + ::Process.daemon(true, true) files_to_reopen.each do |file| begin @@ -323,7 +323,7 @@ module Sidekiq if path = options[:pidfile] pidfile = File.expand_path(path) File.open(pidfile, 'w') do |f| - f.puts Process.pid + f.puts ::Process.pid end end end diff --git a/lib/sidekiq/logging.rb b/lib/sidekiq/logging.rb index 5d40657a..5597aea6 100644 --- a/lib/sidekiq/logging.rb +++ b/lib/sidekiq/logging.rb @@ -7,7 +7,7 @@ module Sidekiq class Pretty < Logger::Formatter # Provide a call() method that returns the formatted message. def call(severity, time, program_name, message) - "#{time.utc.iso8601} #{Process.pid} TID-#{Thread.current.object_id.to_s(36)}#{context} #{severity}: #{message}\n" + "#{time.utc.iso8601} #{::Process.pid} TID-#{Thread.current.object_id.to_s(36)}#{context} #{severity}: #{message}\n" end def context diff --git a/test/test_api.rb b/test/test_api.rb index 6950d6e2..fe628e20 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -356,7 +356,9 @@ class TestApi < Sidekiq::Test ps = Sidekiq::ProcessSet.new.to_a assert_equal 1, ps.size data = ps.first - assert_equal odata.merge('busy' => 10, 'beat' => time), data + assert_equal 10, data['busy'] + assert_equal time, data['beat'] + assert_equal 123, data['pid'] end it 'can enumerate workers' do From b0aa136be6ce68d949bee70db188d7224b12aeca Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 13 May 2014 21:41:40 -0700 Subject: [PATCH 2/3] Implement remote signals via heartbeat --- Changes.md | 17 +++++++++++++++++ lib/sidekiq/api.rb | 16 ++++++++++++++++ lib/sidekiq/cli.rb | 10 ---------- lib/sidekiq/manager.rb | 5 ++++- lib/sidekiq/util.rb | 11 +++++++++++ lib/sidekiq/version.rb | 2 +- 6 files changed, 49 insertions(+), 12 deletions(-) diff --git a/Changes.md b/Changes.md index 29be66fc..c443c9de 100644 --- a/Changes.md +++ b/Changes.md @@ -1,3 +1,20 @@ +3.1.0 +----------- + +- New remote signals feature: you can remotely trigger Sidekiq to quiet + or terminate remotely via API, without signals. This is most useful + on Heroku which does not support the USR1 'quiet' signal. Now you can + run a rake task like this at the start of your deploy to quiet your + set of Sidekiq processes. +```ruby +namespace :sidekiq do + task :quiet => :environment do + Sidekiq::ProcessSet.new.each(&:quiet!) + end +end +``` + + 3.0.2 ----------- diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index d5bf22ed..6cad59ba 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -514,6 +514,22 @@ module Sidekiq def [](key) @attribs[key] end + + def quiet! + key = "#{identity}-signals" + Sidekiq.redis do |c| + c.multi do + c.lpush(key, 'USR1') + c.expire(key, 60) + end + end + end + + private + + def identity + @id ||= "#{self['hostname']}:#{self['pid']}" + end end ## diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 4a26d4c7..77c7bd89 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -95,16 +95,6 @@ module Sidekiq private - def fire_event(event) - Sidekiq.options[:lifecycle_events][event].each do |block| - begin - block.call - rescue => ex - handle_exception(ex, { :event => event }) - end - end - end - def handle_signal(sig) Sidekiq.logger.debug "Got #{sig} signal" case sig diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 19d6afb0..5723c190 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -149,12 +149,15 @@ module Sidekiq def ❤(key) begin - Sidekiq.redis do |conn| + _, _, msg = Sidekiq.redis do |conn| conn.multi do conn.hmset(key, 'busy', @busy.size, 'beat', Time.now.to_f) conn.expire(key, 60) + conn.rpop("#{key}-signals") end end + + ::Process.kill(msg, $$) if msg rescue => e # ignore all redis/network issues logger.error("heartbeat: #{e.message}") diff --git a/lib/sidekiq/util.rb b/lib/sidekiq/util.rb index 4ee64608..e1fe1a9a 100644 --- a/lib/sidekiq/util.rb +++ b/lib/sidekiq/util.rb @@ -33,5 +33,16 @@ module Sidekiq def identity @@identity ||= "#{hostname}:#{$$}" end + + def fire_event(event) + Sidekiq.options[:lifecycle_events][event].each do |block| + begin + block.call + rescue => ex + handle_exception(ex, { :event => event }) + end + end + end + end end diff --git a/lib/sidekiq/version.rb b/lib/sidekiq/version.rb index 6db8f278..4ecd3423 100644 --- a/lib/sidekiq/version.rb +++ b/lib/sidekiq/version.rb @@ -1,3 +1,3 @@ module Sidekiq - VERSION = "3.0.2" + VERSION = "3.1.0" end From 88b2c6ffa673ee2c65755fa8a060f41c6821990b Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Thu, 15 May 2014 21:12:44 -0700 Subject: [PATCH 3/3] Fully functioning remote control via Web UI, yay! --- Changes.md | 3 ++- lib/sidekiq/api.rb | 14 +++++++++++--- lib/sidekiq/web.rb | 14 ++++++++++++++ test/test_api.rb | 4 ++++ test/test_web.rb | 47 +++++++++++++++++++++++++++++++--------------- web/views/busy.erb | 24 +++++++++++++++++++++-- 6 files changed, 85 insertions(+), 21 deletions(-) diff --git a/Changes.md b/Changes.md index c443c9de..3458bb1f 100644 --- a/Changes.md +++ b/Changes.md @@ -1,7 +1,7 @@ 3.1.0 ----------- -- New remote signals feature: you can remotely trigger Sidekiq to quiet +- New remote control feature: you can remotely trigger Sidekiq to quiet or terminate remotely via API, without signals. This is most useful on Heroku which does not support the USR1 'quiet' signal. Now you can run a rake task like this at the start of your deploy to quiet your @@ -13,6 +13,7 @@ namespace :sidekiq do end end ``` +- The Web UI can now quiet or stop all Sidekiq processes on the Busy tab. 3.0.2 diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 6cad59ba..b61758ee 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -516,17 +516,25 @@ module Sidekiq end def quiet! + signal('USR1') + end + + def stop! + signal('TERM') + end + + private + + def signal(sig) key = "#{identity}-signals" Sidekiq.redis do |c| c.multi do - c.lpush(key, 'USR1') + c.lpush(key, sig) c.expire(key, 60) end end end - private - def identity @id ||= "#{self['hostname']}:#{self['pid']}" end diff --git a/lib/sidekiq/web.rb b/lib/sidekiq/web.rb index 28d0a299..a13a0ff5 100644 --- a/lib/sidekiq/web.rb +++ b/lib/sidekiq/web.rb @@ -42,6 +42,20 @@ module Sidekiq erb :busy end + post "/busy" do + if params['hostname'] + pro = Sidekiq::Process.new('hostname' => params["hostname"], 'pid' => params['pid']) + pro.quiet! if params[:quiet] + pro.stop! if params[:stop] + else + Sidekiq::ProcessSet.new.each do |pro| + pro.quiet! if params[:quiet] + pro.stop! if params[:stop] + end + end + redirect "#{root_path}busy" + end + get "/queues" do @queues = Sidekiq::Queue.all erb :queues diff --git a/test/test_api.rb b/test/test_api.rb index fe628e20..d799d666 100644 --- a/test/test_api.rb +++ b/test/test_api.rb @@ -359,6 +359,10 @@ class TestApi < Sidekiq::Test assert_equal 10, data['busy'] assert_equal time, data['beat'] assert_equal 123, data['pid'] + data.quiet! + data.stop! + assert_equal "TERM", Sidekiq.redis{|c| c.lpop("#{hostname}:123-signals") } + assert_equal "USR1", Sidekiq.redis{|c| c.lpop("#{hostname}:123-signals") } end it 'can enumerate workers' do diff --git a/test/test_web.rb b/test/test_web.rb index 41ca2486..aaa76ddf 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -29,22 +29,39 @@ class TestWeb < Sidekiq::Test end end - it 'can display workers' do - Sidekiq.redis do |conn| - conn.incr('busy') - conn.sadd('processes', 'foo:1234') - conn.hmset('foo:1234', 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f), 'at', Time.now.to_f, 'busy', 4) - identity = 'foo:1234:workers' - hash = {:queue => 'critical', :payload => { 'class' => WebWorker.name, 'args' => [1,'abc'] }, :run_at => Time.now.to_i } - conn.hmset(identity, 1001, Sidekiq.dump_json(hash)) - end - assert_equal ['1001'], Sidekiq::Workers.new.map { |pid, tid, data| tid } + describe 'busy' do - get '/busy' - assert_equal 200, last_response.status - assert_match(/status-active/, last_response.body) - assert_match(/critical/, last_response.body) - assert_match(/WebWorker/, last_response.body) + it 'can display workers' do + Sidekiq.redis do |conn| + conn.incr('busy') + conn.sadd('processes', 'foo:1234') + conn.hmset('foo:1234', 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f), 'at', Time.now.to_f, 'busy', 4) + identity = 'foo:1234:workers' + hash = {:queue => 'critical', :payload => { 'class' => WebWorker.name, 'args' => [1,'abc'] }, :run_at => Time.now.to_i } + conn.hmset(identity, 1001, Sidekiq.dump_json(hash)) + end + assert_equal ['1001'], Sidekiq::Workers.new.map { |pid, tid, data| tid } + + get '/busy' + assert_equal 200, last_response.status + assert_match(/status-active/, last_response.body) + assert_match(/critical/, last_response.body) + assert_match(/WebWorker/, last_response.body) + end + + it 'can quiet a process' do + assert_nil Sidekiq.redis { |c| c.lpop "host:pid-signals" } + post '/busy', 'quiet' => '1', 'hostname' => 'host', 'pid' => 'pid' + assert_equal 302, last_response.status + assert_equal 'USR1', Sidekiq.redis { |c| c.lpop "host:pid-signals" } + end + + it 'can stop a process' do + assert_nil Sidekiq.redis { |c| c.lpop "host:pid-signals" } + post '/busy', 'stop' => '1', 'hostname' => 'host', 'pid' => 'pid' + assert_equal 302, last_response.status + assert_equal 'TERM', Sidekiq.redis { |c| c.lpop "host:pid-signals" } + end end it 'can display queues' do diff --git a/web/views/busy.erb b/web/views/busy.erb index d3bf3348..87b0c32e 100644 --- a/web/views/busy.erb +++ b/web/views/busy.erb @@ -1,7 +1,16 @@
-
+

<%= t('Processes') %>

+
+
+
+ + +
+
+
+
@@ -10,13 +19,24 @@ + <% Sidekiq::ProcessSet.new.each_with_index do |process, index| %> - + + <% end %>
<%= t('Started') %> <%= t('Threads') %> <%= t('Busy') %> 
<%= process['hostname'] %>:<%= process['pid'] %><%= "#{process['hostname']}:#{process['pid']}" %> <%= relative_time(Time.at(process['started_at'])) %> <%= process['concurrency'] %> <%= process['busy'] %> +
+
+ + + + +
+
+