1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Remove sync stats update, move update into heartbeat

This commit is contained in:
Mike Perham 2015-10-07 14:27:47 -07:00
parent da62019b49
commit 752bfc6d8b
5 changed files with 59 additions and 48 deletions

View file

@ -18,6 +18,7 @@ end
platforms :mri do
gem 'pry-byebug'
gem 'ruby-prof'
end
platforms :jruby do

View file

@ -4,6 +4,8 @@
# RUBYOPT=-w bundle exec sidekiq
$TESTING = false
#require 'ruby-prof'
require_relative '../lib/sidekiq/cli'
require_relative '../lib/sidekiq/launcher'
@ -77,20 +79,16 @@ end
iter = 10
count = 10_000
Client = Thread.new do
watchdog("client thread") do
iter.times do
arr = Array.new(count) do
[]
end
count.times do |idx|
arr[idx][0] = idx
end
Sidekiq::Client.push_bulk('class' => LoadWorker, 'args' => arr)
end
Sidekiq.logger.error "Created #{count*iter} jobs"
iter.times do
arr = Array.new(count) do
[]
end
count.times do |idx|
arr[idx][0] = idx
end
Sidekiq::Client.push_bulk('class' => LoadWorker, 'args' => arr)
end
Sidekiq.logger.error "Created #{count*iter} jobs"
Monitoring = Thread.new do
watchdog("monitor thread") do
@ -103,7 +101,7 @@ Monitoring = Thread.new do
end
end.map(&:to_i)
total = qsize + retries
GC.start
#GC.start
Sidekiq.logger.error("RSS: #{Process.rss} Pending: #{total}")
if total == 0
Sidekiq.logger.error("Done")
@ -114,6 +112,8 @@ Monitoring = Thread.new do
end
begin
#RubyProf::exclude_threads = [ Monitoring ]
#RubyProf.start
launcher = Sidekiq::Launcher.new(Sidekiq.options)
launcher.run
@ -122,6 +122,10 @@ begin
handle_signal(launcher, signal)
end
rescue SystemExit => e
#Sidekiq.logger.error("Profiling...")
#result = RubyProf.stop
#printer = RubyProf::GraphHtmlPrinter.new(result)
#printer.print(File.new("output.html", "w"), :min_percent => 1)
# normal
rescue => e
raise e if $DEBUG

View file

@ -33,6 +33,7 @@ module Sidekiq
# Stops this instance from processing any more jobs,
#
def quiet
@done = true
@manager.quiet
@fetcher.terminate
@poller.terminate
@ -44,6 +45,7 @@ module Sidekiq
def stop
deadline = Time.now + @options[:timeout]
@done = true
@manager.quiet
@fetcher.terminate
@poller.terminate
@ -57,6 +59,10 @@ module Sidekiq
clear_heartbeat
end
def stopping?
@done
end
private unless $TESTING
JVM_RESERVED_SIGNALS = ['USR1', 'USR2'] # Don't Process#kill if we get these signals via the API
@ -65,8 +71,8 @@ module Sidekiq
proc { 'sidekiq'.freeze },
proc { Sidekiq::VERSION },
proc { |me, data| data['tag'] },
proc { |me, data| "[#{me.manager.in_progress.size} of #{data['concurrency']} busy]" },
proc { |me, data| "stopping" if me.manager.stopped? },
proc { |me, data| "[#{Processor::WORKER_STATE.size} of #{data['concurrency']} busy]" },
proc { |me, data| "stopping" if me.stopping? },
]
def heartbeat(key, data, json)
@ -79,10 +85,30 @@ module Sidekiq
def (key, json)
begin
fails = 0
Processor::FAILURE.update {|curr| fails = curr; 0 }
procd = 0
Processor::PROCESSED.update {|curr| procd = curr; 0 }
workers_key = "#{key}:workers".freeze
nowdate = Time.now.utc.strftime("%Y-%m-%d".freeze)
Sidekiq.redis do |conn|
conn.pipelined do
conn.incrby("stat:processed".freeze, procd)
conn.incrby("stat:processed:#{nowdate}", procd)
conn.incrby("stat:failed".freeze, fails)
conn.incrby("stat:failed:#{nowdate}", fails)
conn.del(workers_key)
Processor::WORKER_STATE.each_pair do |tid, hash|
conn.hset(workers_key, tid, Sidekiq.dump_json(hash))
end
end
end
_, _, _, msg = Sidekiq.redis do |conn|
conn.pipelined do
conn.sadd('processes', key)
conn.hmset(key, 'info', json, 'busy', manager.in_progress.size, 'beat', Time.now.to_f)
conn.hmset(key, 'info', json, 'busy', Processor::WORKER_STATE.size, 'beat', Time.now.to_f)
conn.expire(key, 60)
conn.rpop("#{key}-signals")
end
@ -117,7 +143,7 @@ module Sidekiq
# now so we don't need to dump it every heartbeat.
json = Sidekiq.dump_json(data)
while !@done
while true
heartbeat(key, data, json)
sleep 5
end

View file

@ -1,5 +1,6 @@
require 'sidekiq/util'
require 'thread'
require 'concurrent'
module Sidekiq
##
@ -106,46 +107,24 @@ module Sidekiq
@str ||= Thread.current.object_id.to_s(36)
end
WORKER_STATE = Concurrent::Map.new
PROCESSED = Concurrent::AtomicFixnum.new
FAILURE = Concurrent::AtomicFixnum.new
def stats(worker, msg, queue)
# Do not conflate errors from the job with errors caused by updating
# stats so calling code can react appropriately
retry_and_suppress_exceptions do
hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i })
Sidekiq.redis do |conn|
conn.pipelined do
conn.hmset("#{identity}:workers", thread_identity, hash)
conn.expire("#{identity}:workers", 14400)
end
end
end
tid = thread_identity
WORKER_STATE[tid] = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
nowdate = Time.now.utc.strftime("%Y-%m-%d".freeze)
begin
yield
rescue Exception
retry_and_suppress_exceptions do
failed = "stat:failed:#{nowdate}"
Sidekiq.redis do |conn|
conn.pipelined do
conn.incrby("stat:failed".freeze, 1)
conn.incrby(failed, 1)
conn.expire(failed, STATS_TIMEOUT)
end
end
end
FAILURE.increment
raise
ensure
retry_and_suppress_exceptions do
processed = "stat:processed:#{nowdate}"
Sidekiq.redis do |conn|
conn.pipelined do
conn.hdel("#{identity}:workers", thread_identity)
conn.incrby("stat:processed".freeze, 1)
conn.incrby(processed, 1)
conn.expire(processed, STATS_TIMEOUT)
end
end
end
WORKER_STATE.delete(tid)
PROCESSED.increment
end
end

View file

@ -19,6 +19,7 @@ Gem::Specification.new do |gem|
gem.add_dependency 'redis-namespace', '~> 1.5', '>= 1.5.2'
gem.add_dependency 'connection_pool', '~> 2.2', '>= 2.2.0'
gem.add_dependency 'json', '~> 1.0'
gem.add_dependency 'concurrent-ruby', '1.0.0.pre3'
gem.add_development_dependency 'sinatra', '~> 1.4', '>= 1.4.6'
gem.add_development_dependency 'minitest', '~> 5.7', '>= 5.7.0'
gem.add_development_dependency 'rake', '~> 10.0'