From 9988afc20734d4844340b1408e13b59de94c95e9 Mon Sep 17 00:00:00 2001 From: fatkodima Date: Tue, 1 Oct 2019 01:26:31 +0300 Subject: [PATCH] Speedup UnitOfWork#queue_name Profiling with bin/sidekiqload, results: 1. cpu Before: ``` 1046 (4.7%) 1046 (4.7%) Sidekiq::BasicFetch::UnitOfWork#queue_name ``` After: ``` 331 (1.3%) 331 (1.3%) Sidekiq::BasicFetch::UnitOfWork#queue_name ``` 2. object allocations Before: ``` 400012 (4.7%) 400012 (4.7%) Sidekiq::BasicFetch::UnitOfWork#queue_name ``` After: ``` 100003 (1.2%) 100003 (1.2%) Sidekiq::BasicFetch::UnitOfWork#queue_name ``` --- lib/sidekiq/fetch.rb | 10 +++++----- test/test_fetch.rb | 22 +++++++++++++++++++--- 2 files changed, 24 insertions(+), 8 deletions(-) diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 1f90b0c2..8ce9970d 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -14,12 +14,12 @@ module Sidekiq end def queue_name - queue.sub(/.*queue:/, "") + queue.delete_prefix("queue:") end def requeue Sidekiq.redis do |conn| - conn.rpush("queue:#{queue_name}", job) + conn.rpush(queue, job) end end } @@ -61,14 +61,14 @@ module Sidekiq Sidekiq.logger.debug { "Re-queueing terminated jobs" } jobs_to_requeue = {} inprogress.each do |unit_of_work| - jobs_to_requeue[unit_of_work.queue_name] ||= [] - jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.job + jobs_to_requeue[unit_of_work.queue] ||= [] + jobs_to_requeue[unit_of_work.queue] << unit_of_work.job end Sidekiq.redis do |conn| conn.pipelined do jobs_to_requeue.each do |queue, jobs| - conn.rpush("queue:#{queue}", jobs) + conn.rpush(queue, jobs) end end end diff --git a/test/test_fetch.rb b/test/test_fetch.rb index e8c8f437..449d801d 100644 --- a/test/test_fetch.rb +++ b/test/test_fetch.rb @@ -5,12 +5,18 @@ require 'sidekiq/api' describe Sidekiq::BasicFetch do before do + @prev_redis = Sidekiq.instance_variable_get(:@redis) + Sidekiq.redis = { :namespace => 'fuzzy' } Sidekiq.redis do |conn| - conn.flushdb + conn.redis.flushdb conn.rpush('queue:basic', 'msg') end end + after do + Sidekiq.redis = @prev_redis + end + it 'retrieves' do fetch = Sidekiq::BasicFetch.new(:queues => ['basic', 'bar']) uow = fetch.retrieve_work @@ -31,12 +37,22 @@ describe Sidekiq::BasicFetch do end it 'bulk requeues' do + Sidekiq.redis do |conn| + conn.rpush('queue:foo', ['bob', 'bar']) + conn.rpush('queue:bar', 'widget') + end + q1 = Sidekiq::Queue.new('foo') q2 = Sidekiq::Queue.new('bar') + assert_equal 2, q1.size + assert_equal 1, q2.size + + fetch = Sidekiq::BasicFetch.new(:queues => ['foo', 'bar']) + works = 3.times.map { fetch.retrieve_work } assert_equal 0, q1.size assert_equal 0, q2.size - uow = Sidekiq::BasicFetch::UnitOfWork - Sidekiq::BasicFetch.bulk_requeue([uow.new('fuzzy:queue:foo', 'bob'), uow.new('fuzzy:queue:foo', 'bar'), uow.new('fuzzy:queue:bar', 'widget')], {:queues => []}) + + Sidekiq::BasicFetch.bulk_requeue(works, {:queues => []}) assert_equal 2, q1.size assert_equal 1, q2.size end