1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Let's try this again...

Use `Object#respond_to?` to determine which MultiJson API to use.
This commit is contained in:
Erik Michaels-Ober 2012-04-22 13:46:00 -07:00
parent aa3512462b
commit 68c725e47b
11 changed files with 83 additions and 18 deletions

View file

@ -45,7 +45,11 @@ module Sidekiq
pushed = false
Sidekiq.client_middleware.invoke(worker_class, item, queue) do
payload = MultiJson.encode(item)
payload = if MultiJson.respond_to?(:dump)
MultiJson.dump(item)
else
MultiJson.encode(item)
end
Sidekiq.redis do |conn|
_, pushed = conn.multi do
conn.sadd('queues', queue)

View file

@ -110,7 +110,11 @@ module Sidekiq
processor = @ready.pop
@in_progress[processor.object_id] = [msg, queue]
@busy << processor
processor.process!(MultiJson.decode(msg), queue)
if MultiJson.respond_to?(:adapter)
processor.process!(MultiJson.load(msg), queue)
else
processor.process!(MultiJson.decode(msg), queue)
end
end
end
end

View file

@ -1,5 +1,5 @@
require 'multi_json'
require 'digest'
require 'multi_json'
module Sidekiq
module Middleware
@ -10,7 +10,11 @@ module Sidekiq
def call(worker_class, item, queue)
enabled = worker_class.get_sidekiq_options['unique']
if enabled
payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item))
payload_hash = if MultiJson.respond_to?(:dump)
Digest::MD5.hexdigest(MultiJson.dump(item))
else
Digest::MD5.hexdigest(MultiJson.encode(item))
end
unique = false
Sidekiq.redis do |conn|

View file

@ -1,3 +1,5 @@
require 'multi_json'
module Sidekiq
module Middleware
module Server
@ -14,8 +16,11 @@ module Sidekiq
:worker => args[1]['class'],
:queue => args[2]
}
Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) }
if MultiJson.respond_to?(:dump)
Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.dump(data)) }
else
Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) }
end
raise
end
end

View file

@ -1,3 +1,5 @@
require 'multi_json'
require 'sidekiq/retry'
module Sidekiq
@ -44,7 +46,11 @@ module Sidekiq
delay = DELAY.call(count)
logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = MultiJson.encode(msg)
payload = if MultiJson.respond_to?(:dump)
MultiJson.dump(msg)
else
MultiJson.encode(msg)
end
Sidekiq.redis do |conn|
conn.zadd('retry', retry_at.to_s, payload)
end

View file

@ -1,3 +1,5 @@
require 'multi_json'
module Sidekiq
module Middleware
module Server
@ -5,7 +7,11 @@ module Sidekiq
def call(*args)
yield
ensure
json = MultiJson.encode(args[1])
json = if MultiJson.respond_to?(:dump)
MultiJson.dump(args[1])
else
MultiJson.encode(args[1])
end
hash = Digest::MD5.hexdigest(json)
Sidekiq.redis {|conn| conn.del(hash) }
end

View file

@ -1,4 +1,5 @@
require 'celluloid'
require 'multi_json'
require 'sidekiq/util'
require 'sidekiq/middleware/server/active_record'
@ -53,8 +54,12 @@ module Sidekiq
redis do |conn|
conn.multi do
conn.set("worker:#{self}:started", Time.now.to_s)
conn.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg,
:run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")))
hash = {:queue => queue, :payload => msg, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")}
if MultiJson.respond_to?(:dump)
conn.set("worker:#{self}", MultiJson.dump(hash))
else
conn.set("worker:#{self}", MultiJson.encode(hash))
end
end
end

View file

@ -46,7 +46,11 @@ module Sidekiq
messages.each do |message|
logger.debug { "Retrying #{message}" }
msg = MultiJson.decode(message)
msg = if MultiJson.respond_to?(:adapter)
MultiJson.load(message)
else
MultiJson.decode(message)
end
conn.rpush("queue:#{msg['queue']}", message)
end
end

View file

@ -52,7 +52,13 @@ module Sidekiq
Sidekiq.redis do |conn|
conn.smembers('workers').map do |w|
msg = conn.get("worker:#{w}")
msg = MultiJson.decode(msg) if msg
if msg
msg = if MultiJson.respond_to?(:adapter)
MultiJson.load(msg)
else
MultiJson.decode(msg)
end
end
[w, msg]
end.sort { |x| x[1] ? -1 : 1 }
end
@ -74,7 +80,11 @@ module Sidekiq
def retries
Sidekiq.redis do |conn|
results = conn.zrange('retry', 0, 25, :withscores => true)
results.each_slice(2).map { |msg, score| [MultiJson.decode(msg), Float(score)] }
if MultiJson.respond_to?(:adapter)
results.each_slice(2).map { |msg, score| [MultiJson.load(msg), Float(score)] }
else
results.each_slice(2).map { |msg, score| [MultiJson.decode(msg), Float(score)] }
end
end
end
@ -89,7 +99,11 @@ module Sidekiq
def retries_with_score(score)
Sidekiq.redis do |conn|
results = conn.zrangebyscore('retry', score, score)
results.map { |msg| MultiJson.decode(msg) }
if MultiJson.respond_to?(:adapter)
results.map { |msg| MultiJson.load(msg) }
else
results.map { |msg| MultiJson.decode(msg) }
end
end
end
@ -124,7 +138,11 @@ module Sidekiq
get "/queues/:name" do
halt 404 unless params[:name]
@name = params[:name]
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.decode(str) }
if MultiJson.respond_to?(:adapter)
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.load(str) }
else
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.decode(str) }
end
slim :queue
end
@ -142,7 +160,11 @@ module Sidekiq
results = conn.zrangebyscore('retry', score, score)
conn.zremrangebyscore('retry', score, score)
results.map do |message|
msg = MultiJson.decode(message)
msg = if MultiJson.respond_to?(:adapter)
MultiJson.load(message)
else
MultiJson.decode(message)
end
conn.rpush("queue:#{msg['queue']}", message)
end
end

View file

@ -17,7 +17,7 @@ Gem::Specification.new do |gem|
gem.add_dependency 'redis-namespace'
gem.add_dependency 'connection_pool', '~> 0.9.0'
gem.add_dependency 'celluloid', '~> 0.10.0'
gem.add_dependency 'multi_json', '>= 1.0', '< 1.3'
gem.add_dependency 'multi_json', '~> 1.0'
gem.add_development_dependency 'minitest'
gem.add_development_dependency 'sinatra'
gem.add_development_dependency 'slim'

View file

@ -1,4 +1,5 @@
require 'helper'
require 'multi_json'
require 'sidekiq/retry'
require 'sidekiq/middleware/server/retry_jobs'
@ -81,7 +82,11 @@ class TestRetry < MiniTest::Unit::TestCase
end
it 'should poll like a bad mother...SHUT YO MOUTH' do
fake_msg = MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
fake_msg = if MultiJson.respond_to?(:dump)
MultiJson.dump({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
else
MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' })
end
@redis.expect :multi, [[fake_msg], 1], []
@redis.expect :rpush, 1, ['queue:someq', fake_msg]