mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Add Sidekiq::Actor which provides a testable alternative to Celluloid
This commit is contained in:
parent
f4c54ff5e2
commit
55726ca900
10 changed files with 173 additions and 18 deletions
|
@ -1,6 +1,7 @@
|
|||
2.11.3
|
||||
-----------
|
||||
|
||||
- Abstract Celluloid usage to Sidekiq::Actor for testing purposes.
|
||||
- Better handling for Redis downtime when fetching jobs and shutting
|
||||
down, don't print exceptions every second and print success message
|
||||
when Redis is back.
|
||||
|
|
|
@ -115,4 +115,3 @@ require 'sidekiq/extensions/class_methods'
|
|||
require 'sidekiq/extensions/action_mailer'
|
||||
require 'sidekiq/extensions/active_record'
|
||||
require 'sidekiq/rails' if defined?(::Rails::Engine)
|
||||
|
||||
|
|
63
lib/sidekiq/actor.rb
Normal file
63
lib/sidekiq/actor.rb
Normal file
|
@ -0,0 +1,63 @@
|
|||
module Sidekiq
|
||||
#
|
||||
# Celluloid has the nasty side effect of making objects
|
||||
# very hard to test because they are immediately async
|
||||
# upon creation. In testing we want to just treat
|
||||
# the actors as POROs.
|
||||
#
|
||||
# Instead of including Celluloid, we'll just stub
|
||||
# out the key methods we use so that everything works
|
||||
# synchronously. The alternative is no test coverage.
|
||||
#
|
||||
module Actor
|
||||
if $TESTING
|
||||
|
||||
def sleep(amount=0)
|
||||
end
|
||||
|
||||
def after(amount=0)
|
||||
end
|
||||
|
||||
def defer
|
||||
yield
|
||||
end
|
||||
|
||||
def current_actor
|
||||
self
|
||||
end
|
||||
|
||||
def alive?
|
||||
!@dead
|
||||
end
|
||||
|
||||
def terminate
|
||||
@dead = true
|
||||
end
|
||||
|
||||
def async
|
||||
self
|
||||
end
|
||||
|
||||
def signal(sym)
|
||||
end
|
||||
|
||||
# we don't want to hide or catch failures in testing
|
||||
def watchdog(msg)
|
||||
yield
|
||||
end
|
||||
|
||||
def self.included(klass)
|
||||
class << klass
|
||||
alias_method :new_link, :new
|
||||
def trap_exit(meth)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
else
|
||||
def self.included(klass)
|
||||
klass.send(:include, Celluloid)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -1,5 +1,5 @@
|
|||
require 'sidekiq'
|
||||
require 'celluloid'
|
||||
require 'sidekiq/actor'
|
||||
|
||||
module Sidekiq
|
||||
##
|
||||
|
@ -7,8 +7,8 @@ module Sidekiq
|
|||
# from the queues. It gets the message and hands it to the Manager
|
||||
# to assign to a ready Processor.
|
||||
class Fetcher
|
||||
include Celluloid
|
||||
include Sidekiq::Util
|
||||
include Util
|
||||
include Actor
|
||||
|
||||
TIMEOUT = 1
|
||||
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
require 'celluloid'
|
||||
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/actor'
|
||||
require 'sidekiq/processor'
|
||||
require 'sidekiq/fetch'
|
||||
|
||||
|
@ -13,10 +12,13 @@ module Sidekiq
|
|||
#
|
||||
class Manager
|
||||
include Util
|
||||
include Celluloid
|
||||
|
||||
include Actor
|
||||
trap_exit :processor_died
|
||||
|
||||
attr_reader :ready
|
||||
attr_reader :busy
|
||||
attr_accessor :fetcher
|
||||
|
||||
def initialize(options={})
|
||||
logger.debug { options.inspect }
|
||||
@count = options[:concurrency] || 25
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
require 'celluloid'
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/actor'
|
||||
|
||||
require 'sidekiq/middleware/server/active_record'
|
||||
require 'sidekiq/middleware/server/retry_jobs'
|
||||
|
@ -12,7 +12,7 @@ module Sidekiq
|
|||
# chain and then calls Sidekiq::Worker#perform.
|
||||
class Processor
|
||||
include Util
|
||||
include Celluloid
|
||||
include Actor
|
||||
|
||||
def self.default_middleware
|
||||
Middleware::Chain.new do |m|
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
require 'sidekiq'
|
||||
require 'sidekiq/util'
|
||||
require 'celluloid'
|
||||
require 'sidekiq/actor'
|
||||
|
||||
module Sidekiq
|
||||
module Scheduled
|
||||
|
@ -13,8 +13,8 @@ module Sidekiq
|
|||
# just pops the message back onto its original queue so the
|
||||
# workers can pick it up like any other message.
|
||||
class Poller
|
||||
include Celluloid
|
||||
include Sidekiq::Util
|
||||
include Util
|
||||
include Actor
|
||||
|
||||
SETS = %w(retry schedule)
|
||||
|
||||
|
@ -42,7 +42,7 @@ module Sidekiq
|
|||
conn.sadd('queues', msg['queue'])
|
||||
conn.lpush("queue:#{msg['queue']}", message)
|
||||
end
|
||||
logger.debug("enqueued #{sorted_set}: #{message}") if logger.debug?
|
||||
logger.debug { "enqueued #{sorted_set}: #{message}" }
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -51,7 +51,7 @@ module Sidekiq
|
|||
# Most likely a problem with redis networking.
|
||||
# Punt and try again at the next interval
|
||||
logger.error ex.message
|
||||
logger.error(ex.backtrace.first)
|
||||
logger.error ex.backtrace.first
|
||||
end
|
||||
|
||||
after(poll_interval) { poll }
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
$TESTING = true
|
||||
require 'coveralls'
|
||||
Coveralls.wear! do
|
||||
add_filter "/test/"
|
||||
|
|
90
test/test_manager.rb
Normal file
90
test/test_manager.rb
Normal file
|
@ -0,0 +1,90 @@
|
|||
require 'helper'
|
||||
require 'sidekiq/manager'
|
||||
|
||||
class TestManager < MiniTest::Unit::TestCase
|
||||
|
||||
describe 'manager' do
|
||||
it 'creates N processor instances' do
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
assert_equal options[:concurrency], mgr.ready.size
|
||||
assert_equal [], mgr.busy
|
||||
assert mgr.fetcher
|
||||
end
|
||||
|
||||
it 'fetches upon start' do
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
count = options[:concurrency]
|
||||
|
||||
fetch_mock = MiniTest::Mock.new
|
||||
count.times { fetch_mock.expect(:fetch, nil, []) }
|
||||
async_mock = MiniTest::Mock.new
|
||||
count.times { async_mock.expect(:async, fetch_mock, []) }
|
||||
mgr.fetcher = async_mock
|
||||
mgr.start
|
||||
|
||||
fetch_mock.verify
|
||||
async_mock.verify
|
||||
end
|
||||
|
||||
it 'assigns work to a processor' do
|
||||
uow = MiniTest::Mock.new
|
||||
processor = MiniTest::Mock.new
|
||||
processor.expect(:async, processor, [])
|
||||
processor.expect(:process, nil, [uow])
|
||||
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr.ready << processor
|
||||
mgr.assign(uow)
|
||||
assert_equal 1, mgr.busy.size
|
||||
|
||||
processor.verify
|
||||
end
|
||||
|
||||
it 'requeues work if stopping' do
|
||||
uow = MiniTest::Mock.new
|
||||
uow.expect(:requeue, nil, [])
|
||||
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr.stop
|
||||
mgr.assign(uow)
|
||||
uow.verify
|
||||
end
|
||||
|
||||
it 'shuts down the system' do
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
mgr.stop
|
||||
|
||||
assert mgr.busy.empty?
|
||||
assert mgr.ready.empty?
|
||||
refute mgr.fetcher.alive?
|
||||
end
|
||||
|
||||
it 'returns finished processors to the ready pool' do
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
init_size = mgr.ready.size
|
||||
processor = mgr.ready.pop
|
||||
mgr.busy << processor
|
||||
mgr.processor_done(processor)
|
||||
|
||||
assert_equal 0, mgr.busy.size
|
||||
assert_equal init_size, mgr.ready.size
|
||||
end
|
||||
|
||||
it 'throws away dead processors' do
|
||||
mgr = Sidekiq::Manager.new(options)
|
||||
init_size = mgr.ready.size
|
||||
processor = mgr.ready.pop
|
||||
mgr.busy << processor
|
||||
mgr.processor_died(processor, 'ignored')
|
||||
|
||||
assert_equal 0, mgr.busy.size
|
||||
assert_equal init_size, mgr.ready.size
|
||||
refute mgr.ready.include?(processor)
|
||||
end
|
||||
|
||||
def options
|
||||
{ :concurrency => 3, :queues => ['default'] }
|
||||
end
|
||||
end
|
||||
|
||||
end
|
|
@ -7,7 +7,7 @@ class TestScheduled < MiniTest::Unit::TestCase
|
|||
def perform(x)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
describe 'poller' do
|
||||
before do
|
||||
Sidekiq.redis = REDIS
|
||||
|
@ -34,8 +34,7 @@ class TestScheduled < MiniTest::Unit::TestCase
|
|||
|
||||
poller = Sidekiq::Scheduled::Poller.new
|
||||
poller.poll
|
||||
poller.terminate
|
||||
|
||||
|
||||
assert_equal [error_1], conn.lrange("queue:queue_1", 0, -1)
|
||||
assert_equal [error_2], conn.lrange("queue:queue_2", 0, -1)
|
||||
assert_equal [error_3], conn.zrange("retry", 0, -1)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue