From 7fef6b01a3011438d48136e3f95bb9a823e87ec6 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Thu, 5 Feb 2015 16:35:11 +0530 Subject: [PATCH] No cramp and use celluloid workers to run callbacks --- Gemfile | 2 -- action_cable.gemspec | 1 - lib/action_cable.rb | 3 ++- lib/action_cable/channel/base.rb | 4 +-- lib/action_cable/server.rb | 43 ++++++++++++++++++++++++++------ lib/action_cable/worker.rb | 19 ++++++++++++++ test/test_helper.rb | 3 +-- 7 files changed, 59 insertions(+), 16 deletions(-) create mode 100644 lib/action_cable/worker.rb diff --git a/Gemfile b/Gemfile index 3ef2cb6af8..7dfe51bf00 100644 --- a/Gemfile +++ b/Gemfile @@ -1,8 +1,6 @@ source 'http://rubygems.org' gemspec -gem 'cramp', github: "lifo/cramp" - group :test do gem 'rake' gem 'puma' diff --git a/action_cable.gemspec b/action_cable.gemspec index 63ba751e9d..f6fcc92fee 100644 --- a/action_cable.gemspec +++ b/action_cable.gemspec @@ -10,7 +10,6 @@ Gem::Specification.new do |s| s.homepage = 'http://basecamp.com' s.add_dependency('activesupport', '~> 4.2.0') - s.add_dependency('cramp', '~> 0.15.4') s.files = Dir['README', 'lib/**/*'] s.has_rdoc = false diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 7df2a8c5eb..993c260e49 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -1,12 +1,13 @@ -require 'cramp' require 'active_support' require 'active_support/json' require 'active_support/concern' require 'active_support/core_ext/hash/indifferent_access' +require 'active_support/callbacks' module ActionCable VERSION = '0.0.1' autoload :Channel, 'action_cable/channel' + autoload :Worker, 'action_cable/worker' autoload :Server, 'action_cable/server' end diff --git a/lib/action_cable/channel/base.rb b/lib/action_cable/channel/base.rb index ae8822d2a2..e311cc97e9 100644 --- a/lib/action_cable/channel/base.rb +++ b/lib/action_cable/channel/base.rb @@ -41,13 +41,13 @@ module ActionCable def subscribe self.class.on_subscribe_callbacks.each do |callback| - EM.next_tick { send(callback) } + send(callback) end end def unsubscribe self.class.on_unsubscribe_callbacks.each do |callback| - EM.next_tick { send(callback) } + send(callback) end end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index cdf8ea0f66..ea22f0014e 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,10 +1,11 @@ require 'set' +require 'faye/websocket' +require 'celluloid' + +Celluloid::Actor[:worker_pool] = ActionCable::Worker.pool(size: 100) module ActionCable - class Server < Cramp::Websocket - on_data :received_data - on_finish :cleanup_subscriptions - + class Server class_attribute :registered_channels self.registered_channels = Set.new @@ -12,12 +13,35 @@ module ActionCable def register_channels(*channel_classes) self.registered_channels += channel_classes end + + def call(env) + new(env).process + end end - def initialize(*) - @subscriptions = {} + def initialize(env) + @env = env + end - super + def process + if Faye::WebSocket.websocket?(@env) + @subscriptions = {} + + @websocket = Faye::WebSocket.new(@env) + + @websocket.on(:message) do |event| + message = event.data + Celluloid::Actor[:worker_pool].async.received_data(self, message) if message.is_a?(String) + end + + @websocket.on(:close) do |event| + Celluloid::Actor[:worker_pool].async.cleanup_subscriptions(self) + end + + @websocket.rack_response + else + invalid_request + end end def received_data(data) @@ -40,7 +64,7 @@ module ActionCable end def broadcast(data) - render data + @websocket.send data end private @@ -71,5 +95,8 @@ module ActionCable @subscriptions.delete(id_key) end + def invalid_request + [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + end end end diff --git a/lib/action_cable/worker.rb b/lib/action_cable/worker.rb new file mode 100644 index 0000000000..46b5f7edc0 --- /dev/null +++ b/lib/action_cable/worker.rb @@ -0,0 +1,19 @@ +module ActionCable + class Worker + include ActiveSupport::Callbacks + include Celluloid + + define_callbacks :work + + def received_data(connection, data) + run_callbacks :work do + connection.received_data(data) + end + end + + def cleanup_subscriptions(connection) + connection.cleanup_subscriptions + end + + end +end diff --git a/test/test_helper.rb b/test/test_helper.rb index 5251e711b7..10a4827281 100644 --- a/test/test_helper.rb +++ b/test/test_helper.rb @@ -15,9 +15,8 @@ ActiveSupport.test_order = :sorted require 'logger' logger = Logger.new(File.join(File.dirname(__FILE__), "tests.log")) logger.level = Logger::DEBUG -Cramp.logger = logger -class ActionCableTest < Cramp::TestCase +class ActionCableTest < ActiveSupport::TestCase PORT = 420420 setup :start_puma_server