Mirror of https://github.com/mperham/sidekiq.git

Initial pass at heartbeat, still failing tests

Mike Perham 2014-03-02 16:36:00 -08:00
parent 8dae3840a3
commit bbe245e4ff
8 changed files with 93 additions and 70 deletions

View file

@@ -277,6 +277,15 @@ module Sidekiq
Sidekiq.redis {|c| c.zcard(name) }
end
def clear
Sidekiq.redis do |conn|
conn.del(name)
end
end
end
class JobSet < SortedSet
def schedule(timestamp, message)
Sidekiq.redis do |conn|
conn.zadd(name, timestamp.to_f.to_s, Sidekiq.dump_json(message))
@@ -354,11 +363,6 @@ module Sidekiq
end
end
def clear
Sidekiq.redis do |conn|
conn.del(name)
end
end
end
##
@@ -373,7 +377,7 @@ module Sidekiq
# retri.args[0] == 'User' &&
# retri.args[1] == 'setup_new_subscriber'
# end.map(&:delete)
class ScheduledSet < SortedSet
class ScheduledSet < JobSet
def initialize
super 'schedule'
end
@@ -391,7 +395,7 @@ module Sidekiq
# retri.args[0] == 'User' &&
# retri.args[1] == 'setup_new_subscriber'
# end.map(&:delete)
class RetrySet < SortedSet
class RetrySet < JobSet
def initialize
super 'retry'
end
@@ -403,7 +407,7 @@ module Sidekiq
end
end
class DeadSet < SortedSet
class DeadSet < JobSet
def initialize
super 'dead'
end
@@ -415,6 +419,24 @@ module Sidekiq
end
end
class ProcessSet < SortedSet
def initialize
super 'processes'
end
def each(&block)
now = Time.now.to_f
_, procs = Sidekiq.redis do |conn|
conn.multi do
conn.zremrangebyscore('processes', '-inf', now - 5.1)
conn.zrange name, 0, -1
end
end
procs.each do |proc_data|
yield Sidekiq.load_json(proc_data)
end
end
end
##
# Programmatic access to the current active worker set.
@@ -433,18 +455,24 @@ module Sidekiq
# # { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
# # run_at is an epoch Integer.
# end
class Workers
include Enumerable
def each(&block)
Sidekiq.redis do |conn|
live = ProcessSet.new.map {|x| "#{x['hostname']}:#{x['process_id']}-" }
p live
msgs = Sidekiq.redis do |conn|
workers = conn.smembers("workers")
workers.each do |w|
msg = conn.get("worker:#{w}")
next unless msg
block.call(w, Sidekiq.load_json(msg))
end
p workers
to_rem = workers.delete_if {|w| !live.any? {|identity| w =~ /\A#{identity}/ } }
conn.srem('workers', *to_rem) unless to_rem.empty?
workers.empty? ? {} : conn.mapped_mget(*workers.map {|w| "worker:#{w}" })
end
msgs.each do |w, msg|
next unless msg
block.call(w, Sidekiq.load_json(msg))
end
end
@@ -453,36 +481,6 @@ module Sidekiq
conn.scard("workers")
end.to_i
end
# Prune old worker entries from the Busy set. Worker entries
# can be orphaned if Sidekiq hard crashes while processing jobs.
# Default is to delete worker entries older than one hour.
#
# Returns the number of records removed.
def prune(older_than=60*60)
to_rem = []
Sidekiq.redis do |conn|
conn.smembers('workers').each do |w|
msg = conn.get("worker:#{w}")
if !msg
to_rem << w
else
m = Sidekiq.load_json(msg)
run_at = Time.at(m['run_at'])
# prune jobs older than one hour
if run_at < (Time.now - older_than)
to_rem << w
else
end
end
end
end
if to_rem.size > 0
Sidekiq.redis { |conn| conn.srem('workers', to_rem) }
end
to_rem.size
end
end
end
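
For orientation, the new ProcessSet reads the heartbeat data the manager now writes: it trims entries from the 'processes' sorted set whose score (the last heartbeat timestamp) is older than roughly five seconds, then yields whatever survives. A minimal standalone sketch of that read path, assuming the redis and json gems; the key name and the 5.1-second cutoff are taken from the hunk above, everything else is illustrative:

```ruby
require 'redis'
require 'json'

conn = Redis.new
now  = Time.now.to_f

# Drop processes whose last heartbeat score is stale, then read the rest.
conn.zremrangebyscore('processes', '-inf', now - 5.1)
live = conn.zrange('processes', 0, -1).map { |raw| JSON.parse(raw) }

live.each do |proc_data|
  puts "#{proc_data['hostname']}:#{proc_data['process_id']}"
end
```

The reworked Workers#each above leans on the same idea: a 'workers' member whose prefix does not match any live process identity is meant to be treated as orphaned and removed, which is what replaces the old prune method.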

View file

@@ -80,7 +80,7 @@ module Sidekiq
require 'sidekiq/launcher'
@launcher = Sidekiq::Launcher.new(options)
launcher.procline(options[:tag] ? "#{options[:tag]} " : '')
launcher.start_heartbeat(options[:tag] ? "#{options[:tag]} " : '')
begin
if options[:profile]

View file

@@ -54,9 +54,15 @@ module Sidekiq
end
end
def procline(tag)
$0 = manager.procline(tag)
manager.after(5) { procline(tag) }
def start_heartbeat(tag)
manager.heartbeat({
'hostname' => hostname,
'pid' => $$,
'process_id' => process_id,
'tag' => tag,
'concurrency' => @options[:concurrency],
'queues' => @options[:queues].uniq,
})
end
end
end
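
The hash the launcher hands to the manager here is what ultimately lands, plus a couple of fields added on each beat, as the JSON member in the 'processes' sorted set. Roughly what one stored entry decodes to, with every value below invented purely for illustration:

```ruby
# Hypothetical decoded entry from the 'processes' sorted set; the manager
# merges 'busy' and 'at' into the hash built by start_heartbeat before
# serializing it.
process_entry = {
  'hostname'    => 'web-1',
  'pid'         => 4242,
  'process_id'  => 4242,
  'tag'         => 'myapp ',
  'concurrency' => 25,
  'queues'      => ['default', 'critical'],
  'busy'        => 3,           # jobs in flight when the beat fired
  'at'          => 1393806960.0 # heartbeat timestamp, also used as the zset score
}
```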

View file

@@ -132,12 +132,27 @@ module Sidekiq
@threads[proxy_id] = thr
end
def procline(tag)
"sidekiq #{Sidekiq::VERSION} #{tag}[#{@busy.size} of #{@count} busy]#{stopped? ? ' stopping' : ''}"
def heartbeat(data)
$0 = "sidekiq #{Sidekiq::VERSION} #{data['tag']}[#{@busy.size} of #{data['concurrency']} busy]#{stopped? ? ' stopping' : ''}"
❤(data)
after(5) do
heartbeat(data)
end
end
private
def ❤(data)
now = Time.now.to_f
proc_data = Sidekiq.dump_json(data.merge('busy' => @busy.size, 'at' => now))
Sidekiq.redis do |conn|
conn.multi do
conn.zadd("processes", proc_data, now)
conn.zremrangebyscore('processes', '-inf', now - 5.1)
end
end
end
def clear_worker_set
# Clearing workers in Redis
# NOTE: we do this before terminating worker threads because the
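
The write side of the heartbeat introduced above boils down to: serialize the process data with the current busy count and a timestamp, add it to the 'processes' sorted set scored by that timestamp, trim stale entries, and repeat every five seconds. A rough sketch of that loop, using a plain sleep in place of the Celluloid after(5) timer and zadd's (key, score, member) argument order; the process data and busy count are stand-ins:

```ruby
require 'redis'
require 'json'
require 'socket'

conn = Redis.new
data = { 'hostname' => Socket.gethostname, 'pid' => Process.pid, 'concurrency' => 25 }

loop do
  now     = Time.now.to_f
  payload = JSON.dump(data.merge('busy' => 0, 'at' => now)) # busy count is a stand-in

  conn.zadd('processes', now, payload)                  # record this beat
  conn.zremrangebyscore('processes', '-inf', now - 5.1) # drop beats older than ~5s

  sleep 5
end
```

Because 'at' changes on every beat, each beat adds a new member; older members for the same process only disappear via the stale-score trim, which is consistent with how the hunk above behaves.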

View file

@@ -69,8 +69,8 @@ module Sidekiq
private
def identity
@str ||= "#{hostname}:#{process_id}-#{Thread.current.object_id}:default"
def thread_identity
@str ||= "#{identity}-#{Thread.current.object_id}"
end
def stats(worker, msg, queue)
@@ -80,8 +80,8 @@ module Sidekiq
hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i })
redis do |conn|
conn.multi do
conn.sadd('workers', identity)
conn.setex("worker:#{identity}", EXPIRY, hash)
conn.sadd('workers', thread_identity)
conn.setex("worker:#{thread_identity}", EXPIRY, hash)
end
end
end
@@ -105,8 +105,8 @@ module Sidekiq
redis do |conn|
processed = "stat:processed:#{Time.now.utc.to_date}"
result = conn.multi do
conn.srem("workers", identity)
conn.del("worker:#{identity}")
conn.srem("workers", thread_identity)
conn.del("worker:#{thread_identity}")
conn.incrby("stat:processed", 1)
conn.incrby(processed, 1)
end
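
The processor changes switch the per-job bookkeeping from one identity per process to one per thread, so each in-flight job gets its own member in the 'workers' set plus an expiring 'worker:<thread_identity>' payload. A simplified sketch of that pattern, assuming the redis gem; EXPIRY, Socket.gethostname, and Process.pid are illustrative stand-ins for the processor's expiry constant and Sidekiq's hostname/process_id helpers:

```ruby
require 'redis'
require 'json'
require 'socket'

EXPIRY = 60 # illustrative stand-in for the processor's expiry constant

conn            = Redis.new
identity        = "#{Socket.gethostname}:#{Process.pid}"
thread_identity = "#{identity}-#{Thread.current.object_id}"

# Before the job runs: register this thread as busy.
entry = JSON.dump('queue' => 'default', 'payload' => {}, 'run_at' => Time.now.to_i)
conn.sadd('workers', thread_identity)
conn.setex("worker:#{thread_identity}", EXPIRY, entry)

# ... job executes here ...

# After the job: clear the busy entry and bump the processed counters.
conn.srem('workers', thread_identity)
conn.del("worker:#{thread_identity}")
conn.incrby('stat:processed', 1)
```

Because the member now starts with "hostname:process_id-", the Workers enumeration in api.rb can match it against the live process identities coming from the heartbeat and discard entries left behind by crashed processes.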

View file

@@ -33,5 +33,9 @@ module Sidekiq
def hostname
Socket.gethostname
end
def identity
@@identity ||= "#{hostname}:#{process_id}"
end
end
end

View file

@@ -48,11 +48,7 @@ module Sidekiq
end
def workers
@workers ||= begin
Sidekiq::Workers.new.tap do |w|
w.prune
end
end
@workers ||= Sidekiq::Workers.new
end
def stats

View file

@@ -1,6 +1,7 @@
require 'helper'
class TestApi < Sidekiq::Test
describe "stats" do
before do
Sidekiq.redis {|c| c.flushdb }
@@ -160,6 +161,8 @@ class TestApi < Sidekiq::Test
end
describe 'with an empty database' do
include Sidekiq::Util
before do
Sidekiq.redis {|c| c.flushdb }
end
@@ -346,7 +349,10 @@ class TestApi < Sidekiq::Test
assert false
end
s = '12345'
Sidekiq.redis do |conn|
conn.zadd('processes', Time.now.to_f, Sidekiq.dump_json({ 'pid' => $$, 'hostname' => hostname, 'process_id' => process_id }))
end
s = "worker:#{hostname}:#{process_id}-12345"
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => Time.now.to_i })
Sidekiq.redis do |c|
c.multi do
@@ -357,26 +363,24 @@ class TestApi < Sidekiq::Test
assert_equal 1, w.size
w.each do |x, y|
assert_equal s, x
assert_equal "worker:#{s}", x
assert_equal 'default', y['queue']
assert_equal Time.now.year, Time.at(y['run_at']).year
end
s = '12346'
s = "#{hostname}:#{process_id}-12346"
data = Sidekiq.dump_json({ 'payload' => {}, 'queue' => 'default', 'run_at' => (Time.now.to_i - 2*60*60) })
Sidekiq.redis do |c|
c.multi do
c.sadd('workers', s)
c.set("worker:#{s}", data)
c.set("worker:#{s}:started", Time.now.to_s)
c.sadd('workers', '123457')
c.sadd('workers', "#{hostname}:#{process_id}2-12347")
end
end
assert_equal 3, w.size
count = w.prune
assert_equal 1, w.size
assert_equal 2, count
w.each { }
assert_equal 2, w.size
end
it 'can reschedule jobs' do