mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
introduce middleware for cleanup of encoded payloads
This commit is contained in:
parent
62045af3d5
commit
1a028c4a2f
8 changed files with 57 additions and 47 deletions
|
@ -6,7 +6,8 @@ module Sidekiq
|
|||
class Client
|
||||
|
||||
class << self
|
||||
attr_accessor :push_unique_only
|
||||
attr_accessor :ignore_duplicate_jobs
|
||||
alias_method :ignore_duplicate_jobs?, :ignore_duplicate_jobs
|
||||
end
|
||||
|
||||
def self.redis
|
||||
|
@ -33,7 +34,7 @@ module Sidekiq
|
|||
encoded_payloads_key = "queue:encoded:#{queue}"
|
||||
payload = MultiJson.encode(item)
|
||||
encoded_payload = Base64.encode64(payload)
|
||||
return if push_unique_only && already_queued?(encoded_payloads_key, encoded_payload)
|
||||
return if ignore_duplicate_jobs? && already_queued?(encoded_payloads_key, encoded_payload)
|
||||
|
||||
redis.multi do
|
||||
redis.sadd(encoded_payloads_key, encoded_payload)
|
||||
|
|
|
@ -83,15 +83,11 @@ module Sidekiq
|
|||
|
||||
def find_work(queue_idx)
|
||||
current_queue = @queues[queue_idx]
|
||||
queue_key = "queue:#{current_queue}"
|
||||
encoded_payloads_key = "queue:encoded:#{current_queue}"
|
||||
msg = @redis.lpop(queue_key)
|
||||
msg = @redis.lpop("queue:#{current_queue}")
|
||||
if msg
|
||||
payload = MultiJson.decode(msg)
|
||||
@redis.srem(encoded_payloads_key, Base64.encode64(msg))
|
||||
processor = @ready.pop
|
||||
@busy << processor
|
||||
processor.process!(payload)
|
||||
processor.process!(MultiJson.decode(msg), current_queue)
|
||||
end
|
||||
!!msg
|
||||
end
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
require 'json'
|
||||
|
||||
module Sidekiq
|
||||
# Middleware is code configured to run before/after
|
||||
# a message is processed. It is patterned after Rack
|
||||
|
@ -27,11 +29,14 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def self.default
|
||||
@default ||= [Entry.new(Airbrake), Entry.new(ActiveRecord)]
|
||||
@default ||= [
|
||||
Entry.new(EncodedMessageRemover, Sidekiq::Client.redis),
|
||||
Entry.new(Airbrake),
|
||||
Entry.new(ActiveRecord)]
|
||||
end
|
||||
|
||||
def self.use(klass, *args)
|
||||
chain << Entry.new(klass, args)
|
||||
chain << Entry.new(klass, *args)
|
||||
end
|
||||
|
||||
def self.chain
|
||||
|
@ -45,7 +50,7 @@ module Sidekiq
|
|||
|
||||
class Entry
|
||||
attr_reader :klass
|
||||
def initialize(klass, args = [])
|
||||
def initialize(klass, *args)
|
||||
@klass = klass
|
||||
@args = args
|
||||
end
|
||||
|
@ -54,36 +59,41 @@ module Sidekiq
|
|||
@klass.new(*@args)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class Airbrake
|
||||
def initialize(options=nil)
|
||||
class Airbrake
|
||||
def call(worker, msg, queue)
|
||||
yield
|
||||
rescue => ex
|
||||
send_to_airbrake(msg, ex) if defined?(::Airbrake)
|
||||
raise
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def send_to_airbrake(msg, ex)
|
||||
::Airbrake.notify(:error_class => ex.class.name,
|
||||
:error_message => "#{ex.class.name}: #{ex.message}",
|
||||
:parameters => msg)
|
||||
end
|
||||
end
|
||||
|
||||
def call(worker, msg)
|
||||
yield
|
||||
rescue => ex
|
||||
send_to_airbrake(msg, ex) if defined?(::Airbrake)
|
||||
raise
|
||||
class ActiveRecord
|
||||
def call(worker, msg, queue)
|
||||
yield
|
||||
ensure
|
||||
::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
class EncodedMessageRemover
|
||||
def initialize(redis)
|
||||
@redis = redis
|
||||
end
|
||||
|
||||
def send_to_airbrake(msg, ex)
|
||||
::Airbrake.notify(:error_class => ex.class.name,
|
||||
:error_message => "#{ex.class.name}: #{ex.message}",
|
||||
:parameters => msg)
|
||||
end
|
||||
end
|
||||
|
||||
class ActiveRecord
|
||||
def initialize(options=nil)
|
||||
end
|
||||
|
||||
def call(*)
|
||||
yield
|
||||
ensure
|
||||
::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
|
||||
def call(worker, msg, queue)
|
||||
@redis.srem("queue:encoded:#{queue}", Base64.encode64(msg.to_json))
|
||||
yield
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -11,19 +11,19 @@ module Sidekiq
|
|||
@boss = boss
|
||||
end
|
||||
|
||||
def process(msg)
|
||||
def process(msg, queue)
|
||||
klass = constantize(msg['class'])
|
||||
invoke_chain(klass.new, msg)
|
||||
invoke_chain(klass.new, msg, queue)
|
||||
@boss.processor_done!(current_actor)
|
||||
end
|
||||
|
||||
def invoke_chain(worker, msg)
|
||||
def invoke_chain(worker, msg, queue)
|
||||
chain = Sidekiq::Middleware::Chain.retrieve.dup
|
||||
traverse_chain = lambda do
|
||||
if chain.empty?
|
||||
worker.perform(*msg['args'])
|
||||
else
|
||||
chain.shift.call(worker, msg, &traverse_chain)
|
||||
chain.shift.call(worker, msg, queue, &traverse_chain)
|
||||
end
|
||||
end
|
||||
traverse_chain.call
|
||||
|
|
|
@ -31,6 +31,5 @@ module Sidekiq
|
|||
def verbose(msg)
|
||||
STDOUT.puts(msg) if $DEBUG
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -10,13 +10,13 @@ class TestClient < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'does not push duplicate messages when configured for unique only' do
|
||||
Sidekiq::Client.push_unique_only = true
|
||||
Sidekiq::Client.ignore_duplicate_jobs = true
|
||||
10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) }
|
||||
assert_equal Sidekiq::Client.redis.llen("queue:customqueue"), 1
|
||||
end
|
||||
|
||||
it 'does push duplicate messages when not configured for unique only' do
|
||||
Sidekiq::Client.push_unique_only = false
|
||||
Sidekiq::Client.ignore_duplicate_jobs = false
|
||||
10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) }
|
||||
assert_equal Sidekiq::Client.redis.llen("queue:customqueue2"), 10
|
||||
end
|
||||
|
@ -28,7 +28,7 @@ class TestClient < MiniTest::Unit::TestCase
|
|||
def @redis.multi; yield; end
|
||||
def @redis.sadd(*); true; end
|
||||
Sidekiq::Client.redis = @redis
|
||||
Sidekiq::Client.push_unique_only = false
|
||||
Sidekiq::Client.ignore_duplicate_jobs = false
|
||||
end
|
||||
|
||||
it 'raises ArgumentError with invalid params' do
|
||||
|
|
|
@ -15,7 +15,7 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
@recorder = recorder
|
||||
end
|
||||
|
||||
def call(worker, msg)
|
||||
def call(worker, msg, queue)
|
||||
@recorder << [@name, 'before']
|
||||
yield
|
||||
@recorder << [@name, 'after']
|
||||
|
@ -42,6 +42,10 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'executes middleware in the proper order' do
|
||||
Sidekiq::Middleware::EncodedMessageRemover.class_eval do
|
||||
def call(worker, msg, queue); yield; end
|
||||
end
|
||||
|
||||
recorder = []
|
||||
msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] }
|
||||
|
||||
|
@ -51,7 +55,7 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
|
||||
processor = Sidekiq::Processor.new(@boss)
|
||||
@boss.expect(:processor_done!, nil, [processor])
|
||||
processor.process(msg)
|
||||
processor.process(msg, 'default')
|
||||
assert_equal recorder.flatten, %w(0 before 1 before work_performed 1 after 0 after)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -21,7 +21,7 @@ class TestProcessor < MiniTest::Unit::TestCase
|
|||
msg = { 'class' => MockWorker.to_s, 'args' => ['myarg'] }
|
||||
processor = ::Sidekiq::Processor.new(@boss)
|
||||
@boss.expect(:processor_done!, nil, [processor])
|
||||
processor.process(msg)
|
||||
processor.process(msg, 'default')
|
||||
@boss.verify
|
||||
assert_equal 1, $invokes
|
||||
assert_equal 0, $errors.size
|
||||
|
@ -31,7 +31,7 @@ class TestProcessor < MiniTest::Unit::TestCase
|
|||
msg = { 'class' => MockWorker.to_s, 'args' => ['boom'] }
|
||||
processor = ::Sidekiq::Processor.new(@boss)
|
||||
assert_raises RuntimeError do
|
||||
processor.process(msg)
|
||||
processor.process(msg, 'default')
|
||||
end
|
||||
@boss.verify
|
||||
assert_equal 0, $invokes
|
||||
|
|
Loading…
Reference in a new issue