mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Defer loading message JSON until a full stack is available.
This commit is contained in:
parent
29f45e246e
commit
7a45208cf0
6 changed files with 20 additions and 12 deletions
|
@ -1,3 +1,9 @@
|
|||
HEAD
|
||||
-----------
|
||||
|
||||
- Defer loading JSON until a full Thread stack is available. Celluloid's
|
||||
standard 4k actor stack will lead to crashes when parsing large payloads.
|
||||
|
||||
2.0.3
|
||||
-----------
|
||||
- Fix sidekiq-web's navbar on mobile devices and windows <= 980px (ezkl)
|
||||
|
|
|
@ -110,7 +110,7 @@ module Sidekiq
|
|||
processor = @ready.pop
|
||||
@in_progress[processor.object_id] = [msg, queue]
|
||||
@busy << processor
|
||||
processor.process!(Sidekiq.load_json(msg), queue)
|
||||
processor.process!(msg, queue)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -30,15 +30,16 @@ module Sidekiq
|
|||
@boss = boss
|
||||
end
|
||||
|
||||
def process(msg, queue)
|
||||
klass = constantize(msg['class'])
|
||||
worker = klass.new
|
||||
|
||||
def process(msgstr, queue)
|
||||
# Celluloid actor calls are performed within a Fiber.
|
||||
# This would give us a terribly small 4KB stack on MRI
|
||||
# so we use Celluloid's defer to run things in a thread pool
|
||||
# in order to get a full-sized stack for the Worker.
|
||||
defer do
|
||||
msg = Sidekiq.load_json(msgstr)
|
||||
klass = constantize(msg['class'])
|
||||
worker = klass.new
|
||||
|
||||
stats(worker, msg, queue) do
|
||||
Sidekiq.server_middleware.invoke(worker, msg, queue) do
|
||||
worker.perform(*msg['args'])
|
||||
|
|
|
@ -43,9 +43,10 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
class CustomWorker
|
||||
$recorder = []
|
||||
include Sidekiq::Worker
|
||||
def perform(recorder)
|
||||
recorder << ['work_performed']
|
||||
$recorder << ['work_performed']
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -56,18 +57,18 @@ class TestMiddleware < MiniTest::Unit::TestCase
|
|||
|
||||
it 'executes middleware in the proper order' do
|
||||
recorder = []
|
||||
msg = { 'class' => CustomWorker.to_s, 'args' => [recorder] }
|
||||
msg = Sidekiq.dump_json({ 'class' => CustomWorker.to_s, 'args' => [$recorder] })
|
||||
|
||||
Sidekiq.server_middleware do |chain|
|
||||
# should only add once, second should be ignored
|
||||
2.times { |i| chain.add CustomMiddleware, i.to_s, recorder }
|
||||
2.times { |i| chain.add CustomMiddleware, i.to_s, $recorder }
|
||||
end
|
||||
|
||||
boss = MiniTest::Mock.new
|
||||
processor = Sidekiq::Processor.new(boss)
|
||||
boss.expect(:processor_done!, nil, [processor])
|
||||
processor.process(msg, 'default')
|
||||
assert_equal %w(0 before work_performed 0 after), recorder.flatten
|
||||
assert_equal %w(0 before work_performed 0 after), $recorder.flatten
|
||||
end
|
||||
|
||||
it 'allows middleware to abruptly stop processing rest of chain' do
|
||||
|
|
|
@ -20,7 +20,7 @@ class TestProcessor < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'processes as expected' do
|
||||
msg = { 'class' => MockWorker.to_s, 'args' => ['myarg'] }
|
||||
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
|
||||
processor = ::Sidekiq::Processor.new(@boss)
|
||||
@boss.expect(:processor_done!, nil, [processor])
|
||||
processor.process(msg, 'default')
|
||||
|
|
|
@ -18,7 +18,7 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'updates global stats in the success case' do
|
||||
msg = { 'class' => DumbWorker.to_s, 'args' => [""] }
|
||||
msg = Sidekiq.dump_json({ 'class' => DumbWorker.to_s, 'args' => [""] })
|
||||
boss = MiniTest::Mock.new
|
||||
|
||||
@redis.with do |conn|
|
||||
|
@ -44,7 +44,7 @@ class TestStats < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'updates global stats in the error case' do
|
||||
msg = { 'class' => DumbWorker.to_s, 'args' => [nil] }
|
||||
msg = Sidekiq.dump_json({ 'class' => DumbWorker.to_s, 'args' => [nil] })
|
||||
boss = MiniTest::Mock.new
|
||||
|
||||
@redis.with do |conn|
|
||||
|
|
Loading…
Reference in a new issue