1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Remove 'at' property so it doesn't confuse Client

This commit is contained in:
Mike Perham 2013-05-24 20:32:55 -07:00
parent 3dda03d000
commit c2f09b7a05
4 changed files with 7 additions and 4 deletions

View file

@ -1,6 +1,8 @@
HEAD
-----------
- Scheduled and Retry jobs now use Sidekiq::Client to push
jobs onto the queue, so they use client middleware. [dimko, #948]
- Record the timestamp when jobs are enqueued. Add
Sidekiq::Job#enqueued\_at to query the time. [mariovisic, #944]
- Add Sidekiq::Queue#latency - calculates diff between now and

View file

@ -94,7 +94,10 @@ module Sidekiq
pushed = false
Sidekiq.redis do |conn|
if payloads.first['at']
pushed = conn.zadd('schedule', payloads.map {|hash| [hash['at'].to_s, Sidekiq.dump_json(hash)]})
pushed = conn.zadd('schedule', payloads.map do |hash|
at = hash.delete('at').to_s
[at, Sidekiq.dump_json(hash)]
end)
else
q = payloads.first['queue']
to_push = payloads.map { |entry| Sidekiq.dump_json(entry) }

View file

@ -33,13 +33,12 @@ module Sidekiq
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
msg = Sidekiq.load_json(message).tap { |msg| msg['enqueued_at'] = Time.now.to_f }
# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, message)
Sidekiq::Client.push(msg)
Sidekiq::Client.push(Sidekiq.load_json(message))
logger.debug { "enqueued #{sorted_set}: #{message}" }
end
end

View file

@ -264,7 +264,6 @@ class TestWeb < Minitest::Test
score = Time.now.to_f
msg = { 'class' => 'HardWorker',
'args' => ['bob', 1, Time.now.to_f],
'at' => score,
'jid' => 'f39af2a05e8f4b24dbc0f1e4' }
Sidekiq.redis do |conn|
conn.zadd('schedule', score, Sidekiq.dump_json(msg))