From 30198dadbee590d3fb4360994f8b62d544fa8a41 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 25 Mar 2014 11:56:15 -0700 Subject: [PATCH] Initial pass at API --- lib/sidekiq/client.rb | 43 +++++++++++++++++++++++++++++++++++++++++-- lib/sidekiq/worker.rb | 23 ++++++++++++++++++++++- test/test_client.rb | 29 +++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 3 deletions(-) diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index bdece8c1..8666b17e 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -32,8 +32,8 @@ module Sidekiq # # Sidekiq::Client.new(ConnectionPool.new { Redis.new }) # - def initialize(redis_pool = Sidekiq.redis_pool) - @redis_pool = redis_pool + def initialize(redis_pool = nil) + @redis_pool = redis_pool || Sidekiq.redis_pool end ## @@ -54,12 +54,15 @@ module Sidekiq # push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']) # def push(item) + Thread.current[:current_pool] = @redis_pool normed = normalize_item(item) payload = process_single(item['class'], normed) pushed = false pushed = raw_push([payload]) if payload pushed ? payload['jid'] : nil + ensure + Thread.current[:current_pool] = nil end ## @@ -77,6 +80,7 @@ module Sidekiq # pushed can be less than the number given if the middleware stopped processing for one # or more jobs. def push_bulk(items) + Thread.current[:current_pool] = @redis_pool normed = normalize_item(items) payloads = items['args'].map do |args| raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !args.is_a?(Array) @@ -86,9 +90,44 @@ module Sidekiq pushed = false pushed = raw_push(payloads) if !payloads.empty? pushed ? payloads.collect { |payload| payload['jid'] } : nil + ensure + Thread.current[:current_pool] = nil end class << self + + # Allows sharding of jobs across any number of Redis instances. All jobs + # defined within the block will use the given Redis connection pool. + # + # pool = ConnectionPool.new { Redis.new } + # Sidekiq::Client.via(pool) do + # SomeWorker.perform_async(1,2,3) + # SomeOtherWorker.perform_async(1,2,3) + # end + # + # Generally this is only needed for very large Sidekiq installs processing + # more than thousands jobs per second. + def via(pool) + raise ArgumentError, "No pool given" if pool.nil? + Thread.current[:sidekiq_via_pool] = pool + yield + ensure + Thread.current[:sidekiq_via_pool] = nil + end + + # + # Returns the Redis pool being used for the current client operation. + # Client operations should use +Sidekiq::Client.redis_pool+ whereas server + # operations should use +Sidekiq.redis_pool+. + # + # For example, in client-side middleware, you must use this method. + # In server-side middleware, you use +Sidekiq.redis_pool+. + # + # This complexity is necessary to support Redis sharding. + def redis_pool + Thread.current[:current_pool] || Sidekiq.redis_pool + end + def default @default ||= new end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 344d147f..85398923 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -34,6 +34,25 @@ module Sidekiq Sidekiq.logger end + # Allows sharding of jobs across any number of Redis instances. All jobs + # defined within the block will use the given Redis connection pool. + # + # pool = ConnectionPool.new { Redis.new } + # Sidekiq::Worker.via(pool) do + # SomeWorker.perform_async(1,2,3) + # SomeOtherWorker.perform_async(1,2,3) + # end + # + # Generally this is only needed for very large Sidekiq installs processing + # more than thousands jobs per second. + def self.via(pool) + raise ArgumentError, "No pool given" if pool.nil? + Thread.current[:sidekiq_via_pool] = pool + yield + ensure + Thread.current[:sidekiq_via_pool] = nil + end + module ClassMethods def perform_async(*args) @@ -62,6 +81,7 @@ module Sidekiq # :retry - enable the RetryJobs middleware for this Worker, default *true* # :backtrace - whether to save any error backtrace in the retry payload to display in web UI, # can be true, false or an integer number of lines to save, default *false* + # :pool - use the given Redis connection pool to push this type of job to a given shard. def sidekiq_options(opts={}) self.sidekiq_options_hash = get_sidekiq_options.merge((opts || {}).stringify_keys) ::Sidekiq.logger.warn("#{self.name} - :timeout is unsafe and support has been removed from Sidekiq, see http://bit.ly/OtYpK for details") if opts.include? :timeout @@ -80,7 +100,8 @@ module Sidekiq end def client_push(item) # :nodoc: - Sidekiq::Client.push(item.stringify_keys) + pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool + Sidekiq::Client.new(pool).push(item.stringify_keys) end end diff --git a/test/test_client.rb b/test/test_client.rb index b1aa6292..3e143722 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -235,4 +235,33 @@ class TestClient < Sidekiq::Test assert_equal 2, Sidekiq::Client.new.__send__(:normalize_item, 'class' => CWorker, 'args' => [])['retry'] end end + + describe 'sharding' do + class DWorker < BaseWorker + end + it 'allows sidekiq_options to point to different Redi' do + conn = MiniTest::Mock.new + conn.expect(:multi, [0, 1]) + DWorker.sidekiq_options('pool' => ConnectionPool.new(size: 1) { conn }) + DWorker.perform_async(1,2,3) + conn.verify + end + it 'allows #via to point to different Redi' do + conn = MiniTest::Mock.new + conn.expect(:multi, [0, 1]) + default = Sidekiq::Client.redis_pool + Sidekiq::Client.via(ConnectionPool.new(size: 1) { conn }) do + CWorker.perform_async(1,2,3) + end + assert_equal default, Sidekiq::Client.redis_pool + conn.verify + end + it 'allows Resque helpers to point to different Redi' do + conn = MiniTest::Mock.new + conn.expect(:zadd, 1, [String, Array]) + DWorker.sidekiq_options('pool' => ConnectionPool.new(size: 1) { conn }) + Sidekiq::Client.enqueue_in(10, DWorker, 3) + conn.verify + end + end end