1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Fixed qu, queue_classic, sneakers adapters

This commit is contained in:
Cristian Bica 2014-08-07 00:09:28 +03:00
parent 38ee4fd8d3
commit 6ff5972c0d
3 changed files with 13 additions and 9 deletions

View file

@ -5,7 +5,9 @@ module ActiveJob
class QuAdapter class QuAdapter
class << self class << self
def enqueue(job, *args) def enqueue(job, *args)
Qu::Payload.new(klass: JobWrapper, args: [job, *args], queue: job.queue_name).push Qu::Payload.new(klass: JobWrapper, args: [job.name, *args]).tap do |payload|
payload.instance_variable_set(:@queue, job.queue_name)
end.push
end end
def enqueue_at(job, timestamp, *args) def enqueue_at(job, timestamp, *args)
@ -14,8 +16,8 @@ module ActiveJob
end end
class JobWrapper < Qu::Job class JobWrapper < Qu::Job
def initialize(job, *args) def initialize(job_name, *args)
@job = job @job = job_name.constantize
@args = args @args = args
end end

View file

@ -5,7 +5,7 @@ module ActiveJob
class QueueClassicAdapter class QueueClassicAdapter
class << self class << self
def enqueue(job, *args) def enqueue(job, *args)
QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job, *args) QC::Queue.new(job.queue_name).enqueue("#{JobWrapper.name}.perform", job.name, *args)
end end
def enqueue_at(job, timestamp, *args) def enqueue_at(job, timestamp, *args)
@ -14,8 +14,8 @@ module ActiveJob
end end
class JobWrapper class JobWrapper
def self.perform(job, *args) def self.perform(job_name, *args)
job.new.execute *args job_name.constantize.new.execute *args
end end
end end
end end

View file

@ -10,7 +10,7 @@ module ActiveJob
def enqueue(job, *args) def enqueue(job, *args)
@monitor.synchronize do @monitor.synchronize do
JobWrapper.from_queue job.queue_name JobWrapper.from_queue job.queue_name
JobWrapper.enqueue [ job, *args ] JobWrapper.enqueue ActiveSupport::JSON.encode([ job.name, *args ])
end end
end end
@ -22,8 +22,10 @@ module ActiveJob
class JobWrapper class JobWrapper
include Sneakers::Worker include Sneakers::Worker
def work(job, *args) def work(msg)
job.new.execute *args job_name, *args = ActiveSupport::JSON.decode(msg)
job_name.constantize.new.execute *args
ack!
end end
end end
end end