1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00
mperham--sidekiq/lib/sidekiq/worker.rb

216 lines
6.7 KiB
Ruby
Raw Normal View History

# frozen_string_literal: true
require 'sidekiq/client'
2012-01-16 23:05:38 -05:00
module Sidekiq
##
# Include this module in your worker class and you can easily create
# asynchronous jobs:
#
#   class HardWorker
#     include Sidekiq::Worker
#
#     def perform(*args)
#       # do some work
#     end
#   end
#
# Then in your Rails app, you can do this:
#
#   HardWorker.perform_async(1, 2, 3)
#
# Note that perform_async is a class method, perform is an instance method.
module Worker
2012-11-04 01:05:37 -04:00
attr_accessor :jid
# Hook run when this module is included into +base+.
#
# Refuses to be mixed into anything that has a class named
# 'ActiveJob::Base' among its ancestors, then extends +base+ with
# ClassMethods and declares the three per-class attributes.
#
# Raises ArgumentError when +base+ descends from ActiveJob::Base.
def self.included(base)
  active_job = base.ancestors.any? { |ancestor| ancestor.name == 'ActiveJob::Base' }
  if active_job
    raise ArgumentError, "You cannot include Sidekiq::Worker in an ActiveJob: #{base.name}"
  end

  base.extend(ClassMethods)
  [
    :sidekiq_options_hash,
    :sidekiq_retry_in_block,
    :sidekiq_retries_exhausted_block
  ].each do |attribute|
    base.sidekiq_class_attribute attribute
  end
end
# Instance-level shortcut: returns whatever Sidekiq.logger returns.
def logger
  Sidekiq.logger
end
# Holds the options handed to `set` and merges them into the job payload
# when the job is eventually pushed, e.g.
#
#   SomeWorker.set(queue: 'foo').perform_async(....)
#
class Setter
  # +klass+ is the worker class that will receive +client_push+;
  # +opts+ is the Hash of per-call options merged into every payload.
  def initialize(klass, opts)
    @klass = klass
    @opts = opts
  end

  # Push a job carrying the stored options plus +args+.
  def perform_async(*args)
    job = @opts.merge('args' => args, 'class' => @klass)
    @klass.client_push(job)
  end

  # +interval+ must be a timestamp, numeric or something that acts
  # numeric (like an activesupport time interval). Values below
  # 1_000_000_000 are added to the current time; larger values are
  # used as the timestamp itself.
  def perform_in(interval, *args)
    offset = interval.to_f
    current = Time.now.to_f
    run_at = if offset < 1_000_000_000
      current + offset
    else
      offset
    end

    job = @opts.merge('class' => @klass, 'args' => args, 'at' => run_at)
    # A job due now or in the past is pushed without 'at'.
    job.delete('at') if run_at <= current

    @klass.client_push(job)
  end
  alias_method :perform_at, :perform_in
end
module ClassMethods
ACCESSOR_MUTEX = Mutex.new
# Always raises ArgumentError: the message directs callers to
# +perform_async+ instead. Any arguments are ignored.
def delay(*args)
  message = "Do not call .delay on a Sidekiq::Worker class, call .perform_async"
  raise ArgumentError, message
end
# Always raises ArgumentError: the message directs callers to
# +perform_in+ instead. Any arguments are ignored.
def delay_for(*args)
  message = "Do not call .delay_for on a Sidekiq::Worker class, call .perform_in"
  raise ArgumentError, message
end
# Always raises ArgumentError: the message directs callers to
# +perform_at+ instead. Any arguments are ignored.
def delay_until(*args)
  message = "Do not call .delay_until on a Sidekiq::Worker class, call .perform_at"
  raise ArgumentError, message
end
# Returns a Setter bound to this class and +options+, so the options
# apply to the job pushed through it, e.g.
#
#   SomeWorker.set(queue: 'foo').perform_async(....)
def set(options)
  Setter.new(self, options)
end
# Builds a payload naming this class and +args+ and hands it to
# +client_push+; returns whatever +client_push+ returns.
def perform_async(*args)
  job = { 'class' => self, 'args' => args }
  client_push(job)
end
2015-12-04 14:03:16 -05:00
# Push a job for this class to run at a later time.
#
# +interval+ must be a timestamp, numeric or something that acts
# numeric (like an activesupport time interval). After +to_f+, a value
# below 1_000_000_000 is added to the current time; a larger value is
# used as the timestamp itself.
#
# Returns whatever +client_push+ returns.
def perform_in(interval, *args)
  int = interval.to_f
  now = Time.now.to_f
  ts = (int < 1_000_000_000 ? now + int : int)

  item = { 'class' => self, 'args' => args, 'at' => ts }

  # Optimization to enqueue something now that is scheduled to go out now or in the past
  item.delete('at') if ts <= now

  client_push(item)
end
alias_method :perform_at, :perform_in
##
# Allows customization for this type of Worker.
# Legal options:
#
#   queue - use a named queue for this Worker, default 'default'
#   retry - enable the RetryJobs middleware for this Worker, *true* to use the default
#      or *Integer* count
#   backtrace - whether to save any error backtrace in the retry payload to display in web UI,
#      can be true, false or an integer number of lines to save, default *false*
#   pool - use the given Redis connection pool to push this type of job to a given shard.
#
# In practice, any option is allowed.  This is the main mechanism to configure the
# options for a specific job.
#
# The keys of +opts+ are converted to Strings and merged over the
# current options (see +get_sidekiq_options+); the merged Hash is stored
# in +sidekiq_options_hash+ and returned.
def sidekiq_options(opts = {})
  # stringify
  stringified = Hash[opts.map { |k, v| [k.to_s, v] }]
  self.sidekiq_options_hash = get_sidekiq_options.merge(stringified)
end
2013-06-25 11:07:45 -04:00
# Stores the given block in the +sidekiq_retry_in_block+ class attribute
# (nil when no block is given) and returns it.
#
# NOTE(review): presumably the retry logic calls this block to compute
# the delay before the next attempt — confirm against the retry code.
def sidekiq_retry_in(&block)
  self.sidekiq_retry_in_block = block
end
# Stores the given block in the +sidekiq_retries_exhausted_block+ class
# attribute (nil when no block is given) and returns it.
def sidekiq_retries_exhausted(&handler)
  self.sidekiq_retries_exhausted_block = handler
end
# Returns the options Hash for this class. When +sidekiq_options_hash+
# is still unset (nil/false) it is first assigned
# Sidekiq.default_worker_options, so later calls return the stored value.
def get_sidekiq_options # :nodoc:
  self.sidekiq_options_hash ||= Sidekiq.default_worker_options
end
2012-10-18 00:00:54 -04:00
# Pushes +item+ through a new Sidekiq::Client and returns the result of
# its +push+.
#
# The connection pool is the first truthy value of:
# Thread.current[:sidekiq_via_pool], this class's 'pool' option, then
# Sidekiq.redis_pool.
#
# Note that +item+ is modified in place: every key is replaced by its
# +to_s+ form before the push.
def client_push(item) # :nodoc:
  pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool

  # stringify; iterating over the +keys+ snapshot makes it safe to
  # delete and re-insert entries inside the loop
  item.keys.each do |key|
    item[key.to_s] = item.delete(key)
  end

  Sidekiq::Client.new(pool).push(item)
end
# Defines a per-class attribute, with class-level and instance-level
# accessors, for every name in +attrs+.
#
# For each +name+ this generates:
# * a singleton reader +name+ that, while holding ACCESSOR_MUTEX, calls a
#   private singleton helper named "__synchronized_<name>" (which
#   initially returns nil);
# * a singleton writer +name=+ that redefines that helper, on the
#   singleton class of whichever object the writer is called on, so that
#   it returns the assigned value;
# * an instance reader +name+ that returns the instance variable
#   "@<name>" when it is defined and otherwise asks the class;
# * an instance writer +name=+ (a plain attr_writer).
#
# Any previously defined method with one of these names is undefined
# first, whether it was public/protected or private.
#
# NOTE(review): this looks modeled on ActiveSupport's class_attribute — confirm.
def sidekiq_class_attribute(*attrs)
# Both flags are hard-coded to true, so the instance reader and writer
# below are always generated.
instance_reader = true
instance_writer = true
attrs.each do |name|
# Name of the hidden helper that actually holds the current value.
synchronized_getter = "__synchronized_#{name}"
# Drop any existing class-level reader before defining the new one.
singleton_class.instance_eval do
undef_method(name) if method_defined?(name) || private_method_defined?(name)
end
# Initial value of the attribute is nil; the helper is made private.
define_singleton_method(synchronized_getter) { nil }
singleton_class.class_eval do
private(synchronized_getter)
end
# Class-level reader: reads the helper under the shared mutex.
define_singleton_method(name) { ACCESSOR_MUTEX.synchronize { send synchronized_getter } }
ivar = "@#{name}"
# Drop any existing class-level writer before defining the new one.
singleton_class.instance_eval do
m = "#{name}="
undef_method(m) if method_defined?(m) || private_method_defined?(m)
end
# Class-level writer: swaps in a new helper that returns +val+.
define_singleton_method("#{name}=") do |val|
singleton_class.class_eval do
# The undef + redefine pair runs under the same mutex the reader uses.
ACCESSOR_MUTEX.synchronize do
undef_method(synchronized_getter) if method_defined?(synchronized_getter) || private_method_defined?(synchronized_getter)
define_method(synchronized_getter) { val }
end
end
# When the receiver is itself a singleton class, also (re)define an
# instance-level reader that prefers the instance variable and falls
# back to the object's singleton class.
if singleton_class?
class_eval do
undef_method(name) if method_defined?(name) || private_method_defined?(name)
define_method(name) do
if instance_variable_defined? ivar
instance_variable_get ivar
else
singleton_class.send name
end
end
end
end
# The writer returns the assigned value.
val
end
# Instance reader: the instance variable wins when defined; otherwise
# the value comes from the class-level reader.
if instance_reader
undef_method(name) if method_defined?(name) || private_method_defined?(name)
define_method(name) do
if instance_variable_defined?(ivar)
instance_variable_get ivar
else
self.class.public_send name
end
end
end
# Instance writer: sets the instance variable only (attr_writer).
if instance_writer
m = "#{name}="
undef_method(m) if method_defined?(m) || private_method_defined?(m)
attr_writer name
end
end
end
2012-01-16 23:05:38 -05:00
end
end
end