1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Merge pull request #22934 from mperham/master

Move async execution from celluloid to concurrent-ruby
This commit is contained in:
David Heinemeier Hansson 2016-01-08 18:23:41 +01:00
commit 3b7ccadfc1
15 changed files with 47 additions and 37 deletions

View file

@ -31,8 +31,8 @@ PATH
specs: specs:
actioncable (5.0.0.beta1) actioncable (5.0.0.beta1)
actionpack (= 5.0.0.beta1) actionpack (= 5.0.0.beta1)
celluloid (~> 0.17.2)
coffee-rails (~> 4.1.0) coffee-rails (~> 4.1.0)
concurrent-ruby (~> 1.0.0)
em-hiredis (~> 0.3.0) em-hiredis (~> 0.3.0)
faye-websocket (~> 0.10.0) faye-websocket (~> 0.10.0)
redis (~> 3.0) redis (~> 3.0)

View file

@ -427,7 +427,7 @@ messages back and forth over the WebSocket cable connection. This dependency may
be alleviated in the future, but for the moment that's what it is. So be sure to have be alleviated in the future, but for the moment that's what it is. So be sure to have
Redis installed and running. Redis installed and running.
The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [celluloid](https://github.com/celluloid/celluloid). The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby).
## Deployment ## Deployment

View file

@ -23,7 +23,6 @@ Gem::Specification.new do |s|
s.add_dependency 'coffee-rails', '~> 4.1.0' s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1' s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_dependency 'celluloid', '~> 0.17.2'
s.add_dependency 'em-hiredis', '~> 0.3.0' s.add_dependency 'em-hiredis', '~> 0.3.0'
s.add_dependency 'redis', '~> 3.0' s.add_dependency 'redis', '~> 3.0'

View file

@ -28,7 +28,7 @@ module ActionCable
def start_periodic_timers def start_periodic_timers
self.class.periodic_timers.each do |callback, options| self.class.periodic_timers.each do |callback, options|
active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
connection.worker_pool.async.run_periodic_timer(self, callback) connection.worker_pool.async_run_periodic_timer(self, callback)
end end
end end
end end

View file

@ -103,7 +103,7 @@ module ActionCable
# Invoke a method on the connection asynchronously through the pool of thread workers. # Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments) def send_async(method, *arguments)
worker_pool.async.invoke(self, method, *arguments) worker_pool.async_invoke(self, method, *arguments)
end end
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.

View file

@ -1,10 +1,7 @@
require 'action_cable/server' require 'action_cable/server'
require 'eventmachine' require 'eventmachine'
require 'celluloid'
EM.error_handler do |e| EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}" puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n") puts e.backtrace.join("\n")
end end
Celluloid.logger = ActionCable.server.logger

View file

@ -1,6 +1,3 @@
# FIXME: Cargo culted fix from https://github.com/celluloid/celluloid-pool/issues/10
require 'celluloid/current'
require 'em-hiredis' require 'em-hiredis'
module ActionCable module ActionCable

View file

@ -1,19 +1,34 @@
require 'celluloid'
require 'active_support/callbacks' require 'active_support/callbacks'
require 'active_support/core_ext/module/attribute_accessors_per_thread'
require 'concurrent'
module ActionCable module ActionCable
module Server module Server
# Worker used by Server.send_async to do connection work in threads. Only for internal use. # Worker used by Server.send_async to do connection work in threads. Only for internal use.
class Worker class Worker
include ActiveSupport::Callbacks include ActiveSupport::Callbacks
include Celluloid
attr_reader :connection thread_mattr_accessor :connection
define_callbacks :work define_callbacks :work
include ActiveRecordConnectionManagement include ActiveRecordConnectionManagement
def initialize(max_size: 5)
@pool = Concurrent::ThreadPoolExecutor.new(
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
def async_invoke(receiver, method, *args)
@pool.post do
invoke(receiver, method, *args)
end
end
def invoke(receiver, method, *args) def invoke(receiver, method, *args)
@connection = receiver begin
self.connection = receiver
run_callbacks :work do run_callbacks :work do
receiver.send method, *args receiver.send method, *args
@ -23,17 +38,31 @@ module ActionCable
logger.error e.backtrace.join("\n") logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception) receiver.handle_exception if receiver.respond_to?(:handle_exception)
ensure
self.connection = nil
end
end
def async_run_periodic_timer(channel, callback)
@pool.post do
run_periodic_timer(channel, callback)
end
end end
def run_periodic_timer(channel, callback) def run_periodic_timer(channel, callback)
@connection = channel.connection begin
self.connection = channel.connection
run_callbacks :work do run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end end
ensure
self.connection = nil
end
end end
private private
def logger def logger
ActionCable.server.logger ActionCable.server.logger
end end

View file

@ -10,7 +10,6 @@ class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase
end end
def send_async(method, *args) def send_async(method, *args)
# Bypass Celluloid
send method, *args send method, *args
end end
end end

View file

@ -14,7 +14,6 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase
end end
def send_async(method, *args) def send_async(method, *args)
# Bypass Celluloid
send method, *args send method, *args
end end
end end

View file

@ -6,7 +6,6 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base class Connection < ActionCable::Connection::Base
def send_async(method, *args) def send_async(method, *args)
# Bypass Celluloid
send method, *args send method, *args
end end
end end

View file

@ -10,7 +10,6 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase
end end
def send_async(method, *args) def send_async(method, *args)
# Bypass Celluloid
send method, *args send method, *args
end end
end end

View file

@ -5,7 +5,6 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase
attr_reader :websocket attr_reader :websocket
def send_async(method, *args) def send_async(method, *args)
# Bypass Celluloid
send method, *args send method, *args
end end
end end

View file

@ -14,11 +14,6 @@ require 'rack/mock'
# Require all the stubs and models # Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
$CELLULOID_DEBUG = false
$CELLULOID_TEST = false
require 'celluloid'
Celluloid.logger = Logger.new(StringIO.new)
require 'faye/websocket' require 'faye/websocket'
class << Faye::WebSocket class << Faye::WebSocket
remove_method :ensure_reactor_running remove_method :ensure_reactor_running

View file

@ -17,8 +17,6 @@ class WorkerTest < ActiveSupport::TestCase
end end
setup do setup do
Celluloid.boot
@worker = ActionCable::Server::Worker.new @worker = ActionCable::Server::Worker.new
@receiver = Receiver.new @receiver = Receiver.new
end end