1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Implement message processing middleware, patterned after Rack

This commit is contained in:
Mike Perham 2012-02-04 16:53:09 -08:00
parent 92c51c5785
commit 5a151d71e1
2 changed files with 105 additions and 11 deletions

87
lib/sidekiq/middleware.rb Normal file
View file

@ -0,0 +1,87 @@
module Sidekiq
# Middleware is code configured to run before/after
# a message is processed. It is patterned after Rack
# middleware. The default middleware chain:
#
# Sidekiq::Middleware::Chain.register do
# use Sidekiq::Airbrake
# use Sidekiq::ActiveRecord
# end
#
# This is an example of a minimal middleware:
#
# class MyHook
# def initialize(options=nil)
# end
# def call(worker, msg)
# puts "Before work"
# yield
# puts "After work"
# end
# end
#
module Middleware
class Chain
def self.register(&block)
@chain ||= default
self.instance_exec(&block)
end
def self.default
[Entry.new(Airbrake), Entry.new(ActiveRecord)]
end
def self.use(klass, options=nil)
@chain << Entry.new(klass, options)
end
def self.chain
@chain || default
end
def self.retrieve
Thread.current[:sidekiq_chain] ||= chain.map { |entry| entry.klass.new(entry.options) }
end
end
class Entry
attr_accessor :klass
attr_accessor :options
def initialize(klass, options=nil)
@klass = klass
@options = options
end
end
end
class Airbrake
def initialize(options=nil)
end
def call(worker, msg)
yield
rescue => ex
send_to_airbrake(msg, ex) if defined?(::Airbrake)
raise ex
end
private
def send_to_airbrake(msg, ex)
::Airbrake.notify(:error_class => ex.class.name,
:error_message => "#{ex.class.name}: #{ex.message}",
:parameters => msg)
end
end
class ActiveRecord
def initialize(options=nil)
end
def call(*_)
yield
ensure
ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
end
end
end

View file

@ -1,4 +1,5 @@
require 'sidekiq/util' require 'sidekiq/util'
require 'sidekiq/middleware'
require 'celluloid' require 'celluloid'
module Sidekiq module Sidekiq
@ -11,20 +12,26 @@ module Sidekiq
end end
def process(msg) def process(msg)
begin
klass = constantize(msg['class']) klass = constantize(msg['class'])
klass.new.perform(*msg['args']) invoke_chain(klass.new, msg) do |worker, msg|
@boss.processor_done!(current_actor) worker.perform(*msg['args'])
rescue => ex
send_to_airbrake(msg, ex) if defined?(::Airbrake)
raise ex
end end
end end
def send_to_airbrake(msg, ex) def invoke_chain(worker, msg, &block)
::Airbrake.notify(:error_class => ex.class.name, invoke_link(0, worker, msg, &block)
:error_message => "#{ex.class.name}: #{ex.message}", @boss.processor_done!(current_actor)
:parameters => msg) end
def invoke_link(idx, worker, msg, &block)
chain = Sidekiq::Middleware::Chain.retrieve
if chain.size == idx
block.call(worker, msg)
else
chain[idx].call(worker, msg) do
invoke_link(idx + 1, worker, msg, &block)
end
end
end end
# See http://github.com/tarcieri/celluloid/issues/22 # See http://github.com/tarcieri/celluloid/issues/22