diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index c7b8c270..93cb163f 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -1,6 +1,7 @@ require 'optparse' require 'sidekiq' require 'sidekiq/server' +require 'connection_pool' module Sidekiq class CLI @@ -12,14 +13,17 @@ module Sidekiq boot_rails end + FOREVER = 2_000_000_000 + def run write_pid if @options[:daemon] + ::Sidekiq::Client.redis = ConnectionPool.new { Redis.connect(:url => @options[:server]) } server = Sidekiq::Server.new(@options[:server], @options) begin log 'Starting processing, hit Ctrl-C to stop' server.start! - sleep 1000 + sleep FOREVER rescue Interrupt log 'Shutting down...' server.stop! @@ -30,10 +34,9 @@ module Sidekiq private def boot_rails - #APP_PATH = File.expand_path('config/application.rb') - ENV['RAILS_ENV'] = 'production' + ENV['RAILS_ENV'] = @options[:environment] || 'production' require File.expand_path("#{@options[:rails]}/config/environment.rb") - Rails.application.config.threadsafe! + Rails.application.eager_load! end def validate! @@ -60,6 +63,7 @@ module Sidekiq :server => 'redis://localhost:6379/0', :pidfile => nil, :rails => '.', + :environment => 'production', } @parser = OptionParser.new do |o| @@ -86,6 +90,10 @@ module Sidekiq @options[:server] = arg end + o.on '-e', '--environment ENV', "Rails application environment" do |arg| + @options[:environment] = arg + end + o.on '-r', '--rails PATH', "Rails application with workers" do |arg| @options[:rails] = arg end diff --git a/lib/sidekiq/server.rb b/lib/sidekiq/server.rb index 67c49696..01e7ac00 100644 --- a/lib/sidekiq/server.rb +++ b/lib/sidekiq/server.rb @@ -19,7 +19,7 @@ module Sidekiq trap_exit :worker_died def initialize(location, options={}) - log "Starting sidekiq #{Sidekiq::VERSION} with Redis at #{location}" + log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{location}" verbose options.inspect @count = options[:worker_count] @queues = options[:queues] @@ -31,7 +31,7 @@ module Sidekiq @busy = [] @ready = [] @count.times do - @ready << Worker.new_link + @ready << Worker.new_link(current_actor) end end @@ -46,7 +46,7 @@ module Sidekiq end def start - dispatch + dispatch(true) end def worker_done(worker) @@ -68,48 +68,51 @@ module Sidekiq end unless stopped? - @ready << Worker.new_link + @ready << Worker.new_link(current_actor) dispatch end end private - def dispatch + def find_work(queue_idx) + current_queue = @queues[queue_idx] + msg = @redis.lpop("queue:#{current_queue}") + if msg + worker = @ready.pop + @busy << worker + worker.process! MultiJson.decode(msg) + end + msg + end + + def dispatch(schedule = false) watchdog("Fatal error in sidekiq, dispatch loop died") do return if stopped? # Our dispatch loop + # Loop through the queues, looking for a message in each. queue_idx = 0 - none_found = true + found = false loop do # return so that we don't dispatch again until worker_done - return if @ready.size == 0 - - current_queue = @queues[queue_idx] - msg = @redis.lpop("queue:#{current_queue}") - if msg - worker = @ready.pop - @busy << worker - worker.process! MultiJson.decode(msg) - none_found = false - end + break verbose('no workers') if @ready.size == 0 + found ||= find_work(queue_idx) queue_idx += 1 - # Loop through the queues, looking for a message in each. # if we find no messages in any of the queues, we can break # out of the loop. Otherwise we loop again. lastq = (queue_idx % @queues.size == 0) - if lastq && none_found - break + if lastq && !found + verbose('nothing to process'); break elsif lastq queue_idx = 0 - none_found = true + found = false end end - after(1) { dispatch } + after(1) { verbose('ping'); dispatch(schedule) } if schedule end end diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 38248698..22a926a3 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -4,10 +4,15 @@ module Sidekiq class Worker include Celluloid + def initialize(boss) + @boss = boss + end + def process(msg) begin klass = msg['class'].constantize klass.new.perform(*msg['args']) + @boss.worker_done!(current_actor) rescue => ex send_to_airbrake(msg, ex) if defined?(::Airbrake) raise ex