mirror of
https://github.com/mperham/sidekiq.git
synced 2022-11-09 13:52:34 -05:00
Port the scheduler actor to bare threads
This commit is contained in:
parent
da563e73b1
commit
b182b117f1
3 changed files with 66 additions and 25 deletions
|
@ -39,28 +39,46 @@ module Sidekiq
|
|||
# workers can pick it up like any other job.
|
||||
class Poller
|
||||
include Util
|
||||
include Actor
|
||||
|
||||
INITIAL_WAIT = 10
|
||||
|
||||
def initialize
|
||||
@enq = (Sidekiq.options[:scheduled_enq] || Sidekiq::Scheduled::Enq).new
|
||||
@done = false
|
||||
@queue = ConnectionPool::TimedStack.new
|
||||
end
|
||||
|
||||
def poll(first_time=false)
|
||||
watchdog('scheduling poller thread died!') do
|
||||
initial_wait if first_time
|
||||
# Shut down this Fetcher instance, will pause until
|
||||
# the thread is dead.
|
||||
def terminate
|
||||
@done = true
|
||||
if @thread
|
||||
t = @thread
|
||||
@thread = nil
|
||||
@queue << 0
|
||||
t.value
|
||||
end
|
||||
end
|
||||
|
||||
begin
|
||||
@enq.enqueue_jobs
|
||||
rescue => ex
|
||||
# Most likely a problem with redis networking.
|
||||
# Punt and try again at the next interval
|
||||
logger.error ex.message
|
||||
logger.error ex.backtrace.first
|
||||
def start
|
||||
@thread ||= safe_thread("scheduler") do
|
||||
@queue.pop(initial_wait)
|
||||
|
||||
while !@done
|
||||
enqueue
|
||||
@queue.pop(random_poll_interval)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
after(random_poll_interval) { poll }
|
||||
def enqueue
|
||||
begin
|
||||
@enq.enqueue_jobs
|
||||
rescue => ex
|
||||
# Most likely a problem with redis networking.
|
||||
# Punt and try again at the next interval
|
||||
logger.error ex.message
|
||||
logger.error ex.backtrace.first
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -98,16 +116,12 @@ module Sidekiq
|
|||
end
|
||||
|
||||
def initial_wait
|
||||
begin
|
||||
# Have all processes sleep between 5-15 seconds. 10 seconds
|
||||
# to give time for the heartbeat to register (if the poll interval is going to be calculated by the number
|
||||
# of workers), and 5 random seconds to ensure they don't all hit Redis at the same time.
|
||||
sleep(INITIAL_WAIT) unless Sidekiq.options[:poll_interval_average]
|
||||
sleep(5 * rand)
|
||||
rescue Celluloid::TaskTerminated
|
||||
# Hit Ctrl-C when Sidekiq is finished booting and we have a chance
|
||||
# to get here.
|
||||
end
|
||||
# Have all processes sleep between 5-15 seconds. 10 seconds
|
||||
# to give time for the heartbeat to register (if the poll interval is going to be calculated by the number
|
||||
# of workers), and 5 random seconds to ensure they don't all hit Redis at the same time.
|
||||
total = 0
|
||||
total += INITIAL_WAIT unless Sidekiq.options[:poll_interval_average]
|
||||
total += (5 * rand)
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -18,7 +18,6 @@ class TestActors < Sidekiq::Test
|
|||
|
||||
mgr = Minitest::Mock.new
|
||||
mgr.expect(:assign, nil, [Sidekiq::BasicFetch::UnitOfWork])
|
||||
mgr.expect(:assign, nil, [Sidekiq::BasicFetch::UnitOfWork])
|
||||
f = Sidekiq::Fetcher.new(mgr, { queues: ['default'] })
|
||||
f.start
|
||||
f.request_job
|
||||
|
@ -30,4 +29,32 @@ class TestActors < Sidekiq::Test
|
|||
end
|
||||
end
|
||||
|
||||
describe 'scheduler' do
|
||||
it 'can start and stop' do
|
||||
f = Sidekiq::Scheduled::Poller.new
|
||||
f.start
|
||||
f.terminate
|
||||
end
|
||||
|
||||
it 'can schedule' do
|
||||
ss = Sidekiq::ScheduledSet.new
|
||||
ss.clear
|
||||
|
||||
q = Sidekiq::Queue.new
|
||||
q.clear
|
||||
|
||||
SomeWorker.perform_in(0.01)
|
||||
|
||||
assert_equal 0, q.size
|
||||
assert_equal 1, ss.size
|
||||
|
||||
sleep 0.01
|
||||
s = Sidekiq::Scheduled::Poller.new
|
||||
s.enqueue
|
||||
assert_equal 1, q.size
|
||||
assert_equal 0, ss.size
|
||||
s.terminate
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -37,7 +37,7 @@ class TestScheduled < Sidekiq::Test
|
|||
@scheduled.schedule (Time.now - 60).to_f, @future_2
|
||||
@scheduled.schedule (Time.now - 60).to_f, @future_3
|
||||
|
||||
@poller.poll
|
||||
@poller.enqueue
|
||||
|
||||
assert_equal 0, Sidekiq::Queue.new("queue_1").size
|
||||
assert_equal 1, Sidekiq::Queue.new("queue_2").size
|
||||
|
@ -62,7 +62,7 @@ class TestScheduled < Sidekiq::Test
|
|||
end
|
||||
|
||||
Time.stub(:now, enqueued_time) do
|
||||
@poller.poll
|
||||
@poller.enqueue
|
||||
|
||||
Sidekiq.redis do |conn|
|
||||
%w(queue:queue_1 queue:queue_2 queue:queue_4 queue:queue_5).each do |queue_name|
|
||||
|
|
Loading…
Reference in a new issue