mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Cable: Periodic timers refresh
* Rewrite docs * Support blocks in addition to method names and Proc args * Check for valid arguments * Convert `periodically :method_name` to Proc callbacks * Drop periodic runner methods from the worker pool * Ensure we clear active periodic timers after shutdown
This commit is contained in:
parent
811c532351
commit
983b743c8c
4 changed files with 83 additions and 39 deletions
|
@ -12,11 +12,42 @@ module ActionCable
|
|||
end
|
||||
|
||||
module ClassMethods
|
||||
# Allows you to call a private method periodically. Specify the period, in seconds, using the <tt>every</tt> keyword argument.
|
||||
# This periodic timer can be useful for sending a steady flow of updates to a client based off an object that was configured on subscription.
|
||||
# It's an alternative to using streams if the channel is able to do the work internally.
|
||||
def periodically(callback, every:)
|
||||
self.periodic_timers += [ [ callback, every: every ] ]
|
||||
# Periodically performs a task on the channel, like updating an online
|
||||
# user counter, polling a backend for new status messages, sending
|
||||
# regular "heartbeat" messages, or doing some internal work and giving
|
||||
# progress updates.
|
||||
#
|
||||
# Pass a method name or lambda argument or provide a block to call.
|
||||
# Specify the calling period in seconds using the <tt>every:</tt>
|
||||
# keyword argument.
|
||||
#
|
||||
# periodically :transmit_progress, every: 5.seconds
|
||||
#
|
||||
# periodically every: 3.minutes do
|
||||
# transmit action: :update_count, count: current_count
|
||||
# end
|
||||
#
|
||||
def periodically(callback_or_method_name = nil, every:, &block)
|
||||
callback =
|
||||
if block_given?
|
||||
raise ArgumentError, 'Pass a block or provide a callback arg, not both' if callback_or_method_name
|
||||
block
|
||||
else
|
||||
case callback_or_method_name
|
||||
when Proc
|
||||
callback_or_method_name
|
||||
when Symbol
|
||||
-> { __send__ callback_or_method_name }
|
||||
else
|
||||
raise ArgumentError, "Expected a Symbol method name or a Proc, got #{callback_or_method_name.inspect}"
|
||||
end
|
||||
end
|
||||
|
||||
unless every.kind_of?(Numeric) && every > 0
|
||||
raise ArgumentError, "Expected every: to be a positive number of seconds, got #{every.inspect}"
|
||||
end
|
||||
|
||||
self.periodic_timers += [[ callback, every: every ]]
|
||||
end
|
||||
end
|
||||
|
||||
|
@ -27,14 +58,21 @@ module ActionCable
|
|||
|
||||
def start_periodic_timers
|
||||
self.class.periodic_timers.each do |callback, options|
|
||||
active_periodic_timers << connection.server.event_loop.timer(options[:every]) do
|
||||
connection.worker_pool.async_run_periodic_timer(self, callback)
|
||||
active_periodic_timers << start_periodic_timer(callback, every: options.fetch(:every))
|
||||
end
|
||||
end
|
||||
|
||||
def start_periodic_timer(callback, every:)
|
||||
connection.server.event_loop.timer every do
|
||||
connection.worker_pool.async_invoke connection do
|
||||
instance_exec(&callback)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
def stop_periodic_timers
|
||||
active_periodic_timers.each { |timer| timer.shutdown }
|
||||
active_periodic_timers.clear
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -59,18 +59,6 @@ module ActionCable
|
|||
end
|
||||
end
|
||||
|
||||
def async_run_periodic_timer(channel, callback)
|
||||
@pool.post do
|
||||
run_periodic_timer(channel, callback)
|
||||
end
|
||||
end
|
||||
|
||||
def run_periodic_timer(channel, callback)
|
||||
work(channel.connection) do
|
||||
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def logger
|
||||
|
|
|
@ -1,12 +1,21 @@
|
|||
require 'test_helper'
|
||||
require 'stubs/test_connection'
|
||||
require 'stubs/room'
|
||||
require 'active_support/time'
|
||||
|
||||
class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
|
||||
class ChatChannel < ActionCable::Channel::Base
|
||||
periodically -> { ping }, every: 5
|
||||
# Method name arg
|
||||
periodically :send_updates, every: 1
|
||||
|
||||
# Proc arg
|
||||
periodically -> { ping }, every: 2
|
||||
|
||||
# Block arg
|
||||
periodically every: 3 do
|
||||
ping
|
||||
end
|
||||
|
||||
private
|
||||
def ping
|
||||
end
|
||||
|
@ -19,22 +28,41 @@ class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
|
|||
test "periodic timers definition" do
|
||||
timers = ChatChannel.periodic_timers
|
||||
|
||||
assert_equal 2, timers.size
|
||||
assert_equal 3, timers.size
|
||||
|
||||
first_timer = timers[0]
|
||||
assert_kind_of Proc, first_timer[0]
|
||||
assert_equal 5, first_timer[1][:every]
|
||||
timers.each_with_index do |timer, i|
|
||||
assert_kind_of Proc, timer[0]
|
||||
assert_equal i+1, timer[1][:every]
|
||||
end
|
||||
end
|
||||
|
||||
second_timer = timers[1]
|
||||
assert_equal :send_updates, second_timer[0]
|
||||
assert_equal 1, second_timer[1][:every]
|
||||
test 'disallow negative and zero periods' do
|
||||
[ 0, 0.0, 0.seconds, -1, -1.seconds, 'foo', :foo, Object.new ].each do |invalid|
|
||||
assert_raise ArgumentError, /Expected every:/ do
|
||||
ChatChannel.periodically :send_updates, every: invalid
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
test 'disallow block and arg together' do
|
||||
assert_raise ArgumentError, /not both/ do
|
||||
ChatChannel.periodically(:send_updates, every: 1) { ping }
|
||||
end
|
||||
end
|
||||
|
||||
test 'disallow unknown args' do
|
||||
[ 'send_updates', Object.new, nil ].each do |invalid|
|
||||
assert_raise ArgumentError, /Expected a Symbol/ do
|
||||
ChatChannel.periodically invalid, every: 1
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
test "timer start and stop" do
|
||||
@connection.server.event_loop.expects(:timer).times(2).returns(true)
|
||||
@connection.server.event_loop.expects(:timer).times(3).returns(stub(shutdown: nil))
|
||||
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
|
||||
|
||||
channel.expects(:stop_periodic_timers).once
|
||||
channel.unsubscribe_from_channel
|
||||
assert_equal [], channel.send(:active_periodic_timers)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -41,14 +41,4 @@ class WorkerTest < ActiveSupport::TestCase
|
|||
@worker.invoke @receiver, :process, "Hello"
|
||||
assert_equal [ :process, "Hello" ], @receiver.last_action
|
||||
end
|
||||
|
||||
test "running periodic timers with a proc" do
|
||||
@worker.run_periodic_timer @receiver, @receiver.method(:run)
|
||||
assert_equal :run, @receiver.last_action
|
||||
end
|
||||
|
||||
test "running periodic timers with a method" do
|
||||
@worker.run_periodic_timer @receiver, :run
|
||||
assert_equal :run, @receiver.last_action
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in a new issue