1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

refactor middleware into client/server layers

This commit is contained in:
Ryan LeCompte 2012-02-08 17:04:02 -08:00
parent a093e29a9a
commit c3609d6d48
7 changed files with 58 additions and 152 deletions

View file

@ -1,12 +1,13 @@
require 'multi_json'
require 'redis'
require 'sidekiq/middleware/chain'
require 'sidekiq/middleware/client/unique_jobs'
module Sidekiq
class Client
class << self
attr_accessor :ignore_duplicate_jobs
alias_method :ignore_duplicate_jobs?, :ignore_duplicate_jobs
def self.middleware
@middleware ||= Middleware::Chain.new
end
def self.redis
@ -22,6 +23,17 @@ module Sidekiq
@redis = redis
end
def self.ignore_duplicate_jobs=(value)
@ignore_duplicate_jobs = value
if @ignore_duplicate_jobs
middleware.register do
use Middleware::Client::UniqueJobs, Client.redis
end
else
middleware.unregister(Middleware::Client::UniqueJobs)
end
end
# Example usage:
# Sidekiq::Client.push('my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
def self.push(queue='default', item)
@ -29,15 +41,8 @@ module Sidekiq
raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']
item['class'] = item['class'].to_s if !item['class'].is_a?(String)
queue_key = "queue:#{queue}"
hashed_payloads_key = "queue:msg_hashes:#{queue}"
payload = MultiJson.encode(item)
payload_hash = Digest::MD5.hexdigest(payload)
return if ignore_duplicate_jobs? && already_queued?(hashed_payloads_key, payload_hash)
redis.multi do
redis.sadd(hashed_payloads_key, payload_hash)
redis.rpush(queue_key, payload)
middleware.invoke(item) do
redis.rpush("queue:#{queue}", MultiJson.encode(item))
end
end
@ -58,9 +63,5 @@ module Sidekiq
queue = (klass.respond_to?(:queue) && klass.queue) || 'default'
push(queue, { 'class' => klass.name, 'args' => args })
end
def self.already_queued?(queue_key, payload_hash)
redis.sismember(queue_key, payload_hash)
end
end
end

View file

@ -87,7 +87,7 @@ module Sidekiq
if msg
processor = @ready.pop
@busy << processor
processor.process!(MultiJson.decode(msg), current_queue)
processor.process!(MultiJson.decode(msg))
end
!!msg
end

View file

@ -1,100 +0,0 @@
require 'json'
module Sidekiq
# Middleware is code configured to run before/after
# a message is processed. It is patterned after Rack
# middleware. The default middleware chain:
#
# Sidekiq::Middleware::Chain.register do
# use Sidekiq::Airbrake
# use Sidekiq::ActiveRecord
# end
#
# This is an example of a minimal middleware:
#
# class MyHook
# def initialize(options=nil)
# end
# def call(worker, msg)
# puts "Before work"
# yield
# puts "After work"
# end
# end
#
module Middleware
class Chain
def self.register(&block)
instance_exec(&block)
end
def self.default
@default ||= [
Entry.new(EncodedMessageRemover, Sidekiq::Client.redis),
Entry.new(Airbrake),
Entry.new(ActiveRecord)]
end
def self.use(klass, *args)
chain << Entry.new(klass, *args)
end
def self.chain
@chain ||= default
end
def self.retrieve
Thread.current[:sidekiq_chain] ||= chain.map { |entry| entry.make_new }
end
end
class Entry
attr_reader :klass
def initialize(klass, *args)
@klass = klass
@args = args
end
def make_new
@klass.new(*@args)
end
end
class Airbrake
def call(worker, msg, queue)
yield
rescue => ex
send_to_airbrake(msg, ex) if defined?(::Airbrake)
raise
end
private
def send_to_airbrake(msg, ex)
::Airbrake.notify(:error_class => ex.class.name,
:error_message => "#{ex.class.name}: #{ex.message}",
:parameters => msg)
end
end
class ActiveRecord
def call(worker, msg, queue)
yield
ensure
::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
end
end
class EncodedMessageRemover
def initialize(redis)
@redis = redis
end
def call(worker, msg, queue)
yield
ensure
@redis.srem("queue:msg_hashes:#{queue}", Digest::MD5.hexdigest(MultiJson.encode(msg)))
end
end
end
end

View file

@ -1,32 +1,39 @@
require 'sidekiq/util'
require 'sidekiq/middleware'
require 'celluloid'
require 'sidekiq/util'
require 'sidekiq/middleware/chain'
require 'sidekiq/middleware/server/active_record'
require 'sidekiq/middleware/server/airbrake'
require 'sidekiq/middleware/server/unique_jobs'
module Sidekiq
class Processor
include Util
include Celluloid
def self.middleware
@middleware ||= begin
chain = Middleware::Chain.new
chain.register do
use Middleware::Server::UniqueJobs, Sidekiq::Client.redis
use Middleware::Server::Airbrake
use Middleware::Server::ActiveRecord
end
chain
end
end
def initialize(boss)
@boss = boss
end
def process(msg, queue)
klass = constantize(msg['class'])
invoke_chain(klass.new, msg, queue)
@boss.processor_done!(current_actor)
end
def invoke_chain(worker, msg, queue)
chain = Sidekiq::Middleware::Chain.retrieve.dup
traverse_chain = lambda do
if chain.empty?
worker.perform(*msg['args'])
else
chain.shift.call(worker, msg, queue, &traverse_chain)
end
def process(msg)
klass = constantize(msg['class'])
worker = klass.new
self.class.middleware.invoke(worker, msg) do
worker.perform(*msg['args'])
end
traverse_chain.call
@boss.processor_done!(current_actor)
end
# See http://github.com/tarcieri/celluloid/issues/22

View file

@ -26,7 +26,8 @@ class TestClient < MiniTest::Unit::TestCase
before do
@redis = MiniTest::Mock.new
def @redis.multi; yield; end
def @redis.sadd(*); true; end
def @redis.set(*); true; end
def @redis.expire(*); true; end
Sidekiq::Client.redis = @redis
Sidekiq::Client.ignore_duplicate_jobs = false
end

View file

@ -1,5 +1,6 @@
require 'helper'
require 'sidekiq/middleware'
require 'sidekiq/middleware/chain'
require 'sidekiq/middleware/server/unique_jobs'
require 'sidekiq/processor'
class TestMiddleware < MiniTest::Unit::TestCase
@ -15,24 +16,20 @@ class TestMiddleware < MiniTest::Unit::TestCase
@recorder = recorder
end
def call(worker, msg, queue)
def call(worker, msg)
@recorder << [@name, 'before']
yield
@recorder << [@name, 'after']
end
end
it 'configures default middleware' do
chain = Sidekiq::Middleware::Chain.chain
assert_equal chain, Sidekiq::Middleware::Chain.default
end
it 'supports custom middleware' do
Sidekiq::Middleware::Chain.register do
chain = Sidekiq::Middleware::Chain.new
chain.register do
use CustomMiddleware, 1, []
end
chain = Sidekiq::Middleware::Chain.chain
assert_equal chain.last.klass, CustomMiddleware
assert_equal chain.entries.last.klass, CustomMiddleware
end
class CustomWorker
@ -42,20 +39,20 @@ class TestMiddleware < MiniTest::Unit::TestCase
end
it 'executes middleware in the proper order' do
Sidekiq::Middleware::EncodedMessageRemover.class_eval do
def call(worker, msg, queue); yield; end
Sidekiq::Middleware::Server::UniqueJobs.class_eval do
def call(worker, msg); yield; end
end
recorder = []
msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] }
Sidekiq::Middleware::Chain.register do
Sidekiq::Processor.middleware.register do
2.times { |i| use CustomMiddleware, i.to_s, recorder }
end
processor = Sidekiq::Processor.new(@boss)
@boss.expect(:processor_done!, nil, [processor])
processor.process(msg, 'default')
processor.process(msg)
assert_equal recorder.flatten, %w(0 before 1 before work_performed 1 after 0 after)
end
end

View file

@ -21,7 +21,7 @@ class TestProcessor < MiniTest::Unit::TestCase
msg = { 'class' => MockWorker.to_s, 'args' => ['myarg'] }
processor = ::Sidekiq::Processor.new(@boss)
@boss.expect(:processor_done!, nil, [processor])
processor.process(msg, 'default')
processor.process(msg)
@boss.verify
assert_equal 1, $invokes
assert_equal 0, $errors.size
@ -31,7 +31,7 @@ class TestProcessor < MiniTest::Unit::TestCase
msg = { 'class' => MockWorker.to_s, 'args' => ['boom'] }
processor = ::Sidekiq::Processor.new(@boss)
assert_raises RuntimeError do
processor.process(msg, 'default')
processor.process(msg)
end
@boss.verify
assert_equal 0, $invokes