diff --git a/lib/sidekiq.rb b/lib/sidekiq.rb index 7db31dc2..6906affa 100644 --- a/lib/sidekiq.rb +++ b/lib/sidekiq.rb @@ -1,2 +1,4 @@ require 'sidekiq/version' require 'sidekiq/client' +require 'sidekiq/worker' +require 'sidekiq/rails' if defined?(Rails) diff --git a/lib/sidekiq/cli.rb b/lib/sidekiq/cli.rb index 627a6396..8e493c78 100644 --- a/lib/sidekiq/cli.rb +++ b/lib/sidekiq/cli.rb @@ -1,5 +1,7 @@ require 'optparse' -require 'sidekiq' +require 'sidekiq/version' +require 'sidekiq/util' +require 'sidekiq/client' require 'sidekiq/server' require 'connection_pool' @@ -34,7 +36,7 @@ module Sidekiq def boot_rails ENV['RAILS_ENV'] = @options[:environment] require File.expand_path("#{@options[:rails]}/config/environment.rb") - Rails.application.eager_load! + ::Rails.application.eager_load! end def validate! @@ -51,7 +53,7 @@ module Sidekiq @options = { :verbose => false, :queues => ['default'], - :worker_count => 25, + :processor_count => 25, :server => 'redis://localhost:6379/0', :rails => '.', :environment => 'production', @@ -77,12 +79,12 @@ module Sidekiq @options[:environment] = arg end - o.on '-r', '--rails PATH', "Rails application with workers" do |arg| + o.on '-r', '--rails PATH', "Rails application with processors" do |arg| @options[:rails] = arg end - o.on '-c', '--concurrency INT', "Worker threads to use" do |arg| - @options[:worker_count] = arg.to_i + o.on '-c', '--concurrency INT', "processor threads to use" do |arg| + @options[:processor_count] = arg.to_i end end diff --git a/lib/sidekiq/processor.rb b/lib/sidekiq/processor.rb new file mode 100644 index 00000000..ef3b0419 --- /dev/null +++ b/lib/sidekiq/processor.rb @@ -0,0 +1,28 @@ +require 'active_support/inflector' + +module Sidekiq + class Processor + include Celluloid + + def initialize(boss) + @boss = boss + end + + def process(msg) + begin + klass = msg['class'].constantize + klass.new.perform(*msg['args']) + @boss.processor_done!(current_actor) + rescue => ex + send_to_airbrake(msg, ex) if defined?(::Airbrake) + raise ex + end + end + + def send_to_airbrake(msg, ex) + ::Airbrake.notify(:error_class => ex.class.name, + :error_message => "#{ex.class.name}: #{e.message}", + :parameters => json) + end + end +end diff --git a/lib/sidekiq/rails.rb b/lib/sidekiq/rails.rb new file mode 100644 index 00000000..0863c0b7 --- /dev/null +++ b/lib/sidekiq/rails.rb @@ -0,0 +1,5 @@ +module Sidekiq + class Rails < ::Rails::Engine + config.autoload_paths << File.expand_path("#{config.root}/app/workers") if File.exist?("#{config.root}/app/workers") + end +end diff --git a/lib/sidekiq/server.rb b/lib/sidekiq/server.rb index 01e7ac00..d92f5c1d 100644 --- a/lib/sidekiq/server.rb +++ b/lib/sidekiq/server.rb @@ -3,25 +3,25 @@ require 'redis' require 'multi_json' require 'sidekiq/util' -require 'sidekiq/worker' +require 'sidekiq/processor' module Sidekiq ## # This is the main router in the system. This - # manages the worker state and fetches messages - # from Redis to be dispatched to ready workers. + # manages the processor state and fetches messages + # from Redis to be dispatched to ready processor. # class Server include Util include Celluloid - trap_exit :worker_died + trap_exit :processor_died def initialize(location, options={}) log "Booting sidekiq #{Sidekiq::VERSION} with Redis at #{location}" verbose options.inspect - @count = options[:worker_count] + @count = options[:processor_count] @queues = options[:queues] @queue_idx = 0 @queues_size = @queues.size @@ -31,7 +31,7 @@ module Sidekiq @busy = [] @ready = [] @count.times do - @ready << Worker.new_link(current_actor) + @ready << Processor.new_link(current_actor) end end @@ -49,26 +49,26 @@ module Sidekiq dispatch(true) end - def worker_done(worker) - @busy.delete(worker) + def processor_done(processor) + @busy.delete(processor) if stopped? - worker.terminate + processor.terminate else - @ready << worker + @ready << processor end dispatch end - def worker_died(worker, reason) - @busy.delete(worker) + def processor_died(processor, reason) + @busy.delete(processor) if reason - log "Worker death: #{reason}" + log "Processor death: #{reason}" log reason.backtrace.join("\n") end unless stopped? - @ready << Worker.new_link(current_actor) + @ready << Processor.new_link(current_actor) dispatch end end @@ -79,9 +79,9 @@ module Sidekiq current_queue = @queues[queue_idx] msg = @redis.lpop("queue:#{current_queue}") if msg - worker = @ready.pop - @busy << worker - worker.process! MultiJson.decode(msg) + processor = @ready.pop + @busy << processor + processor.process! MultiJson.decode(msg) end msg end @@ -95,8 +95,8 @@ module Sidekiq queue_idx = 0 found = false loop do - # return so that we don't dispatch again until worker_done - break verbose('no workers') if @ready.size == 0 + # return so that we don't dispatch again until processor_done + break verbose('no processors') if @ready.size == 0 found ||= find_work(queue_idx) queue_idx += 1 diff --git a/lib/sidekiq/worker.rb b/lib/sidekiq/worker.rb index 22a926a3..dae10a5f 100644 --- a/lib/sidekiq/worker.rb +++ b/lib/sidekiq/worker.rb @@ -1,28 +1,29 @@ -require 'active_support/inflector' +require 'sidekiq/client' module Sidekiq - class Worker - include Celluloid - def initialize(boss) - @boss = boss - end + ## + # Include this module in your worker class and you can easily create + # asynchronous jobs: + # + # class HardWorker + # include Sidekiq::Worker + # + # def perform(*args) + # # do some work + # end + # end + # + # Then in your Rails app, you can do this: + # + # HardWorker.perform_async(1, 2, 3) + # + # Note that perform_async is a class method, perform is an instance method. + module Worker + extend self - def process(msg) - begin - klass = msg['class'].constantize - klass.new.perform(*msg['args']) - @boss.worker_done!(current_actor) - rescue => ex - send_to_airbrake(msg, ex) if defined?(::Airbrake) - raise ex - end - end - - def send_to_airbrake(msg, ex) - ::Airbrake.notify(:error_class => ex.class.name, - :error_message => "#{ex.class.name}: #{e.message}", - :parameters => json) + def perform_async(*args) + Sidekiq::Client.enqueue('class' => self.name, 'args' => args) end end end diff --git a/myapp/Gemfile b/myapp/Gemfile index 1f14edff..9ea0f319 100644 --- a/myapp/Gemfile +++ b/myapp/Gemfile @@ -2,3 +2,4 @@ source 'https://rubygems.org' gem 'rails', '3.2.0' gem 'sqlite3' +gem 'sidekiq', :path => '..' diff --git a/myapp/Gemfile.lock b/myapp/Gemfile.lock index 45b3e55f..69345526 100644 --- a/myapp/Gemfile.lock +++ b/myapp/Gemfile.lock @@ -1,3 +1,12 @@ +PATH + remote: .. + specs: + sidekiq (0.1.0) + celluloid + connection_pool + multi_json + redis + GEM remote: https://rubygems.org/ specs: @@ -30,6 +39,8 @@ GEM multi_json (~> 1.0) arel (3.0.0) builder (3.0.0) + celluloid (0.7.2) + connection_pool (0.1.0) erubis (2.7.0) hike (1.2.1) i18n (0.6.0) @@ -67,6 +78,7 @@ GEM rake (0.9.2.2) rdoc (3.12) json (~> 1.4) + redis (2.2.2) sprockets (2.1.2) hike (~> 1.2) rack (~> 1.0) @@ -84,4 +96,5 @@ PLATFORMS DEPENDENCIES rails (= 3.2.0) + sidekiq! sqlite3 diff --git a/myapp/app/workers/hard_worker.rb b/myapp/app/workers/hard_worker.rb index 5031523d..d408ee7d 100644 --- a/myapp/app/workers/hard_worker.rb +++ b/myapp/app/workers/hard_worker.rb @@ -1,4 +1,6 @@ class HardWorker def perform(name, count) + sleep 0.01 + puts 'done' end end diff --git a/myapp/config/application.rb b/myapp/config/application.rb index daa30ba7..422b350a 100644 --- a/myapp/config/application.rb +++ b/myapp/config/application.rb @@ -17,7 +17,6 @@ module Myapp # Custom directories with classes and modules you want to be autoloadable. # config.autoload_paths += %W(#{config.root}/extras) - config.autoload_paths += %W(#{config.root}/app/workers) # Only load the plugins named here, in the order given (default is alphabetical). # :all can be used as a placeholder for all plugins not explicitly named.