From 547713b4c91e5c232ac3b2a8288b72001c6dfc46 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 5 Jan 2016 14:31:16 -0800 Subject: [PATCH 1/4] Move async execution from celluloid to concurrent-ruby This removes 8 runtime gem dependencies from Rails: ``` Using hitimes 1.2.3 Using timers 4.1.1 Using celluloid-essentials 0.20.5 Using celluloid-extras 0.20.5 Using celluloid-fsm 0.20.5 Using celluloid-pool 0.20.5 Using celluloid-supervision 0.20.5 Using celluloid 0.17.2 ``` --- Gemfile.lock | 2 +- actioncable/actioncable.gemspec | 2 +- .../action_cable/channel/periodic_timers.rb | 2 +- .../lib/action_cable/connection/base.rb | 2 +- .../lib/action_cable/process/logging.rb | 3 - actioncable/lib/action_cable/server/base.rb | 3 - actioncable/lib/action_cable/server/worker.rb | 59 ++++++++++++++----- actioncable/test/test_helper.rb | 5 -- actioncable/test/worker_test.rb | 2 - 9 files changed, 49 insertions(+), 31 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 7c6622dbed..baa8dc15c4 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -31,8 +31,8 @@ PATH specs: actioncable (5.0.0.beta1) actionpack (= 5.0.0.beta1) - celluloid (~> 0.17.2) coffee-rails (~> 4.1.0) + concurrent-ruby (~> 1.0.0) em-hiredis (~> 0.3.0) faye-websocket (~> 0.10.0) redis (~> 3.0) diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 74c21bd24d..b6e529e879 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |s| s.add_dependency 'coffee-rails', '~> 4.1.0' s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' - s.add_dependency 'celluloid', '~> 0.17.2' + s.add_dependency 'concurrent-ruby', '~> 1.0.0' s.add_dependency 'em-hiredis', '~> 0.3.0' s.add_dependency 'redis', '~> 3.0' diff --git a/actioncable/lib/action_cable/channel/periodic_timers.rb b/actioncable/lib/action_cable/channel/periodic_timers.rb index 25fe8e5e54..7f0fb37afc 100644 --- a/actioncable/lib/action_cable/channel/periodic_timers.rb +++ b/actioncable/lib/action_cable/channel/periodic_timers.rb @@ -28,7 +28,7 @@ module ActionCable def start_periodic_timers self.class.periodic_timers.each do |callback, options| active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do - connection.worker_pool.async.run_periodic_timer(self, callback) + connection.worker_pool.async_run_periodic_timer(self, callback) end end end diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 977856d656..a8cfdf90f3 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -103,7 +103,7 @@ module ActionCable # Invoke a method on the connection asynchronously through the pool of thread workers. def send_async(method, *arguments) - worker_pool.async.invoke(self, method, *arguments) + worker_pool.async_invoke(self, method, *arguments) end # Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`. diff --git a/actioncable/lib/action_cable/process/logging.rb b/actioncable/lib/action_cable/process/logging.rb index 72b1a080d1..dce637b3ca 100644 --- a/actioncable/lib/action_cable/process/logging.rb +++ b/actioncable/lib/action_cable/process/logging.rb @@ -1,10 +1,7 @@ require 'action_cable/server' require 'eventmachine' -require 'celluloid' EM.error_handler do |e| puts "Error raised inside the event loop: #{e.message}" puts e.backtrace.join("\n") end - -Celluloid.logger = ActionCable.server.logger diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 740e4b301e..cfd0a65f0f 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,6 +1,3 @@ -# FIXME: Cargo culted fix from https://github.com/celluloid/celluloid-pool/issues/10 -require 'celluloid/current' - require 'em-hiredis' module ActionCable diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index e063b2a2e1..6cddb4e7a5 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -1,39 +1,70 @@ -require 'celluloid' require 'active_support/callbacks' +require 'concurrent' module ActionCable module Server # Worker used by Server.send_async to do connection work in threads. Only for internal use. class Worker include ActiveSupport::Callbacks - include Celluloid - attr_reader :connection define_callbacks :work include ActiveRecordConnectionManagement - def invoke(receiver, method, *args) - @connection = receiver + def initialize(max_size=5) + @pool = Concurrent::ThreadPoolExecutor.new( + min_threads: 1, + max_threads: max_size, + max_queue: 0, + ) + end - run_callbacks :work do - receiver.send method, *args + def connection + Thread.current[:connection] || raise("No connection set") + end + + def async_invoke(receiver, method, *args) + @pool.post do + invoke(receiver, method, *args) end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") + end - receiver.handle_exception if receiver.respond_to?(:handle_exception) + def invoke(receiver, method, *args) + begin + Thread.current[:connection] = receiver + + run_callbacks :work do + receiver.send method, *args + end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + ensure + Thread.current[:connection] = nil + end + end + + def async_run_periodic_timer(channel, callback) + @pool.post do + run_periodic_timer(channel, callback) + end end def run_periodic_timer(channel, callback) - @connection = channel.connection + begin + Thread.current[:connection] = channel.connection - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + run_callbacks :work do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + end + ensure + Thread.current[:connection] = nil end end private + def logger ActionCable.server.logger end diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 12dcd98402..325305939f 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -14,11 +14,6 @@ require 'rack/mock' # Require all the stubs and models Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file } -$CELLULOID_DEBUG = false -$CELLULOID_TEST = false -require 'celluloid' -Celluloid.logger = Logger.new(StringIO.new) - require 'faye/websocket' class << Faye::WebSocket remove_method :ensure_reactor_running diff --git a/actioncable/test/worker_test.rb b/actioncable/test/worker_test.rb index 69c4b6529d..9911a3b98b 100644 --- a/actioncable/test/worker_test.rb +++ b/actioncable/test/worker_test.rb @@ -17,8 +17,6 @@ class WorkerTest < ActiveSupport::TestCase end setup do - Celluloid.boot - @worker = ActionCable::Server::Worker.new @receiver = Receiver.new end From 2bb65e64e6dd3fad06c2742bf856eb4816971689 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 5 Jan 2016 15:10:11 -0800 Subject: [PATCH 2/4] Remove celluloid references --- actioncable/README.md | 2 +- actioncable/test/connection/authorization_test.rb | 1 - actioncable/test/connection/base_test.rb | 1 - actioncable/test/connection/cross_site_forgery_test.rb | 1 - actioncable/test/connection/string_identifier_test.rb | 1 - actioncable/test/connection/subscriptions_test.rb | 1 - 6 files changed, 1 insertion(+), 6 deletions(-) diff --git a/actioncable/README.md b/actioncable/README.md index c7420d48bc..f58c8fdb16 100644 --- a/actioncable/README.md +++ b/actioncable/README.md @@ -427,7 +427,7 @@ messages back and forth over the WebSocket cable connection. This dependency may be alleviated in the future, but for the moment that's what it is. So be sure to have Redis installed and running. -The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [celluloid](https://github.com/celluloid/celluloid). +The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [concurrent-ruby](https://github.com/ruby-concurrency/concurrent-ruby). ## Deployment diff --git a/actioncable/test/connection/authorization_test.rb b/actioncable/test/connection/authorization_test.rb index 68668b2835..87d0e79ef3 100644 --- a/actioncable/test/connection/authorization_test.rb +++ b/actioncable/test/connection/authorization_test.rb @@ -10,7 +10,6 @@ class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase end def send_async(method, *args) - # Bypass Celluloid send method, *args end end diff --git a/actioncable/test/connection/base_test.rb b/actioncable/test/connection/base_test.rb index da6041db4a..182562db82 100644 --- a/actioncable/test/connection/base_test.rb +++ b/actioncable/test/connection/base_test.rb @@ -14,7 +14,6 @@ class ActionCable::Connection::BaseTest < ActionCable::TestCase end def send_async(method, *args) - # Bypass Celluloid send method, *args end end diff --git a/actioncable/test/connection/cross_site_forgery_test.rb b/actioncable/test/connection/cross_site_forgery_test.rb index d445e08f2a..a29f65fb97 100644 --- a/actioncable/test/connection/cross_site_forgery_test.rb +++ b/actioncable/test/connection/cross_site_forgery_test.rb @@ -6,7 +6,6 @@ class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase class Connection < ActionCable::Connection::Base def send_async(method, *args) - # Bypass Celluloid send method, *args end end diff --git a/actioncable/test/connection/string_identifier_test.rb b/actioncable/test/connection/string_identifier_test.rb index ab69df57b3..9d0bda83ef 100644 --- a/actioncable/test/connection/string_identifier_test.rb +++ b/actioncable/test/connection/string_identifier_test.rb @@ -10,7 +10,6 @@ class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase end def send_async(method, *args) - # Bypass Celluloid send method, *args end end diff --git a/actioncable/test/connection/subscriptions_test.rb b/actioncable/test/connection/subscriptions_test.rb index 4f6760827e..62e41484fe 100644 --- a/actioncable/test/connection/subscriptions_test.rb +++ b/actioncable/test/connection/subscriptions_test.rb @@ -5,7 +5,6 @@ class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase attr_reader :websocket def send_async(method, *args) - # Bypass Celluloid send method, *args end end From e529e3428e2fdcbc03ab9010127c743cc276dbf2 Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 5 Jan 2016 15:11:10 -0800 Subject: [PATCH 3/4] Use Module#thread_mattr_accessor --- actioncable/lib/action_cable/server/worker.rb | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 6cddb4e7a5..7787e99baf 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -1,4 +1,5 @@ require 'active_support/callbacks' +require 'active_support/core_ext/module/attribute_accessors_per_thread' require 'concurrent' module ActionCable @@ -7,6 +8,7 @@ module ActionCable class Worker include ActiveSupport::Callbacks + thread_mattr_accessor :connection define_callbacks :work include ActiveRecordConnectionManagement @@ -18,10 +20,6 @@ module ActionCable ) end - def connection - Thread.current[:connection] || raise("No connection set") - end - def async_invoke(receiver, method, *args) @pool.post do invoke(receiver, method, *args) @@ -30,7 +28,7 @@ module ActionCable def invoke(receiver, method, *args) begin - Thread.current[:connection] = receiver + self.connection = receiver run_callbacks :work do receiver.send method, *args @@ -41,7 +39,7 @@ module ActionCable receiver.handle_exception if receiver.respond_to?(:handle_exception) ensure - Thread.current[:connection] = nil + self.connection = nil end end @@ -53,13 +51,13 @@ module ActionCable def run_periodic_timer(channel, callback) begin - Thread.current[:connection] = channel.connection + self.connection = channel.connection run_callbacks :work do callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) end ensure - Thread.current[:connection] = nil + self.connection = nil end end From cd1d7e287b09a6461d8eb5febd877602de6aca6c Mon Sep 17 00:00:00 2001 From: Mike Perham Date: Tue, 5 Jan 2016 16:26:53 -0800 Subject: [PATCH 4/4] don't need explicit dep and a pretty neat pick --- actioncable/actioncable.gemspec | 1 - actioncable/lib/action_cable/server/worker.rb | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index b6e529e879..a04fc932aa 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -23,7 +23,6 @@ Gem::Specification.new do |s| s.add_dependency 'coffee-rails', '~> 4.1.0' s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' - s.add_dependency 'concurrent-ruby', '~> 1.0.0' s.add_dependency 'em-hiredis', '~> 0.3.0' s.add_dependency 'redis', '~> 3.0' diff --git a/actioncable/lib/action_cable/server/worker.rb b/actioncable/lib/action_cable/server/worker.rb index 7787e99baf..3b6c6d44a1 100644 --- a/actioncable/lib/action_cable/server/worker.rb +++ b/actioncable/lib/action_cable/server/worker.rb @@ -12,7 +12,7 @@ module ActionCable define_callbacks :work include ActiveRecordConnectionManagement - def initialize(max_size=5) + def initialize(max_size: 5) @pool = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: max_size,