From 68c725e47badeca6b032c7eaa51fc208260894fe Mon Sep 17 00:00:00 2001 From: Erik Michaels-Ober Date: Sun, 22 Apr 2012 13:46:00 -0700 Subject: [PATCH 1/2] Let's try this again... Use `Object#respond_to?` to determine which MultiJson API to use. --- lib/sidekiq/client.rb | 6 +++- lib/sidekiq/manager.rb | 6 +++- lib/sidekiq/middleware/client/unique_jobs.rb | 8 +++-- lib/sidekiq/middleware/server/failure_jobs.rb | 9 ++++-- lib/sidekiq/middleware/server/retry_jobs.rb | 8 ++++- lib/sidekiq/middleware/server/unique_jobs.rb | 8 ++++- lib/sidekiq/processor.rb | 9 ++++-- lib/sidekiq/retry.rb | 6 +++- lib/sidekiq/web.rb | 32 ++++++++++++++++--- sidekiq.gemspec | 2 +- test/test_retry.rb | 7 +++- 11 files changed, 83 insertions(+), 18 deletions(-) diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 711334be..82fcd180 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -45,7 +45,11 @@ module Sidekiq pushed = false Sidekiq.client_middleware.invoke(worker_class, item, queue) do - payload = MultiJson.encode(item) + payload = if MultiJson.respond_to?(:dump) + MultiJson.dump(item) + else + MultiJson.encode(item) + end Sidekiq.redis do |conn| _, pushed = conn.multi do conn.sadd('queues', queue) diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 7afd1c5f..983ca502 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -110,7 +110,11 @@ module Sidekiq processor = @ready.pop @in_progress[processor.object_id] = [msg, queue] @busy << processor - processor.process!(MultiJson.decode(msg), queue) + if MultiJson.respond_to?(:adapter) + processor.process!(MultiJson.load(msg), queue) + else + processor.process!(MultiJson.decode(msg), queue) + end end end end diff --git a/lib/sidekiq/middleware/client/unique_jobs.rb b/lib/sidekiq/middleware/client/unique_jobs.rb index c067c923..529f9a6b 100644 --- a/lib/sidekiq/middleware/client/unique_jobs.rb +++ b/lib/sidekiq/middleware/client/unique_jobs.rb @@ -1,5 +1,5 @@ -require 'multi_json' require 'digest' +require 'multi_json' module Sidekiq module Middleware @@ -10,7 +10,11 @@ module Sidekiq def call(worker_class, item, queue) enabled = worker_class.get_sidekiq_options['unique'] if enabled - payload_hash = Digest::MD5.hexdigest(MultiJson.encode(item)) + payload_hash = if MultiJson.respond_to?(:dump) + Digest::MD5.hexdigest(MultiJson.dump(item)) + else + Digest::MD5.hexdigest(MultiJson.encode(item)) + end unique = false Sidekiq.redis do |conn| diff --git a/lib/sidekiq/middleware/server/failure_jobs.rb b/lib/sidekiq/middleware/server/failure_jobs.rb index caa92dcc..0febd705 100644 --- a/lib/sidekiq/middleware/server/failure_jobs.rb +++ b/lib/sidekiq/middleware/server/failure_jobs.rb @@ -1,3 +1,5 @@ +require 'multi_json' + module Sidekiq module Middleware module Server @@ -14,8 +16,11 @@ module Sidekiq :worker => args[1]['class'], :queue => args[2] } - - Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) } + if MultiJson.respond_to?(:dump) + Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.dump(data)) } + else + Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) } + end raise end end diff --git a/lib/sidekiq/middleware/server/retry_jobs.rb b/lib/sidekiq/middleware/server/retry_jobs.rb index 7b36f893..7df6803f 100644 --- a/lib/sidekiq/middleware/server/retry_jobs.rb +++ b/lib/sidekiq/middleware/server/retry_jobs.rb @@ -1,3 +1,5 @@ +require 'multi_json' + require 'sidekiq/retry' module Sidekiq @@ -44,7 +46,11 @@ module Sidekiq delay = DELAY.call(count) logger.debug { "Failure! Retry #{count} in #{delay} seconds" } retry_at = Time.now.to_f + delay - payload = MultiJson.encode(msg) + payload = if MultiJson.respond_to?(:dump) + MultiJson.dump(msg) + else + MultiJson.encode(msg) + end Sidekiq.redis do |conn| conn.zadd('retry', retry_at.to_s, payload) end diff --git a/lib/sidekiq/middleware/server/unique_jobs.rb b/lib/sidekiq/middleware/server/unique_jobs.rb index b6887ebd..bba7da3a 100644 --- a/lib/sidekiq/middleware/server/unique_jobs.rb +++ b/lib/sidekiq/middleware/server/unique_jobs.rb @@ -1,3 +1,5 @@ +require 'multi_json' + module Sidekiq module Middleware module Server @@ -5,7 +7,11 @@ module Sidekiq def call(*args) yield ensure - json = MultiJson.encode(args[1]) + json = if MultiJson.respond_to?(:dump) + MultiJson.dump(args[1]) + else + MultiJson.encode(args[1]) + end hash = Digest::MD5.hexdigest(json) Sidekiq.redis {|conn| conn.del(hash) } end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index a201acd7..65e3a41f 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -1,4 +1,5 @@ require 'celluloid' +require 'multi_json' require 'sidekiq/util' require 'sidekiq/middleware/server/active_record' @@ -53,8 +54,12 @@ module Sidekiq redis do |conn| conn.multi do conn.set("worker:#{self}:started", Time.now.to_s) - conn.set("worker:#{self}", MultiJson.encode(:queue => queue, :payload => msg, - :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z"))) + hash = {:queue => queue, :payload => msg, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")} + if MultiJson.respond_to?(:dump) + conn.set("worker:#{self}", MultiJson.dump(hash)) + else + conn.set("worker:#{self}", MultiJson.encode(hash)) + end end end diff --git a/lib/sidekiq/retry.rb b/lib/sidekiq/retry.rb index 3e1688ee..b8bce010 100644 --- a/lib/sidekiq/retry.rb +++ b/lib/sidekiq/retry.rb @@ -46,7 +46,11 @@ module Sidekiq messages.each do |message| logger.debug { "Retrying #{message}" } - msg = MultiJson.decode(message) + msg = if MultiJson.respond_to?(:adapter) + MultiJson.load(message) + else + MultiJson.decode(message) + end conn.rpush("queue:#{msg['queue']}", message) end end diff --git a/lib/sidekiq/web.rb b/lib/sidekiq/web.rb index bd2f03fa..10be931d 100644 --- a/lib/sidekiq/web.rb +++ b/lib/sidekiq/web.rb @@ -52,7 +52,13 @@ module Sidekiq Sidekiq.redis do |conn| conn.smembers('workers').map do |w| msg = conn.get("worker:#{w}") - msg = MultiJson.decode(msg) if msg + if msg + msg = if MultiJson.respond_to?(:adapter) + MultiJson.load(msg) + else + MultiJson.decode(msg) + end + end [w, msg] end.sort { |x| x[1] ? -1 : 1 } end @@ -74,7 +80,11 @@ module Sidekiq def retries Sidekiq.redis do |conn| results = conn.zrange('retry', 0, 25, :withscores => true) - results.each_slice(2).map { |msg, score| [MultiJson.decode(msg), Float(score)] } + if MultiJson.respond_to?(:adapter) + results.each_slice(2).map { |msg, score| [MultiJson.load(msg), Float(score)] } + else + results.each_slice(2).map { |msg, score| [MultiJson.decode(msg), Float(score)] } + end end end @@ -89,7 +99,11 @@ module Sidekiq def retries_with_score(score) Sidekiq.redis do |conn| results = conn.zrangebyscore('retry', score, score) - results.map { |msg| MultiJson.decode(msg) } + if MultiJson.respond_to?(:adapter) + results.map { |msg| MultiJson.load(msg) } + else + results.map { |msg| MultiJson.decode(msg) } + end end end @@ -124,7 +138,11 @@ module Sidekiq get "/queues/:name" do halt 404 unless params[:name] @name = params[:name] - @messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.decode(str) } + if MultiJson.respond_to?(:adapter) + @messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.load(str) } + else + @messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.decode(str) } + end slim :queue end @@ -142,7 +160,11 @@ module Sidekiq results = conn.zrangebyscore('retry', score, score) conn.zremrangebyscore('retry', score, score) results.map do |message| - msg = MultiJson.decode(message) + msg = if MultiJson.respond_to?(:adapter) + MultiJson.load(message) + else + MultiJson.decode(message) + end conn.rpush("queue:#{msg['queue']}", message) end end diff --git a/sidekiq.gemspec b/sidekiq.gemspec index 43218020..9e16035f 100644 --- a/sidekiq.gemspec +++ b/sidekiq.gemspec @@ -17,7 +17,7 @@ Gem::Specification.new do |gem| gem.add_dependency 'redis-namespace' gem.add_dependency 'connection_pool', '~> 0.9.0' gem.add_dependency 'celluloid', '~> 0.10.0' - gem.add_dependency 'multi_json', '>= 1.0', '< 1.3' + gem.add_dependency 'multi_json', '~> 1.0' gem.add_development_dependency 'minitest' gem.add_development_dependency 'sinatra' gem.add_development_dependency 'slim' diff --git a/test/test_retry.rb b/test/test_retry.rb index 6cf34563..f47187be 100644 --- a/test/test_retry.rb +++ b/test/test_retry.rb @@ -1,4 +1,5 @@ require 'helper' +require 'multi_json' require 'sidekiq/retry' require 'sidekiq/middleware/server/retry_jobs' @@ -81,7 +82,11 @@ class TestRetry < MiniTest::Unit::TestCase end it 'should poll like a bad mother...SHUT YO MOUTH' do - fake_msg = MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' }) + fake_msg = if MultiJson.respond_to?(:dump) + MultiJson.dump({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' }) + else + MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' }) + end @redis.expect :multi, [[fake_msg], 1], [] @redis.expect :rpush, 1, ['queue:someq', fake_msg] From 5eb8d397f0c51a3326717c9d7872c8288454fe0c Mon Sep 17 00:00:00 2001 From: Erik Michaels-Ober Date: Sun, 22 Apr 2012 14:02:35 -0700 Subject: [PATCH 2/2] Refactor to use Sidekiq.dump_json and Sidekiq.load_json These methods perform MultiJson feature detection and can be removed after this library's MultiJson dependency is upgraded to ~> 2.0. --- lib/sidekiq.rb | 18 +++++++++++ lib/sidekiq/client.rb | 6 +--- lib/sidekiq/manager.rb | 6 +--- lib/sidekiq/middleware/client/unique_jobs.rb | 6 +--- lib/sidekiq/middleware/server/failure_jobs.rb | 6 +--- lib/sidekiq/middleware/server/retry_jobs.rb | 6 +--- lib/sidekiq/middleware/server/unique_jobs.rb | 6 +--- lib/sidekiq/processor.rb | 6 +--- lib/sidekiq/retry.rb | 6 +--- lib/sidekiq/web.rb | 32 +++---------------- test/test_retry.rb | 6 +--- 11 files changed, 32 insertions(+), 72 deletions(-) diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index 04480b3f..b953a627 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -82,4 +82,22 @@ module Sidekiq @server_chain end + def self.load_json(string, options={}) + # Can't reliably detect whether MultiJson responds to load, since it's + # a reserved word. Use adapter as a proxy for new features. + if MultiJson.respond_to?(:adapter) + MultiJson.load(string, options) + else + MultiJson.decode(string, options) + end + end + + def self.dump_json(object, options={}) + if MultiJson.respond_to?(:dump) + MultiJson.dump(object, options) + else + MultiJson.encode(object, options) + end + end + end diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 82fcd180..584c2832 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -45,11 +45,7 @@ module Sidekiq pushed = false Sidekiq.client_middleware.invoke(worker_class, item, queue) do - payload = if MultiJson.respond_to?(:dump) - MultiJson.dump(item) - else - MultiJson.encode(item) - end + payload = Sidekiq.dump_json(item) Sidekiq.redis do |conn| _, pushed = conn.multi do conn.sadd('queues', queue) diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 983ca502..22ff3328 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -110,11 +110,7 @@ module Sidekiq processor = @ready.pop @in_progress[processor.object_id] = [msg, queue] @busy << processor - if MultiJson.respond_to?(:adapter) - processor.process!(MultiJson.load(msg), queue) - else - processor.process!(MultiJson.decode(msg), queue) - end + processor.process!(Sidekiq.load_json(msg), queue) end end end diff --git a/lib/sidekiq/middleware/client/unique_jobs.rb b/lib/sidekiq/middleware/client/unique_jobs.rb index 529f9a6b..facf081c 100644 --- a/lib/sidekiq/middleware/client/unique_jobs.rb +++ b/lib/sidekiq/middleware/client/unique_jobs.rb @@ -10,11 +10,7 @@ module Sidekiq def call(worker_class, item, queue) enabled = worker_class.get_sidekiq_options['unique'] if enabled - payload_hash = if MultiJson.respond_to?(:dump) - Digest::MD5.hexdigest(MultiJson.dump(item)) - else - Digest::MD5.hexdigest(MultiJson.encode(item)) - end + payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(item)) unique = false Sidekiq.redis do |conn| diff --git a/lib/sidekiq/middleware/server/failure_jobs.rb b/lib/sidekiq/middleware/server/failure_jobs.rb index 0febd705..3fc40d00 100644 --- a/lib/sidekiq/middleware/server/failure_jobs.rb +++ b/lib/sidekiq/middleware/server/failure_jobs.rb @@ -16,11 +16,7 @@ module Sidekiq :worker => args[1]['class'], :queue => args[2] } - if MultiJson.respond_to?(:dump) - Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.dump(data)) } - else - Sidekiq.redis {|conn| conn.rpush(:failed, MultiJson.encode(data)) } - end + Sidekiq.redis {|conn| conn.rpush(:failed, Sidekiq.dump_json(data)) } raise end end diff --git a/lib/sidekiq/middleware/server/retry_jobs.rb b/lib/sidekiq/middleware/server/retry_jobs.rb index 7df6803f..8f7ec36a 100644 --- a/lib/sidekiq/middleware/server/retry_jobs.rb +++ b/lib/sidekiq/middleware/server/retry_jobs.rb @@ -46,11 +46,7 @@ module Sidekiq delay = DELAY.call(count) logger.debug { "Failure! Retry #{count} in #{delay} seconds" } retry_at = Time.now.to_f + delay - payload = if MultiJson.respond_to?(:dump) - MultiJson.dump(msg) - else - MultiJson.encode(msg) - end + payload = Sidekiq.dump_json(msg) Sidekiq.redis do |conn| conn.zadd('retry', retry_at.to_s, payload) end diff --git a/lib/sidekiq/middleware/server/unique_jobs.rb b/lib/sidekiq/middleware/server/unique_jobs.rb index bba7da3a..819fd8c4 100644 --- a/lib/sidekiq/middleware/server/unique_jobs.rb +++ b/lib/sidekiq/middleware/server/unique_jobs.rb @@ -7,11 +7,7 @@ module Sidekiq def call(*args) yield ensure - json = if MultiJson.respond_to?(:dump) - MultiJson.dump(args[1]) - else - MultiJson.encode(args[1]) - end + json = Sidekiq.dump_json(args[1]) hash = Digest::MD5.hexdigest(json) Sidekiq.redis {|conn| conn.del(hash) } end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 65e3a41f..900b9059 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -55,11 +55,7 @@ module Sidekiq conn.multi do conn.set("worker:#{self}:started", Time.now.to_s) hash = {:queue => queue, :payload => msg, :run_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z")} - if MultiJson.respond_to?(:dump) - conn.set("worker:#{self}", MultiJson.dump(hash)) - else - conn.set("worker:#{self}", MultiJson.encode(hash)) - end + conn.set("worker:#{self}", Sidekiq.dump_json(hash)) end end diff --git a/lib/sidekiq/retry.rb b/lib/sidekiq/retry.rb index b8bce010..5a14ee6b 100644 --- a/lib/sidekiq/retry.rb +++ b/lib/sidekiq/retry.rb @@ -46,11 +46,7 @@ module Sidekiq messages.each do |message| logger.debug { "Retrying #{message}" } - msg = if MultiJson.respond_to?(:adapter) - MultiJson.load(message) - else - MultiJson.decode(message) - end + msg = Sidekiq.load_json(message) conn.rpush("queue:#{msg['queue']}", message) end end diff --git a/lib/sidekiq/web.rb b/lib/sidekiq/web.rb index 10be931d..9410d78c 100644 --- a/lib/sidekiq/web.rb +++ b/lib/sidekiq/web.rb @@ -52,13 +52,7 @@ module Sidekiq Sidekiq.redis do |conn| conn.smembers('workers').map do |w| msg = conn.get("worker:#{w}") - if msg - msg = if MultiJson.respond_to?(:adapter) - MultiJson.load(msg) - else - MultiJson.decode(msg) - end - end + msg = Sidekiq.load_json(msg) if msg [w, msg] end.sort { |x| x[1] ? -1 : 1 } end @@ -80,11 +74,7 @@ module Sidekiq def retries Sidekiq.redis do |conn| results = conn.zrange('retry', 0, 25, :withscores => true) - if MultiJson.respond_to?(:adapter) - results.each_slice(2).map { |msg, score| [MultiJson.load(msg), Float(score)] } - else - results.each_slice(2).map { |msg, score| [MultiJson.decode(msg), Float(score)] } - end + results.each_slice(2).map { |msg, score| [Sidekiq.load_json(msg), Float(score)] } end end @@ -99,11 +89,7 @@ module Sidekiq def retries_with_score(score) Sidekiq.redis do |conn| results = conn.zrangebyscore('retry', score, score) - if MultiJson.respond_to?(:adapter) - results.map { |msg| MultiJson.load(msg) } - else - results.map { |msg| MultiJson.decode(msg) } - end + results.map { |msg| Sidekiq.load_json(msg) } end end @@ -138,11 +124,7 @@ module Sidekiq get "/queues/:name" do halt 404 unless params[:name] @name = params[:name] - if MultiJson.respond_to?(:adapter) - @messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.load(str) } - else - @messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| MultiJson.decode(str) } - end + @messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, 10) }.map { |str| Sidekiq.load_json(str) } slim :queue end @@ -160,11 +142,7 @@ module Sidekiq results = conn.zrangebyscore('retry', score, score) conn.zremrangebyscore('retry', score, score) results.map do |message| - msg = if MultiJson.respond_to?(:adapter) - MultiJson.load(message) - else - MultiJson.decode(message) - end + msg = Sidekiq.load_json(message) conn.rpush("queue:#{msg['queue']}", message) end end diff --git a/test/test_retry.rb b/test/test_retry.rb index f47187be..96a045cb 100644 --- a/test/test_retry.rb +++ b/test/test_retry.rb @@ -82,11 +82,7 @@ class TestRetry < MiniTest::Unit::TestCase end it 'should poll like a bad mother...SHUT YO MOUTH' do - fake_msg = if MultiJson.respond_to?(:dump) - MultiJson.dump({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' }) - else - MultiJson.encode({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' }) - end + fake_msg = Sidekiq.dump_json({ 'class' => 'Bob', 'args' => [1,2], 'queue' => 'someq' }) @redis.expect :multi, [[fake_msg], 1], [] @redis.expect :rpush, 1, ['queue:someq', fake_msg]