mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Merge branch 'master' of github.com:mperham/sidekiq
This commit is contained in:
commit
9bb382aaab
20 changed files with 398 additions and 139 deletions
2
.gitignore
vendored
2
.gitignore
vendored
|
@ -1,2 +1,4 @@
|
|||
tags
|
||||
Gemfile.lock
|
||||
*.swp
|
||||
dump.rdb
|
||||
|
|
|
@ -2,3 +2,11 @@
|
|||
-----------
|
||||
|
||||
- Initial release!
|
||||
|
||||
0.5.2
|
||||
-----------
|
||||
|
||||
- Refactored middleware support, introducing ability to add client/server middleware (Ryan)
|
||||
- Added middleware for ignoring duplicate jobs (Ryan)
|
||||
- Added middleware for displaying jobs in resque-web dashboard (Max)
|
||||
- Added redis namespacing support (Max)
|
||||
|
|
34
Gemfile.lock
Normal file
34
Gemfile.lock
Normal file
|
@ -0,0 +1,34 @@
|
|||
PATH
|
||||
remote: .
|
||||
specs:
|
||||
sidekiq (0.5.1)
|
||||
celluloid
|
||||
connection_pool
|
||||
multi_json
|
||||
redis
|
||||
redis-namespace
|
||||
|
||||
GEM
|
||||
remote: http://rubygems.org/
|
||||
specs:
|
||||
celluloid (0.8.0)
|
||||
connection_pool (0.1.0)
|
||||
minitest (2.11.1)
|
||||
multi_json (1.0.4)
|
||||
rake (0.9.2.2)
|
||||
redis (2.2.2)
|
||||
redis-namespace (1.1.0)
|
||||
redis (< 3.0.0)
|
||||
simplecov (0.5.4)
|
||||
multi_json (~> 1.0.3)
|
||||
simplecov-html (~> 0.5.3)
|
||||
simplecov-html (0.5.3)
|
||||
|
||||
PLATFORMS
|
||||
ruby
|
||||
|
||||
DEPENDENCIES
|
||||
minitest
|
||||
rake
|
||||
sidekiq!
|
||||
simplecov
|
|
@ -1,9 +1,9 @@
|
|||
require 'optparse'
|
||||
require 'sidekiq/version'
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/redis_connection'
|
||||
require 'sidekiq/client'
|
||||
require 'sidekiq/manager'
|
||||
require 'connection_pool'
|
||||
|
||||
module Sidekiq
|
||||
class CLI
|
||||
|
@ -18,8 +18,9 @@ module Sidekiq
|
|||
FOREVER = 2_000_000_000
|
||||
|
||||
def run
|
||||
::Sidekiq::Client.redis = ConnectionPool.new { Redis.connect(:url => @options[:server]) }
|
||||
manager = Sidekiq::Manager.new(@options[:server], @options)
|
||||
::Sidekiq::Client.redis = Sidekiq::RedisConnection.create(@options[:server], @options[:namespace])
|
||||
manager_redis = Sidekiq::RedisConnection.create(@options[:server], @options[:namespace], false)
|
||||
manager = Sidekiq::Manager.new(manager_redis, @options)
|
||||
begin
|
||||
log 'Starting processing, hit Ctrl-C to stop'
|
||||
manager.start!
|
||||
|
@ -60,7 +61,6 @@ module Sidekiq
|
|||
:verbose => false,
|
||||
:queues => [],
|
||||
:processor_count => 25,
|
||||
:server => ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0',
|
||||
:rails => '.',
|
||||
:environment => nil,
|
||||
}
|
||||
|
@ -77,6 +77,10 @@ module Sidekiq
|
|||
@options[:verbose] = true
|
||||
end
|
||||
|
||||
o.on "-n", "--namespace NAMESPACE", "namespace worker queues are under" do |arg|
|
||||
@options[:namespace] = arg
|
||||
end
|
||||
|
||||
o.on "-s", "--server LOCATION", "Where to find Redis" do |arg|
|
||||
@options[:server] = arg
|
||||
end
|
||||
|
|
|
@ -1,15 +1,20 @@
|
|||
require 'multi_json'
|
||||
require 'redis'
|
||||
|
||||
require 'sidekiq/redis_connection'
|
||||
require 'sidekiq/middleware/chain'
|
||||
require 'sidekiq/middleware/client/resque_web_compatability'
|
||||
require 'sidekiq/middleware/client/unique_jobs'
|
||||
|
||||
module Sidekiq
|
||||
class Client
|
||||
def self.middleware
|
||||
@middleware ||= Middleware::Chain.new
|
||||
end
|
||||
|
||||
def self.redis
|
||||
@redis ||= begin
|
||||
# autoconfig for Heroku
|
||||
hash = {}
|
||||
hash[:url] = ENV['REDISTOGO_URL'] if ENV['REDISTOGO_URL']
|
||||
Redis.connect(hash)
|
||||
RedisConnection.create
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -17,6 +22,17 @@ module Sidekiq
|
|||
@redis = redis
|
||||
end
|
||||
|
||||
def self.ignore_duplicate_jobs=(value)
|
||||
@ignore_duplicate_jobs = value
|
||||
if @ignore_duplicate_jobs
|
||||
middleware.register do
|
||||
use Middleware::Client::UniqueJobs, Client.redis
|
||||
end
|
||||
else
|
||||
middleware.unregister(Middleware::Client::UniqueJobs)
|
||||
end
|
||||
end
|
||||
|
||||
# Example usage:
|
||||
# Sidekiq::Client.push('my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
|
||||
def self.push(queue='default', item)
|
||||
|
@ -24,7 +40,9 @@ module Sidekiq
|
|||
raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']
|
||||
|
||||
item['class'] = item['class'].to_s if !item['class'].is_a?(String)
|
||||
redis.rpush("queue:#{queue}", MultiJson.encode(item))
|
||||
middleware.invoke(item, queue) do
|
||||
redis.rpush("queue:#{queue}", MultiJson.encode(item))
|
||||
end
|
||||
end
|
||||
|
||||
# Please use .push if possible instead.
|
||||
|
|
|
@ -18,14 +18,14 @@ module Sidekiq
|
|||
|
||||
trap_exit :processor_died
|
||||
|
||||
def initialize(location, options={})
|
||||
log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{location}"
|
||||
def initialize(redis, options={})
|
||||
log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis.client.location}"
|
||||
verbose options.inspect
|
||||
@count = options[:processor_count] || 25
|
||||
@queues = options[:queues]
|
||||
@queue_idx = 0
|
||||
@queues_size = @queues.size
|
||||
@redis = Redis.connect(:url => location)
|
||||
@redis = redis
|
||||
@done_callback = nil
|
||||
|
||||
@done = false
|
||||
|
@ -50,8 +50,8 @@ module Sidekiq
|
|||
dispatch(true)
|
||||
end
|
||||
|
||||
def when_done
|
||||
@done_callback = Proc.new
|
||||
def when_done(&blk)
|
||||
@done_callback = blk
|
||||
end
|
||||
|
||||
def processor_done(processor)
|
||||
|
@ -87,7 +87,7 @@ module Sidekiq
|
|||
if msg
|
||||
processor = @ready.pop
|
||||
@busy << processor
|
||||
processor.process! MultiJson.decode(msg)
|
||||
processor.process!(MultiJson.decode(msg))
|
||||
end
|
||||
!!msg
|
||||
end
|
||||
|
|
|
@ -1,89 +0,0 @@
|
|||
module Sidekiq
|
||||
# Middleware is code configured to run before/after
|
||||
# a message is processed. It is patterned after Rack
|
||||
# middleware. The default middleware chain:
|
||||
#
|
||||
# Sidekiq::Middleware::Chain.register do
|
||||
# use Sidekiq::Airbrake
|
||||
# use Sidekiq::ActiveRecord
|
||||
# end
|
||||
#
|
||||
# This is an example of a minimal middleware:
|
||||
#
|
||||
# class MyHook
|
||||
# def initialize(options=nil)
|
||||
# end
|
||||
# def call(worker, msg)
|
||||
# puts "Before work"
|
||||
# yield
|
||||
# puts "After work"
|
||||
# end
|
||||
# end
|
||||
#
|
||||
module Middleware
|
||||
class Chain
|
||||
def self.register(&block)
|
||||
instance_exec(&block)
|
||||
end
|
||||
|
||||
def self.default
|
||||
@default ||= [Entry.new(Airbrake), Entry.new(ActiveRecord)]
|
||||
end
|
||||
|
||||
def self.use(klass, *args)
|
||||
chain << Entry.new(klass, args)
|
||||
end
|
||||
|
||||
def self.chain
|
||||
@chain ||= default
|
||||
end
|
||||
|
||||
def self.retrieve
|
||||
Thread.current[:sidekiq_chain] ||= chain.map { |entry| entry.make_new }
|
||||
end
|
||||
end
|
||||
|
||||
class Entry
|
||||
attr_reader :klass
|
||||
def initialize(klass, args = [])
|
||||
@klass = klass
|
||||
@args = args
|
||||
end
|
||||
|
||||
def make_new
|
||||
@klass.new(*@args)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
class Airbrake
|
||||
def initialize(options=nil)
|
||||
end
|
||||
|
||||
def call(worker, msg)
|
||||
yield
|
||||
rescue => ex
|
||||
send_to_airbrake(msg, ex) if defined?(::Airbrake)
|
||||
raise
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def send_to_airbrake(msg, ex)
|
||||
::Airbrake.notify(:error_class => ex.class.name,
|
||||
:error_message => "#{ex.class.name}: #{ex.message}",
|
||||
:parameters => msg)
|
||||
end
|
||||
end
|
||||
|
||||
class ActiveRecord
|
||||
def initialize(options=nil)
|
||||
end
|
||||
|
||||
def call(*)
|
||||
yield
|
||||
ensure
|
||||
::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
|
||||
end
|
||||
end
|
||||
end
|
87
lib/sidekiq/middleware/chain.rb
Normal file
87
lib/sidekiq/middleware/chain.rb
Normal file
|
@ -0,0 +1,87 @@
|
|||
module Sidekiq
|
||||
# Middleware is code configured to run before/after
|
||||
# a message is processed. It is patterned after Rack
|
||||
# middleware. Middleware exists for the client side
|
||||
# as well as the server side.
|
||||
#
|
||||
# Default middleware for the server side:
|
||||
#
|
||||
# Sidekiq::Processor.middleware.register do
|
||||
# use Sidekiq::Airbrake
|
||||
# use Sidekiq::ActiveRecord
|
||||
# end
|
||||
#
|
||||
# To add middleware for the client, do:
|
||||
#
|
||||
# Sidekiq::Client.middleware.register do
|
||||
# use MyClientHook
|
||||
# end
|
||||
#
|
||||
# To add middleware for the server, do:
|
||||
#
|
||||
# Sidekiq::Processor.middleware.register do
|
||||
# use MyServerHook
|
||||
# end
|
||||
#
|
||||
# This is an example of a minimal middleware:
|
||||
#
|
||||
# class MyHook
|
||||
# def initialize(options=nil)
|
||||
# end
|
||||
# def call(worker, msg)
|
||||
# puts "Before work"
|
||||
# yield
|
||||
# puts "After work"
|
||||
# end
|
||||
# end
|
||||
#
|
||||
module Middleware
|
||||
class Chain
|
||||
attr_reader :entries
|
||||
|
||||
def initialize
|
||||
@entries = []
|
||||
end
|
||||
|
||||
def register(&block)
|
||||
instance_eval(&block)
|
||||
end
|
||||
|
||||
def unregister(klass)
|
||||
entries.delete_if { |entry| entry.klass == klass }
|
||||
end
|
||||
|
||||
def use(klass, *args)
|
||||
entries << Entry.new(klass, *args)
|
||||
end
|
||||
|
||||
def retrieve
|
||||
entries.map(&:make_new)
|
||||
end
|
||||
|
||||
def invoke(*args, &final_action)
|
||||
chain = retrieve.dup
|
||||
traverse_chain = lambda do
|
||||
if chain.empty?
|
||||
final_action.call
|
||||
else
|
||||
chain.shift.call(*args, &traverse_chain)
|
||||
end
|
||||
end
|
||||
traverse_chain.call
|
||||
end
|
||||
end
|
||||
|
||||
class Entry
|
||||
attr_reader :klass
|
||||
def initialize(klass, *args)
|
||||
@klass = klass
|
||||
@args = args
|
||||
end
|
||||
|
||||
def make_new
|
||||
@klass.new(*@args)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
18
lib/sidekiq/middleware/client/resque_web_compatability.rb
Normal file
18
lib/sidekiq/middleware/client/resque_web_compatability.rb
Normal file
|
@ -0,0 +1,18 @@
|
|||
module Sidekiq
|
||||
module Middleware
|
||||
module Client
|
||||
class ResqueWebCompatability
|
||||
def initialize(redis)
|
||||
@redis = redis
|
||||
end
|
||||
|
||||
#Add job queue to list of queues resque web displays
|
||||
def call(item, queue)
|
||||
@redis.sadd('queues', queue)
|
||||
|
||||
yield
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
33
lib/sidekiq/middleware/client/unique_jobs.rb
Normal file
33
lib/sidekiq/middleware/client/unique_jobs.rb
Normal file
|
@ -0,0 +1,33 @@
|
|||
require 'digest'
|
||||
|
||||
module Sidekiq
|
||||
module Middleware
|
||||
module Client
|
||||
class UniqueJobs
|
||||
HASH_KEY_EXPIRATION = 30 * 60
|
||||
|
||||
def initialize(redis)
|
||||
@redis = redis
|
||||
end
|
||||
|
||||
def call(item, queue)
|
||||
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
|
||||
return if already_scheduled?(payload_hash)
|
||||
|
||||
@redis.multi do
|
||||
@redis.set(payload_hash, payload_hash)
|
||||
@redis.expire(payload_hash, HASH_KEY_EXPIRATION)
|
||||
end
|
||||
|
||||
yield
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def already_scheduled?(payload_hash)
|
||||
!!@redis.get(payload_hash)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
13
lib/sidekiq/middleware/server/active_record.rb
Normal file
13
lib/sidekiq/middleware/server/active_record.rb
Normal file
|
@ -0,0 +1,13 @@
|
|||
module Sidekiq
|
||||
module Middleware
|
||||
module Server
|
||||
class ActiveRecord
|
||||
def call(worker, msg)
|
||||
yield
|
||||
ensure
|
||||
::ActiveRecord::Base.clear_active_connections! if defined?(::ActiveRecord)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
25
lib/sidekiq/middleware/server/airbrake.rb
Normal file
25
lib/sidekiq/middleware/server/airbrake.rb
Normal file
|
@ -0,0 +1,25 @@
|
|||
module Sidekiq
|
||||
module Middleware
|
||||
module Server
|
||||
class Airbrake
|
||||
def call(worker, msg)
|
||||
yield
|
||||
rescue => ex
|
||||
send_to_airbrake(msg, ex) if defined?(::Airbrake)
|
||||
raise
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def send_to_airbrake(msg, ex)
|
||||
::Airbrake.notify(:error_class => ex.class.name,
|
||||
:error_message => "#{ex.class.name}: #{ex.message}",
|
||||
:parameters => msg)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
17
lib/sidekiq/middleware/server/unique_jobs.rb
Normal file
17
lib/sidekiq/middleware/server/unique_jobs.rb
Normal file
|
@ -0,0 +1,17 @@
|
|||
module Sidekiq
|
||||
module Middleware
|
||||
module Server
|
||||
class UniqueJobs
|
||||
def initialize(redis)
|
||||
@redis = redis
|
||||
end
|
||||
|
||||
def call(worker, msg)
|
||||
yield
|
||||
ensure
|
||||
@redis.del(Digest::MD5.hexdigest(MultiJson.encode(msg)))
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,32 +1,41 @@
|
|||
require 'sidekiq/util'
|
||||
require 'sidekiq/middleware'
|
||||
require 'celluloid'
|
||||
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/middleware/chain'
|
||||
require 'sidekiq/middleware/server/active_record'
|
||||
require 'sidekiq/middleware/server/airbrake'
|
||||
require 'sidekiq/middleware/server/unique_jobs'
|
||||
|
||||
module Sidekiq
|
||||
class Processor
|
||||
include Util
|
||||
include Celluloid
|
||||
|
||||
def self.middleware
|
||||
@middleware ||= begin
|
||||
chain = Middleware::Chain.new
|
||||
|
||||
# default middleware
|
||||
chain.register do
|
||||
use Middleware::Server::UniqueJobs, Sidekiq::Client.redis
|
||||
use Middleware::Server::Airbrake
|
||||
use Middleware::Server::ActiveRecord
|
||||
end
|
||||
chain
|
||||
end
|
||||
end
|
||||
|
||||
def initialize(boss)
|
||||
@boss = boss
|
||||
end
|
||||
|
||||
def process(msg)
|
||||
klass = constantize(msg['class'])
|
||||
invoke_chain(klass.new, msg)
|
||||
@boss.processor_done!(current_actor)
|
||||
end
|
||||
|
||||
def invoke_chain(worker, msg)
|
||||
chain = Sidekiq::Middleware::Chain.retrieve.dup
|
||||
traverse_chain = lambda do
|
||||
if chain.empty?
|
||||
worker.perform(*msg['args'])
|
||||
else
|
||||
chain.shift.call(worker, msg, &traverse_chain)
|
||||
end
|
||||
klass = constantize(msg['class'])
|
||||
worker = klass.new
|
||||
self.class.middleware.invoke(worker, msg) do
|
||||
worker.perform(*msg['args'])
|
||||
end
|
||||
traverse_chain.call
|
||||
@boss.processor_done!(current_actor)
|
||||
end
|
||||
|
||||
# See http://github.com/tarcieri/celluloid/issues/22
|
||||
|
|
39
lib/sidekiq/redis_connection.rb
Normal file
39
lib/sidekiq/redis_connection.rb
Normal file
|
@ -0,0 +1,39 @@
|
|||
require 'connection_pool'
|
||||
require 'redis/namespace'
|
||||
|
||||
module Sidekiq
|
||||
class RedisConnection
|
||||
def self.create(url = nil, namespace = nil, pool = true)
|
||||
@namespace = namespace ? namespace : nil
|
||||
@url = url ? url : nil
|
||||
|
||||
if pool
|
||||
ConnectionPool.new { connect }
|
||||
else
|
||||
connect
|
||||
end
|
||||
end
|
||||
|
||||
def self.connect
|
||||
if namespace
|
||||
Redis::Namespace.new(namespace, :redis => redis_connection)
|
||||
else
|
||||
redis_connection
|
||||
end
|
||||
end
|
||||
|
||||
def self.namespace
|
||||
@namespace
|
||||
end
|
||||
|
||||
def self.url
|
||||
@url || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0'
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def self.redis_connection
|
||||
Redis.connect(:url => url)
|
||||
end
|
||||
end
|
||||
end
|
|
@ -31,6 +31,5 @@ module Sidekiq
|
|||
def verbose(msg)
|
||||
STDOUT.puts(msg) if $DEBUG
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
|
@ -14,6 +14,7 @@ Gem::Specification.new do |gem|
|
|||
gem.require_paths = ["lib"]
|
||||
gem.version = Sidekiq::VERSION
|
||||
gem.add_dependency 'redis'
|
||||
gem.add_dependency 'redis-namespace'
|
||||
gem.add_dependency 'connection_pool'
|
||||
gem.add_dependency 'celluloid'
|
||||
gem.add_dependency 'multi_json'
|
||||
|
|
|
@ -3,10 +3,33 @@ require 'sidekiq/client'
|
|||
require 'sidekiq/worker'
|
||||
|
||||
class TestClient < MiniTest::Unit::TestCase
|
||||
describe 'with real redis' do
|
||||
before do
|
||||
Sidekiq::Client.redis = Redis.connect(:url => 'redis://localhost/sidekiq_test')
|
||||
Sidekiq::Client.redis.flushdb
|
||||
end
|
||||
|
||||
it 'does not push duplicate messages when configured for unique only' do
|
||||
Sidekiq::Client.ignore_duplicate_jobs = true
|
||||
10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) }
|
||||
assert_equal Sidekiq::Client.redis.llen("queue:customqueue"), 1
|
||||
end
|
||||
|
||||
it 'does push duplicate messages when not configured for unique only' do
|
||||
Sidekiq::Client.ignore_duplicate_jobs = false
|
||||
10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) }
|
||||
assert_equal Sidekiq::Client.redis.llen("queue:customqueue2"), 10
|
||||
end
|
||||
end
|
||||
|
||||
describe 'with mock redis' do
|
||||
before do
|
||||
@redis = MiniTest::Mock.new
|
||||
def @redis.multi; yield; end
|
||||
def @redis.set(*); true; end
|
||||
def @redis.expire(*); true; end
|
||||
Sidekiq::Client.redis = @redis
|
||||
Sidekiq::Client.ignore_duplicate_jobs = false
|
||||
end
|
||||
|
||||
it 'raises ArgumentError with invalid params' do
|
||||
|
|
|
@ -6,7 +6,7 @@ require 'connection_pool'
|
|||
class TestManager < MiniTest::Unit::TestCase
|
||||
describe 'with redis' do
|
||||
before do
|
||||
Sidekiq::Client.redis = @redis = Redis.connect(:url => 'redis://localhost/sidekiq_test')
|
||||
Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create('redis://localhost/sidekiq_test', nil, false)
|
||||
@redis.flushdb
|
||||
$processed = 0
|
||||
end
|
||||
|
@ -25,12 +25,13 @@ class TestManager < MiniTest::Unit::TestCase
|
|||
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 2])
|
||||
|
||||
q = TimedQueue.new
|
||||
mgr = Sidekiq::Manager.new("redis://localhost/sidekiq_test", :queues => [:foo])
|
||||
redis = Sidekiq::RedisConnection.create('redis://localhost/sidekiq_test', nil, false)
|
||||
mgr = Sidekiq::Manager.new(redis, :queues => [:foo])
|
||||
mgr.when_done do |_|
|
||||
q << 'done' if $processed == 2
|
||||
end
|
||||
mgr.start!
|
||||
result = q.timed_pop
|
||||
result = q.timed_pop(1.0)
|
||||
assert_equal 'done', result
|
||||
mgr.stop
|
||||
end
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
require 'helper'
|
||||
require 'sidekiq/middleware'
|
||||
require 'sidekiq/middleware/chain'
|
||||
require 'sidekiq/middleware/server/unique_jobs'
|
||||
require 'sidekiq/processor'
|
||||
|
||||
class TestMiddleware < MiniTest::Unit::TestCase
|
||||
|
@ -22,17 +23,13 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
it 'configures default middleware' do
|
||||
chain = Sidekiq::Middleware::Chain.chain
|
||||
assert_equal chain, Sidekiq::Middleware::Chain.default
|
||||
end
|
||||
|
||||
it 'supports custom middleware' do
|
||||
Sidekiq::Middleware::Chain.register do
|
||||
chain = Sidekiq::Middleware::Chain.new
|
||||
chain.register do
|
||||
use CustomMiddleware, 1, []
|
||||
end
|
||||
chain = Sidekiq::Middleware::Chain.chain
|
||||
assert_equal chain.last.klass, CustomMiddleware
|
||||
|
||||
assert_equal chain.entries.last.klass, CustomMiddleware
|
||||
end
|
||||
|
||||
class CustomWorker
|
||||
|
@ -41,11 +38,20 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
class NonYieldingMiddleware
|
||||
def call(*args)
|
||||
end
|
||||
end
|
||||
|
||||
it 'executes middleware in the proper order' do
|
||||
Sidekiq::Middleware::Server::UniqueJobs.class_eval do
|
||||
def call(worker, msg); yield; end
|
||||
end
|
||||
|
||||
recorder = []
|
||||
msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] }
|
||||
|
||||
Sidekiq::Middleware::Chain.register do
|
||||
Sidekiq::Processor.middleware.register do
|
||||
2.times { |i| use CustomMiddleware, i.to_s, recorder }
|
||||
end
|
||||
|
||||
|
@ -54,9 +60,20 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
processor.process(msg)
|
||||
assert_equal recorder.flatten, %w(0 before 1 before work_performed 1 after 0 after)
|
||||
end
|
||||
|
||||
it 'allows middleware to abruptly stop processing rest of chain' do
|
||||
recorder = []
|
||||
chain = Sidekiq::Middleware::Chain.new
|
||||
|
||||
chain.register do
|
||||
use NonYieldingMiddleware
|
||||
use CustomMiddleware, 1, recorder
|
||||
end
|
||||
|
||||
final_action = nil
|
||||
chain.invoke { final_action = true }
|
||||
assert_equal final_action, nil
|
||||
assert_equal recorder, []
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
||||
|
||||
end
|
Loading…
Add table
Reference in a new issue