mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Merge
This commit is contained in:
commit
431d3929fe
3 changed files with 86 additions and 27 deletions
|
@ -23,33 +23,35 @@ module Sidekiq
|
||||||
module Middleware
|
module Middleware
|
||||||
class Chain
|
class Chain
|
||||||
def self.register(&block)
|
def self.register(&block)
|
||||||
@chain ||= default
|
instance_exec(&block)
|
||||||
self.instance_exec(&block)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.default
|
def self.default
|
||||||
[Entry.new(Airbrake), Entry.new(ActiveRecord)]
|
@default ||= [Entry.new(Airbrake), Entry.new(ActiveRecord)]
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.use(klass, options=nil)
|
def self.use(klass, *args)
|
||||||
@chain << Entry.new(klass, options)
|
chain << Entry.new(klass, args)
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.chain
|
def self.chain
|
||||||
defined?(@chain) ? @chain : default
|
@chain ||= default
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.retrieve
|
def self.retrieve
|
||||||
Thread.current[:sidekiq_chain] ||= chain.map { |entry| entry.klass.new(entry.options) }
|
Thread.current[:sidekiq_chain] ||= chain.map { |entry| entry.make_new }
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
class Entry
|
class Entry
|
||||||
attr_accessor :klass
|
attr_reader :klass
|
||||||
attr_accessor :options
|
def initialize(klass, args = [])
|
||||||
def initialize(klass, options=nil)
|
|
||||||
@klass = klass
|
@klass = klass
|
||||||
@options = options
|
@args = args
|
||||||
|
end
|
||||||
|
|
||||||
|
def make_new
|
||||||
|
@klass.new(*@args)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -62,7 +64,7 @@ module Sidekiq
|
||||||
yield
|
yield
|
||||||
rescue => ex
|
rescue => ex
|
||||||
send_to_airbrake(msg, ex) if defined?(::Airbrake)
|
send_to_airbrake(msg, ex) if defined?(::Airbrake)
|
||||||
raise ex
|
raise
|
||||||
end
|
end
|
||||||
|
|
||||||
private
|
private
|
||||||
|
@ -78,7 +80,7 @@ module Sidekiq
|
||||||
def initialize(options=nil)
|
def initialize(options=nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
def call(*_)
|
def call(*)
|
||||||
yield
|
yield
|
||||||
ensure
|
ensure
|
||||||
ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
|
ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
|
||||||
|
|
|
@ -13,25 +13,20 @@ module Sidekiq
|
||||||
|
|
||||||
def process(msg)
|
def process(msg)
|
||||||
klass = constantize(msg['class'])
|
klass = constantize(msg['class'])
|
||||||
invoke_chain(klass.new, msg) do |worker, msg|
|
invoke_chain(klass.new, msg)
|
||||||
worker.perform(*msg['args'])
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def invoke_chain(worker, msg, &block)
|
|
||||||
invoke_link(0, worker, msg, &block)
|
|
||||||
@boss.processor_done!(current_actor)
|
@boss.processor_done!(current_actor)
|
||||||
end
|
end
|
||||||
|
|
||||||
def invoke_link(idx, worker, msg, &block)
|
def invoke_chain(worker, msg)
|
||||||
chain = Sidekiq::Middleware::Chain.retrieve
|
chain = Sidekiq::Middleware::Chain.retrieve.dup
|
||||||
if chain.size == idx
|
traverse_chain = lambda do
|
||||||
block.call(worker, msg)
|
if chain.empty?
|
||||||
else
|
worker.perform(*msg['args'])
|
||||||
chain[idx].call(worker, msg) do
|
else
|
||||||
invoke_link(idx + 1, worker, msg, &block)
|
chain.shift.call(worker, msg, &traverse_chain)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
traverse_chain.call
|
||||||
end
|
end
|
||||||
|
|
||||||
# See http://github.com/tarcieri/celluloid/issues/22
|
# See http://github.com/tarcieri/celluloid/issues/22
|
||||||
|
|
62
test/test_middleware.rb
Normal file
62
test/test_middleware.rb
Normal file
|
@ -0,0 +1,62 @@
|
||||||
|
require 'helper'
|
||||||
|
require 'sidekiq/middleware'
|
||||||
|
require 'sidekiq/processor'
|
||||||
|
|
||||||
|
class TestMiddleware < MiniTest::Unit::TestCase
|
||||||
|
describe 'middleware chain' do
|
||||||
|
before do
|
||||||
|
@boss = MiniTest::Mock.new
|
||||||
|
Celluloid.logger = nil
|
||||||
|
end
|
||||||
|
|
||||||
|
class CustomMiddleware
|
||||||
|
def initialize(name, recorder)
|
||||||
|
@name = name
|
||||||
|
@recorder = recorder
|
||||||
|
end
|
||||||
|
|
||||||
|
def call(worker, msg)
|
||||||
|
@recorder << [@name, 'before']
|
||||||
|
yield
|
||||||
|
@recorder << [@name, 'after']
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'configures default middleware' do
|
||||||
|
chain = Sidekiq::Middleware::Chain.chain
|
||||||
|
assert_equal chain, Sidekiq::Middleware::Chain.default
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'supports custom middleware' do
|
||||||
|
Sidekiq::Middleware::Chain.register do
|
||||||
|
use CustomMiddleware, 1, []
|
||||||
|
end
|
||||||
|
chain = Sidekiq::Middleware::Chain.chain
|
||||||
|
assert_equal chain.last.klass, CustomMiddleware
|
||||||
|
end
|
||||||
|
|
||||||
|
class CustomWorker
|
||||||
|
def perform(recorder)
|
||||||
|
recorder << ['work_performed']
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'executes middleware in the proper order' do
|
||||||
|
recorder = []
|
||||||
|
msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] }
|
||||||
|
|
||||||
|
Sidekiq::Middleware::Chain.register do
|
||||||
|
2.times { |i| use CustomMiddleware, i.to_s, recorder }
|
||||||
|
end
|
||||||
|
|
||||||
|
processor = Sidekiq::Processor.new(@boss)
|
||||||
|
@boss.expect(:processor_done!, nil, [processor])
|
||||||
|
processor.process(msg)
|
||||||
|
assert_equal recorder.flatten, %w(0 before 1 before work_performed 1 after 0 after)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue