1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

WIP manager and launcher

This commit is contained in:
Mike Perham 2015-10-06 12:43:01 -07:00
parent 182db329cc
commit 48546fdae9
10 changed files with 327 additions and 349 deletions

View file

@ -129,7 +129,7 @@ module Sidekiq
raise Interrupt raise Interrupt
when 'USR1' when 'USR1'
Sidekiq.logger.info "Received USR1, no longer accepting new work" Sidekiq.logger.info "Received USR1, no longer accepting new work"
launcher.manager.async.stop launcher.quiet
fire_event(:quiet, true) fire_event(:quiet, true)
when 'USR2' when 'USR2'
if Sidekiq.options[:logfile] if Sidekiq.options[:logfile]

View file

@ -1,4 +1,3 @@
require 'sidekiq/actor'
require 'sidekiq/manager' require 'sidekiq/manager'
require 'sidekiq/fetch' require 'sidekiq/fetch'
require 'sidekiq/scheduled' require 'sidekiq/scheduled'
@ -9,62 +8,101 @@ module Sidekiq
# If any of these actors die, the Sidekiq process exits # If any of these actors die, the Sidekiq process exits
# immediately. # immediately.
class Launcher class Launcher
include Actor
include Util include Util
trap_exit :actor_died attr_accessor :manager, :poller, :fetcher
attr_reader :manager, :poller, :fetcher
def initialize(options) def initialize(options)
@condvar = Celluloid::Condition.new @condvar = ::ConditionVariable.new
@manager = Sidekiq::Manager.new_link(@condvar, options) @manager = Sidekiq::Manager.new(@condvar, options)
@poller = Sidekiq::Scheduled::Poller.new_link @poller = Sidekiq::Scheduled::Poller.new
@fetcher = Sidekiq::Fetcher.new_link(@manager, options) @fetcher = Sidekiq::Fetcher.new(@manager, options)
@manager.fetcher = @fetcher @manager.fetcher = @fetcher
@done = false @done = false
@options = options @options = options
end end
def actor_died(actor, reason)
# https://github.com/mperham/sidekiq/issues/2057#issuecomment-66485477
return if @done || !reason
Sidekiq.logger.warn("Sidekiq died due to the following error, cannot recover, process exiting")
handle_exception(reason)
exit(1)
end
def run def run
watchdog('Launcher#run') do @thread = safe_thread("heartbeat", &method(:start_heartbeat))
manager.async.start @fetcher.start
poller.async.poll(true) @poller.start
@manager.start
start_heartbeat
end
end end
# Stops this instance from processing any more jobs,
#
def quiet
@manager.quiet
@fetcher.terminate
@poller.terminate
end
# Shuts down the process. This method does not
# return until all work is complete and cleaned up.
# It can take up to the timeout to complete.
def stop def stop
watchdog('Launcher#stop') do deadline = Time.now + @options[:timeout]
@done = true
Sidekiq::Fetcher.done!
fetcher.terminate if fetcher.alive?
poller.terminate if poller.alive?
manager.async.stop(:shutdown => true, :timeout => @options[:timeout]) @manager.quiet
@condvar.wait @fetcher.terminate
manager.terminate @poller.terminate
# Requeue everything in case there was a worker who grabbed work while stopped @manager.stop(deadline)
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
Sidekiq::Fetcher.strategy.bulk_requeue([], @options)
stop_heartbeat # Requeue everything in case there was a worker who grabbed work while stopped
end # This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
Sidekiq::Fetcher.strategy.bulk_requeue([], @options)
stop_heartbeat
end end
private private
JVM_RESERVED_SIGNALS = ['USR1', 'USR2'] # Don't Process#kill if we get these signals via the API
PROCTITLES = [
proc { 'sidekiq'.freeze },
proc { Sidekiq::VERSION },
proc { |me, data| data['tag'] },
proc { |me, data| "[#{me.manager.in_progress.size} of #{data['concurrency']} busy]" },
proc { |me, data| "stopping" if me.manager.stopped? },
]
def heartbeat(key, data, json)
while !@done
results = PROCTITLES.map {|x| x.(self, data) }
results.compact!
$0 = results.join(' ')
(key, json)
sleep 5
end
end
def (key, json)
begin
_, _, _, msg = Sidekiq.redis do |conn|
conn.pipelined do
conn.sadd('processes', key)
conn.hmset(key, 'info', json, 'busy', manager.in_progress.size, 'beat', Time.now.to_f)
conn.expire(key, 60)
conn.rpop("#{key}-signals")
end
end
return unless msg
if JVM_RESERVED_SIGNALS.include?(msg)
Sidekiq::CLI.instance.handle_signal(msg)
else
::Process.kill(msg, $$)
end
rescue => e
# ignore all redis/network issues
logger.error("heartbeat: #{e.message}")
end
end
def start_heartbeat def start_heartbeat
key = identity key = identity
data = { data = {
@ -74,16 +112,17 @@ module Sidekiq
'tag' => @options[:tag] || '', 'tag' => @options[:tag] || '',
'concurrency' => @options[:concurrency], 'concurrency' => @options[:concurrency],
'queues' => @options[:queues].uniq, 'queues' => @options[:queues].uniq,
'labels' => Sidekiq.options[:labels], 'labels' => @options[:labels],
'identity' => identity, 'identity' => identity,
} }
# this data doesn't change so dump it to a string # this data doesn't change so dump it to a string
# now so we don't need to dump it every heartbeat. # now so we don't need to dump it every heartbeat.
json = Sidekiq.dump_json(data) json = Sidekiq.dump_json(data)
manager.heartbeat(key, data, json) heartbeat(key, data, json)
end end
def stop_heartbeat def stop_heartbeat
@done = true
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.pipelined do conn.pipelined do
conn.srem('processes', identity) conn.srem('processes', identity)

View file

@ -1,156 +1,121 @@
# encoding: utf-8 # encoding: utf-8
require 'sidekiq/util' require 'sidekiq/util'
require 'sidekiq/actor'
require 'sidekiq/processor' require 'sidekiq/processor'
require 'sidekiq/fetch' require 'sidekiq/fetch'
module Sidekiq module Sidekiq
## ##
# The main router in the system. This # The Manager is the central coordination point in Sidekiq, controlling
# manages the processor state and accepts messages # the lifecycle of the Processors and feeding them jobs as necessary.
# from Redis to be dispatched to an idle processor. #
# Tasks:
#
# 1. start: Spin up Processors. Issue fetch requests for each.
# 2. processor_done: Handle job success, issue fetch request.
# 3. processor_died: Handle job failure, throw away Processor, issue fetch request.
# 4. quiet: shutdown idle Processors, ignore further fetch requests.
# 5. stop: hard stop the Processors by deadline.
#
# Note that only the last task requires a Thread since it has to monitor
# the shutdown process. The other tasks are performed by other threads.
# #
class Manager class Manager
include Util include Util
include Actor
trap_exit :processor_died
attr_writer :fetcher
attr_reader :in_progress
attr_reader :ready attr_reader :ready
attr_reader :busy
attr_accessor :fetcher
SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1 SPIN_TIME_FOR_GRACEFUL_SHUTDOWN = 1
JVM_RESERVED_SIGNALS = ['USR1', 'USR2'] # Don't Process#kill if we get these signals via the API
def initialize(condvar, options={}) def initialize(condvar, options={})
logger.debug { options.inspect } logger.debug { options.inspect }
@options = options @options = options
@count = options[:concurrency] || 25 @count = options[:concurrency] || 25
raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1 raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1
@done_callback = nil
@finished = condvar @finished = condvar
@in_progress = {} @in_progress = {}
@threads = {}
@done = false @done = false
@busy = [] @ready = Array.new(@count) do
@ready = @count.times.map do Processor.new(self)
p = Processor.new_link(current_actor)
p.proxy_id = p.object_id
p
end end
end @plock = Mutex.new
def stop(options={})
watchdog('Manager#stop died') do
should_shutdown = options[:shutdown]
timeout = options[:timeout]
@done = true
logger.info { "Terminating #{@ready.size} quiet workers" }
@ready.each { |x| x.terminate if x.alive? }
@ready.clear
return if clean_up_for_graceful_shutdown
hard_shutdown_in timeout if should_shutdown
end
end
def clean_up_for_graceful_shutdown
if @busy.empty?
shutdown
return true
end
after(SPIN_TIME_FOR_GRACEFUL_SHUTDOWN) { clean_up_for_graceful_shutdown }
false
end end
def start def start
@ready.each { dispatch } @ready.each { |x| x.start; dispatch }
end end
def when_done(&blk) def quiet
@done_callback = blk return if @done
@done = true
logger.info { "Terminating quiet workers" }
@plock.synchronize do
@ready.each { |x| x.terminate }
@ready.clear
end
end
def stop(deadline)
quiet
return shutdown if @in_progress.empty?
logger.info { "Pausing to allow workers to finish..." }
remaining = deadline - Time.now
while remaining > 0.5
return shutdown if @in_progress.empty?
sleep 0.5
remaining = deadline - Time.now
end
return shutdown if @in_progress.empty?
hard_shutdown
end end
def processor_done(processor) def processor_done(processor)
watchdog('Manager#processor_done died') do @plock.synchronize do
@done_callback.call(processor) if @done_callback @in_progress.delete(processor)
@in_progress.delete(processor.object_id) if @done
@threads.delete(processor.object_id) processor.terminate
@busy.delete(processor) #shutdown if @in_progress.empty?
if stopped?
processor.terminate if processor.alive?
shutdown if @busy.empty?
else else
@ready << processor if processor.alive? @ready << processor
end end
dispatch
end end
dispatch
end end
def processor_died(processor, reason) def processor_died(processor, reason)
watchdog("Manager#processor_died died") do @plock.synchronize do
@in_progress.delete(processor.object_id) @in_progress.delete(processor)
@threads.delete(processor.object_id) if @done
@busy.delete(processor) #shutdown if @in_progress.empty?
unless stopped?
p = Processor.new_link(current_actor)
p.proxy_id = p.object_id
@ready << p
dispatch
else else
shutdown if @busy.empty? @ready << Processor.new(self)
end end
end end
dispatch
end end
def assign(work) def assign(work)
watchdog("Manager#assign died") do if @done
if stopped? # Race condition between Manager#stop if Fetcher
# Race condition between Manager#stop if Fetcher # is blocked on redis and gets a message after
# is blocked on redis and gets a message after # all the ready Processors have been stopped.
# all the ready Processors have been stopped. # Push the message back to redis.
# Push the message back to redis. work.requeue
work.requeue else
else processor = nil
@plock.synchronize do
processor = @ready.pop processor = @ready.pop
@in_progress[processor.object_id] = work @in_progress[processor] = work
@busy << processor
processor.async.process(work)
end end
end processor.request_process(work)
end
# A hack worthy of Rube Goldberg. We need to be able
# to hard stop a working thread. But there's no way for us to
# get handle to the underlying thread performing work for a processor
# so we have it call us and tell us.
def real_thread(proxy_id, thr)
@threads[proxy_id] = thr
end
PROCTITLES = [
proc { 'sidekiq'.freeze },
proc { Sidekiq::VERSION },
proc { |mgr, data| data['tag'] },
proc { |mgr, data| "[#{mgr.busy.size} of #{data['concurrency']} busy]" },
proc { |mgr, data| "stopping" if mgr.stopped? },
]
def heartbeat(key, data, json)
results = PROCTITLES.map {|x| x.(self, data) }
results.compact!
$0 = results.join(' ')
(key, json)
after(5) do
heartbeat(key, data, json)
end end
end end
@ -160,77 +125,44 @@ module Sidekiq
private private
def (key, json) def hard_shutdown
begin # We've reached the timeout and we still have busy workers.
_, _, _, msg = Sidekiq.redis do |conn| # They must die but their messages shall live on.
conn.multi do logger.warn { "Terminating #{@in_progress.size} busy worker threads" }
conn.sadd('processes', key) logger.warn { "Work still in progress #{@in_progress.values.inspect}" }
conn.hmset(key, 'info', json, 'busy', @busy.size, 'beat', Time.now.to_f)
conn.expire(key, 60)
conn.rpop("#{key}-signals")
end
end
return unless msg requeue
if JVM_RESERVED_SIGNALS.include?(msg) @in_progress.each do |processor, _|
Sidekiq::CLI.instance.handle_signal(msg) processor.kill
else
::Process.kill(msg, $$)
end
rescue => e
# ignore all redis/network issues
logger.error("heartbeat: #{e.message}")
end
end
def hard_shutdown_in(delay)
logger.info { "Pausing up to #{delay} seconds to allow workers to finish..." }
after(delay) do
watchdog("Manager#hard_shutdown_in died") do
# We've reached the timeout and we still have busy workers.
# They must die but their messages shall live on.
logger.warn { "Terminating #{@busy.size} busy worker threads" }
logger.warn { "Work still in progress #{@in_progress.values.inspect}" }
requeue
@busy.each do |processor|
if processor.alive? && t = @threads.delete(processor.object_id)
t.raise Shutdown
end
end
@finished.signal
end
end end
end end
def dispatch def dispatch
return if stopped? return if @done
# This is a safety check to ensure we haven't leaked # This is a safety check to ensure we haven't leaked processors somehow.
# processors somehow. raise "BUG: No processors, cannot continue!" if @ready.empty? && @in_progress.empty?
raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?
raise "No ready processor!?" if @ready.empty? raise "No ready processor!?" if @ready.empty?
@fetcher.async.fetch @fetcher.request_job
end end
def shutdown def shutdown
requeue requeue
@finished.signal
end end
def requeue def requeue
# Re-enqueue terminated jobs # Re-enqueue unfinished jobs
# NOTE: You may notice that we may push a job back to redis before # NOTE: You may notice that we may push a job back to redis before
# the worker thread is terminated. This is ok because Sidekiq's # the worker thread is terminated. This is ok because Sidekiq's
# contract says that jobs are run AT LEAST once. Process termination # contract says that jobs are run AT LEAST once. Process termination
# is delayed until we're certain the jobs are back in Redis because # is delayed until we're certain the jobs are back in Redis because
# it is worse to lose a job than to run it twice. # it is worse to lose a job than to run it twice.
Sidekiq::Fetcher.strategy.bulk_requeue(@in_progress.values, @options) jobs = nil
@in_progress.clear @plock.synchronize do
jobs = @in_progress.values
end
Sidekiq::Fetcher.strategy.bulk_requeue(jobs, @options) if jobs.size > 0
end end
end end
end end

View file

@ -33,37 +33,41 @@ module Sidekiq
@mgr = mgr @mgr = mgr
@done = false @done = false
@work = ::Queue.new @work = ::Queue.new
@thread = safe_thread("processor", &method(:run))
end end
def terminate(wait=false) def terminate(wait=false)
@done = true @done = true
@work << nil @work << nil
@thread.value if wait
end
def kill(wait=false)
# unlike the other actors, terminate does not wait # unlike the other actors, terminate does not wait
# for the thread to finish because we don't know how # for the thread to finish because we don't know how
# long the job will take to finish. Instead we # long the job will take to finish. Instead we
# provide a `kill` method to call after the shutdown # provide a `kill` method to call after the shutdown
# timeout passes. # timeout passes.
@thread.value if wait
end
def kill(wait=false)
@thread.raise ::Sidekiq::Shutdown @thread.raise ::Sidekiq::Shutdown
@thread.value if wait @thread.value if wait
end end
def process(work) def start
@thread ||= safe_thread("processor", &method(:run))
end
def request_process(work)
raise ArgumentError, "Processor is shut down!" if @done raise ArgumentError, "Processor is shut down!" if @done
raise ArgumentError, "Processor has not started!" unless @thread
@work << work @work << work
end end
private private unless $TESTING
def run def run
begin begin
while !@done while !@done
job = @work.pop job = @work.pop
go(job) if job process(job) if job
end end
rescue Exception => ex rescue Exception => ex
Sidekiq.logger.warn(ex.message) Sidekiq.logger.warn(ex.message)
@ -71,7 +75,7 @@ module Sidekiq
end end
end end
def go(work) def process(work)
msgstr = work.message msgstr = work.message
queue = work.queue_name queue = work.queue_name
@ -121,7 +125,7 @@ module Sidekiq
retry_and_suppress_exceptions do retry_and_suppress_exceptions do
hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i }) hash = Sidekiq.dump_json({:queue => queue, :payload => msg, :run_at => Time.now.to_i })
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.multi do conn.pipelined do
conn.hmset("#{identity}:workers", thread_identity, hash) conn.hmset("#{identity}:workers", thread_identity, hash)
conn.expire("#{identity}:workers", 14400) conn.expire("#{identity}:workers", 14400)
end end
@ -135,7 +139,7 @@ module Sidekiq
retry_and_suppress_exceptions do retry_and_suppress_exceptions do
failed = "stat:failed:#{nowdate}" failed = "stat:failed:#{nowdate}"
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.multi do conn.pipelined do
conn.incrby("stat:failed".freeze, 1) conn.incrby("stat:failed".freeze, 1)
conn.incrby(failed, 1) conn.incrby(failed, 1)
conn.expire(failed, STATS_TIMEOUT) conn.expire(failed, STATS_TIMEOUT)
@ -147,7 +151,7 @@ module Sidekiq
retry_and_suppress_exceptions do retry_and_suppress_exceptions do
processed = "stat:processed:#{nowdate}" processed = "stat:processed:#{nowdate}"
Sidekiq.redis do |conn| Sidekiq.redis do |conn|
conn.multi do conn.pipelined do
conn.hdel("#{identity}:workers", thread_identity) conn.hdel("#{identity}:workers", thread_identity)
conn.incrby("stat:processed".freeze, 1) conn.incrby("stat:processed".freeze, 1)
conn.incrby(processed, 1) conn.incrby(processed, 1)
@ -174,7 +178,7 @@ module Sidekiq
rescue => e rescue => e
retry_count += 1 retry_count += 1
if retry_count <= max_retries if retry_count <= max_retries
Sidekiq.logger.debug {"Suppressing and retrying error: #{e.inspect}"} Sidekiq.logger.info {"Suppressing and retrying error: #{e.inspect}"}
pause_for_recovery(retry_count) pause_for_recovery(retry_count)
retry retry
else else

View file

@ -9,6 +9,23 @@ if ENV["COVERAGE"]
end end
ENV['RACK_ENV'] = ENV['RAILS_ENV'] = 'test' ENV['RACK_ENV'] = ENV['RAILS_ENV'] = 'test'
trap 'USR1' do
threads = Thread.list
puts
puts "=" * 80
puts "Received USR1 signal; printing all #{threads.count} thread backtraces."
threads.each do |thr|
description = thr == Thread.main ? "Main thread" : thr.inspect
puts
puts "#{description} backtrace: "
puts thr.backtrace.join("\n")
end
puts "=" * 80
end
begin begin
require 'pry-byebug' require 'pry-byebug'
rescue LoadError rescue LoadError

View file

@ -94,13 +94,14 @@ class TestActors < Sidekiq::Test
mgr = Mgr.new mgr = Mgr.new
p = Sidekiq::Processor.new(mgr) p = Sidekiq::Processor.new(mgr)
p.start
SomeWorker.perform_async(0) SomeWorker.perform_async(0)
job = Sidekiq.redis { |c| c.lpop("queue:default") } job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count a = $count
mgr.mutex.synchronize do mgr.mutex.synchronize do
p.process(uow) p.request_process(uow)
mgr.cond.wait(mgr.mutex) mgr.cond.wait(mgr.mutex)
end end
b = $count b = $count
@ -115,13 +116,14 @@ class TestActors < Sidekiq::Test
mgr = Mgr.new mgr = Mgr.new
p = Sidekiq::Processor.new(mgr) p = Sidekiq::Processor.new(mgr)
p.start
SomeWorker.perform_async("boom") SomeWorker.perform_async("boom")
job = Sidekiq.redis { |c| c.lpop("queue:default") } job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count a = $count
mgr.mutex.synchronize do mgr.mutex.synchronize do
p.process(uow) p.request_process(uow)
mgr.cond.wait(mgr.mutex) mgr.cond.wait(mgr.mutex)
end end
b = $count b = $count
@ -140,7 +142,8 @@ class TestActors < Sidekiq::Test
job = Sidekiq.redis { |c| c.lpop("queue:default") } job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count a = $count
p.process(uow) p.start
p.request_process(uow)
sleep(0.02) sleep(0.02)
p.terminate p.terminate
p.kill(true) p.kill(true)

92
test/test_launcher.rb Normal file
View file

@ -0,0 +1,92 @@
require_relative 'helper'
require 'sidekiq/launcher'
class TestLauncher < Sidekiq::Test
describe 'launcher' do
before do
Sidekiq.redis {|c| c.flushdb }
end
def new_manager(opts)
condvar = Minitest::Mock.new
condvar.expect(:signal, nil, [])
Sidekiq::Manager.new(condvar, opts)
end
describe 'heartbeat' do
before do
uow = Object.new
@processor = Minitest::Mock.new
@processor.expect(:request_process, nil, [uow])
@processor.expect(:hash, 1234, [])
@mgr = new_manager(options)
@launcher = Sidekiq::Launcher.new(options)
@launcher.manager = @mgr
@mgr.ready << @processor
@mgr.assign(uow)
@processor.verify
@proctitle = $0
end
after do
$0 = @proctitle
end
describe 'when manager is active' do
before do
Sidekiq::Launcher::PROCTITLES << proc { "xyz" }
@launcher.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
Sidekiq::Launcher::PROCTITLES.pop
end
it 'sets useful info to proctitle' do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0
end
it 'stores process info in redis' do
info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
assert_equal ["1"], info
expires = Sidekiq.redis { |c| c.pttl('identity') }
assert_in_delta 60000, expires, 500
end
end
describe 'when manager is stopped' do
before do
@processor.expect(:terminate, [])
@launcher.quiet
@launcher.manager.processor_done(@processor)
@launcher.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
@processor.verify
end
it 'indicates stopping status in proctitle' do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [0 of 3 busy] stopping", $0
end
it 'stores process info in redis' do
info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
assert_equal ["0"], info
expires = Sidekiq.redis { |c| c.pttl('identity') }
assert_in_delta 60000, expires, 50
end
end
end
def options
{ :concurrency => 3, :queues => ['default'] }
end
def heartbeat_data
{ 'concurrency' => 3, 'tag' => 'myapp' }
end
end
end

View file

@ -17,19 +17,19 @@ class TestManager < Sidekiq::Test
it 'creates N processor instances' do it 'creates N processor instances' do
mgr = new_manager(options) mgr = new_manager(options)
assert_equal options[:concurrency], mgr.ready.size assert_equal options[:concurrency], mgr.ready.size
assert_equal [], mgr.busy assert_equal({}, mgr.in_progress)
end end
it 'assigns work to a processor' do it 'assigns work to a processor' do
uow = Object.new uow = Object.new
processor = Minitest::Mock.new processor = Minitest::Mock.new
processor.expect(:async, processor, [])
processor.expect(:process, nil, [uow]) processor.expect(:process, nil, [uow])
processor.expect(:hash, 1234, [])
mgr = new_manager(options) mgr = new_manager(options)
mgr.ready << processor mgr.ready << processor
mgr.assign(uow) mgr.assign(uow)
assert_equal 1, mgr.busy.size assert_equal 1, mgr.in_progress.size
processor.verify processor.verify
end end
@ -40,7 +40,7 @@ class TestManager < Sidekiq::Test
mgr = new_manager(options) mgr = new_manager(options)
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []}) mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
mgr.stop mgr.quiet
mgr.assign(uow) mgr.assign(uow)
uow.verify uow.verify
end end
@ -48,40 +48,38 @@ class TestManager < Sidekiq::Test
it 'shuts down the system' do it 'shuts down the system' do
mgr = new_manager(options) mgr = new_manager(options)
mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []}) mgr.fetcher = Sidekiq::BasicFetch.new({:queues => []})
mgr.stop mgr.stop(Time.now)
assert mgr.busy.empty? assert mgr.in_progress.empty?
assert mgr.ready.empty? assert mgr.ready.empty?
end end
it 'returns finished processors to the ready pool' do it 'returns finished processors to the ready pool' do
fetcher = MiniTest::Mock.new fetcher = MiniTest::Mock.new
fetcher.expect :async, fetcher, [] fetcher.expect :request_job, nil, []
fetcher.expect :fetch, nil, []
mgr = new_manager(options) mgr = new_manager(options)
mgr.fetcher = fetcher mgr.fetcher = fetcher
init_size = mgr.ready.size init_size = mgr.ready.size
processor = mgr.ready.pop processor = mgr.ready.pop
mgr.busy << processor mgr.in_progress[processor] = 'abc'
mgr.processor_done(processor) mgr.processor_done(processor)
assert_equal 0, mgr.busy.size assert_equal 0, mgr.in_progress.size
assert_equal init_size, mgr.ready.size assert_equal init_size, mgr.ready.size
fetcher.verify fetcher.verify
end end
it 'throws away dead processors' do it 'throws away dead processors' do
fetcher = MiniTest::Mock.new fetcher = MiniTest::Mock.new
fetcher.expect :async, fetcher, [] fetcher.expect :request_job, nil, []
fetcher.expect :fetch, nil, []
mgr = new_manager(options) mgr = new_manager(options)
mgr.fetcher = fetcher mgr.fetcher = fetcher
init_size = mgr.ready.size init_size = mgr.ready.size
processor = mgr.ready.pop processor = mgr.ready.pop
mgr.busy << processor mgr.in_progress[processor] = 'abc'
mgr.processor_died(processor, 'ignored') mgr.processor_died(processor, 'ignored')
assert_equal 0, mgr.busy.size assert_equal 0, mgr.in_progress.size
assert_equal init_size, mgr.ready.size assert_equal init_size, mgr.ready.size
refute mgr.ready.include?(processor) refute mgr.ready.include?(processor)
fetcher.verify fetcher.verify
@ -92,77 +90,9 @@ class TestManager < Sidekiq::Test
assert_raises(ArgumentError) { new_manager(concurrency: -1) } assert_raises(ArgumentError) { new_manager(concurrency: -1) }
end end
describe 'heartbeat' do
before do
uow = Object.new
@processor = Minitest::Mock.new
@processor.expect(:async, @processor, [])
@processor.expect(:process, nil, [uow])
@mgr = new_manager(options)
@mgr.ready << @processor
@mgr.assign(uow)
@processor.verify
@proctitle = $0
end
after do
$0 = @proctitle
end
describe 'when manager is active' do
before do
Sidekiq::Manager::PROCTITLES << proc { "xyz" }
@mgr.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
Sidekiq::Manager::PROCTITLES.pop
end
it 'sets useful info to proctitle' do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [1 of 3 busy] xyz", $0
end
it 'stores process info in redis' do
info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
assert_equal ["1"], info
expires = Sidekiq.redis { |c| c.pttl('identity') }
assert_in_delta 60000, expires, 500
end
end
describe 'when manager is stopped' do
before do
@processor.expect(:alive?, [])
@processor.expect(:terminate, [])
@mgr.stop
@mgr.processor_done(@processor)
@mgr.heartbeat('identity', heartbeat_data, Sidekiq.dump_json(heartbeat_data))
@processor.verify
end
it 'indicates stopping status in proctitle' do
assert_equal "sidekiq #{Sidekiq::VERSION} myapp [0 of 3 busy] stopping", $0
end
it 'stores process info in redis' do
info = Sidekiq.redis { |c| c.hmget('identity', 'busy') }
assert_equal ["0"], info
expires = Sidekiq.redis { |c| c.pttl('identity') }
assert_in_delta 60000, expires, 50
end
end
end
def options def options
{ :concurrency => 3, :queues => ['default'] } { :concurrency => 3, :queues => ['default'] }
end end
def heartbeat_data
{ 'concurrency' => 3, 'tag' => 'myapp' }
end
end end
end end

View file

@ -80,11 +80,7 @@ class TestMiddleware < Sidekiq::Test
boss = Minitest::Mock.new boss = Minitest::Mock.new
processor = Sidekiq::Processor.new(boss) processor = Sidekiq::Processor.new(boss)
actor = Minitest::Mock.new boss.expect(:processor_done, nil, [processor])
actor.expect(:processor_done, nil, [processor])
actor.expect(:real_thread, nil, [nil, Thread])
boss.expect(:async, actor, [])
boss.expect(:async, actor, [])
processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg)) processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg))
assert_equal %w(2 before 3 before 1 before work_performed 1 after 3 after 2 after), $recorder.flatten assert_equal %w(2 before 3 before 1 before work_performed 1 after 3 after 2 after), $recorder.flatten
end end

View file

@ -1,15 +1,17 @@
require_relative 'helper' require_relative 'helper'
require 'sidekiq/fetch'
require 'sidekiq/cli'
require 'sidekiq/processor' require 'sidekiq/processor'
class TestProcessor < Sidekiq::Test class TestProcessor < Sidekiq::Test
TestException = Class.new(StandardError) TestException = Class.new(StandardError)
TEST_EXCEPTION = TestException.new("kerboom!") TEST_EXCEPTION = TestException.new("kerboom!")
describe 'with mock setup' do describe 'processor' do
before do before do
$invokes = 0 $invokes = 0
@boss = Minitest::Mock.new @mgr = Minitest::Mock.new
@processor = ::Sidekiq::Processor.new(@boss) @processor = ::Sidekiq::Processor.new(@mgr)
end end
class MockWorker class MockWorker
@ -27,13 +29,9 @@ class TestProcessor < Sidekiq::Test
it 'processes as expected' do it 'processes as expected' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
actor = Minitest::Mock.new @mgr.expect(:processor_done, nil, [Sidekiq::Processor])
actor.expect(:processor_done, nil, [@processor])
actor.expect(:real_thread, nil, [nil, Thread])
@boss.expect(:async, actor, [])
@boss.expect(:async, actor, [])
@processor.process(work(msg)) @processor.process(work(msg))
@boss.verify @mgr.verify
assert_equal 1, $invokes assert_equal 1, $invokes
end end
@ -43,46 +41,26 @@ class TestProcessor < Sidekiq::Test
@processor.execute_job(worker, [1, 2, 3]) @processor.execute_job(worker, [1, 2, 3])
end end
it 'passes exceptions to ExceptionHandler' do
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Thread])
@boss.expect(:async, actor, [])
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin
@processor.process(work(msg))
flunk "Expected #process to raise exception"
rescue TestException
end
assert_equal 0, $invokes
end
it 're-raises exceptions after handling' do it 're-raises exceptions after handling' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
re_raise = false re_raise = false
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Thread])
@boss.expect(:async, actor, [])
begin begin
@processor.process(work(msg)) @processor.process(work(msg))
flunk "Expected exception"
rescue TestException rescue TestException
re_raise = true re_raise = true
end end
assert_equal 0, $invokes
assert re_raise, "does not re-raise exceptions after handling" assert re_raise, "does not re-raise exceptions after handling"
end end
it 'does not modify original arguments' do it 'does not modify original arguments' do
msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] } msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
msgstr = Sidekiq.dump_json(msg) msgstr = Sidekiq.dump_json(msg)
processor = ::Sidekiq::Processor.new(@boss) @mgr.expect(:processor_done, nil, [@processor])
actor = Minitest::Mock.new @processor.process(work(msgstr))
actor.expect(:processor_done, nil, [processor])
actor.expect(:real_thread, nil, [nil, Thread])
@boss.expect(:async, actor, [])
@boss.expect(:async, actor, [])
processor.process(work(msgstr))
assert_equal [['myarg']], msg['args'] assert_equal [['myarg']], msg['args']
end end
@ -106,17 +84,13 @@ class TestProcessor < Sidekiq::Test
let(:skip_job) { false } let(:skip_job) { false }
let(:worker_args) { ['myarg'] } let(:worker_args) { ['myarg'] }
let(:work) { MiniTest::Mock.new } let(:work) { MiniTest::Mock.new }
let(:actor) { Minitest::Mock.new }
before do before do
work.expect(:queue_name, 'queues:default') work.expect(:queue_name, 'queue:default')
work.expect(:message, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args })) work.expect(:message, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
Sidekiq.server_middleware do |chain| Sidekiq.server_middleware do |chain|
chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
end end
actor.expect(:real_thread, nil, [nil, Thread])
@boss.expect(:async, actor, [])
end end
after do after do
@ -156,8 +130,7 @@ class TestProcessor < Sidekiq::Test
it 'acks the job' do it 'acks the job' do
work.expect(:acknowledge, nil) work.expect(:acknowledge, nil)
@boss.expect(:async, actor, []) @mgr.expect(:processor_done, nil, [@processor])
actor.expect(:processor_done, nil, [@processor])
@processor.process(work) @processor.process(work)
end end
end end
@ -178,8 +151,7 @@ class TestProcessor < Sidekiq::Test
describe 'everything goes well' do describe 'everything goes well' do
it 'acks the job' do it 'acks the job' do
work.expect(:acknowledge, nil) work.expect(:acknowledge, nil)
@boss.expect(:async, actor, []) @mgr.expect(:processor_done, nil, [@processor])
actor.expect(:processor_done, nil, [@processor])
@processor.process(work) @processor.process(work)
end end
end end
@ -195,11 +167,7 @@ class TestProcessor < Sidekiq::Test
def successful_job def successful_job
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
actor = Minitest::Mock.new @mgr.expect(:processor_done, nil, [@processor])
actor.expect(:real_thread, nil, [nil, Thread])
actor.expect(:processor_done, nil, [@processor])
@boss.expect(:async, actor, [])
@boss.expect(:async, actor, [])
@processor.process(work(msg)) @processor.process(work(msg))
end end
@ -215,9 +183,6 @@ class TestProcessor < Sidekiq::Test
let(:failed_today_key) { "stat:failed:#{Time.now.utc.strftime("%Y-%m-%d")}" } let(:failed_today_key) { "stat:failed:#{Time.now.utc.strftime("%Y-%m-%d")}" }
def failed_job def failed_job
actor = Minitest::Mock.new
actor.expect(:real_thread, nil, [nil, Thread])
@boss.expect(:async, actor, [])
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin begin
@processor.process(work(msg)) @processor.process(work(msg))