From 5a151d71e12c165fc82d428712658f3a11f2c168 Mon Sep 17 00:00:00 2001 From: Mike Perham <mperham@gmail.com> Date: Sat, 4 Feb 2012 16:53:09 -0800 Subject: [PATCH] Implement message processing middleware, patterned after Rack --- lib/sidekiq/middleware.rb | 87 +++++++++++++++++++++++++++++++++++++++ lib/sidekiq/processor.rb | 29 ++++++++----- 2 files changed, 105 insertions(+), 11 deletions(-) create mode 100644 lib/sidekiq/middleware.rb diff --git a/lib/sidekiq/middleware.rb b/lib/sidekiq/middleware.rb new file mode 100644 index 00000000..ede25d1d --- /dev/null +++ b/lib/sidekiq/middleware.rb @@ -0,0 +1,87 @@ +module Sidekiq + # Middleware is code configured to run before/after + # a message is processed. It is patterned after Rack + # middleware. The default middleware chain: + # + # Sidekiq::Middleware::Chain.register do + # use Sidekiq::Airbrake + # use Sidekiq::ActiveRecord + # end + # + # This is an example of a minimal middleware: + # + # class MyHook + # def initialize(options=nil) + # end + # def call(worker, msg) + # puts "Before work" + # yield + # puts "After work" + # end + # end + # + module Middleware + class Chain + def self.register(&block) + @chain ||= default + self.instance_exec(&block) + end + + def self.default + [Entry.new(Airbrake), Entry.new(ActiveRecord)] + end + + def self.use(klass, options=nil) + @chain << Entry.new(klass, options) + end + + def self.chain + @chain || default + end + + def self.retrieve + Thread.current[:sidekiq_chain] ||= chain.map { |entry| entry.klass.new(entry.options) } + end + end + + class Entry + attr_accessor :klass + attr_accessor :options + def initialize(klass, options=nil) + @klass = klass + @options = options + end + end + end + + class Airbrake + def initialize(options=nil) + end + + def call(worker, msg) + yield + rescue => ex + send_to_airbrake(msg, ex) if defined?(::Airbrake) + raise ex + end + + private + + def send_to_airbrake(msg, ex) + ::Airbrake.notify(:error_class => ex.class.name, + :error_message => "#{ex.class.name}: #{ex.message}", + :parameters => msg) + end + end + + class ActiveRecord + def initialize(options=nil) + end + + def call(*_) + yield + ensure + ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord) + end + end +end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 805214d9..f9f609ed 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -1,4 +1,5 @@ require 'sidekiq/util' +require 'sidekiq/middleware' require 'celluloid' module Sidekiq @@ -11,20 +12,26 @@ module Sidekiq end def process(msg) - begin - klass = constantize(msg['class']) - klass.new.perform(*msg['args']) - @boss.processor_done!(current_actor) - rescue => ex - send_to_airbrake(msg, ex) if defined?(::Airbrake) - raise ex + klass = constantize(msg['class']) + invoke_chain(klass.new, msg) do |worker, msg| + worker.perform(*msg['args']) end end - def send_to_airbrake(msg, ex) - ::Airbrake.notify(:error_class => ex.class.name, - :error_message => "#{ex.class.name}: #{ex.message}", - :parameters => msg) + def invoke_chain(worker, msg, &block) + invoke_link(0, worker, msg, &block) + @boss.processor_done!(current_actor) + end + + def invoke_link(idx, worker, msg, &block) + chain = Sidekiq::Middleware::Chain.retrieve + if chain.size == idx + block.call(worker, msg) + else + chain[idx].call(worker, msg) do + invoke_link(idx + 1, worker, msg, &block) + end + end end # See http://github.com/tarcieri/celluloid/issues/22