diff --git a/examples/simple.rb b/examples/simple.rb index 261d65a..949a5f1 100644 --- a/examples/simple.rb +++ b/examples/simple.rb @@ -3,10 +3,12 @@ require 'sidekiq' require 'sidetiq' +Sidekiq.logger.level = Logger::DEBUG + Sidekiq.options[:poll_interval] = 1 Sidekiq.configure_server do |config| - Sidetiq::Clock.start! + Sidetiq.clock.start! end class MyWorker diff --git a/lib/sidetiq.rb b/lib/sidetiq.rb index 8fab71a..6d08d86 100644 --- a/lib/sidetiq.rb +++ b/lib/sidetiq.rb @@ -6,23 +6,43 @@ require 'socket' # gems require 'ice_cube' require 'sidekiq' +require 'celluloid' # internal -require 'sidetiq/api' require 'sidetiq/config' +require 'sidetiq/logging' +require 'sidetiq/api' require 'sidetiq/clock' require 'sidetiq/lock' -require 'sidetiq/logging' -require 'sidetiq/middleware' require 'sidetiq/schedule' require 'sidetiq/schedulable' require 'sidetiq/version' +# actor topology +require 'sidetiq/actor/clock' +require 'sidetiq/supervisor' + # The Sidetiq namespace. module Sidetiq include Sidetiq::API - include Sidetiq::Logging # Expose all instance methods as singleton methods. extend self + + class << self + # Public: Setter for the Sidetiq logger. + attr_writer :logger + end + + # Public: Reader for the Sidetiq logger. + # + # Defaults to `Sidekiq.logger`. + def logger + @logger ||= Sidekiq.logger + end + + # Public: Returns the Sidetiq::Clock actor. + def clock + Sidetiq::Supervisor.clock + end end diff --git a/lib/sidetiq/actor/clock.rb b/lib/sidetiq/actor/clock.rb new file mode 100644 index 0000000..65e84fc --- /dev/null +++ b/lib/sidetiq/actor/clock.rb @@ -0,0 +1,32 @@ +module Sidetiq + module Actor + class Clock < Sidetiq::Clock + include Celluloid + + # Public: Starts and supervises the clock actor. + def self.start! + actor.start! + end + + # Public: Starts the clock loop. + def start! + debug "Sidetiq::Clock start" + loop! + end + + private + + def loop! + after([time { tick }, 0].max) do + loop! + end + end + + def time + start = gettime + yield + Sidetiq.config.resolution - (gettime.to_f - start.to_f) + end + end + end +end diff --git a/lib/sidetiq/api.rb b/lib/sidetiq/api.rb index d72690b..91c9519 100644 --- a/lib/sidetiq/api.rb +++ b/lib/sidetiq/api.rb @@ -8,7 +8,7 @@ module Sidetiq # Public: Returns a Hash of Sidetiq::Schedule instances. def schedules - Clock.schedules.dup + clock.schedules.dup end # Public: Currently scheduled recurring jobs. diff --git a/lib/sidetiq/clock.rb b/lib/sidetiq/clock.rb index f35b8d8..c3dd052 100644 --- a/lib/sidetiq/clock.rb +++ b/lib/sidetiq/clock.rb @@ -1,25 +1,11 @@ module Sidetiq - configure do |config| - config.priority = Thread.main.priority - config.resolution = 1 - config.lock_expire = 1000 - config.utc = false - end - # Public: The Sidetiq clock. class Clock - include Singleton + include Logging # Internal: Returns a hash of Sidetiq::Schedule instances. attr_reader :schedules - # Internal: Returns the clock thread. - attr_reader :thread - - def self.method_missing(meth, *args, &block) # :nodoc: - instance.__send__(meth, *args, &block) - end - def initialize # :nodoc: super @schedules = {} @@ -74,56 +60,6 @@ module Sidetiq Sidetiq.config.utc ? Time.now.utc : Time.now end - # Public: Starts the clock unless it is already running. - # - # Examples - # - # start! - # # => Thread - # - # Returns the Thread instance of the clock thread. - def start! - return if ticking? - - Sidetiq.logger.info "Sidetiq::Clock start" - - @thread = Thread.start { clock { tick } } - @thread.abort_on_exception = true - @thread.priority = Sidetiq.config.priority - @thread - end - - # Public: Stops the clock if it is running. - # - # Examples - # - # stop! - # # => nil - # - # Returns nil. - def stop! - if ticking? - @thread.kill - Sidetiq.logger.info "Sidetiq::Clock stop" - end - end - - # Public: Returns the status of the clock. - # - # Examples - # - # ticking? - # # => false - # - # start! - # ticking? - # # => true - # - # Returns true or false. - def ticking? - @thread && @thread.alive? - end - private def enqueue(worker, time, redis) @@ -132,7 +68,7 @@ module Sidetiq next_run = (redis.get("#{key}:next") || -1).to_f if next_run < time_f - Sidetiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f}) (last: #{next_run})" + info "Enqueue: #{worker.name} (at: #{time_f}) (last: #{next_run})" redis.mset("#{key}:last", next_run, "#{key}:next", time_f) @@ -146,23 +82,6 @@ module Sidetiq end end end - - def clock - loop do - sleep_time = time { yield } - - if sleep_time > 0 - Thread.pass - sleep sleep_time - end - end - end - - def time - start = gettime - yield - Sidetiq.config.resolution - (gettime.to_f - start.to_f) - end end end diff --git a/lib/sidetiq/lock.rb b/lib/sidetiq/lock.rb index 4264eae..8374b09 100644 --- a/lib/sidetiq/lock.rb +++ b/lib/sidetiq/lock.rb @@ -1,5 +1,7 @@ module Sidetiq class Lock # :nodoc: all + include Logging + attr_reader :key, :timeout OWNER = "#{Socket.gethostname}:#{Process.pid}" @@ -12,13 +14,13 @@ module Sidetiq def synchronize Sidekiq.redis do |redis| if lock(redis) - Sidetiq.logger.debug "Sidetiq::Clock lock #{key}" + debug "Sidetiq::Clock lock #{key}" begin yield redis ensure unlock(redis) - Sidetiq.logger.debug "Sidetiq::Clock unlock #{key}" + debug "Sidetiq::Clock unlock #{key}" end end end diff --git a/lib/sidetiq/logging.rb b/lib/sidetiq/logging.rb index b838483..beb3dea 100644 --- a/lib/sidetiq/logging.rb +++ b/lib/sidetiq/logging.rb @@ -1,14 +1,12 @@ module Sidetiq # Public: Sidetiq logging interface. module Logging - # Public: Setter for the Sidetiq logger. - attr_writer :logger + %w(fatal error warn info debug).each do |level| + level = level.to_sym - # Public: Reader for the Sidetiq logger. - # - # Defaults to `Sidekiq.logger`. - def logger - @logger ||= Sidekiq.logger + define_method(level) do |msg| + Sidetiq.logger.__send__(level, "[Sidetiq] #{msg}") + end end end end diff --git a/lib/sidetiq/middleware.rb b/lib/sidetiq/middleware.rb deleted file mode 100644 index 03f7625..0000000 --- a/lib/sidetiq/middleware.rb +++ /dev/null @@ -1,23 +0,0 @@ -module Sidetiq - class Middleware - def initialize - @clock = Sidetiq::Clock.instance - end - - def call(*args) - # Restart the clock if the thread died. - if !@clock.ticking? - Sidetiq.logger.warn "Sidetiq::Clock thread died. Restarting..." - @clock.start! - end - yield - end - end -end - -Sidekiq.configure_server do |config| - config.server_middleware do |chain| - chain.add Sidetiq::Middleware - end -end - diff --git a/lib/sidetiq/schedulable.rb b/lib/sidetiq/schedulable.rb index a7d3755..f53a829 100644 --- a/lib/sidetiq/schedulable.rb +++ b/lib/sidetiq/schedulable.rb @@ -12,6 +12,8 @@ module Sidetiq # end module Schedulable module ClassMethods + include Logging + # Public: Returns a Float timestamp of the last scheduled run. def last_scheduled_occurrence get_timestamp "last" @@ -23,15 +25,14 @@ module Sidetiq end def tiq(*args, &block) # :nodoc: - Sidetiq.logger.warn "DEPRECATION WARNING: Sidetiq::Schedulable#tiq" << + warn "DEPRECATION WARNING: Sidetiq::Schedulable#tiq" << " is deprecated and will be removed. Use" << " Sidetiq::Schedulable#recurrence instead." recurrence(*args, &block) end def recurrence(options = {}, &block) # :nodoc: - clock = Sidetiq::Clock.instance - schedule = clock.schedule_for(self) + schedule = Sidetiq.clock.schedule_for(self) schedule.instance_eval(&block) schedule.set_options(options) end diff --git a/lib/sidetiq/supervisor.rb b/lib/sidetiq/supervisor.rb new file mode 100644 index 0000000..5e90c00 --- /dev/null +++ b/lib/sidetiq/supervisor.rb @@ -0,0 +1,36 @@ +module Sidetiq + class Supervisor < Celluloid::SupervisionGroup + supervise Sidetiq::Actor::Clock, as: :sidetiq_clock + + class << self + include Logging + + def clock + run! if Celluloid::Actor[:sidetiq_clock].nil? + + Celluloid::Actor[:sidetiq_clock] + end + + def run! + motd + debug "Sidetiq::Supervisor start" + + super + end + + def run + motd + debug "Sidetiq::Supervisor start (foreground)" + + super + end + + private + + def motd + info "Sidetiq v#{VERSION::STRING} booting ..." + end + end + end +end + diff --git a/lib/sidetiq/web.rb b/lib/sidetiq/web.rb index 3d81ac2..8d27439 100644 --- a/lib/sidetiq/web.rb +++ b/lib/sidetiq/web.rb @@ -5,28 +5,18 @@ module Sidetiq VIEWS = File.expand_path('views', File.dirname(__FILE__)) def self.registered(app) - app.helpers do - def sidetiq_clock - Sidetiq::Clock.instance - end - - def sidetiq_schedules - sidetiq_clock.schedules - end - end - app.get "/sidetiq" do - @schedules = sidetiq_schedules - @time = sidetiq_clock.gettime + @schedules = Sidetiq.schedules + @time = Sidetiq.clock.gettime erb File.read(File.join(VIEWS, 'sidetiq.erb')) end app.get "/sidetiq/:name" do halt 404 unless (name = params[:name]) - @time = sidetiq_clock.gettime + @time = Sidetiq.clock.gettime - @worker, @schedule = sidetiq_schedules.select do |worker, _| + @worker, @schedule = Sidetiq.schedules.select do |worker, _| worker.name == name end.flatten @@ -36,7 +26,7 @@ module Sidetiq app.post "/sidetiq/:name/trigger" do halt 404 unless (name = params[:name]) - worker, _ = sidetiq_schedules.select do |w, _| + worker, _ = Sidetiq.schedules.select do |w, _| w.name == name end.flatten diff --git a/sidetiq.gemspec b/sidetiq.gemspec index 15d2081..4624584 100644 --- a/sidetiq.gemspec +++ b/sidetiq.gemspec @@ -19,8 +19,9 @@ Gem::Specification.new do |gem| gem.require_paths = ["lib"] gem.extensions = [] - gem.add_dependency 'sidekiq', '~> 2.14.0' - gem.add_dependency 'ice_cube', '~> 0.11.0' + gem.add_dependency 'sidekiq', '~> 2.14.0' + gem.add_dependency 'celluloid', '>= 0.14.1' + gem.add_dependency 'ice_cube', '~> 0.11.0' gem.add_development_dependency 'rake' gem.add_development_dependency 'sinatra' diff --git a/test/helper.rb b/test/helper.rb index 5c31cc3..d345c98 100644 --- a/test/helper.rb +++ b/test/helper.rb @@ -13,6 +13,12 @@ require 'sidekiq/testing' require 'sidetiq' require 'sidetiq/web' +class Sidetiq::Supervisor + def self.clock + @clock ||= Sidetiq::Clock.new + end +end + # Keep the test output clean. Sidetiq.logger = Logger.new(nil) @@ -40,7 +46,7 @@ class Sidetiq::TestCase < MiniTest::Unit::TestCase end def clock - @clock ||= Sidetiq::Clock.instance + Sidetiq.clock end # Blatantly stolen from Sidekiq's test suite. diff --git a/test/test_clock.rb b/test/test_clock.rb index 3822643..040fdf7 100644 --- a/test/test_clock.rb +++ b/test/test_clock.rb @@ -1,30 +1,6 @@ require_relative 'helper' class TestClock < Sidetiq::TestCase - def test_delegates_to_instance - Sidetiq::Clock.instance.expects(:foo).once - Sidetiq::Clock.foo - end - - def test_start_stop - refute clock.ticking? - assert_nil clock.thread - - clock.start! - Thread.pass - sleep 0.01 - - assert clock.ticking? - assert_kind_of Thread, clock.thread - - clock.stop! - Thread.pass - sleep 0.01 - - refute clock.ticking? - refute clock.thread.alive? - end - def test_gettime_seconds assert_equal clock.gettime.tv_sec, Time.now.tv_sec end diff --git a/test/test_middleware.rb b/test/test_middleware.rb deleted file mode 100644 index 12a5b91..0000000 --- a/test/test_middleware.rb +++ /dev/null @@ -1,18 +0,0 @@ -require_relative 'helper' - -class TestMiddleware < Sidetiq::TestCase - def middleware - Sidetiq::Middleware.new - end - - def test_restarts_clock - clock.stubs(:ticking?).returns(false) - clock.expects(:start!).once - middleware.call {} - - clock.stubs(:ticking?).returns(true) - clock.expects(:start!).never - middleware.call {} - end -end -