1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Fix networking errors causing scheduler to die [#309]

This commit is contained in:
Mike Perham 2012-07-30 10:23:34 -07:00
parent 79dd455cb4
commit 1ee0d5e9b0
3 changed files with 27 additions and 16 deletions

View file

@ -1,3 +1,8 @@
HEAD
-----------
- Handle networking errors causing the scheduler thread to die [#309]
2.1.0
-----------

View file

@ -22,25 +22,31 @@ module Sidekiq
watchdog('scheduling poller thread died!') do
add_jitter if first_time
# A message's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of messages with a timestamp before now.
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
(messages, _) = conn.multi do
conn.zrangebyscore(sorted_set, '-inf', now)
conn.zremrangebyscore(sorted_set, '-inf', now)
end
begin
# A message's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of messages with a timestamp before now.
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
(messages, _) = conn.multi do
conn.zrangebyscore(sorted_set, '-inf', now)
conn.zremrangebyscore(sorted_set, '-inf', now)
end
messages.each do |message|
logger.debug { "enqueued #{sorted_set}: #{message}" }
msg = Sidekiq.load_json(message)
conn.multi do
conn.sadd('queues', msg['queue'])
conn.rpush("queue:#{msg['queue']}", message)
messages.each do |message|
logger.debug { "enqueued #{sorted_set}: #{message}" }
msg = Sidekiq.load_json(message)
conn.multi do
conn.sadd('queues', msg['queue'])
conn.rpush("queue:#{msg['queue']}", message)
end
end
end
end
rescue SystemCallError => ex
# ECONNREFUSED, etc. Most likely a problem with
# redis networking. Punt and try again at the next interval
logger.warn ex.message
end
after(poll_interval) { poll }

View file

@ -1,3 +1,3 @@
module Sidekiq
VERSION = "2.1.0"
VERSION = "2.1.1"
end