1
0
Fork 0
mirror of https://github.com/mperham/sidekiq.git synced 2022-11-09 13:52:34 -05:00

Message dispatch loop now working

This commit is contained in:
Mike Perham 2012-01-23 22:08:38 -08:00
parent 4043cace3a
commit da8c837e98
3 changed files with 41 additions and 25 deletions

View file

@ -1,6 +1,7 @@
require 'optparse' require 'optparse'
require 'sidekiq' require 'sidekiq'
require 'sidekiq/server' require 'sidekiq/server'
require 'connection_pool'
module Sidekiq module Sidekiq
class CLI class CLI
@ -12,14 +13,17 @@ module Sidekiq
boot_rails boot_rails
end end
FOREVER = 2_000_000_000
def run def run
write_pid if @options[:daemon] write_pid if @options[:daemon]
::Sidekiq::Client.redis = ConnectionPool.new { Redis.connect(:url => @options[:server]) }
server = Sidekiq::Server.new(@options[:server], @options) server = Sidekiq::Server.new(@options[:server], @options)
begin begin
log 'Starting processing, hit Ctrl-C to stop' log 'Starting processing, hit Ctrl-C to stop'
server.start! server.start!
sleep 1000 sleep FOREVER
rescue Interrupt rescue Interrupt
log 'Shutting down...' log 'Shutting down...'
server.stop! server.stop!
@ -30,10 +34,9 @@ module Sidekiq
private private
def boot_rails def boot_rails
#APP_PATH = File.expand_path('config/application.rb') ENV['RAILS_ENV'] = @options[:environment] || 'production'
ENV['RAILS_ENV'] = 'production'
require File.expand_path("#{@options[:rails]}/config/environment.rb") require File.expand_path("#{@options[:rails]}/config/environment.rb")
Rails.application.config.threadsafe! Rails.application.eager_load!
end end
def validate! def validate!
@ -60,6 +63,7 @@ module Sidekiq
:server => 'redis://localhost:6379/0', :server => 'redis://localhost:6379/0',
:pidfile => nil, :pidfile => nil,
:rails => '.', :rails => '.',
:environment => 'production',
} }
@parser = OptionParser.new do |o| @parser = OptionParser.new do |o|
@ -86,6 +90,10 @@ module Sidekiq
@options[:server] = arg @options[:server] = arg
end end
o.on '-e', '--environment ENV', "Rails application environment" do |arg|
@options[:environment] = arg
end
o.on '-r', '--rails PATH', "Rails application with workers" do |arg| o.on '-r', '--rails PATH', "Rails application with workers" do |arg|
@options[:rails] = arg @options[:rails] = arg
end end

View file

@ -19,7 +19,7 @@ module Sidekiq
trap_exit :worker_died trap_exit :worker_died
def initialize(location, options={}) def initialize(location, options={})
log "Starting sidekiq #{Sidekiq::VERSION} with Redis at #{location}" log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{location}"
verbose options.inspect verbose options.inspect
@count = options[:worker_count] @count = options[:worker_count]
@queues = options[:queues] @queues = options[:queues]
@ -31,7 +31,7 @@ module Sidekiq
@busy = [] @busy = []
@ready = [] @ready = []
@count.times do @count.times do
@ready << Worker.new_link @ready << Worker.new_link(current_actor)
end end
end end
@ -46,7 +46,7 @@ module Sidekiq
end end
def start def start
dispatch dispatch(true)
end end
def worker_done(worker) def worker_done(worker)
@ -68,48 +68,51 @@ module Sidekiq
end end
unless stopped? unless stopped?
@ready << Worker.new_link @ready << Worker.new_link(current_actor)
dispatch dispatch
end end
end end
private private
def dispatch def find_work(queue_idx)
current_queue = @queues[queue_idx]
msg = @redis.lpop("queue:#{current_queue}")
if msg
worker = @ready.pop
@busy << worker
worker.process! MultiJson.decode(msg)
end
msg
end
def dispatch(schedule = false)
watchdog("Fatal error in sidekiq, dispatch loop died") do watchdog("Fatal error in sidekiq, dispatch loop died") do
return if stopped? return if stopped?
# Our dispatch loop # Our dispatch loop
# Loop through the queues, looking for a message in each.
queue_idx = 0 queue_idx = 0
none_found = true found = false
loop do loop do
# return so that we don't dispatch again until worker_done # return so that we don't dispatch again until worker_done
return if @ready.size == 0 break verbose('no workers') if @ready.size == 0
current_queue = @queues[queue_idx]
msg = @redis.lpop("queue:#{current_queue}")
if msg
worker = @ready.pop
@busy << worker
worker.process! MultiJson.decode(msg)
none_found = false
end
found ||= find_work(queue_idx)
queue_idx += 1 queue_idx += 1
# Loop through the queues, looking for a message in each.
# if we find no messages in any of the queues, we can break # if we find no messages in any of the queues, we can break
# out of the loop. Otherwise we loop again. # out of the loop. Otherwise we loop again.
lastq = (queue_idx % @queues.size == 0) lastq = (queue_idx % @queues.size == 0)
if lastq && none_found if lastq && !found
break verbose('nothing to process'); break
elsif lastq elsif lastq
queue_idx = 0 queue_idx = 0
none_found = true found = false
end end
end end
after(1) { dispatch } after(1) { verbose('ping'); dispatch(schedule) } if schedule
end end
end end

View file

@ -4,10 +4,15 @@ module Sidekiq
class Worker class Worker
include Celluloid include Celluloid
def initialize(boss)
@boss = boss
end
def process(msg) def process(msg)
begin begin
klass = msg['class'].constantize klass = msg['class'].constantize
klass.new.perform(*msg['args']) klass.new.perform(*msg['args'])
@boss.worker_done!(current_actor)
rescue => ex rescue => ex
send_to_airbrake(msg, ex) if defined?(::Airbrake) send_to_airbrake(msg, ex) if defined?(::Airbrake)
raise ex raise ex