mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Terminology: message -> job
This commit is contained in:
parent
d54dc1c677
commit
3016747846
1 changed files with 9 additions and 9 deletions
|
@ -9,10 +9,10 @@ module Sidekiq
|
||||||
INITIAL_WAIT = 10
|
INITIAL_WAIT = 10
|
||||||
|
|
||||||
##
|
##
|
||||||
# The Poller checks Redis every N seconds for messages in the retry or scheduled
|
# The Poller checks Redis every N seconds for jobs in the retry or scheduled
|
||||||
# set have passed their timestamp and should be enqueued. If so, it
|
# set have passed their timestamp and should be enqueued. If so, it
|
||||||
# just pops the message back onto its original queue so the
|
# just pops the job back onto its original queue so the
|
||||||
# workers can pick it up like any other message.
|
# workers can pick it up like any other job.
|
||||||
class Poller
|
class Poller
|
||||||
include Util
|
include Util
|
||||||
include Actor
|
include Actor
|
||||||
|
@ -24,8 +24,8 @@ module Sidekiq
|
||||||
initial_wait if first_time
|
initial_wait if first_time
|
||||||
|
|
||||||
begin
|
begin
|
||||||
# A message's "score" in Redis is the time at which it should be processed.
|
# A job's "score" in Redis is the time at which it should be processed.
|
||||||
# Just check Redis for the set of messages with a timestamp before now.
|
# Just check Redis for the set of jobs with a timestamp before now.
|
||||||
now = Time.now.to_f.to_s
|
now = Time.now.to_f.to_s
|
||||||
Sidekiq.redis do |conn|
|
Sidekiq.redis do |conn|
|
||||||
SETS.each do |sorted_set|
|
SETS.each do |sorted_set|
|
||||||
|
@ -33,14 +33,14 @@ module Sidekiq
|
||||||
# We need to go through the list one at a time to reduce the risk of something
|
# We need to go through the list one at a time to reduce the risk of something
|
||||||
# going wrong between the time jobs are popped from the scheduled queue and when
|
# going wrong between the time jobs are popped from the scheduled queue and when
|
||||||
# they are pushed onto a work queue and losing the jobs.
|
# they are pushed onto a work queue and losing the jobs.
|
||||||
while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
|
while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
|
||||||
|
|
||||||
# Pop item off the queue and add it to the work queue. If the job can't be popped from
|
# Pop item off the queue and add it to the work queue. If the job can't be popped from
|
||||||
# the queue, it's because another process already popped it so we can move on to the
|
# the queue, it's because another process already popped it so we can move on to the
|
||||||
# next one.
|
# next one.
|
||||||
if conn.zrem(sorted_set, message)
|
if conn.zrem(sorted_set, job)
|
||||||
Sidekiq::Client.push(Sidekiq.load_json(message))
|
Sidekiq::Client.push(Sidekiq.load_json(job))
|
||||||
logger.debug { "enqueued #{sorted_set}: #{message}" }
|
logger.debug { "enqueued #{sorted_set}: #{job}" }
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue