diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 1f289496..0263b205 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -1,5 +1,4 @@ require 'sidekiq/util' -require 'sidekiq/actor' require 'sidekiq/middleware/server/retry_jobs' require 'sidekiq/middleware/server/logging' @@ -10,12 +9,12 @@ module Sidekiq # processes it. It instantiates the worker, runs the middleware # chain and then calls Sidekiq::Worker#perform. class Processor - # To prevent a memory leak, ensure that stats expire. However, they should take up a minimal amount of storage - # so keep them around for a long time + # To prevent a memory leak, ensure that stats expire. However, they + # should take up a minimal amount of storage so keep them around + # for a long time. STATS_TIMEOUT = 24 * 60 * 60 * 365 * 5 include Util - include Actor def self.default_middleware Middleware::Chain.new do |m| @@ -28,18 +27,54 @@ module Sidekiq end end - attr_accessor :proxy_id + attr_reader :thread - def initialize(boss) - @boss = boss + def initialize(mgr) + @mgr = mgr + @done = false + @work = ::Queue.new + @thread = safe_thread("processor", &method(:run)) + end + + def terminate(wait=false) + @done = true + @work << nil + # unlike the other actors, terminate does not wait + # for the thread to finish because we don't know how + # long the job will take to finish. Instead we + # provide a `kill` method to call after the shutdown + # timeout passes. + @thread.value if wait + end + + def kill(wait=false) + @thread.raise ::Sidekiq::Shutdown + @thread.value if wait end def process(work) + raise ArgumentError, "Processor is shut down!" if @done + @work << work + end + + private + + def run + begin + while !@done + job = @work.pop + go(job) if job + end + rescue Exception => ex + Sidekiq.logger.warn(ex.message) + @mgr.processor_died(self, ex) + end + end + + def go(work) msgstr = work.message queue = work.queue_name - @boss.async.real_thread(proxy_id, Thread.current) - ack = false begin msg = Sidekiq.load_json(msgstr) @@ -69,19 +104,13 @@ module Sidekiq work.acknowledge if ack end - @boss.async.processor_done(current_actor) - end - - def inspect - "" + @mgr.processor_done(self) end def execute_job(worker, cloned_args) worker.perform(*cloned_args) end - private - def thread_identity @str ||= Thread.current.object_id.to_s(36) end @@ -94,7 +123,7 @@ module Sidekiq Sidekiq.redis do |conn| conn.multi do conn.hmset("#{identity}:workers", thread_identity, hash) - conn.expire("#{identity}:workers", 60*60*4) + conn.expire("#{identity}:workers", 14400) end end end diff --git a/test/test_actors.rb b/test/test_actors.rb index 41afbaf9..fa61643f 100644 --- a/test/test_actors.rb +++ b/test/test_actors.rb @@ -1,9 +1,16 @@ require_relative 'helper' +require 'sidekiq/cli' require 'sidekiq/fetch' +require 'sidekiq/processor' class TestActors < Sidekiq::Test class SomeWorker include Sidekiq::Worker + def perform(slp) + raise "boom" if slp == "boom" + sleep(slp) if slp > 0 + $count += 1 + end end describe 'fetcher' do @@ -14,7 +21,7 @@ class TestActors < Sidekiq::Test end it 'can fetch' do - SomeWorker.perform_async + SomeWorker.perform_async(0) mgr = Minitest::Mock.new mgr.expect(:assign, nil, [Sidekiq::BasicFetch::UnitOfWork]) @@ -24,8 +31,6 @@ class TestActors < Sidekiq::Test sleep 0.001 f.terminate mgr.verify - - #assert_equal Sidekiq::BasicFetch::UnitOfWork, job.class end end @@ -37,13 +42,12 @@ class TestActors < Sidekiq::Test end it 'can schedule' do + Sidekiq.redis {|c| c.flushdb} + ss = Sidekiq::ScheduledSet.new - ss.clear - q = Sidekiq::Queue.new - q.clear - SomeWorker.perform_in(0.01) + SomeWorker.perform_in(0.01, 0) assert_equal 0, q.size assert_equal 1, ss.size @@ -57,4 +61,93 @@ class TestActors < Sidekiq::Test end end + describe 'processor' do + before do + $count = 0 + end + + it 'can start and stop' do + f = Sidekiq::Processor.new(nil) + f.terminate + end + + class Mgr + attr_reader :mutex + attr_reader :cond + def initialize + @mutex = ::Mutex.new + @cond = ::ConditionVariable.new + end + def processor_done(inst) + @mutex.synchronize do + @cond.signal + end + end + def processor_died(inst, err) + @mutex.synchronize do + @cond.signal + end + end + end + + it 'can process' do + mgr = Mgr.new + + p = Sidekiq::Processor.new(mgr) + SomeWorker.perform_async(0) + + job = Sidekiq.redis { |c| c.lpop("queue:default") } + uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) + a = $count + mgr.mutex.synchronize do + p.process(uow) + mgr.cond.wait(mgr.mutex) + end + b = $count + assert_equal a + 1, b + + assert_equal "sleep", p.thread.status + p.terminate(true) + assert_equal false, p.thread.status + end + + it 'deals with errors' do + mgr = Mgr.new + + p = Sidekiq::Processor.new(mgr) + SomeWorker.perform_async("boom") + + job = Sidekiq.redis { |c| c.lpop("queue:default") } + uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) + a = $count + mgr.mutex.synchronize do + p.process(uow) + mgr.cond.wait(mgr.mutex) + end + b = $count + assert_equal a, b + + assert_equal false, p.thread.status + p.terminate(true) + end + + it 'gracefully kills' do + mgr = Mgr.new + + p = Sidekiq::Processor.new(mgr) + SomeWorker.perform_async(0.1) + + job = Sidekiq.redis { |c| c.lpop("queue:default") } + uow = Sidekiq::BasicFetch::UnitOfWork.new('default', job) + a = $count + p.process(uow) + sleep(0.02) + p.terminate + p.kill(true) + + b = $count + assert_equal a, b + assert_equal false, p.thread.status + end + end end