From 35c0ae50673af2c3202ed464a9bfee252bc26b85 Mon Sep 17 00:00:00 2001 From: Ryan LeCompte Date: Wed, 8 Feb 2012 17:04:19 -0800 Subject: [PATCH] add middleware classes --- lib/sidekiq/middleware/chain.rb | 87 +++++++++++++++++++ lib/sidekiq/middleware/client/unique_jobs.rb | 31 +++++++ .../middleware/server/active_record.rb | 13 +++ lib/sidekiq/middleware/server/airbrake.rb | 25 ++++++ lib/sidekiq/middleware/server/unique_jobs.rb | 17 ++++ 5 files changed, 173 insertions(+) create mode 100644 lib/sidekiq/middleware/chain.rb create mode 100644 lib/sidekiq/middleware/client/unique_jobs.rb create mode 100644 lib/sidekiq/middleware/server/active_record.rb create mode 100644 lib/sidekiq/middleware/server/airbrake.rb create mode 100644 lib/sidekiq/middleware/server/unique_jobs.rb diff --git a/lib/sidekiq/middleware/chain.rb b/lib/sidekiq/middleware/chain.rb new file mode 100644 index 00000000..1461f1d8 --- /dev/null +++ b/lib/sidekiq/middleware/chain.rb @@ -0,0 +1,87 @@ +module Sidekiq + # Middleware is code configured to run before/after + # a message is processed. It is patterned after Rack + # middleware. Middleware exists for the client side + # as well as the server side. + # + # Default middleware for the server side: + # + # Sidekiq::Processor.middleware.register do + # use Sidekiq::Airbrake + # use Sidekiq::ActiveRecord + # end + # + # To add middleware for the client, do: + # + # Sidekiq::Client.middleware.register do + # use MyClientHook + # end + # + # To add middleware for the server, do: + # + # Sidekiq::Processor.middleware.register do + # use MyServerHook + # end + # + # This is an example of a minimal middleware: + # + # class MyHook + # def initialize(options=nil) + # end + # def call(worker, msg) + # puts "Before work" + # yield + # puts "After work" + # end + # end + # + module Middleware + class Chain + attr_reader :entries + + def initialize + @entries = [] + end + + def register(&block) + instance_eval(&block) + end + + def unregister(klass) + entries.delete_if { |entry| entry.klass == klass } + end + + def use(klass, *args) + entries << Entry.new(klass, *args) + end + + def retrieve + entries.map(&:make_new) + end + + def invoke(*args, &final_action) + chain = retrieve.dup + traverse_chain = lambda do + if chain.empty? + final_action.call + else + chain.shift.call(*args, &traverse_chain) + end + end + traverse_chain.call + end + end + + class Entry + attr_reader :klass + def initialize(klass, *args) + @klass = klass + @args = args + end + + def make_new + @klass.new(*@args) + end + end + end +end diff --git a/lib/sidekiq/middleware/client/unique_jobs.rb b/lib/sidekiq/middleware/client/unique_jobs.rb new file mode 100644 index 00000000..df25221a --- /dev/null +++ b/lib/sidekiq/middleware/client/unique_jobs.rb @@ -0,0 +1,31 @@ +module Sidekiq + module Middleware + module Client + class UniqueJobs + HASH_KEY_EXPIRATION = 30 * 60 + + def initialize(redis) + @redis = redis + end + + def call(item) + payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item)) + return if already_scheduled?(payload_hash) + + @redis.multi do + @redis.set(payload_hash, payload_hash) + @redis.expire(payload_hash, HASH_KEY_EXPIRATION) + end + + yield + end + + private + + def already_scheduled?(payload_hash) + !!@redis.get(payload_hash) + end + end + end + end +end \ No newline at end of file diff --git a/lib/sidekiq/middleware/server/active_record.rb b/lib/sidekiq/middleware/server/active_record.rb new file mode 100644 index 00000000..bd2897bc --- /dev/null +++ b/lib/sidekiq/middleware/server/active_record.rb @@ -0,0 +1,13 @@ +module Sidekiq + module Middleware + module Server + class ActiveRecord + def call(worker, msg) + yield + ensure + ::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord) + end + end + end + end +end \ No newline at end of file diff --git a/lib/sidekiq/middleware/server/airbrake.rb b/lib/sidekiq/middleware/server/airbrake.rb new file mode 100644 index 00000000..bee197b2 --- /dev/null +++ b/lib/sidekiq/middleware/server/airbrake.rb @@ -0,0 +1,25 @@ +module Sidekiq + module Middleware + module Server + class Airbrake + def call(worker, msg) + yield + rescue => ex + send_to_airbrake(msg, ex) if defined?(::Airbrake) + raise + end + + private + + def send_to_airbrake(msg, ex) + ::Airbrake.notify(:error_class => ex.class.name, + :error_message => "#{ex.class.name}: #{ex.message}", + :parameters => msg) + end + end + end + end +end + + + diff --git a/lib/sidekiq/middleware/server/unique_jobs.rb b/lib/sidekiq/middleware/server/unique_jobs.rb new file mode 100644 index 00000000..77be4450 --- /dev/null +++ b/lib/sidekiq/middleware/server/unique_jobs.rb @@ -0,0 +1,17 @@ +module Sidekiq + module Middleware + module Server + class UniqueJobs + def initialize(redis) + @redis = redis + end + + def call(worker, msg) + yield + ensure + @redis.del(Digest::MD5.hexdigest(MultiJson.encode(msg))) + end + end + end + end +end \ No newline at end of file