1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Change old "message" terminology to "job"

This commit is contained in:
Mike Perham 2015-10-23 15:05:50 -07:00
parent 18a513caeb
commit 93dddd7bd9
4 changed files with 18 additions and 18 deletions

View file

@ -6,7 +6,7 @@ module Sidekiq
# can check if the process is shutting down.
TIMEOUT = 2
UnitOfWork = Struct.new(:queue, :message) do
UnitOfWork = Struct.new(:queue, :job) do
def acknowledge
# nothing to do
end
@ -17,7 +17,7 @@ module Sidekiq
def requeue
Sidekiq.redis do |conn|
conn.rpush("queue:#{queue_name}", message)
conn.rpush("queue:#{queue_name}", job)
end
end
end
@ -61,7 +61,7 @@ module Sidekiq
jobs_to_requeue = {}
inprogress.each do |unit_of_work|
jobs_to_requeue[unit_of_work.queue_name] ||= []
jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.message
jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.job
end
Sidekiq.redis do |conn|
@ -71,7 +71,7 @@ module Sidekiq
end
end
end
Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis")
Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis")
rescue => ex
Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}")
end

View file

@ -104,7 +104,7 @@ module Sidekiq
def handle_fetch_exception(ex)
if !@down
@down = Time.now
logger.error("Error fetching message: #{ex}")
logger.error("Error fetching job: #{ex}")
ex.backtrace.each do |bt|
logger.error(bt)
end
@ -113,23 +113,23 @@ module Sidekiq
end
def process(work)
msgstr = work.message
jobstr = work.job
queue = work.queue_name
ack = false
begin
msg = Sidekiq.load_json(msgstr)
klass = msg['class'.freeze].constantize
job = Sidekiq.load_json(jobstr)
klass = job['class'.freeze].constantize
worker = klass.new
worker.jid = msg['jid'.freeze]
worker.jid = job['jid'.freeze]
stats(worker, msg, queue) do
Sidekiq.server_middleware.invoke(worker, msg, queue) do
stats(worker, job, queue) do
Sidekiq.server_middleware.invoke(worker, job, queue) do
# Only ack if we either attempted to start this job or
# successfully completed it. This prevents us from
# losing jobs if a middleware raises an exception before yielding
ack = true
execute_job(worker, cloned(msg['args'.freeze]))
execute_job(worker, cloned(job['args'.freeze]))
end
end
ack = true
@ -139,7 +139,7 @@ module Sidekiq
# we didn't properly finish it.
ack = false
rescue Exception => ex
handle_exception(ex, msg || { :message => msgstr })
handle_exception(ex, job || { :job => jobstr })
raise
ensure
work.acknowledge if ack
@ -158,9 +158,9 @@ module Sidekiq
PROCESSED = Concurrent::AtomicFixnum.new
FAILURE = Concurrent::AtomicFixnum.new
def stats(worker, msg, queue)
def stats(worker, job, queue)
tid = thread_identity
WORKER_STATE[tid] = {:queue => queue, :payload => msg, :run_at => Time.now.to_i }
WORKER_STATE[tid] = {:queue => queue, :payload => job, :run_at => Time.now.to_i }
begin
yield
@ -174,7 +174,7 @@ module Sidekiq
end
# Deep clone the arguments passed to the worker so that if
# the message fails, what is pushed back onto Redis hasn't
# the job fails, what is pushed back onto Redis hasn't
# been mutated by the worker.
def cloned(ary)
Marshal.load(Marshal.dump(ary))

View file

@ -20,7 +20,7 @@ class TestFetcher < Sidekiq::Test
uow = fetch.retrieve_work
refute_nil uow
assert_equal 'basic', uow.queue_name
assert_equal 'msg', uow.message
assert_equal 'msg', uow.job
q = Sidekiq::Queue.new('basic')
assert_equal 0, q.size
uow.requeue

View file

@ -87,7 +87,7 @@ class TestProcessor < Sidekiq::Test
before do
work.expect(:queue_name, 'queue:default')
work.expect(:message, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
work.expect(:job, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args }))
Sidekiq.server_middleware do |chain|
chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job
end