mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
First pass at Resque-compatible processing stats
This commit is contained in:
parent
a8226227c4
commit
23651d5196
11 changed files with 140 additions and 19 deletions
|
@ -89,7 +89,7 @@ module Sidekiq
|
|||
if msg
|
||||
processor = @ready.pop
|
||||
@busy << processor
|
||||
processor.process!(MultiJson.decode(msg))
|
||||
processor.process!(MultiJson.decode(msg), current_queue)
|
||||
end
|
||||
!!msg
|
||||
end
|
||||
|
|
|
@ -2,7 +2,7 @@ module Sidekiq
|
|||
module Middleware
|
||||
module Server
|
||||
class ActiveRecord
|
||||
def call(worker, msg)
|
||||
def call(*args)
|
||||
yield
|
||||
ensure
|
||||
::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
|
||||
|
@ -10,4 +10,4 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -2,10 +2,10 @@ module Sidekiq
|
|||
module Middleware
|
||||
module Server
|
||||
class Airbrake
|
||||
def call(worker, msg)
|
||||
def call(*args)
|
||||
yield
|
||||
rescue => ex
|
||||
send_to_airbrake(msg, ex) if defined?(::Airbrake)
|
||||
send_to_airbrake(args[1], ex) if defined?(::Airbrake)
|
||||
raise
|
||||
end
|
||||
|
||||
|
|
|
@ -6,12 +6,12 @@ module Sidekiq
|
|||
@redis = redis
|
||||
end
|
||||
|
||||
def call(worker, msg)
|
||||
def call(*args)
|
||||
yield
|
||||
ensure
|
||||
@redis.del(Digest::MD5.hexdigest(MultiJson.encode(msg)))
|
||||
@redis.del(Digest::MD5.hexdigest(MultiJson.encode(args[1])))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -17,8 +17,8 @@ module Sidekiq
|
|||
|
||||
# default middleware
|
||||
chain.register do
|
||||
use Middleware::Server::UniqueJobs, Sidekiq::Client.redis
|
||||
use Middleware::Server::Airbrake
|
||||
use Middleware::Server::UniqueJobs, Sidekiq::Client.redis
|
||||
use Middleware::Server::ActiveRecord
|
||||
end
|
||||
chain
|
||||
|
@ -27,20 +27,64 @@ module Sidekiq
|
|||
|
||||
def initialize(boss)
|
||||
@boss = boss
|
||||
redis.sadd(:workers, self)
|
||||
end
|
||||
|
||||
def process(msg)
|
||||
def process(msg, queue)
|
||||
klass = constantize(msg['class'])
|
||||
worker = klass.new
|
||||
self.class.middleware.invoke(worker, msg) do
|
||||
worker.perform(*msg['args'])
|
||||
stats(worker, msg, queue) do
|
||||
self.class.middleware.invoke(worker, msg, queue) do
|
||||
worker.perform(*msg['args'])
|
||||
end
|
||||
end
|
||||
@boss.processor_done!(current_actor)
|
||||
end
|
||||
|
||||
# See http://github.com/tarcieri/celluloid/issues/22
|
||||
def inspect
|
||||
"Sidekiq::Processor<#{object_id}>"
|
||||
"#<Processor #{to_s}>"
|
||||
end
|
||||
|
||||
def to_s
|
||||
@str ||= "#{hostname}:#{Process.pid}:#{Thread.current.object_id}"
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def stats(worker, msg, queue)
|
||||
redis.multi do
|
||||
redis.sadd("worker", self)
|
||||
redis.set("worker:#{self}:started", Time.now.to_s)
|
||||
redis.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg,
|
||||
:run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")))
|
||||
end
|
||||
|
||||
dying = false
|
||||
begin
|
||||
yield
|
||||
rescue
|
||||
dying = true
|
||||
# Uh oh, error. We will die so unregister as much as we can first.
|
||||
redis.multi do
|
||||
redis.incrby("stat:failed", 1)
|
||||
redis.del("stat:processed:#{self}")
|
||||
redis.srem("worker", self)
|
||||
end
|
||||
raise
|
||||
ensure
|
||||
redis.multi do
|
||||
redis.del("worker:#{self}")
|
||||
redis.del("worker:#{self}:started")
|
||||
redis.incrby("stat:processed", 1)
|
||||
redis.incrby("stat:processed:#{self}", 1) unless dying
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
||||
def hostname
|
||||
@h ||= `hostname`.strip
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -6,7 +6,7 @@ module Sidekiq
|
|||
def self.create(options={})
|
||||
url = options[:url] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0'
|
||||
client = build_client(url, options[:namespace])
|
||||
return ConnectionPool.new { client } if options[:use_pool]
|
||||
return ConnectionPool.new(:timeout => 1, :size => 25) { client } if options[:use_pool]
|
||||
client
|
||||
end
|
||||
|
||||
|
|
|
@ -31,5 +31,9 @@ module Sidekiq
|
|||
def verbose(msg)
|
||||
STDOUT.puts(msg) if $DEBUG
|
||||
end
|
||||
|
||||
def redis
|
||||
Sidekiq::Client.redis
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -32,7 +32,10 @@ class TestClient < MiniTest::Unit::TestCase
|
|||
def @redis.multi; yield; end
|
||||
def @redis.set(*); true; end
|
||||
def @redis.sadd(*); true; end
|
||||
def @redis.srem(*); true; end
|
||||
def @redis.get(*); nil; end
|
||||
def @redis.del(*); nil; end
|
||||
def @redis.incrby(*); nil; end
|
||||
def @redis.expire(*); true; end
|
||||
Sidekiq::Client.redis = @redis
|
||||
end
|
||||
|
|
|
@ -12,7 +12,7 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
@recorder = recorder
|
||||
end
|
||||
|
||||
def call(worker, msg)
|
||||
def call(*args)
|
||||
@recorder << [@name, 'before']
|
||||
yield
|
||||
@recorder << [@name, 'after']
|
||||
|
@ -29,6 +29,7 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
class CustomWorker
|
||||
include Sidekiq::Worker
|
||||
def perform(recorder)
|
||||
recorder << ['work_performed']
|
||||
end
|
||||
|
@ -41,7 +42,7 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
|
||||
it 'executes middleware in the proper order' do
|
||||
Sidekiq::Middleware::Server::UniqueJobs.class_eval do
|
||||
def call(worker, msg); yield; end
|
||||
def call(*args); yield; end
|
||||
end
|
||||
|
||||
recorder = []
|
||||
|
@ -54,7 +55,7 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
boss = MiniTest::Mock.new
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
processor.process(msg)
|
||||
processor.process(msg, 'default')
|
||||
assert_equal %w(0 before work_performed 0 after), recorder.flatten
|
||||
end
|
||||
|
||||
|
|
|
@ -11,6 +11,7 @@ class TestProcessor < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
class MockWorker
|
||||
include Sidekiq::Worker
|
||||
def perform(args)
|
||||
raise "kerboom!" if args == 'boom'
|
||||
$invokes += 1
|
||||
|
@ -21,7 +22,7 @@ class TestProcessor < MiniTest::Unit::TestCase
|
|||
msg = { 'class' => MockWorker.to_s, 'args' => ['myarg'] }
|
||||
processor = ::Sidekiq::Processor.new(@boss)
|
||||
@boss.expect(:processor_done!, nil, [processor])
|
||||
processor.process(msg)
|
||||
processor.process(msg, 'default')
|
||||
@boss.verify
|
||||
assert_equal 1, $invokes
|
||||
assert_equal 0, $errors.size
|
||||
|
@ -31,7 +32,7 @@ class TestProcessor < MiniTest::Unit::TestCase
|
|||
msg = { 'class' => MockWorker.to_s, 'args' => ['boom'] }
|
||||
processor = ::Sidekiq::Processor.new(@boss)
|
||||
assert_raises RuntimeError do
|
||||
processor.process(msg)
|
||||
processor.process(msg, 'default')
|
||||
end
|
||||
@boss.verify
|
||||
assert_equal 0, $invokes
|
||||
|
|
68
test/test_stats.rb
Normal file
68
test/test_stats.rb
Normal file
|
@ -0,0 +1,68 @@
|
|||
require 'helper'
|
||||
require 'sidekiq'
|
||||
require 'sidekiq/processor'
|
||||
|
||||
class TestStats < MiniTest::Unit::TestCase
|
||||
describe 'with redis' do
|
||||
before do
|
||||
Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
|
||||
@redis.flushdb
|
||||
end
|
||||
|
||||
class DumbWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
def perform(redis)
|
||||
raise 'bang' if redis == nil
|
||||
end
|
||||
end
|
||||
|
||||
it 'updates global stats in the success case' do
|
||||
msg = { 'class' => DumbWorker.to_s, 'args' => [@redis] }
|
||||
boss = MiniTest::Mock.new
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
|
||||
assert_equal [], @redis.smembers('worker')
|
||||
assert_equal 0, @redis.get('stat:failed').to_i
|
||||
assert_equal 0, @redis.get('stat:processed').to_i
|
||||
assert_equal 0, @redis.get("stat:processed:#{processor}").to_i
|
||||
|
||||
processor.process(msg, 'xyzzy')
|
||||
processor.process(msg, 'xyzzy')
|
||||
processor.process(msg, 'xyzzy')
|
||||
|
||||
set = @redis.smembers('worker')
|
||||
assert_equal 1, set.size
|
||||
assert_match(/#{Regexp.escape(`hostname`.strip)}/, set.first)
|
||||
assert_equal 0, @redis.get('stat:failed').to_i
|
||||
assert_equal 3, @redis.get('stat:processed').to_i
|
||||
assert_equal 3, @redis.get("stat:processed:#{processor}").to_i
|
||||
end
|
||||
|
||||
it 'updates global stats in the error case' do
|
||||
msg = { 'class' => DumbWorker.to_s, 'args' => [nil] }
|
||||
boss = MiniTest::Mock.new
|
||||
|
||||
assert_equal [], @redis.smembers('worker')
|
||||
assert_equal 0, @redis.get('stat:failed').to_i
|
||||
assert_equal 0, @redis.get('stat:processed').to_i
|
||||
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
pstr = processor.to_s
|
||||
assert_raises RuntimeError do
|
||||
processor.process(msg, 'xyzzy')
|
||||
end
|
||||
|
||||
set = @redis.smembers('worker')
|
||||
assert_equal 0, set.size
|
||||
assert_equal 1, @redis.get('stat:failed').to_i
|
||||
assert_equal 1, @redis.get('stat:processed').to_i
|
||||
assert_equal nil, @redis.get("stat:processed:#{pstr}")
|
||||
end
|
||||
|
||||
it 'should set various stats during processing' do
|
||||
skip 'TODO'
|
||||
end
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue