1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
Conflicts:
	lib/sidekiq/processor.rb
This commit is contained in:
Niels Kristian 2012-08-09 13:44:12 +02:00
commit 13611f8b81
43 changed files with 488 additions and 329 deletions

View file

@ -1,8 +1,20 @@
HEAD
-----------
- Defer loading JSON until a full Thread stack is available. Celluloid's
standard 4k actor stack will lead to crashes when parsing large payloads.
- Handle networking errors causing the scheduler thread to die [#309]
- Rework exception handling to log all Processor and actor death (#325, subelsky)
- Clone arguments when calling worker so modifications are discarded. (#265, hakanensari)
2.1.0
-----------
- Tune Celluloid to no longer run message processing within a Fiber.
This gives us a full Thread stack and also lowers Sidekiq's memory
usage.
- Add pagination within the Web UI [#253]
- Specify which Redis driver to use: *hiredis* or *ruby* (default)
- Remove FailureJobs and UniqueJobs, which were optional middleware
that I don't want to support in core. [#302]
2.0.3
-----------

View file

@ -1,10 +1,11 @@
source 'http://rubygems.org'
gemspec
gem 'celluloid'
gem 'slim'
gem 'sprockets'
gem 'sass'
gem 'rails', '3.2.3'
gem 'rails', '3.2.7'
gem 'sqlite3'
group :test do

View file

@ -1,6 +1,9 @@
Sidekiq [![Build Status](https://secure.travis-ci.org/mperham/sidekiq.png)](http://travis-ci.org/mperham/sidekiq)
Sidekiq
==============
[![Build Status](https://secure.travis-ci.org/mperham/sidekiq.png)](http://travis-ci.org/mperham/sidekiq)
[![Dependency Status](https://gemnasium.com/mperham/sidekiq.png)](https://gemnasium.com/mperham/sidekiq)
Simple, efficient message processing for Ruby.
Sidekiq uses threads to handle many messages at the same time in the
@ -27,6 +30,8 @@ Requirements
I test on Ruby 1.9.3 and JRuby 1.6.x in 1.9 mode. Other versions/VMs are
untested but I will do my best to support them. Ruby 1.8 is not supported.
Redis 2.0 or greater is required.
Installation
-----------------
@ -38,6 +43,7 @@ Getting Started
-----------------
See the [sidekiq home page](http://mperham.github.com/sidekiq) for the simple 4-step process.
You can watch [Railscast #366](http://railscasts.com/episodes/366-sidekiq) to see Sidekiq in action.
More Information
@ -52,6 +58,15 @@ and email to <sidekiq@librelist.org> with a greeting in the body. To unsubscribe
Once archiving begins, you'll be able to visit [the archives](http://librelist.com/browser/sidekiq/) to see past threads.
Problems?
-----------------
**Please do not directly email any Sidekiq committers with questions or problems.** A community is best served when discussions are held in public.
If you have a problem, please review the [FAQ](/mperham/sidekiq/wiki/FAQ) and [Troubleshooting](/mperham/sidekiq/wiki/Problems-and-Troubleshooting) wiki pages. Searching the issues for your problem is also a good idea. If that doesn't help, feel free to email the Sidekiq mailing list or open a new issue.
The mailing list is the preferred place to ask questions on usage. If you are encountering what you think is a bug, please open an issue.
License
-----------------

View file

@ -124,7 +124,6 @@ module Sidekiq
def validate!
options[:queues] << 'default' if options[:queues].empty?
options[:queues].shuffle!
if !File.exist?(options[:require]) ||
(File.directory?(options[:require]) && !File.exist?("#{options[:require]}/config/application.rb"))
@ -141,9 +140,10 @@ module Sidekiq
opts = {}
@parser = OptionParser.new do |o|
o.on "-q", "--queue QUEUE,WEIGHT", "Queue to process, with optional weight" do |arg|
q, weight = arg.split(",")
parse_queues(opts, q, weight)
o.on "-q", "--queue QUEUE[,WEIGHT]...", "Queues to process with optional weights" do |arg|
queues_and_weights = arg.scan(/(\w+),?(\d*)/)
queues_and_weights.each {|queue_and_weight| parse_queues(opts, *queue_and_weight)}
opts[:strict] = queues_and_weights.collect(&:last).none? {|weight| weight != ''}
end
o.on "-v", "--verbose", "Print more verbose output" do
@ -208,7 +208,7 @@ module Sidekiq
end
def parse_queues(opts, q, weight)
(weight || 1).to_i.times do
[weight.to_i, 1].max.times do
(opts[:queues] ||= []) << q
end
end

View file

@ -1,5 +1,4 @@
require 'sidekiq/middleware/chain'
require 'sidekiq/middleware/client/unique_jobs'
module Sidekiq
class Client

View file

@ -0,0 +1,30 @@
module Sidekiq
module ExceptionHandler
def handle_exception(ex, msg)
Sidekiq.logger.warn msg
Sidekiq.logger.warn ex
Sidekiq.logger.warn ex.backtrace.join("\n")
send_to_airbrake(msg, ex) if defined?(::Airbrake)
send_to_exceptional(msg, ex) if defined?(::Exceptional)
send_to_exception_notifier(msg, ex) if defined?(::ExceptionNotifier)
end
private
def send_to_airbrake(msg, ex)
::Airbrake.notify(ex, :parameters => msg)
end
def send_to_exceptional(msg, ex)
if ::Exceptional::Config.should_send_to_api?
::Exceptional.context(msg)
::Exceptional::Remote.error(::Exceptional::ExceptionData.new(ex))
end
end
def send_to_exception_notifier(msg, ex)
::ExceptionNotifier::Notifier.background_exception_notification(ex, :data => { :message => msg })
end
end
end

View file

@ -1,6 +1,6 @@
module Sidekiq
module Extensions
class Proxy < (RUBY_VERSION < '1.9' ? Object : BasicObject)
class Proxy < BasicObject
def initialize(performable, target, at=nil)
@performable = performable
@target = target

View file

@ -12,8 +12,9 @@ module Sidekiq
TIMEOUT = 1
def initialize(mgr, queues)
def initialize(mgr, queues, strict)
@mgr = mgr
@strictly_ordered_queues = strict
@queues = queues.map { |q| "queue:#{q}" }
@unique_queues = @queues.uniq
end
@ -68,6 +69,7 @@ module Sidekiq
# recreate the queue command each time we invoke Redis#blpop
# to honor weights and avoid queue starvation.
def queues_cmd
return @unique_queues.dup << TIMEOUT if @strictly_ordered_queues
queues = @queues.sample(@unique_queues.size).uniq
queues.concat(@unique_queues - queues)
queues << TIMEOUT

View file

@ -27,7 +27,7 @@ module Sidekiq
@in_progress = {}
@done = false
@busy = []
@fetcher = Fetcher.new(current_actor, options[:queues])
@fetcher = Fetcher.new(current_actor, options[:queues], !!options[:strict])
@ready = @count.times.map { Processor.new_link(current_actor) }
procline
end

View file

@ -7,22 +7,26 @@ module Sidekiq
#
# To add middleware for the client:
#
# Sidekiq.client_middleware do |chain|
# chain.add MyClientHook
# Sidekiq.configure_client do |config|
# config.client_middleware do |chain|
# chain.add MyClientHook
# end
# end
#
# To modify middleware for the server, just call
# with another block:
#
# Sidekiq.server_middleware do |chain|
# chain.add MyServerHook
# chain.remove ActiveRecord
# Sidekiq.configure_server do |config|
# config.server_middleware do |chain|
# chain.add MyServerHook
# chain.remove ActiveRecord
# end
# end
#
# This is an example of a minimal server middleware:
#
# class MyServerHook
# def call(worker, msg, queue)
# def call(worker_instance, msg, queue)
# puts "Before work"
# yield
# puts "After work"
@ -32,7 +36,7 @@ module Sidekiq
# This is an example of a minimal client middleware:
#
# class MyClientHook
# def call(msg, queue)
# def call(worker_class, msg, queue)
# puts "Before push"
# yield
# puts "After push"

View file

@ -1,35 +0,0 @@
require 'digest'
module Sidekiq
module Middleware
module Client
class UniqueJobs
HASH_KEY_EXPIRATION = 30 * 60
def call(worker_class, item, queue)
enabled = worker_class.get_sidekiq_options['unique']
if enabled
payload_hash = Digest::MD5.hexdigest(Sidekiq.dump_json(item))
unique = false
Sidekiq.redis do |conn|
conn.watch(payload_hash)
if conn.get(payload_hash)
conn.unwatch
else
unique = conn.multi do
conn.setex(payload_hash, HASH_KEY_EXPIRATION, 1)
end
end
end
yield if unique
else
yield
end
end
end
end
end
end

View file

@ -1,38 +0,0 @@
require 'sidekiq/util'
module Sidekiq
module Middleware
module Server
class ExceptionHandler
include Util
def call(*args)
yield
rescue => ex
logger.warn ex
logger.warn ex.backtrace.join("\n")
send_to_airbrake(args[1], ex) if defined?(::Airbrake)
send_to_exceptional(args[1], ex) if defined?(::Exceptional)
send_to_exception_notifier(args[1], ex) if defined?(::ExceptionNotifier)
raise
end
private
def send_to_airbrake(msg, ex)
::Airbrake.notify(ex, :parameters => msg)
end
def send_to_exceptional(msg, ex)
if ::Exceptional::Config.should_send_to_api?
::Exceptional.context(msg)
::Exceptional::Remote.error(::Exceptional::ExceptionData.new(ex))
end
end
def send_to_exception_notifier(msg, ex)
::ExceptionNotifier::Notifier.background_exception_notification(ex, :data => { :message => msg })
end
end
end
end
end

View file

@ -1,23 +0,0 @@
module Sidekiq
module Middleware
module Server
class FailureJobs
def call(*args)
yield
rescue => e
data = {
:failed_at => Time.now.strftime("%Y/%m/%d %H:%M:%S %Z"),
:payload => args[1],
:exception => e.class.to_s,
:error => e.to_s,
:backtrace => e.backtrace,
:worker => args[1]['class'],
:queue => args[2]
}
Sidekiq.redis {|conn| conn.rpush(:failed, Sidekiq.dump_json(data)) }
raise
end
end
end
end
end

View file

@ -1,15 +0,0 @@
module Sidekiq
module Middleware
module Server
class UniqueJobs
def call(*args)
yield
ensure
json = Sidekiq.dump_json(args[1])
hash = Digest::MD5.hexdigest(json)
Sidekiq.redis {|conn| conn.del(hash) }
end
end
end
end
end

31
lib/sidekiq/paginator.rb Normal file
View file

@ -0,0 +1,31 @@
module Sidekiq
module Paginator
def page(key, pageidx=1, page_size=25)
current_page = pageidx.to_i < 1 ? 1 : pageidx.to_i
pageidx = current_page - 1
total_size = 0
items = []
starting = pageidx * page_size
ending = starting + page_size - 1
Sidekiq.redis do |conn|
type = conn.type(key)
case type
when 'zset'
total_size = conn.zcard(key)
items = conn.zrange(key, starting, ending, :with_scores => true)
when 'list'
total_size = conn.llen(key)
items = conn.lrange(key, starting, ending)
when 'none'
return [1, 0, []]
else
raise "can't page a #{type}"
end
end
[current_page, total_size, items]
end
end
end

View file

@ -2,7 +2,6 @@ require 'celluloid'
require 'sidekiq/util'
require 'sidekiq/middleware/server/active_record'
require 'sidekiq/middleware/server/exception_handler'
require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/middleware/server/logging'
require 'sidekiq/middleware/server/timeout'
@ -16,9 +15,10 @@ module Sidekiq
include Util
include Celluloid
exclusive :process
def self.default_middleware
Middleware::Chain.new do |m|
m.add Middleware::Server::ExceptionHandler
m.add Middleware::Server::Logging
m.add Middleware::Server::RetryJobs
m.add Middleware::Server::ActiveRecord
@ -31,23 +31,20 @@ module Sidekiq
end
def process(msgstr, queue)
# Celluloid actor calls are performed within a Fiber.
# This would give us a terribly small 4KB stack on MRI
# so we use Celluloid's defer to run things in a thread pool
# in order to get a full-sized stack for the Worker.
defer do
msg = Sidekiq.load_json(msgstr)
klass = constantize(msg['class'])
worker = klass.new
worker.class.sidekiq_options(:queue => queue)
msg = Sidekiq.load_json(msgstr)
klass = constantize(msg['class'])
worker = klass.new
worker.class.sidekiq_options(:queue => queue)
stats(worker, msg, queue) do
Sidekiq.server_middleware.invoke(worker, msg, queue) do
worker.perform(*msg['args'])
end
stats(worker, msg, queue) do
Sidekiq.server_middleware.invoke(worker, msg, queue) do
worker.perform(*cloned(msg['args']))
end
end
@boss.processor_done!(current_actor)
rescue => ex
handle_exception(ex, msg || { :message => msgstr })
raise
end
# See http://github.com/tarcieri/celluloid/issues/22
@ -92,7 +89,18 @@ module Sidekiq
end
end
end
end
# Singleton classes are not clonable.
SINGLETON_CLASSES = [ NilClass, TrueClass, FalseClass, Symbol, Fixnum, Float ].freeze
# Clone the arguments passed to the worker so that if
# the message fails, what is pushed back onto Redis hasn't
# been mutated by the worker.
def cloned(ary)
ary.map do |val|
SINGLETON_CLASSES.include?(val.class) ? val : val.clone
end
end
def hostname

View file

@ -6,16 +6,17 @@ module Sidekiq
class RedisConnection
def self.create(options={})
url = options[:url] || ENV['REDISTOGO_URL'] || 'redis://localhost:6379/0'
driver = options[:driver] || 'ruby'
# need a connection for Fetcher and Retry
size = options[:size] || (Sidekiq.server? ? (Sidekiq.options[:concurrency] + 2) : 5)
ConnectionPool.new(:timeout => 1, :size => size) do
build_client(url, options[:namespace])
build_client(url, options[:namespace], driver)
end
end
def self.build_client(url, namespace)
client = Redis.connect(:url => url)
def self.build_client(url, namespace, driver)
client = Redis.connect(:url => url, :driver => driver)
if namespace
Redis::Namespace.new(namespace, :redis => client)
else

View file

@ -22,25 +22,31 @@ module Sidekiq
watchdog('scheduling poller thread died!') do
add_jitter if first_time
# A message's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of messages with a timestamp before now.
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
(messages, _) = conn.multi do
conn.zrangebyscore(sorted_set, '-inf', now)
conn.zremrangebyscore(sorted_set, '-inf', now)
end
begin
# A message's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of messages with a timestamp before now.
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
(messages, _) = conn.multi do
conn.zrangebyscore(sorted_set, '-inf', now)
conn.zremrangebyscore(sorted_set, '-inf', now)
end
messages.each do |message|
logger.debug { "enqueued #{sorted_set}: #{message}" }
msg = Sidekiq.load_json(message)
conn.multi do
conn.sadd('queues', msg['queue'])
conn.rpush("queue:#{msg['queue']}", message)
messages.each do |message|
logger.debug { "enqueued #{sorted_set}: #{message}" }
msg = Sidekiq.load_json(message)
conn.multi do
conn.sadd('queues', msg['queue'])
conn.rpush("queue:#{msg['queue']}", message)
end
end
end
end
rescue SystemCallError => ex
# ECONNREFUSED, etc. Most likely a problem with
# redis networking. Punt and try again at the next interval
logger.warn ex.message
end
after(poll_interval) { poll }

View file

@ -1,8 +1,11 @@
require 'sidekiq/exception_handler'
module Sidekiq
##
# This module is part of Sidekiq core and not intended for extensions.
#
module Util
include ExceptionHandler
EXPIRY = 60 * 60
@ -20,9 +23,7 @@ module Sidekiq
def watchdog(last_words)
yield
rescue => ex
logger.error last_words
logger.error ex
logger.error ex.backtrace.join("\n")
handle_exception(ex, { :context => last_words })
end
def logger

View file

@ -1,3 +1,3 @@
module Sidekiq
VERSION = "2.0.3"
VERSION = "2.1.1"
end

View file

@ -1,6 +1,8 @@
require 'sinatra/base'
require 'slim'
require 'sprockets'
require 'sidekiq/paginator'
module Sidekiq
class SprocketsMiddleware
def initialize(app, options={})
@ -28,6 +30,8 @@ module Sidekiq
end
class Web < Sinatra::Base
include Sidekiq::Paginator
dir = File.expand_path(File.dirname(__FILE__) + "/../../web")
set :views, "#{dir}/views"
set :root, "#{dir}/public"
@ -68,21 +72,6 @@ module Sidekiq
Sidekiq.redis { |conn| conn.zcard(name) }
end
def retries(count=50)
zcontents('retry', count)
end
def scheduled(count=50)
zcontents('schedule', count)
end
def zcontents(name, count)
Sidekiq.redis do |conn|
results = conn.zrange(name, 0, count, :withscores => true)
results.map { |msg, score| [Sidekiq.load_json(msg), score] }
end
end
def queues
@queues ||= Sidekiq.redis do |conn|
conn.smembers('queues').map do |q|
@ -128,6 +117,10 @@ module Sidekiq
slim :index
end
get "/poll" do
slim :poll, layout: false
end
get "/queues" do
@queues = queues
slim :queues
@ -135,9 +128,10 @@ module Sidekiq
get "/queues/:name" do
halt 404 unless params[:name]
count = (params[:count] || 10).to_i
@count = (params[:count] || 25).to_i
@name = params[:name]
@messages = Sidekiq.redis {|conn| conn.lrange("queue:#{@name}", 0, count) }.map { |str| Sidekiq.load_json(str) }
(@current_page, @total_size, @messages) = page("queue:#{@name}", params[:page], @count)
@messages = @messages.map {|msg| Sidekiq.load_json(msg) }
slim :queue
end
@ -163,12 +157,16 @@ module Sidekiq
end
get '/retries' do
@retries = retries
@count = (params[:count] || 25).to_i
(@current_page, @total_size, @retries) = page("retry", params[:page], @count)
@retries = @retries.map {|msg, score| [Sidekiq.load_json(msg), score] }
slim :retries
end
get '/scheduled' do
@scheduled = scheduled
@count = (params[:count] || 25).to_i
(@current_page, @total_size, @scheduled) = page("schedule", params[:page], @count)
@scheduled = @scheduled.map {|msg, score| [Sidekiq.load_json(msg), score] }
slim :scheduled
end

View file

@ -51,7 +51,6 @@ module Sidekiq
# Allows customization for this type of Worker.
# Legal options:
#
# :unique - enable the UniqueJobs middleware for this Worker, default *true*
# :queue - use a named queue for this Worker, default 'default'
# :retry - enable the RetryJobs middleware for this Worker, default *true*
# :timeout - timeout the perform method after N seconds, default *nil*
@ -61,7 +60,7 @@ module Sidekiq
self.sidekiq_options_hash = get_sidekiq_options.merge(stringify_keys(opts || {}))
end
DEFAULT_OPTIONS = { 'unique' => true, 'retry' => true, 'queue' => 'default' }
DEFAULT_OPTIONS = { 'retry' => true, 'queue' => 'default' }
def get_sidekiq_options # :nodoc:
self.sidekiq_options_hash ||= DEFAULT_OPTIONS

View file

@ -9,7 +9,7 @@ platforms :jruby do
gem 'activerecord-jdbcsqlite3-adapter'
end
gem 'rails', '3.2.2'
gem 'rails', '3.2.6'
gem 'sidekiq', :path => '..'
gem 'capistrano'

View file

@ -17,7 +17,7 @@ Gem::Specification.new do |gem|
gem.add_dependency 'redis', '~> 3'
gem.add_dependency 'redis-namespace'
gem.add_dependency 'connection_pool', '~> 0.9.2'
gem.add_dependency 'celluloid', '~> 0.11.0'
gem.add_dependency 'celluloid', '~> 0.11.1'
gem.add_dependency 'multi_json', '~> 1'
gem.add_development_dependency 'minitest', '~> 3'
gem.add_development_dependency 'sinatra'

View file

@ -40,14 +40,29 @@ class TestCli < MiniTest::Unit::TestCase
assert_equal ['foo'], Sidekiq.options[:queues]
end
it 'sets strictly ordered queues if weights are not present' do
@cli.parse(['sidekiq', '-q', 'foo,bar', '-r', './test/fake_env.rb'])
assert_equal true, !!Sidekiq.options[:strict]
end
it 'does not set strictly ordered queues if weights are present' do
@cli.parse(['sidekiq', '-q', 'foo,3', '-r', './test/fake_env.rb'])
assert_equal false, !!Sidekiq.options[:strict]
end
it 'changes timeout' do
@cli.parse(['sidekiq', '-t', '30', '-r', './test/fake_env.rb'])
assert_equal 30, Sidekiq.options[:timeout]
end
it 'handles multiple queues with weights' do
it 'handles multiple queues with weights with multiple switches' do
@cli.parse(['sidekiq', '-q', 'foo,3', '-q', 'bar', '-r', './test/fake_env.rb'])
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues].sort
assert_equal %w(foo foo foo bar), Sidekiq.options[:queues]
end
it 'handles multiple queues with weights with a single switch' do
@cli.parse(['sidekiq', '-q', 'bar,foo,3', '-r', './test/fake_env.rb'])
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues]
end
it 'sets verbose' do
@ -163,6 +178,24 @@ class TestCli < MiniTest::Unit::TestCase
assert_equal 3, Sidekiq.options[:queues].count { |q| q == 'seldom' }
end
end
describe 'Sidekiq::CLI#parse_queues' do
describe 'when weight is present' do
it 'concatenates queue to opts[:queues] weight number of times' do
opts = {}
@cli.send :parse_queues, opts, 'often', 7
assert_equal %w[often] * 7, opts[:queues]
end
end
describe 'when weight is not present' do
it 'concatenates queue to opts[:queues] once' do
opts = {}
@cli.send :parse_queues, opts, 'once', nil
assert_equal %w[once], opts[:queues]
end
end
end
end
end

View file

@ -3,36 +3,6 @@ require 'sidekiq/client'
require 'sidekiq/worker'
class TestClient < MiniTest::Unit::TestCase
describe 'with real redis' do
before do
Sidekiq.redis = REDIS
Sidekiq.redis {|c| c.flushdb }
end
class QueueWorker
include Sidekiq::Worker
sidekiq_options :queue => 'customqueue'
end
it 'does not push duplicate messages when configured for unique only' do
Sidekiq.client_middleware do |chain|
chain.add Sidekiq::Middleware::Client::UniqueJobs
end
QueueWorker.sidekiq_options :unique => true
10.times { Sidekiq::Client.push('class' => QueueWorker, 'args' => [1, 2]) }
assert_equal 1, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end
it 'does push duplicate messages when not configured for unique only' do
Sidekiq.client_middleware do |chain|
chain.add Sidekiq::Middleware::Client::UniqueJobs
end
QueueWorker.sidekiq_options :unique => false
10.times { Sidekiq::Client.push('class' => QueueWorker, 'args' => [1, 2]) }
assert_equal 10, Sidekiq.redis {|c| c.llen("queue:customqueue") }
end
end
describe 'with mock redis' do
before do
@redis = MiniTest::Mock.new

View file

@ -0,0 +1,109 @@
require 'helper'
require 'sidekiq'
require 'sidekiq/exception_handler'
require 'stringio'
require 'logger'
ExceptionHandlerTestException = Class.new(StandardError)
TEST_EXCEPTION = ExceptionHandlerTestException.new("Something didn't work!")
class Component
include Sidekiq::Util
def invoke_exception(args)
raise TEST_EXCEPTION
rescue ExceptionHandlerTestException => e
handle_exception(e,args)
end
end
class TestExceptionHandler < MiniTest::Unit::TestCase
describe "with mock logger" do
before do
@old_logger = Sidekiq.logger
@str_logger = StringIO.new
Sidekiq.logger = Logger.new(@str_logger)
end
after do
Sidekiq.logger = @old_logger
end
it "logs the exception to Sidekiq.logger" do
Component.new.invoke_exception(:a => 1)
@str_logger.rewind
log = @str_logger.readlines
assert_match /a=>1/, log[0], "didn't include the context"
assert_match /Something didn't work!/, log[1], "didn't include the exception message"
assert_match /test\/test_exception_handler.rb/, log[2], "didn't include the backtrace"
end
end
describe "with fake Airbrake" do
before do
::Airbrake = MiniTest::Mock.new
end
after do
Object.send(:remove_const, "Airbrake") # HACK should probably inject Airbrake etc into this class in the future
end
it "notifies Airbrake" do
::Airbrake.expect(:notify,nil,[TEST_EXCEPTION,:parameters => { :a => 1 }])
Component.new.invoke_exception(:a => 1)
::Airbrake.verify
end
end
describe "with fake ExceptionNotifier" do
before do
::ExceptionNotifier = Module.new
::ExceptionNotifier::Notifier = MiniTest::Mock.new
end
after do
Object.send(:remove_const, "ExceptionNotifier")
end
it "notifies ExceptionNotifier" do
::ExceptionNotifier::Notifier.expect(:background_exception_notification,nil,[TEST_EXCEPTION, :data => { :message => { :b => 2 } }])
Component.new.invoke_exception(:b => 2)
::ExceptionNotifier::Notifier.verify
end
end
describe "with fake Exceptional" do
before do
::Exceptional = Class.new do
def self.context(msg)
@msg = msg
end
def self.check_context
@msg
end
end
::Exceptional::Config = MiniTest::Mock.new
::Exceptional::Remote = MiniTest::Mock.new
::Exceptional::ExceptionData = MiniTest::Mock.new
end
after do
Object.send(:remove_const, "Exceptional")
end
it "notifies Exceptional" do
::Exceptional::Config.expect(:should_send_to_api?,true)
exception_data = MiniTest::Mock.new
::Exceptional::Remote.expect(:error,nil,[exception_data])
::Exceptional::ExceptionData.expect(:new,exception_data,[TEST_EXCEPTION])
Component.new.invoke_exception(:c => 3)
assert_equal({:c => 3},::Exceptional.check_context,"did not record arguments properly")
::Exceptional::Config.verify
::Exceptional::Remote.verify
::Exceptional::ExceptionData.verify
end
end
end

13
test/test_fetch.rb Normal file
View file

@ -0,0 +1,13 @@
require 'helper'
require 'sidekiq/fetch'
class TestFetcher < MiniTest::Unit::TestCase
describe 'Fetcher#queues_cmd' do
describe 'when queues are strictly ordered' do
it 'returns the unique ordered queues properly based on priority and order they were passed in' do
fetcher = Sidekiq::Fetcher.new nil, %w[high medium low default], true
assert_equal (%w[queue:high queue:medium queue:low queue:default] << 1), fetcher._send_(:queues_cmd)
end
end
end
end

View file

@ -1,51 +0,0 @@
require 'helper'
require 'sidekiq'
require 'sidekiq/manager'
# for TimedQueue
require 'connection_pool'
class TestManager < MiniTest::Unit::TestCase
describe 'with redis' do
before do
Sidekiq.redis = REDIS
Sidekiq.redis {|c| c.flushdb }
$processed = 0
$mutex = Mutex.new
end
class IntegrationWorker
include Sidekiq::Worker
sidekiq_options :queue => 'foo'
def perform(a, b)
$mutex.synchronize do
$processed += 1
end
a + b
end
end
it 'processes messages' do
IntegrationWorker.perform_async(1, 2)
IntegrationWorker.perform_async(1, 3)
q = TimedQueue.new
mgr = Sidekiq::Manager.new(:queues => [:foo], :concurrency => 2)
mgr.when_done do |_|
q << 'done' if $processed == 2
end
mgr.start!
result = q.timed_pop(1.0)
assert_equal 'done', result
mgr.stop
mgr.terminate
# Gross bloody hack because I can't get the actor threads
# to shut down cleanly in the test. Need @bascule's help here.
(Thread.list - [Thread.current]).each do |t|
t.raise Interrupt
end
end
end
end

View file

@ -1,6 +1,5 @@
require 'helper'
require 'sidekiq/middleware/chain'
require 'sidekiq/middleware/server/unique_jobs'
require 'sidekiq/processor'
class TestMiddleware < MiniTest::Unit::TestCase
@ -10,18 +9,6 @@ class TestMiddleware < MiniTest::Unit::TestCase
Sidekiq.redis = REDIS
end
it 'handles errors' do
handler = Sidekiq::Middleware::Server::ExceptionHandler.new
assert_raises ArgumentError do
handler.call('', { :a => 1 }, 'default') do
raise ArgumentError
end
end
assert_equal 1, $errors.size
assert_equal({ :a => 1 }, $errors[0][:parameters])
end
class CustomMiddleware
def initialize(name, recorder)
@name = name
@ -84,10 +71,3 @@ class TestMiddleware < MiniTest::Unit::TestCase
end
end
end
class FakeAirbrake
def self.notify(ex, hash)
$errors << hash
end
end
Airbrake = FakeAirbrake

View file

@ -2,11 +2,14 @@ require 'helper'
require 'sidekiq/processor'
class TestProcessor < MiniTest::Unit::TestCase
TestException = Class.new(StandardError)
TEST_EXCEPTION = TestException.new("kerboom!")
describe 'with mock setup' do
before do
$invokes = 0
$errors = []
@boss = MiniTest::Mock.new
@processor = ::Sidekiq::Processor.new(@boss)
Celluloid.logger = nil
Sidekiq.redis = REDIS
end
@ -14,19 +17,51 @@ class TestProcessor < MiniTest::Unit::TestCase
class MockWorker
include Sidekiq::Worker
def perform(args)
raise "kerboom!" if args == 'boom'
raise TEST_EXCEPTION if args == 'boom'
args.pop if args.is_a? Array
$invokes += 1
end
end
it 'processes as expected' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['myarg'] })
processor = ::Sidekiq::Processor.new(@boss)
@boss.expect(:processor_done!, nil, [processor])
processor.process(msg, 'default')
@boss.expect(:processor_done!, nil, [@processor])
@processor.process(msg, 'default')
@boss.verify
assert_equal 1, $invokes
assert_equal 0, $errors.size
end
it 'passes exceptions to ExceptionHandler' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
begin
@processor.process(msg, 'default')
flunk "Expected #process to raise exception"
rescue TestException
end
assert_equal 0, $invokes
end
it 're-raises exceptions after handling' do
msg = Sidekiq.dump_json({ 'class' => MockWorker.to_s, 'args' => ['boom'] })
re_raise = false
begin
@processor.process(msg, 'default')
rescue TestException
re_raise = true
end
assert re_raise, "does not re-raise exceptions after handling"
end
it 'does not modify original arguments' do
msg = { 'class' => MockWorker.to_s, 'args' => [['myarg']] }
msgstr = Sidekiq.dump_json(msg)
processor = ::Sidekiq::Processor.new(@boss)
@boss.expect(:processor_done!, nil, [processor])
processor.process(msgstr, 'default')
assert_equal [['myarg']], msg['args']
end
end
end

View file

@ -31,6 +31,14 @@ class TestWeb < MiniTest::Unit::TestCase
refute_match /default/, last_response.body
end
it 'can display poll' do
get '/poll'
assert_equal 200, last_response.status
assert_match /hero-unit/, last_response.body
assert_match /workers/, last_response.body
refute_match /navbar/, last_response.body
end
it 'can display queues' do
assert Sidekiq::Client.push('queue' => :foo, 'class' => WebWorker, 'args' => [1, 3])
@ -115,26 +123,26 @@ class TestWeb < MiniTest::Unit::TestCase
assert_equal 200, last_response.status
assert_match /HardWorker/, last_response.body
end
it 'can delete a single retry' do
_, score = add_retry
post "/retries/#{score}", 'delete' => 'Delete'
assert_equal 302, last_response.status
assert_equal 'http://example.org/retries', last_response.header['Location']
get "/retries"
assert_equal 200, last_response.status
refute_match /#{score}/, last_response.body
end
it 'can retry a single retry now' do
msg, score = add_retry
post "/retries/#{score}", 'retry' => 'Retry'
assert_equal 302, last_response.status
assert_equal 'http://example.org/retries', last_response.header['Location']
get '/queues/default'
assert_equal 200, last_response.status
assert_match /#{msg['args'][2]}/, last_response.body

View file

@ -18,3 +18,32 @@ $(function() {
}
});
});
$(function() {
$('a[name=poll]').data('polling', false);
$('a[name=poll]').on('click', function(e) {
e.preventDefault();
var pollLink = $(this);
if (pollLink.data('polling')) {
clearInterval(pollLink.data('interval'));
pollLink.text('Live Poll');
$('.poll-status').text('');
}
else {
var href = pollLink.attr('href');
pollLink.data('interval', setInterval(function() {
$.get(href, function(data) {
var responseHtml = $(data);
$('.hero-unit').replaceWith(responseHtml.find('.hero-unit'));
$('.workers').replaceWith(responseHtml.find('.workers'));
});
var currentTime = new Date();
$('.poll-status').text('Last polled at: ' + currentTime.getHours() + ':' + currentTime.getMinutes() + ':' + currentTime.getSeconds());
}, 2000));
$('.poll-status').text('Starting to poll...');
pollLink.text('Stop Polling');
}
pollLink.data('polling', !pollLink.data('polling'));
})
});

View file

@ -20,3 +20,7 @@ code {
.hero-unit {
padding: 30px;
}
.poll-status {
padding-left: 10px;
}

15
web/views/_paging.slim Normal file
View file

@ -0,0 +1,15 @@
- if @total_size > @count
.pagination.pagination-right
ul
li class="#{'disabled' if @current_page == 1}"
a href="#{url}?page=1" «
- if @current_page > 1
li
a href="#{url}?page=#{@current_page - 1}" #{@current_page - 1}
li.disabled
a href="#{url}?page=#{@current_page}" #{@current_page}
- if @total_size > @current_page * @count
li
a href="#{url}?page=#{@current_page + 1}" #{@current_page + 1}
li class="#{'disabled' if @total_size <= @current_page * @count}"
a href="#{url}?page=#{(@total_size / @count).ceil + 1}" »

8
web/views/_summary.slim Normal file
View file

@ -0,0 +1,8 @@
.hero-unit
h1 Sidekiq is #{current_status}
p Processed: #{processed}
p Failed: #{failed}
p Busy Workers: #{workers.size}
p Scheduled: #{zcard('schedule')}
p Retries Pending: #{zcard('retry')}
p Queue Backlog: #{backlog}

14
web/views/_workers.slim Normal file
View file

@ -0,0 +1,14 @@
table class="table table-striped table-bordered workers"
tr
th Worker
th Queue
th Class
th Arguments
th Started
- workers.each do |(worker, msg)|
tr
td= worker
td= msg['queue']
td= msg['payload']['class']
td= msg['payload']['args'].inspect[0..100]
td== relative_time(Time.parse(msg['run_at']))

View file

@ -1,25 +1,10 @@
.hero-unit
h1 Sidekiq is #{current_status}
p Processed: #{processed}
p Failed: #{failed}
p Busy Workers: #{workers.size}
p Scheduled: #{zcard('schedule')}
p Retries Pending: #{zcard('retry')}
p Queue Backlog: #{backlog}
== slim :_summary
.poll
a*{name: 'poll'} href='#{{root_path}}poll' Live Poll
span class="poll-status"
== slim :_workers
table class="table table-striped table-bordered"
tr
th Worker
th Queue
th Class
th Arguments
th Started
- workers.each do |(worker, msg)|
tr
td= worker
td= msg['queue']
td= msg['payload']['class']
td= msg['payload']['args'].inspect[0..100]
td== relative_time(Time.parse(msg['run_at']))
form action="#{root_path}reset" method="post"
button.btn type="submit" title="If you kill -9 Sidekiq, this table can fill up with old data." Clear worker list
button.btn type="submit" title="If you kill -9 Sidekiq, this table can fill up with old data." Clear worker list

View file

@ -13,7 +13,7 @@ html
span.icon-bar
a.brand href='#{{root_path}}'
| Sidekiq
div.nav-collapse
div.nav-collapse
ul.nav
li
a href='#{{root_path}}' Home

3
web/views/poll.slim Normal file
View file

@ -0,0 +1,3 @@
div
== slim :_summary
== slim :_workers

View file

@ -1,5 +1,7 @@
header
h1 Latest messages in #{@name}
h1 Current messages in #{@name}
== slim :_paging, :locals => { :url => "#{root_path}queues/#{@name}" }
table class="table table-striped table-bordered"
tr
@ -9,3 +11,5 @@ table class="table table-striped table-bordered"
tr
td= msg['class']
td= msg['args'].inspect[0..100]
== slim :_paging, :locals => { :url => "#{root_path}queues/#{@name}" }

View file

@ -1,6 +1,8 @@
h1 Retries
- if @retries.size > 0
== slim :_paging, :locals => { :url => "#{root_path}retries" }
form action="#{root_path}retries" method="post"
table class="table table-striped table-bordered"
tr

View file

@ -1,6 +1,8 @@
h1 Scheduled Jobs
- if @scheduled.size > 0
== slim :_paging, :locals => { :url => "#{root_path}scheduled" }
form action="#{root_path}scheduled" method="post"
table class="table table-striped table-bordered"
tr