mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Rework redis connections so that the manager and
the client use separate pools. This is so the Rails app Sidekiq::Client and Sidekiq::Manager can use different configurations. Also, fix issue where workers were not unregistered in Redis upon shutdown.
This commit is contained in:
parent
b9bb5b7699
commit
f9af66edd7
12 changed files with 48 additions and 30 deletions
|
@ -18,9 +18,8 @@ module Sidekiq
|
||||||
FOREVER = 2_000_000_000
|
FOREVER = 2_000_000_000
|
||||||
|
|
||||||
def run
|
def run
|
||||||
Sidekiq::Client.redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace], :use_pool => true)
|
Sidekiq::Manager.redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace])
|
||||||
manager_redis = RedisConnection.create(:url => @options[:server], :namespace => @options[:namespace])
|
manager = Sidekiq::Manager.new(@options)
|
||||||
manager = Sidekiq::Manager.new(manager_redis, @options)
|
|
||||||
begin
|
begin
|
||||||
log 'Starting processing, hit Ctrl-C to stop'
|
log 'Starting processing, hit Ctrl-C to stop'
|
||||||
manager.start!
|
manager.start!
|
||||||
|
|
|
@ -18,12 +18,15 @@ module Sidekiq
|
||||||
|
|
||||||
trap_exit :processor_died
|
trap_exit :processor_died
|
||||||
|
|
||||||
def initialize(redis, options={})
|
class << self
|
||||||
|
attr_accessor :redis
|
||||||
|
end
|
||||||
|
|
||||||
|
def initialize(options={})
|
||||||
log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis.client.location}"
|
log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis.client.location}"
|
||||||
verbose options.inspect
|
verbose options.inspect
|
||||||
@count = options[:processor_count] || 25
|
@count = options[:processor_count] || 25
|
||||||
@queues = options[:queues]
|
@queues = options[:queues]
|
||||||
@redis = redis
|
|
||||||
@done_callback = nil
|
@done_callback = nil
|
||||||
|
|
||||||
@done = false
|
@done = false
|
||||||
|
@ -39,6 +42,12 @@ module Sidekiq
|
||||||
after(5) do
|
after(5) do
|
||||||
signal(:shutdown)
|
signal(:shutdown)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
redis.with_connection do |conn|
|
||||||
|
conn.smembers('workers').each do |name|
|
||||||
|
conn.srem('workers', name) if name =~ /:#{Process.pid}:/
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def start
|
def start
|
||||||
|
@ -79,7 +88,7 @@ module Sidekiq
|
||||||
private
|
private
|
||||||
|
|
||||||
def find_work(queue)
|
def find_work(queue)
|
||||||
msg = @redis.lpop("queue:#{queue}")
|
msg = redis.lpop("queue:#{queue}")
|
||||||
if msg
|
if msg
|
||||||
processor = @ready.pop
|
processor = @ready.pop
|
||||||
@busy << processor
|
@busy << processor
|
||||||
|
|
|
@ -14,10 +14,7 @@ module Sidekiq
|
||||||
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
|
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
|
||||||
return if already_scheduled?(payload_hash)
|
return if already_scheduled?(payload_hash)
|
||||||
|
|
||||||
@redis.multi do
|
@redis.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
|
||||||
@redis.set(payload_hash, 1)
|
|
||||||
@redis.expire(payload_hash, HASH_KEY_EXPIRATION)
|
|
||||||
end
|
|
||||||
|
|
||||||
yield
|
yield
|
||||||
end
|
end
|
||||||
|
|
|
@ -18,7 +18,7 @@ module Sidekiq
|
||||||
# default middleware
|
# default middleware
|
||||||
chain.register do
|
chain.register do
|
||||||
use Middleware::Server::Airbrake
|
use Middleware::Server::Airbrake
|
||||||
use Middleware::Server::UniqueJobs, Sidekiq::Client.redis
|
use Middleware::Server::UniqueJobs, Sidekiq::Manager.redis
|
||||||
use Middleware::Server::ActiveRecord
|
use Middleware::Server::ActiveRecord
|
||||||
end
|
end
|
||||||
chain
|
chain
|
||||||
|
@ -53,10 +53,12 @@ module Sidekiq
|
||||||
private
|
private
|
||||||
|
|
||||||
def stats(worker, msg, queue)
|
def stats(worker, msg, queue)
|
||||||
redis.multi do
|
redis.with_connection do |conn|
|
||||||
redis.set("worker:#{self}:started", Time.now.to_s)
|
conn.multi do
|
||||||
redis.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg,
|
conn.set("worker:#{self}:started", Time.now.to_s)
|
||||||
:run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")))
|
conn.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg,
|
||||||
|
:run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")))
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
dying = false
|
dying = false
|
||||||
|
@ -65,18 +67,22 @@ module Sidekiq
|
||||||
rescue
|
rescue
|
||||||
dying = true
|
dying = true
|
||||||
# Uh oh, error. We will die so unregister as much as we can first.
|
# Uh oh, error. We will die so unregister as much as we can first.
|
||||||
redis.multi do
|
redis.with_connection do |conn|
|
||||||
redis.incrby("stat:failed", 1)
|
conn.multi do
|
||||||
redis.del("stat:processed:#{self}")
|
conn.incrby("stat:failed", 1)
|
||||||
redis.srem("workers", self)
|
conn.del("stat:processed:#{self}")
|
||||||
|
conn.srem("workers", self)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
raise
|
raise
|
||||||
ensure
|
ensure
|
||||||
redis.multi do
|
redis.with_connection do |conn|
|
||||||
redis.del("worker:#{self}")
|
conn.multi do
|
||||||
redis.del("worker:#{self}:started")
|
conn.del("worker:#{self}")
|
||||||
redis.incrby("stat:processed", 1)
|
conn.del("worker:#{self}:started")
|
||||||
redis.incrby("stat:processed:#{self}", 1) unless dying
|
conn.incrby("stat:processed", 1)
|
||||||
|
conn.incrby("stat:processed:#{self}", 1) unless dying
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ module Sidekiq
|
||||||
def self.create(options={})
|
def self.create(options={})
|
||||||
url = options[:url] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0'
|
url = options[:url] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0'
|
||||||
client = build_client(url, options[:namespace])
|
client = build_client(url, options[:namespace])
|
||||||
return ConnectionPool.new(:timeout => 1, :size => 25) { client } if options[:use_pool]
|
return ConnectionPool.new(:timeout => 1, :size => 25) { client } unless options[:use_pool] == false
|
||||||
client
|
client
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -33,7 +33,7 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def redis
|
def redis
|
||||||
Sidekiq::Client.redis
|
Sidekiq::Manager.redis
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
1
myapp/config/initializers/sidekiq.rb
Normal file
1
myapp/config/initializers/sidekiq.rb
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Sidekiq::Client.redis = Sidekiq::RedisConnection.create(:namespace => 'resque')
|
|
@ -36,6 +36,7 @@ class TestClient < MiniTest::Unit::TestCase
|
||||||
def @redis.get(*); nil; end
|
def @redis.get(*); nil; end
|
||||||
def @redis.del(*); nil; end
|
def @redis.del(*); nil; end
|
||||||
def @redis.incrby(*); nil; end
|
def @redis.incrby(*); nil; end
|
||||||
|
def @redis.setex(*); nil; end
|
||||||
def @redis.expire(*); true; end
|
def @redis.expire(*); true; end
|
||||||
Sidekiq::Client.redis = @redis
|
Sidekiq::Client.redis = @redis
|
||||||
end
|
end
|
||||||
|
|
|
@ -8,7 +8,7 @@ require 'connection_pool'
|
||||||
class TestManager < MiniTest::Unit::TestCase
|
class TestManager < MiniTest::Unit::TestCase
|
||||||
describe 'with redis' do
|
describe 'with redis' do
|
||||||
before do
|
before do
|
||||||
Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
|
Sidekiq::Manager.redis = Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
|
||||||
@redis.flushdb
|
@redis.flushdb
|
||||||
$processed = 0
|
$processed = 0
|
||||||
$mutex = Mutex.new
|
$mutex = Mutex.new
|
||||||
|
@ -30,7 +30,7 @@ class TestManager < MiniTest::Unit::TestCase
|
||||||
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 3])
|
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 3])
|
||||||
|
|
||||||
q = TimedQueue.new
|
q = TimedQueue.new
|
||||||
mgr = Sidekiq::Manager.new(@redis, :queues => [:foo], :processor_count => 2)
|
mgr = Sidekiq::Manager.new(:queues => [:foo], :processor_count => 2)
|
||||||
mgr.when_done do |_|
|
mgr.when_done do |_|
|
||||||
q << 'done' if $processed == 2
|
q << 'done' if $processed == 2
|
||||||
end
|
end
|
||||||
|
|
|
@ -5,6 +5,10 @@ require 'sidekiq/processor'
|
||||||
|
|
||||||
class TestMiddleware < MiniTest::Unit::TestCase
|
class TestMiddleware < MiniTest::Unit::TestCase
|
||||||
describe 'middleware chain' do
|
describe 'middleware chain' do
|
||||||
|
before do
|
||||||
|
Sidekiq::Manager.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
|
||||||
|
Sidekiq::Client.redis = nil
|
||||||
|
end
|
||||||
|
|
||||||
class CustomMiddleware
|
class CustomMiddleware
|
||||||
def initialize(name, recorder)
|
def initialize(name, recorder)
|
||||||
|
|
|
@ -5,7 +5,8 @@ require 'sidekiq/processor'
|
||||||
class TestStats < MiniTest::Unit::TestCase
|
class TestStats < MiniTest::Unit::TestCase
|
||||||
describe 'with redis' do
|
describe 'with redis' do
|
||||||
before do
|
before do
|
||||||
Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
|
Sidekiq::Manager.redis = @redis = Sidekiq::RedisConnection.create(:url => 'redis://localhost/sidekiq_test')
|
||||||
|
Sidekiq::Client.redis = nil
|
||||||
@redis.flushdb
|
@redis.flushdb
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ class TestTesting < MiniTest::Unit::TestCase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'calls the worker directly when in testing mode' do
|
it 'stubs the async call when in testing mode' do
|
||||||
begin
|
begin
|
||||||
# Override Sidekiq::Worker
|
# Override Sidekiq::Worker
|
||||||
require 'sidekiq/testing'
|
require 'sidekiq/testing'
|
||||||
|
|
Loading…
Add table
Reference in a new issue