1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Merge branch 'master' of github.com:basecamp/action_cable

This commit is contained in:
David Heinemeier Hansson 2015-07-04 21:30:48 +02:00
commit 0de65cf2d8
32 changed files with 676 additions and 494 deletions

View file

@ -1,7 +1,7 @@
Gem::Specification.new do |s|
s.platform = Gem::Platform::RUBY
s.name = 'action_cable'
s.version = '0.0.3'
s.version = '0.1.0'
s.summary = 'Framework for websockets.'
s.description = 'Action Cable is a framework for realtime communication over websockets.'

View file

@ -19,10 +19,10 @@ require 'action_cable/engine' if defined?(Rails)
module ActionCable
VERSION = '0.0.3'
autoload :Channel, 'action_cable/channel'
autoload :Worker, 'action_cable/worker'
autoload :Server, 'action_cable/server'
autoload :Connection, 'action_cable/connection'
autoload :Channel, 'action_cable/channel'
autoload :RemoteConnection, 'action_cable/remote_connection'
autoload :RemoteConnections, 'action_cable/remote_connections'
autoload :Broadcaster, 'action_cable/broadcaster'

View file

@ -1,17 +0,0 @@
module ActionCable
class Broadcaster
attr_reader :server, :channel, :redis
delegate :logger, to: :server
def initialize(server, channel)
@server = server
@channel = channel
@redis = @server.threaded_redis
end
def broadcast(message)
logger.info "[ActionCable] Broadcasting to #{channel}: #{message}"
redis.publish channel, message.to_json
end
end
end

View file

@ -1,7 +1,7 @@
module ActionCable
module Channel
autoload :Callbacks, 'action_cable/channel/callbacks'
autoload :Redis, 'action_cable/channel/redis'
autoload :Streams, 'action_cable/channel/streams'
autoload :Base, 'action_cable/channel/base'
end
end

View file

@ -2,7 +2,7 @@ module ActionCable
module Channel
class Base
include Callbacks
include Redis
include Streams
on_subscribe :start_periodic_timers
on_unsubscribe :stop_periodic_timers
@ -10,16 +10,10 @@ module ActionCable
attr_reader :params, :connection
delegate :logger, to: :connection
class_attribute :channel_name
class << self
def matches?(identifier)
raise "Please implement #{name}#matches? method"
end
def find_name
@name ||= channel_name || to_s.demodulize.underscore
end
end
def initialize(connection, channel_identifier, params = {})
@ -138,4 +132,4 @@ module ActionCable
end
end
end
end
end

View file

@ -1,37 +0,0 @@
module ActionCable
module Channel
module Redis
extend ActiveSupport::Concern
included do
on_unsubscribe :unsubscribe_from_all_channels
delegate :pubsub, to: :connection
end
def subscribe_to(redis_channel, callback = nil)
callback ||= default_subscription_callback(redis_channel)
@_redis_channels ||= []
@_redis_channels << [ redis_channel, callback ]
pubsub.subscribe(redis_channel, &callback)
logger.info "#{channel_name} subscribed to broadcasts from #{redis_channel}"
end
def unsubscribe_from_all_channels
if @_redis_channels
@_redis_channels.each do |redis_channel, callback|
pubsub.unsubscribe_proc(redis_channel, callback)
logger.info "#{channel_name} unsubscribed to broadcasts from #{redis_channel}"
end
end
end
protected
def default_subscription_callback(channel)
-> (message) do
transmit ActiveSupport::JSON.decode(message), via: "broadcast from #{channel}"
end
end
end
end
end

View file

@ -0,0 +1,40 @@
module ActionCable
module Channel
module Streams
extend ActiveSupport::Concern
included do
on_unsubscribe :stop_all_streams
end
def stream_from(broadcasting, callback = nil)
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
pubsub.subscribe broadcasting, &callback
logger.info "#{channel_name} is streaming from #{broadcasting}"
end
def stop_all_streams
streams.each do |broadcasting, callback|
pubsub.unsubscribe_proc broadcasting, callback
logger.info "#{channel_name} stopped streaming from #{broadcasting}"
end
end
private
delegate :pubsub, to: :connection
def streams
@_streams ||= []
end
def default_stream_callback(broadcasting)
-> (message) do
transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
end
end
end
end
end

View file

@ -1,8 +1,12 @@
module ActionCable
module Connection
autoload :Base, 'action_cable/connection/base'
autoload :Heartbeat, 'action_cable/connection/heartbeat'
autoload :Identification, 'action_cable/connection/identification'
autoload :InternalChannel, 'action_cable/connection/internal_channel'
autoload :Identifier, 'action_cable/connection/identifier'
autoload :MessageBuffer, 'action_cable/connection/message_buffer'
autoload :WebSocket, 'action_cable/connection/web_socket'
autoload :Subscriptions, 'action_cable/connection/subscriptions'
autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy'
end
end

View file

@ -1,112 +1,67 @@
module ActionCable
module Connection
class Base
include InternalChannel, Identifier
include Identification
include InternalChannel
PING_INTERVAL = 3
class_attribute :identifiers
self.identifiers = Set.new
def self.identified_by(*identifiers)
self.identifiers += identifiers
end
attr_reader :env, :server, :logger
attr_reader :server, :env
delegate :worker_pool, :pubsub, to: :server
attr_reader :logger
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env)
@heartbeat = ActionCable::Connection::Heartbeat.new(self)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@started_at = Time.now
@server = server
@env = env
@accept_messages = false
@pending_messages = []
@subscriptions = {}
@logger = TaggedLoggerProxy.new(server.logger, tags: log_tags)
end
def process
logger.info started_request_message
if websocket?
@websocket = Faye::WebSocket.new(@env)
@websocket.on(:open) do |event|
transmit_ping_timestamp
@ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { transmit_ping_timestamp }
worker_pool.async.invoke(self, :initialize_connection)
end
@websocket.on(:message) do |event|
message = event.data
if message.is_a?(String)
if @accept_messages
worker_pool.async.invoke(self, :received_data, message)
else
@pending_messages << message
end
end
end
@websocket.on(:close) do |event|
logger.info finished_request_message
worker_pool.async.invoke(self, :on_connection_closed)
EventMachine.cancel_timer(@ping_timer) if @ping_timer
end
@websocket.rack_response
if websocket.possible?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
respond_to_successful_request
else
invalid_request
respond_to_invalid_request
end
end
def received_data(data)
return unless websocket_alive?
data = ActiveSupport::JSON.decode data
case data['command']
when 'subscribe'
subscribe_channel(data)
when 'unsubscribe'
unsubscribe_channel(data)
when 'message'
process_message(data)
def receive(data_in_json)
if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
logger.error "Received unrecognized command in #{data.inspect}"
end
end
def cleanup_subscriptions
@subscriptions.each do |id, channel|
channel.perform_disconnection
logger.error "Received data without a live websocket (#{data.inspect})"
end
end
def transmit(data)
@websocket.send data
websocket.transmit data
end
def close
logger.error "Closing connection"
websocket.close
end
def send_async(method, *arguments)
worker_pool.async.invoke(self, method, *arguments)
end
def statistics
{
identifier: connection_identifier,
started_at: @started_at,
subscriptions: @subscriptions.keys
}
{ identifier: connection_identifier, started_at: @started_at, subscriptions: subscriptions.identifiers }
end
def handle_exception
close_connection
end
def close_connection
logger.error "Closing connection"
@websocket.close
end
protected
def request
@ -117,79 +72,59 @@ module ActionCable
request.cookie_jar
end
def initialize_connection
private
attr_reader :websocket
attr_reader :heartbeat, :subscriptions, :message_buffer
def on_open
server.add_connection(self)
connect if respond_to?(:connect)
subscribe_to_internal_channel
heartbeat.start
@accept_messages = true
worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty?
message_buffer.process!
end
def on_connection_closed
def on_message(message)
message_buffer.append message
end
def on_close
logger.info finished_request_message
server.remove_connection(self)
cleanup_subscriptions
subscriptions.cleanup
unsubscribe_from_internal_channel
heartbeat.stop
disconnect if respond_to?(:disconnect)
end
def transmit_ping_timestamp
transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
def respond_to_successful_request
websocket.rack_response
end
def subscribe_channel(data)
id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] }
if subscription_klass
@subscriptions[id_key] = subscription_klass.new(self, id_key, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
rescue Exception => e
logger.error "Could not subscribe to channel (#{data.inspect})"
log_exception(e)
end
def process_message(message)
if @subscriptions[message['identifier']]
@subscriptions[message['identifier']].perform_action(ActiveSupport::JSON.decode message['data'])
else
raise "Unable to process message because no subscription was found (#{message.inspect})"
end
rescue Exception => e
logger.error "Could not process message (#{message.inspect})"
log_exception(e)
end
def unsubscribe_channel(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
@subscriptions[data['identifier']].perform_disconnection
@subscriptions.delete(data['identifier'])
end
def invalid_request
def respond_to_invalid_request
logger.info finished_request_message
[404, {'Content-Type' => 'text/plain'}, ['Page not found']]
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
def websocket_alive?
@websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN
end
def websocket?
@is_websocket ||= Faye::WebSocket.websocket?(@env)
# Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
def new_tagged_logger
TaggedLoggerProxy.new server.logger,
tags: server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
end
def started_request_message
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
websocket? ? ' [Websocket]' : '',
websocket.possible? ? ' [Websocket]' : '',
request.ip,
Time.now.to_default_s ]
end
@ -197,19 +132,10 @@ module ActionCable
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
websocket? ? ' [Websocket]' : '',
websocket.possible? ? ' [Websocket]' : '',
request.ip,
Time.now.to_default_s ]
end
def log_exception(e)
logger.error "There was an exception: #{e.class} - #{e.message}"
logger.error e.backtrace.join("\n")
end
def log_tags
server.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
end
end
end
end

View file

@ -0,0 +1,27 @@
module ActionCable
module Connection
class Heartbeat
BEAT_INTERVAL = 3
def initialize(connection)
@connection = connection
end
def start
beat
@timer = EventMachine.add_periodic_timer(BEAT_INTERVAL) { beat }
end
def stop
EventMachine.cancel_timer(@timer) if @timer
end
private
attr_reader :connection
def beat
connection.transmit({ identifier: '_ping', message: Time.now.to_i }.to_json)
end
end
end
end

View file

@ -0,0 +1,26 @@
module ActionCable
module Connection
module Identification
extend ActiveSupport::Concern
included do
class_attribute :identifiers
self.identifiers = Set.new
end
class_methods do
def identified_by(*identifiers)
self.identifiers += identifiers
end
end
def connection_identifier
@connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
end
def connection_gid(ids)
ids.map { |o| o.to_global_id.to_s }.sort.join(":")
end
end
end
end

View file

@ -1,17 +0,0 @@
module ActionCable
module Connection
module Identifier
def internal_redis_channel
"action_cable/#{connection_identifier}"
end
def connection_identifier
@connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")}.compact
end
def connection_gid(ids)
ids.map {|o| o.to_global_id.to_s }.sort.join(":")
end
end
end
end

View file

@ -3,6 +3,10 @@ module ActionCable
module InternalChannel
extend ActiveSupport::Concern
def internal_redis_channel
"action_cable/#{connection_identifier}"
end
def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
@ -27,13 +31,13 @@ module ActionCable
case message['type']
when 'disconnect'
logger.info "Removing connection (#{connection_identifier})"
@websocket.close
websocket.close
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
handle_exception
close
end
end
end

View file

@ -0,0 +1,51 @@
module ActionCable
module Connection
class MessageBuffer
def initialize(connection)
@connection = connection
@buffered_messages = []
end
def append(message)
if valid? message
if processing?
receive message
else
buffer message
end
else
connection.logger.error "Couldn't handle non-string message: #{message.class}"
end
end
def processing?
@processing
end
def process!
@processing = true
receive_buffered_messages
end
private
attr_reader :connection
attr_accessor :buffered_messages
def valid?(message)
message.is_a?(String)
end
def receive(message)
connection.send_async :receive, message
end
def buffer(message)
buffered_messages << message
end
def receive_buffered_messages
receive buffered_messages.shift until buffered_messages.empty?
end
end
end
end

View file

@ -0,0 +1,69 @@
module ActionCable
module Connection
class Subscriptions
def initialize(connection)
@connection = connection
@subscriptions = {}
end
def execute_command(data)
case data['command']
when 'subscribe' then add data
when 'unsubscribe' then remove data
when 'message' then perform_action data
else
logger.error "Received unrecognized command in #{data.inspect}"
end
rescue Exception => e
logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
end
def add(data)
id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
subscription_klass = connection.server.registered_channels.detect do |channel_klass|
channel_klass == id_options[:channel].safe_constantize
end
if subscription_klass
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
end
def remove(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
subscriptions[data['identifier']].perform_disconnection
subscriptions.delete(data['identifier'])
end
def perform_action(data)
find(data).perform_action ActiveSupport::JSON.decode(data['data'])
end
def identifiers
subscriptions.keys
end
def cleanup
subscriptions.each { |id, channel| channel.perform_disconnection }
end
private
attr_reader :connection, :subscriptions
delegate :logger, to: :connection
def find(data)
if subscription = subscriptions[data['identifier']]
subscription
else
raise "Unable to find subscription with identifier: #{data['identifier']}"
end
end
end
end
end

View file

@ -1,7 +1,9 @@
module ActionCable
module Connection
# Allows the use of per-connection tags against the server logger. This wouldn't work using the tradional
# ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests.
# The connection is long-lived, so it needs its own set of tags for its independent duration.
class TaggedLoggerProxy
def initialize(logger, tags:)
@logger = logger
@tags = tags.flatten

View file

@ -0,0 +1,27 @@
module ActionCable
module Connection
# Decorate the Faye::WebSocket with helpers we need.
class WebSocket
delegate :rack_response, :close, :on, to: :websocket
def initialize(env)
@websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
end
def possible?
websocket
end
def alive?
websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
end
def transmit(data)
websocket.send data
end
private
attr_reader :websocket
end
end
end

View file

@ -2,7 +2,7 @@ module ActionCable
class RemoteConnection
class InvalidIdentifiersError < StandardError; end
include Connection::Identifier
include Connection::Identification, Connection::InternalChannel
def initialize(server, ids)
@server = server
@ -10,19 +10,16 @@ module ActionCable
end
def disconnect
message = { type: 'disconnect' }.to_json
redis.publish(internal_redis_channel, message)
server.broadcast_without_logging internal_redis_channel, type: 'disconnect'
end
def identifiers
@server.connection_identifiers
end
def redis
@server.threaded_redis
server.connection_identifiers
end
private
attr_reader :server
def set_identifier_instance_vars(ids)
raise InvalidIdentifiersError unless valid_identifiers?(ids)
ids.each { |k,v| instance_variable_set("@#{k}", v) }

View file

@ -1,75 +1,7 @@
module ActionCable
class Server
cattr_accessor(:logger, instance_reader: true) { Rails.logger }
attr_accessor :registered_channels, :redis_config, :log_tags
def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ])
@redis_config = redis_config.with_indifferent_access
@registered_channels = Set.new(channels)
@worker_pool_size = worker_pool_size
@connection_class = connection
@log_tags = log_tags
@connections = []
logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})"
end
def call(env)
@connection_class.new(self, env).process
end
def worker_pool
@worker_pool ||= ActionCable::Worker.pool(size: @worker_pool_size)
end
def pubsub
@pubsub ||= redis.pubsub
end
def redis
@redis ||= begin
redis = EM::Hiredis.connect(@redis_config[:url])
redis.on(:reconnect_failed) do
logger.info "[ActionCable] Redis reconnect failed."
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
# @connections.map &:close_connection
end
redis
end
end
def threaded_redis
@threaded_redis ||= Redis.new(redis_config)
end
def remote_connections
@remote_connections ||= RemoteConnections.new(self)
end
def broadcaster_for(channel)
Broadcaster.new(self, channel)
end
def broadcast(channel, message)
broadcaster_for(channel).broadcast(message)
end
def connection_identifiers
@connection_class.identifiers
end
def add_connection(connection)
@connections << connection
end
def remove_connection(connection)
@connections.delete connection
end
def open_connections_statistics
@connections.map(&:statistics)
end
module Server
autoload :Base, 'action_cable/server/base'
autoload :Broadcasting, 'action_cable/server/broadcasting'
autoload :Worker, 'action_cable/server/worker'
end
end

View file

@ -0,0 +1,67 @@
module ActionCable
module Server
class Base
include ActionCable::Server::Broadcasting
cattr_accessor(:logger, instance_reader: true) { Rails.logger }
attr_accessor :registered_channels, :redis_config, :log_tags
def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ])
@redis_config = redis_config.with_indifferent_access
@registered_channels = Set.new(channels)
@worker_pool_size = worker_pool_size
@connection_class = connection
@log_tags = log_tags
@connections = []
logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})"
end
def call(env)
@connection_class.new(self, env).process
end
def worker_pool
@worker_pool ||= ActionCable::Server::Worker.pool(size: @worker_pool_size)
end
def pubsub
@pubsub ||= redis.pubsub
end
def redis
@redis ||= begin
redis = EM::Hiredis.connect(@redis_config[:url])
redis.on(:reconnect_failed) do
logger.info "[ActionCable] Redis reconnect failed."
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
# @connections.map &:close
end
redis
end
end
def remote_connections
@remote_connections ||= RemoteConnections.new(self)
end
def connection_identifiers
@connection_class.identifiers
end
def add_connection(connection)
@connections << connection
end
def remove_connection(connection)
@connections.delete connection
end
def open_connections_statistics
@connections.map(&:statistics)
end
end
end
end

View file

@ -0,0 +1,39 @@
module ActionCable
module Server
module Broadcasting
def broadcast(broadcasting, message)
broadcaster_for(broadcasting).broadcast(message)
end
def broadcast_without_logging(broadcasting, message)
broadcaster_for(broadcasting).broadcast_without_logging(message)
end
def broadcaster_for(broadcasting)
Broadcaster.new(self, broadcasting)
end
def broadcasting_redis
@broadcasting_redis ||= Redis.new(redis_config)
end
private
class Broadcaster
attr_reader :server, :broadcasting
def initialize(server, broadcasting)
@server, @broadcasting = server, broadcasting
end
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
broadcast_without_logging(message)
end
def broadcast_without_logging(message)
server.broadcasting_redis.publish broadcasting, message.to_json
end
end
end
end
end

View file

@ -0,0 +1,32 @@
module ActionCable
module Server
class Worker
include ActiveSupport::Callbacks
include Celluloid
define_callbacks :work
def invoke(receiver, method, *args)
run_callbacks :work do
receiver.send method, *args
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception)
end
def run_periodic_timer(channel, callback)
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
end
private
def logger
ActionCable::Server::Base.logger
end
end
end
end

View file

@ -1,30 +0,0 @@
module ActionCable
class Worker
include ActiveSupport::Callbacks
include Celluloid
define_callbacks :work
def invoke(receiver, method, *args)
run_callbacks :work do
receiver.send method, *args
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception)
end
def run_periodic_timer(channel, callback)
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
end
private
def logger
ActionCable::Server.logger
end
end
end

View file

@ -1,124 +1,8 @@
#= require_self
#= require_tree .
#= require cable/consumer
class @Cable
MAX_CONNECTION_INTERVAL: 5 * 1000
PING_STALE_INTERVAL: 8
@Cable =
PING_IDENTIFIER: "_ping"
constructor: (@cableUrl) ->
@subscribers = {}
@resetPingTime()
@resetConnectionAttemptsCount()
@connect()
connect: ->
@connection = @createConnection()
createConnection: ->
connection = new WebSocket(@cableUrl)
connection.onmessage = @receiveData
connection.onopen = @connected
connection.onclose = @reconnect
connection.onerror = @reconnect
connection
isConnected: =>
@connection?.readyState is 1
sendData: (identifier, data) =>
if @isConnected()
@connection.send JSON.stringify { command: 'message', identifier: identifier, data: data }
receiveData: (message) =>
data = JSON.parse message.data
if data.identifier is '_ping'
@pingReceived(data.message)
else
@subscribers[data.identifier]?.onReceiveData(data.message)
connected: =>
@startWaitingForPing()
@resetConnectionAttemptsCount()
for identifier, callbacks of @subscribers
@subscribeOnServer(identifier)
callbacks['onConnect']?()
reconnect: =>
@removeExistingConnection()
@resetPingTime()
@disconnected()
setTimeout =>
@incrementConnectionAttemptsCount()
@connect()
, @generateReconnectInterval()
removeExistingConnection: =>
if @connection?
@clearPingWaitTimeout()
@connection.onclose = -> # no-op
@connection.onerror = -> # no-op
@connection.close()
@connection = null
resetConnectionAttemptsCount: =>
@connectionAttempts = 1
incrementConnectionAttemptsCount: =>
@connectionAttempts += 1
generateReconnectInterval: () ->
interval = (Math.pow(2, @connectionAttempts) - 1) * 1000
if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval
startWaitingForPing: =>
@clearPingWaitTimeout()
@waitForPingTimeout = setTimeout =>
console.log "Ping took too long to arrive. Reconnecting.."
@reconnect()
, @PING_STALE_INTERVAL * 1000
clearPingWaitTimeout: =>
clearTimeout(@waitForPingTimeout)
resetPingTime: =>
@lastPingTime = null
disconnected: =>
callbacks['onDisconnect']?() for identifier, callbacks of @subscribers
giveUp: =>
# Show an error message
subscribe: (identifier, callbacks) =>
@subscribers[identifier] = callbacks
if @isConnected()
@subscribeOnServer(identifier)
@subscribers[identifier]['onConnect']?()
unsubscribe: (identifier) =>
@unsubscribeOnServer(identifier, 'unsubscribe')
delete @subscribers[identifier]
subscribeOnServer: (identifier) =>
if @isConnected()
@connection.send JSON.stringify { command: 'subscribe', identifier: identifier }
unsubscribeOnServer: (identifier) =>
if @isConnected()
@connection.send JSON.stringify { command: 'unsubscribe', identifier: identifier }
pingReceived: (timestamp) =>
if @lastPingTime? and (timestamp - @lastPingTime) > @PING_STALE_INTERVAL
console.log "Websocket connection is stale. Reconnecting.."
@reconnect()
else
@startWaitingForPing()
@lastPingTime = timestamp
createConsumer: (url) ->
new Cable.Consumer url

View file

@ -0,0 +1,53 @@
class Cable.Connection
constructor: (@consumer) ->
@open()
send: (data) ->
if @isOpen()
@websocket.send(JSON.stringify(data))
true
else
false
open: =>
@websocket = new WebSocket(@consumer.url)
@websocket.onmessage = @onMessage
@websocket.onopen = @onOpen
@websocket.onclose = @onClose
@websocket.onerror = @onError
close: ->
@websocket.close() unless @isClosed()
reopen: ->
if @isClosed()
@open()
else
@websocket.onclose = @open
@websocket.onerror = @open
@websocket.close()
isOpen: ->
@websocket.readyState is WebSocket.OPEN
isClosed: ->
@websocket.readyState in [ WebSocket.CLOSED, WebSocket.CLOSING ]
onMessage: (message) =>
data = JSON.parse message.data
@consumer.subscribers.notify(data.identifier, "received", data.message)
onOpen: =>
@consumer.subscribers.reload()
onClose: =>
@disconnect()
onError: =>
@disconnect()
@websocket.onclose = -> # no-op
@websocket.onerror = -> # no-op
try @close()
disconnect: ->
@consumer.subscribers.notifyAll("disconnected")

View file

@ -0,0 +1,65 @@
class Cable.ConnectionMonitor
identifier: Cable.PING_IDENTIFIER
pollInterval:
min: 2
max: 30
staleThreshold:
startedAt: 4
pingedAt: 8
constructor: (@consumer) ->
@consumer.subscribers.add(this)
@start()
connected: ->
@reset()
@pingedAt = now()
received: ->
@pingedAt = now()
reset: ->
@reconnectAttempts = 0
start: ->
@reset()
delete @stoppedAt
@startedAt = now()
@poll()
stop: ->
@stoppedAt = now()
poll: ->
setTimeout =>
unless @stoppedAt
@reconnectIfStale()
@poll()
, @getInterval()
getInterval: ->
{min, max} = @pollInterval
interval = 4 * Math.log(@reconnectAttempts + 1)
clamp(interval, min, max) * 1000
reconnectIfStale: ->
if @connectionIsStale()
@reconnectAttempts += 1
@consumer.connection.reopen()
connectionIsStale: ->
if @pingedAt
secondsSince(@pingedAt) > @staleThreshold.pingedAt
else
secondsSince(@startedAt) > @staleThreshold.startedAt
now = ->
new Date().getTime()
secondsSince = (time) ->
(now() - time) / 1000
clamp = (number, min, max) ->
Math.max(min, Math.min(max, number))

View file

@ -0,0 +1,18 @@
#= require cable/connection
#= require cable/connection_monitor
#= require cable/subscription
#= require cable/subscriber_manager
class Cable.Consumer
constructor: (@url) ->
@subscribers = new Cable.SubscriberManager this
@connection = new Cable.Connection this
@connectionMonitor = new Cable.ConnectionMonitor this
createSubscription: (channelName, mixin) ->
channel = channelName
params = if typeof channel is "object" then channel else {channel}
new Cable.Subscription this, params, mixin
send: (data) ->
@connection.send(data)

View file

@ -0,0 +1,38 @@
class Cable.SubscriberManager
constructor: (@consumer) ->
@subscribers = []
add: (subscriber) ->
@subscribers.push(subscriber)
@notify(subscriber, "initialized")
if @sendCommand(subscriber, "subscribe")
@notify(subscriber, "connected")
reload: ->
for subscriber in @subscribers
if @sendCommand(subscriber, "subscribe")
@notify(subscriber, "connected")
remove: (subscriber) ->
@sendCommand(subscriber, "unsubscribe")
@subscribers = (s for s in @subscribers when s isnt subscriber)
notifyAll: (callbackName, args...) ->
for subscriber in @subscribers
@notify(subscriber, callbackName, args...)
notify: (subscriber, callbackName, args...) ->
if typeof subscriber is "string"
subscribers = (s for s in @subscribers when s.identifier is subscriber)
else
subscribers = [subscriber]
for subscriber in subscribers
subscriber[callbackName]?(args...)
sendCommand: (subscriber, command) ->
{identifier} = subscriber
if identifier is Cable.PING_IDENTIFIER
@consumer.connection.isOpen()
else
@consumer.send({command, identifier})

View file

@ -0,0 +1,22 @@
class Cable.Subscription
constructor: (@consumer, params = {}, mixin) ->
@identifier = JSON.stringify(params)
extend(this, mixin)
@consumer.subscribers.add(this)
# Perform a channel action with the optional data passed as an attribute
perform: (action, data = {}) ->
data.action = action
@send(data)
send: (data) ->
@consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data))
unsubscribe: ->
@consumer.subscribers.remove(this)
extend = (object, properties) ->
if properties?
for key, value of properties
object[key] = value
object

View file

@ -1,34 +0,0 @@
class @Cable.Channel
constructor: (params = {}) ->
@channelName ?= "#{@underscore(@constructor.name)}_channel"
params['channel'] = @channelName
@channelIdentifier = JSON.stringify params
cable.subscribe(@channelIdentifier, {
onConnect: @connected
onDisconnect: @disconnected
onReceiveData: @received
})
connected: =>
# Override in the subclass
disconnected: =>
# Override in the subclass
received: (data) =>
# Override in the subclass
# Perform a channel action with the optional data passed as an attribute
perform: (action, data = {}) ->
data.action = action
cable.sendData @channelIdentifier, JSON.stringify data
send: (data) ->
cable.sendData @channelIdentifier, JSON.stringify data
underscore: (value) ->
value.replace(/[A-Z]/g, (match) => "_#{match.toLowerCase()}").substr(1)

View file

@ -8,7 +8,7 @@ class ChannelTest < ActionCableTest
end
end
class PingServer < ActionCable::Server
class PingServer < ActionCable::Server::Base
register_channels PingChannel
end

View file

@ -8,7 +8,7 @@ class ServerTest < ActionCableTest
end
end
class ChatServer < ActionCable::Server
class ChatServer < ActionCable::Server::Base
register_channels ChatChannel
end