1
0
Fork 0
mirror of https://github.com/endofunky/sidetiq.git synced 2022-11-09 13:53:30 -05:00

Move to actor-based concurrency.

Using plain threads in Sidekiq's Celluloid-based actor
model is a little bit like riding a motorcycle without
a helmet, so let's try to fit in a bit better.
This commit is contained in:
Tobias Svensson 2013-09-16 12:01:37 +01:00
parent 37cc6976bf
commit e0ebcaacd9
15 changed files with 126 additions and 184 deletions

View file

@ -3,10 +3,12 @@
require 'sidekiq'
require 'sidetiq'
Sidekiq.logger.level = Logger::DEBUG
Sidekiq.options[:poll_interval] = 1
Sidekiq.configure_server do |config|
Sidetiq::Clock.start!
Sidetiq.clock.start!
end
class MyWorker

View file

@ -6,23 +6,43 @@ require 'socket'
# gems
require 'ice_cube'
require 'sidekiq'
require 'celluloid'
# internal
require 'sidetiq/api'
require 'sidetiq/config'
require 'sidetiq/logging'
require 'sidetiq/api'
require 'sidetiq/clock'
require 'sidetiq/lock'
require 'sidetiq/logging'
require 'sidetiq/middleware'
require 'sidetiq/schedule'
require 'sidetiq/schedulable'
require 'sidetiq/version'
# actor topology
require 'sidetiq/actor/clock'
require 'sidetiq/supervisor'
# The Sidetiq namespace.
module Sidetiq
include Sidetiq::API
include Sidetiq::Logging
# Expose all instance methods as singleton methods.
extend self
class << self
# Public: Setter for the Sidetiq logger.
attr_writer :logger
end
# Public: Reader for the Sidetiq logger.
#
# Defaults to `Sidekiq.logger`.
def logger
@logger ||= Sidekiq.logger
end
# Public: Returns the Sidetiq::Clock actor.
def clock
Sidetiq::Supervisor.clock
end
end

View file

@ -0,0 +1,32 @@
module Sidetiq
module Actor
class Clock < Sidetiq::Clock
include Celluloid
# Public: Starts and supervises the clock actor.
def self.start!
actor.start!
end
# Public: Starts the clock loop.
def start!
debug "Sidetiq::Clock start"
loop!
end
private
def loop!
after([time { tick }, 0].max) do
loop!
end
end
def time
start = gettime
yield
Sidetiq.config.resolution - (gettime.to_f - start.to_f)
end
end
end
end

View file

@ -8,7 +8,7 @@ module Sidetiq
# Public: Returns a Hash of Sidetiq::Schedule instances.
def schedules
Clock.schedules.dup
clock.schedules.dup
end
# Public: Currently scheduled recurring jobs.

View file

@ -1,25 +1,11 @@
module Sidetiq
configure do |config|
config.priority = Thread.main.priority
config.resolution = 1
config.lock_expire = 1000
config.utc = false
end
# Public: The Sidetiq clock.
class Clock
include Singleton
include Logging
# Internal: Returns a hash of Sidetiq::Schedule instances.
attr_reader :schedules
# Internal: Returns the clock thread.
attr_reader :thread
def self.method_missing(meth, *args, &block) # :nodoc:
instance.__send__(meth, *args, &block)
end
def initialize # :nodoc:
super
@schedules = {}
@ -74,56 +60,6 @@ module Sidetiq
Sidetiq.config.utc ? Time.now.utc : Time.now
end
# Public: Starts the clock unless it is already running.
#
# Examples
#
# start!
# # => Thread
#
# Returns the Thread instance of the clock thread.
def start!
return if ticking?
Sidetiq.logger.info "Sidetiq::Clock start"
@thread = Thread.start { clock { tick } }
@thread.abort_on_exception = true
@thread.priority = Sidetiq.config.priority
@thread
end
# Public: Stops the clock if it is running.
#
# Examples
#
# stop!
# # => nil
#
# Returns nil.
def stop!
if ticking?
@thread.kill
Sidetiq.logger.info "Sidetiq::Clock stop"
end
end
# Public: Returns the status of the clock.
#
# Examples
#
# ticking?
# # => false
#
# start!
# ticking?
# # => true
#
# Returns true or false.
def ticking?
@thread && @thread.alive?
end
private
def enqueue(worker, time, redis)
@ -132,7 +68,7 @@ module Sidetiq
next_run = (redis.get("#{key}:next") || -1).to_f
if next_run < time_f
Sidetiq.logger.info "Sidetiq::Clock enqueue #{worker.name} (at: #{time_f}) (last: #{next_run})"
info "Enqueue: #{worker.name} (at: #{time_f}) (last: #{next_run})"
redis.mset("#{key}:last", next_run, "#{key}:next", time_f)
@ -146,23 +82,6 @@ module Sidetiq
end
end
end
def clock
loop do
sleep_time = time { yield }
if sleep_time > 0
Thread.pass
sleep sleep_time
end
end
end
def time
start = gettime
yield
Sidetiq.config.resolution - (gettime.to_f - start.to_f)
end
end
end

View file

@ -1,5 +1,7 @@
module Sidetiq
class Lock # :nodoc: all
include Logging
attr_reader :key, :timeout
OWNER = "#{Socket.gethostname}:#{Process.pid}"
@ -12,13 +14,13 @@ module Sidetiq
def synchronize
Sidekiq.redis do |redis|
if lock(redis)
Sidetiq.logger.debug "Sidetiq::Clock lock #{key}"
debug "Sidetiq::Clock lock #{key}"
begin
yield redis
ensure
unlock(redis)
Sidetiq.logger.debug "Sidetiq::Clock unlock #{key}"
debug "Sidetiq::Clock unlock #{key}"
end
end
end

View file

@ -1,14 +1,12 @@
module Sidetiq
# Public: Sidetiq logging interface.
module Logging
# Public: Setter for the Sidetiq logger.
attr_writer :logger
%w(fatal error warn info debug).each do |level|
level = level.to_sym
# Public: Reader for the Sidetiq logger.
#
# Defaults to `Sidekiq.logger`.
def logger
@logger ||= Sidekiq.logger
define_method(level) do |msg|
Sidetiq.logger.__send__(level, "[Sidetiq] #{msg}")
end
end
end
end

View file

@ -1,23 +0,0 @@
module Sidetiq
class Middleware
def initialize
@clock = Sidetiq::Clock.instance
end
def call(*args)
# Restart the clock if the thread died.
if !@clock.ticking?
Sidetiq.logger.warn "Sidetiq::Clock thread died. Restarting..."
@clock.start!
end
yield
end
end
end
Sidekiq.configure_server do |config|
config.server_middleware do |chain|
chain.add Sidetiq::Middleware
end
end

View file

@ -12,6 +12,8 @@ module Sidetiq
# end
module Schedulable
module ClassMethods
include Logging
# Public: Returns a Float timestamp of the last scheduled run.
def last_scheduled_occurrence
get_timestamp "last"
@ -23,15 +25,14 @@ module Sidetiq
end
def tiq(*args, &block) # :nodoc:
Sidetiq.logger.warn "DEPRECATION WARNING: Sidetiq::Schedulable#tiq" <<
warn "DEPRECATION WARNING: Sidetiq::Schedulable#tiq" <<
" is deprecated and will be removed. Use" <<
" Sidetiq::Schedulable#recurrence instead."
recurrence(*args, &block)
end
def recurrence(options = {}, &block) # :nodoc:
clock = Sidetiq::Clock.instance
schedule = clock.schedule_for(self)
schedule = Sidetiq.clock.schedule_for(self)
schedule.instance_eval(&block)
schedule.set_options(options)
end

36
lib/sidetiq/supervisor.rb Normal file
View file

@ -0,0 +1,36 @@
module Sidetiq
class Supervisor < Celluloid::SupervisionGroup
supervise Sidetiq::Actor::Clock, as: :sidetiq_clock
class << self
include Logging
def clock
run! if Celluloid::Actor[:sidetiq_clock].nil?
Celluloid::Actor[:sidetiq_clock]
end
def run!
motd
debug "Sidetiq::Supervisor start"
super
end
def run
motd
debug "Sidetiq::Supervisor start (foreground)"
super
end
private
def motd
info "Sidetiq v#{VERSION::STRING} booting ..."
end
end
end
end

View file

@ -5,28 +5,18 @@ module Sidetiq
VIEWS = File.expand_path('views', File.dirname(__FILE__))
def self.registered(app)
app.helpers do
def sidetiq_clock
Sidetiq::Clock.instance
end
def sidetiq_schedules
sidetiq_clock.schedules
end
end
app.get "/sidetiq" do
@schedules = sidetiq_schedules
@time = sidetiq_clock.gettime
@schedules = Sidetiq.schedules
@time = Sidetiq.clock.gettime
erb File.read(File.join(VIEWS, 'sidetiq.erb'))
end
app.get "/sidetiq/:name" do
halt 404 unless (name = params[:name])
@time = sidetiq_clock.gettime
@time = Sidetiq.clock.gettime
@worker, @schedule = sidetiq_schedules.select do |worker, _|
@worker, @schedule = Sidetiq.schedules.select do |worker, _|
worker.name == name
end.flatten
@ -36,7 +26,7 @@ module Sidetiq
app.post "/sidetiq/:name/trigger" do
halt 404 unless (name = params[:name])
worker, _ = sidetiq_schedules.select do |w, _|
worker, _ = Sidetiq.schedules.select do |w, _|
w.name == name
end.flatten

View file

@ -19,8 +19,9 @@ Gem::Specification.new do |gem|
gem.require_paths = ["lib"]
gem.extensions = []
gem.add_dependency 'sidekiq', '~> 2.14.0'
gem.add_dependency 'ice_cube', '~> 0.11.0'
gem.add_dependency 'sidekiq', '~> 2.14.0'
gem.add_dependency 'celluloid', '>= 0.14.1'
gem.add_dependency 'ice_cube', '~> 0.11.0'
gem.add_development_dependency 'rake'
gem.add_development_dependency 'sinatra'

View file

@ -13,6 +13,12 @@ require 'sidekiq/testing'
require 'sidetiq'
require 'sidetiq/web'
class Sidetiq::Supervisor
def self.clock
@clock ||= Sidetiq::Clock.new
end
end
# Keep the test output clean.
Sidetiq.logger = Logger.new(nil)
@ -40,7 +46,7 @@ class Sidetiq::TestCase < MiniTest::Unit::TestCase
end
def clock
@clock ||= Sidetiq::Clock.instance
Sidetiq.clock
end
# Blatantly stolen from Sidekiq's test suite.

View file

@ -1,30 +1,6 @@
require_relative 'helper'
class TestClock < Sidetiq::TestCase
def test_delegates_to_instance
Sidetiq::Clock.instance.expects(:foo).once
Sidetiq::Clock.foo
end
def test_start_stop
refute clock.ticking?
assert_nil clock.thread
clock.start!
Thread.pass
sleep 0.01
assert clock.ticking?
assert_kind_of Thread, clock.thread
clock.stop!
Thread.pass
sleep 0.01
refute clock.ticking?
refute clock.thread.alive?
end
def test_gettime_seconds
assert_equal clock.gettime.tv_sec, Time.now.tv_sec
end

View file

@ -1,18 +0,0 @@
require_relative 'helper'
class TestMiddleware < Sidetiq::TestCase
def middleware
Sidetiq::Middleware.new
end
def test_restarts_clock
clock.stubs(:ticking?).returns(false)
clock.expects(:start!).once
middleware.call {}
clock.stubs(:ticking?).returns(true)
clock.expects(:start!).never
middleware.call {}
end
end