From 9e826a05e13862a732007dd6515cf18cdbe815eb Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Wed, 5 Apr 2017 10:56:06 -0700 Subject: [PATCH] Re-implement set to avoid TLS, shout out to #2152 --- Changes.md | 1 + lib/sidekiq/worker.rb | 39 +++++++++++++++++++++++++++++++-------- 2 files changed, 32 insertions(+), 8 deletions(-) diff --git a/Changes.md b/Changes.md index 574a1e11..26dffe3b 100644 --- a/Changes.md +++ b/Changes.md @@ -16,6 +16,7 @@ Sidekiq::Middleware::Server::Logging -> Sidekiq::JobLogging must opt into them. - The Web UI is now BiDi and can render RTL languages like Arabic, Farsi and Hebrew. - Rails 3.2 and Ruby 2.0 and 2.1 are no longer supported. +- The `SomeWorker.set(options)` API was re-written to avoid thread-local state. [#2152] - Please see the [5.0 Upgrade notes](5.0-Upgrade.md) for more detail. 4.2.10 diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index df8d6c51..1a094fff 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -4,6 +4,7 @@ require 'sidekiq/core_ext' module Sidekiq + ## # Include this module in your worker class and you can easily create # asynchronous jobs: @@ -37,6 +38,34 @@ module Sidekiq Sidekiq.logger end + # This helper class encapsulates the set options for `set`, e.g. + # + # SomeWorker.set(queue: 'foo').perform_async(....) + # + class Setter + def initialize(opts) + @opts = opts + end + + def perform_async(*args) + @opts['class'].client_push(@opts.merge!('args' => args)) + end + + # +interval+ must be a timestamp, numeric or something that acts + # numeric (like an activesupport time interval). + def perform_in(interval, *args) + int = interval.to_f + now = Time.now.to_f + ts = (int < 1_000_000_000 ? now + int : int) + + @opts.merge! 'args' => args, 'at' => ts + # Optimization to enqueue something now that is scheduled to go out now or in the past + @opts.delete('at'.freeze) if ts <= now + @opts['class'].client_push(@opts) + end + alias_method :perform_at, :perform_in + end + module ClassMethods def delay(*args) @@ -52,8 +81,7 @@ module Sidekiq end def set(options) - Thread.current[:sidekiq_worker_set] = options - self + Setter.new(options.merge!('class' => self)) end def perform_async(*args) @@ -107,12 +135,7 @@ module Sidekiq def client_push(item) # :nodoc: pool = Thread.current[:sidekiq_via_pool] || get_sidekiq_options['pool'] || Sidekiq.redis_pool - hash = if Thread.current[:sidekiq_worker_set] - x, Thread.current[:sidekiq_worker_set] = Thread.current[:sidekiq_worker_set], nil - x.stringify_keys.merge(item.stringify_keys) - else - item.stringify_keys - end + hash = item.stringify_keys Sidekiq::Client.new(pool).push(hash) end