1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

bugfixes for Pro 6.0

This commit is contained in:
Mike Perham 2022-08-30 11:51:23 -07:00
parent 343537619b
commit 1b83a15278
No known key found for this signature in database
13 changed files with 69 additions and 175 deletions

View file

@ -70,19 +70,29 @@ module Sidekiq
end end
def self.configure_server(&block) def self.configure_server(&block)
(@servers ||= []) << block (@config_blocks ||= []) << block
yield default_configuration if server? yield default_configuration if server?
end end
# Creates a Sidekiq::Config instance that is more tuned for embedding # Creates a Sidekiq::Config instance that is more tuned for embedding
# within an arbitrary Ruby process. Noteably it reduces concurrency by # within an arbitrary Ruby process. Noteably it reduces concurrency by
# default so there is less contention for CPU time. # default so there is less contention for CPU time with other threads.
#
# inst = Sidekiq.configure_embed do |config|
# config.queues = %w[critical default low]
# end
# inst.run
# sleep 10
# inst.terminate
#
# NB: it is really easy to overload a Ruby process with threads due to the GIL.
# I do not recommend setting concurrency higher than 2-3.
def self.configure_embed(&block) def self.configure_embed(&block)
require "sidekiq/capsule" require "sidekiq/capsule"
require "sidekiq/launcher" require "sidekiq/launcher"
cfg = Sidekiq::Config.new cfg = Sidekiq::Config.new
cfg.concurrency = 2 cfg.concurrency = 1
@servers.each { |block| block.call(cfg) } @config_blocks.each { |block| block.call(cfg) }
yield cfg yield cfg
Sidekiq::Launcher.new(cfg) Sidekiq::Launcher.new(cfg)

View file

@ -1,5 +1,4 @@
require "sidekiq/component" require "sidekiq/component"
require "sidekiq/fetch"
module Sidekiq module Sidekiq
# A Sidekiq::Capsule is the set of resources necessary to # A Sidekiq::Capsule is the set of resources necessary to
@ -21,21 +20,21 @@ module Sidekiq
attr_reader :name attr_reader :name
attr_reader :queues attr_reader :queues
attr_reader :strict
attr_accessor :concurrency attr_accessor :concurrency
attr_accessor :fetch_class
def initialize(name, config) def initialize(name, config)
@name = name @name = name
@config = config @config = config
@queues = ["default"] @queues = ["default"]
@concurrency = 10 @concurrency = 10
@strict = true
@fetch_class = Sidekiq::BasicFetch
end end
def fetcher def fetcher
@fetcher ||= fetch_class.new(self) @fetcher ||= begin
inst = (config[:fetch_class] || Sidekiq::BasicFetch).new(self)
inst.setup(config[:fetch_setup]) if inst.respond_to?(:setup)
inst
end
end end
def stop def stop
@ -43,12 +42,10 @@ module Sidekiq
end end
def queues=(val) def queues=(val)
@strict = true
@queues = Array(val).each_with_object([]) do |qstr, memo| @queues = Array(val).each_with_object([]) do |qstr, memo|
arr = qstr arr = qstr
arr = qstr.split(",") if qstr.is_a?(String) arr = qstr.split(",") if qstr.is_a?(String)
name, weight = arr name, weight = arr
@strict = false if weight.to_i > 0
[weight.to_i, 1].max.times do [weight.to_i, 1].max.times do
memo << name memo << name
end end
@ -59,18 +56,22 @@ module Sidekiq
# Avoid if possible and add middleware globally so all # Avoid if possible and add middleware globally so all
# capsules share the same chains. Easier to debug that way. # capsules share the same chains. Easier to debug that way.
def client_middleware def client_middleware
@client_chain ||= config.client_middleware.dup @client_chain ||= config.client_middleware.copy_for(self)
yield @client_chain if block_given? yield @client_chain if block_given?
@client_chain @client_chain
end end
def server_middleware def server_middleware
@server_chain ||= config.server_middleware.dup @server_chain ||= config.server_middleware.copy_for(self)
yield @server_chain if block_given? yield @server_chain if block_given?
@server_chain @server_chain
end end
def redis_pool def redis_pool
Thread.current[:sidekiq_redis_pool] || local_redis_pool
end
def local_redis_pool
# connection pool is lazy, it will not create connections unless you actually need them # connection pool is lazy, it will not create connections unless you actually need them
# so don't be skimpy! # so don't be skimpy!
@redis ||= config.new_redis_pool(@concurrency) @redis ||= config.new_redis_pool(@concurrency)
@ -98,6 +99,10 @@ module Sidekiq
end end
end end
def lookup(name)
config.lookup(name)
end
def logger def logger
config.logger config.logger
end end

View file

@ -48,10 +48,11 @@ module Sidekiq
# old calling method, accept 1 pool argument # old calling method, accept 1 pool argument
@redis_pool = args[0] @redis_pool = args[0]
@chain = Sidekiq.default_configuration.client_middleware @chain = Sidekiq.default_configuration.client_middleware
@config = Sidekiq.default_configuration
else else
# new calling method: keyword arguments # new calling method: keyword arguments
@config = kwargs[:config] || Sidekiq.default_configuration @config = kwargs[:config] || Sidekiq.default_configuration
@redis_pool = kwargs[:pool] || Thread.current[:sidekiq_via_pool] || @config&.redis_pool @redis_pool = kwargs[:pool] || Thread.current[:sidekiq_redis_pool] || @config&.redis_pool
@chain = kwargs[:chain] || @config&.client_middleware @chain = kwargs[:chain] || @config&.client_middleware
raise ArgumentError, "No Redis pool available for Sidekiq::Client" unless @redis_pool raise ArgumentError, "No Redis pool available for Sidekiq::Client" unless @redis_pool
end end
@ -147,11 +148,11 @@ module Sidekiq
# you cannot scale any other way (e.g. splitting your app into smaller apps). # you cannot scale any other way (e.g. splitting your app into smaller apps).
def self.via(pool) def self.via(pool)
raise ArgumentError, "No pool given" if pool.nil? raise ArgumentError, "No pool given" if pool.nil?
current_sidekiq_pool = Thread.current[:sidekiq_via_pool] current_sidekiq_pool = Thread.current[:sidekiq_redis_pool]
Thread.current[:sidekiq_via_pool] = pool Thread.current[:sidekiq_redis_pool] = pool
yield yield
ensure ensure
Thread.current[:sidekiq_via_pool] = current_sidekiq_pool Thread.current[:sidekiq_redis_pool] = current_sidekiq_pool
end end
class << self class << self

View file

@ -50,6 +50,7 @@ module Sidekiq
oneshot = options.fetch(:oneshot, true) oneshot = options.fetch(:oneshot, true)
reverse = options[:reverse] reverse = options[:reverse]
reraise = options[:reraise] reraise = options[:reraise]
logger.debug("Firing #{event} event") if oneshot
arr = config[:lifecycle_events][event] arr = config[:lifecycle_events][event]
arr.reverse! if reverse arr.reverse! if reverse

View file

@ -76,13 +76,13 @@ module Sidekiq
end end
def client_middleware def client_middleware
@client_chain ||= Sidekiq::Middleware::Chain.new @client_chain ||= Sidekiq::Middleware::Chain.new(self)
yield @client_chain if block_given? yield @client_chain if block_given?
@client_chain @client_chain
end end
def server_middleware def server_middleware
@server_chain ||= Sidekiq::Middleware::Chain.new @server_chain ||= Sidekiq::Middleware::Chain.new(self)
yield @server_chain if block_given? yield @server_chain if block_given?
@server_chain @server_chain
end end
@ -106,7 +106,11 @@ module Sidekiq
end end
def redis_pool def redis_pool
# this is our global client/housekeeping pool. each capsule has its Thread.current[:sidekiq_redis_pool] || Thread.current[:sidekiq_capsule]&.redis_pool || local_redis_pool
end
private def local_redis_pool
# this is our default client/housekeeping pool. each capsule has its
# own pool for executing threads. # own pool for executing threads.
size = Integer(ENV["RAILS_MAX_THREADS"] || 5) size = Integer(ENV["RAILS_MAX_THREADS"] || 5)
@redis ||= new_redis_pool(size) @redis ||= new_redis_pool(size)
@ -162,9 +166,12 @@ module Sidekiq
end end
# find a singleton # find a singleton
def lookup(name) def lookup(name, default_class = nil)
# JNDI is just a fancy name for a hash lookup # JNDI is just a fancy name for a hash lookup
@directory[name] @directory.fetch(name) do |key|
return nil unless default_class
@directory[key] = default_class.new(self)
end
end end
## ##
@ -237,6 +244,9 @@ module Sidekiq
# INTERNAL USE ONLY # INTERNAL USE ONLY
def handle_exception(ex, ctx = {}) def handle_exception(ex, ctx = {})
if @options[:error_handlers].size == 0
p ["!!!!!", ex]
end
@options[:error_handlers].each do |handler| @options[:error_handlers].each do |handler|
handler.call(ex, ctx, self) handler.call(ex, ctx, self)
rescue => e rescue => e

View file

@ -2,6 +2,7 @@
require "sidekiq" require "sidekiq"
require "sidekiq/component" require "sidekiq/component"
require "sidekiq/capsule"
module Sidekiq # :nodoc: module Sidekiq # :nodoc:
class BasicFetch class BasicFetch
@ -29,7 +30,7 @@ module Sidekiq # :nodoc:
def initialize(cap) def initialize(cap)
raise ArgumentError, "missing queue list" unless cap.queues raise ArgumentError, "missing queue list" unless cap.queues
@config = cap @config = cap
@strictly_ordered_queues = !!@config.strict @strictly_ordered_queues = (config.queues.size == config.queues.uniq.size)
@queues = config.queues.map { |q| "queue:#{q}" } @queues = config.queues.map { |q| "queue:#{q}" }
if @strictly_ordered_queues if @strictly_ordered_queues
@queues.uniq! @queues.uniq!

View file

@ -80,15 +80,6 @@ module Sidekiq
class Chain class Chain
include Enumerable include Enumerable
# A unique instance of the middleware chain is created for
# each job executed in order to be thread-safe.
# @param copy [Sidekiq::Middleware::Chain] New instance of Chain
# @returns nil
def initialize_copy(copy)
copy.instance_variable_set(:@entries, entries.dup)
nil
end
# Iterate through each middleware in the chain # Iterate through each middleware in the chain
def each(&block) def each(&block)
entries.each(&block) entries.each(&block)
@ -105,6 +96,12 @@ module Sidekiq
@entries ||= [] @entries ||= []
end end
def copy_for(capsule)
chain = Sidekiq::Middleware::Chain.new(capsule)
chain.instance_variable_set(:@entries, entries.dup)
chain
end
# Remove all middleware matching the given Class # Remove all middleware matching the given Class
# @param klass [Class] # @param klass [Class]
def remove(klass) def remove(klass)

View file

@ -51,7 +51,7 @@ module Sidekiq
end end
def server_middleware def server_middleware
@server_chain ||= Middleware::Chain.new @server_chain ||= Middleware::Chain.new(Sidekiq.default_configuration)
yield @server_chain if block_given? yield @server_chain if block_given?
@server_chain @server_chain
end end

View file

@ -31,20 +31,16 @@ describe Sidekiq::Capsule do
it "parses queues correctly" do it "parses queues correctly" do
cap = @cap cap = @cap
assert_equal ["default"], cap.queues assert_equal ["default"], cap.queues
assert cap.strict
cap.queues = %w[foo bar,2] cap.queues = %w[foo bar,2]
assert_equal %w[foo bar bar], cap.queues assert_equal %w[foo bar bar], cap.queues
refute cap.strict
cap.queues = ["default"] cap.queues = ["default"]
assert_equal %w[default], cap.queues assert_equal %w[default], cap.queues
assert cap.strict
# config/sidekiq.yml input will look like this # config/sidekiq.yml input will look like this
cap.queues = [["foo"], ["baz", 3]] cap.queues = [["foo"], ["baz", 3]]
assert_equal %w[foo baz baz baz], cap.queues assert_equal %w[foo baz baz baz], cap.queues
refute cap.strict
end end
it "can have customized middleware chains" do it "can have customized middleware chains" do

View file

@ -22,10 +22,6 @@ describe Sidekiq::CLI do
@cli.config.capsules.first.concurrency @cli.config.capsules.first.concurrency
end end
def strict
@cli.config.capsules.first.strict
end
describe "#parse" do describe "#parse" do
describe "options" do describe "options" do
it "accepts -r" do it "accepts -r" do
@ -63,16 +59,6 @@ describe Sidekiq::CLI do
end end
end end
describe "setting internal options via the config file" do
describe "setting the `strict` option via the config file" do
it "discards the `strict` option specified via the config file" do
@cli.parse(%w[sidekiq -C ./test/cfg/config_with_internal_options.yml])
assert_equal true, !!strict
end
end
end
describe "queues" do describe "queues" do
it "accepts with -q" do it "accepts with -q" do
@cli.parse(%w[sidekiq -q foo -r ./test/fake_env.rb]) @cli.parse(%w[sidekiq -q foo -r ./test/fake_env.rb])
@ -86,12 +72,6 @@ describe Sidekiq::CLI do
assert_equal ["foo", "bar"], queues assert_equal ["foo", "bar"], queues
end end
it "sets strictly ordered queues" do
@cli.parse(%w[sidekiq -q foo -q bar -r ./test/fake_env.rb])
assert_equal true, !!strict
end
end end
describe "when weights are present" do describe "when weights are present" do
@ -100,12 +80,6 @@ describe Sidekiq::CLI do
assert_equal ["foo", "foo", "foo", "bar"], queues assert_equal ["foo", "foo", "foo", "bar"], queues
end end
it "does not set strictly ordered queues" do
@cli.parse(%w[sidekiq -q foo,3 -q bar -r ./test/fake_env.rb])
assert_equal false, !!strict
end
end end
it "accepts queues with multi-word names" do it "accepts queues with multi-word names" do
@ -274,99 +248,6 @@ describe Sidekiq::CLI do
assert_equal 7, queues.count { |q| q == "often" } assert_equal 7, queues.count { |q| q == "often" }
assert_equal 3, queues.count { |q| q == "seldom" } assert_equal 3, queues.count { |q| q == "seldom" }
end end
describe "when the config file specifies queues with weights" do
describe "when -q specifies queues without weights" do
it "sets strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/config.yml
-r ./test/fake_env.rb
-q foo -q bar])
assert_equal true, !!strict
end
end
describe "when -q specifies no queues" do
it "does not set strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/config.yml
-r ./test/fake_env.rb])
assert_equal false, !!strict
end
end
describe "when -q specifies queues with weights" do
it "does not set strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/config.yml
-r ./test/fake_env.rb
-q foo,2 -q bar,3])
assert_equal false, !!strict
end
end
end
describe "when the config file specifies queues without weights" do
describe "when -q specifies queues without weights" do
it "sets strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/cfg/config_queues_without_weights.yml
-r ./test/fake_env.rb
-q foo -q bar])
assert_equal true, !!strict
end
end
describe "when -q specifies no queues" do
it "sets strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/cfg/config_queues_without_weights.yml
-r ./test/fake_env.rb])
assert_equal true, !!strict
end
end
describe "when -q specifies queues with weights" do
it "does not set strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/cfg/config_queues_without_weights.yml
-r ./test/fake_env.rb
-q foo,2 -q bar,3])
assert_equal false, !!strict
end
end
end
describe "when the config file specifies no queues" do
describe "when -q specifies queues without weights" do
it "sets strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/cfg/config_empty.yml
-r ./test/fake_env.rb
-q foo -q bar])
assert_equal true, !!strict
end
end
describe "when -q specifies no queues" do
it "sets strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/cfg/config_empty.yml
-r ./test/fake_env.rb])
assert_equal true, !!strict
end
end
describe "when -q specifies queues with weights" do
it "does not set strictly ordered queues" do
@cli.parse(%w[sidekiq -C ./test/cfg/config_empty.yml
-r ./test/fake_env.rb
-q foo,2 -q bar,3])
assert_equal false, !!strict
end
end
end
end end
describe "default config file" do describe "default config file" do

View file

@ -16,7 +16,6 @@ describe Sidekiq::BasicFetch do
it "retrieves" do it "retrieves" do
@cap.queues = ["basic", "bar,3"] @cap.queues = ["basic", "bar,3"]
refute @cap.strict
fetch = Sidekiq::BasicFetch.new(@cap) fetch = Sidekiq::BasicFetch.new(@cap)
uow = fetch.retrieve_work uow = fetch.retrieve_work
@ -30,9 +29,8 @@ describe Sidekiq::BasicFetch do
assert_nil uow.acknowledge assert_nil uow.acknowledge
end end
it "retrieves with strict setting" do it "retrieves with strict ordering" do
@cap.queues = ["basic", "bar"] @cap.queues = ["basic", "bar"]
assert @cap.strict
fetch = Sidekiq::BasicFetch.new(@cap) fetch = Sidekiq::BasicFetch.new(@cap)
cmd = fetch.queues_cmd cmd = fetch.queues_cmd
assert_equal cmd, ["queue:basic", "queue:bar", Sidekiq::BasicFetch::TIMEOUT] assert_equal cmd, ["queue:basic", "queue:bar", Sidekiq::BasicFetch::TIMEOUT]

View file

@ -35,11 +35,11 @@ class MyCustomMiddleware
end end
describe Sidekiq::Job do describe Sidekiq::Job do
describe "#set" do
before do before do
@cfg = reset! @config = reset!
end end
describe "#set" do
it "provides basic ActiveJob compatibilility" do it "provides basic ActiveJob compatibilility" do
q = Sidekiq::ScheduledSet.new q = Sidekiq::ScheduledSet.new
assert_equal 0, q.size assert_equal 0, q.size
@ -140,16 +140,10 @@ describe Sidekiq::Job do
$my_recorder = [] $my_recorder = []
it "executes middleware & runs job inline" do it "executes middleware & runs job inline" do
server_chain = Sidekiq::Middleware::Chain.new @config.server_middleware.add MyCustomMiddleware, "1-server", $my_recorder
server_chain.add MyCustomMiddleware, "1-server", $my_recorder @config.client_middleware.add MyCustomMiddleware, "1-client", $my_recorder
client_chain = Sidekiq::Middleware::Chain.new
client_chain.add MyCustomMiddleware, "1-client", $my_recorder
Sidekiq.default_configuration.stub(:server_middleware, server_chain) do
Sidekiq.default_configuration.stub(:client_middleware, client_chain) do
MyCustomJob.perform_inline($my_recorder) MyCustomJob.perform_inline($my_recorder)
assert_equal $my_recorder.flatten, %w[1-client-before 1-client-after 1-server-before work_performed 1-server-after] assert_equal $my_recorder.flatten, %w[1-client-before 1-client-after 1-server-before work_performed 1-server-after]
end end
end end
end
end
end end