mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Re-implement set to avoid TLS, shout out to #2152
This commit is contained in:
parent
62f2f60ff3
commit
9e826a05e1
2 changed files with 32 additions and 8 deletions
|
@ -16,6 +16,7 @@ Sidekiq::Middleware::Server::Logging -> Sidekiq::JobLogging
|
||||||
must opt into them.
|
must opt into them.
|
||||||
- The Web UI is now BiDi and can render RTL languages like Arabic, Farsi and Hebrew.
|
- The Web UI is now BiDi and can render RTL languages like Arabic, Farsi and Hebrew.
|
||||||
- Rails 3.2 and Ruby 2.0 and 2.1 are no longer supported.
|
- Rails 3.2 and Ruby 2.0 and 2.1 are no longer supported.
|
||||||
|
- The `SomeWorker.set(options)` API was re-written to avoid thread-local state. [#2152]
|
||||||
- Please see the [5.0 Upgrade notes](5.0-Upgrade.md) for more detail.
|
- Please see the [5.0 Upgrade notes](5.0-Upgrade.md) for more detail.
|
||||||
|
|
||||||
4.2.10
|
4.2.10
|
||||||
|
|
|
@ -4,6 +4,7 @@ require 'sidekiq/core_ext'
|
||||||
|
|
||||||
module Sidekiq
|
module Sidekiq
|
||||||
|
|
||||||
|
|
||||||
##
|
##
|
||||||
# Include this module in your worker class and you can easily create
|
# Include this module in your worker class and you can easily create
|
||||||
# asynchronous jobs:
|
# asynchronous jobs:
|
||||||
|
@ -37,6 +38,34 @@ module Sidekiq
|
||||||
Sidekiq.logger
|
Sidekiq.logger
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# This helper class encapsulates the set options for `set`, e.g.
|
||||||
|
#
|
||||||
|
# SomeWorker.set(queue: 'foo').perform_async(....)
|
||||||
|
#
|
||||||
|
class Setter
|
||||||
|
def initialize(opts)
|
||||||
|
@opts = opts
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform_async(*args)
|
||||||
|
@opts['class'].client_push(@opts.merge!('args' => args))
|
||||||
|
end
|
||||||
|
|
||||||
|
# +interval+ must be a timestamp, numeric or something that acts
|
||||||
|
# numeric (like an activesupport time interval).
|
||||||
|
def perform_in(interval, *args)
|
||||||
|
int = interval.to_f
|
||||||
|
now = Time.now.to_f
|
||||||
|
ts = (int < 1_000_000_000 ? now + int : int)
|
||||||
|
|
||||||
|
@opts.merge! 'args' => args, 'at' => ts
|
||||||
|
# Optimization to enqueue something now that is scheduled to go out now or in the past
|
||||||
|
@opts.delete('at'.freeze) if ts <= now
|
||||||
|
@opts['class'].client_push(@opts)
|
||||||
|
end
|
||||||
|
alias_method :perform_at, :perform_in
|
||||||
|
end
|
||||||
|
|
||||||
module ClassMethods
|
module ClassMethods
|
||||||
|
|
||||||
def delay(*args)
|
def delay(*args)
|
||||||
|
@ -52,8 +81,7 @@ module Sidekiq
|
||||||
end
|
end
|
||||||
|
|
||||||
def set(options)
|
def set(options)
|
||||||
Thread.current[:sidekiq_worker_set] = options
|
Setter.new(options.merge!('class' => self))
|
||||||
self
|
|
||||||
end
|
end
|
||||||
|
|
||||||
def perform_async(*args)
|
def perform_async(*args)
|
||||||
|
@ -107,12 +135,7 @@ module Sidekiq
|
||||||
|
|
||||||
def client_push(item) # :nodoc:
|
def client_push(item) # :nodoc:
|
||||||
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
|
pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool
|
||||||
hash = if Thread.current[:sidekiq_worker_set]
|
hash = item.stringify_keys
|
||||||
x, Thread.current[:sidekiq_worker_set] = Thread.current[:sidekiq_worker_set], nil
|
|
||||||
x.stringify_keys.merge(item.stringify_keys)
|
|
||||||
else
|
|
||||||
item.stringify_keys
|
|
||||||
end
|
|
||||||
Sidekiq::Client.new(pool).push(hash)
|
Sidekiq::Client.new(pool).push(hash)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue