1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Explicitly pass Redis associated with this job

When pushing a job, the middleware should be able to access the Redis instance associated with that job.  Previously, Sidekiq was limited to one global Redis instance.  Now that we want to support sharding, we have to explicitly pass the instance in OR hack up APIs with thread local variables.  Explicit is better.
This commit is contained in:
Mike Perham 2014-03-25 21:38:17 -07:00
parent dd798bb6be
commit 83aea0690e
8 changed files with 19 additions and 26 deletions

View file

@ -40,6 +40,8 @@ end
client = Sidekiq::Client.new(ConnectionPool.new { Redis.new }) client = Sidekiq::Client.new(ConnectionPool.new { Redis.new })
client.push(...) client.push(...)
``` ```
**Sharding support does require a breaking change to client-side
middleware, see Upgrading.md.**
- New Chinese, Greek, Swedish and Czech translations for the Web UI. - New Chinese, Greek, Swedish and Czech translations for the Web UI.
- Updated most languages translations for the new UI features. - Updated most languages translations for the new UI features.
- **Remove official Capistrano integration** - this integration has been - **Remove official Capistrano integration** - this integration has been

View file

@ -6,6 +6,15 @@ changes a few data elements in Redis. To upgrade cleanly:
* Upgrade to the latest Sidekiq 2.x and run it for a few weeks. * Upgrade to the latest Sidekiq 2.x and run it for a few weeks.
`gem 'sidekiq', '< 3'` `gem 'sidekiq', '< 3'`
This is only needed if you have retries pending. This is only needed if you have retries pending.
* 3rd party gems which use **client-side middleware** will need to update
due to an API change. The Redis connection for a particular job is
passed thru the middleware to handle sharding where jobs can
be pushed to different redis server instances.
`def call(worker_class, msg, queue, redis_pool)`
Client-side middleware should use `redis_pool.with { |conn| ... }` to
perform Redis operations and **not** `Sidekiq.redis`.
* If you used the capistrano integration, you'll need to pull in the * If you used the capistrano integration, you'll need to pull in the
new [capistrano-sidekiq](https://github.com/seuros/capistrano-sidekiq) new [capistrano-sidekiq](https://github.com/seuros/capistrano-sidekiq)
gem and use it in your deploy.rb. gem and use it in your deploy.rb.

View file

@ -54,15 +54,12 @@ module Sidekiq
# push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']) # push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
# #
def push(item) def push(item)
Thread.current[:current_pool] = @redis_pool
normed = normalize_item(item) normed = normalize_item(item)
payload = process_single(item['class'], normed) payload = process_single(item['class'], normed)
pushed = false pushed = false
pushed = raw_push([payload]) if payload pushed = raw_push([payload]) if payload
pushed ? payload['jid'] : nil pushed ? payload['jid'] : nil
ensure
Thread.current[:current_pool] = nil
end end
## ##
@ -80,7 +77,6 @@ module Sidekiq
# pushed can be less than the number given if the middleware stopped processing for one # pushed can be less than the number given if the middleware stopped processing for one
# or more jobs. # or more jobs.
def push_bulk(items) def push_bulk(items)
Thread.current[:current_pool] = @redis_pool
normed = normalize_item(items) normed = normalize_item(items)
payloads = items['args'].map do |args| payloads = items['args'].map do |args|
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !args.is_a?(Array) raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !args.is_a?(Array)
@ -90,25 +86,10 @@ module Sidekiq
pushed = false pushed = false
pushed = raw_push(payloads) if !payloads.empty? pushed = raw_push(payloads) if !payloads.empty?
pushed ? payloads.collect { |payload| payload['jid'] } : nil pushed ? payloads.collect { |payload| payload['jid'] } : nil
ensure
Thread.current[:current_pool] = nil
end end
class << self class << self
#
# Returns the Redis pool being used for the current client operation.
# Client operations should use +Sidekiq::Client.redis_pool+ whereas server
# operations should use +Sidekiq.redis_pool+.
#
# For example, in client-side middleware, you must use this method.
# In server-side middleware, you use +Sidekiq.redis_pool+.
#
# This complexity is necessary to support Redis sharding.
def redis_pool
Thread.current[:current_pool] || Sidekiq.redis_pool
end
def default def default
@default ||= new @default ||= new
end end
@ -187,7 +168,7 @@ module Sidekiq
def process_single(worker_class, item) def process_single(worker_class, item)
queue = item['queue'] queue = item['queue']
middleware.invoke(worker_class, item, queue) do middleware.invoke(worker_class, item, queue, @redis_pool) do
item item
end end
end end

View file

@ -54,7 +54,7 @@ module Sidekiq
# to Redis: # to Redis:
# #
# class MyClientHook # class MyClientHook
# def call(worker_class, msg, queue) # def call(worker_class, msg, queue, redis_pool)
# puts "Before push" # puts "Before push"
# result = yield # result = yield
# puts "After push" # puts "After push"

View file

@ -2,7 +2,7 @@ module Sidekiq::Middleware::I18n
# Get the current locale and store it in the message # Get the current locale and store it in the message
# to be sent to Sidekiq. # to be sent to Sidekiq.
class Client class Client
def call(worker_class, msg, queue) def call(worker_class, msg, queue, redis_pool)
msg['locale'] ||= I18n.locale msg['locale'] ||= I18n.locale
yield yield
end end

View file

@ -61,7 +61,7 @@ class TestClient < Sidekiq::Test
it 'allows local middleware modification' do it 'allows local middleware modification' do
@redis.expect :lpush, 1, ['queue:default', Array] @redis.expect :lpush, 1, ['queue:default', Array]
$called = false $called = false
mware = Class.new { def call(worker_klass,msg,q); $called = true; msg;end } mware = Class.new { def call(worker_klass,msg,q,r); $called = true; msg;end }
client = Sidekiq::Client.new client = Sidekiq::Client.new
client.middleware do |chain| client.middleware do |chain|
chain.add mware chain.add mware
@ -200,7 +200,8 @@ class TestClient < Sidekiq::Test
describe 'client middleware' do describe 'client middleware' do
class Stopper class Stopper
def call(worker_class, message, queue) def call(worker_class, message, queue, r)
raise ArgumentError unless r
yield if message['args'].first.odd? yield if message['args'].first.odd?
end end
end end

View file

@ -120,7 +120,7 @@ class TestMiddleware < Sidekiq::Test
I18n.locale = 'fr' I18n.locale = 'fr'
msg = {} msg = {}
mw = Sidekiq::Middleware::I18n::Client.new mw = Sidekiq::Middleware::I18n::Client.new
mw.call(nil, msg, nil) { } mw.call(nil, msg, nil, nil) { }
assert_equal :fr, msg['locale'] assert_equal :fr, msg['locale']
msg['locale'] = 'jp' msg['locale'] = 'jp'

View file

@ -28,7 +28,7 @@ class TestScheduled < Sidekiq::Test
end end
class Stopper class Stopper
def call(worker_class, message, queue) def call(worker_class, message, queue, r)
yield if message['args'].first.odd? yield if message['args'].first.odd?
end end
end end