1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
mperham--sidekiq/lib/sidekiq/middleware/chain.rb
Mike Perham 83aea0690e Explicitly pass Redis associated with this job
When pushing a job, the middleware should be able to access the Redis instance associated with that job.  Previously, Sidekiq was limited to one global Redis instance.  Now that we want to support sharding, we have to explicitly pass the instance in OR hack up APIs with thread local variables.  Explicit is better.
2014-03-25 21:38:17 -07:00

144 lines
3.5 KiB
Ruby

module Sidekiq
# Middleware is code configured to run before/after
# a message is processed. It is patterned after Rack
# middleware. Middleware exists for the client side
# (pushing jobs onto the queue) as well as the server
# side (when jobs are actually processed).
#
# To add middleware for the client:
#
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.add MyClientHook
# end
# end
#
# To modify the server middleware, call `server_middleware`
# with another block:
#
#   Sidekiq.configure_server do |config|
#     config.server_middleware do |chain|
#       chain.add MyServerHook
#       chain.remove ActiveRecord
#     end
#   end
#
# To insert immediately preceding another entry:
#
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.insert_before ActiveRecord, MyClientHook
# end
# end
#
# To insert immediately after another entry:
#
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.insert_after ActiveRecord, MyClientHook
# end
# end
#
# This is an example of a minimal server middleware:
#
# class MyServerHook
# def call(worker_instance, msg, queue)
# puts "Before work"
# yield
# puts "After work"
# end
# end
#
# This is an example of a minimal client middleware. Note that
# the method must return the result of `yield`, or the job will
# not be pushed to Redis:
#
#   class MyClientHook
#     def call(worker_class, msg, queue, redis_pool)
#       puts "Before push"
#       result = yield
#       puts "After push"
#       result
#     end
#   end
#
module Middleware
# An ordered, mutable list of middleware entries.
#
# Each entry pairs a middleware class with the arguments used to
# instantiate it. The mutators below keep a given class in the chain at
# most once: adding or inserting a class that is already present moves
# the existing entry instead of creating a second one.
class Chain
  include Enumerable

  attr_reader :entries

  # Builds an empty chain and, when a block is given, yields the chain
  # so the caller can populate it.
  def initialize
    @entries = []
    yield self if block_given?
  end

  # Hook run by Ruby on the *new* object during #dup / #clone; the
  # argument is the object being copied. Gives the copy its own entries
  # array so that changes to one chain never show up in the other.
  # Only the receiver is modified: the source chain keeps its array
  # untouched (and may therefore be frozen).
  def initialize_copy(source)
    super
    @entries = source.entries.dup
  end

  # Yields each entry in chain order (this is what backs Enumerable).
  def each(&block)
    entries.each(&block)
  end

  # Deletes every entry registered for +klass+.
  def remove(klass)
    entries.delete_if { |entry| entry.klass == klass }
  end

  # Appends +klass+ to the end of the chain; +args+ are stored and later
  # passed to +klass.new+. An existing entry for +klass+ is dropped
  # first, so re-adding moves the class to the end with the new args.
  def add(klass, *args)
    remove(klass)
    entries << Entry.new(klass, *args)
  end

  # Places +newklass+ immediately before +oldklass+. When +oldklass+ is
  # not in the chain, +newklass+ goes to the front. If +newklass+ is
  # already present, its existing entry is moved and +args+ are ignored.
  def insert_before(oldklass, newklass, *args)
    new_entry = take_or_build(newklass, args)
    position = index_of(oldklass) || 0
    entries.insert(position, new_entry)
  end

  # Places +newklass+ immediately after +oldklass+. When +oldklass+ is
  # not in the chain, +newklass+ goes to the end. If +newklass+ is
  # already present, its existing entry is moved and +args+ are ignored.
  def insert_after(oldklass, newklass, *args)
    new_entry = take_or_build(newklass, args)
    # With no anchor, fall back to the last index so that +1 appends
    # (for an empty chain this is -1 + 1 == 0).
    position = index_of(oldklass) || entries.count - 1
    entries.insert(position + 1, new_entry)
  end

  # True when an entry for +klass+ is present.
  def exists?(klass)
    any? { |entry| entry.klass == klass }
  end

  # Returns a new array holding one freshly built middleware instance
  # per entry, in chain order.
  def retrieve
    map(&:make_new)
  end

  # Removes all entries.
  def clear
    entries.clear
  end

  # Runs the chain around +final_action+.
  #
  # A fresh set of middleware instances is built for every invocation.
  # Each instance's #call receives +args+ plus a block; calling that
  # block continues with the next middleware, and once none are left it
  # runs +final_action+. The return value is whatever the first
  # middleware's #call returns (or +final_action+'s result when the
  # chain is empty).
  def invoke(*args, &final_action)
    # #retrieve already hands back a new array, so it is safe to consume.
    chain = retrieve
    traverse_chain = lambda do
      if chain.empty?
        final_action.call
      else
        chain.shift.call(*args, &traverse_chain)
      end
    end
    traverse_chain.call
  end

  private

  # Index of the first entry registered for +klass+, or nil.
  def index_of(klass)
    entries.index { |entry| entry.klass == klass }
  end

  # Detaches and returns the existing entry for +klass+, or builds a new
  # one from +args+ when the class is not yet in the chain.
  def take_or_build(klass, args)
    position = index_of(klass)
    position ? entries.delete_at(position) : Entry.new(klass, *args)
  end
end
# One slot in a middleware chain: remembers a middleware class together
# with the arguments its constructor should receive.
class Entry
  attr_reader :klass

  # +klass+ is the middleware class; any further arguments are kept and
  # handed to +klass.new+ each time #make_new is called.
  def initialize(klass, *args)
    @klass, @args = klass, args
  end

  # Builds and returns a new +klass+ instance from the stored arguments.
  def make_new
    ctor_args = @args
    klass.new(*ctor_args)
  end
end
end
end