mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Create separate data structures for queue and worker
Rspec allows expectations to be written using change(LogWorker.jobs, :size). The result of .jobs in this case was a derived array that wasn't manipulated under the covers. So when Rspec went to check the size of the array after the fact, it appeared that nothing had changed. This sets up a true array for the jobs for a single worker and pulls jobs off that array so rspec can properly make assertions.
This commit is contained in:
parent
2748b62ba0
commit
ed55232a78
1 changed files with 30 additions and 10 deletions
|
@ -68,7 +68,7 @@ module Sidekiq
|
|||
def raw_push(payloads)
|
||||
if Sidekiq::Testing.fake?
|
||||
payloads.each do |job|
|
||||
Queues.jobs[job['queue']] << Sidekiq.load_json(Sidekiq.dump_json(job))
|
||||
Queues.push(job['queue'], job['class'], Sidekiq.load_json(Sidekiq.dump_json(job)))
|
||||
end
|
||||
true
|
||||
elsif Sidekiq::Testing.inline?
|
||||
|
@ -130,15 +130,35 @@ module Sidekiq
|
|||
#
|
||||
class << self
|
||||
def [](queue)
|
||||
jobs[queue]
|
||||
jobs_by_queue[queue]
|
||||
end
|
||||
|
||||
def jobs
|
||||
@jobs ||= Hash.new { |hash, key| hash[key] = [] }
|
||||
def push(queue, klass, job)
|
||||
jobs_by_queue[queue] << job
|
||||
jobs_by_worker[klass] << job
|
||||
end
|
||||
|
||||
def jobs_by_queue
|
||||
@jobs_by_queue ||= Hash.new { |hash, key| hash[key] = [] }
|
||||
end
|
||||
|
||||
def jobs_by_worker
|
||||
@jobs_by_worker ||= Hash.new { |hash, key| hash[key] = [] }
|
||||
end
|
||||
|
||||
def delete_for(jid, queue, klass)
|
||||
jobs_by_queue[queue].delete_if { |job| job["jid"] == jid }
|
||||
jobs_by_worker[klass].delete_if { |job| job["jid"] == jid }
|
||||
end
|
||||
|
||||
def clear_for(queue, klass)
|
||||
jobs_by_queue[queue].clear
|
||||
jobs_by_worker[klass].clear
|
||||
end
|
||||
|
||||
def clear_all
|
||||
jobs.clear
|
||||
jobs_by_queue.clear
|
||||
jobs_by_worker.clear
|
||||
end
|
||||
end
|
||||
end
|
||||
|
@ -208,19 +228,19 @@ module Sidekiq
|
|||
|
||||
# Jobs queued for this worker
|
||||
def jobs
|
||||
Queues.jobs[queue].select { |job| job["class"] == self.to_s }
|
||||
Queues.jobs_by_worker[self.to_s]
|
||||
end
|
||||
|
||||
# Clear all jobs for this worker
|
||||
def clear
|
||||
Queues.jobs[queue].clear
|
||||
Queues.clear_for(queue, self.to_s)
|
||||
end
|
||||
|
||||
# Drain and run all jobs for this worker
|
||||
def drain
|
||||
while jobs.any?
|
||||
next_job = jobs.first
|
||||
Queues.jobs[queue].delete_if { |job| job["jid"] == next_job["jid"] }
|
||||
Queues.delete_for(next_job["jid"], queue, self.to_s)
|
||||
process_job(next_job)
|
||||
end
|
||||
end
|
||||
|
@ -229,7 +249,7 @@ module Sidekiq
|
|||
def perform_one
|
||||
raise(EmptyQueueError, "perform_one called with empty job queue") if jobs.empty?
|
||||
next_job = jobs.first
|
||||
Queues.jobs[queue].delete_if { |job| job["jid"] == next_job["jid"] }
|
||||
Queues.delete_for(next_job["jid"], queue, self.to_s)
|
||||
process_job(next_job)
|
||||
end
|
||||
|
||||
|
@ -249,7 +269,7 @@ module Sidekiq
|
|||
|
||||
class << self
|
||||
def jobs # :nodoc:
|
||||
Queues.jobs.values.flatten
|
||||
Queues.jobs_by_queue.values.flatten
|
||||
end
|
||||
|
||||
# Clear all queued jobs across all workers
|
||||
|
|
Loading…
Reference in a new issue