1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Initial pass at API

This commit is contained in:
Mike Perham 2014-03-25 11:56:15 -07:00
parent eda5627f97
commit 30198dadbe
3 changed files with 92 additions and 3 deletions

View file

@ -32,8 +32,8 @@ module Sidekiq
#
# Sidekiq::Client.new(ConnectionPool.new { Redis.new })
#
def initialize(redis_pool = Sidekiq.redis_pool)
@redis_pool = redis_pool
def initialize(redis_pool = nil)
@redis_pool = redis_pool || Sidekiq.redis_pool
end
##
@ -54,12 +54,15 @@ module Sidekiq
# push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
#
def push(item)
Thread.current[:current_pool] = @redis_pool
normed = normalize_item(item)
payload = process_single(item['class'], normed)
pushed = false
pushed = raw_push([payload]) if payload
pushed ? payload['jid'] : nil
ensure
Thread.current[:current_pool] = nil
end
##
@ -77,6 +80,7 @@ module Sidekiq
# pushed can be less than the number given if the middleware stopped processing for one
# or more jobs.
def push_bulk(items)
Thread.current[:current_pool] = @redis_pool
normed = normalize_item(items)
payloads = items['args'].map do |args|
raise ArgumentError, "Bulk arguments must be an Array of Arrays: [[1], [2]]" if !args.is_a?(Array)
@ -86,9 +90,44 @@ module Sidekiq
pushed = false
pushed = raw_push(payloads) if !payloads.empty?
pushed ? payloads.collect { |payload| payload['jid'] } : nil
ensure
Thread.current[:current_pool] = nil
end
class << self
# Allows sharding of jobs across any number of Redis instances. All jobs
# defined within the block will use the given Redis connection pool.
#
# pool = ConnectionPool.new { Redis.new }
# Sidekiq::Client.via(pool) do
# SomeWorker.perform_async(1,2,3)
# SomeOtherWorker.perform_async(1,2,3)
# end
#
# Generally this is only needed for very large Sidekiq installs processing
# more than thousands jobs per second.
def via(pool)
raise ArgumentError, "No pool given" if pool.nil?
Thread.current[:sidekiq_via_pool] = pool
yield
ensure
Thread.current[:sidekiq_via_pool] = nil
end
#
# Returns the Redis pool being used for the current client operation.
# Client operations should use +Sidekiq::Client.redis_pool+ whereas server
# operations should use +Sidekiq.redis_pool+.
#
# For example, in client-side middleware, you must use this method.
# In server-side middleware, you use +Sidekiq.redis_pool+.
#
# This complexity is necessary to support Redis sharding.
def redis_pool
Thread.current[:current_pool] || Sidekiq.redis_pool
end
def default
@default ||= new
end

View file

@ -34,6 +34,25 @@ module Sidekiq
Sidekiq.logger
end
# Allows sharding of jobs across any number of Redis instances. All jobs
# defined within the block will use the given Redis connection pool.
#
# pool = ConnectionPool.new { Redis.new }
# Sidekiq::Worker.via(pool) do
# SomeWorker.perform_async(1,2,3)
# SomeOtherWorker.perform_async(1,2,3)
# end
#
# Generally this is only needed for very large Sidekiq installs processing
# more than thousands jobs per second.
def self.via(pool)
raise ArgumentError, "No pool given" if pool.nil?
Thread.current[:sidekiq_via_pool] = pool
yield
ensure
Thread.current[:sidekiq_via_pool] = nil
end
module ClassMethods
def perform_async(*args)
@ -62,6 +81,7 @@ module Sidekiq
# :retry - enable the RetryJobs middleware for this Worker, default *true*
# :backtrace - whether to save any error backtrace in the retry payload to display in web UI,
# can be true, false or an integer number of lines to save, default *false*
# :pool - use the given Redis connection pool to push this type of job to a given shard.
def sidekiq_options(opts={})
self.sidekiq_options_hash = get_sidekiq_options.merge((opts || {}).stringify_keys)
::Sidekiq.logger.warn("#{self.name} - :timeout is unsafe and support has been removed from Sidekiq, see http://bit.ly/OtYpK for details") if opts.include? :timeout
@ -80,7 +100,8 @@ module Sidekiq
end
def client_push(item) # :nodoc:
Sidekiq::Client.push(item.stringify_keys)
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
Sidekiq::Client.new(pool).push(item.stringify_keys)
end
end

View file

@ -235,4 +235,33 @@ class TestClient < Sidekiq::Test
assert_equal 2, Sidekiq::Client.new.__send__(:normalize_item, 'class' => CWorker, 'args' => [])['retry']
end
end
describe 'sharding' do
class DWorker < BaseWorker
end
it 'allows sidekiq_options to point to different Redi' do
conn = MiniTest::Mock.new
conn.expect(:multi, [0, 1])
DWorker.sidekiq_options('pool' => ConnectionPool.new(size: 1) { conn })
DWorker.perform_async(1,2,3)
conn.verify
end
it 'allows #via to point to different Redi' do
conn = MiniTest::Mock.new
conn.expect(:multi, [0, 1])
default = Sidekiq::Client.redis_pool
Sidekiq::Client.via(ConnectionPool.new(size: 1) { conn }) do
CWorker.perform_async(1,2,3)
end
assert_equal default, Sidekiq::Client.redis_pool
conn.verify
end
it 'allows Resque helpers to point to different Redi' do
conn = MiniTest::Mock.new
conn.expect(:zadd, 1, [String, Array])
DWorker.sidekiq_options('pool' => ConnectionPool.new(size: 1) { conn })
Sidekiq::Client.enqueue_in(10, DWorker, 3)
conn.verify
end
end
end