1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

No cramp and use celluloid workers to run callbacks

This commit is contained in:
Pratik Naik 2015-02-05 16:35:11 +05:30
parent 55c956b346
commit 7fef6b01a3
7 changed files with 59 additions and 16 deletions

View file

@ -1,8 +1,6 @@
source 'http://rubygems.org'
gemspec
gem 'cramp', github: "lifo/cramp"
group :test do
gem 'rake'
gem 'puma'

View file

@ -10,7 +10,6 @@ Gem::Specification.new do |s|
s.homepage = 'http://basecamp.com'
s.add_dependency('activesupport', '~> 4.2.0')
s.add_dependency('cramp', '~> 0.15.4')
s.files = Dir['README', 'lib/**/*']
s.has_rdoc = false

View file

@ -1,12 +1,13 @@
require 'cramp'
require 'active_support'
require 'active_support/json'
require 'active_support/concern'
require 'active_support/core_ext/hash/indifferent_access'
require 'active_support/callbacks'
module ActionCable
VERSION = '0.0.1'
autoload :Channel, 'action_cable/channel'
autoload :Worker, 'action_cable/worker'
autoload :Server, 'action_cable/server'
end

View file

@ -41,13 +41,13 @@ module ActionCable
def subscribe
self.class.on_subscribe_callbacks.each do |callback|
EM.next_tick { send(callback) }
send(callback)
end
end
def unsubscribe
self.class.on_unsubscribe_callbacks.each do |callback|
EM.next_tick { send(callback) }
send(callback)
end
end

View file

@ -1,10 +1,11 @@
require 'set'
require 'faye/websocket'
require 'celluloid'
Celluloid::Actor[:worker_pool] = ActionCable::Worker.pool(size: 100)
module ActionCable
class Server < Cramp::Websocket
on_data :received_data
on_finish :cleanup_subscriptions
class Server
class_attribute :registered_channels
self.registered_channels = Set.new
@ -12,12 +13,35 @@ module ActionCable
def register_channels(*channel_classes)
self.registered_channels += channel_classes
end
def call(env)
new(env).process
end
end
def initialize(*)
def initialize(env)
@env = env
end
def process
if Faye::WebSocket.websocket?(@env)
@subscriptions = {}
super
@websocket = Faye::WebSocket.new(@env)
@websocket.on(:message) do |event|
message = event.data
Celluloid::Actor[:worker_pool].async.received_data(self, message) if message.is_a?(String)
end
@websocket.on(:close) do |event|
Celluloid::Actor[:worker_pool].async.cleanup_subscriptions(self)
end
@websocket.rack_response
else
invalid_request
end
end
def received_data(data)
@ -40,7 +64,7 @@ module ActionCable
end
def broadcast(data)
render data
@websocket.send data
end
private
@ -71,5 +95,8 @@ module ActionCable
@subscriptions.delete(id_key)
end
def invalid_request
[404, {'Content-Type' => 'text/plain'}, ['Page not found']]
end
end
end

View file

@ -0,0 +1,19 @@
module ActionCable
class Worker
include ActiveSupport::Callbacks
include Celluloid
define_callbacks :work
def received_data(connection, data)
run_callbacks :work do
connection.received_data(data)
end
end
def cleanup_subscriptions(connection)
connection.cleanup_subscriptions
end
end
end

View file

@ -15,9 +15,8 @@ ActiveSupport.test_order = :sorted
require 'logger'
logger = Logger.new(File.join(File.dirname(__FILE__), "tests.log"))
logger.level = Logger::DEBUG
Cramp.logger = logger
class ActionCableTest < Cramp::TestCase
class ActionCableTest < ActiveSupport::TestCase
PORT = 420420
setup :start_puma_server