mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Speedup UnitOfWork#queue_name
Profiling with bin/sidekiqload, results: 1. cpu Before: ``` 1046 (4.7%) 1046 (4.7%) Sidekiq::BasicFetch::UnitOfWork#queue_name ``` After: ``` 331 (1.3%) 331 (1.3%) Sidekiq::BasicFetch::UnitOfWork#queue_name ``` 2. object allocations Before: ``` 400012 (4.7%) 400012 (4.7%) Sidekiq::BasicFetch::UnitOfWork#queue_name ``` After: ``` 100003 (1.2%) 100003 (1.2%) Sidekiq::BasicFetch::UnitOfWork#queue_name ```
This commit is contained in:
parent
2c5ab340c7
commit
9988afc207
2 changed files with 24 additions and 8 deletions
|
@ -14,12 +14,12 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def queue_name
|
||||
queue.sub(/.*queue:/, "")
|
||||
queue.delete_prefix("queue:")
|
||||
end
|
||||
|
||||
def requeue
|
||||
Sidekiq.redis do |conn|
|
||||
conn.rpush("queue:#{queue_name}", job)
|
||||
conn.rpush(queue, job)
|
||||
end
|
||||
end
|
||||
}
|
||||
|
@ -61,14 +61,14 @@ module Sidekiq
|
|||
Sidekiq.logger.debug { "Re-queueing terminated jobs" }
|
||||
jobs_to_requeue = {}
|
||||
inprogress.each do |unit_of_work|
|
||||
jobs_to_requeue[unit_of_work.queue_name] ||= []
|
||||
jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.job
|
||||
jobs_to_requeue[unit_of_work.queue] ||= []
|
||||
jobs_to_requeue[unit_of_work.queue] << unit_of_work.job
|
||||
end
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
conn.pipelined do
|
||||
jobs_to_requeue.each do |queue, jobs|
|
||||
conn.rpush("queue:#{queue}", jobs)
|
||||
conn.rpush(queue, jobs)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -5,12 +5,18 @@ require 'sidekiq/api'
|
|||
|
||||
describe Sidekiq::BasicFetch do
|
||||
before do
|
||||
@prev_redis = Sidekiq.instance_variable_get(:@redis)
|
||||
Sidekiq.redis = { :namespace => 'fuzzy' }
|
||||
Sidekiq.redis do |conn|
|
||||
conn.flushdb
|
||||
conn.redis.flushdb
|
||||
conn.rpush('queue:basic', 'msg')
|
||||
end
|
||||
end
|
||||
|
||||
after do
|
||||
Sidekiq.redis = @prev_redis
|
||||
end
|
||||
|
||||
it 'retrieves' do
|
||||
fetch = Sidekiq::BasicFetch.new(:queues => ['basic', 'bar'])
|
||||
uow = fetch.retrieve_work
|
||||
|
@ -31,12 +37,22 @@ describe Sidekiq::BasicFetch do
|
|||
end
|
||||
|
||||
it 'bulk requeues' do
|
||||
Sidekiq.redis do |conn|
|
||||
conn.rpush('queue:foo', ['bob', 'bar'])
|
||||
conn.rpush('queue:bar', 'widget')
|
||||
end
|
||||
|
||||
q1 = Sidekiq::Queue.new('foo')
|
||||
q2 = Sidekiq::Queue.new('bar')
|
||||
assert_equal 2, q1.size
|
||||
assert_equal 1, q2.size
|
||||
|
||||
fetch = Sidekiq::BasicFetch.new(:queues => ['foo', 'bar'])
|
||||
works = 3.times.map { fetch.retrieve_work }
|
||||
assert_equal 0, q1.size
|
||||
assert_equal 0, q2.size
|
||||
uow = Sidekiq::BasicFetch::UnitOfWork
|
||||
Sidekiq::BasicFetch.bulk_requeue([uow.new('fuzzy:queue:foo', 'bob'), uow.new('fuzzy:queue:foo', 'bar'), uow.new('fuzzy:queue:bar', 'widget')], {:queues => []})
|
||||
|
||||
Sidekiq::BasicFetch.bulk_requeue(works, {:queues => []})
|
||||
assert_equal 2, q1.size
|
||||
assert_equal 1, q2.size
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue