diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 26e9be7e..e7cb2d9d 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -25,6 +25,8 @@ module Sidekiq @chain end + attr_accessor :redis_pool + # Sidekiq::Client normally uses the default Redis pool but you may # pass a custom ConnectionPool if you want to shard your # Sidekiq jobs across several Redis instances (for scalability @@ -32,8 +34,13 @@ module Sidekiq # # Sidekiq::Client.new(ConnectionPool.new { Redis.new }) # - def initialize(redis_pool = Sidekiq.redis_pool) - @redis_pool = redis_pool + # Generally this is only needed for very large Sidekiq installs processing + # more than thousands jobs per second. I do not recommend sharding unless + # you truly cannot scale any other way (e.g. splitting your app into smaller apps). + # Some features, like the API, do not support sharding: they are designed to work + # against a single Redis instance only. + def initialize(redis_pool=nil) + @redis_pool = redis_pool || Thread.current[:sidekiq_via_pool] || Sidekiq.redis_pool end ## @@ -88,6 +95,29 @@ module Sidekiq pushed ? payloads.collect { |payload| payload['jid'] } : nil end + # Allows sharding of jobs across any number of Redis instances. All jobs + # defined within the block will use the given Redis connection pool. + # + # pool = ConnectionPool.new { Redis.new } + # Sidekiq::Client.via(pool) do + # SomeWorker.perform_async(1,2,3) + # SomeOtherWorker.perform_async(1,2,3) + # end + # + # Generally this is only needed for very large Sidekiq installs processing + # more than thousands jobs per second. I do not recommend sharding unless + # you truly cannot scale any other way (e.g. splitting your app into smaller apps). + # Some features, like the API, do not support sharding: they are designed to work + # against a single Redis instance. + def self.via(pool) + raise ArgumentError, "No pool given" if pool.nil? + raise RuntimeError, "Sidekiq::Client.via is not re-entrant" if x = Thread.current[:sidekiq_via_pool] && x != pool + Thread.current[:sidekiq_via_pool] = pool + yield + ensure + Thread.current[:sidekiq_via_pool] = nil + end + class << self def default diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 85398923..82f4d164 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -34,25 +34,6 @@ module Sidekiq Sidekiq.logger end - # Allows sharding of jobs across any number of Redis instances. All jobs - # defined within the block will use the given Redis connection pool. - # - # pool = ConnectionPool.new { Redis.new } - # Sidekiq::Worker.via(pool) do - # SomeWorker.perform_async(1,2,3) - # SomeOtherWorker.perform_async(1,2,3) - # end - # - # Generally this is only needed for very large Sidekiq installs processing - # more than thousands jobs per second. - def self.via(pool) - raise ArgumentError, "No pool given" if pool.nil? - Thread.current[:sidekiq_via_pool] = pool - yield - ensure - Thread.current[:sidekiq_via_pool] = nil - end - module ClassMethods def perform_async(*args) diff --git a/test/test_client.rb b/test/test_client.rb index f6a078d7..ecdfffa9 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -250,11 +250,18 @@ class TestClient < Sidekiq::Test it 'allows #via to point to different Redi' do conn = MiniTest::Mock.new conn.expect(:multi, [0, 1]) - default = Sidekiq::Client.redis_pool - Sidekiq::Client.via(ConnectionPool.new(size: 1) { conn }) do + default = Sidekiq::Client.new.redis_pool + sharded_pool = ConnectionPool.new(size: 1) { conn } + Sidekiq::Client.via(sharded_pool) do CWorker.perform_async(1,2,3) + assert_equal sharded_pool, Sidekiq::Client.new.redis_pool + assert_raises RuntimeError do + Sidekiq::Client.via(default) do + # nothing + end + end end - assert_equal default, Sidekiq::Client.redis_pool + assert_equal default, Sidekiq::Client.new.redis_pool conn.verify end it 'allows Resque helpers to point to different Redi' do