Mirror of https://github.com/mperham/sidekiq.git, synced 2022-11-09 13:52:34 -05:00

Updated .via API

parent d0d7acaf5e
commit 75808b59b8

3 changed files with 42 additions and 24 deletions
lib/sidekiq/client.rb

@@ -25,6 +25,8 @@ module Sidekiq
       @chain
     end
+
+    attr_accessor :redis_pool
 
     # Sidekiq::Client normally uses the default Redis pool but you may
     # pass a custom ConnectionPool if you want to shard your
     # Sidekiq jobs across several Redis instances (for scalability
@@ -32,8 +34,13 @@ module Sidekiq
     #
     #   Sidekiq::Client.new(ConnectionPool.new { Redis.new })
     #
-    def initialize(redis_pool = Sidekiq.redis_pool)
-      @redis_pool = redis_pool
+    # Generally this is only needed for very large Sidekiq installs processing
+    # more than thousands jobs per second. I do not recommend sharding unless
+    # you truly cannot scale any other way (e.g. splitting your app into smaller apps).
+    # Some features, like the API, do not support sharding: they are designed to work
+    # against a single Redis instance only.
+    def initialize(redis_pool=nil)
+      @redis_pool = redis_pool || Thread.current[:sidekiq_via_pool] || Sidekiq.redis_pool
     end
 
     ##
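The constructor change above gives Sidekiq::Client a three-step fallback for choosing its Redis pool: an explicitly passed pool, then the thread-local pool set by Sidekiq::Client.via (added later in this commit), then the global Sidekiq.redis_pool. A minimal sketch of those three paths; the shard URL and pool sizes are illustrative, not part of the commit:

    require 'sidekiq'
    require 'connection_pool'
    require 'redis'

    # Illustrative pool pointing at a second Redis instance.
    shard_pool = ConnectionPool.new(size: 5) { Redis.new(url: 'redis://shard1:6379/0') }

    # 1. Explicit pool: this client pushes its jobs to the shard.
    Sidekiq::Client.new(shard_pool).redis_pool   # => shard_pool

    # 2. Inside a .via block: a bare Client picks up Thread.current[:sidekiq_via_pool].
    Sidekiq::Client.via(shard_pool) do
      Sidekiq::Client.new.redis_pool             # => shard_pool
    end

    # 3. Otherwise: falls through to the global Sidekiq.redis_pool.
    Sidekiq::Client.new.redis_pool               # => Sidekiq.redis_pool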
@@ -88,6 +95,29 @@ module Sidekiq
       pushed ? payloads.collect { |payload| payload['jid'] } : nil
     end
 
+    # Allows sharding of jobs across any number of Redis instances. All jobs
+    # defined within the block will use the given Redis connection pool.
+    #
+    #   pool = ConnectionPool.new { Redis.new }
+    #   Sidekiq::Client.via(pool) do
+    #     SomeWorker.perform_async(1,2,3)
+    #     SomeOtherWorker.perform_async(1,2,3)
+    #   end
+    #
+    # Generally this is only needed for very large Sidekiq installs processing
+    # more than thousands jobs per second. I do not recommend sharding unless
+    # you truly cannot scale any other way (e.g. splitting your app into smaller apps).
+    # Some features, like the API, do not support sharding: they are designed to work
+    # against a single Redis instance.
+    def self.via(pool)
+      raise ArgumentError, "No pool given" if pool.nil?
+      raise RuntimeError, "Sidekiq::Client.via is not re-entrant" if x = Thread.current[:sidekiq_via_pool] && x != pool
+      Thread.current[:sidekiq_via_pool] = pool
+      yield
+    ensure
+      Thread.current[:sidekiq_via_pool] = nil
+    end
+
     class << self
 
       def default
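The hunk above adds the block form to Sidekiq::Client itself: everything pushed inside the block, including plain perform_async calls, goes to the given pool, and the ensure clause clears the thread-local when the block exits. A short usage sketch along the lines of the new doc comment, assuming reachable Redis instances; the worker class and shard URL are made up for illustration:

    require 'sidekiq'
    require 'connection_pool'
    require 'redis'

    class ImportWorker
      include Sidekiq::Worker
      def perform(batch_id); end
    end

    shard_pool = ConnectionPool.new(size: 5) { Redis.new(url: 'redis://shard2:6379/0') }

    Sidekiq::Client.via(shard_pool) do
      # Both jobs are enqueued on the sharded Redis instance.
      ImportWorker.perform_async(1)
      ImportWorker.perform_async(2)
    end

    # The thread-local is cleared by the ensure block, so this push goes
    # back to the default pool.
    ImportWorker.perform_async(3)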
lib/sidekiq/worker.rb

@@ -34,25 +34,6 @@ module Sidekiq
       Sidekiq.logger
     end
-
-    # Allows sharding of jobs across any number of Redis instances. All jobs
-    # defined within the block will use the given Redis connection pool.
-    #
-    #   pool = ConnectionPool.new { Redis.new }
-    #   Sidekiq::Worker.via(pool) do
-    #     SomeWorker.perform_async(1,2,3)
-    #     SomeOtherWorker.perform_async(1,2,3)
-    #   end
-    #
-    # Generally this is only needed for very large Sidekiq installs processing
-    # more than thousands jobs per second.
-    def self.via(pool)
-      raise ArgumentError, "No pool given" if pool.nil?
-      Thread.current[:sidekiq_via_pool] = pool
-      yield
-    ensure
-      Thread.current[:sidekiq_via_pool] = nil
-    end
 
     module ClassMethods
 
       def perform_async(*args)
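This hunk removes the Worker-level block form, so any existing caller of Sidekiq::Worker.via has a one-line migration to the Client-level method added above. A hedged before/after sketch; SomeWorker and the pool are placeholders:

    pool = ConnectionPool.new(size: 1) { Redis.new }

    # Before this commit (the method removed in this hunk):
    #   Sidekiq::Worker.via(pool) do
    #     SomeWorker.perform_async(1, 2, 3)
    #   end

    # After this commit:
    Sidekiq::Client.via(pool) do
      SomeWorker.perform_async(1, 2, 3)
    end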
test/test_client.rb

@@ -250,11 +250,18 @@ class TestClient < Sidekiq::Test
     it 'allows #via to point to different Redi' do
       conn = MiniTest::Mock.new
       conn.expect(:multi, [0, 1])
-      default = Sidekiq::Client.redis_pool
-      Sidekiq::Client.via(ConnectionPool.new(size: 1) { conn }) do
+      default = Sidekiq::Client.new.redis_pool
+      sharded_pool = ConnectionPool.new(size: 1) { conn }
+      Sidekiq::Client.via(sharded_pool) do
         CWorker.perform_async(1,2,3)
+        assert_equal sharded_pool, Sidekiq::Client.new.redis_pool
+        assert_raises RuntimeError do
+          Sidekiq::Client.via(default) do
+            # nothing
+          end
+        end
       end
-      assert_equal default, Sidekiq::Client.redis_pool
+      assert_equal default, Sidekiq::Client.new.redis_pool
       conn.verify
     end
     it 'allows Resque helpers to point to different Redi' do
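The updated test also pins down the re-entrancy guard: a Client built inside the block must see the sharded pool, and opening a second .via with a different pool on the same thread must raise. A minimal sketch of what that guard looks like outside the test suite, using two illustrative pools:

    pool_a = ConnectionPool.new(size: 1) { Redis.new }
    pool_b = ConnectionPool.new(size: 1) { Redis.new }

    Sidekiq::Client.via(pool_a) do
      begin
        # Nesting .via with a different pool on the same thread is rejected.
        Sidekiq::Client.via(pool_b) { }
      rescue RuntimeError => e
        puts e.message   # "Sidekiq::Client.via is not re-entrant"
      end
    end

    # After the outer block, Thread.current[:sidekiq_via_pool] is nil again,
    # so new clients fall back to Sidekiq.redis_pool.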