mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Refactor Fetcher to use bare threads
This commit is contained in:
parent
0b59ed8c2b
commit
da563e73b1
7 changed files with 110 additions and 122 deletions
|
@ -40,7 +40,6 @@ module Sidekiq
|
|||
validate!
|
||||
daemonize
|
||||
write_pid
|
||||
load_celluloid
|
||||
end
|
||||
|
||||
# Code within this method is not tested because it alters
|
||||
|
@ -160,19 +159,6 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
|
||||
def load_celluloid
|
||||
raise "Celluloid cannot be required until here, or it will break Sidekiq's daemonization" if defined?(::Celluloid) && options[:daemon]
|
||||
|
||||
# Celluloid can't be loaded until after we've daemonized
|
||||
# because it spins up threads and creates locks which get
|
||||
# into a very bad state if forked.
|
||||
require 'celluloid/current'
|
||||
Celluloid.logger = (options[:verbose] ? Sidekiq.logger : nil)
|
||||
|
||||
require 'sidekiq/manager'
|
||||
require 'sidekiq/scheduled'
|
||||
end
|
||||
|
||||
def daemonize
|
||||
return unless options[:daemon]
|
||||
|
||||
|
|
|
@ -1,61 +1,101 @@
|
|||
require 'sidekiq'
|
||||
require 'sidekiq/util'
|
||||
require 'sidekiq/actor'
|
||||
require 'thread'
|
||||
|
||||
module Sidekiq
|
||||
##
|
||||
# The Fetcher blocks on Redis, waiting for a message to process
|
||||
# from the queues. It gets the message and hands it to the Manager
|
||||
# The Fetcher blocks on Redis, waiting for a job to process
|
||||
# from the queues. It gets the job and hands it to the Manager
|
||||
# to assign to a ready Processor.
|
||||
#
|
||||
# f = Fetcher.new(mgr, opts)
|
||||
# f.start
|
||||
#
|
||||
# Now anyone can call:
|
||||
#
|
||||
# f.request_job
|
||||
#
|
||||
# and the fetcher will handle a job to the mgr.
|
||||
#
|
||||
# The Manager makes a request_job call for each idle processor
|
||||
# when Sidekiq starts and then issues a new request_job call
|
||||
# every time a Processor finishes a job.
|
||||
#
|
||||
class Fetcher
|
||||
include Util
|
||||
include Actor
|
||||
|
||||
TIMEOUT = 1
|
||||
REQUEST = Object.new
|
||||
|
||||
attr_reader :down
|
||||
|
||||
def initialize(mgr, options)
|
||||
@done = false
|
||||
@down = nil
|
||||
@mgr = mgr
|
||||
@strategy = Fetcher.strategy.new(options)
|
||||
@requests = ConnectionPool::TimedStack.new
|
||||
end
|
||||
|
||||
# Fetching is straightforward: the Manager makes a fetch
|
||||
# request for each idle processor when Sidekiq starts and
|
||||
# then issues a new fetch request every time a Processor
|
||||
# finishes a message.
|
||||
#
|
||||
# Because we have to shut down cleanly, we can't block
|
||||
# forever and we can't loop forever. Instead we reschedule
|
||||
# a new fetch if the current fetch turned up nothing.
|
||||
def fetch
|
||||
watchdog('Fetcher#fetch died') do
|
||||
return if Sidekiq::Fetcher.done?
|
||||
def request_job
|
||||
@requests << REQUEST
|
||||
nil
|
||||
end
|
||||
|
||||
begin
|
||||
work = @strategy.retrieve_work
|
||||
::Sidekiq.logger.info("Redis is online, #{Time.now - @down} sec downtime") if @down
|
||||
@down = nil
|
||||
# Shut down this Fetcher instance, will pause until
|
||||
# the thread is dead.
|
||||
def terminate
|
||||
@done = true
|
||||
if @thread
|
||||
t = @thread
|
||||
@thread = nil
|
||||
@requests << 0
|
||||
t.value
|
||||
end
|
||||
end
|
||||
|
||||
if work
|
||||
@mgr.async.assign(work)
|
||||
else
|
||||
after(0) { fetch }
|
||||
end
|
||||
rescue => ex
|
||||
handle_fetch_exception(ex)
|
||||
# Spins up the thread for this Fetcher instance
|
||||
def start
|
||||
@thread ||= safe_thread("fetcher") do
|
||||
while !@done
|
||||
get_one
|
||||
end
|
||||
Sidekiq.logger.debug("Fetcher shutting down")
|
||||
end
|
||||
end
|
||||
|
||||
# not for public use, testing only
|
||||
def wait_for_request
|
||||
begin
|
||||
req = @requests.pop(1)
|
||||
return if !req || @done
|
||||
|
||||
result = yield
|
||||
unless result
|
||||
@requests << req
|
||||
end
|
||||
result
|
||||
rescue => ex
|
||||
handle_fetch_exception(ex)
|
||||
@requests << REQUEST
|
||||
end
|
||||
end
|
||||
|
||||
# not for public use, testing only
|
||||
def get_one
|
||||
wait_for_request do
|
||||
work = @strategy.retrieve_work
|
||||
::Sidekiq.logger.info("Redis is online, #{Time.now - @down} sec downtime") if @down
|
||||
@down = nil
|
||||
|
||||
@mgr.assign(work) if work
|
||||
work
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def pause
|
||||
sleep(TIMEOUT)
|
||||
end
|
||||
|
||||
def handle_fetch_exception(ex)
|
||||
if !@down
|
||||
logger.error("Error fetching message: #{ex}")
|
||||
|
@ -64,26 +104,7 @@ module Sidekiq
|
|||
end
|
||||
end
|
||||
@down ||= Time.now
|
||||
pause
|
||||
after(0) { fetch }
|
||||
rescue Celluloid::TaskTerminated
|
||||
# If redis is down when we try to shut down, all the fetch backlog
|
||||
# raises these errors. Haven't been able to figure out what I'm doing wrong.
|
||||
end
|
||||
|
||||
# Ugh. Say hello to a bloody hack.
|
||||
# Can't find a clean way to get the fetcher to just stop processing
|
||||
# its mailbox when shutdown starts.
|
||||
def self.done!
|
||||
@done = true
|
||||
end
|
||||
|
||||
def self.reset # testing only
|
||||
@done = nil
|
||||
end
|
||||
|
||||
def self.done?
|
||||
defined?(@done) && @done
|
||||
sleep(TIMEOUT)
|
||||
end
|
||||
|
||||
def self.strategy
|
||||
|
|
|
@ -19,6 +19,12 @@ module Sidekiq
|
|||
raise ex
|
||||
end
|
||||
|
||||
def safe_thread(name, &block)
|
||||
Thread.new do
|
||||
watchdog(name, &block)
|
||||
end
|
||||
end
|
||||
|
||||
def logger
|
||||
Sidekiq.logger
|
||||
end
|
||||
|
|
|
@ -18,7 +18,7 @@ Gem::Specification.new do |gem|
|
|||
gem.add_dependency 'redis', '~> 3.2', '>= 3.2.1'
|
||||
gem.add_dependency 'redis-namespace', '~> 1.5', '>= 1.5.2'
|
||||
gem.add_dependency 'connection_pool', '~> 2.2', '>= 2.2.0'
|
||||
gem.add_dependency 'celluloid', '~> 0.17.2'
|
||||
gem.add_dependency 'concurrent-ruby', '1.0.0.pre3'
|
||||
gem.add_dependency 'json', '~> 1.0'
|
||||
gem.add_development_dependency 'sinatra', '~> 1.4', '>= 1.4.6'
|
||||
gem.add_development_dependency 'minitest', '~> 5.7', '>= 5.7.0'
|
||||
|
|
|
@ -17,9 +17,6 @@ end
|
|||
require 'minitest/autorun'
|
||||
require 'minitest/pride'
|
||||
|
||||
require 'celluloid/current'
|
||||
require 'celluloid/test'
|
||||
Celluloid.boot
|
||||
require 'sidekiq'
|
||||
require 'sidekiq/util'
|
||||
Sidekiq.logger.level = Logger::ERROR
|
||||
|
|
33
test/test_actors.rb
Normal file
33
test/test_actors.rb
Normal file
|
@ -0,0 +1,33 @@
|
|||
require_relative 'helper'
|
||||
require 'sidekiq/fetch'
|
||||
|
||||
class TestActors < Sidekiq::Test
|
||||
class SomeWorker
|
||||
include Sidekiq::Worker
|
||||
end
|
||||
|
||||
describe 'fetcher' do
|
||||
it 'can start and stop' do
|
||||
f = Sidekiq::Fetcher.new(nil, { queues: ['default'] })
|
||||
f.start
|
||||
f.terminate
|
||||
end
|
||||
|
||||
it 'can fetch' do
|
||||
SomeWorker.perform_async
|
||||
|
||||
mgr = Minitest::Mock.new
|
||||
mgr.expect(:assign, nil, [Sidekiq::BasicFetch::UnitOfWork])
|
||||
mgr.expect(:assign, nil, [Sidekiq::BasicFetch::UnitOfWork])
|
||||
f = Sidekiq::Fetcher.new(mgr, { queues: ['default'] })
|
||||
f.start
|
||||
f.request_job
|
||||
sleep 0.001
|
||||
f.terminate
|
||||
mgr.verify
|
||||
|
||||
#assert_equal Sidekiq::BasicFetch::UnitOfWork, job.class
|
||||
end
|
||||
end
|
||||
|
||||
end
|
|
@ -45,60 +45,5 @@ class TestFetcher < Sidekiq::Test
|
|||
assert_equal 1, q2.size
|
||||
end
|
||||
|
||||
describe 'fetching' do
|
||||
before do
|
||||
Sidekiq::Fetcher.reset
|
||||
end
|
||||
|
||||
it 'instantiates' do
|
||||
begin
|
||||
Sidekiq.options[:fetch] = NullFetch
|
||||
mgr = Minitest::Mock.new
|
||||
fetch = Sidekiq::Fetcher.new(mgr, {})
|
||||
fetch.fetch
|
||||
Sidekiq::Fetcher.done!
|
||||
ensure
|
||||
Sidekiq.options[:fetch] = Sidekiq::BasicFetch
|
||||
end
|
||||
end
|
||||
|
||||
class NullFetch
|
||||
def initialize(opts)
|
||||
end
|
||||
def retrieve_work
|
||||
end
|
||||
def self.bulk_requeue(*args)
|
||||
end
|
||||
end
|
||||
|
||||
it 'handles redis network errors' do
|
||||
begin
|
||||
Sidekiq.logger.level = Logger::FATAL
|
||||
Sidekiq.options[:fetch] = ErrorFetch
|
||||
mgr = Minitest::Mock.new
|
||||
fetch = Sidekiq::Fetcher.new(mgr, {})
|
||||
def fetch.pause
|
||||
end
|
||||
refute fetch.down
|
||||
fetch.fetch
|
||||
Sidekiq::Fetcher.done!
|
||||
assert fetch.down
|
||||
ensure
|
||||
Sidekiq.options[:fetch] = Sidekiq::BasicFetch
|
||||
Sidekiq.logger.level = Logger::ERROR
|
||||
end
|
||||
end
|
||||
|
||||
class ErrorFetch
|
||||
def initialize(opts)
|
||||
end
|
||||
def retrieve_work
|
||||
raise IOError, "ker-BOOM"
|
||||
end
|
||||
def self.bulk_requeue(*args)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue