1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Merge pull request #12 from maxjustus/master

Make Sidekiq more Resque compatible
This commit is contained in:
Mike Perham 2012-02-09 19:43:12 -08:00
commit e8d3f5b244
11 changed files with 120 additions and 18 deletions

2
.gitignore vendored
View file

@ -1,2 +1,4 @@
tags
Gemfile.lock
*.swp
dump.rdb

View file

@ -8,3 +8,5 @@
- Refactored middleware support, introducing ability to add client/server middleware (Ryan)
- Added middleware for ignoring duplicate jobs (Ryan)
- Added middleware for displaying jobs in resque-web dashboard (Max)
- Added redis namespacing support (Max)

34
Gemfile.lock Normal file
View file

@ -0,0 +1,34 @@
PATH
remote: .
specs:
sidekiq (0.5.1)
celluloid
connection_pool
multi_json
redis
redis-namespace
GEM
remote: http://rubygems.org/
specs:
celluloid (0.8.0)
connection_pool (0.1.0)
minitest (2.11.1)
multi_json (1.0.4)
rake (0.9.2.2)
redis (2.2.2)
redis-namespace (1.1.0)
redis (< 3.0.0)
simplecov (0.5.4)
multi_json (~> 1.0.3)
simplecov-html (~> 0.5.3)
simplecov-html (0.5.3)
PLATFORMS
ruby
DEPENDENCIES
minitest
rake
sidekiq!
simplecov

View file

@ -1,9 +1,9 @@
require 'optparse'
require 'sidekiq/version'
require 'sidekiq/util'
require 'sidekiq/redis_connection'
require 'sidekiq/client'
require 'sidekiq/manager'
require 'connection_pool'
module Sidekiq
class CLI
@ -18,8 +18,9 @@ module Sidekiq
FOREVER = 2_000_000_000
def run
::Sidekiq::Client.redis = ConnectionPool.new { Redis.connect(:url => @options[:server]) }
manager = Sidekiq::Manager.new(@options[:server], @options)
::Sidekiq::Client.redis = Sidekiq::RedisConnection.create(@options[:server], @options[:namespace])
manager_redis = Sidekiq::RedisConnection.create(@options[:server], @options[:namespace], false)
manager = Sidekiq::Manager.new(manager_redis, @options)
begin
log 'Starting processing, hit Ctrl-C to stop'
manager.start!
@ -60,7 +61,6 @@ module Sidekiq
:verbose => false,
:queues => [],
:processor_count => 25,
:server => ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0',
:rails => '.',
:environment => nil,
}
@ -77,6 +77,10 @@ module Sidekiq
@options[:verbose] = true
end
o.on "-n", "--namespace NAMESPACE", "namespace worker queues are under" do |arg|
@options[:namespace] = arg
end
o.on "-s", "--server LOCATION", "Where to find Redis" do |arg|
@options[:server] = arg
end

View file

@ -1,7 +1,9 @@
require 'multi_json'
require 'redis'
require 'sidekiq/redis_connection'
require 'sidekiq/middleware/chain'
require 'sidekiq/middleware/client/resque_web_compatability'
require 'sidekiq/middleware/client/unique_jobs'
module Sidekiq
@ -12,10 +14,7 @@ module Sidekiq
def self.redis
@redis ||= begin
# autoconfig for Heroku
hash = {}
hash[:url] = ENV['REDISTOGO_URL'] if ENV['REDISTOGO_URL']
Redis.connect(hash)
RedisConnection.create
end
end
@ -41,7 +40,7 @@ module Sidekiq
raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']
item['class'] = item['class'].to_s if !item['class'].is_a?(String)
middleware.invoke(item) do
middleware.invoke(item, queue) do
redis.rpush("queue:#{queue}", MultiJson.encode(item))
end
end

View file

@ -18,14 +18,14 @@ module Sidekiq
trap_exit :processor_died
def initialize(location, options={})
log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{location}"
def initialize(redis, options={})
log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{redis.client.location}"
verbose options.inspect
@count = options[:processor_count] || 25
@queues = options[:queues]
@queue_idx = 0
@queues_size = @queues.size
@redis = Redis.connect(:url => location)
@redis = redis
@done_callback = nil
@done = false
@ -50,8 +50,8 @@ module Sidekiq
dispatch(true)
end
def when_done
@done_callback = Proc.new
def when_done(&blk)
@done_callback = blk
end
def processor_done(processor)

View file

@ -0,0 +1,18 @@
module Sidekiq
module Middleware
module Client
class ResqueWebCompatability
def initialize(redis)
@redis = redis
end
#Add job queue to list of queues resque web displays
def call(item, queue)
@redis.sadd('queues', queue)
yield
end
end
end
end
end

View file

@ -1,3 +1,5 @@
require 'digest'
module Sidekiq
module Middleware
module Client
@ -8,7 +10,7 @@ module Sidekiq
@redis = redis
end
def call(item)
def call(item, queue)
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
return if already_scheduled?(payload_hash)
@ -28,4 +30,4 @@ module Sidekiq
end
end
end
end
end

View file

@ -0,0 +1,39 @@
require 'connection_pool'
require 'redis/namespace'
module Sidekiq
class RedisConnection
def self.create(url = nil, namespace = nil, pool = true)
@namespace = namespace ? namespace : nil
@url = url ? url : nil
if pool
ConnectionPool.new { connect }
else
connect
end
end
def self.connect
if namespace
Redis::Namespace.new(namespace, :redis => redis_connection)
else
redis_connection
end
end
def self.namespace
@namespace
end
def self.url
@url || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0'
end
private
def self.redis_connection
Redis.connect(:url => url)
end
end
end

View file

@ -14,6 +14,7 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.version = Sidekiq::VERSION
gem.add_dependency 'redis'
gem.add_dependency 'redis-namespace'
gem.add_dependency 'connection_pool'
gem.add_dependency 'celluloid'
gem.add_dependency 'multi_json'

View file

@ -6,7 +6,7 @@ require 'connection_pool'
class TestManager < MiniTest::Unit::TestCase
describe 'with redis' do
before do
Sidekiq::Client.redis = @redis = Redis.connect(:url => 'redis://localhost/sidekiq_test')
Sidekiq::Client.redis = @redis = Sidekiq::RedisConnection.create('redis://localhost/sidekiq_test', nil, false)
@redis.flushdb
$processed = 0
end
@ -25,7 +25,8 @@ class TestManager < MiniTest::Unit::TestCase
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 2])
q = TimedQueue.new
mgr = Sidekiq::Manager.new("redis://localhost/sidekiq_test", :queues => [:foo])
redis = Sidekiq::RedisConnection.create('redis://localhost/sidekiq_test', nil, false)
mgr = Sidekiq::Manager.new(redis, :queues => [:foo])
mgr.when_done do |_|
q << 'done' if $processed == 2
end