1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Refactor Processor to use bare threads

This commit is contained in:
Mike Perham 2015-10-05 10:13:00 -07:00
parent b182b117f1
commit 182db329cc
2 changed files with 146 additions and 24 deletions

View file

@ -1,5 +1,4 @@
require 'sidekiq/util'
require 'sidekiq/actor'
require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/middleware/server/logging'
@ -10,12 +9,12 @@ module Sidekiq
# processes it. It instantiates the worker, runs the middleware
# chain and then calls Sidekiq::Worker#perform.
class Processor
# To prevent a memory leak, ensure that stats expire. However, they should take up a minimal amount of storage
# so keep them around for a long time
# To prevent a memory leak, ensure that stats expire. However, they
# should take up a minimal amount of storage so keep them around
# for a long time.
STATS_TIMEOUT = 24 * 60 * 60 * 365 * 5
include Util
include Actor
def self.default_middleware
Middleware::Chain.new do |m|
@ -28,18 +27,54 @@ module Sidekiq
end
end
attr_accessor :proxy_id
attr_reader :thread
def initialize(boss)
@boss = boss
def initialize(mgr)
@mgr = mgr
@done = false
@work = ::Queue.new
@thread = safe_thread("processor", &method(:run))
end
def terminate(wait=false)
@done = true
@work << nil
# unlike the other actors, terminate does not wait
# for the thread to finish because we don't know how
# long the job will take to finish. Instead we
# provide a `kill` method to call after the shutdown
# timeout passes.
@thread.value if wait
end
def kill(wait=false)
@thread.raise ::Sidekiq::Shutdown
@thread.value if wait
end
def process(work)
raise ArgumentError, "Processor is shut down!" if @done
@work << work
end
private
def run
begin
while !@done
job = @work.pop
go(job) if job
end
rescue Exception => ex
Sidekiq.logger.warn(ex.message)
@mgr.processor_died(self, ex)
end
end
def go(work)
msgstr = work.message
queue = work.queue_name
@boss.async.real_thread(proxy_id, Thread.current)
ack = false
begin
msg = Sidekiq.load_json(msgstr)
@ -69,19 +104,13 @@ module Sidekiq
work.acknowledge if ack
end
@boss.async.processor_done(current_actor)
end
def inspect
"<Processor##{object_id.to_s(16)}>"
@mgr.processor_done(self)
end
def execute_job(worker, cloned_args)
worker.perform(*cloned_args)
end
private
def thread_identity
@str ||= Thread.current.object_id.to_s(36)
end
@ -94,7 +123,7 @@ module Sidekiq
Sidekiq.redis do |conn|
conn.multi do
conn.hmset("#{identity}:workers", thread_identity, hash)
conn.expire("#{identity}:workers", 60*60*4)
conn.expire("#{identity}:workers", 14400)
end
end
end

View file

@ -1,9 +1,16 @@
require_relative 'helper'
require 'sidekiq/cli'
require 'sidekiq/fetch'
require 'sidekiq/processor'
class TestActors < Sidekiq::Test
class SomeWorker
include Sidekiq::Worker
def perform(slp)
raise "boom" if slp == "boom"
sleep(slp) if slp > 0
$count += 1
end
end
describe 'fetcher' do
@ -14,7 +21,7 @@ class TestActors < Sidekiq::Test
end
it 'can fetch' do
SomeWorker.perform_async
SomeWorker.perform_async(0)
mgr = Minitest::Mock.new
mgr.expect(:assign, nil, [Sidekiq::BasicFetch::UnitOfWork])
@ -24,8 +31,6 @@ class TestActors < Sidekiq::Test
sleep 0.001
f.terminate
mgr.verify
#assert_equal Sidekiq::BasicFetch::UnitOfWork, job.class
end
end
@ -37,13 +42,12 @@ class TestActors < Sidekiq::Test
end
it 'can schedule' do
Sidekiq.redis {|c| c.flushdb}
ss = Sidekiq::ScheduledSet.new
ss.clear
q = Sidekiq::Queue.new
q.clear
SomeWorker.perform_in(0.01)
SomeWorker.perform_in(0.01, 0)
assert_equal 0, q.size
assert_equal 1, ss.size
@ -57,4 +61,93 @@ class TestActors < Sidekiq::Test
end
end
describe 'processor' do
before do
$count = 0
end
it 'can start and stop' do
f = Sidekiq::Processor.new(nil)
f.terminate
end
class Mgr
attr_reader :mutex
attr_reader :cond
def initialize
@mutex = ::Mutex.new
@cond = ::ConditionVariable.new
end
def processor_done(inst)
@mutex.synchronize do
@cond.signal
end
end
def processor_died(inst, err)
@mutex.synchronize do
@cond.signal
end
end
end
it 'can process' do
mgr = Mgr.new
p = Sidekiq::Processor.new(mgr)
SomeWorker.perform_async(0)
job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count
mgr.mutex.synchronize do
p.process(uow)
mgr.cond.wait(mgr.mutex)
end
b = $count
assert_equal a + 1, b
assert_equal "sleep", p.thread.status
p.terminate(true)
assert_equal false, p.thread.status
end
it 'deals with errors' do
mgr = Mgr.new
p = Sidekiq::Processor.new(mgr)
SomeWorker.perform_async("boom")
job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count
mgr.mutex.synchronize do
p.process(uow)
mgr.cond.wait(mgr.mutex)
end
b = $count
assert_equal a, b
assert_equal false, p.thread.status
p.terminate(true)
end
it 'gracefully kills' do
mgr = Mgr.new
p = Sidekiq::Processor.new(mgr)
SomeWorker.perform_async(0.1)
job = Sidekiq.redis { |c| c.lpop("queue:default") }
uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job)
a = $count
p.process(uow)
sleep(0.02)
p.terminate
p.kill(true)
b = $count
assert_equal a, b
assert_equal false, p.thread.status
end
end
end