diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 6071b64f..b1c146b1 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -6,7 +6,7 @@ module Sidekiq # can check if the process is shutting down. TIMEOUT = 2 - UnitOfWork = Struct.new(:queue, :message) do + UnitOfWork = Struct.new(:queue, :job) do def acknowledge # nothing to do end @@ -17,7 +17,7 @@ module Sidekiq def requeue Sidekiq.redis do |conn| - conn.rpush("queue:#{queue_name}", message) + conn.rpush("queue:#{queue_name}", job) end end end @@ -61,7 +61,7 @@ module Sidekiq jobs_to_requeue = {} inprogress.each do |unit_of_work| jobs_to_requeue[unit_of_work.queue_name] ||= [] - jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.message + jobs_to_requeue[unit_of_work.queue_name] << unit_of_work.job end Sidekiq.redis do |conn| @@ -71,7 +71,7 @@ module Sidekiq end end end - Sidekiq.logger.info("Pushed #{inprogress.size} messages back to Redis") + Sidekiq.logger.info("Pushed #{inprogress.size} jobs back to Redis") rescue => ex Sidekiq.logger.warn("Failed to requeue #{inprogress.size} jobs: #{ex.message}") end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index cc728472..754a4541 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -104,7 +104,7 @@ module Sidekiq def handle_fetch_exception(ex) if !@down @down = Time.now - logger.error("Error fetching message: #{ex}") + logger.error("Error fetching job: #{ex}") ex.backtrace.each do |bt| logger.error(bt) end @@ -113,23 +113,23 @@ module Sidekiq end def process(work) - msgstr = work.message + jobstr = work.job queue = work.queue_name ack = false begin - msg = Sidekiq.load_json(msgstr) - klass = msg['class'.freeze].constantize + job = Sidekiq.load_json(jobstr) + klass = job['class'.freeze].constantize worker = klass.new - worker.jid = msg['jid'.freeze] + worker.jid = job['jid'.freeze] - stats(worker, msg, queue) do - Sidekiq.server_middleware.invoke(worker, msg, queue) do + stats(worker, job, queue) do + Sidekiq.server_middleware.invoke(worker, job, queue) do # Only ack if we either attempted to start this job or # successfully completed it. This prevents us from # losing jobs if a middleware raises an exception before yielding ack = true - execute_job(worker, cloned(msg['args'.freeze])) + execute_job(worker, cloned(job['args'.freeze])) end end ack = true @@ -139,7 +139,7 @@ module Sidekiq # we didn't properly finish it. ack = false rescue Exception => ex - handle_exception(ex, msg || { :message => msgstr }) + handle_exception(ex, job || { :job => jobstr }) raise ensure work.acknowledge if ack @@ -158,9 +158,9 @@ module Sidekiq PROCESSED = Concurrent::AtomicFixnum.new FAILURE = Concurrent::AtomicFixnum.new - def stats(worker, msg, queue) + def stats(worker, job, queue) tid = thread_identity - WORKER_STATE[tid] = {:queue => queue, :payload => msg, :run_at => Time.now.to_i } + WORKER_STATE[tid] = {:queue => queue, :payload => job, :run_at => Time.now.to_i } begin yield @@ -174,7 +174,7 @@ module Sidekiq end # Deep clone the arguments passed to the worker so that if - # the message fails, what is pushed back onto Redis hasn't + # the job fails, what is pushed back onto Redis hasn't # been mutated by the worker. def cloned(ary) Marshal.load(Marshal.dump(ary)) diff --git a/test/test_fetch.rb b/test/test_fetch.rb index f295a12c..fd0da2d5 100644 --- a/test/test_fetch.rb +++ b/test/test_fetch.rb @@ -20,7 +20,7 @@ class TestFetcher < Sidekiq::Test uow = fetch.retrieve_work refute_nil uow assert_equal 'basic', uow.queue_name - assert_equal 'msg', uow.message + assert_equal 'msg', uow.job q = Sidekiq::Queue.new('basic') assert_equal 0, q.size uow.requeue diff --git a/test/test_processor.rb b/test/test_processor.rb index 5b537624..732b9a7f 100644 --- a/test/test_processor.rb +++ b/test/test_processor.rb @@ -87,7 +87,7 @@ class TestProcessor < Sidekiq::Test before do work.expect(:queue_name, 'queue:default') - work.expect(:message, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args })) + work.expect(:job, Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => worker_args })) Sidekiq.server_middleware do |chain| chain.prepend ExceptionRaisingMiddleware, raise_before_yield, raise_after_yield, skip_job end