mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Add full multithreaded integration test for manager
This commit is contained in:
parent
b77e8795ca
commit
92c51c5785
6 changed files with 100 additions and 12 deletions
|
@ -10,7 +10,7 @@ PATH
|
|||
GEM
|
||||
remote: http://rubygems.org/
|
||||
specs:
|
||||
celluloid (0.7.2)
|
||||
celluloid (0.8.0)
|
||||
connection_pool (0.1.0)
|
||||
minitest (2.10.0)
|
||||
multi_json (1.0.4)
|
||||
|
|
|
@ -2,7 +2,7 @@ require 'optparse'
|
|||
require 'sidekiq/version'
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/client'
|
||||
require 'sidekiq/server'
|
||||
require 'sidekiq/manager'
|
||||
require 'connection_pool'
|
||||
|
||||
module Sidekiq
|
||||
|
@ -19,15 +19,15 @@ module Sidekiq
|
|||
|
||||
def run
|
||||
::Sidekiq::Client.redis = ConnectionPool.new { Redis.connect(:url => @options[:server]) }
|
||||
server = Sidekiq::Server.new(@options[:server], @options)
|
||||
manager = Sidekiq::Manager.new(@options[:server], @options)
|
||||
begin
|
||||
log 'Starting processing, hit Ctrl-C to stop'
|
||||
server.start!
|
||||
manager.start!
|
||||
sleep FOREVER
|
||||
rescue Interrupt
|
||||
log 'Shutting down...'
|
||||
server.stop!
|
||||
server.wait(:shutdown)
|
||||
manager.stop!
|
||||
manager.wait(:shutdown)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -8,11 +8,11 @@ require 'sidekiq/processor'
|
|||
module Sidekiq
|
||||
|
||||
##
|
||||
# This is the main router in the system. This
|
||||
# The main router in the system. This
|
||||
# manages the processor state and fetches messages
|
||||
# from Redis to be dispatched to ready processor.
|
||||
# from Redis to be dispatched to an idle processor.
|
||||
#
|
||||
class Server
|
||||
class Manager
|
||||
include Util
|
||||
include Celluloid
|
||||
|
||||
|
@ -21,7 +21,7 @@ module Sidekiq
|
|||
def initialize(location, options={})
|
||||
log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{location}"
|
||||
verbose options.inspect
|
||||
@count = options[:processor_count]
|
||||
@count = options[:processor_count] || 25
|
||||
@queues = options[:queues]
|
||||
@queue_idx = 0
|
||||
@queues_size = @queues.size
|
||||
|
@ -49,7 +49,12 @@ module Sidekiq
|
|||
dispatch(true)
|
||||
end
|
||||
|
||||
def when_done
|
||||
@done_callback = Proc.new
|
||||
end
|
||||
|
||||
def processor_done(processor)
|
||||
@done_callback.call(processor)
|
||||
@busy.delete(processor)
|
||||
if stopped?
|
||||
processor.terminate
|
||||
|
@ -83,7 +88,7 @@ module Sidekiq
|
|||
@busy << processor
|
||||
processor.process! MultiJson.decode(msg)
|
||||
end
|
||||
msg
|
||||
!!msg
|
||||
end
|
||||
|
||||
def dispatch(schedule = false)
|
||||
|
@ -112,6 +117,9 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
# This is the polling loop that ensures we check Redis every
|
||||
# second for work, even if there was nothing to do this time
|
||||
# around.
|
||||
after(1) { verbose('ping'); dispatch(schedule) } if schedule
|
||||
end
|
||||
end
|
|
@ -25,7 +25,7 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def log(msg)
|
||||
STDOUT.puts(msg)
|
||||
STDOUT.puts(msg) unless $TESTING
|
||||
end
|
||||
|
||||
def verbose(msg)
|
||||
|
|
38
test/test_manager.rb
Normal file
38
test/test_manager.rb
Normal file
|
@ -0,0 +1,38 @@
|
|||
require 'helper'
|
||||
require 'sidekiq'
|
||||
require 'sidekiq/manager'
|
||||
require 'timed_queue'
|
||||
|
||||
class TestManager < MiniTest::Unit::TestCase
|
||||
describe 'with redis' do
|
||||
before do
|
||||
Sidekiq::Client.redis = @redis = Redis.connect(:url => 'redis://localhost/sidekiq_test')
|
||||
@redis.flushdb
|
||||
$processed = 0
|
||||
end
|
||||
|
||||
class IntegrationWorker
|
||||
include Sidekiq::Worker
|
||||
|
||||
def perform(a, b)
|
||||
$processed += 1
|
||||
a + b
|
||||
end
|
||||
end
|
||||
|
||||
it 'processes messages' do
|
||||
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 2])
|
||||
Sidekiq::Client.push(:foo, 'class' => IntegrationWorker, 'args' => [1, 2])
|
||||
|
||||
q = TimedQueue.new
|
||||
mgr = Sidekiq::Manager.new("redis://localhost/sidekiq_test", :queues => [:foo])
|
||||
mgr.when_done do |_|
|
||||
q << 'done' if $processed == 2
|
||||
end
|
||||
mgr.start!
|
||||
result = q.timed_pop
|
||||
assert_equal 'done', result
|
||||
mgr.stop
|
||||
end
|
||||
end
|
||||
end
|
42
test/timed_queue.rb
Normal file
42
test/timed_queue.rb
Normal file
|
@ -0,0 +1,42 @@
|
|||
require 'thread'
|
||||
require 'timeout'
|
||||
|
||||
class TimedQueue
|
||||
def initialize
|
||||
@que = []
|
||||
@mutex = Mutex.new
|
||||
@resource = ConditionVariable.new
|
||||
end
|
||||
|
||||
def push(obj)
|
||||
@mutex.synchronize do
|
||||
@que.push obj
|
||||
@resource.broadcast
|
||||
end
|
||||
end
|
||||
alias_method :<<, :push
|
||||
|
||||
def timed_pop(timeout=0.5)
|
||||
deadline = Time.now + timeout
|
||||
@mutex.synchronize do
|
||||
loop do
|
||||
return @que.shift unless @que.empty?
|
||||
to_wait = deadline - Time.now
|
||||
raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
|
||||
@resource.wait(@mutex, to_wait)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def empty?
|
||||
@que.empty?
|
||||
end
|
||||
|
||||
def clear
|
||||
@que.clear
|
||||
end
|
||||
|
||||
def length
|
||||
@que.length
|
||||
end
|
||||
end
|
Loading…
Reference in a new issue