From f9af66edd7978abd523d98b0608fa95dbb7929d6 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Sat, 11 Feb 2012 13:14:03 -0800 Subject: [PATCH] Rework redis connections so that the manager and the client use separate pools. This is so the Rails app Sidekiq::Client and Sidekiq::Manager can use different configurations. Also, fix issue where workers were not unregistered in Redis upon shutdown. --- lib/sidekiq/cli.rb | 5 ++- lib/sidekiq/manager.rb | 15 +++++++-- lib/sidekiq/middleware/client/unique_jobs.rb | 5 +-- lib/sidekiq/processor.rb | 34 ++++++++++++-------- lib/sidekiq/redis_connection.rb | 2 +- lib/sidekiq/util.rb | 2 +- myapp/config/initializers/sidekiq.rb | 1 + test/test_client.rb | 1 + test/test_manager.rb | 4 +-- test/test_middleware.rb | 4 +++ test/test_stats.rb | 3 +- test/test_testing.rb | 2 +- 12 files changed, 48 insertions(+), 30 deletions(-) create mode 100644 myapp/config/initializers/sidekiq.rb diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index c3f3e2ae..9b41d029 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -18,9 +18,8 @@ module Sidekiq FOREVER = 2_000_000_000 def run - Sidekiq::Client.redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace], :use_pool => true) - manager_redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace]) - manager = Sidekiq::Manager.new(manager_redis, @options) + Sidekiq::Manager.redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace]) + manager = Sidekiq::Manager.new(@options) begin log 'Starting processing, hit Ctrl-C to stop' manager.start! diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 031253f6..eaa0e9f7 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -18,12 +18,15 @@ module Sidekiq trap_exit :processor_died - def initialize(redis, options={}) + class << self + attr_accessor :redis + end + + def initialize(options={}) log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis.client.location}" verbose options.inspect @count = options[:processor_count] || 25 @queues = options[:queues] - @redis = redis @done_callback = nil @done = false @@ -39,6 +42,12 @@ module Sidekiq after(5) do signal(:shutdown) end + + redis.with_connection do |conn| + conn.smembers('workers').each do |name| + conn.srem('workers', name) if name =~ /:#{Process.pid}:/ + end + end end def start @@ -79,7 +88,7 @@ module Sidekiq private def find_work(queue) - msg = @redis.lpop("queue:#{queue}") + msg = redis.lpop("queue:#{queue}") if msg processor = @ready.pop @busy << processor diff --git a/lib/sidekiq/middleware/client/unique_jobs.rb b/lib/sidekiq/middleware/client/unique_jobs.rb index 8f39a479..2f4f2a5c 100644 --- a/lib/sidekiq/middleware/client/unique_jobs.rb +++ b/lib/sidekiq/middleware/client/unique_jobs.rb @@ -14,10 +14,7 @@ module Sidekiq payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item)) return if already_scheduled?(payload_hash) - @redis.multi do - @redis.set(payload_hash, 1) - @redis.expire(payload_hash, HASH_KEY_EXPIRATION) - end + @redis.setex(payload_hash, HASH_KEY_EXPIRATION, 1) yield end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index cb5b583e..51ff479f 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -18,7 +18,7 @@ module Sidekiq # default middleware chain.register do use Middleware::Server::Airbrake - use Middleware::Server::UniqueJobs, Sidekiq::Client.redis + use Middleware::Server::UniqueJobs, Sidekiq::Manager.redis use Middleware::Server::ActiveRecord end chain @@ -53,10 +53,12 @@ module Sidekiq private def stats(worker, msg, queue) - redis.multi do - redis.set("worker:#{self}:started", Time.now.to_s) - redis.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg, - :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z"))) + redis.with_connection do |conn| + conn.multi do + conn.set("worker:#{self}:started", Time.now.to_s) + conn.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg, + :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z"))) + end end dying = false @@ -65,18 +67,22 @@ module Sidekiq rescue dying = true # Uh oh, error. We will die so unregister as much as we can first. - redis.multi do - redis.incrby("stat:failed", 1) - redis.del("stat:processed:#{self}") - redis.srem("workers", self) + redis.with_connection do |conn| + conn.multi do + conn.incrby("stat:failed", 1) + conn.del("stat:processed:#{self}") + conn.srem("workers", self) + end end raise ensure - redis.multi do - redis.del("worker:#{self}") - redis.del("worker:#{self}:started") - redis.incrby("stat:processed", 1) - redis.incrby("stat:processed:#{self}", 1) unless dying + redis.with_connection do |conn| + conn.multi do + conn.del("worker:#{self}") + conn.del("worker:#{self}:started") + conn.incrby("stat:processed", 1) + conn.incrby("stat:processed:#{self}", 1) unless dying + end end end diff --git a/lib/sidekiq/redis_connection.rb b/lib/sidekiq/redis_connection.rb index 57ce6c46..b7d69944 100644 --- a/lib/sidekiq/redis_connection.rb +++ b/lib/sidekiq/redis_connection.rb @@ -6,7 +6,7 @@ module Sidekiq def self.create(options={}) url = options[:url] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0' client = build_client(url, options[:namespace]) - return ConnectionPool.new(:timeout => 1, :size => 25) { client } if options[:use_pool] + return ConnectionPool.new(:timeout => 1, :size => 25) { client } unless options[:use_pool] == false client end diff --git a/lib/sidekiq/util.rb b/lib/sidekiq/util.rb index 082672fe..6919186a 100644 --- a/lib/sidekiq/util.rb +++ b/lib/sidekiq/util.rb @@ -33,7 +33,7 @@ module Sidekiq end def redis - Sidekiq::Client.redis + Sidekiq::Manager.redis end end end diff --git a/myapp/config/initializers/sidekiq.rb b/myapp/config/initializers/sidekiq.rb new file mode 100644 index 00000000..0be74603 --- /dev/null +++ b/myapp/config/initializers/sidekiq.rb @@ -0,0 +1 @@ +Sidekiq::Client.redis = Sidekiq::RedisConnection.create(:namespace => 'resque') diff --git a/test/test_client.rb b/test/test_client.rb index b23fc66c..25e96e81 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -36,6 +36,7 @@ class TestClient < MiniTest::Unit::TestCase def @redis.get(*); nil; end def @redis.del(*); nil; end def @redis.incrby(*); nil; end + def @redis.setex(*); nil; end def @redis.expire(*); true; end Sidekiq::Client.redis = @redis end diff --git a/test/test_manager.rb b/test/test_manager.rb index be2e27a1..3704aa44 100644 --- a/test/test_manager.rb +++ b/test/test_manager.rb @@ -8,7 +8,7 @@ require 'connection_pool' class TestManager < MiniTest::Unit::TestCase describe 'with redis' do before do - Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test') + Sidekiq::Manager.redis = Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test') @redis.flushdb $processed = 0 $mutex = Mutex.new @@ -30,7 +30,7 @@ class TestManager < MiniTest::Unit::TestCase Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 3]) q = TimedQueue.new - mgr = Sidekiq::Manager.new(@redis, :queues => [:foo], :processor_count => 2) + mgr = Sidekiq::Manager.new(:queues => [:foo], :processor_count => 2) mgr.when_done do |_| q << 'done' if $processed == 2 end diff --git a/test/test_middleware.rb b/test/test_middleware.rb index 6ace8b81..6ae0c634 100644 --- a/test/test_middleware.rb +++ b/test/test_middleware.rb @@ -5,6 +5,10 @@ require 'sidekiq/processor' class TestMiddleware < MiniTest::Unit::TestCase describe 'middleware chain' do + before do + Sidekiq::Manager.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test') + Sidekiq::Client.redis = nil + end class CustomMiddleware def initialize(name, recorder) diff --git a/test/test_stats.rb b/test/test_stats.rb index 1db66e92..eb98b3bd 100644 --- a/test/test_stats.rb +++ b/test/test_stats.rb @@ -5,7 +5,8 @@ require 'sidekiq/processor' class TestStats < MiniTest::Unit::TestCase describe 'with redis' do before do - Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test') + Sidekiq::Manager.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test') + Sidekiq::Client.redis = nil @redis.flushdb end diff --git a/test/test_testing.rb b/test/test_testing.rb index 137a0243..e5cfe442 100644 --- a/test/test_testing.rb +++ b/test/test_testing.rb @@ -11,7 +11,7 @@ class TestTesting < MiniTest::Unit::TestCase end end - it 'calls the worker directly when in testing mode' do + it 'stubs the async call when in testing mode' do begin # Override Sidekiq::Worker require 'sidekiq/testing'