diff --git a/Changes.md b/Changes.md index 98085aaa..11bbc92f 100644 --- a/Changes.md +++ b/Changes.md @@ -2,6 +2,11 @@ [Sidekiq Changes](https://github.com/mperham/sidekiq/blob/main/Changes.md) | [Sidekiq Pro Changes](https://github.com/mperham/sidekiq/blob/main/Pro-Changes.md) | [Sidekiq Enterprise Changes](https://github.com/mperham/sidekiq/blob/main/Ent-Changes.md) +HEAD +---------- + +- Further optimizations for scheduled polling [#5513] + 6.5.6 ---------- diff --git a/lib/sidekiq/api.rb b/lib/sidekiq/api.rb index 4c65f4e2..2179ee37 100644 --- a/lib/sidekiq/api.rb +++ b/lib/sidekiq/api.rb @@ -9,6 +9,17 @@ require "base64" require "sidekiq/metrics/deploy" require "sidekiq/metrics/query" +# +# Sidekiq's Data API provides a Ruby object model on top +# of Sidekiq's runtime data in Redis. This API should never +# be used within application code for business logic. +# +# The Sidekiq server process never uses this API: all data +# manipulation is done directly for performance reasons to +# ensure we are using Redis as efficiently as possible at +# every callsite. +# + module Sidekiq # Retrieve runtime statistics from Redis regarding # this Sidekiq cluster. @@ -829,10 +840,12 @@ module Sidekiq # :nodoc: # @api private def cleanup + # don't run cleanup more than once per minute return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) } + count = 0 Sidekiq.redis do |conn| - procs = conn.sscan("processes").to_a.sort + procs = conn.sscan("processes").to_a heartbeats = conn.pipelined { |pipeline| procs.each do |key| pipeline.hget(key, "info") diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 44be3dd4..1fe9af33 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -146,13 +146,16 @@ module Sidekiq # As we run more processes, the scheduling interval average will approach an even spread # between 0 and poll interval so we don't need this artifical boost. 
# - if process_count < 10 + count = process_count + interval = poll_interval_average(count) + + if count < 10 # For small clusters, calculate a random interval that is ±50% the desired average. - poll_interval_average * rand + poll_interval_average.to_f / 2 + interval * rand + interval.to_f / 2 else # With 10+ processes, we should have enough randomness to get decent polling # across the entire timespan - poll_interval_average * rand + interval * rand end end @@ -169,14 +172,14 @@ module Sidekiq # the same time: the thundering herd problem. # # We only do this if poll_interval_average is unset (the default). - def poll_interval_average - @config[:poll_interval_average] ||= scaled_poll_interval + def poll_interval_average(count) + @config[:poll_interval_average] || scaled_poll_interval(count) end # Calculates an average poll interval based on the number of known Sidekiq processes. # This minimizes a single point of failure by dispersing check-ins but without taxing # Redis if you run many Sidekiq processes. - def scaled_poll_interval + def scaled_poll_interval(process_count) process_count * @config[:average_scheduled_poll_interval] end @@ -186,9 +189,35 @@ module Sidekiq pcount end + # A copy of Sidekiq::ProcessSet#cleanup because server + # should never depend on sidekiq/api. + def cleanup + # don't run cleanup more than once per minute + return 0 unless redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) } + + count = 0 + redis do |conn| + procs = conn.sscan("processes").to_a + heartbeats = conn.pipelined { |pipeline| + procs.each do |key| + pipeline.hget(key, "info") + end + } + + # the hash named key has an expiry of 60 seconds. + # if it's not found, that means the process has not reported + # in to Redis and probably died. + to_prune = procs.select.with_index { |proc, i| + heartbeats[i].nil? + } + count = conn.srem("processes", to_prune) unless to_prune.empty? 
+ end + count + end + def initial_wait - # Have all processes sleep between 5-15 seconds. 10 seconds - # to give time for the heartbeat to register (if the poll interval is going to be calculated by the number + # Have all processes sleep between 5-15 seconds. 10 seconds to give time for + # the heartbeat to register (if the poll interval is going to be calculated by the number # of workers), and 5 random seconds to ensure they don't all hit Redis at the same time. total = 0 total += INITIAL_WAIT unless @config[:poll_interval_average] @@ -196,6 +225,11 @@ module Sidekiq @sleeper.pop(total) rescue Timeout::Error + ensure + # periodically clean out the `processes` set in Redis which can collect + # references to dead processes over time. The process count affects how + # often we scan for scheduled jobs. + cleanup end end end diff --git a/lib/sidekiq/web/helpers.rb b/lib/sidekiq/web/helpers.rb index fa8678b0..452e915e 100644 --- a/lib/sidekiq/web/helpers.rb +++ b/lib/sidekiq/web/helpers.rb @@ -155,7 +155,7 @@ module Sidekiq @sorted_processes ||= begin return processes unless processes.all? 
{ |p| p["hostname"] } - split_characters = /[._-]/ + split_characters = /[._-]+/ padding = processes.flat_map { |p| p["hostname"].split(split_characters) }.map(&:size).max @@ -176,7 +176,7 @@ module Sidekiq end def redis_url - Sidekiq.default_configuration.redis do |conn| + Sidekiq.redis do |conn| conn._config.server_url end end diff --git a/test/scheduled.rb b/test/scheduled.rb index 6418950f..de339a15 100644 --- a/test/scheduled.rb +++ b/test/scheduled.rb @@ -125,16 +125,35 @@ describe Sidekiq::Scheduled do end end - it "calculates an average poll interval based on the number of known Sidekiq processes" do + it "generates random intervals based on the number of known Sidekiq processes" do with_sidekiq_option(:average_scheduled_poll_interval, 10) do - 3.times do |i| + intervals_count = 500 + + # Start with 10 processes + 10.times do |i| @config.redis do |conn| conn.sadd("processes", ["process-#{i}"]) - conn.hset("process-#{i}", "info", "") end end - assert_equal 30, @poller.send(:scaled_poll_interval) + intervals = Array.new(intervals_count) { @poller.send(:random_poll_interval) } + assert intervals.all? { |x| x.between?(0, 100) } + + # Reduce to 3 processes + (3..9).each do |i| + @config.redis do |conn| + conn.srem("processes", ["process-#{i}"]) + end + end + + intervals = Array.new(intervals_count) { @poller.send(:random_poll_interval) } + assert intervals.all? 
{ |x| x.between?(15, 45) } + end + end + + it "calculates an average poll interval based on a given number of processes" do + with_sidekiq_option(:average_scheduled_poll_interval, 10) do + assert_equal 30, @poller.send(:scaled_poll_interval, 3) end end end diff --git a/test/web_helpers.rb b/test/web_helpers.rb index d271212a..0c498da4 100644 --- a/test/web_helpers.rb +++ b/test/web_helpers.rb @@ -148,7 +148,7 @@ describe "Web helpers" do end it "sorts processes using the natural sort order" do - ["a.10.2", "a.2", "busybee-10_1", "a.23", "a.10.1", "a.1", "192.168.0.10", "192.168.0.2", "2.1.1.1", "busybee-2_34"].each do |hostname| + ["a.10.2", "a.2", "busybee--10_1", "a.23", "a.10.1", "a.1", "192.168.0.10", "192.168.0.2", "2.1.1.1", "busybee-2__34"].each do |hostname| pdata = {"hostname" => hostname, "pid" => "123", "started_at" => Time.now.to_i} key = "#{hostname}:123" @@ -161,6 +161,6 @@ describe "Web helpers" do obj = Helpers.new assert obj.sorted_processes.all? { |process| assert_instance_of Sidekiq::Process, process } - assert_equal ["2.1.1.1", "192.168.0.2", "192.168.0.10", "a.1", "a.2", "a.10.1", "a.10.2", "a.23", "busybee-2_34", "busybee-10_1"], obj.sorted_processes.map { |process| process["hostname"] } + assert_equal ["2.1.1.1", "192.168.0.2", "192.168.0.10", "a.1", "a.2", "a.10.1", "a.10.2", "a.23", "busybee-2__34", "busybee--10_1"], obj.sorted_processes.map { |process| process["hostname"] } end end