diff --git a/Changes.md b/Changes.md index 90bfda49..41921457 100644 --- a/Changes.md +++ b/Changes.md @@ -1,8 +1,20 @@ HEAD ----------- -- Defer loading JSON until a full Thread stack is available. Celluloid's - standard 4k actor stack will lead to crashes when parsing large payloads. +- Handle networking errors causing the scheduler thread to die [#309] +- Rework exception handling to log all Processor and actor death (#325, subelsky) +- Clone arguments when calling worker so modifications are discarded. (#265, hakanensari) + +2.1.0 +----------- + +- Tune Celluloid to no longer run message processing within a Fiber. + This gives us a full Thread stack and also lowers Sidekiq's memory + usage. +- Add pagination within the Web UI [#253] +- Specify which Redis driver to use: *hiredis* or *ruby* (default) +- Remove FailureJobs and UniqueJobs, which were optional middleware + that I don't want to support in core. [#302] 2.0.3 ----------- diff --git a/Gemfile b/Gemfile index 2be6f322..2caaee46 100644 --- a/Gemfile +++ b/Gemfile @@ -1,10 +1,11 @@ source 'http://rubygems.org' gemspec +gem 'celluloid' gem 'slim' gem 'sprockets' gem 'sass' -gem 'rails', '3.2.3' +gem 'rails', '3.2.7' gem 'sqlite3' group :test do diff --git a/README.md b/README.md index 942c2de4..765ee470 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,9 @@ -Sidekiq [![Build Status](https://secure.travis-ci.org/mperham/sidekiq.png)](http://travis-ci.org/mperham/sidekiq) +Sidekiq ============== +[![Build Status](https://secure.travis-ci.org/mperham/sidekiq.png)](http://travis-ci.org/mperham/sidekiq) +[![Dependency Status](https://gemnasium.com/mperham/sidekiq.png)](https://gemnasium.com/mperham/sidekiq) + Simple, efficient message processing for Ruby. Sidekiq uses threads to handle many messages at the same time in the @@ -27,6 +30,8 @@ Requirements I test on Ruby 1.9.3 and JRuby 1.6.x in 1.9 mode. Other versions/VMs are untested but I will do my best to support them. Ruby 1.8 is not supported. +Redis 2.0 or greater is required. + Installation ----------------- @@ -38,6 +43,7 @@ Getting Started ----------------- See the [sidekiq home page](http://mperham.github.com/sidekiq) for the simple 4-step process. +You can watch [Railscast #366](http://railscasts.com/episodes/366-sidekiq) to see Sidekiq in action. More Information @@ -52,6 +58,15 @@ and email to with a greeting in the body. To unsubscribe Once archiving begins, you'll be able to visit [the archives](http://librelist.com/browser/sidekiq/) to see past threads. +Problems? +----------------- + +**Please do not directly email any Sidekiq committers with questions or problems.** A community is best served when discussions are held in public. + +If you have a problem, please review the [FAQ](/mperham/sidekiq/wiki/FAQ) and [Troubleshooting](/mperham/sidekiq/wiki/Problems-and-Troubleshooting) wiki pages. Searching the issues for your problem is also a good idea. If that doesn't help, feel free to email the Sidekiq mailing list or open a new issue. +The mailing list is the preferred place to ask questions on usage. If you are encountering what you think is a bug, please open an issue. + + License ----------------- diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 7601467d..c56acec2 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -124,7 +124,6 @@ module Sidekiq def validate! options[:queues] << 'default' if options[:queues].empty? - options[:queues].shuffle! if !File.exist?(options[:require]) || (File.directory?(options[:require]) && !File.exist?("#{options[:require]}/config/application.rb")) @@ -141,9 +140,10 @@ module Sidekiq opts = {} @parser = OptionParser.new do |o| - o.on "-q", "--queue QUEUE,WEIGHT", "Queue to process, with optional weight" do |arg| - q, weight = arg.split(",") - parse_queues(opts, q, weight) + o.on "-q", "--queue QUEUE[,WEIGHT]...", "Queues to process with optional weights" do |arg| + queues_and_weights = arg.scan(/(\w+),?(\d*)/) + queues_and_weights.each {|queue_and_weight| parse_queues(opts, *queue_and_weight)} + opts[:strict] = queues_and_weights.collect(&:last).none? {|weight| weight != ''} end o.on "-v", "--verbose", "Print more verbose output" do @@ -208,7 +208,7 @@ module Sidekiq end def parse_queues(opts, q, weight) - (weight || 1).to_i.times do + [weight.to_i, 1].max.times do (opts[:queues] ||= []) << q end end diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 8bc9aff7..21593df2 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -1,5 +1,4 @@ require 'sidekiq/middleware/chain' -require 'sidekiq/middleware/client/unique_jobs' module Sidekiq class Client diff --git a/lib/sidekiq/exception_handler.rb b/lib/sidekiq/exception_handler.rb new file mode 100644 index 00000000..4e196a11 --- /dev/null +++ b/lib/sidekiq/exception_handler.rb @@ -0,0 +1,30 @@ +module Sidekiq + module ExceptionHandler + + def handle_exception(ex, msg) + Sidekiq.logger.warn msg + Sidekiq.logger.warn ex + Sidekiq.logger.warn ex.backtrace.join("\n") + send_to_airbrake(msg, ex) if defined?(::Airbrake) + send_to_exceptional(msg, ex) if defined?(::Exceptional) + send_to_exception_notifier(msg, ex) if defined?(::ExceptionNotifier) + end + + private + + def send_to_airbrake(msg, ex) + ::Airbrake.notify(ex, :parameters => msg) + end + + def send_to_exceptional(msg, ex) + if ::Exceptional::Config.should_send_to_api? + ::Exceptional.context(msg) + ::Exceptional::Remote.error(::Exceptional::ExceptionData.new(ex)) + end + end + + def send_to_exception_notifier(msg, ex) + ::ExceptionNotifier::Notifier.background_exception_notification(ex, :data => { :message => msg }) + end + end +end diff --git a/lib/sidekiq/extensions/generic_proxy.rb b/lib/sidekiq/extensions/generic_proxy.rb index c8cc96a5..104fb60b 100644 --- a/lib/sidekiq/extensions/generic_proxy.rb +++ b/lib/sidekiq/extensions/generic_proxy.rb @@ -1,6 +1,6 @@ module Sidekiq module Extensions - class Proxy < (RUBY_VERSION < '1.9' ? Object : BasicObject) + class Proxy < BasicObject def initialize(performable, target, at=nil) @performable = performable @target = target diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index efa09883..37c7174b 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -12,8 +12,9 @@ module Sidekiq TIMEOUT = 1 - def initialize(mgr, queues) + def initialize(mgr, queues, strict) @mgr = mgr + @strictly_ordered_queues = strict @queues = queues.map { |q| "queue:#{q}" } @unique_queues = @queues.uniq end @@ -68,6 +69,7 @@ module Sidekiq # recreate the queue command each time we invoke Redis#blpop # to honor weights and avoid queue starvation. def queues_cmd + return @unique_queues.dup << TIMEOUT if @strictly_ordered_queues queues = @queues.sample(@unique_queues.size).uniq queues.concat(@unique_queues - queues) queues << TIMEOUT diff --git a/lib/sidekiq/manager.rb b/lib/sidekiq/manager.rb index 74534be0..bdf75f99 100644 --- a/lib/sidekiq/manager.rb +++ b/lib/sidekiq/manager.rb @@ -27,7 +27,7 @@ module Sidekiq @in_progress = {} @done = false @busy = [] - @fetcher = Fetcher.new(current_actor, options[:queues]) + @fetcher = Fetcher.new(current_actor, options[:queues], !!options[:strict]) @ready = @count.times.map { Processor.new_link(current_actor) } procline end diff --git a/lib/sidekiq/middleware/chain.rb b/lib/sidekiq/middleware/chain.rb index 534caa99..60df64c0 100644 --- a/lib/sidekiq/middleware/chain.rb +++ b/lib/sidekiq/middleware/chain.rb @@ -7,22 +7,26 @@ module Sidekiq # # To add middleware for the client: # - # Sidekiq.client_middleware do |chain| - # chain.add MyClientHook + # Sidekiq.configure_client do |config| + # config.client_middleware do |chain| + # chain.add MyClientHook + # end # end # # To modify middleware for the server, just call # with another block: # - # Sidekiq.server_middleware do |chain| - # chain.add MyServerHook - # chain.remove ActiveRecord + # Sidekiq.configure_server do |config| + # config.server_middleware do |chain| + # chain.add MyServerHook + # chain.remove ActiveRecord + # end # end # # This is an example of a minimal server middleware: # # class MyServerHook - # def call(worker, msg, queue) + # def call(worker_instance, msg, queue) # puts "Before work" # yield # puts "After work" @@ -32,7 +36,7 @@ module Sidekiq # This is an example of a minimal client middleware: # # class MyClientHook - # def call(msg, queue) + # def call(worker_class, msg, queue) # puts "Before push" # yield # puts "After push" diff --git a/lib/sidekiq/middleware/client/unique_jobs.rb b/lib/sidekiq/middleware/client/unique_jobs.rb deleted file mode 100644 index 308a0540..00000000 --- a/lib/sidekiq/middleware/client/unique_jobs.rb +++ /dev/null @@ -1,35 +0,0 @@ -require 'digest' - -module Sidekiq - module Middleware - module Client - class UniqueJobs - HASH_KEY_EXPIRATION = 30 * 60 - - def call(worker_class, item, queue) - enabled = worker_class.get_sidekiq_options['unique'] - if enabled - payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(item)) - unique = false - - Sidekiq.redis do |conn| - conn.watch(payload_hash) - - if conn.get(payload_hash) - conn.unwatch - else - unique = conn.multi do - conn.setex(payload_hash, HASH_KEY_EXPIRATION, 1) - end - end - end - yield if unique - else - yield - end - end - - end - end - end -end diff --git a/lib/sidekiq/middleware/server/exception_handler.rb b/lib/sidekiq/middleware/server/exception_handler.rb deleted file mode 100644 index 33be0d87..00000000 --- a/lib/sidekiq/middleware/server/exception_handler.rb +++ /dev/null @@ -1,38 +0,0 @@ -require 'sidekiq/util' - -module Sidekiq - module Middleware - module Server - class ExceptionHandler - include Util - def call(*args) - yield - rescue => ex - logger.warn ex - logger.warn ex.backtrace.join("\n") - send_to_airbrake(args[1], ex) if defined?(::Airbrake) - send_to_exceptional(args[1], ex) if defined?(::Exceptional) - send_to_exception_notifier(args[1], ex) if defined?(::ExceptionNotifier) - raise - end - - private - - def send_to_airbrake(msg, ex) - ::Airbrake.notify(ex, :parameters => msg) - end - - def send_to_exceptional(msg, ex) - if ::Exceptional::Config.should_send_to_api? - ::Exceptional.context(msg) - ::Exceptional::Remote.error(::Exceptional::ExceptionData.new(ex)) - end - end - - def send_to_exception_notifier(msg, ex) - ::ExceptionNotifier::Notifier.background_exception_notification(ex, :data => { :message => msg }) - end - end - end - end -end diff --git a/lib/sidekiq/middleware/server/failure_jobs.rb b/lib/sidekiq/middleware/server/failure_jobs.rb deleted file mode 100644 index 3ac68d27..00000000 --- a/lib/sidekiq/middleware/server/failure_jobs.rb +++ /dev/null @@ -1,23 +0,0 @@ -module Sidekiq - module Middleware - module Server - class FailureJobs - def call(*args) - yield - rescue => e - data = { - :failed_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z"), - :payload => args[1], - :exception => e.class.to_s, - :error => e.to_s, - :backtrace => e.backtrace, - :worker => args[1]['class'], - :queue => args[2] - } - Sidekiq.redis {|conn| conn.rpush(:failed, Sidekiq.dump_json(data)) } - raise - end - end - end - end -end diff --git a/lib/sidekiq/middleware/server/unique_jobs.rb b/lib/sidekiq/middleware/server/unique_jobs.rb deleted file mode 100644 index d6b63a41..00000000 --- a/lib/sidekiq/middleware/server/unique_jobs.rb +++ /dev/null @@ -1,15 +0,0 @@ -module Sidekiq - module Middleware - module Server - class UniqueJobs - def call(*args) - yield - ensure - json = Sidekiq.dump_json(args[1]) - hash = Digest::MD5.hexdigest(json) - Sidekiq.redis {|conn| conn.del(hash) } - end - end - end - end -end diff --git a/lib/sidekiq/paginator.rb b/lib/sidekiq/paginator.rb new file mode 100644 index 00000000..71cdc506 --- /dev/null +++ b/lib/sidekiq/paginator.rb @@ -0,0 +1,31 @@ +module Sidekiq + module Paginator + def page(key, pageidx=1, page_size=25) + current_page = pageidx.to_i < 1 ? 1 : pageidx.to_i + pageidx = current_page - 1 + total_size = 0 + items = [] + starting = pageidx * page_size + ending = starting + page_size - 1 + + Sidekiq.redis do |conn| + type = conn.type(key) + + case type + when 'zset' + total_size = conn.zcard(key) + items = conn.zrange(key, starting, ending, :with_scores => true) + when 'list' + total_size = conn.llen(key) + items = conn.lrange(key, starting, ending) + when 'none' + return [1, 0, []] + else + raise "can't page a #{type}" + end + end + + [current_page, total_size, items] + end + end +end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb index 2bda07c0..fee1ae2f 100644 --- a/lib/sidekiq/processor.rb +++ b/lib/sidekiq/processor.rb @@ -2,7 +2,6 @@ require 'celluloid' require 'sidekiq/util' require 'sidekiq/middleware/server/active_record' -require 'sidekiq/middleware/server/exception_handler' require 'sidekiq/middleware/server/retry_jobs' require 'sidekiq/middleware/server/logging' require 'sidekiq/middleware/server/timeout' @@ -16,9 +15,10 @@ module Sidekiq include Util include Celluloid + exclusive :process + def self.default_middleware Middleware::Chain.new do |m| - m.add Middleware::Server::ExceptionHandler m.add Middleware::Server::Logging m.add Middleware::Server::RetryJobs m.add Middleware::Server::ActiveRecord @@ -31,23 +31,20 @@ module Sidekiq end def process(msgstr, queue) - # Celluloid actor calls are performed within a Fiber. - # This would give us a terribly small 4KB stack on MRI - # so we use Celluloid's defer to run things in a thread pool - # in order to get a full-sized stack for the Worker. - defer do - msg = Sidekiq.load_json(msgstr) - klass = constantize(msg['class']) - worker = klass.new - worker.class.sidekiq_options(:queue => queue) + msg = Sidekiq.load_json(msgstr) + klass = constantize(msg['class']) + worker = klass.new + worker.class.sidekiq_options(:queue => queue) - stats(worker, msg, queue) do - Sidekiq.server_middleware.invoke(worker, msg, queue) do - worker.perform(*msg['args']) - end + stats(worker, msg, queue) do + Sidekiq.server_middleware.invoke(worker, msg, queue) do + worker.perform(*cloned(msg['args'])) end end @boss.processor_done!(current_actor) + rescue => ex + handle_exception(ex, msg || { :message => msgstr }) + raise end # See http://github.com/tarcieri/celluloid/issues/22 @@ -92,7 +89,18 @@ module Sidekiq end end end + end + # Singleton classes are not clonable. + SINGLETON_CLASSES = [ NilClass, TrueClass, FalseClass, Symbol, Fixnum, Float ].freeze + + # Clone the arguments passed to the worker so that if + # the message fails, what is pushed back onto Redis hasn't + # been mutated by the worker. + def cloned(ary) + ary.map do |val| + SINGLETON_CLASSES.include?(val.class) ? val : val.clone + end end def hostname diff --git a/lib/sidekiq/redis_connection.rb b/lib/sidekiq/redis_connection.rb index ad04cd6b..40e5dfac 100644 --- a/lib/sidekiq/redis_connection.rb +++ b/lib/sidekiq/redis_connection.rb @@ -6,16 +6,17 @@ module Sidekiq class RedisConnection def self.create(options={}) url = options[:url] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0' + driver = options[:driver] || 'ruby' # need a connection for Fetcher and Retry size = options[:size] || (Sidekiq.server? ? (Sidekiq.options[:concurrency] + 2) : 5) ConnectionPool.new(:timeout => 1, :size => size) do - build_client(url, options[:namespace]) + build_client(url, options[:namespace], driver) end end - def self.build_client(url, namespace) - client = Redis.connect(:url => url) + def self.build_client(url, namespace, driver) + client = Redis.connect(:url => url, :driver => driver) if namespace Redis::Namespace.new(namespace, :redis => client) else diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 466daf87..2d77f940 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -22,25 +22,31 @@ module Sidekiq watchdog('scheduling poller thread died!') do add_jitter if first_time - # A message's "score" in Redis is the time at which it should be processed. - # Just check Redis for the set of messages with a timestamp before now. - now = Time.now.to_f.to_s - Sidekiq.redis do |conn| - SETS.each do |sorted_set| - (messages, _) = conn.multi do - conn.zrangebyscore(sorted_set, '-inf', now) - conn.zremrangebyscore(sorted_set, '-inf', now) - end + begin + # A message's "score" in Redis is the time at which it should be processed. + # Just check Redis for the set of messages with a timestamp before now. + now = Time.now.to_f.to_s + Sidekiq.redis do |conn| + SETS.each do |sorted_set| + (messages, _) = conn.multi do + conn.zrangebyscore(sorted_set, '-inf', now) + conn.zremrangebyscore(sorted_set, '-inf', now) + end - messages.each do |message| - logger.debug { "enqueued #{sorted_set}: #{message}" } - msg = Sidekiq.load_json(message) - conn.multi do - conn.sadd('queues', msg['queue']) - conn.rpush("queue:#{msg['queue']}", message) + messages.each do |message| + logger.debug { "enqueued #{sorted_set}: #{message}" } + msg = Sidekiq.load_json(message) + conn.multi do + conn.sadd('queues', msg['queue']) + conn.rpush("queue:#{msg['queue']}", message) + end end end end + rescue SystemCallError => ex + # ECONNREFUSED, etc. Most likely a problem with + # redis networking. Punt and try again at the next interval + logger.warn ex.message end after(poll_interval) { poll } diff --git a/lib/sidekiq/util.rb b/lib/sidekiq/util.rb index e1c8ba91..87766082 100644 --- a/lib/sidekiq/util.rb +++ b/lib/sidekiq/util.rb @@ -1,8 +1,11 @@ +require 'sidekiq/exception_handler' + module Sidekiq ## # This module is part of Sidekiq core and not intended for extensions. # module Util + include ExceptionHandler EXPIRY = 60 * 60 @@ -20,9 +23,7 @@ module Sidekiq def watchdog(last_words) yield rescue => ex - logger.error last_words - logger.error ex - logger.error ex.backtrace.join("\n") + handle_exception(ex, { :context => last_words }) end def logger diff --git a/lib/sidekiq/version.rb b/lib/sidekiq/version.rb index 941fd740..3f129f92 100644 --- a/lib/sidekiq/version.rb +++ b/lib/sidekiq/version.rb @@ -1,3 +1,3 @@ module Sidekiq - VERSION = "2.0.3" + VERSION = "2.1.1" end diff --git a/lib/sidekiq/web.rb b/lib/sidekiq/web.rb index 1bd5c37f..9735c0d8 100644 --- a/lib/sidekiq/web.rb +++ b/lib/sidekiq/web.rb @@ -1,6 +1,8 @@ require 'sinatra/base' require 'slim' require 'sprockets' +require 'sidekiq/paginator' + module Sidekiq class SprocketsMiddleware def initialize(app, options={}) @@ -28,6 +30,8 @@ module Sidekiq end class Web < Sinatra::Base + include Sidekiq::Paginator + dir = File.expand_path(File.dirname(__FILE__) + "/../../web") set :views, "#{dir}/views" set :root, "#{dir}/public" @@ -68,21 +72,6 @@ module Sidekiq Sidekiq.redis { |conn| conn.zcard(name) } end - def retries(count=50) - zcontents('retry', count) - end - - def scheduled(count=50) - zcontents('schedule', count) - end - - def zcontents(name, count) - Sidekiq.redis do |conn| - results = conn.zrange(name, 0, count, :withscores => true) - results.map { |msg, score| [Sidekiq.load_json(msg), score] } - end - end - def queues @queues ||= Sidekiq.redis do |conn| conn.smembers('queues').map do |q| @@ -128,6 +117,10 @@ module Sidekiq slim :index end + get "/poll" do + slim :poll, layout: false + end + get "/queues" do @queues = queues slim :queues @@ -135,9 +128,10 @@ module Sidekiq get "/queues/:name" do halt 404 unless params[:name] - count = (params[:count] || 10).to_i + @count = (params[:count] || 25).to_i @name = params[:name] - @messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, count) }.map { |str| Sidekiq.load_json(str) } + (@current_page, @total_size, @messages) = page("queue:#{@name}", params[:page], @count) + @messages = @messages.map {|msg| Sidekiq.load_json(msg) } slim :queue end @@ -163,12 +157,16 @@ module Sidekiq end get '/retries' do - @retries = retries + @count = (params[:count] || 25).to_i + (@current_page, @total_size, @retries) = page("retry", params[:page], @count) + @retries = @retries.map {|msg, score| [Sidekiq.load_json(msg), score] } slim :retries end get '/scheduled' do - @scheduled = scheduled + @count = (params[:count] || 25).to_i + (@current_page, @total_size, @scheduled) = page("schedule", params[:page], @count) + @scheduled = @scheduled.map {|msg, score| [Sidekiq.load_json(msg), score] } slim :scheduled end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 50bb021d..f673cdf1 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -51,7 +51,6 @@ module Sidekiq # Allows customization for this type of Worker. # Legal options: # - # :unique - enable the UniqueJobs middleware for this Worker, default *true* # :queue - use a named queue for this Worker, default 'default' # :retry - enable the RetryJobs middleware for this Worker, default *true* # :timeout - timeout the perform method after N seconds, default *nil* @@ -61,7 +60,7 @@ module Sidekiq self.sidekiq_options_hash = get_sidekiq_options.merge(stringify_keys(opts || {})) end - DEFAULT_OPTIONS = { 'unique' => true, 'retry' => true, 'queue' => 'default' } + DEFAULT_OPTIONS = { 'retry' => true, 'queue' => 'default' } def get_sidekiq_options # :nodoc: self.sidekiq_options_hash ||= DEFAULT_OPTIONS diff --git a/myapp/Gemfile b/myapp/Gemfile index fc366b95..c445beac 100644 --- a/myapp/Gemfile +++ b/myapp/Gemfile @@ -9,7 +9,7 @@ platforms :jruby do gem 'activerecord-jdbcsqlite3-adapter' end -gem 'rails', '3.2.2' +gem 'rails', '3.2.6' gem 'sidekiq', :path => '..' gem 'capistrano' diff --git a/sidekiq.gemspec b/sidekiq.gemspec index 63f85643..d09dffce 100644 --- a/sidekiq.gemspec +++ b/sidekiq.gemspec @@ -17,7 +17,7 @@ Gem::Specification.new do |gem| gem.add_dependency 'redis', '~> 3' gem.add_dependency 'redis-namespace' gem.add_dependency 'connection_pool', '~> 0.9.2' - gem.add_dependency 'celluloid', '~> 0.11.0' + gem.add_dependency 'celluloid', '~> 0.11.1' gem.add_dependency 'multi_json', '~> 1' gem.add_development_dependency 'minitest', '~> 3' gem.add_development_dependency 'sinatra' diff --git a/test/test_cli.rb b/test/test_cli.rb index 5a7fcb30..361ea3df 100644 --- a/test/test_cli.rb +++ b/test/test_cli.rb @@ -40,14 +40,29 @@ class TestCli < MiniTest::Unit::TestCase assert_equal ['foo'], Sidekiq.options[:queues] end + it 'sets strictly ordered queues if weights are not present' do + @cli.parse(['sidekiq', '-q', 'foo,bar', '-r', './test/fake_env.rb']) + assert_equal true, !!Sidekiq.options[:strict] + end + + it 'does not set strictly ordered queues if weights are present' do + @cli.parse(['sidekiq', '-q', 'foo,3', '-r', './test/fake_env.rb']) + assert_equal false, !!Sidekiq.options[:strict] + end + it 'changes timeout' do @cli.parse(['sidekiq', '-t', '30', '-r', './test/fake_env.rb']) assert_equal 30, Sidekiq.options[:timeout] end - it 'handles multiple queues with weights' do + it 'handles multiple queues with weights with multiple switches' do @cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb']) - assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort + assert_equal %w(foo foo foo bar), Sidekiq.options[:queues] + end + + it 'handles multiple queues with weights with a single switch' do + @cli.parse(['sidekiq', '-q', 'bar,foo,3', '-r', './test/fake_env.rb']) + assert_equal %w(bar foo foo foo), Sidekiq.options[:queues] end it 'sets verbose' do @@ -163,6 +178,24 @@ class TestCli < MiniTest::Unit::TestCase assert_equal 3, Sidekiq.options[:queues].count { |q| q == 'seldom' } end end + + describe 'Sidekiq::CLI#parse_queues' do + describe 'when weight is present' do + it 'concatenates queue to opts[:queues] weight number of times' do + opts = {} + @cli.send :parse_queues, opts, 'often', 7 + assert_equal %w[often] * 7, opts[:queues] + end + end + + describe 'when weight is not present' do + it 'concatenates queue to opts[:queues] once' do + opts = {} + @cli.send :parse_queues, opts, 'once', nil + assert_equal %w[once], opts[:queues] + end + end + end end end diff --git a/test/test_client.rb b/test/test_client.rb index 9f432fde..cd52904f 100644 --- a/test/test_client.rb +++ b/test/test_client.rb @@ -3,36 +3,6 @@ require 'sidekiq/client' require 'sidekiq/worker' class TestClient < MiniTest::Unit::TestCase - describe 'with real redis' do - before do - Sidekiq.redis = REDIS - Sidekiq.redis {|c| c.flushdb } - end - - class QueueWorker - include Sidekiq::Worker - sidekiq_options :queue => 'customqueue' - end - - it 'does not push duplicate messages when configured for unique only' do - Sidekiq.client_middleware do |chain| - chain.add Sidekiq::Middleware::Client::UniqueJobs - end - QueueWorker.sidekiq_options :unique => true - 10.times { Sidekiq::Client.push('class' => QueueWorker, 'args' => [1, 2]) } - assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") } - end - - it 'does push duplicate messages when not configured for unique only' do - Sidekiq.client_middleware do |chain| - chain.add Sidekiq::Middleware::Client::UniqueJobs - end - QueueWorker.sidekiq_options :unique => false - 10.times { Sidekiq::Client.push('class' => QueueWorker, 'args' => [1, 2]) } - assert_equal 10, Sidekiq.redis {|c| c.llen("queue:customqueue") } - end - end - describe 'with mock redis' do before do @redis = MiniTest::Mock.new diff --git a/test/test_exception_handler.rb b/test/test_exception_handler.rb new file mode 100644 index 00000000..bda9fdb9 --- /dev/null +++ b/test/test_exception_handler.rb @@ -0,0 +1,109 @@ +require 'helper' +require 'sidekiq' +require 'sidekiq/exception_handler' +require 'stringio' +require 'logger' + +ExceptionHandlerTestException = Class.new(StandardError) +TEST_EXCEPTION = ExceptionHandlerTestException.new("Something didn't work!") + +class Component + include Sidekiq::Util + + def invoke_exception(args) + raise TEST_EXCEPTION + rescue ExceptionHandlerTestException => e + handle_exception(e,args) + end +end + +class TestExceptionHandler < MiniTest::Unit::TestCase + describe "with mock logger" do + before do + @old_logger = Sidekiq.logger + @str_logger = StringIO.new + Sidekiq.logger = Logger.new(@str_logger) + end + + after do + Sidekiq.logger = @old_logger + end + + it "logs the exception to Sidekiq.logger" do + Component.new.invoke_exception(:a => 1) + @str_logger.rewind + log = @str_logger.readlines + assert_match /a=>1/, log[0], "didn't include the context" + assert_match /Something didn't work!/, log[1], "didn't include the exception message" + assert_match /test\/test_exception_handler.rb/, log[2], "didn't include the backtrace" + end + end + + describe "with fake Airbrake" do + before do + ::Airbrake = MiniTest::Mock.new + end + + after do + Object.send(:remove_const, "Airbrake") # HACK should probably inject Airbrake etc into this class in the future + end + + it "notifies Airbrake" do + ::Airbrake.expect(:notify,nil,[TEST_EXCEPTION,:parameters => { :a => 1 }]) + Component.new.invoke_exception(:a => 1) + ::Airbrake.verify + end + end + + describe "with fake ExceptionNotifier" do + before do + ::ExceptionNotifier = Module.new + ::ExceptionNotifier::Notifier = MiniTest::Mock.new + end + + after do + Object.send(:remove_const, "ExceptionNotifier") + end + + it "notifies ExceptionNotifier" do + ::ExceptionNotifier::Notifier.expect(:background_exception_notification,nil,[TEST_EXCEPTION, :data => { :message => { :b => 2 } }]) + Component.new.invoke_exception(:b => 2) + ::ExceptionNotifier::Notifier.verify + end + end + + describe "with fake Exceptional" do + before do + ::Exceptional = Class.new do + + def self.context(msg) + @msg = msg + end + + def self.check_context + @msg + end + end + + ::Exceptional::Config = MiniTest::Mock.new + ::Exceptional::Remote = MiniTest::Mock.new + ::Exceptional::ExceptionData = MiniTest::Mock.new + end + + after do + Object.send(:remove_const, "Exceptional") + end + + it "notifies Exceptional" do + ::Exceptional::Config.expect(:should_send_to_api?,true) + exception_data = MiniTest::Mock.new + ::Exceptional::Remote.expect(:error,nil,[exception_data]) + ::Exceptional::ExceptionData.expect(:new,exception_data,[TEST_EXCEPTION]) + Component.new.invoke_exception(:c => 3) + assert_equal({:c => 3},::Exceptional.check_context,"did not record arguments properly") + ::Exceptional::Config.verify + ::Exceptional::Remote.verify + ::Exceptional::ExceptionData.verify + end + end +end diff --git a/test/test_fetch.rb b/test/test_fetch.rb new file mode 100644 index 00000000..dfbea6f4 --- /dev/null +++ b/test/test_fetch.rb @@ -0,0 +1,13 @@ +require 'helper' +require 'sidekiq/fetch' + +class TestFetcher < MiniTest::Unit::TestCase + describe 'Fetcher#queues_cmd' do + describe 'when queues are strictly ordered' do + it 'returns the unique ordered queues properly based on priority and order they were passed in' do + fetcher = Sidekiq::Fetcher.new nil, %w[high medium low default], true + assert_equal (%w[queue:high queue:medium queue:low queue:default] << 1), fetcher._send_(:queues_cmd) + end + end + end +end diff --git a/test/test_manager.rb b/test/test_manager.rb deleted file mode 100644 index 07a3eb39..00000000 --- a/test/test_manager.rb +++ /dev/null @@ -1,51 +0,0 @@ -require 'helper' -require 'sidekiq' -require 'sidekiq/manager' - -# for TimedQueue -require 'connection_pool' - -class TestManager < MiniTest::Unit::TestCase - describe 'with redis' do - before do - Sidekiq.redis = REDIS - Sidekiq.redis {|c| c.flushdb } - $processed = 0 - $mutex = Mutex.new - end - - class IntegrationWorker - include Sidekiq::Worker - sidekiq_options :queue => 'foo' - - def perform(a, b) - $mutex.synchronize do - $processed += 1 - end - a + b - end - end - - it 'processes messages' do - IntegrationWorker.perform_async(1, 2) - IntegrationWorker.perform_async(1, 3) - - q = TimedQueue.new - mgr = Sidekiq::Manager.new(:queues => [:foo], :concurrency => 2) - mgr.when_done do |_| - q << 'done' if $processed == 2 - end - mgr.start! - result = q.timed_pop(1.0) - assert_equal 'done', result - mgr.stop - mgr.terminate - - # Gross bloody hack because I can't get the actor threads - # to shut down cleanly in the test. Need @bascule's help here. - (Thread.list - [Thread.current]).each do |t| - t.raise Interrupt - end - end - end -end diff --git a/test/test_middleware.rb b/test/test_middleware.rb index 44a7f964..2546d487 100644 --- a/test/test_middleware.rb +++ b/test/test_middleware.rb @@ -1,6 +1,5 @@ require 'helper' require 'sidekiq/middleware/chain' -require 'sidekiq/middleware/server/unique_jobs' require 'sidekiq/processor' class TestMiddleware < MiniTest::Unit::TestCase @@ -10,18 +9,6 @@ class TestMiddleware < MiniTest::Unit::TestCase Sidekiq.redis = REDIS end - it 'handles errors' do - handler = Sidekiq::Middleware::Server::ExceptionHandler.new - - assert_raises ArgumentError do - handler.call('', { :a => 1 }, 'default') do - raise ArgumentError - end - end - assert_equal 1, $errors.size - assert_equal({ :a => 1 }, $errors[0][:parameters]) - end - class CustomMiddleware def initialize(name, recorder) @name = name @@ -84,10 +71,3 @@ class TestMiddleware < MiniTest::Unit::TestCase end end end - -class FakeAirbrake - def self.notify(ex, hash) - $errors << hash - end -end -Airbrake = FakeAirbrake diff --git a/test/test_processor.rb b/test/test_processor.rb index bdbfa8f3..2ff09b94 100644 --- a/test/test_processor.rb +++ b/test/test_processor.rb @@ -2,11 +2,14 @@ require 'helper' require 'sidekiq/processor' class TestProcessor < MiniTest::Unit::TestCase + TestException = Class.new(StandardError) + TEST_EXCEPTION = TestException.new("kerboom!") + describe 'with mock setup' do before do $invokes = 0 - $errors = [] @boss = MiniTest::Mock.new + @processor = ::Sidekiq::Processor.new(@boss) Celluloid.logger = nil Sidekiq.redis = REDIS end @@ -14,19 +17,51 @@ class TestProcessor < MiniTest::Unit::TestCase class MockWorker include Sidekiq::Worker def perform(args) - raise "kerboom!" if args == 'boom' + raise TEST_EXCEPTION if args == 'boom' + args.pop if args.is_a? Array $invokes += 1 end end it 'processes as expected' do msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] }) - processor = ::Sidekiq::Processor.new(@boss) - @boss.expect(:processor_done!, nil, [processor]) - processor.process(msg, 'default') + @boss.expect(:processor_done!, nil, [@processor]) + @processor.process(msg, 'default') @boss.verify assert_equal 1, $invokes - assert_equal 0, $errors.size + end + + it 'passes exceptions to ExceptionHandler' do + msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) + begin + @processor.process(msg, 'default') + flunk "Expected #process to raise exception" + rescue TestException + end + + assert_equal 0, $invokes + end + + it 're-raises exceptions after handling' do + msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] }) + re_raise = false + + begin + @processor.process(msg, 'default') + rescue TestException + re_raise = true + end + + assert re_raise, "does not re-raise exceptions after handling" + end + + it 'does not modify original arguments' do + msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] } + msgstr = Sidekiq.dump_json(msg) + processor = ::Sidekiq::Processor.new(@boss) + @boss.expect(:processor_done!, nil, [processor]) + processor.process(msgstr, 'default') + assert_equal [['myarg']], msg['args'] end end end diff --git a/test/test_web.rb b/test/test_web.rb index 3cdcad27..daa8684d 100644 --- a/test/test_web.rb +++ b/test/test_web.rb @@ -31,6 +31,14 @@ class TestWeb < MiniTest::Unit::TestCase refute_match /default/, last_response.body end + it 'can display poll' do + get '/poll' + assert_equal 200, last_response.status + assert_match /hero-unit/, last_response.body + assert_match /workers/, last_response.body + refute_match /navbar/, last_response.body + end + it 'can display queues' do assert Sidekiq::Client.push('queue' => :foo, 'class' => WebWorker, 'args' => [1, 3]) @@ -115,26 +123,26 @@ class TestWeb < MiniTest::Unit::TestCase assert_equal 200, last_response.status assert_match /HardWorker/, last_response.body end - + it 'can delete a single retry' do _, score = add_retry post "/retries/#{score}", 'delete' => 'Delete' assert_equal 302, last_response.status assert_equal 'http://example.org/retries', last_response.header['Location'] - + get "/retries" assert_equal 200, last_response.status refute_match /#{score}/, last_response.body end - + it 'can retry a single retry now' do msg, score = add_retry post "/retries/#{score}", 'retry' => 'Retry' assert_equal 302, last_response.status assert_equal 'http://example.org/retries', last_response.header['Location'] - + get '/queues/default' assert_equal 200, last_response.status assert_match /#{msg['args'][2]}/, last_response.body diff --git a/web/assets/javascripts/application.js b/web/assets/javascripts/application.js index dfffaf75..1a362546 100644 --- a/web/assets/javascripts/application.js +++ b/web/assets/javascripts/application.js @@ -18,3 +18,32 @@ $(function() { } }); }); + +$(function() { + $('a[name=poll]').data('polling', false); + + $('a[name=poll]').on('click', function(e) { + e.preventDefault(); + var pollLink = $(this); + if (pollLink.data('polling')) { + clearInterval(pollLink.data('interval')); + pollLink.text('Live Poll'); + $('.poll-status').text(''); + } + else { + var href = pollLink.attr('href'); + pollLink.data('interval', setInterval(function() { + $.get(href, function(data) { + var responseHtml = $(data); + $('.hero-unit').replaceWith(responseHtml.find('.hero-unit')); + $('.workers').replaceWith(responseHtml.find('.workers')); + }); + var currentTime = new Date(); + $('.poll-status').text('Last polled at: ' + currentTime.getHours() + ':' + currentTime.getMinutes() + ':' + currentTime.getSeconds()); + }, 2000)); + $('.poll-status').text('Starting to poll...'); + pollLink.text('Stop Polling'); + } + pollLink.data('polling', !pollLink.data('polling')); + }) +}); diff --git a/web/assets/stylesheets/layout.css b/web/assets/stylesheets/layout.css index c97bcdab..b0e3bf13 100644 --- a/web/assets/stylesheets/layout.css +++ b/web/assets/stylesheets/layout.css @@ -20,3 +20,7 @@ code { .hero-unit { padding: 30px; } + +.poll-status { + padding-left: 10px; +} diff --git a/web/views/_paging.slim b/web/views/_paging.slim new file mode 100644 index 00000000..867f507e --- /dev/null +++ b/web/views/_paging.slim @@ -0,0 +1,15 @@ +- if @total_size > @count + .pagination.pagination-right + ul + li class="#{'disabled' if @current_page == 1}" + a href="#{url}?page=1" « + - if @current_page > 1 + li + a href="#{url}?page=#{@current_page - 1}" #{@current_page - 1} + li.disabled + a href="#{url}?page=#{@current_page}" #{@current_page} + - if @total_size > @current_page * @count + li + a href="#{url}?page=#{@current_page + 1}" #{@current_page + 1} + li class="#{'disabled' if @total_size <= @current_page * @count}" + a href="#{url}?page=#{(@total_size / @count).ceil + 1}" » diff --git a/web/views/_summary.slim b/web/views/_summary.slim new file mode 100644 index 00000000..96b281de --- /dev/null +++ b/web/views/_summary.slim @@ -0,0 +1,8 @@ +.hero-unit + h1 Sidekiq is #{current_status} + p Processed: #{processed} + p Failed: #{failed} + p Busy Workers: #{workers.size} + p Scheduled: #{zcard('schedule')} + p Retries Pending: #{zcard('retry')} + p Queue Backlog: #{backlog} diff --git a/web/views/_workers.slim b/web/views/_workers.slim new file mode 100644 index 00000000..0be082dd --- /dev/null +++ b/web/views/_workers.slim @@ -0,0 +1,14 @@ +table class="table table-striped table-bordered workers" + tr + th Worker + th Queue + th Class + th Arguments + th Started + - workers.each do |(worker, msg)| + tr + td= worker + td= msg['queue'] + td= msg['payload']['class'] + td= msg['payload']['args'].inspect[0..100] + td== relative_time(Time.parse(msg['run_at'])) diff --git a/web/views/index.slim b/web/views/index.slim index 57e90005..1525d568 100644 --- a/web/views/index.slim +++ b/web/views/index.slim @@ -1,25 +1,10 @@ -.hero-unit - h1 Sidekiq is #{current_status} - p Processed: #{processed} - p Failed: #{failed} - p Busy Workers: #{workers.size} - p Scheduled: #{zcard('schedule')} - p Retries Pending: #{zcard('retry')} - p Queue Backlog: #{backlog} +== slim :_summary + +.poll + a*{name: 'poll'} href='#{{root_path}}poll' Live Poll + span class="poll-status" + +== slim :_workers -table class="table table-striped table-bordered" - tr - th Worker - th Queue - th Class - th Arguments - th Started - - workers.each do |(worker, msg)| - tr - td= worker - td= msg['queue'] - td= msg['payload']['class'] - td= msg['payload']['args'].inspect[0..100] - td== relative_time(Time.parse(msg['run_at'])) form action="#{root_path}reset" method="post" - button.btn type="submit" title="If you kill -9 Sidekiq, this table can fill up with old data." Clear worker list + button.btn type="submit" title="If you kill -9 Sidekiq, this table can fill up with old data." Clear worker list diff --git a/web/views/layout.slim b/web/views/layout.slim index c54731d6..e2bb079f 100644 --- a/web/views/layout.slim +++ b/web/views/layout.slim @@ -13,7 +13,7 @@ html span.icon-bar a.brand href='#{{root_path}}' | Sidekiq - div.nav-collapse + div.nav-collapse ul.nav li a href='#{{root_path}}' Home diff --git a/web/views/poll.slim b/web/views/poll.slim new file mode 100644 index 00000000..827c96ed --- /dev/null +++ b/web/views/poll.slim @@ -0,0 +1,3 @@ +div + == slim :_summary + == slim :_workers diff --git a/web/views/queue.slim b/web/views/queue.slim index 6c122030..3d7e6b7e 100644 --- a/web/views/queue.slim +++ b/web/views/queue.slim @@ -1,5 +1,7 @@ header - h1 Latest messages in #{@name} + h1 Current messages in #{@name} + +== slim :_paging, :locals => { :url => "#{root_path}queues/#{@name}" } table class="table table-striped table-bordered" tr @@ -9,3 +11,5 @@ table class="table table-striped table-bordered" tr td= msg['class'] td= msg['args'].inspect[0..100] + +== slim :_paging, :locals => { :url => "#{root_path}queues/#{@name}" } diff --git a/web/views/retries.slim b/web/views/retries.slim index 023c3a12..1cbeb29a 100644 --- a/web/views/retries.slim +++ b/web/views/retries.slim @@ -1,6 +1,8 @@ h1 Retries - if @retries.size > 0 + == slim :_paging, :locals => { :url => "#{root_path}retries" } + form action="#{root_path}retries" method="post" table class="table table-striped table-bordered" tr diff --git a/web/views/scheduled.slim b/web/views/scheduled.slim index db85e07a..3e8ff34c 100644 --- a/web/views/scheduled.slim +++ b/web/views/scheduled.slim @@ -1,6 +1,8 @@ h1 Scheduled Jobs - if @scheduled.size > 0 + == slim :_paging, :locals => { :url => "#{root_path}scheduled" } + form action="#{root_path}scheduled" method="post" table class="table table-striped table-bordered" tr