diff --git a/lib/sidekiq/scheduled.rb b/lib/sidekiq/scheduled.rb index 8f5b2aef..000960e0 100644 --- a/lib/sidekiq/scheduled.rb +++ b/lib/sidekiq/scheduled.rb @@ -97,9 +97,34 @@ module Sidekiq sleep 5 end - # Calculates a random interval that is ±50% the desired average. def random_poll_interval - poll_interval_average * rand + poll_interval_average.to_f / 2 + # We want one Sidekiq process to schedule jobs every N seconds. We have M processes + # and **don't** want to coordinate. + # + # So in N*M second timespan, we want each process to schedule once. The basic loop is: + # + # * sleep # a random amount within that N*M timespan + # * wake up, schedule + # + # There are pathological edge cases: + # + # Imagine a set of 4 processes, scheduling every 5 seconds, so N*M = 20. Each process + # decides to randomly sleep 18 seconds, now we've failed to meet that 5 second average. + # Thankfully each schedule cycle will sleep randomly so the next iteration could see each + # process sleep for 1 second, undercutting our average. + # + # So below 10 processes, we special case and ensure the processes sleep closer to the average. + # As we run more processes, the scheduling interval average should approach the desired + # amount. + # + if process_count < 10 + # For small clusters, # calculates a random interval that is ±50% the desired average. + poll_interval_average * rand + poll_interval_average.to_f / 2 + else + # With 10+ processes, we should have enough randomness to get decent polling + # across the entire timespan + poll_interval_average * rand * 2 + end end # We do our best to tune the poll interval to the size of the active Sidekiq @@ -123,9 +148,13 @@ module Sidekiq # This minimizes a single point of failure by dispersing check-ins but without taxing # Redis if you run many Sidekiq processes. def scaled_poll_interval + process_count * Sidekiq.options[:average_scheduled_poll_interval] + end + + def process_count pcount = Sidekiq::ProcessSet.new.size pcount = 1 if pcount == 0 - pcount * Sidekiq.options[:average_scheduled_poll_interval] + pcount end def initial_wait