From 1b83a152786ed382f07fff12d2608534f1e3c922 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 30 Aug 2022 11:51:23 -0700 Subject: [PATCH] bugfixes for Pro 6.0 --- lib/sidekiq.rb | 18 +++-- lib/sidekiq/capsule.rb | 25 ++++--- lib/sidekiq/client.rb | 9 +-- lib/sidekiq/component.rb | 1 + lib/sidekiq/config.rb | 20 ++++-- lib/sidekiq/fetch.rb | 3 +- lib/sidekiq/middleware/chain.rb | 15 ++-- lib/sidekiq/testing.rb | 2 +- myapp/Gemfile | 2 +- test/capsule.rb | 4 -- test/cli.rb | 119 -------------------------------- test/fetch.rb | 4 +- test/job.rb | 22 +++--- 13 files changed, 69 insertions(+), 175 deletions(-) diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index 2ce214b2..3230f17b 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -70,19 +70,29 @@ module Sidekiq end def self.configure_server(&block) - (@servers ||= []) << block + (@config_blocks ||= []) << block yield default_configuration if server? end # Creates a Sidekiq::Config instance that is more tuned for embedding # within an arbitrary Ruby process. Noteably it reduces concurrency by - # default so there is less contention for CPU time. + # default so there is less contention for CPU time with other threads. + # + # inst = Sidekiq.configure_embed do |config| + # config.queues = %w[critical default low] + # end + # inst.run + # sleep 10 + # inst.terminate + # + # NB: it is really easy to overload a Ruby process with threads due to the GIL. + # I do not recommend setting concurrency higher than 2-3. def self.configure_embed(&block) require "sidekiq/capsule" require "sidekiq/launcher" cfg = Sidekiq::Config.new - cfg.concurrency = 2 - @servers.each { |block| block.call(cfg) } + cfg.concurrency = 1 + @config_blocks.each { |block| block.call(cfg) } yield cfg Sidekiq::Launcher.new(cfg) diff --git a/lib/sidekiq/capsule.rb b/lib/sidekiq/capsule.rb index 02d9dc54..c4aafe19 100644 --- a/lib/sidekiq/capsule.rb +++ b/lib/sidekiq/capsule.rb @@ -1,5 +1,4 @@ require "sidekiq/component" -require "sidekiq/fetch" module Sidekiq # A Sidekiq::Capsule is the set of resources necessary to @@ -21,21 +20,21 @@ module Sidekiq attr_reader :name attr_reader :queues - attr_reader :strict attr_accessor :concurrency - attr_accessor :fetch_class def initialize(name, config) @name = name @config = config @queues = ["default"] @concurrency = 10 - @strict = true - @fetch_class = Sidekiq::BasicFetch end def fetcher - @fetcher ||= fetch_class.new(self) + @fetcher ||= begin + inst = (config[:fetch_class] || Sidekiq::BasicFetch).new(self) + inst.setup(config[:fetch_setup]) if inst.respond_to?(:setup) + inst + end end def stop @@ -43,12 +42,10 @@ module Sidekiq end def queues=(val) - @strict = true @queues = Array(val).each_with_object([]) do |qstr, memo| arr = qstr arr = qstr.split(",") if qstr.is_a?(String) name, weight = arr - @strict = false if weight.to_i > 0 [weight.to_i, 1].max.times do memo << name end @@ -59,18 +56,22 @@ module Sidekiq # Avoid if possible and add middleware globally so all # capsules share the same chains. Easier to debug that way. def client_middleware - @client_chain ||= config.client_middleware.dup + @client_chain ||= config.client_middleware.copy_for(self) yield @client_chain if block_given? @client_chain end def server_middleware - @server_chain ||= config.server_middleware.dup + @server_chain ||= config.server_middleware.copy_for(self) yield @server_chain if block_given? @server_chain end def redis_pool + Thread.current[:sidekiq_redis_pool] || local_redis_pool + end + + def local_redis_pool # connection pool is lazy, it will not create connections unless you actually need them # so don't be skimpy! @redis ||= config.new_redis_pool(@concurrency) @@ -98,6 +99,10 @@ module Sidekiq end end + def lookup(name) + config.lookup(name) + end + def logger config.logger end diff --git a/lib/sidekiq/client.rb b/lib/sidekiq/client.rb index 88a5cc32..1316b77f 100644 --- a/lib/sidekiq/client.rb +++ b/lib/sidekiq/client.rb @@ -48,10 +48,11 @@ module Sidekiq # old calling method, accept 1 pool argument @redis_pool = args[0] @chain = Sidekiq.default_configuration.client_middleware + @config = Sidekiq.default_configuration else # new calling method: keyword arguments @config = kwargs[:config] || Sidekiq.default_configuration - @redis_pool = kwargs[:pool] || Thread.current[:sidekiq_via_pool] || @config&.redis_pool + @redis_pool = kwargs[:pool] || Thread.current[:sidekiq_redis_pool] || @config&.redis_pool @chain = kwargs[:chain] || @config&.client_middleware raise ArgumentError, "No Redis pool available for Sidekiq::Client" unless @redis_pool end @@ -147,11 +148,11 @@ module Sidekiq # you cannot scale any other way (e.g. splitting your app into smaller apps). def self.via(pool) raise ArgumentError, "No pool given" if pool.nil? - current_sidekiq_pool = Thread.current[:sidekiq_via_pool] - Thread.current[:sidekiq_via_pool] = pool + current_sidekiq_pool = Thread.current[:sidekiq_redis_pool] + Thread.current[:sidekiq_redis_pool] = pool yield ensure - Thread.current[:sidekiq_via_pool] = current_sidekiq_pool + Thread.current[:sidekiq_redis_pool] = current_sidekiq_pool end class << self diff --git a/lib/sidekiq/component.rb b/lib/sidekiq/component.rb index f0001811..d399c411 100644 --- a/lib/sidekiq/component.rb +++ b/lib/sidekiq/component.rb @@ -50,6 +50,7 @@ module Sidekiq oneshot = options.fetch(:oneshot, true) reverse = options[:reverse] reraise = options[:reraise] + logger.debug("Firing #{event} event") if oneshot arr = config[:lifecycle_events][event] arr.reverse! if reverse diff --git a/lib/sidekiq/config.rb b/lib/sidekiq/config.rb index 012f27ab..fd483d35 100644 --- a/lib/sidekiq/config.rb +++ b/lib/sidekiq/config.rb @@ -76,13 +76,13 @@ module Sidekiq end def client_middleware - @client_chain ||= Sidekiq::Middleware::Chain.new + @client_chain ||= Sidekiq::Middleware::Chain.new(self) yield @client_chain if block_given? @client_chain end def server_middleware - @server_chain ||= Sidekiq::Middleware::Chain.new + @server_chain ||= Sidekiq::Middleware::Chain.new(self) yield @server_chain if block_given? @server_chain end @@ -106,7 +106,11 @@ module Sidekiq end def redis_pool - # this is our global client/housekeeping pool. each capsule has its + Thread.current[:sidekiq_redis_pool] || Thread.current[:sidekiq_capsule]&.redis_pool || local_redis_pool + end + + private def local_redis_pool + # this is our default client/housekeeping pool. each capsule has its # own pool for executing threads. size = Integer(ENV["RAILS_MAX_THREADS"] || 5) @redis ||= new_redis_pool(size) @@ -162,9 +166,12 @@ module Sidekiq end # find a singleton - def lookup(name) + def lookup(name, default_class = nil) # JNDI is just a fancy name for a hash lookup - @directory[name] + @directory.fetch(name) do |key| + return nil unless default_class + @directory[key] = default_class.new(self) + end end ## @@ -237,6 +244,9 @@ module Sidekiq # INTERNAL USE ONLY def handle_exception(ex, ctx = {}) + if @options[:error_handlers].size == 0 + p ["!!!!!", ex] + end @options[:error_handlers].each do |handler| handler.call(ex, ctx, self) rescue => e diff --git a/lib/sidekiq/fetch.rb b/lib/sidekiq/fetch.rb index 0b416b64..9de9cd47 100644 --- a/lib/sidekiq/fetch.rb +++ b/lib/sidekiq/fetch.rb @@ -2,6 +2,7 @@ require "sidekiq" require "sidekiq/component" +require "sidekiq/capsule" module Sidekiq # :nodoc: class BasicFetch @@ -29,7 +30,7 @@ module Sidekiq # :nodoc: def initialize(cap) raise ArgumentError, "missing queue list" unless cap.queues @config = cap - @strictly_ordered_queues = !!@config.strict + @strictly_ordered_queues = (config.queues.size == config.queues.uniq.size) @queues = config.queues.map { |q| "queue:#{q}" } if @strictly_ordered_queues @queues.uniq! diff --git a/lib/sidekiq/middleware/chain.rb b/lib/sidekiq/middleware/chain.rb index 7e867e7b..8e8a0e8f 100644 --- a/lib/sidekiq/middleware/chain.rb +++ b/lib/sidekiq/middleware/chain.rb @@ -80,15 +80,6 @@ module Sidekiq class Chain include Enumerable - # A unique instance of the middleware chain is created for - # each job executed in order to be thread-safe. - # @param copy [Sidekiq::Middleware::Chain] New instance of Chain - # @returns nil - def initialize_copy(copy) - copy.instance_variable_set(:@entries, entries.dup) - nil - end - # Iterate through each middleware in the chain def each(&block) entries.each(&block) @@ -105,6 +96,12 @@ module Sidekiq @entries ||= [] end + def copy_for(capsule) + chain = Sidekiq::Middleware::Chain.new(capsule) + chain.instance_variable_set(:@entries, entries.dup) + chain + end + # Remove all middleware matching the given Class # @param klass [Class] def remove(klass) diff --git a/lib/sidekiq/testing.rb b/lib/sidekiq/testing.rb index 875cd186..5b2ca09a 100644 --- a/lib/sidekiq/testing.rb +++ b/lib/sidekiq/testing.rb @@ -51,7 +51,7 @@ module Sidekiq end def server_middleware - @server_chain ||= Middleware::Chain.new + @server_chain ||= Middleware::Chain.new(Sidekiq.default_configuration) yield @server_chain if block_given? @server_chain end diff --git a/myapp/Gemfile b/myapp/Gemfile index 95678ecf..d9e369c1 100644 --- a/myapp/Gemfile +++ b/myapp/Gemfile @@ -11,4 +11,4 @@ platforms :ruby do gem "sqlite3" end -gem "after_commit_everywhere" \ No newline at end of file +gem "after_commit_everywhere" diff --git a/test/capsule.rb b/test/capsule.rb index b3cb10f2..bce52321 100644 --- a/test/capsule.rb +++ b/test/capsule.rb @@ -31,20 +31,16 @@ describe Sidekiq::Capsule do it "parses queues correctly" do cap = @cap assert_equal ["default"], cap.queues - assert cap.strict cap.queues = %w[foo bar,2] assert_equal %w[foo bar bar], cap.queues - refute cap.strict cap.queues = ["default"] assert_equal %w[default], cap.queues - assert cap.strict # config/sidekiq.yml input will look like this cap.queues = [["foo"], ["baz", 3]] assert_equal %w[foo baz baz baz], cap.queues - refute cap.strict end it "can have customized middleware chains" do diff --git a/test/cli.rb b/test/cli.rb index fdcb4446..b9eb6245 100644 --- a/test/cli.rb +++ b/test/cli.rb @@ -22,10 +22,6 @@ describe Sidekiq::CLI do @cli.config.capsules.first.concurrency end - def strict - @cli.config.capsules.first.strict - end - describe "#parse" do describe "options" do it "accepts -r" do @@ -63,16 +59,6 @@ describe Sidekiq::CLI do end end - describe "setting internal options via the config file" do - describe "setting the `strict` option via the config file" do - it "discards the `strict` option specified via the config file" do - @cli.parse(%w[sidekiq -C ./test/cfg/config_with_internal_options.yml]) - - assert_equal true, !!strict - end - end - end - describe "queues" do it "accepts with -q" do @cli.parse(%w[sidekiq -q foo -r ./test/fake_env.rb]) @@ -86,12 +72,6 @@ describe Sidekiq::CLI do assert_equal ["foo", "bar"], queues end - - it "sets strictly ordered queues" do - @cli.parse(%w[sidekiq -q foo -q bar -r ./test/fake_env.rb]) - - assert_equal true, !!strict - end end describe "when weights are present" do @@ -100,12 +80,6 @@ describe Sidekiq::CLI do assert_equal ["foo", "foo", "foo", "bar"], queues end - - it "does not set strictly ordered queues" do - @cli.parse(%w[sidekiq -q foo,3 -q bar -r ./test/fake_env.rb]) - - assert_equal false, !!strict - end end it "accepts queues with multi-word names" do @@ -274,99 +248,6 @@ describe Sidekiq::CLI do assert_equal 7, queues.count { |q| q == "often" } assert_equal 3, queues.count { |q| q == "seldom" } end - - describe "when the config file specifies queues with weights" do - describe "when -q specifies queues without weights" do - it "sets strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/config.yml - -r ./test/fake_env.rb - -q foo -q bar]) - - assert_equal true, !!strict - end - end - - describe "when -q specifies no queues" do - it "does not set strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/config.yml - -r ./test/fake_env.rb]) - - assert_equal false, !!strict - end - end - - describe "when -q specifies queues with weights" do - it "does not set strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/config.yml - -r ./test/fake_env.rb - -q foo,2 -q bar,3]) - - assert_equal false, !!strict - end - end - end - - describe "when the config file specifies queues without weights" do - describe "when -q specifies queues without weights" do - it "sets strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/cfg/config_queues_without_weights.yml - -r ./test/fake_env.rb - -q foo -q bar]) - - assert_equal true, !!strict - end - end - - describe "when -q specifies no queues" do - it "sets strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/cfg/config_queues_without_weights.yml - -r ./test/fake_env.rb]) - - assert_equal true, !!strict - end - end - - describe "when -q specifies queues with weights" do - it "does not set strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/cfg/config_queues_without_weights.yml - -r ./test/fake_env.rb - -q foo,2 -q bar,3]) - - assert_equal false, !!strict - end - end - end - - describe "when the config file specifies no queues" do - describe "when -q specifies queues without weights" do - it "sets strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/cfg/config_empty.yml - -r ./test/fake_env.rb - -q foo -q bar]) - - assert_equal true, !!strict - end - end - - describe "when -q specifies no queues" do - it "sets strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/cfg/config_empty.yml - -r ./test/fake_env.rb]) - - assert_equal true, !!strict - end - end - - describe "when -q specifies queues with weights" do - it "does not set strictly ordered queues" do - @cli.parse(%w[sidekiq -C ./test/cfg/config_empty.yml - -r ./test/fake_env.rb - -q foo,2 -q bar,3]) - - assert_equal false, !!strict - end - end - end end describe "default config file" do diff --git a/test/fetch.rb b/test/fetch.rb index 0d073684..c7194160 100644 --- a/test/fetch.rb +++ b/test/fetch.rb @@ -16,7 +16,6 @@ describe Sidekiq::BasicFetch do it "retrieves" do @cap.queues = ["basic", "bar,3"] - refute @cap.strict fetch = Sidekiq::BasicFetch.new(@cap) uow = fetch.retrieve_work @@ -30,9 +29,8 @@ describe Sidekiq::BasicFetch do assert_nil uow.acknowledge end - it "retrieves with strict setting" do + it "retrieves with strict ordering" do @cap.queues = ["basic", "bar"] - assert @cap.strict fetch = Sidekiq::BasicFetch.new(@cap) cmd = fetch.queues_cmd assert_equal cmd, ["queue:basic", "queue:bar", Sidekiq::BasicFetch::TIMEOUT] diff --git a/test/job.rb b/test/job.rb index 8c5336fe..6bfa8159 100644 --- a/test/job.rb +++ b/test/job.rb @@ -35,11 +35,11 @@ class MyCustomMiddleware end describe Sidekiq::Job do - describe "#set" do - before do - @cfg = reset! - end + before do + @config = reset! + end + describe "#set" do it "provides basic ActiveJob compatibilility" do q = Sidekiq::ScheduledSet.new assert_equal 0, q.size @@ -140,16 +140,10 @@ describe Sidekiq::Job do $my_recorder = [] it "executes middleware & runs job inline" do - server_chain = Sidekiq::Middleware::Chain.new - server_chain.add MyCustomMiddleware, "1-server", $my_recorder - client_chain = Sidekiq::Middleware::Chain.new - client_chain.add MyCustomMiddleware, "1-client", $my_recorder - Sidekiq.default_configuration.stub(:server_middleware, server_chain) do - Sidekiq.default_configuration.stub(:client_middleware, client_chain) do - MyCustomJob.perform_inline($my_recorder) - assert_equal $my_recorder.flatten, %w[1-client-before 1-client-after 1-server-before work_performed 1-server-after] - end - end + @config.server_middleware.add MyCustomMiddleware, "1-server", $my_recorder + @config.client_middleware.add MyCustomMiddleware, "1-client", $my_recorder + MyCustomJob.perform_inline($my_recorder) + assert_equal $my_recorder.flatten, %w[1-client-before 1-client-after 1-server-before work_performed 1-server-after] end end end