1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Upgrade to connection_pool 0.9.0

Remove all connection_pool usage of method_missing.
Change Sidekiq.redis API to require a block.
This commit is contained in:
Mike Perham 2012-03-14 09:56:13 -07:00
parent 66c275f42f
commit 74d0e5ec35
14 changed files with 39 additions and 31 deletions

View file

@ -7,6 +7,7 @@
capistrano sends USR1 at start of deploy and TERM at end of deploy
giving workers the maximum amount of time to finish.
- New Sidekiq::Web rack application available
- Updated Sidekiq.redis API
0.8.0
-----------

View file

@ -52,8 +52,21 @@ module Sidekiq
defined?(Sidekiq::CLI)
end
def self.redis
def self.redis(&block)
@redis ||= Sidekiq::RedisConnection.create
if block_given?
@redis.with(&block)
else
Sidekiq::Util.logger.info "*****************************************************
Sidekiq.redis now takes a block:
Sidekiq.redis { |connection| connection.smembers('myset') }
Please update your code accordingly.
Called from #{caller[0]}
*****************************************************"
@redis
end
end
def self.redis=(hash)

View file

@ -1,7 +1,6 @@
require 'multi_json'
require 'redis'
require 'sidekiq/redis_connection'
require 'sidekiq/middleware/chain'
require 'sidekiq/middleware/client/unique_jobs'
@ -19,11 +18,11 @@ module Sidekiq
end
def self.registered_workers
Sidekiq.redis.smembers('workers')
Sidekiq.redis { |x| x.smembers('workers') }
end
def self.registered_queues
Sidekiq.redis.smembers('queues')
Sidekiq.redis { |x| x.smembers('queues') }
end
def self.queue_mappings
@ -43,7 +42,7 @@ module Sidekiq
pushed = false
Sidekiq.client_middleware.invoke(item, queue) do
payload = MultiJson.encode(item)
Sidekiq.redis.with_connection do |conn|
Sidekiq.redis do |conn|
conn.multi do
conn.sadd('queues', queue)
conn.rpush("queue:#{queue}", payload)

View file

@ -4,6 +4,7 @@ require 'multi_json'
require 'sidekiq/util'
require 'sidekiq/processor'
require 'connection_pool/version'
module Sidekiq
@ -19,7 +20,8 @@ module Sidekiq
trap_exit :processor_died
def initialize(options={})
logger.info "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis.client.location}"
logger.info "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis {|x| x.client.location}}"
logger.info "Running in #{RUBY_DESCRIPTION}"
logger.debug { options.inspect }
@count = options[:concurrency] || 25
@queues = options[:queues]
@ -38,7 +40,7 @@ module Sidekiq
@ready.each { |x| x.terminate if x.alive? }
@ready.clear
redis.with_connection do |conn|
redis do |conn|
workers = conn.smembers('workers')
workers.each do |name|
conn.srem('workers', name) if name =~ /:#{process_id}-/
@ -96,7 +98,7 @@ module Sidekiq
private
def find_work(queue)
msg = redis.lpop("queue:#{queue}")
msg = redis { |x| x.lpop("queue:#{queue}") }
if msg
processor = @ready.pop
@busy << processor

View file

@ -9,9 +9,9 @@ module Sidekiq
def call(item, queue)
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
Sidekiq.redis.with_connection do |redis|
return if redis.get(payload_hash)
redis.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
Sidekiq.redis do |conn|
return if conn.get(payload_hash)
conn.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
end
yield

View file

@ -15,7 +15,7 @@ module Sidekiq
:queue => args[2]
}
Sidekiq.redis.rpush(:failed, MultiJson.encode(data))
Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) }
raise
end
end

View file

@ -5,7 +5,7 @@ module Sidekiq
def call(*args)
yield
ensure
Sidekiq.redis.del(Digest::MD5.hexdigest(MultiJson.encode(args[1])))
Sidekiq.redis {|conn| conn.del(Digest::MD5.hexdigest(MultiJson.encode(args[1]))) }
end
end
end

View file

@ -27,7 +27,7 @@ module Sidekiq
def initialize(boss)
@boss = boss
redis.sadd('workers', self)
redis {|x| x.sadd('workers', self) }
end
def process(msg, queue)
@ -53,7 +53,7 @@ module Sidekiq
private
def stats(worker, msg, queue)
redis.with_connection do |conn|
redis do |conn|
conn.multi do
conn.set("worker:#{self}:started", Time.now.to_s)
conn.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg,
@ -67,7 +67,7 @@ module Sidekiq
rescue
dying = true
# Uh oh, error. We will die so unregister as much as we can first.
redis.with_connection do |conn|
redis do |conn|
conn.multi do
conn.incrby("stat:failed", 1)
conn.del("stat:processed:#{self}")
@ -76,7 +76,7 @@ module Sidekiq
end
raise
ensure
redis.with_connection do |conn|
redis do |conn|
conn.multi do
conn.del("worker:#{self}")
conn.del("worker:#{self}:started")

View file

@ -5,11 +5,7 @@ module Sidekiq
class RedisConnection
def self.create(options={})
url = options[:url] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0'
if options[:use_pool] != false
ConnectionPool.new(:timeout => 1, :size => (options[:size] || Sidekiq.options[:concurrency] || 25)) do
build_client(url, options[:namespace])
end
else
ConnectionPool::Wrapper.new(:timeout => 1, :size => (options[:size] || Sidekiq.options[:concurrency] || 25)) do
build_client(url, options[:namespace])
end
end

View file

@ -50,8 +50,8 @@ module Sidekiq
Sidekiq::Util.logger
end
def redis
Sidekiq.redis
def redis(&block)
Sidekiq.redis(&block)
end
def process_id

View file

@ -24,10 +24,6 @@ module Sidekiq
base.extend(ClassMethods)
end
def logger
Sidekiq::Util.logger
end
module ClassMethods
def perform_async(*args)
Sidekiq::Client.push('class' => self.name, 'args' => args)

View file

@ -3,7 +3,7 @@ PATH
specs:
sidekiq (0.9.0)
celluloid
connection_pool
connection_pool (>= 0.9.0)
multi_json
redis
redis-namespace
@ -52,7 +52,7 @@ GEM
net-ssh (>= 2.0.14)
net-ssh-gateway (>= 1.1.0)
celluloid (0.9.0)
connection_pool (0.1.0)
connection_pool (0.9.0)
erubis (2.7.0)
highline (1.6.11)
hike (1.2.1)

View file

@ -15,7 +15,7 @@ Gem::Specification.new do |gem|
gem.version = Sidekiq::VERSION
gem.add_dependency 'redis'
gem.add_dependency 'redis-namespace'
gem.add_dependency 'connection_pool'
gem.add_dependency 'connection_pool', '>= 0.9.0'
gem.add_dependency 'celluloid'
gem.add_dependency 'multi_json'
gem.add_development_dependency 'minitest'

View file

@ -38,6 +38,7 @@ class TestClient < MiniTest::Unit::TestCase
def @redis.setex(*); nil; end
def @redis.expire(*); true; end
def @redis.with_connection; yield self; end
def @redis.with; yield self; end
Sidekiq.instance_variable_set(:@redis, @redis)
end