mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
commit
68bfc12c39
5 changed files with 56 additions and 11 deletions
|
@ -18,6 +18,8 @@
|
|||
```ruby
|
||||
Sidekiq.default_worker_options = { 'queue' => 'default', 'backtrace' => true }
|
||||
```
|
||||
- Added two handy Sidekiq::Client class methods for scheduled jobs:
|
||||
`enqueue_to_in` and `enqueue_in` [#1212]
|
||||
|
||||
2.14.1
|
||||
-----------
|
||||
|
|
|
@ -88,6 +88,29 @@ module Sidekiq
|
|||
klass.client_push('queue' => queue, 'class' => klass, 'args' => args)
|
||||
end
|
||||
|
||||
# Example usage:
|
||||
# Sidekiq::Client.enqueue_to_in(:queue_name, 3.minutes, MyWorker, 'foo', 1, :bat => 'bar')
|
||||
#
|
||||
def enqueue_to_in(queue, interval, klass, *args)
|
||||
int = interval.to_f
|
||||
now = Time.now.to_f
|
||||
ts = (int < 1_000_000_000 ? now + int : int)
|
||||
|
||||
item = { 'queue' => queue, 'class' => klass, 'args' => args, 'at' => ts }
|
||||
|
||||
# Optimization to enqueue something now that is scheduled to go out now or in the past
|
||||
item.delete('at') if ts <= now
|
||||
|
||||
klass.client_push(item)
|
||||
end
|
||||
|
||||
# Example usage:
|
||||
# Sidekiq::Client.enqueue_in(3.minutes, MyWorker, 'foo', 1, :bat => 'bar')
|
||||
#
|
||||
def enqueue_in(interval, klass, *args)
|
||||
enqueue_to_in klass.get_sidekiq_options['queue'], interval, klass, args
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def raw_push(payloads)
|
||||
|
|
|
@ -41,16 +41,7 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def perform_in(interval, *args)
|
||||
int = interval.to_f
|
||||
now = Time.now.to_f
|
||||
ts = (int < 1_000_000_000 ? now + int : int)
|
||||
|
||||
# Optimization to enqueue something now that is scheduled to go out now or in the past
|
||||
if ts <= now
|
||||
perform_async(*args)
|
||||
else
|
||||
client_push('class' => self, 'args' => args, 'at' => ts)
|
||||
end
|
||||
Sidekiq::Client.enqueue_in(interval, self, args)
|
||||
end
|
||||
alias_method :perform_at, :perform_in
|
||||
|
||||
|
|
|
@ -101,6 +101,27 @@ class TestClient < Sidekiq::Test
|
|||
@redis.verify
|
||||
end
|
||||
|
||||
it 'enqueues messages to redis (delayed, custom queue)' do
|
||||
@redis.expect :zadd, 1, ['schedule', Array]
|
||||
pushed = Sidekiq::Client.enqueue_to_in(:custom_queue, 3.minutes, MyWorker, 1, 2)
|
||||
assert pushed
|
||||
@redis.verify
|
||||
end
|
||||
|
||||
it 'enqueues messages to redis (delayed into past, custom queue)' do
|
||||
@redis.expect :lpush, 1, ['queue:custom_queue', Array]
|
||||
pushed = Sidekiq::Client.enqueue_to_in(:custom_queue, -3.minutes, MyWorker, 1, 2)
|
||||
assert pushed
|
||||
@redis.verify
|
||||
end
|
||||
|
||||
it 'enqueues messages to redis (delayed)' do
|
||||
@redis.expect :zadd, 1, ['schedule', Array]
|
||||
pushed = Sidekiq::Client.enqueue_in(3.minutes, MyWorker, 1, 2)
|
||||
assert pushed
|
||||
@redis.verify
|
||||
end
|
||||
|
||||
class QueuedWorker
|
||||
include Sidekiq::Worker
|
||||
sidekiq_options :queue => :flimflam
|
||||
|
|
|
@ -7,12 +7,13 @@ class TestScheduling < Sidekiq::Test
|
|||
@redis = Minitest::Mock.new
|
||||
# Ugh, this is terrible.
|
||||
Sidekiq.instance_variable_set(:@redis, @redis)
|
||||
|
||||
def @redis.multi; [yield] * 2 if block_given?; end
|
||||
def @redis.with; yield self; end
|
||||
end
|
||||
|
||||
class ScheduledWorker
|
||||
include Sidekiq::Worker
|
||||
sidekiq_options :queue => :custom_queue
|
||||
def perform(x)
|
||||
end
|
||||
end
|
||||
|
@ -29,6 +30,13 @@ class TestScheduling < Sidekiq::Test
|
|||
@redis.verify
|
||||
end
|
||||
|
||||
it 'schedules job right away on negative timestamp/interval' do
|
||||
@redis.expect :sadd, true, ['queues', :custom_queue]
|
||||
@redis.expect :lpush, true, ['queue:custom_queue', Array]
|
||||
assert ScheduledWorker.perform_in(-300, 'mike')
|
||||
@redis.verify
|
||||
end
|
||||
|
||||
it 'schedules multiple jobs at once' do
|
||||
@redis.expect :zadd, true, ['schedule', Array]
|
||||
assert Sidekiq::Client.push_bulk('class' => ScheduledWorker, 'args' => [['mike'], ['mike']], 'at' => 600)
|
||||
|
|
Loading…
Add table
Reference in a new issue