1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
Conflicts:
	lib/sidekiq/processor.rb
This commit is contained in:
Niels Kristian 2012-08-20 15:55:06 +02:00
commit 5ceec56b12
23 changed files with 124 additions and 34 deletions

View file

@ -1,6 +1,27 @@
HEAD HEAD
----------- -----------
- Change capistrano recipe to run 'quiet' before deploy:update\_code so
it is run upon both 'deploy' and 'deploy:migrations'. [#352]
2.2.0
-----------
- Roll back Celluloid optimizations in 2.1.0 which caused instability.
- Add extension to delay any arbitrary class method to Sidekiq.
Previously this was limited to ActiveRecord classes.
```ruby
SomeClass.delay.class_method(1, 'mike', Date.today)
```
- Sidekiq::Client now generates and returns a random, 128-bit Job ID 'jid' which
can be used to track the processing of a Job, e.g. for calling back to a webhook
when a job is finished.
2.1.1
-----------
- Handle networking errors causing the scheduler thread to die [#309] - Handle networking errors causing the scheduler thread to die [#309]
- Rework exception handling to log all Processor and actor death (#325, subelsky) - Rework exception handling to log all Processor and actor death (#325, subelsky)
- Clone arguments when calling worker so modifications are discarded. (#265, hakanensari) - Clone arguments when calling worker so modifications are discarded. (#265, hakanensari)

View file

@ -5,7 +5,7 @@ gem 'celluloid'
gem 'slim' gem 'slim'
gem 'sprockets' gem 'sprockets'
gem 'sass' gem 'sass'
gem 'rails', '3.2.7' gem 'rails', '3.2.8'
gem 'sqlite3' gem 'sqlite3'
group :test do group :test do

View file

@ -5,6 +5,7 @@ require 'sidekiq/worker'
require 'sidekiq/redis_connection' require 'sidekiq/redis_connection'
require 'sidekiq/util' require 'sidekiq/util'
require 'sidekiq/extensions/class_methods'
require 'sidekiq/extensions/action_mailer' require 'sidekiq/extensions/action_mailer'
require 'sidekiq/extensions/active_record' require 'sidekiq/extensions/active_record'
require 'sidekiq/rails' if defined?(::Rails::Engine) require 'sidekiq/rails' if defined?(::Rails::Engine)

View file

@ -1,5 +1,5 @@
Capistrano::Configuration.instance.load do Capistrano::Configuration.instance.load do
before "deploy", "sidekiq:quiet" before "deploy:update_code", "sidekiq:quiet"
after "deploy:stop", "sidekiq:stop" after "deploy:stop", "sidekiq:stop"
after "deploy:start", "sidekiq:start" after "deploy:start", "sidekiq:start"
after "deploy:restart", "sidekiq:restart" after "deploy:restart", "sidekiq:restart"

View file

@ -58,7 +58,7 @@ module Sidekiq
options.merge!(config.merge(cli)) options.merge!(config.merge(cli))
Sidekiq.logger.level = Logger::DEBUG if options[:verbose] Sidekiq.logger.level = Logger::DEBUG if options[:verbose]
Celluloid.logger = nil Celluloid.logger = nil unless options[:verbose]
validate! validate!
write_pid write_pid
@ -141,7 +141,7 @@ module Sidekiq
@parser = OptionParser.new do |o| @parser = OptionParser.new do |o|
o.on "-q", "--queue QUEUE[,WEIGHT]...", "Queues to process with optional weights" do |arg| o.on "-q", "--queue QUEUE[,WEIGHT]...", "Queues to process with optional weights" do |arg|
queues_and_weights = arg.scan(/(\w+),?(\d*)/) queues_and_weights = arg.scan(/([\w-]+),?(\d*)/)
queues_and_weights.each {|queue_and_weight| parse_queues(opts, *queue_and_weight)} queues_and_weights.each {|queue_and_weight| parse_queues(opts, *queue_and_weight)}
opts[:strict] = queues_and_weights.collect(&:last).none? {|weight| weight != ''} opts[:strict] = queues_and_weights.collect(&:last).none? {|weight| weight != ''}
end end

View file

@ -28,6 +28,8 @@ module Sidekiq
# All options must be strings, not symbols. NB: because we are serializing to JSON, all # All options must be strings, not symbols. NB: because we are serializing to JSON, all
# symbols in 'args' will be converted to strings. # symbols in 'args' will be converted to strings.
# #
# Returns nil if not pushed to Redis or a unique Job ID if pushed.
#
# Example: # Example:
# Sidekiq::Client.push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar']) # Sidekiq::Client.push('queue' => 'my_queue', 'class' => MyWorker, 'args' => ['foo', 1, :bat => 'bar'])
# #
@ -42,6 +44,7 @@ module Sidekiq
item = worker_class.get_sidekiq_options.merge(item) item = worker_class.get_sidekiq_options.merge(item)
item['retry'] = !!item['retry'] item['retry'] = !!item['retry']
queue = item['queue'] queue = item['queue']
item['jid'] = SecureRandom.base64
pushed = false pushed = false
Sidekiq.client_middleware.invoke(worker_class, item, queue) do Sidekiq.client_middleware.invoke(worker_class, item, queue) do
@ -57,7 +60,7 @@ module Sidekiq
end end
end end
end end
!! pushed pushed ? item['jid'] : nil
end end
# Resque compatibility helpers. # Resque compatibility helpers.

View file

@ -24,7 +24,7 @@ module Sidekiq
end end
def send_to_exception_notifier(msg, ex) def send_to_exception_notifier(msg, ex)
::ExceptionNotifier::Notifier.background_exception_notification(ex, :data => { :message => msg }) ::ExceptionNotifier::Notifier.background_exception_notification(ex, :data => { :message => msg }).deliver
end end
end end
end end

View file

@ -3,11 +3,14 @@ require 'sidekiq/extensions/generic_proxy'
module Sidekiq module Sidekiq
module Extensions module Extensions
## ##
# Adds a 'delay' method to ActiveRecord to offload arbitrary method # Adds a 'delay' method to ActiveRecords to offload instance method
# execution to Sidekiq. Examples: # execution to Sidekiq. Examples:
# #
# User.delay.delete_inactive
# User.recent_signups.each { |user| user.delay.mark_as_awesome } # User.recent_signups.each { |user| user.delay.mark_as_awesome }
#
# Please note, this is not recommended as this will serialize the entire
# object to Redis. Your Sidekiq jobs should pass IDs, not entire instances.
# This is here for backwards compatibility with Delayed::Job only.
class DelayedModel class DelayedModel
include Sidekiq::Worker include Sidekiq::Worker

View file

@ -0,0 +1,33 @@
require 'sidekiq/extensions/generic_proxy'
module Sidekiq
module Extensions
##
# Adds a 'delay' method to all Classes to offload class method
# execution to Sidekiq. Examples:
#
# User.delay.delete_inactive
# Wikipedia.delay.download_changes_for(Date.today)
#
class DelayedClass
include Sidekiq::Worker
def perform(yml)
(target, method_name, args) = YAML.load(yml)
target.send(method_name, *args)
end
end
module Klass
def delay
Proxy.new(DelayedClass, self)
end
def delay_for(interval)
Proxy.new(DelayedClass, self, Time.now.to_f + interval.to_f)
end
end
end
end
Class.send(:include, Sidekiq::Extensions::Klass)

View file

@ -15,8 +15,6 @@ module Sidekiq
include Util include Util
include Celluloid include Celluloid
exclusive :process
def self.default_middleware def self.default_middleware
Middleware::Chain.new do |m| Middleware::Chain.new do |m|
m.add Middleware::Server::Logging m.add Middleware::Server::Logging
@ -31,6 +29,11 @@ module Sidekiq
end end
def process(msgstr, queue) def process(msgstr, queue)
# Defer worker execution to Celluloid's thread pool since all actor
# invocations are run within a Fiber, which dramatically limits
# our stack size.
defer do
begin
msg = Sidekiq.load_json(msgstr) msg = Sidekiq.load_json(msgstr)
klass = constantize(msg['class']) klass = constantize(msg['class'])
worker = klass.new worker = klass.new
@ -41,11 +44,13 @@ module Sidekiq
worker.perform(*cloned(msg['args'])) worker.perform(*cloned(msg['args']))
end end
end end
@boss.processor_done!(current_actor)
rescue => ex rescue => ex
handle_exception(ex, msg || { :message => msgstr }) handle_exception(ex, msg || { :message => msgstr })
raise raise
end end
end
@boss.processor_done!(current_actor)
end
# See http://github.com/tarcieri/celluloid/issues/22 # See http://github.com/tarcieri/celluloid/issues/22
def inspect def inspect

View file

@ -2,7 +2,6 @@ module Sidekiq
def self.hook_rails! def self.hook_rails!
return unless Sidekiq.options[:enable_rails_extensions] return unless Sidekiq.options[:enable_rails_extensions]
if defined?(ActiveRecord) if defined?(ActiveRecord)
ActiveRecord::Base.extend(Sidekiq::Extensions::ActiveRecord)
ActiveRecord::Base.send(:include, Sidekiq::Extensions::ActiveRecord) ActiveRecord::Base.send(:include, Sidekiq::Extensions::ActiveRecord)
end end

View file

@ -43,7 +43,7 @@ module Sidekiq
end end
end end
end end
rescue SystemCallError => ex rescue SystemCallError, Redis::TimeoutError, Redis::ConnectionError => ex
# ECONNREFUSED, etc. Most likely a problem with # ECONNREFUSED, etc. Most likely a problem with
# redis networking. Punt and try again at the next interval # redis networking. Punt and try again at the next interval
logger.warn ex.message logger.warn ex.message

View file

@ -1,3 +1,3 @@
module Sidekiq module Sidekiq
VERSION = "2.1.1" VERSION = "2.2.0"
end end

View file

@ -65,6 +65,11 @@ class TestCli < MiniTest::Unit::TestCase
assert_equal %w(bar foo foo foo), Sidekiq.options[:queues] assert_equal %w(bar foo foo foo), Sidekiq.options[:queues]
end end
it 'handles queues with multi-word names' do
@cli.parse(['sidekiq', '-q', 'queue_one,queue-two', '-r', './test/fake_env.rb'])
assert_equal %w(queue_one queue-two), Sidekiq.options[:queues]
end
it 'sets verbose' do it 'sets verbose' do
old = Sidekiq.logger.level old = Sidekiq.logger.level
@cli.parse(['sidekiq', '-v', '-r', './test/fake_env.rb']) @cli.parse(['sidekiq', '-v', '-r', './test/fake_env.rb'])

View file

@ -36,6 +36,7 @@ class TestClient < MiniTest::Unit::TestCase
@redis.expect :rpush, 1, ['queue:foo', String] @redis.expect :rpush, 1, ['queue:foo', String]
pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2]) pushed = Sidekiq::Client.push('queue' => 'foo', 'class' => MyWorker, 'args' => [1, 2])
assert pushed assert pushed
assert_equal 24, pushed.size
@redis.verify @redis.verify
end end

View file

@ -66,9 +66,12 @@ class TestExceptionHandler < MiniTest::Unit::TestCase
end end
it "notifies ExceptionNotifier" do it "notifies ExceptionNotifier" do
::ExceptionNotifier::Notifier.expect(:background_exception_notification,nil,[TEST_EXCEPTION, :data => { :message => { :b => 2 } }]) mail = MiniTest::Mock.new
mail.expect(:deliver,nil)
::ExceptionNotifier::Notifier.expect(:background_exception_notification,mail,[TEST_EXCEPTION, :data => { :message => { :b => 2 } }])
Component.new.invoke_exception(:b => 2) Component.new.invoke_exception(:b => 2)
::ExceptionNotifier::Notifier.verify ::ExceptionNotifier::Notifier.verify
mail.verify
end end
end end

View file

@ -54,6 +54,15 @@ class TestExtensions < MiniTest::Unit::TestCase
UserMailer.delay_for(5.days).greetings(1, 2) UserMailer.delay_for(5.days).greetings(1, 2)
assert_equal 1, Sidekiq.redis {|c| c.zcard('schedule') } assert_equal 1, Sidekiq.redis {|c| c.zcard('schedule') }
end end
class SomeClass
def self.doit(arg)
end
end
it 'allows delay of any ole class method' do
SomeClass.delay.doit(Date.today)
end
end end
describe 'sidekiq rails extensions configuration' do describe 'sidekiq rails extensions configuration' do

View file

@ -19,13 +19,13 @@ class TestScheduling < MiniTest::Unit::TestCase
it 'schedules a job via interval' do it 'schedules a job via interval' do
@redis.expect :zadd, true, ['schedule', String, String] @redis.expect :zadd, true, ['schedule', String, String]
assert_equal true, ScheduledWorker.perform_in(600, 'mike') assert ScheduledWorker.perform_in(600, 'mike')
@redis.verify @redis.verify
end end
it 'schedules a job via timestamp' do it 'schedules a job via timestamp' do
@redis.expect :zadd, true, ['schedule', String, String] @redis.expect :zadd, true, ['schedule', String, String]
assert_equal true, ScheduledWorker.perform_in(5.days.from_now, 'mike') assert ScheduledWorker.perform_in(5.days.from_now, 'mike')
@redis.verify @redis.verify
end end
end end

View file

@ -76,10 +76,15 @@ class TestTesting < MiniTest::Unit::TestCase
assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size assert_equal 1, Sidekiq::Extensions::DelayedMailer.jobs.size
end end
class Something
def self.foo(x)
end
end
it 'stubs the delay call on models' do it 'stubs the delay call on models' do
assert_equal 0, Sidekiq::Extensions::DelayedModel.jobs.size assert_equal 0, Sidekiq::Extensions::DelayedClass.jobs.size
FooModel.delay.bar('hello!') Something.delay.foo(Date.today)
assert_equal 1, Sidekiq::Extensions::DelayedModel.jobs.size assert_equal 1, Sidekiq::Extensions::DelayedClass.jobs.size
end end
it 'stubs the enqueue call' do it 'stubs the enqueue call' do

View file

@ -24,7 +24,7 @@ class TestInline < MiniTest::Unit::TestCase
class InlineWorkerWithTimeParam class InlineWorkerWithTimeParam
include Sidekiq::Worker include Sidekiq::Worker
def perform(time) def perform(time)
raise ParameterIsNotString unless time.is_a?(String) raise ParameterIsNotString unless time.is_a?(String) || time.is_a?(Numeric)
end end
end end

View file

@ -20,6 +20,8 @@ $(function() {
}); });
$(function() { $(function() {
function pad(n) { return ('0' + n).slice(-2); }
$('a[name=poll]').data('polling', false); $('a[name=poll]').data('polling', false);
$('a[name=poll]').on('click', function(e) { $('a[name=poll]').on('click', function(e) {
@ -39,7 +41,7 @@ $(function() {
$('.workers').replaceWith(responseHtml.find('.workers')); $('.workers').replaceWith(responseHtml.find('.workers'));
}); });
var currentTime = new Date(); var currentTime = new Date();
$('.poll-status').text('Last polled at: ' + currentTime.getHours() + ':' + currentTime.getMinutes() + ':' + currentTime.getSeconds()); $('.poll-status').text('Last polled at: ' + currentTime.getHours() + ':' + pad(currentTime.getMinutes()) + ':' + pad(currentTime.getSeconds()));
}, 2000)); }, 2000));
$('.poll-status').text('Starting to poll...'); $('.poll-status').text('Starting to poll...');
pollLink.text('Stop Polling'); pollLink.text('Stop Polling');

View file

@ -11,4 +11,4 @@ table class="table table-striped table-bordered workers"
td= msg['queue'] td= msg['queue']
td= msg['payload']['class'] td= msg['payload']['class']
td= msg['payload']['args'].inspect[0..100] td= msg['payload']['args'].inspect[0..100]
td== relative_time(Time.parse(msg['run_at'])) td== relative_time(msg['run_at'].is_a?(Numeric) ? Time.at(msg['run_at']) : Time.parse(msg['run_at']))

View file

@ -22,11 +22,11 @@ header
td= msg['retry_count'] td= msg['retry_count']
tr tr
th Last Retry th Last Retry
td== relative_time(Time.parse(msg['retried_at'])) td== relative_time(msg['retried_at'].is_a?(Numeric) ? Time.at(msg['retried_at']) : Time.parse(msg['retried_at']))
- else - else
tr tr
th Originally Failed th Originally Failed
td== relative_time(Time.parse(msg['failed_at'])) td== relative_time(msg['failed_at'].is_a?(Numeric) ? Time.at(msg['failed_at']) : Time.parse(msg['failed_at']))
tr tr
th Next Retry th Next Retry
td== relative_time(Time.at(@score)) td== relative_time(Time.at(@score))