mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Support ActiveJob’s wait_until (#5003)
* Implement support for `set(wait_until: <interval>)` * Implement `queue_as`, docs * Rollback implementing `perform_later`, it's a footgun * changes
This commit is contained in:
parent
051555e55c
commit
2b2390d579
3 changed files with 60 additions and 8 deletions
|
@ -19,8 +19,9 @@ end
|
|||
```ruby
|
||||
# config/initializers/sidekiq.rb
|
||||
require "sidekiq/middleware/current_attributes"
|
||||
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS:CurrentAttributes singleton
|
||||
Sidekiq::CurrentAttributes.persist(Myapp::Current) # Your AS::CurrentAttributes singleton
|
||||
```
|
||||
- Implement `queue_as` and `wait_until` for ActiveJob compatibility [#5003]
|
||||
- Retry Redis operation if we get an `UNBLOCKED` Redis error. [#4985]
|
||||
- Run existing signal traps, if any, before running Sidekiq's trap. [#4991]
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ module Sidekiq
|
|||
#
|
||||
# class HardWorker
|
||||
# include Sidekiq::Worker
|
||||
# sidekiq_options queue: 'critical', retry: 5
|
||||
#
|
||||
# def perform(*args)
|
||||
# # do some work
|
||||
|
@ -20,6 +21,26 @@ module Sidekiq
|
|||
# HardWorker.perform_async(1, 2, 3)
|
||||
#
|
||||
# Note that perform_async is a class method, perform is an instance method.
|
||||
#
|
||||
# Sidekiq::Worker also includes several APIs to provide compatibility with
|
||||
# ActiveJob.
|
||||
#
|
||||
# class SomeWorker
|
||||
# include Sidekiq::Worker
|
||||
# queue_as :critical
|
||||
#
|
||||
# def perform(...)
|
||||
# end
|
||||
# end
|
||||
#
|
||||
# SomeWorker.set(wait_until: 1.hour).perform_async(123)
|
||||
#
|
||||
# Note that arguments passed to the job must still obey Sidekiq's
|
||||
# best practice for simple, JSON-native data types. Sidekiq will not
|
||||
# implement ActiveJob's more complex argument serialization. For
|
||||
# this reason, we don't implement `perform_later` as our call semantics
|
||||
# are very different.
|
||||
#
|
||||
module Worker
|
||||
##
|
||||
# The Options module is extracted so we can include it in ActiveJob::Base
|
||||
|
@ -153,10 +174,16 @@ module Sidekiq
|
|||
def initialize(klass, opts)
|
||||
@klass = klass
|
||||
@opts = opts
|
||||
|
||||
# ActiveJob compatibility
|
||||
interval = @opts.delete(:wait_until)
|
||||
at(interval) if interval
|
||||
end
|
||||
|
||||
def set(options)
|
||||
interval = options.delete(:wait_until)
|
||||
@opts.merge!(options)
|
||||
at(interval) if interval
|
||||
self
|
||||
end
|
||||
|
||||
|
@ -167,16 +194,19 @@ module Sidekiq
|
|||
# +interval+ must be a timestamp, numeric or something that acts
|
||||
# numeric (like an activesupport time interval).
|
||||
def perform_in(interval, *args)
|
||||
at(interval).perform_async(*args)
|
||||
end
|
||||
alias_method :perform_at, :perform_in
|
||||
|
||||
private
|
||||
|
||||
def at(interval)
|
||||
int = interval.to_f
|
||||
now = Time.now.to_f
|
||||
ts = (int < 1_000_000_000 ? now + int : int)
|
||||
|
||||
payload = @opts.merge("class" => @klass, "args" => args)
|
||||
# Optimization to enqueue something now that is scheduled to go out now or in the past
|
||||
payload["at"] = ts if ts > now
|
||||
@klass.client_push(payload)
|
||||
@opts["at"] = ts if ts > now
|
||||
self
|
||||
end
|
||||
alias_method :perform_at, :perform_in
|
||||
end
|
||||
|
||||
module ClassMethods
|
||||
|
@ -192,6 +222,10 @@ module Sidekiq
|
|||
raise ArgumentError, "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
|
||||
end
|
||||
|
||||
def queue_as(q)
|
||||
sidekiq_options("queue" => q.to_s)
|
||||
end
|
||||
|
||||
def set(options)
|
||||
Setter.new(self, options)
|
||||
end
|
||||
|
|
|
@ -5,13 +5,30 @@ describe Sidekiq::Worker do
|
|||
|
||||
class SetWorker
|
||||
include Sidekiq::Worker
|
||||
sidekiq_options :queue => :foo, 'retry' => 12
|
||||
queue_as :foo
|
||||
sidekiq_options 'retry' => 12
|
||||
end
|
||||
|
||||
def setup
|
||||
Sidekiq.redis {|c| c.flushdb }
|
||||
end
|
||||
|
||||
it "provides basic ActiveJob compatibilility" do
|
||||
q = Sidekiq::ScheduledSet.new
|
||||
assert_equal 0, q.size
|
||||
jid = SetWorker.set(wait_until: 1.hour).perform_async(123)
|
||||
assert jid
|
||||
assert_equal 1, q.size
|
||||
|
||||
q = Sidekiq::Queue.new("foo")
|
||||
assert_equal 0, q.size
|
||||
SetWorker.perform_async
|
||||
assert_equal 1, q.size
|
||||
|
||||
SetWorker.set(queue: 'xyz').perform_async
|
||||
assert_equal 1, Sidekiq::Queue.new("xyz").size
|
||||
end
|
||||
|
||||
it 'can be memoized' do
|
||||
q = Sidekiq::Queue.new('bar')
|
||||
assert_equal 0, q.size
|
||||
|
|
Loading…
Reference in a new issue