1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Rework process/worker data model.

We no longer have a global 'workers' set but rather a global 'processes' set.  Each process has its own workers hash, keyed by thread id.

Rely as much as possible on Redis key expiration to naturally prune any lingering data.  Process data only has a one minute TTL, with the heartbeat refreshing the TTL, so it will expire quickly after a process dies.
This commit is contained in:
Mike Perham 2014-03-07 22:41:10 -08:00
parent 6f5d1d691b
commit ba8c8a57b9
10 changed files with 120 additions and 127 deletions

View file

@ -426,25 +426,37 @@ module Sidekiq
# Enumerates the set of Sidekiq processes which are actively working
# right now. Each process sends a heartbeat to Redis every 5 seconds
# so this set should be relatively accurate, barring network partitions.
#
# Yields a hash of data which looks something like this:
#
# {
# 'hostname' => 'app-1.example.com',
# 'started_at' => <process start time>,
# 'pid' => 12345,
# 'tag' => 'myapp',
# 'concurrency' => 25,
# 'queues' => ['default', 'low'],
# 'busy' => 10,
# 'at' => <last heartbeat>,
# }
class ProcessSet
include Enumerable
def each(&block)
procs = Sidekiq.redis { |conn| conn.hgetall('processes') }
cutoff = Time.now.to_f - 5
procs = Sidekiq.redis { |conn| conn.smembers('processes') }
to_prune = []
procs.map {|name, data| Sidekiq.load_json(data) }.
sort_by {|x| x['key'] }.
each do |pdata|
if pdata['at'] < cutoff
to_prune << pdata['key']; next
else
yield pdata
Sidekiq.redis do |conn|
procs.sort.each do |key|
info, busy, at_s = conn.hmget(key, 'info', 'busy', 'at')
(to_prune << key; next) if info.nil?
hash = Sidekiq.load_json(info)
yield hash.merge('busy' => busy.to_i, 'at' => at_s.to_f)
end
end
Sidekiq.redis {|conn| conn.hdel('processes', *to_prune) } unless to_prune.empty?
Sidekiq.redis {|conn| conn.srem('processes', *to_prune) } unless to_prune.empty?
nil
end
end
@ -470,25 +482,23 @@ module Sidekiq
include Enumerable
def each(&block)
live = ProcessSet.new.map {|x| /\A#{x['hostname']}:#{x['process_id']}-/ }
msgs = Sidekiq.redis do |conn|
workers = conn.smembers("workers")
to_rem = workers.reject {|w| live.any? {|identity| w =~ identity } }
conn.srem('workers', to_rem) unless to_rem.empty?
workers.empty? ? {} : conn.mapped_mget(*workers.map {|w| "worker:#{w}" })
end
msgs.each do |w, msg|
next unless msg
block.call(w, Sidekiq.load_json(msg))
Sidekiq.redis do |conn|
procs = conn.smembers('processes')
procs.sort.each do |key|
valid, workers = conn.multi do
conn.exists(key)
conn.hgetall("#{key}:workers")
end
next unless valid
workers.each_pair do |tid, json|
yield tid, Sidekiq.load_json(json)
end
end
end
end
def size
Sidekiq.redis do |conn|
conn.scard("workers")
end.to_i
Sidekiq.redis { |conn| conn.get('busy') }.to_i
end
end

View file

@ -51,20 +51,35 @@ module Sidekiq
# Requeue everything in case there was a worker who grabbed work while stopped
Sidekiq::Fetcher.strategy.bulk_requeue([], @options)
stop_heartbeat
end
end
def start_heartbeat(tag)
manager.heartbeat({
'key' => "#{hostname}:#{$$}",
key = identity
data = {
'hostname' => hostname,
'started_at' => Time.now.to_f,
'pid' => $$,
'process_id' => process_id,
'tag' => tag.strip,
'concurrency' => @options[:concurrency],
'queues' => @options[:queues].uniq,
})
}
Sidekiq.redis do |conn|
conn.multi do
conn.sadd('processes', key)
conn.hset(key, 'info', Sidekiq.dump_json(data))
conn.expire(key, 60)
end
end
manager.heartbeat(key, data)
end
def stop_heartbeat
Sidekiq.redis do |conn|
conn.srem('processes', identity)
end
end
end
end

View file

@ -50,7 +50,6 @@ module Sidekiq
@ready.each { |x| x.terminate if x.alive? }
@ready.clear
clear_worker_set
return if clean_up_for_graceful_shutdown
hard_shutdown_in timeout if should_shutdown
@ -133,43 +132,29 @@ module Sidekiq
@threads[proxy_id] = thr
end
def heartbeat(data)
def heartbeat(key, data)
return if stopped?
$0 = "sidekiq #{Sidekiq::VERSION} #{data['tag']}[#{@busy.size} of #{data['concurrency']} busy]#{stopped? ? ' stopping' : ''}"
(data)
(key, data)
after(5) do
heartbeat(data)
heartbeat(key, data)
end
end
private
def (data)
def (key, data)
watchdog('heartbeat') do
now = Time.now.to_f
proc_data = Sidekiq.dump_json(data.merge('busy' => @busy.size, 'at' => now))
Sidekiq.redis do |conn|
conn.hset("processes", data['key'], proc_data)
conn.multi do
conn.hmset(key, 'busy', @busy.size, 'at', Time.now.to_f)
conn.expire(key, 60)
end
end
end
end
def clear_worker_set
# Clearing workers in Redis
# NOTE: we do this before terminating worker threads because the
# process will likely receive a hard shutdown soon anyway, which
# means the threads will be killed.
logger.debug { "Clearing workers in redis" }
Sidekiq.redis do |conn|
workers = conn.smembers('workers')
workers_to_remove = workers.select do |worker_name|
worker_name =~ /:#{process_id}-/
end
conn.srem('workers', workers_to_remove) if !workers_to_remove.empty?
end
rescue => ex
Sidekiq.logger.warn("Unable to clear worker set while shutting down: #{ex.message}")
end
def hard_shutdown_in(delay)
logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }

View file

@ -78,10 +78,11 @@ module Sidekiq
# stats so calling code can react appropriately
retry_and_suppress_exceptions do
hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i })
redis do |conn|
Sidekiq.redis do |conn|
conn.multi do
conn.sadd('workers', thread_identity)
conn.setex("worker:#{thread_identity}", EXPIRY, hash)
conn.incr('busy')
conn.hmset("#{identity}:workers", Thread.current.object_id, hash)
conn.expire("#{identity}:workers", 60*60)
end
end
end
@ -90,7 +91,7 @@ module Sidekiq
yield
rescue Exception
retry_and_suppress_exceptions do
redis do |conn|
Sidekiq.redis do |conn|
failed = "stat:failed:#{Time.now.utc.to_date}"
result = conn.multi do
conn.incrby("stat:failed", 1)
@ -102,11 +103,11 @@ module Sidekiq
raise
ensure
retry_and_suppress_exceptions do
redis do |conn|
Sidekiq.redis do |conn|
processed = "stat:processed:#{Time.now.utc.to_date}"
result = conn.multi do
conn.srem("workers", thread_identity)
conn.del("worker:#{thread_identity}")
conn.decr('busy')
conn.hdel("#{identity}:workers", Thread.current.object_id)
conn.incrby("stat:processed", 1)
conn.incrby(processed, 1)
end

View file

@ -26,16 +26,12 @@ module Sidekiq
Sidekiq.redis(&block)
end
def process_id
@@process_id ||= SecureRandom.hex
end
def hostname
Socket.gethostname
end
def identity
@@identity ||= "#{hostname}:#{process_id}"
@@identity ||= "#{hostname}:#{$$}"
end
end
end

View file

@ -34,17 +34,8 @@ module Sidekiq
string % options
end
def reset_worker_list
Sidekiq.redis do |conn|
workers = conn.smembers('workers')
conn.srem('workers', workers) if !workers.empty?
end
end
def workers_size
@workers_size ||= Sidekiq.redis do |conn|
conn.scard('workers')
end
@workers_size ||= Sidekiq.redis { |conn| conn.get('busy') }.to_i
end
def workers

View file

@ -343,17 +343,20 @@ class TestApi < Sidekiq::Test
end
it 'can enumerate processes' do
odata = { 'pid' => 123, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:123", 'at' => Time.now.to_f - 5 }
pdata = { 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:#{$$}", 'at' => Time.now.to_f }
odata = { 'pid' => 123, 'hostname' => hostname, 'key' => "#{hostname}:123", 'started_at' => Time.now.to_f - 15 }
time = Time.now.to_f
Sidekiq.redis do |conn|
conn.hset('processes', odata['key'], Sidekiq.dump_json(odata))
conn.hset('processes', pdata['key'], Sidekiq.dump_json(pdata))
conn.multi do
conn.sadd('processes', odata['key'])
conn.hmset(odata['key'], 'info', Sidekiq.dump_json(odata), 'busy', 10, 'at', time)
conn.sadd('processes', 'fake:pid')
end
end
ps = Sidekiq::ProcessSet.new.to_a
assert_equal 1, ps.size
data = ps.first
assert_equal pdata, data
assert_equal odata.merge('busy' => 10, 'at' => time), data
end
it 'can enumerate workers' do
@ -363,41 +366,42 @@ class TestApi < Sidekiq::Test
assert false
end
pdata = { 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id, 'key' => "#{hostname}:#{$$}", 'at' => Time.now.to_f, 'started_at' => Time.now.to_i }
pdata = { 'pid' => $$, 'hostname' => hostname, 'key' => "#{hostname}:#{$$}", 'at' => Time.now.to_f, 'started_at' => Time.now.to_i }
Sidekiq.redis do |conn|
conn.hset('processes', pdata['key'], Sidekiq.dump_json(pdata))
conn.sadd('processes', pdata['key'])
conn.hmset(pdata['key'], 'info', Sidekiq.dump_json(pdata))
end
s = "#{hostname}:#{process_id}-12345"
s = "#{hostname}:#{$$}:workers"
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => Time.now.to_i })
Sidekiq.redis do |c|
c.multi do
c.sadd('workers', s)
c.set("worker:#{s}", data)
end
c.hmset(s, '1234', data)
end
Sidekiq.redis do |c|
assert_equal 0, w.size
c.incr("busy")
assert_equal 1, w.size
c.decr("busy")
assert_equal 0, w.size
end
assert_equal 1, w.size
w.each do |x, y|
assert_equal "worker:#{s}", x
assert_equal "1234", x
assert_equal 'default', y['queue']
assert_equal Time.now.year, Time.at(y['run_at']).year
end
assert_equal 1, w.size
s = "#{hostname}:#{process_id}-12346"
s = "#{hostname}:#{$$}:workers"
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) })
Sidekiq.redis do |c|
c.multi do
c.sadd('workers', s)
c.set("worker:#{s}", data)
c.sadd('workers', "#{hostname}:#{process_id}2-12347")
c.hmset(s, '5678', data)
c.hmset("b#{s}", '5678', data)
end
end
assert_equal 3, w.size
w.each { }
assert_equal 2, w.size
assert_equal ['1234', '5678'], w.map { |tid, data| tid }
end
it 'can reschedule jobs' do

View file

@ -1,18 +0,0 @@
require 'helper'
require 'sidekiq/util'
class TestUtil < Sidekiq::Test
describe 'util' do
it 'generates the same process id when included in two or more classes' do
class One
include Sidekiq::Util
end
class Two
include Sidekiq::Util
end
assert_equal One.new.process_id, Two.new.process_id
end
end
end

View file

@ -4,6 +4,7 @@ require 'sidekiq/web'
require 'rack/test'
class TestWeb < Sidekiq::Test
describe 'sidekiq web' do
include Rack::Test::Methods
@ -30,12 +31,14 @@ class TestWeb < Sidekiq::Test
it 'can display workers' do
Sidekiq.redis do |conn|
conn.hset('processes', 'foo:1234', Sidekiq.dump_json('key' => 'foo:1234', 'hostname' => 'foo', 'process_id' => '1234', 'started_at' => Time.now.to_f, 'at' => Time.now.to_f))
identity = 'foo:1234-123abc:default'
conn.sadd('workers', identity)
conn.incr('busy')
conn.sadd('processes', 'foo:1234')
conn.hmset('foo:1234', 'info', Sidekiq.dump_json('hostname' => 'foo', 'started_at' => Time.now.to_f), 'at', Time.now.to_f, 'busy', 4)
identity = 'foo:1234:workers'
hash = {:queue => 'critical', :payload => { 'class' => WebWorker.name, 'args' => [1,'abc'] }, :run_at => Time.now.to_i }
conn.setex("worker:#{identity}", 10, Sidekiq.dump_json(hash))
conn.hmset(identity, 1001, Sidekiq.dump_json(hash))
end
assert_equal ['1001'], Sidekiq::Workers.new.map { |tid, data| tid }
get '/busy'
assert_equal 200, last_response.status
@ -261,10 +264,11 @@ class TestWeb < Sidekiq::Test
# on /workers page
Sidekiq.redis do |conn|
identity = 'foo:1234-123abc:default'
conn.sadd('workers', identity)
conn.sadd('processes', 'foo:1234')
identity = 'foo:1234:workers'
hash = {:queue => 'critical', :payload => { 'class' => "FailWorker", 'args' => ["<a>hello</a>"] }, :run_at => Time.now.to_i }
conn.setex("worker:#{identity}", 10, Sidekiq.dump_json(hash))
conn.hmset(identity, 100001, Sidekiq.dump_json(hash))
conn.incr('busy')
end
get '/busy'
@ -319,6 +323,8 @@ class TestWeb < Sidekiq::Test
end
describe 'stats' do
include Sidekiq::Util
before do
Sidekiq.redis do |conn|
conn.set("stat:processed", 5)
@ -468,11 +474,14 @@ class TestWeb < Sidekiq::Test
end
def add_worker
process_id = rand(1000)
key = "#{hostname}:#{$$}"
msg = "{\"queue\":\"default\",\"payload\":{\"retry\":true,\"queue\":\"default\",\"timeout\":20,\"backtrace\":5,\"class\":\"HardWorker\",\"args\":[\"bob\",10,5],\"jid\":\"2b5ad2b016f5e063a1c62872\"},\"run_at\":1361208995}"
Sidekiq.redis do |conn|
conn.sadd("workers", "mercury.home:#{process_id}-70215157189060:default")
conn.set("worker:mercury.home:#{process_id}-70215157189060:default", msg)
conn.multi do
conn.incr("busy")
conn.sadd("processes", key)
conn.hmset("#{key}:workers", Time.now.to_f, msg)
end
end
end
end

View file

@ -8,7 +8,7 @@
<span class="desc"><%= t('Failed') %></span>
</li>
<li class="busy col-sm-1">
<a href="<%= root_path %>workers">
<a href="<%= root_path %>busy">
<span class="count"><%= number_with_delimiter(workers_size) %></span>
<span class="desc"><%= t('Busy') %></span>
</a>