#!/usr/bin/env ruby # Quiet some warnings we see when running in warning mode: # RUBYOPT=-w bundle exec sidekiq $TESTING = false require_relative '../lib/sidekiq/cli' require_relative '../lib/sidekiq/launcher' include Sidekiq::Util Sidekiq.configure_server do |config| config.redis = { db: 13 } config.options[:queues] << 'default' config.logger.level = Logger::ERROR config.average_scheduled_poll_interval = 2 end Sidekiq.redis {|c| c.flushdb} class LoadWorker include Sidekiq::Worker sidekiq_options retry: 1 sidekiq_retry_in do |x| 1 end def perform(idx) raise idx.to_s if idx % 100 == 1 end end self_read, self_write = IO.pipe %w(INT TERM USR1 USR2 TTIN).each do |sig| begin trap sig do self_write.puts(sig) end rescue ArgumentError puts "Signal #{sig} not supported" end end def handle_signal(launcher, sig) Sidekiq.logger.debug "Got #{sig} signal" case sig when 'INT' # Handle Ctrl-C in JRuby like MRI # http://jira.codehaus.org/browse/JRUBY-4637 raise Interrupt when 'TERM' # Heroku sends TERM and then waits 10 seconds for process to exit. raise Interrupt when 'USR1' Sidekiq.logger.info "Received USR1, no longer accepting new work" launcher.quiet when 'USR2' if Sidekiq.options[:logfile] Sidekiq.logger.info "Received USR2, reopening log file" Sidekiq::Logging.reopen_logs end when 'TTIN' Thread.list.each do |thread| Sidekiq.logger.warn "Thread TID-#{thread.object_id.to_s(36)} #{thread['label']}" if thread.backtrace Sidekiq.logger.warn thread.backtrace.join("\n") else Sidekiq.logger.warn "" end end end end def Process.rss `ps -o rss= -p #{Process.pid}`.chomp.to_i end iter = 10 count = 10_000 Client = Thread.new do watchdog("client thread") do iter.times do arr = Array.new(count) do [] end count.times do |idx| arr[idx][0] = idx end Sidekiq::Client.push_bulk('class' => LoadWorker, 'args' => arr) end Sidekiq.logger.error "Created #{count*iter} jobs" end end Monitoring = Thread.new do watchdog("monitor thread") do while true sleep 2 qsize, retries = Sidekiq.redis do |conn| conn.pipelined do conn.llen "queue:default" conn.zcard "retry" end end.map(&:to_i) total = qsize + retries GC.start Sidekiq.logger.error("RSS: #{Process.rss} Pending: #{total}") if total == 0 Sidekiq.logger.error("Done") exit(0) end end end end begin launcher = Sidekiq::Launcher.new(Sidekiq.options) launcher.run while readable_io = IO.select([self_read]) signal = readable_io.first[0].gets.strip handle_signal(launcher, signal) end rescue SystemExit => e # normal rescue => e raise e if $DEBUG STDERR.puts e.message STDERR.puts e.backtrace.join("\n") exit 1 end