mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
update code documentation on scheduled poller
This commit is contained in:
parent
0c76c3bade
commit
89180968f9
1 changed files with 13 additions and 8 deletions
|
@ -28,16 +28,21 @@ module Sidekiq
|
|||
now = Time.now.to_f.to_s
|
||||
Sidekiq.redis do |conn|
|
||||
SETS.each do |sorted_set|
|
||||
# Get the next item in the queue if it's score (time to execute) is <= now.
|
||||
# We need to go through the list one at a time to reduce the risk of something
|
||||
# going wrong between the time jobs are popped from the scheduled queue and when
|
||||
# they are pushed onto a work queue and losing the jobs.
|
||||
while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
|
||||
if message
|
||||
msg = Sidekiq.load_json(message)
|
||||
if conn.zrem(sorted_set, message)
|
||||
conn.multi do
|
||||
conn.sadd('queues', msg['queue'])
|
||||
conn.rpush("queue:#{msg['queue']}", message)
|
||||
end
|
||||
logger.debug("enqueued #{sorted_set}: #{message}") if logger.debug?
|
||||
msg = Sidekiq.load_json(message)
|
||||
# Pop item off the queue and add it to the work queue. If the job can't be popped from
|
||||
# the queue, it's because another process already popped it so we can move on to the
|
||||
# next one.
|
||||
if conn.zrem(sorted_set, message)
|
||||
conn.multi do
|
||||
conn.sadd('queues', msg['queue'])
|
||||
conn.rpush("queue:#{msg['queue']}", message)
|
||||
end
|
||||
logger.debug("enqueued #{sorted_set}: #{message}") if logger.debug?
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Add table
Reference in a new issue