diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index c7166a6c..11b8236f 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -1 +1,3 @@ -require 'sidekiq/server' +require 'sidekiq/version' +require 'sidekiq/util' +require 'sidekiq/client' diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 8b8a2ad6..01813fd5 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -1,8 +1,11 @@ require 'optparse' require 'sidekiq' +require 'sidekiq/server' module Sidekiq class CLI + include Util + def initialize parse_options validate! @@ -10,16 +13,16 @@ module Sidekiq end def run - write_pid + write_pid if @options[:daemon] server = Sidekiq::Server.new(@options[:server], @options) begin log 'Starting processing, hit Ctrl-C to stop' - server.run + server.start! + sleep 1_000_000_000 rescue Interrupt log 'Shutting down...' - server.stop - log '...bye!' + server.stop! end end @@ -30,18 +33,12 @@ module Sidekiq require File.expand_path('config/boot.rb') end - def log(str) - STDOUT.puts str - end - - def error(str) - @STDERR.puts "ERROR: #{str}" - end - def validate! + $DEBUG = @options[:verbose] if @options[:queues].size == 0 log "========== Please configure at least one queue to process ==========" log @parser + exit(1) end end @@ -51,7 +48,7 @@ module Sidekiq :verbose => false, :queues => [], :worker_count => 25, - :server => 'localhost:6379', + :server => 'redis://localhost:6379/0', :pidfile => nil, } @@ -84,7 +81,7 @@ module Sidekiq end end - @parser.banner = "sidekiq -q foo,1 -q bar,2 " + @parser.banner = "sidekiq -q foo -q bar " @parser.on_tail "-h", "--help", "Show help" do log @parser exit 1 diff --git a/lib/sidekiq/server.rb b/lib/sidekiq/server.rb index 59724935..169170ec 100644 --- a/lib/sidekiq/server.rb +++ b/lib/sidekiq/server.rb @@ -1,3 +1,9 @@ +require 'celluloid' +require 'redis' +require 'multi_json' + +require 'sidekiq/worker' + module Sidekiq ## @@ -12,14 +18,20 @@ module Sidekiq trap_exit :worker_died def initialize(location, options={}) + log "Starting sidekiq #{Sidekiq::VERSION} with Redis at #{location}" + verbose options.inspect @count = options[:worker_count] @queues = options[:queues] @queue_idx = 0 @queues_size = @queues.size - @redis = Redis.new(location) + @redis = Redis.new(:host => options[:redis_host], :port => options[:redis_port]) - start - dispatch + @done = false + @busy = [] + @ready = [] + @count.times do + @ready << Worker.new_link + end end def stop @@ -34,16 +46,11 @@ module Sidekiq end def start - @done = false - @busy = [] - @ready = [] - @count.times do - @ready << Worker.new_link - end + dispatch end def worker_done(worker) - @busy.remove(worker) + @busy.delete(worker) if stopped? worker.terminate else @@ -53,12 +60,18 @@ module Sidekiq end def worker_died(worker, reason) - @busy.remove(worker) - @ready << Worker.new_link unless stopped? + @busy.delete(worker) log "Worker death: #{reason}" - log reason.backtrace.join("\n") + log reason.backtrace.join("\n") if reason + + unless stopped? + @ready << Worker.new_link + dispatch + end end + private + def dispatch watchdog("Fatal error in sidekiq, dispatch loop died") do return if stopped? @@ -67,27 +80,30 @@ module Sidekiq queue_idx = 0 none_found = true loop do - break if @ready.size == 0 + # return so that we don't dispatch again until worker_done + return if @ready.size == 0 + + current_queue = @queues[queue_idx] + msg = @redis.lpop("queue:#{current_queue}") + if msg + worker = @ready.pop + @busy << worker + worker.process! MultiJson.decode(msg) + none_found = false + end queue_idx += 1 - # we loop through the queues, looking for a message in each. + # Loop through the queues, looking for a message in each. # if we find no messages in any of the queues, we can break # out of the loop. Otherwise we loop again. - if (queue_idx % @queues.size == 0) && none_found + lastq = (queue_idx % @queues.size == 0) + if lastq && none_found break - else + elsif lastq queue_idx = 0 none_found = true end - - current_queue = @queues[queue_idx] - msg = redis.lpop("queue:#{current_queue}") - if msg - @busy << worker = @ready.pop - worker.process! MultiJson.decode(msg) - none_found = false - end end after(1) { dispatch } diff --git a/lib/sidekiq/util.rb b/lib/sidekiq/util.rb index 24078c3d..b16497cd 100644 --- a/lib/sidekiq/util.rb +++ b/lib/sidekiq/util.rb @@ -17,5 +17,9 @@ module Sidekiq STDOUT.puts(msg) end + def verbose(msg) + STDOUT.puts(msg) if $DEBUG + end + end end