mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
commit
c852d7e2e5
12 changed files with 43 additions and 18 deletions
|
@ -82,4 +82,22 @@ module Sidekiq
|
|||
@server_chain
|
||||
end
|
||||
|
||||
def self.load_json(string, options={})
|
||||
# Can't reliably detect whether MultiJson responds to load, since it's
|
||||
# a reserved word. Use adapter as a proxy for new features.
|
||||
if MultiJson.respond_to?(:adapter)
|
||||
MultiJson.load(string, options)
|
||||
else
|
||||
MultiJson.decode(string, options)
|
||||
end
|
||||
end
|
||||
|
||||
def self.dump_json(object, options={})
|
||||
if MultiJson.respond_to?(:dump)
|
||||
MultiJson.dump(object, options)
|
||||
else
|
||||
MultiJson.encode(object, options)
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -45,7 +45,7 @@ module Sidekiq
|
|||
|
||||
pushed = false
|
||||
Sidekiq.client_middleware.invoke(worker_class, item, queue) do
|
||||
payload = MultiJson.encode(item)
|
||||
payload = Sidekiq.dump_json(item)
|
||||
Sidekiq.redis do |conn|
|
||||
_, pushed = conn.multi do
|
||||
conn.sadd('queues', queue)
|
||||
|
|
|
@ -110,7 +110,7 @@ module Sidekiq
|
|||
processor = @ready.pop
|
||||
@in_progress[processor.object_id] = [msg, queue]
|
||||
@busy << processor
|
||||
processor.process!(MultiJson.decode(msg), queue)
|
||||
processor.process!(Sidekiq.load_json(msg), queue)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
require 'multi_json'
|
||||
require 'digest'
|
||||
require 'multi_json'
|
||||
|
||||
module Sidekiq
|
||||
module Middleware
|
||||
|
@ -10,7 +10,7 @@ module Sidekiq
|
|||
def call(worker_class, item, queue)
|
||||
enabled = worker_class.get_sidekiq_options['unique']
|
||||
if enabled
|
||||
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
|
||||
payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(item))
|
||||
unique = false
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
require 'multi_json'
|
||||
|
||||
module Sidekiq
|
||||
module Middleware
|
||||
module Server
|
||||
|
@ -14,8 +16,7 @@ module Sidekiq
|
|||
:worker => args[1]['class'],
|
||||
:queue => args[2]
|
||||
}
|
||||
|
||||
Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) }
|
||||
Sidekiq.redis {|conn| conn.rpush(:failed, Sidekiq.dump_json(data)) }
|
||||
raise
|
||||
end
|
||||
end
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
require 'multi_json'
|
||||
|
||||
require 'sidekiq/retry'
|
||||
|
||||
module Sidekiq
|
||||
|
@ -44,7 +46,7 @@ module Sidekiq
|
|||
delay = DELAY.call(count)
|
||||
logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
|
||||
retry_at = Time.now.to_f + delay
|
||||
payload = MultiJson.encode(msg)
|
||||
payload = Sidekiq.dump_json(msg)
|
||||
Sidekiq.redis do |conn|
|
||||
conn.zadd('retry', retry_at.to_s, payload)
|
||||
end
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
require 'multi_json'
|
||||
|
||||
module Sidekiq
|
||||
module Middleware
|
||||
module Server
|
||||
|
@ -5,7 +7,7 @@ module Sidekiq
|
|||
def call(*args)
|
||||
yield
|
||||
ensure
|
||||
json = MultiJson.encode(args[1])
|
||||
json = Sidekiq.dump_json(args[1])
|
||||
hash = Digest::MD5.hexdigest(json)
|
||||
Sidekiq.redis {|conn| conn.del(hash) }
|
||||
end
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
require 'celluloid'
|
||||
require 'multi_json'
|
||||
require 'sidekiq/util'
|
||||
|
||||
require 'sidekiq/middleware/server/active_record'
|
||||
|
@ -53,8 +54,8 @@ module Sidekiq
|
|||
redis do |conn|
|
||||
conn.multi do
|
||||
conn.set("worker:#{self}:started", Time.now.to_s)
|
||||
conn.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg,
|
||||
:run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")))
|
||||
hash = {:queue => queue, :payload => msg, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")}
|
||||
conn.set("worker:#{self}", Sidekiq.dump_json(hash))
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ module Sidekiq
|
|||
|
||||
messages.each do |message|
|
||||
logger.debug { "Retrying #{message}" }
|
||||
msg = MultiJson.decode(message)
|
||||
msg = Sidekiq.load_json(message)
|
||||
conn.rpush("queue:#{msg['queue']}", message)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -52,7 +52,7 @@ module Sidekiq
|
|||
Sidekiq.redis do |conn|
|
||||
conn.smembers('workers').map do |w|
|
||||
msg = conn.get("worker:#{w}")
|
||||
msg = MultiJson.decode(msg) if msg
|
||||
msg = Sidekiq.load_json(msg) if msg
|
||||
[w, msg]
|
||||
end.sort { |x| x[1] ? -1 : 1 }
|
||||
end
|
||||
|
@ -74,7 +74,7 @@ module Sidekiq
|
|||
def retries
|
||||
Sidekiq.redis do |conn|
|
||||
results = conn.zrange('retry', 0, 25, :withscores => true)
|
||||
results.each_slice(2).map { |msg, score| [MultiJson.decode(msg), Float(score)] }
|
||||
results.each_slice(2).map { |msg, score| [Sidekiq.load_json(msg), Float(score)] }
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -89,7 +89,7 @@ module Sidekiq
|
|||
def retries_with_score(score)
|
||||
Sidekiq.redis do |conn|
|
||||
results = conn.zrangebyscore('retry', score, score)
|
||||
results.map { |msg| MultiJson.decode(msg) }
|
||||
results.map { |msg| Sidekiq.load_json(msg) }
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -124,7 +124,7 @@ module Sidekiq
|
|||
get "/queues/:name" do
|
||||
halt 404 unless params[:name]
|
||||
@name = params[:name]
|
||||
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.decode(str) }
|
||||
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| Sidekiq.load_json(str) }
|
||||
slim :queue
|
||||
end
|
||||
|
||||
|
@ -142,7 +142,7 @@ module Sidekiq
|
|||
results = conn.zrangebyscore('retry', score, score)
|
||||
conn.zremrangebyscore('retry', score, score)
|
||||
results.map do |message|
|
||||
msg = MultiJson.decode(message)
|
||||
msg = Sidekiq.load_json(message)
|
||||
conn.rpush("queue:#{msg['queue']}", message)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -17,7 +17,7 @@ Gem::Specification.new do |gem|
|
|||
gem.add_dependency 'redis-namespace'
|
||||
gem.add_dependency 'connection_pool', '~> 0.9.0'
|
||||
gem.add_dependency 'celluloid', '~> 0.10.0'
|
||||
gem.add_dependency 'multi_json', '>= 1.0', '< 1.3'
|
||||
gem.add_dependency 'multi_json', '~> 1.0'
|
||||
gem.add_development_dependency 'minitest'
|
||||
gem.add_development_dependency 'sinatra'
|
||||
gem.add_development_dependency 'slim'
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
require 'helper'
|
||||
require 'multi_json'
|
||||
require 'sidekiq/retry'
|
||||
require 'sidekiq/middleware/server/retry_jobs'
|
||||
|
||||
|
@ -81,7 +82,7 @@ class TestRetry < MiniTest::Unit::TestCase
|
|||
end
|
||||
|
||||
it 'should poll like a bad mother...SHUT YO MOUTH' do
|
||||
fake_msg = MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
|
||||
fake_msg = Sidekiq.dump_json({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
|
||||
@redis.expect :multi, [[fake_msg], 1], []
|
||||
@redis.expect :rpush, 1, ['queue:someq', fake_msg]
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue