1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Clean up timeout middleware, add client-side option [#161]

This commit is contained in:
Mike Perham 2012-04-26 08:40:07 -07:00
parent 1c6ea1195c
commit 2ec203365e
10 changed files with 50 additions and 27 deletions

View file

@ -1,7 +1,20 @@
1.1.5
1.2.0
-----------
- Fix delayed extensions not available in Workers [#152]
- Add Timeout middleware to optionally kill a worker after N seconds,
just configure like so. (blackgold9)
```ruby
class HangingWorker
include Sidekiq::Worker
sidekiq_options :timeout => 600
def perform
# will be killed if it takes longer than 10 minutes
end
end
```
- Fix delayed extensions not available in workers [#152]
1.1.4
-----------

View file

@ -38,10 +38,14 @@ module Sidekiq
raise(ArgumentError, "Message must include a class and set of arguments: #{item.inspect}") if !item['class'] || !item['args']
raise(ArgumentError, "Message must include a Sidekiq::Worker class, not class name: #{item['class'].ancestors.inspect}") if !item['class'].is_a?(Class) || !item['class'].respond_to?('get_sidekiq_options')
item['retry'] = !!item['class'].get_sidekiq_options['retry']
queue = item['queue'] || item['class'].get_sidekiq_options['queue'] || 'default'
worker_class = item['class']
item['class'] = item['class'].to_s
item['retry'] = !!worker_class.get_sidekiq_options['retry']
queue = item['queue'] || worker_class.get_sidekiq_options['queue'] || 'default'
if !item['timeout'] && worker_class.get_sidekiq_options['timeout']
item['timeout'] = worker_class.get_sidekiq_options['timeout']
end
pushed = false
Sidekiq.client_middleware.invoke(worker_class, item, queue) do

View file

@ -1,20 +1,21 @@
require 'timeout'
module Sidekiq
module Middleware
module Server
class Timeout
@timeout_in_seconds
def initialize(options={:timeout => 120})
@timeout_in_seconds = options[:timeout]
end
def call(worker, msg, queue)
Timeout::timeout (@timeout_in_seconds) {
yield
}
end
end
end
end
end
module Sidekiq
module Middleware
module Server
class Timeout
def call(worker, msg, queue)
if msg['timeout'] && msg['timeout'].to_i != 0
::Timeout.timeout(msg['timeout'].to_i) do
yield
end
else
yield
end
end
end
end
end
end

View file

@ -6,6 +6,7 @@ require 'sidekiq/middleware/server/active_record'
require 'sidekiq/middleware/server/exception_handler'
require 'sidekiq/middleware/server/retry_jobs'
require 'sidekiq/middleware/server/logging'
require 'sidekiq/middleware/server/timeout'
module Sidekiq
class Processor
@ -18,6 +19,7 @@ module Sidekiq
m.add Middleware::Server::Logging
m.add Middleware::Server::RetryJobs
m.add Middleware::Server::ActiveRecord
m.add Middleware::Server::Timeout
end
end

View file

@ -1,3 +1,3 @@
module Sidekiq
VERSION = "1.1.5"
VERSION = "1.2.0"
end

View file

@ -36,6 +36,7 @@ module Sidekiq
# :unique - enable the UniqueJobs middleware for this Worker, default *true*
# :queue - use a named queue for this Worker, default 'default'
# :retry - enable the RetryJobs middleware for this Worker, default *true*
# :timeout - timeout the perform method after N seconds, default *nil*
def sidekiq_options(opts={})
@sidekiq_options = get_sidekiq_options.merge(stringify_keys(opts || {}))
end

View file

@ -1,5 +1,6 @@
class HardWorker
include Sidekiq::Worker
sidekiq_options :timeout => 60
def perform(name, count, salt)
raise name if name == 'crash'

View file

@ -96,8 +96,7 @@ class TestClient < MiniTest::Unit::TestCase
class QueuedWorker
include Sidekiq::Worker
sidekiq_options :queue => :flimflam
sidekiq_options :queue => :flimflam, :timeout => 1
end
it 'enqueues to the named queue' do

View file

@ -4,6 +4,7 @@ require 'active_record'
require 'action_mailer'
require 'sidekiq/extensions/action_mailer'
require 'sidekiq/extensions/active_record'
require 'sidekiq/rails'
Sidekiq.hook_rails!

View file

@ -16,6 +16,7 @@ class TestManager < MiniTest::Unit::TestCase
class IntegrationWorker
include Sidekiq::Worker
sidekiq_options :queue => 'foo'
def perform(a, b)
$mutex.synchronize do
@ -26,8 +27,8 @@ class TestManager < MiniTest::Unit::TestCase
end
it 'processes messages' do
Sidekiq::Client.push('queue' => :foo, 'class' => IntegrationWorker, 'args' => [1, 2])
Sidekiq::Client.push('queue' => :foo, 'class' => IntegrationWorker, 'args' => [1, 3])
IntegrationWorker.perform_async(1, 2)
IntegrationWorker.perform_async(1, 3)
q = TimedQueue.new
mgr = Sidekiq::Manager.new(:queues => [:foo], :concurrency => 2)