mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Refactor middleware API, this will break anyone using the old API.
e.g. Sidekiq::Processor.middleware => Sidekiq.server_middleware Sidekiq::Client.middleware => Sidekiq.client_middleware
This commit is contained in:
parent
5a85380733
commit
6af6a86369
8 changed files with 67 additions and 62 deletions
1
TODO.md
1
TODO.md
|
@ -1,3 +1,2 @@
|
||||||
- resque-ui-esque web ui
|
- resque-ui-esque web ui
|
||||||
- graceful shutdown (ideally Celluloid will provide this)
|
|
||||||
- monit/god/etc example scripts
|
- monit/god/etc example scripts
|
||||||
|
|
|
@ -2,15 +2,50 @@ require 'sidekiq/version'
|
||||||
require 'sidekiq/client'
|
require 'sidekiq/client'
|
||||||
require 'sidekiq/worker'
|
require 'sidekiq/worker'
|
||||||
require 'sidekiq/rails' if defined?(::Rails)
|
require 'sidekiq/rails' if defined?(::Rails)
|
||||||
|
require 'sidekiq/redis_connection'
|
||||||
|
|
||||||
require 'sidekiq/extensions/action_mailer'
|
require 'sidekiq/extensions/action_mailer'
|
||||||
require 'sidekiq/extensions/active_record'
|
require 'sidekiq/extensions/active_record'
|
||||||
|
|
||||||
|
require 'sidekiq/middleware/chain'
|
||||||
|
require 'sidekiq/middleware/server/active_record'
|
||||||
|
require 'sidekiq/middleware/server/airbrake'
|
||||||
|
require 'sidekiq/middleware/server/unique_jobs'
|
||||||
|
require 'sidekiq/middleware/client/resque_web_compatibility'
|
||||||
|
require 'sidekiq/middleware/client/unique_jobs'
|
||||||
|
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
|
|
||||||
def self.redis
|
def self.redis
|
||||||
@redis ||= Sidekiq::RedisConnection.create
|
@redis ||= Sidekiq::RedisConnection.create
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.redis=(r)
|
def self.redis=(r)
|
||||||
@redis = r
|
@redis = r
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def self.client_middleware
|
||||||
|
@client_chain ||= begin
|
||||||
|
m = Middleware::Chain.new
|
||||||
|
m.add Middleware::Client::UniqueJobs
|
||||||
|
m.add Middleware::Client::ResqueWebCompatibility
|
||||||
|
m
|
||||||
|
end
|
||||||
|
yield @client_chain if block_given?
|
||||||
|
@client_chain
|
||||||
|
end
|
||||||
|
|
||||||
|
def self.server_middleware
|
||||||
|
@server_chain ||= begin
|
||||||
|
m = Middleware::Chain.new
|
||||||
|
m.add Middleware::Server::Airbrake
|
||||||
|
m.add Middleware::Server::UniqueJobs
|
||||||
|
m.add Middleware::Server::ActiveRecord
|
||||||
|
m
|
||||||
|
end
|
||||||
|
|
||||||
|
yield @server_chain if block_given?
|
||||||
|
@server_chain
|
||||||
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
|
|
|
@ -10,14 +10,7 @@ module Sidekiq
|
||||||
class Client
|
class Client
|
||||||
|
|
||||||
def self.middleware
|
def self.middleware
|
||||||
@middleware ||= begin
|
raise "Sidekiq::Client.middleware is now Sidekiq.client_middleware"
|
||||||
m = Middleware::Chain.new
|
|
||||||
m.register do
|
|
||||||
use Middleware::Client::UniqueJobs
|
|
||||||
use Middleware::Client::ResqueWebCompatibility
|
|
||||||
end
|
|
||||||
m
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def self.registered_workers
|
def self.registered_workers
|
||||||
|
@ -43,7 +36,7 @@ module Sidekiq
|
||||||
item['class'] = item['class'].to_s if !item['class'].is_a?(String)
|
item['class'] = item['class'].to_s if !item['class'].is_a?(String)
|
||||||
|
|
||||||
pushed = false
|
pushed = false
|
||||||
middleware.invoke(item, queue) do
|
Sidekiq.client_middleware.invoke(item, queue) do
|
||||||
Sidekiq.redis.rpush("queue:#{queue}", MultiJson.encode(item))
|
Sidekiq.redis.rpush("queue:#{queue}", MultiJson.encode(item))
|
||||||
pushed = true
|
pushed = true
|
||||||
end
|
end
|
||||||
|
|
|
@ -5,31 +5,31 @@ module Sidekiq
|
||||||
# (pushing jobs onto the queue) as well as the server
|
# (pushing jobs onto the queue) as well as the server
|
||||||
# side (when jobs are actually processed).
|
# side (when jobs are actually processed).
|
||||||
#
|
#
|
||||||
# Default middleware for the server side:
|
# Default middleware for the server:
|
||||||
#
|
#
|
||||||
# Sidekiq::Processor.middleware.register do
|
# Sidekiq.server_middleware do |chain|
|
||||||
# use Middleware::Server::Airbrake
|
# chain.use Middleware::Server::Airbrake
|
||||||
# use Middleware::Server::ActiveRecord
|
# chain.use Middleware::Server::ActiveRecord
|
||||||
# end
|
# end
|
||||||
#
|
#
|
||||||
# To add middleware for the client, do:
|
# To add middleware for the client:
|
||||||
#
|
#
|
||||||
# Sidekiq::Client.middleware.register do
|
# Sidekiq.client_middleware do |chain|
|
||||||
# use MyClientHook
|
# chain.use MyClientHook
|
||||||
# end
|
# end
|
||||||
#
|
#
|
||||||
# To add middleware for the server, do:
|
# To modify middleware for the server, just call
|
||||||
|
# with another block:
|
||||||
#
|
#
|
||||||
# Sidekiq::Processor.middleware.register do
|
# Sidekiq::Processor.middleware do |chain|
|
||||||
# use MyServerHook
|
# chain.use MyServerHook
|
||||||
|
# chain.remove ActiveRecord
|
||||||
# end
|
# end
|
||||||
#
|
#
|
||||||
# This is an example of a minimal middleware:
|
# This is an example of a minimal middleware:
|
||||||
#
|
#
|
||||||
# class MyHook
|
# class MyServerHook
|
||||||
# def initialize(options=nil)
|
# def call(worker, msg, queue)
|
||||||
# end
|
|
||||||
# def call(worker, msg)
|
|
||||||
# puts "Before work"
|
# puts "Before work"
|
||||||
# yield
|
# yield
|
||||||
# puts "After work"
|
# puts "After work"
|
||||||
|
@ -44,15 +44,11 @@ module Sidekiq
|
||||||
@entries = []
|
@entries = []
|
||||||
end
|
end
|
||||||
|
|
||||||
def register(&block)
|
def remove(klass)
|
||||||
instance_eval(&block)
|
|
||||||
end
|
|
||||||
|
|
||||||
def unregister(klass)
|
|
||||||
entries.delete_if { |entry| entry.klass == klass }
|
entries.delete_if { |entry| entry.klass == klass }
|
||||||
end
|
end
|
||||||
|
|
||||||
def use(klass, *args)
|
def add(klass, *args)
|
||||||
entries << Entry.new(klass, *args) unless exists?(klass)
|
entries << Entry.new(klass, *args) unless exists?(klass)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
require 'multi_json'
|
||||||
require 'digest'
|
require 'digest'
|
||||||
|
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
|
|
|
@ -1,10 +1,5 @@
|
||||||
require 'celluloid'
|
require 'celluloid'
|
||||||
|
|
||||||
require 'sidekiq/util'
|
require 'sidekiq/util'
|
||||||
require 'sidekiq/middleware/chain'
|
|
||||||
require 'sidekiq/middleware/server/active_record'
|
|
||||||
require 'sidekiq/middleware/server/airbrake'
|
|
||||||
require 'sidekiq/middleware/server/unique_jobs'
|
|
||||||
|
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
class Processor
|
class Processor
|
||||||
|
@ -12,17 +7,7 @@ module Sidekiq
|
||||||
include Celluloid
|
include Celluloid
|
||||||
|
|
||||||
def self.middleware
|
def self.middleware
|
||||||
@middleware ||= begin
|
raise "Sidekiq::Processor.middleware is now Sidekiq.server_middleware"
|
||||||
chain = Middleware::Chain.new
|
|
||||||
|
|
||||||
# default middleware
|
|
||||||
chain.register do
|
|
||||||
use Middleware::Server::Airbrake
|
|
||||||
use Middleware::Server::UniqueJobs
|
|
||||||
use Middleware::Server::ActiveRecord
|
|
||||||
end
|
|
||||||
chain
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def initialize(boss)
|
def initialize(boss)
|
||||||
|
@ -34,7 +19,7 @@ module Sidekiq
|
||||||
klass = constantize(msg['class'])
|
klass = constantize(msg['class'])
|
||||||
worker = klass.new
|
worker = klass.new
|
||||||
stats(worker, msg, queue) do
|
stats(worker, msg, queue) do
|
||||||
self.class.middleware.invoke(worker, msg, queue) do
|
Sidekiq.server_middleware.invoke(worker, msg, queue) do
|
||||||
worker.perform(*msg['args'])
|
worker.perform(*msg['args'])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -10,17 +10,17 @@ class TestClient < MiniTest::Unit::TestCase
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'does not push duplicate messages when configured for unique only' do
|
it 'does not push duplicate messages when configured for unique only' do
|
||||||
Sidekiq::Client.middleware.entries.clear
|
Sidekiq.client_middleware.entries.clear
|
||||||
Sidekiq::Client.middleware.register do
|
Sidekiq.client_middleware do |chain|
|
||||||
use Sidekiq::Middleware::Client::UniqueJobs
|
chain.add Sidekiq::Middleware::Client::UniqueJobs
|
||||||
use Sidekiq::Middleware::Client::ResqueWebCompatibility
|
chain.add Sidekiq::Middleware::Client::ResqueWebCompatibility
|
||||||
end
|
end
|
||||||
10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) }
|
10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) }
|
||||||
assert_equal 1, Sidekiq.redis.llen("queue:customqueue")
|
assert_equal 1, Sidekiq.redis.llen("queue:customqueue")
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'does push duplicate messages when not configured for unique only' do
|
it 'does push duplicate messages when not configured for unique only' do
|
||||||
Sidekiq::Client.middleware.unregister(Sidekiq::Middleware::Client::UniqueJobs)
|
Sidekiq.client_middleware.remove(Sidekiq::Middleware::Client::UniqueJobs)
|
||||||
10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) }
|
10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) }
|
||||||
assert_equal 10, Sidekiq.redis.llen("queue:customqueue2")
|
assert_equal 10, Sidekiq.redis.llen("queue:customqueue2")
|
||||||
end
|
end
|
||||||
|
|
|
@ -24,9 +24,7 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
||||||
|
|
||||||
it 'supports custom middleware' do
|
it 'supports custom middleware' do
|
||||||
chain = Sidekiq::Middleware::Chain.new
|
chain = Sidekiq::Middleware::Chain.new
|
||||||
chain.register do
|
chain.add CustomMiddleware, 1, []
|
||||||
use CustomMiddleware, 1, []
|
|
||||||
end
|
|
||||||
|
|
||||||
assert_equal CustomMiddleware, chain.entries.last.klass
|
assert_equal CustomMiddleware, chain.entries.last.klass
|
||||||
end
|
end
|
||||||
|
@ -51,8 +49,9 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
||||||
recorder = []
|
recorder = []
|
||||||
msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] }
|
msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] }
|
||||||
|
|
||||||
Sidekiq::Processor.middleware.register do
|
Sidekiq.server_middleware do |chain|
|
||||||
2.times { |i| use CustomMiddleware, i.to_s, recorder }
|
# should only add once, second should be ignored
|
||||||
|
2.times { |i| chain.add CustomMiddleware, i.to_s, recorder }
|
||||||
end
|
end
|
||||||
|
|
||||||
boss = MiniTest::Mock.new
|
boss = MiniTest::Mock.new
|
||||||
|
@ -65,11 +64,8 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
||||||
it 'allows middleware to abruptly stop processing rest of chain' do
|
it 'allows middleware to abruptly stop processing rest of chain' do
|
||||||
recorder = []
|
recorder = []
|
||||||
chain = Sidekiq::Middleware::Chain.new
|
chain = Sidekiq::Middleware::Chain.new
|
||||||
|
chain.add NonYieldingMiddleware
|
||||||
chain.register do
|
chain.add CustomMiddleware, 1, recorder
|
||||||
use NonYieldingMiddleware
|
|
||||||
use CustomMiddleware, 1, recorder
|
|
||||||
end
|
|
||||||
|
|
||||||
final_action = nil
|
final_action = nil
|
||||||
chain.invoke { final_action = true }
|
chain.invoke { final_action = true }
|
||||||
|
|
Loading…
Add table
Reference in a new issue