mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
67daa7a408
* Prepare for upcoming Sidekiq::Config redesign. Adjust the server internals to use a config object rather than referring directly to the Sidekiq module.
147 lines
3.1 KiB
Ruby
147 lines
3.1 KiB
Ruby
# frozen_string_literal: true
|
|
|
|
require_relative "helper"
|
|
require "sidekiq/cli"
|
|
require "sidekiq/fetch"
|
|
require "sidekiq/scheduled"
|
|
require "sidekiq/processor"
|
|
|
|
# Job fixture driven by the specs in this file.
#
# The argument picks the behavior: the string "boom" makes the job raise,
# a value greater than zero makes it sleep for that long first. Each run
# that reaches the end increments the global $count, which the specs read
# to tell whether the job completed.
class JoeWorker
  include Sidekiq::Job

  # @param slp [Numeric, String] sleep duration, or "boom" to fail
  # @raise [RuntimeError] with message "boom" when slp == "boom"
  def perform(slp)
    raise "boom" if slp == "boom"

    sleep(slp) if slp > 0
    $count += 1
  end
end
|
|
|
|
describe "Actors" do
|
|
# Shared setup: reset Sidekiq, flush the Redis database and build the
# config object that every example hands to the component under test.
before do
  Sidekiq.reset!
  Sidekiq.redis(&:flushdb)

  @config = Sidekiq
  @config[:queues] = ["default"]
  @config[:fetch] = Sidekiq::BasicFetch.new(@config)
  @config[:error_handlers] << Sidekiq.method(:default_error_handler)
  # Uncomment for verbose output while debugging:
  # @config.logger.level = Logger::DEBUG
end
|
|
|
|
describe "scheduler" do
|
|
# Smoke test: a poller built from the config can be started and then
# terminated without raising.
it "can start and stop" do
  poller = Sidekiq::Scheduled::Poller.new(@config)
  poller.start
  poller.terminate
end
|
|
|
|
# A job submitted with perform_in lands in the scheduled set first; once
# the poller's enqueue runs after the short sleep, it is expected to have
# moved to the default queue.
it "can schedule" do
  scheduled = Sidekiq::ScheduledSet.new
  queue = Sidekiq::Queue.new

  JoeWorker.perform_in(0.01, 0)

  assert_equal 0, queue.size
  assert_equal 1, scheduled.size

  # Wait slightly longer than the 0.01 passed to perform_in above.
  sleep 0.015

  poller = Sidekiq::Scheduled::Poller.new(@config)
  poller.enqueue

  assert_equal 1, queue.size
  assert_equal 0, scheduled.size

  poller.terminate
end
|
|
end
|
|
|
|
describe "processor" do
|
|
# Per-example state for the processor specs: the mutex/condition pair used
# by #result and #await, the last error reported by the processor callback,
# and the global counter bumped by JoeWorker.
before do
  @mutex = ::Mutex.new
  @cond = ::ConditionVariable.new
  @latest_error = nil
  $count = 0
end
|
|
|
|
# Processor callback helper: remembers the exception it was handed (nil
# when there was none) and signals @cond so a thread blocked in #await
# can continue.
#
# @param pr [Object] first callback argument; not used here
# @param ex [Exception, nil] value stored in @latest_error
def result(pr, ex)
  @latest_error = ex
  @mutex.synchronize { @cond.signal }
end
|
|
|
|
# Runs the given block while holding @mutex, then waits on @cond for at
# most +timeout+ seconds. Because the block runs with the mutex held,
# #result (which takes the same mutex before signalling) cannot signal
# until this method is already waiting.
#
# NOTE(review): the wait is not wrapped in a predicate loop, so it also
# returns on timeout or on any other wakeup; callers check state afterwards.
#
# @param timeout [Numeric] maximum seconds to wait (default 0.5)
# @yield work to kick off before waiting
# @return the value returned by ConditionVariable#wait
def await(timeout = 0.5)
  @mutex.synchronize do
    yield
    @cond.wait(@mutex, timeout)
  end
end
|
|
|
|
# Smoke test: a processor that is never started can be terminated, and its
# callback (which would raise) is not expected to fire.
it "can start and stop" do
  processor = Sidekiq::Processor.new(@config) { |_pr, _ex| raise "should not raise!" }
  processor.terminate
end
|
|
|
|
# Happy path: one queued JoeWorker job is picked up by the processor,
# increments $count exactly once, leaves the queue empty and reports no
# error through the callback.
it "can process" do
  queue = Sidekiq::Queue.new
  assert_equal 0, queue.size

  processor = Sidekiq::Processor.new(@config) { |pr, ex| result(pr, ex) }

  JoeWorker.perform_async(0)
  assert_equal 1, queue.size

  count_before = $count
  await { processor.start }

  processor.kill(true)
  count_after = $count

  assert_nil @latest_error
  assert_equal count_before + 1, count_after
  assert_equal 0, queue.size
end
|
|
|
|
# Failure path: a job enqueued with "boom" raises inside JoeWorker#perform,
# so $count must stay unchanged and the callback must have recorded a
# RuntimeError whose message is "boom".
it "deals with errors" do
  queue = Sidekiq::Queue.new
  assert_equal 0, queue.size

  processor = Sidekiq::Processor.new(@config) { |pr, ex| result(pr, ex) }

  jid = JoeWorker.perform_async("boom")
  assert jid, jid
  assert_equal 1, queue.size

  count_before = $count
  await { processor.start }
  count_after = $count
  assert_equal count_before, count_after

  processor.kill(true)

  assert @latest_error
  assert_equal "boom", @latest_error.message
  assert_equal RuntimeError, @latest_error.class
end
|
|
|
|
# Shutdown path: the job is asked to sleep for 1 second, but the processor
# is terminated and killed after 0.05 seconds. The job must not have
# incremented $count, the processor thread must have finished normally
# (Thread#status == false) and no error may have been reported.
it "gracefully kills" do
  queue = Sidekiq::Queue.new
  assert_equal 0, queue.size

  processor = Sidekiq::Processor.new(@config) { |pr, ex| result(pr, ex) }

  jid = JoeWorker.perform_async(1)
  assert jid, jid
  assert_equal 1, queue.size

  count_before = $count
  processor.start
  sleep(0.05)
  processor.terminate
  processor.kill(true)

  count_after = $count
  assert_equal count_before, count_after
  assert_equal false, processor.thread.status
  refute @latest_error, @latest_error.to_s
end
|
|
end
|
|
end
|