
use global redis with web UI

commit 39e87f7b10
Mike Perham, 2022-09-08 08:47:11 -07:00

6 changed files with 88 additions and 17 deletions

Changes.md

@@ -2,6 +2,11 @@
 [Sidekiq Changes](https://github.com/mperham/sidekiq/blob/main/Changes.md) | [Sidekiq Pro Changes](https://github.com/mperham/sidekiq/blob/main/Pro-Changes.md) | [Sidekiq Enterprise Changes](https://github.com/mperham/sidekiq/blob/main/Ent-Changes.md)

+HEAD
+----------
+
+- Further optimizations for scheduled polling [#5513]
+
 6.5.6
 ----------

lib/sidekiq/api.rb

@@ -9,6 +9,17 @@ require "base64"
 require "sidekiq/metrics/deploy"
 require "sidekiq/metrics/query"

+#
+# Sidekiq's Data API provides a Ruby object model on top
+# of Sidekiq's runtime data in Redis. This API should never
+# be used within application code for business logic.
+#
+# The Sidekiq server process never uses this API: all data
+# manipulation is done directly for performance reasons to
+# ensure we are using Redis as efficiently as possible at
+# every callsite.
+#
 module Sidekiq
   # Retrieve runtime statistics from Redis regarding
   # this Sidekiq cluster.
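
For context, the Data API described in the new comment block is meant for console scripts and monitoring tools, not job code. A minimal read-only sketch (Sidekiq::Stats and Sidekiq::ProcessSet both ship in sidekiq/api):

    require "sidekiq/api"

    # Read-only monitoring from a console or script -- never business logic.
    stats = Sidekiq::Stats.new
    puts "processed=#{stats.processed} failed=#{stats.failed} enqueued=#{stats.enqueued}"

    # Enumerate the live processes recorded in the Redis `processes` set.
    Sidekiq::ProcessSet.new.each do |process|
      puts "#{process["hostname"]}:#{process["pid"]} busy=#{process["busy"]}"
    end
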
@@ -829,10 +840,12 @@ module Sidekiq
     # :nodoc:
     # @api private
     def cleanup
+      # don't run cleanup more than once per minute
       return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) }
       count = 0
       Sidekiq.redis do |conn|
-        procs = conn.sscan("processes").to_a.sort
+        procs = conn.sscan("processes").to_a
         heartbeats = conn.pipelined { |pipeline|
           procs.each do |key|
             pipeline.hget(key, "info")
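
The once-per-minute guard added above leans on Redis SET with the NX and EX options: only one client can create the key, and Redis expires it on its own. A standalone sketch of the same pattern, assuming the redis-client gem and a local Redis (run_once_per_minute is an illustrative name, not Sidekiq API):

    require "redis-client"

    redis = RedisClient.config(url: "redis://localhost:6379").new_client

    def run_once_per_minute(redis, lock_key = "process_cleanup")
      # SET key value NX EX 60: succeeds only if the key does not already
      # exist, and Redis deletes it automatically after 60 seconds.
      acquired = redis.call("SET", lock_key, "1", "NX", "EX", "60")
      return 0 unless acquired # someone else cleaned up within the last minute
      yield
    end

    run_once_per_minute(redis) { puts "pruning dead process records" }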

lib/sidekiq/scheduled.rb

@ -146,13 +146,16 @@ module Sidekiq
# As we run more processes, the scheduling interval average will approach an even spread # As we run more processes, the scheduling interval average will approach an even spread
# between 0 and poll interval so we don't need this artifical boost. # between 0 and poll interval so we don't need this artifical boost.
# #
if process_count < 10 count = process_count
interval = poll_interval_average(count)
if count < 10
# For small clusters, calculate a random interval that is ±50% the desired average. # For small clusters, calculate a random interval that is ±50% the desired average.
poll_interval_average * rand + poll_interval_average.to_f / 2 interval * rand + interval.to_f / 2
else else
# With 10+ processes, we should have enough randomness to get decent polling # With 10+ processes, we should have enough randomness to get decent polling
# across the entire timespan # across the entire timespan
poll_interval_average * rand interval * rand
end end
end end
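
To make the two branches above concrete, here is the same arithmetic with an illustrative 30-second average interval (a sketch, not Sidekiq code):

    interval = 30.0

    # Fewer than 10 processes: uniform over [interval/2, 1.5 * interval),
    # i.e. 15-45s here, so no single process waits unreasonably long.
    small_cluster = Array.new(5) { interval * rand + interval / 2 }

    # 10+ processes: uniform over [0, interval); enough samples exist
    # across the cluster to spread the polling evenly on their own.
    large_cluster = Array.new(5) { interval * rand }

    puts small_cluster.inspect, large_cluster.inspect
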
@@ -169,14 +172,14 @@ module Sidekiq
     # the same time: the thundering herd problem.
     #
     # We only do this if poll_interval_average is unset (the default).
-    def poll_interval_average
-      @config[:poll_interval_average] ||= scaled_poll_interval
+    def poll_interval_average(count)
+      @config[:poll_interval_average] || scaled_poll_interval(count)
     end

     # Calculates an average poll interval based on the number of known Sidekiq processes.
     # This minimizes a single point of failure by dispersing check-ins but without taxing
     # Redis if you run many Sidekiq processes.
-    def scaled_poll_interval
+    def scaled_poll_interval(process_count)
       process_count * @config[:average_scheduled_poll_interval]
     end
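
The scaling itself is plain multiplication, so each additional process stretches its own average interval and the cluster's total check-in rate stays roughly constant. A worked sketch with the 10-second base used in the tests below:

    average_scheduled_poll_interval = 10 # seconds

    [1, 3, 10].each do |process_count|
      scaled = process_count * average_scheduled_poll_interval
      puts "#{process_count} process(es) -> each polls every ~#{scaled}s on average"
    end
    # 1 process(es)  -> every ~10s
    # 3 process(es)  -> every ~30s (the assert_equal 30 case below)
    # 10 process(es) -> every ~100s
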
@@ -186,9 +189,35 @@ module Sidekiq
       pcount
     end

+    # A copy of Sidekiq::ProcessSet#cleanup because server
+    # should never depend on sidekiq/api.
+    def cleanup
+      # don't run cleanup more than once per minute
+      return 0 unless redis { |conn| conn.set("process_cleanup", "1", nx: true, ex: 60) }
+      count = 0
+      redis do |conn|
+        procs = conn.sscan("processes").to_a
+        heartbeats = conn.pipelined { |pipeline|
+          procs.each do |key|
+            pipeline.hget(key, "info")
+          end
+        }
+
+        # the hash named key has an expiry of 60 seconds.
+        # if it's not found, that means the process has not reported
+        # in to Redis and probably died.
+        to_prune = procs.select.with_index { |proc, i|
+          heartbeats[i].nil?
+        }
+        count = conn.srem("processes", to_prune) unless to_prune.empty?
+      end
+      count
+    end
+
     def initial_wait
-      # Have all processes sleep between 5-15 seconds. 10 seconds
-      # to give time for the heartbeat to register (if the poll interval is going to be calculated by the number
+      # Have all processes sleep between 5-15 seconds. 10 seconds to give time for
+      # the heartbeat to register (if the poll interval is going to be calculated by the number
       # of workers), and 5 random seconds to ensure they don't all hit Redis at the same time.
       total = 0
       total += INITIAL_WAIT unless @config[:poll_interval_average]

@@ -196,6 +225,11 @@ module Sidekiq
       @sleeper.pop(total)
     rescue Timeout::Error
+    ensure
+      # periodically clean out the `processes` set in Redis which can collect
+      # references to dead processes over time. The process count affects how
+      # often we scan for scheduled jobs.
+      cleanup
     end
   end
 end
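
The copied cleanup hinges on Sidekiq's heartbeat convention: each member of the `processes` set has a companion hash that the owning process refreshes with a 60-second TTL, so a missing hash marks a dead process. A standalone sketch of the prune step, assuming the redis-client gem (SMEMBERS stands in for the SSCAN the real code uses):

    require "redis-client"

    redis = RedisClient.config(url: "redis://localhost:6379").new_client

    procs = redis.call("SMEMBERS", "processes").sort

    # One round trip: fetch every process's heartbeat field in a pipeline.
    heartbeats = redis.pipelined do |pipeline|
      procs.each { |key| pipeline.call("HGET", key, "info") }
    end

    # A nil reply means the heartbeat hash expired: prune that process.
    to_prune = procs.select.with_index { |_key, i| heartbeats[i].nil? }
    redis.call("SREM", "processes", *to_prune) unless to_prune.empty?
    puts "pruned #{to_prune.size} dead process record(s)"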

lib/sidekiq/web/helpers.rb

@@ -155,7 +155,7 @@ module Sidekiq
       @sorted_processes ||= begin
         return processes unless processes.all? { |p| p["hostname"] }

-        split_characters = /[._-]/
+        split_characters = /[._-]+/

         padding = processes.flat_map { |p| p["hostname"].split(split_characters) }.map(&:size).max

@@ -176,7 +176,7 @@ module Sidekiq
     end

     def redis_url
-      Sidekiq.default_configuration.redis do |conn|
+      Sidekiq.redis do |conn|
         conn._config.server_url
       end
     end
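
Two independent fixes in this file. `Sidekiq.redis` checks a connection out of the global pool instead of building one from `default_configuration`, so the web UI reports the Redis URL it is actually using. And the `+` quantifier makes runs of separators split like a single one, which matters for the natural sort; a quick sketch of the difference:

    # Without the quantifier, doubled separators yield empty tokens,
    # which throws off the padding used for natural hostname sorting.
    "busybee--10_1".split(/[._-]/)  # => ["busybee", "", "10", "1"]
    "busybee--10_1".split(/[._-]+/) # => ["busybee", "10", "1"]

    # Padded tokens then compare correctly as plain strings:
    %w[a.2 a.10 a.1].sort_by { |h| h.split(/[._-]+/).map { |t| t.rjust(5, "0") } }
    # => ["a.1", "a.2", "a.10"]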

test/scheduled_test.rb

@@ -125,16 +125,35 @@ describe Sidekiq::Scheduled do
     end
   end

-  it "calculates an average poll interval based on the number of known Sidekiq processes" do
+  it "generates random intervals based on the number of known Sidekiq processes" do
     with_sidekiq_option(:average_scheduled_poll_interval, 10) do
-      3.times do |i|
+      intervals_count = 500
+
+      # Start with 10 processes
+      10.times do |i|
         @config.redis do |conn|
           conn.sadd("processes", ["process-#{i}"])
+          conn.hset("process-#{i}", "info", "")
         end
       end

-      assert_equal 30, @poller.send(:scaled_poll_interval)
+      intervals = Array.new(intervals_count) { @poller.send(:random_poll_interval) }
+      assert intervals.all? { |x| x.between?(0, 100) }
+
+      # Reduce to 3 processes
+      (3..9).each do |i|
+        @config.redis do |conn|
+          conn.srem("processes", ["process-#{i}"])
+        end
+      end
+
+      intervals = Array.new(intervals_count) { @poller.send(:random_poll_interval) }
+      assert intervals.all? { |x| x.between?(15, 45) }
+    end
+  end
+
+  it "calculates an average poll interval based on a given number of processes" do
+    with_sidekiq_option(:average_scheduled_poll_interval, 10) do
+      assert_equal 30, @poller.send(:scaled_poll_interval, 3)
     end
   end
 end

test/web_helpers_test.rb

@@ -148,7 +148,7 @@ describe "Web helpers" do
   end

   it "sorts processes using the natural sort order" do
-    ["a.10.2", "a.2", "busybee-10_1", "a.23", "a.10.1", "a.1", "192.168.0.10", "192.168.0.2", "2.1.1.1", "busybee-2_34"].each do |hostname|
+    ["a.10.2", "a.2", "busybee--10_1", "a.23", "a.10.1", "a.1", "192.168.0.10", "192.168.0.2", "2.1.1.1", "busybee-2__34"].each do |hostname|
       pdata = {"hostname" => hostname, "pid" => "123", "started_at" => Time.now.to_i}
       key = "#{hostname}:123"

@@ -161,6 +161,6 @@ describe "Web helpers" do
     obj = Helpers.new
     assert obj.sorted_processes.all? { |process| assert_instance_of Sidekiq::Process, process }
-    assert_equal ["2.1.1.1", "192.168.0.2", "192.168.0.10", "a.1", "a.2", "a.10.1", "a.10.2", "a.23", "busybee-2_34", "busybee-10_1"], obj.sorted_processes.map { |process| process["hostname"] }
+    assert_equal ["2.1.1.1", "192.168.0.2", "192.168.0.10", "a.1", "a.2", "a.10.1", "a.10.2", "a.23", "busybee-2__34", "busybee--10_1"], obj.sorted_processes.map { |process| process["hostname"] }
   end
 end