diff --git a/TODO.md b/TODO.md index 683537e0..6640c3f1 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,2 @@ - resque-ui-esque web ui -- graceful shutdown (ideally Celluloid will provide this) - monit/god/etc example scripts diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index 9e34c01c..a366955b 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -2,15 +2,50 @@ require 'sidekiq/version' require 'sidekiq/client' require 'sidekiq/worker' require 'sidekiq/rails' if defined?(::Rails) +require 'sidekiq/redis_connection' require 'sidekiq/extensions/action_mailer' require 'sidekiq/extensions/active_record' +require 'sidekiq/middleware/chain' +require 'sidekiq/middleware/server/active_record' +require 'sidekiq/middleware/server/airbrake' +require 'sidekiq/middleware/server/unique_jobs' +require 'sidekiq/middleware/client/resque_web_compatibility' +require 'sidekiq/middleware/client/unique_jobs' + module Sidekiq + def self.redis @redis ||= Sidekiq::RedisConnection.create end + def self.redis=(r) @redis = r end + + def self.client_middleware + @client_chain ||= begin + m = Middleware::Chain.new + m.add Middleware::Client::UniqueJobs + m.add Middleware::Client::ResqueWebCompatibility + m + end + yield @client_chain if block_given? + @client_chain + end + + def self.server_middleware + @server_chain ||= begin + m = Middleware::Chain.new + m.add Middleware::Server::Airbrake + m.add Middleware::Server::UniqueJobs + m.add Middleware::Server::ActiveRecord + m + end + + yield @server_chain if block_given? + @server_chain + end + end diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index a96f36df..0d5c0cc3 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -10,14 +10,7 @@ module Sidekiq class Client def self.middleware - @middleware ||= begin - m = Middleware::Chain.new - m.register do - use Middleware::Client::UniqueJobs - use Middleware::Client::ResqueWebCompatibility - end - m - end + raise "Sidekiq::Client.middleware is now Sidekiq.client_middleware" end def self.registered_workers @@ -43,7 +36,7 @@ module Sidekiq item['class'] = item['class'].to_s if !item['class'].is_a?(String) pushed = false - middleware.invoke(item, queue) do + Sidekiq.client_middleware.invoke(item, queue) do Sidekiq.redis.rpush("queue:#{queue}", MultiJson.encode(item)) pushed = true end diff --git a/lib/sidekiq/middleware/chain.rb b/lib/sidekiq/middleware/chain.rb index 4edcdb61..161b4e7e 100644 --- a/lib/sidekiq/middleware/chain.rb +++ b/lib/sidekiq/middleware/chain.rb @@ -5,31 +5,31 @@ module Sidekiq # (pushing jobs onto the queue) as well as the server # side (when jobs are actually processed). # - # Default middleware for the server side: + # Default middleware for the server: # - # Sidekiq::Processor.middleware.register do - # use Middleware::Server::Airbrake - # use Middleware::Server::ActiveRecord + # Sidekiq.server_middleware do |chain| + # chain.use Middleware::Server::Airbrake + # chain.use Middleware::Server::ActiveRecord # end # - # To add middleware for the client, do: + # To add middleware for the client: # - # Sidekiq::Client.middleware.register do - # use MyClientHook + # Sidekiq.client_middleware do |chain| + # chain.use MyClientHook # end # - # To add middleware for the server, do: + # To modify middleware for the server, just call + # with another block: # - # Sidekiq::Processor.middleware.register do - # use MyServerHook + # Sidekiq::Processor.middleware do |chain| + # chain.use MyServerHook + # chain.remove ActiveRecord # end # # This is an example of a minimal middleware: # - # class MyHook - # def initialize(options=nil) - # end - # def call(worker, msg) + # class MyServerHook + # def call(worker, msg, queue) # puts "Before work" # yield # puts "After work" @@ -44,15 +44,11 @@ module Sidekiq @entries = [] end - def register(&block) - instance_eval(&block) - end - - def unregister(klass) + def remove(klass) entries.delete_if { |entry| entry.klass == klass } end - def use(klass, *args) + def add(klass, *args) entries << Entry.new(klass, *args) unless exists?(klass) end diff --git a/lib/sidekiq/middleware/client/unique_jobs.rb b/lib/sidekiq/middleware/client/unique_jobs.rb index ee7d833a..fea6ad8e 100644 --- a/lib/sidekiq/middleware/client/unique_jobs.rb +++ b/lib/sidekiq/middleware/client/unique_jobs.rb @@ -1,3 +1,4 @@ +require 'multi_json' require 'digest' module Sidekiq diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index b9b8433a..c61acfa1 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -1,10 +1,5 @@ require 'celluloid' - require 'sidekiq/util' -require 'sidekiq/middleware/chain' -require 'sidekiq/middleware/server/active_record' -require 'sidekiq/middleware/server/airbrake' -require 'sidekiq/middleware/server/unique_jobs' module Sidekiq class Processor @@ -12,17 +7,7 @@ module Sidekiq include Celluloid def self.middleware - @middleware ||= begin - chain = Middleware::Chain.new - - # default middleware - chain.register do - use Middleware::Server::Airbrake - use Middleware::Server::UniqueJobs - use Middleware::Server::ActiveRecord - end - chain - end + raise "Sidekiq::Processor.middleware is now Sidekiq.server_middleware" end def initialize(boss) @@ -34,7 +19,7 @@ module Sidekiq klass = constantize(msg['class']) worker = klass.new stats(worker, msg, queue) do - self.class.middleware.invoke(worker, msg, queue) do + Sidekiq.server_middleware.invoke(worker, msg, queue) do worker.perform(*msg['args']) end end diff --git a/test/test_client.rb b/test/test_client.rb index c0386e96..d0dd9047 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -10,17 +10,17 @@ class TestClient < MiniTest::Unit::TestCase end it 'does not push duplicate messages when configured for unique only' do - Sidekiq::Client.middleware.entries.clear - Sidekiq::Client.middleware.register do - use Sidekiq::Middleware::Client::UniqueJobs - use Sidekiq::Middleware::Client::ResqueWebCompatibility + Sidekiq.client_middleware.entries.clear + Sidekiq.client_middleware do |chain| + chain.add Sidekiq::Middleware::Client::UniqueJobs + chain.add Sidekiq::Middleware::Client::ResqueWebCompatibility end 10.times { Sidekiq::Client.push('customqueue', 'class' => 'Foo', 'args' => [1, 2]) } assert_equal 1, Sidekiq.redis.llen("queue:customqueue") end it 'does push duplicate messages when not configured for unique only' do - Sidekiq::Client.middleware.unregister(Sidekiq::Middleware::Client::UniqueJobs) + Sidekiq.client_middleware.remove(Sidekiq::Middleware::Client::UniqueJobs) 10.times { Sidekiq::Client.push('customqueue2', 'class' => 'Foo', 'args' => [1, 2]) } assert_equal 10, Sidekiq.redis.llen("queue:customqueue2") end diff --git a/test/test_middleware.rb b/test/test_middleware.rb index fec73f71..d758c663 100644 --- a/test/test_middleware.rb +++ b/test/test_middleware.rb @@ -24,9 +24,7 @@ class TestMiddleware < MiniTest::Unit::TestCase it 'supports custom middleware' do chain = Sidekiq::Middleware::Chain.new - chain.register do - use CustomMiddleware, 1, [] - end + chain.add CustomMiddleware, 1, [] assert_equal CustomMiddleware, chain.entries.last.klass end @@ -51,8 +49,9 @@ class TestMiddleware < MiniTest::Unit::TestCase recorder = [] msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] } - Sidekiq::Processor.middleware.register do - 2.times { |i| use CustomMiddleware, i.to_s, recorder } + Sidekiq.server_middleware do |chain| + # should only add once, second should be ignored + 2.times { |i| chain.add CustomMiddleware, i.to_s, recorder } end boss = MiniTest::Mock.new @@ -65,11 +64,8 @@ class TestMiddleware < MiniTest::Unit::TestCase it 'allows middleware to abruptly stop processing rest of chain' do recorder = [] chain = Sidekiq::Middleware::Chain.new - - chain.register do - use NonYieldingMiddleware - use CustomMiddleware, 1, recorder - end + chain.add NonYieldingMiddleware + chain.add CustomMiddleware, 1, recorder final_action = nil chain.invoke { final_action = true }