From 0b59ed8c2bc74823ae579b99e37295bbdb2b416b Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Thu, 1 Oct 2015 16:37:58 -0700 Subject: [PATCH] Add load tester script, fixes #2582 --- bin/sidekiqload | 137 ++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100755 bin/sidekiqload diff --git a/bin/sidekiqload b/bin/sidekiqload new file mode 100755 index 00000000..7eb69376 --- /dev/null +++ b/bin/sidekiqload @@ -0,0 +1,137 @@ +#!/usr/bin/env ruby + +# Quiet some warnings we see when running in warning mode: +# RUBYOPT=-w bundle exec sidekiq +$TESTING = false +$CELLULOID_DEBUG = false + +require 'celluloid/current' +puts Celluloid::VERSION +require_relative '../lib/sidekiq/cli' +require_relative '../lib/sidekiq/launcher' +Celluloid.logger = nil + +include Sidekiq::Util + +Sidekiq.configure_server do |config| + config.redis = { db: 13 } + config.options[:queues] << 'default' + config.logger.level = Logger::ERROR + config.average_scheduled_poll_interval = 2 +end +Sidekiq.redis {|c| c.flushdb} + +class LoadWorker + include Sidekiq::Worker + sidekiq_options retry: 1 + sidekiq_retry_in do |x| + 1 + end + + def perform(idx) + raise idx.to_s if idx % 100 == 1 + end +end + +self_read, self_write = IO.pipe +%w(INT TERM USR1 USR2 TTIN).each do |sig| + begin + trap sig do + self_write.puts(sig) + end + rescue ArgumentError + puts "Signal #{sig} not supported" + end +end + +def handle_signal(launcher, sig) + Sidekiq.logger.debug "Got #{sig} signal" + case sig + when 'INT' + # Handle Ctrl-C in JRuby like MRI + # http://jira.codehaus.org/browse/JRUBY-4637 + raise Interrupt + when 'TERM' + # Heroku sends TERM and then waits 10 seconds for process to exit. + raise Interrupt + when 'USR1' + Sidekiq.logger.info "Received USR1, no longer accepting new work" + launcher.manager.async.stop + #fire_event(:quiet, true) + when 'USR2' + if Sidekiq.options[:logfile] + Sidekiq.logger.info "Received USR2, reopening log file" + Sidekiq::Logging.reopen_logs + end + when 'TTIN' + Thread.list.each do |thread| + Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}" + if thread.backtrace + Sidekiq.logger.warn thread.backtrace.join("\n") + else + Sidekiq.logger.warn "" + end + end + end +end + +def Process.rss + `ps -o rss= -p #{Process.pid}`.chomp.to_i +end + +iter = 10 +count = 10_000 + +Client = Thread.new do + watchdog("client thread") do + iter.times do + arr = Array.new(count) do + [] + end + count.times do |idx| + arr[idx][0] = idx + end + Sidekiq::Client.push_bulk('class' => LoadWorker, 'args' => arr) + end + Sidekiq.logger.error "Created #{count*iter} jobs" + end +end + +Monitoring = Thread.new do + watchdog("monitor thread") do + while true + sleep 2 + qsize, retries = Sidekiq.redis do |conn| + conn.pipelined do + conn.llen "queue:default" + conn.zcard "retry" + end + end.map(&:to_i) + total = qsize + retries + GC.start + Sidekiq.logger.error("RSS: #{Process.rss} Pending: #{total}") + if total == 0 + Sidekiq.logger.error("Done") + Celluloid.shutdown + exit(0) + end + end + end +end + +begin + launcher = Sidekiq::Launcher.new(Sidekiq.options) + launcher.run + + while readable_io = IO.select([self_read]) + signal = readable_io.first[0].gets.strip + handle_signal(launcher, signal) + end +rescue SystemExit => e + # normal +rescue => e + raise e if $DEBUG + STDERR.puts e.message + STDERR.puts e.backtrace.join("\n") + exit 1 +end