1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

add middleware classes

This commit is contained in:
Ryan LeCompte 2012-02-08 17:04:19 -08:00
parent c3609d6d48
commit 35c0ae5067
5 changed files with 173 additions and 0 deletions

View file

@ -0,0 +1,87 @@
module Sidekiq
# Middleware is code configured to run before/after
# a message is processed. It is patterned after Rack
# middleware. Middleware exists for the client side
# as well as the server side.
#
# Default middleware for the server side:
#
# Sidekiq::Processor.middleware.register do
# use Sidekiq::Airbrake
# use Sidekiq::ActiveRecord
# end
#
# To add middleware for the client, do:
#
# Sidekiq::Client.middleware.register do
# use MyClientHook
# end
#
# To add middleware for the server, do:
#
# Sidekiq::Processor.middleware.register do
# use MyServerHook
# end
#
# This is an example of a minimal middleware:
#
# class MyHook
# def initialize(options=nil)
# end
# def call(worker, msg)
# puts "Before work"
# yield
# puts "After work"
# end
# end
#
module Middleware
class Chain
attr_reader :entries
def initialize
@entries = []
end
def register(&block)
instance_eval(&block)
end
def unregister(klass)
entries.delete_if { |entry| entry.klass == klass }
end
def use(klass, *args)
entries << Entry.new(klass, *args)
end
def retrieve
entries.map(&:make_new)
end
def invoke(*args, &final_action)
chain = retrieve.dup
traverse_chain = lambda do
if chain.empty?
final_action.call
else
chain.shift.call(*args, &traverse_chain)
end
end
traverse_chain.call
end
end
class Entry
attr_reader :klass
def initialize(klass, *args)
@klass = klass
@args = args
end
def make_new
@klass.new(*@args)
end
end
end
end

View file

@ -0,0 +1,31 @@
module Sidekiq
module Middleware
module Client
class UniqueJobs
HASH_KEY_EXPIRATION = 30 * 60
def initialize(redis)
@redis = redis
end
def call(item)
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
return if already_scheduled?(payload_hash)
@redis.multi do
@redis.set(payload_hash, payload_hash)
@redis.expire(payload_hash, HASH_KEY_EXPIRATION)
end
yield
end
private
def already_scheduled?(payload_hash)
!!@redis.get(payload_hash)
end
end
end
end
end

View file

@ -0,0 +1,13 @@
module Sidekiq
module Middleware
module Server
class ActiveRecord
def call(worker, msg)
yield
ensure
::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
end
end
end
end
end

View file

@ -0,0 +1,25 @@
module Sidekiq
module Middleware
module Server
class Airbrake
def call(worker, msg)
yield
rescue => ex
send_to_airbrake(msg, ex) if defined?(::Airbrake)
raise
end
private
def send_to_airbrake(msg, ex)
::Airbrake.notify(:error_class => ex.class.name,
:error_message => "#{ex.class.name}: #{ex.message}",
:parameters => msg)
end
end
end
end
end

View file

@ -0,0 +1,17 @@
module Sidekiq
module Middleware
module Server
class UniqueJobs
def initialize(redis)
@redis = redis
end
def call(worker, msg)
yield
ensure
@redis.del(Digest::MD5.hexdigest(MultiJson.encode(msg)))
end
end
end
end
end