1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Server starts up now!

This commit is contained in:
Mike Perham 2012-01-22 16:01:46 -08:00
parent 5c46cc896c
commit 5366506fe1
4 changed files with 59 additions and 40 deletions

View file

@ -1 +1,3 @@
require 'sidekiq/server'
require 'sidekiq/version'
require 'sidekiq/util'
require 'sidekiq/client'

View file

@ -1,8 +1,11 @@
require 'optparse'
require 'sidekiq'
require 'sidekiq/server'
module Sidekiq
class CLI
include Util
def initialize
parse_options
validate!
@ -10,16 +13,16 @@ module Sidekiq
end
def run
write_pid
write_pid if @options[:daemon]
server = Sidekiq::Server.new(@options[:server], @options)
begin
log 'Starting processing, hit Ctrl-C to stop'
server.run
server.start!
sleep 1_000_000_000
rescue Interrupt
log 'Shutting down...'
server.stop
log '...bye!'
server.stop!
end
end
@ -30,18 +33,12 @@ module Sidekiq
require File.expand_path('config/boot.rb')
end
def log(str)
STDOUT.puts str
end
def error(str)
@STDERR.puts "ERROR: #{str}"
end
def validate!
$DEBUG = @options[:verbose]
if @options[:queues].size == 0
log "========== Please configure at least one queue to process =========="
log @parser
exit(1)
end
end
@ -51,7 +48,7 @@ module Sidekiq
:verbose => false,
:queues => [],
:worker_count => 25,
:server => 'localhost:6379',
:server => 'redis://localhost:6379/0',
:pidfile => nil,
}
@ -84,7 +81,7 @@ module Sidekiq
end
end
@parser.banner = "sidekiq -q foo,1 -q bar,2 <more options>"
@parser.banner = "sidekiq -q foo -q bar <more options>"
@parser.on_tail "-h", "--help", "Show help" do
log @parser
exit 1

View file

@ -1,3 +1,9 @@
require 'celluloid'
require 'redis'
require 'multi_json'
require 'sidekiq/worker'
module Sidekiq
##
@ -12,14 +18,20 @@ module Sidekiq
trap_exit :worker_died
def initialize(location, options={})
log "Starting sidekiq #{Sidekiq::VERSION} with Redis at #{location}"
verbose options.inspect
@count = options[:worker_count]
@queues = options[:queues]
@queue_idx = 0
@queues_size = @queues.size
@redis = Redis.new(location)
@redis = Redis.new(:host => options[:redis_host], :port => options[:redis_port])
start
dispatch
@done = false
@busy = []
@ready = []
@count.times do
@ready << Worker.new_link
end
end
def stop
@ -34,16 +46,11 @@ module Sidekiq
end
def start
@done = false
@busy = []
@ready = []
@count.times do
@ready << Worker.new_link
end
dispatch
end
def worker_done(worker)
@busy.remove(worker)
@busy.delete(worker)
if stopped?
worker.terminate
else
@ -53,12 +60,18 @@ module Sidekiq
end
def worker_died(worker, reason)
@busy.remove(worker)
@ready << Worker.new_link unless stopped?
@busy.delete(worker)
log "Worker death: #{reason}"
log reason.backtrace.join("\n")
log reason.backtrace.join("\n") if reason
unless stopped?
@ready << Worker.new_link
dispatch
end
end
private
def dispatch
watchdog("Fatal error in sidekiq, dispatch loop died") do
return if stopped?
@ -67,27 +80,30 @@ module Sidekiq
queue_idx = 0
none_found = true
loop do
break if @ready.size == 0
# return so that we don't dispatch again until worker_done
return if @ready.size == 0
current_queue = @queues[queue_idx]
msg = @redis.lpop("queue:#{current_queue}")
if msg
worker = @ready.pop
@busy << worker
worker.process! MultiJson.decode(msg)
none_found = false
end
queue_idx += 1
# we loop through the queues, looking for a message in each.
# Loop through the queues, looking for a message in each.
# if we find no messages in any of the queues, we can break
# out of the loop. Otherwise we loop again.
if (queue_idx % @queues.size == 0) && none_found
lastq = (queue_idx % @queues.size == 0)
if lastq && none_found
break
else
elsif lastq
queue_idx = 0
none_found = true
end
current_queue = @queues[queue_idx]
msg = redis.lpop("queue:#{current_queue}")
if msg
@busy << worker = @ready.pop
worker.process! MultiJson.decode(msg)
none_found = false
end
end
after(1) { dispatch }

View file

@ -17,5 +17,9 @@ module Sidekiq
STDOUT.puts(msg)
end
def verbose(msg)
STDOUT.puts(msg) if $DEBUG
end
end
end