diff --git a/lib/sidekiq/middleware.rb b/lib/sidekiq/middleware.rb index db80b6ab..8573f7da 100644 --- a/lib/sidekiq/middleware.rb +++ b/lib/sidekiq/middleware.rb @@ -23,33 +23,35 @@ module Sidekiq module Middleware class Chain def self.register(&block) - @chain ||= default - self.instance_exec(&block) + instance_exec(&block) end def self.default - [Entry.new(Airbrake), Entry.new(ActiveRecord)] + @default ||= [Entry.new(Airbrake), Entry.new(ActiveRecord)] end - def self.use(klass, options=nil) - @chain << Entry.new(klass, options) + def self.use(klass, *args) + chain << Entry.new(klass, args) end def self.chain - defined?(@chain) ? @chain : default + @chain ||= default end def self.retrieve - Thread.current[:sidekiq_chain] ||= chain.map { |entry| entry.klass.new(entry.options) } + Thread.current[:sidekiq_chain] ||= chain.map { |entry| entry.make_new } end end class Entry - attr_accessor :klass - attr_accessor :options - def initialize(klass, options=nil) + attr_reader :klass + def initialize(klass, args = []) @klass = klass - @options = options + @args = args + end + + def make_new + @klass.new(*@args) end end end @@ -62,7 +64,7 @@ module Sidekiq yield rescue => ex send_to_airbrake(msg, ex) if defined?(::Airbrake) - raise ex + raise end private @@ -78,7 +80,7 @@ module Sidekiq def initialize(options=nil) end - def call(*_) + def call(*) yield ensure ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord) diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index f9f609ed..3674a698 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -13,25 +13,20 @@ module Sidekiq def process(msg) klass = constantize(msg['class']) - invoke_chain(klass.new, msg) do |worker, msg| - worker.perform(*msg['args']) - end - end - - def invoke_chain(worker, msg, &block) - invoke_link(0, worker, msg, &block) + invoke_chain(klass.new, msg) @boss.processor_done!(current_actor) end - def invoke_link(idx, worker, msg, &block) - chain = Sidekiq::Middleware::Chain.retrieve - if chain.size == idx - block.call(worker, msg) - else - chain[idx].call(worker, msg) do - invoke_link(idx + 1, worker, msg, &block) + def invoke_chain(worker, msg) + chain = Sidekiq::Middleware::Chain.retrieve.dup + traverse_chain = lambda do + if chain.empty? + worker.perform(*msg['args']) + else + chain.shift.call(worker, msg, &traverse_chain) end end + traverse_chain.call end # See http://github.com/tarcieri/celluloid/issues/22 diff --git a/test/test_middleware.rb b/test/test_middleware.rb new file mode 100644 index 00000000..8d682dbd --- /dev/null +++ b/test/test_middleware.rb @@ -0,0 +1,62 @@ +require 'helper' +require 'sidekiq/middleware' +require 'sidekiq/processor' + +class TestMiddleware < MiniTest::Unit::TestCase + describe 'middleware chain' do + before do + @boss = MiniTest::Mock.new + Celluloid.logger = nil + end + + class CustomMiddleware + def initialize(name, recorder) + @name = name + @recorder = recorder + end + + def call(worker, msg) + @recorder << [@name, 'before'] + yield + @recorder << [@name, 'after'] + end + end + + it 'configures default middleware' do + chain = Sidekiq::Middleware::Chain.chain + assert_equal chain, Sidekiq::Middleware::Chain.default + end + + it 'supports custom middleware' do + Sidekiq::Middleware::Chain.register do + use CustomMiddleware, 1, [] + end + chain = Sidekiq::Middleware::Chain.chain + assert_equal chain.last.klass, CustomMiddleware + end + + class CustomWorker + def perform(recorder) + recorder << ['work_performed'] + end + end + + it 'executes middleware in the proper order' do + recorder = [] + msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] } + + Sidekiq::Middleware::Chain.register do + 2.times { |i| use CustomMiddleware, i.to_s, recorder } + end + + processor = Sidekiq::Processor.new(@boss) + @boss.expect(:processor_done!, nil, [processor]) + processor.process(msg) + assert_equal recorder.flatten, %w(0 before 1 before work_performed 1 after 0 after) + end + end +end + + + +