Fetch API refactor, WIP (#4602)
* Fetch API refactor, WIP * save options for later * changes * Fix test failures
This commit is contained in:
parent
50b9e67655
commit
fce05c9d4b
|
@ -12,6 +12,7 @@ HEAD
|
|||
- Remove rack-protection, reimplement CSRF protection [#4588]
|
||||
- Require redis-rb 4.2 [#4591]
|
||||
- Update to jquery 1.12.4 [#4593]
|
||||
- Refactor internal fetch logic and API [#4602]
|
||||
|
||||
6.0.7
|
||||
---------
|
||||
|
|
|
@ -10,6 +10,7 @@ HEAD
|
|||
- Remove `concurrent-ruby` gem dependency [#4586]
|
||||
- Update `constantize` for batch callbacks. [#4469]
|
||||
- Add queue tag to `jobs.recovered.fetch` metric [#4594]
|
||||
- Refactor Pro's fetch infrastructure [#4602]
|
||||
|
||||
5.0.1
|
||||
---------
|
||||
|
|
|
@ -25,8 +25,10 @@ module Sidekiq
|
|||
}
|
||||
|
||||
def initialize(options)
|
||||
@strictly_ordered_queues = !!options[:strict]
|
||||
@queues = options[:queues].map { |q| "queue:#{q}" }
|
||||
raise ArgumentError, "missing queue list" unless options[:queues]
|
||||
@options = options
|
||||
@strictly_ordered_queues = !!@options[:strict]
|
||||
@queues = @options[:queues].map { |q| "queue:#{q}" }
|
||||
if @strictly_ordered_queues
|
||||
@queues.uniq!
|
||||
@queues << TIMEOUT
|
||||
|
@ -38,24 +40,9 @@ module Sidekiq
|
|||
UnitOfWork.new(*work) if work
|
||||
end
|
||||
|
||||
# Creating the Redis#brpop command takes into account any
|
||||
# configured queue weights. By default Redis#brpop returns
|
||||
# data from the first queue that has pending elements. We
|
||||
# recreate the queue command each time we invoke Redis#brpop
|
||||
# to honor weights and avoid queue starvation.
|
||||
def queues_cmd
|
||||
if @strictly_ordered_queues
|
||||
@queues
|
||||
else
|
||||
queues = @queues.shuffle!.uniq
|
||||
queues << TIMEOUT
|
||||
queues
|
||||
end
|
||||
end
|
||||
|
||||
# By leaving this as a class method, it can be pluggable and used by the Manager actor. Making it
|
||||
# an instance method will make it async to the Fetcher actor
|
||||
def self.bulk_requeue(inprogress, options)
|
||||
def bulk_requeue(inprogress, options)
|
||||
return if inprogress.empty?
|
||||
|
||||
Sidekiq.logger.debug { "Re-queueing terminated jobs" }
|
||||
|
@ -76,5 +63,20 @@ module Sidekiq
|
|||
rescue => ex
|
||||
Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
|
||||
end
|
||||
|
||||
# Creating the Redis#brpop command takes into account any
|
||||
# configured queue weights. By default Redis#brpop returns
|
||||
# data from the first queue that has pending elements. We
|
||||
# recreate the queue command each time we invoke Redis#brpop
|
||||
# to honor weights and avoid queue starvation.
|
||||
def queues_cmd
|
||||
if @strictly_ordered_queues
|
||||
@queues
|
||||
else
|
||||
queues = @queues.shuffle!.uniq
|
||||
queues << TIMEOUT
|
||||
queues
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -22,6 +22,7 @@ module Sidekiq
|
|||
attr_accessor :manager, :poller, :fetcher
|
||||
|
||||
def initialize(options)
|
||||
options[:fetch] ||= BasicFetch.new(options)
|
||||
@manager = Sidekiq::Manager.new(options)
|
||||
@poller = Sidekiq::Scheduled::Poller.new
|
||||
@done = false
|
||||
|
@ -56,7 +57,7 @@ module Sidekiq
|
|||
|
||||
# Requeue everything in case there was a worker who grabbed work while stopped
|
||||
# This call is a no-op in Sidekiq but necessary for Sidekiq Pro.
|
||||
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
|
||||
strategy = @options[:fetch]
|
||||
strategy.bulk_requeue([], @options)
|
||||
|
||||
clear_heartbeat
|
||||
|
|
|
@ -35,7 +35,7 @@ module Sidekiq
|
|||
@done = false
|
||||
@workers = Set.new
|
||||
@count.times do
|
||||
@workers << Processor.new(self)
|
||||
@workers << Processor.new(self, options)
|
||||
end
|
||||
@plock = Mutex.new
|
||||
end
|
||||
|
@ -90,7 +90,7 @@ module Sidekiq
|
|||
@plock.synchronize do
|
||||
@workers.delete(processor)
|
||||
unless @done
|
||||
p = Processor.new(self)
|
||||
p = Processor.new(self, options)
|
||||
@workers << p
|
||||
p.start
|
||||
end
|
||||
|
@ -123,7 +123,7 @@ module Sidekiq
|
|||
# contract says that jobs are run AT LEAST once. Process termination
|
||||
# is delayed until we're certain the jobs are back in Redis because
|
||||
# it is worse to lose a job than to run it twice.
|
||||
strategy = (@options[:fetch] || Sidekiq::BasicFetch)
|
||||
strategy = @options[:fetch]
|
||||
strategy.bulk_requeue(jobs, @options)
|
||||
end
|
||||
|
||||
|
|
|
@ -28,15 +28,15 @@ module Sidekiq
|
|||
attr_reader :thread
|
||||
attr_reader :job
|
||||
|
||||
def initialize(mgr)
|
||||
def initialize(mgr, options)
|
||||
@mgr = mgr
|
||||
@down = false
|
||||
@done = false
|
||||
@job = nil
|
||||
@thread = nil
|
||||
@strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
|
||||
@reloader = Sidekiq.options[:reloader]
|
||||
@job_logger = (mgr.options[:job_logger] || Sidekiq::JobLogger).new
|
||||
@strategy = options[:fetch]
|
||||
@reloader = options[:reloader] || proc { |&block| block.call }
|
||||
@job_logger = (options[:job_logger] || Sidekiq::JobLogger).new
|
||||
@retrier = Sidekiq::JobRetry.new
|
||||
end
|
||||
|
||||
|
|
|
@ -50,7 +50,8 @@ describe 'Actors' do
|
|||
end
|
||||
|
||||
it 'can start and stop' do
|
||||
f = Sidekiq::Processor.new(Mgr.new)
|
||||
m = Mgr.new
|
||||
f = Sidekiq::Processor.new(m, m.options)
|
||||
f.terminate
|
||||
end
|
||||
|
||||
|
@ -74,14 +75,16 @@ describe 'Actors' do
|
|||
end
|
||||
end
|
||||
def options
|
||||
{ :concurrency => 3, :queues => ['default'] }
|
||||
opts = { :concurrency => 3, :queues => ['default'] }
|
||||
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
|
||||
opts
|
||||
end
|
||||
end
|
||||
|
||||
it 'can process' do
|
||||
mgr = Mgr.new
|
||||
|
||||
p = Sidekiq::Processor.new(mgr)
|
||||
p = Sidekiq::Processor.new(mgr, mgr.options)
|
||||
JoeWorker.perform_async(0)
|
||||
|
||||
a = $count
|
||||
|
@ -93,7 +96,7 @@ describe 'Actors' do
|
|||
it 'deals with errors' do
|
||||
mgr = Mgr.new
|
||||
|
||||
p = Sidekiq::Processor.new(mgr)
|
||||
p = Sidekiq::Processor.new(mgr, mgr.options)
|
||||
JoeWorker.perform_async("boom")
|
||||
q = Sidekiq::Queue.new
|
||||
assert_equal 1, q.size
|
||||
|
@ -116,7 +119,7 @@ describe 'Actors' do
|
|||
it 'gracefully kills' do
|
||||
mgr = Mgr.new
|
||||
|
||||
p = Sidekiq::Processor.new(mgr)
|
||||
p = Sidekiq::Processor.new(mgr, mgr.options)
|
||||
JoeWorker.perform_async(1)
|
||||
q = Sidekiq::Queue.new
|
||||
assert_equal 1, q.size
|
||||
|
|
|
@ -419,6 +419,7 @@ describe Sidekiq::CLI do
|
|||
|
||||
describe '#run' do
|
||||
before do
|
||||
Sidekiq.options[:concurrency] = 2
|
||||
Sidekiq.options[:require] = './test/fake_env.rb'
|
||||
end
|
||||
|
||||
|
|
|
@ -52,7 +52,7 @@ describe Sidekiq::BasicFetch do
|
|||
assert_equal 0, q1.size
|
||||
assert_equal 0, q2.size
|
||||
|
||||
Sidekiq::BasicFetch.bulk_requeue(works, {:queues => []})
|
||||
fetch.bulk_requeue(works, {:queues => []})
|
||||
assert_equal 2, q1.size
|
||||
assert_equal 1, q2.size
|
||||
end
|
||||
|
|
|
@ -8,7 +8,7 @@ describe Sidekiq::Manager do
|
|||
end
|
||||
|
||||
def new_manager(opts)
|
||||
Sidekiq::Manager.new(opts)
|
||||
Sidekiq::Manager.new(opts.merge(fetch: Sidekiq::BasicFetch.new(opts)))
|
||||
end
|
||||
|
||||
it 'creates N processor instances' do
|
||||
|
|
|
@ -78,10 +78,8 @@ describe Sidekiq::Middleware do
|
|||
end
|
||||
|
||||
boss = Minitest::Mock.new
|
||||
boss.expect(:options, {:queues => ['default'] }, [])
|
||||
boss.expect(:options, {:queues => ['default'] }, [])
|
||||
boss.expect(:options, {:queues => ['default'] }, [])
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
opts = {:queues => ['default'] }
|
||||
processor = Sidekiq::Processor.new(boss, opts)
|
||||
boss.expect(:processor_done, nil, [processor])
|
||||
processor.process(Sidekiq::BasicFetch::UnitOfWork.new('queue:default', msg))
|
||||
assert_equal %w(2 before 3 before 1 before work_performed 1 after 3 after 2 after), $recorder.flatten
|
||||
|
|
|
@ -11,10 +11,9 @@ describe Sidekiq::Processor do
|
|||
before do
|
||||
$invokes = 0
|
||||
@mgr = Minitest::Mock.new
|
||||
@mgr.expect(:options, {:queues => ['default']})
|
||||
@mgr.expect(:options, {:queues => ['default']})
|
||||
@mgr.expect(:options, {:queues => ['default']})
|
||||
@processor = ::Sidekiq::Processor.new(@mgr)
|
||||
opts = {:queues => ['default']}
|
||||
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
|
||||
@processor = ::Sidekiq::Processor.new(@mgr, opts)
|
||||
end
|
||||
|
||||
class MockWorker
|
||||
|
@ -327,11 +326,9 @@ describe Sidekiq::Processor do
|
|||
end
|
||||
|
||||
before do
|
||||
opts = {:queues => ['default'], job_logger: CustomJobLogger}
|
||||
@mgr = Minitest::Mock.new
|
||||
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
|
||||
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
|
||||
@mgr.expect(:options, {:queues => ['default'], job_logger: CustomJobLogger})
|
||||
@processor = ::Sidekiq::Processor.new(@mgr)
|
||||
@processor = ::Sidekiq::Processor.new(@mgr, opts)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -356,18 +353,15 @@ describe Sidekiq::Processor do
|
|||
|
||||
describe 'custom job logger class' do
|
||||
before do
|
||||
@mgr = Minitest::Mock.new
|
||||
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
|
||||
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
|
||||
@mgr.expect(:options, {:queues => ['default'], :job_logger => CustomJobLogger})
|
||||
@processor = ::Sidekiq::Processor.new(@mgr)
|
||||
opts = {:queues => ['default'], :job_logger => CustomJobLogger}
|
||||
opts[:fetch] = Sidekiq::BasicFetch.new(opts)
|
||||
@processor = ::Sidekiq::Processor.new(nil, opts)
|
||||
end
|
||||
|
||||
it 'is called instead default Sidekiq::JobLogger' do
|
||||
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
|
||||
@processor.process(work(msg))
|
||||
assert_equal 1, $invokes
|
||||
@mgr.verify
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue