1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Initial stab at adding Action Cable to rails/master

This commit is contained in:
David Heinemeier Hansson 2015-12-14 16:38:37 +01:00
commit 760de782f7
64 changed files with 3547 additions and 1 deletions

11
Gemfile
View file

@ -64,6 +64,17 @@ group :job do
gem 'sequel', require: false
end
# Action Cable
group :cable do
gem 'faye-websocket', '~> 0.10.0', require: false
gem 'websocket-driver', '~> 0.6.1', require: false
gem 'celluloid', '~> 0.17.2', require: false
gem 'em-hiredis', '~> 0.3.0', require: false
gem 'redis', '~> 3.0', require: false
gem 'puma', require: false
end
# Add your own local bundler stuff.
local_gemfile = File.dirname(__FILE__) + "/.Gemfile"
instance_eval File.read local_gemfile if File.exist? local_gemfile

View file

@ -107,6 +107,14 @@ GIT
PATH
remote: .
specs:
actioncable (5.0.0.alpha)
actionpack (= 5.0.0.alpha)
celluloid (~> 0.17.2)
coffee-rails (~> 4.1.0)
em-hiredis (~> 0.3.0)
faye-websocket (~> 0.10.0)
redis (~> 3.0)
websocket-driver (~> 0.6.1)
actionmailer (5.0.0.alpha)
actionpack (= 5.0.0.alpha)
actionview (= 5.0.0.alpha)
@ -144,6 +152,7 @@ PATH
minitest (~> 5.1)
tzinfo (~> 1.1)
rails (5.0.0.alpha)
actioncable (= 5.0.0.alpha)
actionmailer (= 5.0.0.alpha)
actionpack (= 5.0.0.alpha)
actionview (= 5.0.0.alpha)
@ -210,11 +219,19 @@ GEM
delayed_job_active_record (4.1.0)
activerecord (>= 3.0, < 5)
delayed_job (>= 3.0, < 5)
em-hiredis (0.3.0)
eventmachine (~> 1.0)
hiredis (~> 0.5.0)
erubis (2.7.0)
eventmachine (1.0.8)
execjs (2.6.0)
faye-websocket (0.10.2)
eventmachine (>= 0.12.0)
websocket-driver (>= 0.5.1)
ffi (1.9.10)
ffi (1.9.10-x64-mingw32)
ffi (1.9.10-x86-mingw32)
hiredis (0.5.2)
hitimes (1.2.3)
hitimes (1.2.3-x86-mingw32)
i18n (0.7.0)
@ -241,8 +258,13 @@ GEM
mysql2 (0.4.1)
nokogiri (1.6.7)
mini_portile2 (~> 2.0.0.rc2)
nokogiri (1.6.7-x64-mingw32)
mini_portile2 (~> 2.0.0.rc2)
nokogiri (1.6.7-x86-mingw32)
mini_portile2 (~> 2.0.0.rc2)
pg (0.18.3)
psych (2.0.15)
puma (2.15.3)
que (0.11.2)
racc (1.4.13)
rack-cache (1.5.0)
@ -319,6 +341,9 @@ GEM
w3c_validators (1.2)
json
nokogiri
websocket-driver (0.6.3)
websocket-extensions (>= 0.1.0)
websocket-extensions (0.1.2)
PLATFORMS
ruby
@ -334,10 +359,13 @@ DEPENDENCIES
bcrypt (~> 3.1.10)
benchmark-ips
byebug
celluloid (~> 0.17.2)
coffee-rails (~> 4.1.0)
dalli (>= 2.2.1)
delayed_job
delayed_job_active_record
em-hiredis (~> 0.3.0)
faye-websocket (~> 0.10.0)
globalid!
jquery-rails!
json
@ -351,6 +379,7 @@ DEPENDENCIES
nokogiri (>= 1.6.7)
pg (>= 0.18.0)
psych (~> 2.0)
puma
qu-rails!
qu-redis
que
@ -361,6 +390,7 @@ DEPENDENCIES
rails!
rake (>= 10.3)
redcarpet (~> 3.2.3)
redis (~> 3.0)
resque
resque-scheduler
sass!
@ -378,6 +408,7 @@ DEPENDENCIES
tzinfo-data
uglifier (>= 1.3.0)
w3c_validators
websocket-driver (~> 0.6.1)
BUNDLED WITH
1.10.6
1.11.0

3
actioncable/CHANGELOG.md Normal file
View file

@ -0,0 +1,3 @@
* Added to Rails!
*DHH*

20
actioncable/MIT-LICENSE Normal file
View file

@ -0,0 +1,20 @@
Copyright (c) 2015 Basecamp, LLC
Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:
The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

485
actioncable/README.md Normal file
View file

@ -0,0 +1,485 @@
# Action Cable Integrated WebSockets for Rails
[![Build Status](https://travis-ci.org/rails/actioncable.svg)](https://travis-ci.org/rails/actioncable)
Action Cable seamlessly integrates WebSockets with the rest of your Rails application.
It allows for real-time features to be written in Ruby in the same style
and form as the rest of your Rails application, while still being performant
and scalable. It's a full-stack offering that provides both a client-side
JavaScript framework and a server-side Ruby framework. You have access to your full
domain model written with ActiveRecord or your ORM of choice.
## Terminology
A single Action Cable server can handle multiple connection instances. It has one
connection instance per WebSocket connection. A single user may have multiple
WebSockets open to your application if they use multiple browser tabs or devices.
The client of a WebSocket connection is called the consumer.
Each consumer can in turn subscribe to multiple cable channels. Each channel encapsulates
a logical unit of work, similar to what a controller does in a regular MVC setup. For example, you could have a `ChatChannel` and a `AppearancesChannel`, and a consumer could be subscribed to either
or to both of these channels. At the very least, a consumer should be subscribed to one channel.
When the consumer is subscribed to a channel, they act as a subscriber. The connection between
the subscriber and the channel is, surprise-surprise, called a subscription. A consumer
can act as a subscriber to a given channel any number of times. For example, a consumer
could subscribe to multiple chat rooms at the same time. (And remember that a physical user may
have multiple consumers, one per tab/device open to your connection).
Each channel can then again be streaming zero or more broadcastings. A broadcasting is a
pubsub link where anything transmitted by the broadcaster is sent directly to the channel
subscribers who are streaming that named broadcasting.
As you can see, this is a fairly deep architectural stack. There's a lot of new terminology
to identify the new pieces, and on top of that, you're dealing with both client and server side
reflections of each unit.
## Examples
### A full-stack example
The first thing you must do is define your `ApplicationCable::Connection` class in Ruby. This
is the place where you authorize the incoming connection, and proceed to establish it
if all is well. Here's the simplest example starting with the server-side connection class:
```ruby
# app/channels/application_cable/connection.rb
module ApplicationCable
class Connection < ActionCable::Connection::Base
identified_by :current_user
def connect
self.current_user = find_verified_user
end
protected
def find_verified_user
if current_user = User.find_by(id: cookies.signed[:user_id])
current_user
else
reject_unauthorized_connection
end
end
end
end
```
Here `identified_by` is a connection identifier that can be used to find the specific connection again or later.
Note that anything marked as an identifier will automatically create a delegate by the same name on any channel instances created off the connection.
Then you should define your `ApplicationCable::Channel` class in Ruby. This is the place where you put
shared logic between your channels.
```ruby
# app/channels/application_cable/channel.rb
module ApplicationCable
class Channel < ActionCable::Channel::Base
end
end
```
This relies on the fact that you will already have handled authentication of the user, and
that a successful authentication sets a signed cookie with the `user_id`. This cookie is then
automatically sent to the connection instance when a new connection is attempted, and you
use that to set the `current_user`. By identifying the connection by this same current_user,
you're also ensuring that you can later retrieve all open connections by a given user (and
potentially disconnect them all if the user is deleted or deauthorized).
The client-side needs to setup a consumer instance of this connection. That's done like so:
```coffeescript
# app/assets/javascripts/application_cable.coffee
#= require cable
@App = {}
App.cable = Cable.createConsumer "ws://cable.example.com"
```
The ws://cable.example.com address must point to your set of Action Cable servers, and it
must share a cookie namespace with the rest of the application (which may live under http://example.com).
This ensures that the signed cookie will be correctly sent.
That's all you need to establish the connection! But of course, this isn't very useful in
itself. This just gives you the plumbing. To make stuff happen, you need content. That content
is defined by declaring channels on the server and allowing the consumer to subscribe to them.
### Channel example 1: User appearances
Here's a simple example of a channel that tracks whether a user is online or not and what page they're on.
(This is useful for creating presence features like showing a green dot next to a user name if they're online).
First you declare the server-side channel:
```ruby
# app/channels/appearance_channel.rb
class AppearanceChannel < ApplicationCable::Channel
def subscribed
current_user.appear
end
def unsubscribed
current_user.disappear
end
def appear(data)
current_user.appear on: data['appearing_on']
end
def away
current_user.away
end
end
```
The `#subscribed` callback is invoked when, as we'll show below, a client-side subscription is initiated. In this case,
we take that opportunity to say "the current user has indeed appeared". That appear/disappear API could be backed by
Redis or a database or whatever else. Here's what the client-side of that looks like:
```coffeescript
# app/assets/javascripts/cable/subscriptions/appearance.coffee
App.cable.subscriptions.create "AppearanceChannel",
# Called when the subscription is ready for use on the server
connected: ->
@install()
@appear()
# Called when the WebSocket connection is closed
disconnected: ->
@uninstall()
# Called when the subscription is rejected by the server
rejected: ->
@uninstall()
appear: ->
# Calls `AppearanceChannel#appear(data)` on the server
@perform("appear", appearing_on: $("main").data("appearing-on"))
away: ->
# Calls `AppearanceChannel#away` on the server
@perform("away")
buttonSelector = "[data-behavior~=appear_away]"
install: ->
$(document).on "page:change.appearance", =>
@appear()
$(document).on "click.appearance", buttonSelector, =>
@away()
false
$(buttonSelector).show()
uninstall: ->
$(document).off(".appearance")
$(buttonSelector).hide()
```
Simply calling `App.cable.subscriptions.create` will setup the subscription, which will call `AppearanceChannel#subscribed`,
which in turn is linked to original `App.cable` -> `ApplicationCable::Connection` instances.
We then link the client-side `appear` method to `AppearanceChannel#appear(data)`. This is possible because the server-side
channel instance will automatically expose the public methods declared on the class (minus the callbacks), so that these
can be reached as remote procedure calls via a subscription's `perform` method.
### Channel example 2: Receiving new web notifications
The appearance example was all about exposing server functionality to client-side invocation over the WebSocket connection.
But the great thing about WebSockets is that it's a two-way street. So now let's show an example where the server invokes
action on the client.
This is a web notification channel that allows you to trigger client-side web notifications when you broadcast to the right
streams:
```ruby
# app/channels/web_notifications_channel.rb
class WebNotificationsChannel < ApplicationCable::Channel
def subscribed
stream_from "web_notifications_#{current_user.id}"
end
end
```
```coffeescript
# Client-side which assumes you've already requested the right to send web notifications
App.cable.subscriptions.create "WebNotificationsChannel",
received: (data) ->
new Notification data["title"], body: data["body"]
```
```ruby
# Somewhere in your app this is called, perhaps from a NewCommentJob
ActionCable.server.broadcast \
"web_notifications_#{current_user.id}", { title: 'New things!', body: 'All the news that is fit to print' }
```
The `ActionCable.server.broadcast` call places a message in the Redis' pubsub queue under a separate broadcasting name for each user. For a user with an ID of 1, the broadcasting name would be `web_notifications_1`.
The channel has been instructed to stream everything that arrives at `web_notifications_1` directly to the client by invoking the
`#received(data)` callback. The data is the hash sent as the second parameter to the server-side broadcast call, JSON encoded for the trip
across the wire, and unpacked for the data argument arriving to `#received`.
### Passing Parameters to Channel
You can pass parameters from the client side to the server side when creating a subscription. For example:
```ruby
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from "chat_#{params[:room]}"
end
end
```
Pass an object as the first argument to `subscriptions.create`, and that object will become your params hash in your cable channel. The keyword `channel` is required.
```coffeescript
# Client-side which assumes you've already requested the right to send web notifications
App.cable.subscriptions.create { channel: "ChatChannel", room: "Best Room" },
received: (data) ->
@appendLine(data)
appendLine: (data) ->
html = @createLine(data)
$("[data-chat-room='Best Room']").append(html)
createLine: (data) ->
"""
<article class="chat-line">
<span class="speaker">#{data["sent_by"]}</span>
<span class="body">#{data["body"]}</span>
</article>
"""
```
```ruby
# Somewhere in your app this is called, perhaps from a NewCommentJob
ActionCable.server.broadcast \
"chat_#{room}", { sent_by: 'Paul', body: 'This is a cool chat app.' }
```
### Rebroadcasting message
A common use case is to rebroadcast a message sent by one client to any other connected clients.
```ruby
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from "chat_#{params[:room]}"
end
def receive(data)
ActionCable.server.broadcast "chat_#{params[:room]}", data
end
end
```
```coffeescript
# Client-side which assumes you've already requested the right to send web notifications
App.chatChannel = App.cable.subscriptions.create { channel: "ChatChannel", room: "Best Room" },
received: (data) ->
# data => { sent_by: "Paul", body: "This is a cool chat app." }
App.chatChannel.send({ sent_by: "Paul", body: "This is a cool chat app." })
```
The rebroadcast will be received by all connected clients, _including_ the client that sent the message. Note that params are the same as they were when you subscribed to the channel.
### More complete examples
See the [rails/actioncable-examples](http://github.com/rails/actioncable-examples) repository for a full example of how to setup Action Cable in a Rails app and adding channels.
## Configuration
Action Cable has two required configurations: the Redis connection and specifying allowed request origins.
### Redis
By default, `ActionCable::Server::Base` will look for a configuration file in `Rails.root.join('config/redis/cable.yml')`. The file must follow the following format:
```yaml
production: &production
:url: redis://10.10.3.153:6381
:host: 10.10.3.153
:port: 6381
:timeout: 1
development: &development
:url: redis://localhost:6379
:host: localhost
:port: 6379
:timeout: 1
:inline: true
test: *development
```
This format allows you to specify one configuration per Rails environment. You can also change the location of the Redis config file in
a Rails initializer with something like:
```ruby
ActionCable.server.config.redis_path = Rails.root('somewhere/else/cable.yml')
```
### Allowed Request Origins
Action Cable will only accept requests from specified origins, which are passed to the server config as an array. The origins can be instances of strings or regular expressions, against which a check for match will be performed.
```ruby
ActionCable.server.config.allowed_request_origins = ['http://rubyonrails.com', /http:\/\/ruby.*/]
```
To disable and allow requests from any origin:
```ruby
ActionCable.server.config.disable_request_forgery_protection = true
```
By default, Action Cable allows all requests from localhost:3000 when running in the development environment.
### Other Configurations
The other common option to configure is the log tags applied to the per-connection logger. Here's close to what we're using in Basecamp:
```ruby
ActionCable.server.config.log_tags = [
-> request { request.env['bc.account_id'] || "no-account" },
:action_cable,
-> request { request.uuid }
]
```
Your websocket url might change between environments. If you host your production server via https, you will need to use the wss scheme
for your ActionCable server, but development might remain http and use the ws scheme. You might use localhost in development and your
domain in production. In any case, to vary the websocket url between environments, add the following configuration to each environment:
```ruby
config.action_cable.url = "ws://example.com:28080"
```
Then add the following line to your layout before your JavaScript tag:
```erb
<%= action_cable_meta_tag %>
```
And finally, create your consumer like so:
```coffeescript
App.cable = Cable.createConsumer()
```
For a full list of all configuration options, see the `ActionCable::Server::Configuration` class.
Also note that your server must provide at least the same number of database connections as you have workers. The default worker pool is set to 100, so that means you have to make at least that available. You can change that in `config/database.yml` through the `pool` attribute.
## Running the cable server
### Standalone
The cable server(s) is separated from your normal application server. It's still a rack application, but it is its own rack
application. The recommended basic setup is as follows:
```ruby
# cable/config.ru
require ::File.expand_path('../../config/environment', __FILE__)
Rails.application.eager_load!
require 'action_cable/process/logging'
run ActionCable.server
```
Then you start the server using a binstub in bin/cable ala:
```
#!/bin/bash
bundle exec puma -p 28080 cable/config.ru
```
The above will start a cable server on port 28080. Remember to point your client-side setup against that using something like:
`App.cable.createConsumer('ws://basecamp.dev:28080')`.
### In app
If you are using a threaded server like Puma or Thin, the current implementation of ActionCable can run side-along with your Rails application. For example, to listen for WebSocket requests on `/websocket`, match requests on that path:
```ruby
# config/routes.rb
Example::Application.routes.draw do
match "/websocket", :to => ActionCable.server, via: [:get, :post]
end
```
You can use `App.cable.createConsumer('ws://' + window.location.host + '/websocket')` to connect to the cable server.
For every instance of your server you create and for every worker your server spawns, you will also have a new instance of ActionCable, but the use of Redis keeps messages synced across connections.
### Notes
Beware that currently the cable server will _not_ auto-reload any changes in the framework. As we've discussed, long-running cable connections mean long-running objects. We don't yet have a way of reloading the classes of those objects in a safe manner. So when you change your channels, or the model your channels use, you must restart the cable server.
We'll get all this abstracted properly when the framework is integrated into Rails.
The WebSocket server doesn't have access to the session, but it has access to the cookies. This can be used when you need to handle authentication. You can see one way of doing that with Devise in this [article](http://www.rubytutorial.io/actioncable-devise-authentication).
## Dependencies
Action Cable is currently tied to Redis through its use of the pubsub feature to route
messages back and forth over the WebSocket cable connection. This dependency may well
be alleviated in the future, but for the moment that's what it is. So be sure to have
Redis installed and running.
The Ruby side of things is built on top of [faye-websocket](https://github.com/faye/faye-websocket-ruby) and [celluloid](https://github.com/celluloid/celluloid).
## Deployment
Action Cable is powered by a combination of EventMachine and threads. The
framework plumbing needed for connection handling is handled in the
EventMachine loop, but the actual channel, user-specified, work is handled
in a normal Ruby thread. This means you can use all your regular Rails models
with no problem, as long as you haven't committed any thread-safety sins.
But this also means that Action Cable needs to run in its own server process.
So you'll have one set of server processes for your normal web work, and another
set of server processes for the Action Cable. The former can be single-threaded,
like Unicorn, but the latter must be multi-threaded, like Puma.
## Alpha disclaimer
Action Cable is currently considered alpha software. The API is almost guaranteed to change between
now and its first production release as part of Rails 5.0. Real applications using the framework
are all well underway, but as of July 8th, 2015, there are no deployments in the wild yet.
So this current release, which resides in rails/actioncable, is primarily intended for
the adventurous kind, who do not mind reading the full source code of the framework. And it
serves as an invitation for all those crafty folks to contribute to and test what we have so far,
in advance of that general production release.
Action Cable will move from rails/actioncable to rails/rails and become a full-fledged default
framework alongside Action Pack, Active Record, and the like once we cross the bridge from alpha
to beta software (which will happen once the API and missing pieces have solidified).
Finally, note that testing is a unfinished/unstarted area of this framework. The framework
has been developed in-app up until this point. We need to find a good way to allow the user to test
their connection and channel logic.
## License
Action Cable is released under the MIT license:
* http://www.opensource.org/licenses/MIT
## Support
Bug reports can be filed for the alpha development project here:
* https://github.com/rails/actioncable/issues

13
actioncable/Rakefile Normal file
View file

@ -0,0 +1,13 @@
require 'rake/testtask'
dir = File.dirname(__FILE__)
task :default => :test
Rake::TestTask.new do |t|
t.libs << "test"
t.test_files = Dir.glob("#{dir}/test/**/*_test.rb")
t.warning = true
t.verbose = true
t.ruby_opts = ["--dev"] if defined?(JRUBY_VERSION)
end

View file

@ -0,0 +1,32 @@
version = File.read(File.expand_path('../../RAILS_VERSION', __FILE__)).strip
Gem::Specification.new do |s|
s.platform = Gem::Platform::RUBY
s.name = 'actioncable'
s.version = version
s.summary = 'WebSocket framework for Rails.'
s.description = 'Structure many real-time application concerns into channels over a single WebSocket connection.'
s.required_ruby_version = '>= 2.2.2'
s.license = 'MIT'
s.author = ['Pratik Naik', 'David Heinemeier Hansson']
s.email = ['pratiknaik@gmail.com', 'david@heinemeierhansson.com']
s.homepage = 'http://rubyonrails.org'
s.files = Dir['CHANGELOG.md', 'MIT-LICENSE', 'README.md', 'lib/**/*']
s.require_path = 'lib'
s.add_dependency 'actionpack', version
s.add_dependency 'coffee-rails', '~> 4.1.0'
s.add_dependency 'faye-websocket', '~> 0.10.0'
s.add_dependency 'websocket-driver', '~> 0.6.1'
s.add_dependency 'celluloid', '~> 0.17.2'
s.add_dependency 'em-hiredis', '~> 0.3.0'
s.add_dependency 'redis', '~> 3.0'
s.add_development_dependency 'puma'
s.add_development_dependency 'mocha'
end

View file

@ -0,0 +1,50 @@
#--
# Copyright (c) 2015 Basecamp, LLC
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#++
require 'active_support'
require 'active_support/rails'
require 'action_cable/version'
module ActionCable
extend ActiveSupport::Autoload
INTERNAL = {
identifiers: {
ping: '_ping'.freeze
},
message_types: {
confirmation: 'confirm_subscription'.freeze,
rejection: 'reject_subscription'.freeze
}
}
# Singleton instance of the server
module_function def server
@server ||= ActionCable::Server::Base.new
end
autoload :Server
autoload :Connection
autoload :Channel
autoload :RemoteConnections
end

View file

@ -0,0 +1,14 @@
module ActionCable
module Channel
extend ActiveSupport::Autoload
eager_autoload do
autoload :Base
autoload :Broadcasting
autoload :Callbacks
autoload :Naming
autoload :PeriodicTimers
autoload :Streams
end
end
end

View file

@ -0,0 +1,274 @@
require 'set'
module ActionCable
module Channel
# The channel provides the basic structure of grouping behavior into logical units when communicating over the WebSocket connection.
# You can think of a channel like a form of controller, but one that's capable of pushing content to the subscriber in addition to simply
# responding to the subscriber's direct requests.
#
# Channel instances are long-lived. A channel object will be instantiated when the cable consumer becomes a subscriber, and then
# lives until the consumer disconnects. This may be seconds, minutes, hours, or even days. That means you have to take special care
# not to do anything silly in a channel that would balloon its memory footprint or whatever. The references are forever, so they won't be released
# as is normally the case with a controller instance that gets thrown away after every request.
#
# Long-lived channels (and connections) also mean you're responsible for ensuring that the data is fresh. If you hold a reference to a user
# record, but the name is changed while that reference is held, you may be sending stale data if you don't take precautions to avoid it.
#
# The upside of long-lived channel instances is that you can use instance variables to keep reference to objects that future subscriber requests
# can interact with. Here's a quick example:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
# end
#
# def speak(data)
# @room.speak data, user: current_user
# end
# end
#
# The #speak action simply uses the Chat::Room object that was created when the channel was first subscribed to by the consumer when that
# subscriber wants to say something in the room.
#
# == Action processing
#
# Unlike Action Controllers, channels do not follow a REST constraint form for its actions. It's an remote-procedure call model. You can
# declare any public method on the channel (optionally taking a data argument), and this method is automatically exposed as callable to the client.
#
# Example:
#
# class AppearanceChannel < ApplicationCable::Channel
# def subscribed
# @connection_token = generate_connection_token
# end
#
# def unsubscribed
# current_user.disappear @connection_token
# end
#
# def appear(data)
# current_user.appear @connection_token, on: data['appearing_on']
# end
#
# def away
# current_user.away @connection_token
# end
#
# private
# def generate_connection_token
# SecureRandom.hex(36)
# end
# end
#
# In this example, subscribed/unsubscribed are not callable methods, as they were already declared in ActionCable::Channel::Base, but #appear/away
# are. #generate_connection_token is also not callable as its a private method. You'll see that appear accepts a data parameter, which it then
# uses as part of its model call. #away does not, it's simply a trigger action.
#
# Also note that in this example, current_user is available because it was marked as an identifying attribute on the connection.
# All such identifiers will automatically create a delegation method of the same name on the channel instance.
#
# == Rejecting subscription requests
#
# A channel can reject a subscription request in the #subscribed callback by invoking #reject!
#
# Example:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
# reject unless current_user.can_access?(@room)
# end
# end
#
# In this example, the subscription will be rejected if the current_user does not have access to the chat room.
# On the client-side, Channel#rejected callback will get invoked when the server rejects the subscription request.
class Base
include Callbacks
include PeriodicTimers
include Streams
include Naming
include Broadcasting
attr_reader :params, :connection, :identifier
delegate :logger, to: :connection
class << self
# A list of method names that should be considered actions. This
# includes all public instance methods on a channel, less
# any internal methods (defined on Base), adding back in
# any methods that are internal, but still exist on the class
# itself.
#
# ==== Returns
# * <tt>Set</tt> - A set of all methods that should be considered actions.
def action_methods
@action_methods ||= begin
# All public instance methods of this class, including ancestors
methods = (public_instance_methods(true) -
# Except for public instance methods of Base and its ancestors
ActionCable::Channel::Base.public_instance_methods(true) +
# Be sure to include shadowed public instance methods of this class
public_instance_methods(false)).uniq.map(&:to_s)
methods.to_set
end
end
protected
# action_methods are cached and there is sometimes need to refresh
# them. ::clear_action_methods! allows you to do that, so next time
# you run action_methods, they will be recalculated
def clear_action_methods!
@action_methods = nil
end
# Refresh the cached action_methods when a new action_method is added.
def method_added(name)
super
clear_action_methods!
end
end
def initialize(connection, identifier, params = {})
@connection = connection
@identifier = identifier
@params = params
# When a channel is streaming via redis pubsub, we want to delay the confirmation
# transmission until redis pubsub subscription is confirmed.
@defer_subscription_confirmation = false
delegate_connection_identifiers
subscribe_to_channel
end
# Extract the action name from the passed data and process it via the channel. The process will ensure
# that the action requested is a public method on the channel declared by the user (so not one of the callbacks
# like #subscribed).
def perform_action(data)
action = extract_action(data)
if processable_action?(action)
dispatch_action(action, data)
else
logger.error "Unable to process #{action_signature(action, data)}"
end
end
# Called by the cable connection when its cut so the channel has a chance to cleanup with callbacks.
# This method is not intended to be called directly by the user. Instead, overwrite the #unsubscribed callback.
def unsubscribe_from_channel
run_callbacks :unsubscribe do
unsubscribed
end
end
protected
# Called once a consumer has become a subscriber of the channel. Usually the place to setup any streams
# you want this channel to be sending to the subscriber.
def subscribed
# Override in subclasses
end
# Called once a consumer has cut its cable connection. Can be used for cleaning up connections or marking
# people as offline or the like.
def unsubscribed
# Override in subclasses
end
# Transmit a hash of data to the subscriber. The hash will automatically be wrapped in a JSON envelope with
# the proper channel identifier marked as the recipient.
def transmit(data, via: nil)
logger.info "#{self.class.name} transmitting #{data.inspect}".tap { |m| m << " (via #{via})" if via }
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, message: data)
end
def defer_subscription_confirmation!
@defer_subscription_confirmation = true
end
def defer_subscription_confirmation?
@defer_subscription_confirmation
end
def subscription_confirmation_sent?
@subscription_confirmation_sent
end
def reject
@reject_subscription = true
end
def subscription_rejected?
@reject_subscription
end
private
def delegate_connection_identifiers
connection.identifiers.each do |identifier|
define_singleton_method(identifier) do
connection.send(identifier)
end
end
end
def subscribe_to_channel
run_callbacks :subscribe do
subscribed
end
if subscription_rejected?
reject_subscription
else
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
end
def extract_action(data)
(data['action'].presence || :receive).to_sym
end
def processable_action?(action)
self.class.action_methods.include?(action.to_s)
end
def dispatch_action(action, data)
logger.info action_signature(action, data)
if method(action).arity == 1
public_send action, data
else
public_send action
end
end
def action_signature(action, data)
"#{self.class.name}##{action}".tap do |signature|
if (arguments = data.except('action')).any?
signature << "(#{arguments.inspect})"
end
end
end
def transmit_subscription_confirmation
unless subscription_confirmation_sent?
logger.info "#{self.class.name} is transmitting the subscription confirmation"
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
@subscription_confirmation_sent = true
end
end
def reject_subscription
connection.subscriptions.remove_subscription self
transmit_subscription_rejection
end
def transmit_subscription_rejection
logger.info "#{self.class.name} is transmitting the subscription rejection"
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:rejection])
end
end
end
end

View file

@ -0,0 +1,29 @@
require 'active_support/core_ext/object/to_param'
module ActionCable
module Channel
module Broadcasting
extend ActiveSupport::Concern
delegate :broadcasting_for, to: :class
class_methods do
# Broadcast a hash to a unique broadcasting for this <tt>model</tt> in this channel.
def broadcast_to(model, message)
ActionCable.server.broadcast(broadcasting_for([ channel_name, model ]), message)
end
def broadcasting_for(model) #:nodoc:
case
when model.is_a?(Array)
model.map { |m| broadcasting_for(m) }.join(':')
when model.respond_to?(:to_gid_param)
model.to_gid_param
else
model.to_param
end
end
end
end
end
end

View file

@ -0,0 +1,35 @@
require 'active_support/callbacks'
module ActionCable
module Channel
module Callbacks
extend ActiveSupport::Concern
include ActiveSupport::Callbacks
included do
define_callbacks :subscribe
define_callbacks :unsubscribe
end
class_methods do
def before_subscribe(*methods, &block)
set_callback(:subscribe, :before, *methods, &block)
end
def after_subscribe(*methods, &block)
set_callback(:subscribe, :after, *methods, &block)
end
alias_method :on_subscribe, :after_subscribe
def before_unsubscribe(*methods, &block)
set_callback(:unsubscribe, :before, *methods, &block)
end
def after_unsubscribe(*methods, &block)
set_callback(:unsubscribe, :after, *methods, &block)
end
alias_method :on_unsubscribe, :after_unsubscribe
end
end
end
end

View file

@ -0,0 +1,22 @@
module ActionCable
module Channel
module Naming
extend ActiveSupport::Concern
class_methods do
# Returns the name of the channel, underscored, without the <tt>Channel</tt> ending.
# If the channel is in a namespace, then the namespaces are represented by single
# colon separators in the channel name.
#
# ChatChannel.channel_name # => 'chat'
# Chats::AppearancesChannel.channel_name # => 'chats:appearances'
def channel_name
@channel_name ||= name.sub(/Channel$/, '').gsub('::',':').underscore
end
end
# Delegates to the class' <tt>channel_name</tt>
delegate :channel_name, to: :class
end
end
end

View file

@ -0,0 +1,41 @@
module ActionCable
module Channel
module PeriodicTimers
extend ActiveSupport::Concern
included do
class_attribute :periodic_timers, instance_reader: false
self.periodic_timers = []
after_subscribe :start_periodic_timers
after_unsubscribe :stop_periodic_timers
end
module ClassMethods
# Allow you to call a private method <tt>every</tt> so often seconds. This periodic timer can be useful
# for sending a steady flow of updates to a client based off an object that was configured on subscription.
# It's an alternative to using streams if the channel is able to do the work internally.
def periodically(callback, every:)
self.periodic_timers += [ [ callback, every: every ] ]
end
end
private
def active_periodic_timers
@active_periodic_timers ||= []
end
def start_periodic_timers
self.class.periodic_timers.each do |callback, options|
active_periodic_timers << EventMachine::PeriodicTimer.new(options[:every]) do
connection.worker_pool.async.run_periodic_timer(self, callback)
end
end
end
def stop_periodic_timers
active_periodic_timers.each { |timer| timer.cancel }
end
end
end
end

View file

@ -0,0 +1,114 @@
module ActionCable
module Channel
# Streams allow channels to route broadcastings to the subscriber. A broadcasting is, as discussed elsewhere, a pub/sub queue where any data
# put into it is automatically sent to the clients that are connected at that time. It's purely an online queue, though. If you're not
# streaming a broadcasting at the very moment it sends out an update, you'll not get that update when connecting later.
#
# Most commonly, the streamed broadcast is sent straight to the subscriber on the client-side. The channel just acts as a connector between
# the two parties (the broadcaster and the channel subscriber). Here's an example of a channel that allows subscribers to get all new
# comments on a given page:
#
# class CommentsChannel < ApplicationCable::Channel
# def follow(data)
# stream_from "comments_for_#{data['recording_id']}"
# end
#
# def unfollow
# stop_all_streams
# end
# end
#
# So the subscribers of this channel will get whatever data is put into the, let's say, `comments_for_45` broadcasting as soon as it's put there.
# That looks like so from that side of things:
#
# ActionCable.server.broadcast "comments_for_45", author: 'DHH', content: 'Rails is just swell'
#
# If you have a stream that is related to a model, then the broadcasting used can be generated from the model and channel.
# The following example would subscribe to a broadcasting like `comments:Z2lkOi8vVGVzdEFwcC9Qb3N0LzE`
#
# class CommentsChannel < ApplicationCable::Channel
# def subscribed
# post = Post.find(params[:id])
# stream_for post
# end
# end
#
# You can then broadcast to this channel using:
#
# CommentsChannel.broadcast_to(@post, @comment)
#
# If you don't just want to parlay the broadcast unfiltered to the subscriber, you can supply a callback that lets you alter what goes out.
# Example below shows how you can use this to provide performance introspection in the process:
#
# class ChatChannel < ApplicationCable::Channel
# def subscribed
# @room = Chat::Room[params[:room_number]]
#
# stream_for @room, -> (encoded_message) do
# message = ActiveSupport::JSON.decode(encoded_message)
#
# if message['originated_at'].present?
# elapsed_time = (Time.now.to_f - message['originated_at']).round(2)
#
# ActiveSupport::Notifications.instrument :performance, measurement: 'Chat.message_delay', value: elapsed_time, action: :timing
# logger.info "Message took #{elapsed_time}s to arrive"
# end
#
# transmit message
# end
# end
#
# You can stop streaming from all broadcasts by calling #stop_all_streams.
module Streams
extend ActiveSupport::Concern
included do
on_unsubscribe :stop_all_streams
end
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
# Hold off the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
EM.next_tick do
pubsub.subscribe(broadcasting, &callback).callback do |reply|
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end
end
end
# Start streaming the pubsub queue for the <tt>model</tt> in this channel. Optionally, you can pass a
# <tt>callback</tt> that'll be used instead of the default of just transmitting the updates straight
# to the subscriber.
def stream_for(model, callback = nil)
stream_from(broadcasting_for([ channel_name, model ]), callback)
end
def stop_all_streams
streams.each do |broadcasting, callback|
pubsub.unsubscribe_proc broadcasting, callback
logger.info "#{self.class.name} stopped streaming from #{broadcasting}"
end.clear
end
private
delegate :pubsub, to: :connection
def streams
@_streams ||= []
end
def default_stream_callback(broadcasting)
-> (message) do
transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
end
end
end
end
end

View file

@ -0,0 +1,16 @@
module ActionCable
module Connection
extend ActiveSupport::Autoload
eager_autoload do
autoload :Authorization
autoload :Base
autoload :Identification
autoload :InternalChannel
autoload :MessageBuffer
autoload :WebSocket
autoload :Subscriptions
autoload :TaggedLoggerProxy
end
end
end

View file

@ -0,0 +1,13 @@
module ActionCable
module Connection
module Authorization
class UnauthorizedError < StandardError; end
private
def reject_unauthorized_connection
logger.error "An unauthorized connection attempt was rejected"
raise UnauthorizedError
end
end
end
end

View file

@ -0,0 +1,219 @@
require 'action_dispatch'
module ActionCable
module Connection
# For every WebSocket the cable server is accepting, a Connection object will be instantiated. This instance becomes the parent
# of all the channel subscriptions that are created from there on. Incoming messages are then routed to these channel subscriptions
# based on an identifier sent by the cable consumer. The Connection itself does not deal with any specific application logic beyond
# authentication and authorization.
#
# Here's a basic example:
#
# module ApplicationCable
# class Connection < ActionCable::Connection::Base
# identified_by :current_user
#
# def connect
# self.current_user = find_verified_user
# logger.add_tags current_user.name
# end
#
# def disconnect
# # Any cleanup work needed when the cable connection is cut.
# end
#
# protected
# def find_verified_user
# if current_user = User.find_by_identity cookies.signed[:identity_id]
# current_user
# else
# reject_unauthorized_connection
# end
# end
# end
# end
#
# First, we declare that this connection can be identified by its current_user. This allows us later to be able to find all connections
# established for that current_user (and potentially disconnect them if the user was removed from an account). You can declare as many
# identification indexes as you like. Declaring an identification means that a attr_accessor is automatically set for that key.
#
# Second, we rely on the fact that the WebSocket connection is established with the cookies from the domain being sent along. This makes
# it easy to use signed cookies that were set when logging in via a web interface to authorize the WebSocket connection.
#
# Finally, we add a tag to the connection-specific logger with name of the current user to easily distinguish their messages in the log.
#
# Pretty simple, eh?
class Base
include Identification
include InternalChannel
include Authorization
attr_reader :server, :env, :subscriptions
delegate :worker_pool, :pubsub, to: :server
attr_reader :logger
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@started_at = Time.now
end
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
logger.info started_request_message
if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
respond_to_successful_request
else
respond_to_invalid_request
end
end
# Data received over the cable is handled by this method. It's expected that everything inbound is JSON encoded.
# The data is routed to the proper channel that the connection has subscribed to.
def receive(data_in_json)
if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
end
end
# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data)
websocket.transmit data
end
# Close the WebSocket connection.
def close
websocket.close
end
# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
worker_pool.async.invoke(self, method, *arguments)
end
# Return a basic hash of statistics for the connection keyed with `identifier`, `started_at`, and `subscriptions`.
# This can be returned by a health check against the connection.
def statistics
{
identifier: connection_identifier,
started_at: @started_at,
subscriptions: subscriptions.identifiers,
request_id: @env['action_dispatch.request_id']
}
end
def beat
transmit ActiveSupport::JSON.encode(identifier: ActionCable::INTERNAL[:identifiers][:ping], message: Time.now.to_i)
end
protected
# The request that initiated the WebSocket connection is available here. This gives access to the environment, cookies, etc.
def request
@request ||= begin
environment = Rails.application.env_config.merge(env) if defined?(Rails.application) && Rails.application
ActionDispatch::Request.new(environment || env)
end
end
# The cookies of the request that initiated the WebSocket connection. Useful for performing authorization checks.
def cookies
request.cookie_jar
end
private
attr_reader :websocket
attr_reader :message_buffer
def on_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
message_buffer.process!
server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
respond_to_invalid_request
end
def on_message(message)
message_buffer.append message
end
def on_close
logger.info finished_request_message
server.remove_connection(self)
subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel
disconnect if respond_to?(:disconnect)
end
def allow_request_origin?
return true if server.config.disable_request_forgery_protection
if Array(server.config.allowed_request_origins).any? { |allowed_origin| allowed_origin === env['HTTP_ORIGIN'] }
true
else
logger.error("Request origin not allowed: #{env['HTTP_ORIGIN']}")
false
end
end
def respond_to_successful_request
websocket.rack_response
end
def respond_to_invalid_request
close if websocket.alive?
logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
# Tags are declared in the server but computed in the connection. This allows us per-connection tailored tags.
def new_tagged_logger
TaggedLoggerProxy.new server.logger,
tags: server.config.log_tags.map { |tag| tag.respond_to?(:call) ? tag.call(request) : tag.to_s.camelize }
end
def started_request_message
'Started %s "%s"%s for %s at %s' % [
request.request_method,
request.filtered_path,
websocket.possible? ? ' [WebSocket]' : '',
request.ip,
Time.now.to_s ]
end
def finished_request_message
'Finished "%s"%s for %s at %s' % [
request.filtered_path,
websocket.possible? ? ' [WebSocket]' : '',
request.ip,
Time.now.to_s ]
end
end
end
end

View file

@ -0,0 +1,46 @@
require 'set'
module ActionCable
module Connection
module Identification
extend ActiveSupport::Concern
included do
class_attribute :identifiers
self.identifiers = Set.new
end
class_methods do
# Mark a key as being a connection identifier index that can then used to find the specific connection again later.
# Common identifiers are current_user and current_account, but could be anything really.
#
# Note that anything marked as an identifier will automatically create a delegate by the same name on any
# channel instances created off the connection.
def identified_by(*identifiers)
Array(identifiers).each { |identifier| attr_accessor identifier }
self.identifiers += identifiers
end
end
# Return a single connection identifier that combines the value of all the registered identifiers into a single gid.
def connection_identifier
unless defined? @connection_identifier
@connection_identifier = connection_gid identifiers.map { |id| instance_variable_get("@#{id}") }.compact
end
@connection_identifier
end
private
def connection_gid(ids)
ids.map do |o|
if o.respond_to? :to_gid_param
o.to_gid_param
else
o.to_s
end
end.sort.join(":")
end
end
end
end

View file

@ -0,0 +1,45 @@
module ActionCable
module Connection
# Makes it possible for the RemoteConnection to disconnect a specific connection.
module InternalChannel
extend ActiveSupport::Concern
private
def internal_redis_channel
"action_cable/#{connection_identifier}"
end
def subscribe_to_internal_channel
if connection_identifier.present?
callback = -> (message) { process_internal_message(message) }
@_internal_redis_subscriptions ||= []
@_internal_redis_subscriptions << [ internal_redis_channel, callback ]
EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) }
logger.info "Registered connection (#{connection_identifier})"
end
end
def unsubscribe_from_internal_channel
if @_internal_redis_subscriptions.present?
@_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } }
end
end
def process_internal_message(message)
message = ActiveSupport::JSON.decode(message)
case message['type']
when 'disconnect'
logger.info "Removing connection (#{connection_identifier})"
websocket.close
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
close
end
end
end
end

View file

@ -0,0 +1,53 @@
module ActionCable
module Connection
# Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
# Entirely internal operation and should not be used directly by the user.
class MessageBuffer
def initialize(connection)
@connection = connection
@buffered_messages = []
end
def append(message)
if valid? message
if processing?
receive message
else
buffer message
end
else
connection.logger.error "Couldn't handle non-string message: #{message.class}"
end
end
def processing?
@processing
end
def process!
@processing = true
receive_buffered_messages
end
private
attr_reader :connection
attr_accessor :buffered_messages
def valid?(message)
message.is_a?(String)
end
def receive(message)
connection.send_async :receive, message
end
def buffer(message)
buffered_messages << message
end
def receive_buffered_messages
receive buffered_messages.shift until buffered_messages.empty?
end
end
end
end

View file

@ -0,0 +1,75 @@
require 'active_support/core_ext/hash/indifferent_access'
module ActionCable
module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
# the connection to the proper channel. Should not be used directly by the user.
class Subscriptions
def initialize(connection)
@connection = connection
@subscriptions = {}
end
def execute_command(data)
case data['command']
when 'subscribe' then add data
when 'unsubscribe' then remove data
when 'message' then perform_action data
else
logger.error "Received unrecognized command in #{data.inspect}"
end
rescue Exception => e
logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
end
def add(data)
id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
subscription_klass = connection.server.channel_classes[id_options[:channel]]
if subscription_klass
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
end
def remove(data)
logger.info "Unsubscribing from channel: #{data['identifier']}"
remove_subscription subscriptions[data['identifier']]
end
def remove_subscription(subscription)
subscription.unsubscribe_from_channel
subscriptions.delete(subscription.identifier)
end
def perform_action(data)
find(data).perform_action ActiveSupport::JSON.decode(data['data'])
end
def identifiers
subscriptions.keys
end
def unsubscribe_from_all
subscriptions.each { |id, channel| channel.unsubscribe_from_channel }
end
private
attr_reader :connection, :subscriptions
delegate :logger, to: :connection
def find(data)
if subscription = subscriptions[data['identifier']]
subscription
else
raise "Unable to find subscription with identifier: #{data['identifier']}"
end
end
end
end
end

View file

@ -0,0 +1,40 @@
module ActionCable
module Connection
# Allows the use of per-connection tags against the server logger. This wouldn't work using the tradional
# ActiveSupport::TaggedLogging-enhanced Rails.logger, as that logger will reset the tags between requests.
# The connection is long-lived, so it needs its own set of tags for its independent duration.
class TaggedLoggerProxy
attr_reader :tags
def initialize(logger, tags:)
@logger = logger
@tags = tags.flatten
end
def add_tags(*tags)
@tags += tags.flatten
@tags = @tags.uniq
end
def tag(logger)
if logger.respond_to?(:tagged)
current_tags = tags - logger.formatter.current_tags
logger.tagged(*current_tags) { yield }
else
yield
end
end
%i( debug info warn error fatal unknown ).each do |severity|
define_method(severity) do |message|
log severity, message
end
end
protected
def log(type, message)
tag(@logger) { @logger.send type, message }
end
end
end
end

View file

@ -0,0 +1,29 @@
require 'faye/websocket'
module ActionCable
module Connection
# Decorate the Faye::WebSocket with helpers we need.
class WebSocket
delegate :rack_response, :close, :on, to: :websocket
def initialize(env)
@websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
end
def possible?
websocket
end
def alive?
websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
end
def transmit(data)
websocket.send data
end
private
attr_reader :websocket
end
end
end

View file

@ -0,0 +1,27 @@
require 'rails/engine'
require 'active_support/ordered_options'
require 'action_cable/helpers/action_cable_helper'
module ActionCable
class Engine < ::Rails::Engine
config.action_cable = ActiveSupport::OrderedOptions.new
config.to_prepare do
ApplicationController.helper ActionCable::Helpers::ActionCableHelper
end
initializer "action_cable.logger" do
ActiveSupport.on_load(:action_cable) { self.logger ||= ::Rails.logger }
end
initializer "action_cable.set_configs" do |app|
options = app.config.action_cable
options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development?
ActiveSupport.on_load(:action_cable) do
options.each { |k,v| send("#{k}=", v) }
end
end
end
end

View file

@ -0,0 +1,15 @@
module ActionCable
# Returns the version of the currently loaded Action Cable as a <tt>Gem::Version</tt>.
def self.gem_version
Gem::Version.new VERSION::STRING
end
module VERSION
MAJOR = 5
MINOR = 0
TINY = 0
PRE = "alpha"
STRING = [MAJOR, MINOR, TINY, PRE].compact.join(".")
end
end

View file

@ -0,0 +1,29 @@
module ActionCable
module Helpers
module ActionCableHelper
# Returns an "action-cable-url" meta tag with the value of the url specified in your
# configuration. Ensure this is above your javascript tag:
#
# <head>
# <%= action_cable_meta_tag %>
# <%= javascript_include_tag 'application', 'data-turbolinks-track' => true %>
# </head>
#
# This is then used by ActionCable to determine the url of your websocket server.
# Your CoffeeScript can then connect to the server without needing to specify the
# url directly:
#
# #= require cable
# @App = {}
# App.cable = Cable.createConsumer()
#
# Make sure to specify the correct server location in each of your environments
# config file:
#
# config.action_cable.url = "ws://example.com:28080"
def action_cable_meta_tag
tag "meta", name: "action-cable-url", content: Rails.application.config.action_cable.url
end
end
end
end

View file

@ -0,0 +1,12 @@
require 'action_cable/server'
require 'eventmachine'
require 'celluloid'
EM.error_handler do |e|
puts "Error raised inside the event loop: #{e.message}"
puts e.backtrace.join("\n")
end
Celluloid.logger = ActionCable.server.logger
ActionCable.server.config.log_to_stdout if Rails.env.development?

View file

@ -0,0 +1,64 @@
module ActionCable
# If you need to disconnect a given connection, you go through the RemoteConnections. You find the connections you're looking for by
# searching the identifier declared on the connection. Example:
#
# module ApplicationCable
# class Connection < ActionCable::Connection::Base
# identified_by :current_user
# ....
# end
# end
#
# ActionCable.server.remote_connections.where(current_user: User.find(1)).disconnect
#
# That will disconnect all the connections established for User.find(1) across all servers running on all machines (because it uses
# the internal channel that all these servers are subscribed to).
class RemoteConnections
attr_reader :server
def initialize(server)
@server = server
end
def where(identifier)
RemoteConnection.new(server, identifier)
end
private
# Represents a single remote connection found via ActionCable.server.remote_connections.where(*).
# Exists for the solely for the purpose of calling #disconnect on that connection.
class RemoteConnection
class InvalidIdentifiersError < StandardError; end
include Connection::Identification, Connection::InternalChannel
def initialize(server, ids)
@server = server
set_identifier_instance_vars(ids)
end
# Uses the internal channel to disconnect the connection.
def disconnect
server.broadcast internal_redis_channel, type: 'disconnect'
end
# Returns all the identifiers that were applied to this connection.
def identifiers
server.connection_identifiers
end
private
attr_reader :server
def set_identifier_instance_vars(ids)
raise InvalidIdentifiersError unless valid_identifiers?(ids)
ids.each { |k,v| instance_variable_set("@#{k}", v) }
end
def valid_identifiers?(ids)
keys = ids.keys
identifiers.all? { |id| keys.include?(id) }
end
end
end
end

View file

@ -0,0 +1,19 @@
require 'eventmachine'
EventMachine.epoll if EventMachine.epoll?
EventMachine.kqueue if EventMachine.kqueue?
module ActionCable
module Server
extend ActiveSupport::Autoload
eager_autoload do
autoload :Base
autoload :Broadcasting
autoload :Connections
autoload :Configuration
autoload :Worker
autoload :ActiveRecordConnectionManagement, 'action_cable/server/worker/active_record_connection_management'
end
end
end

View file

@ -0,0 +1,74 @@
require 'em-hiredis'
module ActionCable
module Server
# A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but
# also by the user to reach the RemoteConnections instead for finding and disconnecting connections across all servers.
#
# Also, this is the server instance used for broadcasting. See Broadcasting for details.
class Base
include ActionCable::Server::Broadcasting
include ActionCable::Server::Connections
cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }
def self.logger; config.logger; end
delegate :logger, to: :config
def initialize
end
# Called by rack to setup the server.
def call(env)
setup_heartbeat_timer
config.connection_class.new(self, env).process
end
# Disconnect all the connections identified by `identifiers` on this server or any others via RemoteConnections.
def disconnect(identifiers)
remote_connections.where(identifiers).disconnect
end
# Gateway to RemoteConnections. See that class for details.
def remote_connections
@remote_connections ||= RemoteConnections.new(self)
end
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
@worker_pool ||= ActionCable::Server::Worker.pool(size: config.worker_pool_size)
end
# Requires and returns an hash of all the channel class constants keyed by name.
def channel_classes
@channel_classes ||= begin
config.channel_paths.each { |channel_path| require channel_path }
config.channel_class_names.each_with_object({}) { |name, hash| hash[name] = name.constantize }
end
end
# The redis pubsub adapter used for all streams/broadcasting.
def pubsub
@pubsub ||= redis.pubsub
end
# The EventMachine Redis instance used by the pubsub adapter.
def redis
@redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis|
redis.on(:reconnect_failed) do
logger.info "[ActionCable] Redis reconnect failed."
# logger.info "[ActionCable] Redis reconnected. Closing all the open connections."
# @connections.map &:close
end
end
end
# All the identifiers applied to the connection class associated with this server.
def connection_identifiers
config.connection_class.identifiers
end
end
ActiveSupport.run_load_hooks(:action_cable, Base.config)
end
end

View file

@ -0,0 +1,54 @@
require 'redis'
module ActionCable
module Server
# Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these
# broadcastings are streamed directly to the clients subscribed to the named broadcasting. Let's explain with a full-stack example:
#
# class WebNotificationsChannel < ApplicationCable::Channel
# def subscribed
# stream_from "web_notifications_#{current_user.id}"
# end
# end
#
# # Somewhere in your app this is called, perhaps from a NewCommentJob
# ActionCable.server.broadcast \
# "web_notifications_1", { title: 'New things!', body: 'All shit fit for print' }
#
# # Client-side coffescript which assumes you've already requested the right to send web notifications
# App.cable.subscriptions.create "WebNotificationsChannel",
# received: (data) ->
# new Notification data['title'], body: data['body']
module Broadcasting
# Broadcast a hash directly to a named <tt>broadcasting</tt>. It'll automatically be JSON encoded.
def broadcast(broadcasting, message)
broadcaster_for(broadcasting).broadcast(message)
end
# Returns a broadcaster for a named <tt>broadcasting</tt> that can be reused. Useful when you have a object that
# may need multiple spots to transmit to a specific broadcasting over and over.
def broadcaster_for(broadcasting)
Broadcaster.new(self, broadcasting)
end
# The redis instance used for broadcasting. Not intended for direct user use.
def broadcasting_redis
@broadcasting_redis ||= Redis.new(config.redis)
end
private
class Broadcaster
attr_reader :server, :broadcasting
def initialize(server, broadcasting)
@server, @broadcasting = server, broadcasting
end
def broadcast(message)
server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}"
server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message)
end
end
end
end
end

View file

@ -0,0 +1,67 @@
require 'active_support/core_ext/hash/indifferent_access'
module ActionCable
module Server
# An instance of this configuration object is available via ActionCable.server.config, which allows you to tweak the configuration points
# in a Rails config initializer.
class Configuration
attr_accessor :logger, :log_tags
attr_accessor :connection_class, :worker_pool_size
attr_accessor :redis_path, :channels_path
attr_accessor :disable_request_forgery_protection, :allowed_request_origins
attr_accessor :url
def initialize
@logger = Rails.logger
@log_tags = []
@connection_class = ApplicationCable::Connection
@worker_pool_size = 100
@redis_path = Rails.root.join('config/redis/cable.yml')
@channels_path = Rails.root.join('app/channels')
@disable_request_forgery_protection = false
end
def log_to_stdout
console = ActiveSupport::Logger.new($stdout)
console.formatter = @logger.formatter
console.level = @logger.level
@logger.extend(ActiveSupport::Logger.broadcast(console))
end
def channel_paths
@channels ||= Dir["#{channels_path}/**/*_channel.rb"]
end
def channel_class_names
@channel_class_names ||= channel_paths.collect do |channel_path|
Pathname.new(channel_path).basename.to_s.split('.').first.camelize
end
end
def redis
@redis ||= config_for(redis_path).with_indifferent_access
end
private
# FIXME: Extract this from Rails::Application in a way it can be used here.
def config_for(path)
if path.exist?
require "yaml"
require "erb"
(YAML.load(ERB.new(path.read).result) || {})[Rails.env] || {}
else
raise "Could not load configuration. No such file - #{path}"
end
rescue Psych::SyntaxError => e
raise "YAML syntax error occurred while parsing #{path}. " \
"Please note that YAML must be consistently indented using spaces. Tabs are not allowed. " \
"Error: #{e.message}"
end
end
end
end

View file

@ -0,0 +1,37 @@
module ActionCable
module Server
# Collection class for all the connections that's been established on this specific server. Remember, usually you'll run many cable servers, so
# you can't use this collection as an full list of all the connections established against your application. Use RemoteConnections for that.
# As such, this is primarily for internal use.
module Connections
BEAT_INTERVAL = 3
def connections
@connections ||= []
end
def add_connection(connection)
connections << connection
end
def remove_connection(connection)
connections.delete connection
end
# WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
EM.next_tick do
@heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
EM.next_tick { connections.map(&:beat) }
end
end
end
def open_connections_statistics
connections.map(&:statistics)
end
end
end
end

View file

@ -0,0 +1,42 @@
require 'celluloid'
require 'active_support/callbacks'
module ActionCable
module Server
# Worker used by Server.send_async to do connection work in threads. Only for internal use.
class Worker
include ActiveSupport::Callbacks
include Celluloid
attr_reader :connection
define_callbacks :work
include ActiveRecordConnectionManagement
def invoke(receiver, method, *args)
@connection = receiver
run_callbacks :work do
receiver.send method, *args
end
rescue Exception => e
logger.error "There was an exception - #{e.class}(#{e.message})"
logger.error e.backtrace.join("\n")
receiver.handle_exception if receiver.respond_to?(:handle_exception)
end
def run_periodic_timer(channel, callback)
@connection = channel.connection
run_callbacks :work do
callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback)
end
end
private
def logger
ActionCable.server.logger
end
end
end
end

View file

@ -0,0 +1,22 @@
module ActionCable
module Server
class Worker
# Clear active connections between units of work so the long-running channel or connection processes do not hoard connections.
module ActiveRecordConnectionManagement
extend ActiveSupport::Concern
included do
if defined?(ActiveRecord::Base)
set_callback :work, :around, :with_database_connections
end
end
def with_database_connections
connection.logger.tag(ActiveRecord::Base.logger) { yield }
ensure
ActiveRecord::Base.clear_active_connections!
end
end
end
end
end

View file

@ -0,0 +1,8 @@
require_relative 'gem_version'
module ActionCable
# Returns the version of the currently loaded Action Cable as a <tt>Gem::Version</tt>
def self.version
gem_version
end
end

View file

@ -0,0 +1,12 @@
#= require_self
#= require cable/consumer
@Cable =
INTERNAL: <%= ActionCable::INTERNAL.to_json %>
createConsumer: (url = @getConfig("url")) ->
new Cable.Consumer url
getConfig: (name) ->
element = document.head.querySelector("meta[name='action-cable-#{name}']")
element?.getAttribute("content")

View file

@ -0,0 +1,84 @@
# Encapsulate the cable connection held by the consumer. This is an internal class not intended for direct user manipulation.
{message_types} = Cable.INTERNAL
class Cable.Connection
@reopenDelay: 500
constructor: (@consumer) ->
@open()
send: (data) ->
if @isOpen()
@webSocket.send(JSON.stringify(data))
true
else
false
open: =>
if @webSocket and not @isState("closed")
throw new Error("Existing connection must be closed before opening")
else
@webSocket = new WebSocket(@consumer.url)
@installEventHandlers()
true
close: ->
@webSocket?.close()
reopen: ->
if @isState("closed")
@open()
else
try
@close()
finally
setTimeout(@open, @constructor.reopenDelay)
isOpen: ->
@isState("open")
# Private
isState: (states...) ->
@getState() in states
getState: ->
return state.toLowerCase() for state, value of WebSocket when value is @webSocket?.readyState
null
installEventHandlers: ->
for eventName of @events
handler = @events[eventName].bind(this)
@webSocket["on#{eventName}"] = handler
return
events:
message: (event) ->
{identifier, message, type} = JSON.parse(event.data)
switch type
when message_types.confirmation
@consumer.subscriptions.notify(identifier, "connected")
when message_types.rejection
@consumer.subscriptions.reject(identifier)
else
@consumer.subscriptions.notify(identifier, "received", message)
open: ->
@disconnected = false
@consumer.subscriptions.reload()
close: ->
@disconnect()
error: ->
@disconnect()
disconnect: ->
return if @disconnected
@disconnected = true
@consumer.subscriptions.notifyAll("disconnected")
toJSON: ->
state: @getState()

View file

@ -0,0 +1,84 @@
# Responsible for ensuring the cable connection is in good health by validating the heartbeat pings sent from the server, and attempting
# revival reconnections if things go astray. Internal class, not intended for direct user manipulation.
class Cable.ConnectionMonitor
@pollInterval:
min: 3
max: 30
@staleThreshold: 6 # Server::Connections::BEAT_INTERVAL * 2 (missed two pings)
identifier: Cable.INTERNAL.identifiers.ping
constructor: (@consumer) ->
@consumer.subscriptions.add(this)
@start()
connected: ->
@reset()
@pingedAt = now()
delete @disconnectedAt
disconnected: ->
@disconnectedAt = now()
received: ->
@pingedAt = now()
reset: ->
@reconnectAttempts = 0
start: ->
@reset()
delete @stoppedAt
@startedAt = now()
@poll()
document.addEventListener("visibilitychange", @visibilityDidChange)
stop: ->
@stoppedAt = now()
document.removeEventListener("visibilitychange", @visibilityDidChange)
poll: ->
setTimeout =>
unless @stoppedAt
@reconnectIfStale()
@poll()
, @getInterval()
getInterval: ->
{min, max} = @constructor.pollInterval
interval = 5 * Math.log(@reconnectAttempts + 1)
clamp(interval, min, max) * 1000
reconnectIfStale: ->
if @connectionIsStale()
@reconnectAttempts++
unless @disconnectedRecently()
@consumer.connection.reopen()
connectionIsStale: ->
secondsSince(@pingedAt ? @startedAt) > @constructor.staleThreshold
disconnectedRecently: ->
@disconnectedAt and secondsSince(@disconnectedAt) < @constructor.staleThreshold
visibilityDidChange: =>
if document.visibilityState is "visible"
setTimeout =>
if @connectionIsStale() or not @consumer.connection.isOpen()
@consumer.connection.reopen()
, 200
toJSON: ->
interval = @getInterval()
connectionIsStale = @connectionIsStale()
{@startedAt, @stoppedAt, @pingedAt, @reconnectAttempts, connectionIsStale, interval}
now = ->
new Date().getTime()
secondsSince = (time) ->
(now() - time) / 1000
clamp = (number, min, max) ->
Math.max(min, Math.min(max, number))

View file

@ -0,0 +1,31 @@
#= require cable/connection
#= require cable/connection_monitor
#= require cable/subscriptions
#= require cable/subscription
# The Cable.Consumer establishes the connection to a server-side Ruby Connection object. Once established,
# the Cable.ConnectionMonitor will ensure that its properly maintained through heartbeats and checking for stale updates.
# The Consumer instance is also the gateway to establishing subscriptions to desired channels through the #createSubscription
# method.
#
# The following example shows how this can be setup:
#
# @App = {}
# App.cable = Cable.createConsumer "ws://example.com/accounts/1"
# App.appearance = App.cable.subscriptions.create "AppearanceChannel"
#
# For more details on how you'd configure an actual channel subscription, see Cable.Subscription.
class Cable.Consumer
constructor: (@url) ->
@subscriptions = new Cable.Subscriptions this
@connection = new Cable.Connection this
@connectionMonitor = new Cable.ConnectionMonitor this
send: (data) ->
@connection.send(data)
inspect: ->
JSON.stringify(this, null, 2)
toJSON: ->
{@url, @subscriptions, @connection, @connectionMonitor}

View file

@ -0,0 +1,68 @@
# A new subscription is created through the Cable.Subscriptions instance available on the consumer.
# It provides a number of callbacks and a method for calling remote procedure calls on the corresponding
# Channel instance on the server side.
#
# An example demonstrates the basic functionality:
#
# App.appearance = App.cable.subscriptions.create "AppearanceChannel",
# connected: ->
# # Called once the subscription has been successfully completed
#
# appear: ->
# @perform 'appear', appearing_on: @appearingOn()
#
# away: ->
# @perform 'away'
#
# appearingOn: ->
# $('main').data 'appearing-on'
#
# The methods #appear and #away forward their intent to the remote AppearanceChannel instance on the server
# by calling the `@perform` method with the first parameter being the action (which maps to AppearanceChannel#appear/away).
# The second parameter is a hash that'll get JSON encoded and made available on the server in the data parameter.
#
# This is how the server component would look:
#
# class AppearanceChannel < ApplicationCable::Channel
# def subscribed
# current_user.appear
# end
#
# def unsubscribed
# current_user.disappear
# end
#
# def appear(data)
# current_user.appear on: data['appearing_on']
# end
#
# def away
# current_user.away
# end
# end
#
# The "AppearanceChannel" name is automatically mapped between the client-side subscription creation and the server-side Ruby class name.
# The AppearanceChannel#appear/away public methods are exposed automatically to client-side invocation through the @perform method.
class Cable.Subscription
constructor: (@subscriptions, params = {}, mixin) ->
@identifier = JSON.stringify(params)
extend(this, mixin)
@subscriptions.add(this)
@consumer = @subscriptions.consumer
# Perform a channel action with the optional data passed as an attribute
perform: (action, data = {}) ->
data.action = action
@send(data)
send: (data) ->
@consumer.send(command: "message", identifier: @identifier, data: JSON.stringify(data))
unsubscribe: ->
@subscriptions.remove(this)
extend = (object, properties) ->
if properties?
for key, value of properties
object[key] = value
object

View file

@ -0,0 +1,78 @@
# Collection class for creating (and internally managing) channel subscriptions. The only method intended to be triggered by the user
# us Cable.Subscriptions#create, and it should be called through the consumer like so:
#
# @App = {}
# App.cable = Cable.createConsumer "ws://example.com/accounts/1"
# App.appearance = App.cable.subscriptions.create "AppearanceChannel"
#
# For more details on how you'd configure an actual channel subscription, see Cable.Subscription.
class Cable.Subscriptions
constructor: (@consumer) ->
@subscriptions = []
@history = []
create: (channelName, mixin) ->
channel = channelName
params = if typeof channel is "object" then channel else {channel}
new Cable.Subscription this, params, mixin
# Private
add: (subscription) ->
@subscriptions.push(subscription)
@notify(subscription, "initialized")
@sendCommand(subscription, "subscribe")
remove: (subscription) ->
@forget(subscription)
unless @findAll(subscription.identifier).length
@sendCommand(subscription, "unsubscribe")
reject: (identifier) ->
for subscription in @findAll(identifier)
@forget(subscription)
@notify(subscription, "rejected")
forget: (subscription) ->
@subscriptions = (s for s in @subscriptions when s isnt subscription)
findAll: (identifier) ->
s for s in @subscriptions when s.identifier is identifier
reload: ->
for subscription in @subscriptions
@sendCommand(subscription, "subscribe")
notifyAll: (callbackName, args...) ->
for subscription in @subscriptions
@notify(subscription, callbackName, args...)
notify: (subscription, callbackName, args...) ->
if typeof subscription is "string"
subscriptions = @findAll(subscription)
else
subscriptions = [subscription]
for subscription in subscriptions
subscription[callbackName]?(args...)
if callbackName in ["initialized", "connected", "disconnected", "rejected"]
{identifier} = subscription
@record(notification: {identifier, callbackName, args})
sendCommand: (subscription, command) ->
{identifier} = subscription
if identifier is Cable.INTERNAL.identifiers.ping
@consumer.connection.isOpen()
else
@consumer.send({command, identifier})
record: (data) ->
data.time = new Date()
@history = @history.slice(-19)
@history.push(data)
toJSON: ->
history: @history
identifiers: (subscription.identifier for subscription in @subscriptions)

View file

@ -0,0 +1,148 @@
require 'test_helper'
require 'stubs/test_connection'
require 'stubs/room'
class ActionCable::Channel::BaseTest < ActiveSupport::TestCase
class ActionCable::Channel::Base
def kick
@last_action = [ :kick ]
end
def topic
end
end
class BasicChannel < ActionCable::Channel::Base
def chatters
@last_action = [ :chatters ]
end
end
class ChatChannel < BasicChannel
attr_reader :room, :last_action
after_subscribe :toggle_subscribed
after_unsubscribe :toggle_subscribed
def initialize(*)
@subscribed = false
super
end
def subscribed
@room = Room.new params[:id]
@actions = []
end
def unsubscribed
@room = nil
end
def toggle_subscribed
@subscribed = !@subscribed
end
def leave
@last_action = [ :leave ]
end
def speak(data)
@last_action = [ :speak, data ]
end
def topic(data)
@last_action = [ :topic, data ]
end
def subscribed?
@subscribed
end
def get_latest
transmit data: 'latest'
end
private
def rm_rf
@last_action = [ :rm_rf ]
end
end
setup do
@user = User.new "lifo"
@connection = TestConnection.new(@user)
@channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
end
test "should subscribe to a channel on initialize" do
assert_equal 1, @channel.room.id
end
test "on subscribe callbacks" do
assert @channel.subscribed
end
test "channel params" do
assert_equal({ id: 1 }, @channel.params)
end
test "unsubscribing from a channel" do
assert @channel.room
assert @channel.subscribed?
@channel.unsubscribe_from_channel
assert ! @channel.room
assert ! @channel.subscribed?
end
test "connection identifiers" do
assert_equal @user.name, @channel.current_user.name
end
test "callable action without any argument" do
@channel.perform_action 'action' => :leave
assert_equal [ :leave ], @channel.last_action
end
test "callable action with arguments" do
data = { 'action' => :speak, 'content' => "Hello World" }
@channel.perform_action data
assert_equal [ :speak, data ], @channel.last_action
end
test "should not dispatch a private method" do
@channel.perform_action 'action' => :rm_rf
assert_nil @channel.last_action
end
test "should not dispatch a public method defined on Base" do
@channel.perform_action 'action' => :kick
assert_nil @channel.last_action
end
test "should dispatch a public method defined on Base and redefined on channel" do
data = { 'action' => :topic, 'content' => "This is Sparta!" }
@channel.perform_action data
assert_equal [ :topic, data ], @channel.last_action
end
test "should dispatch calling a public method defined in an ancestor" do
@channel.perform_action 'action' => :chatters
assert_equal [ :chatters ], @channel.last_action
end
test "transmitting data" do
@channel.perform_action 'action' => :get_latest
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "message" => { "data" => "latest" }
assert_equal expected, @connection.last_transmission
end
test "subscription confirmation" do
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
assert_equal expected, @connection.last_transmission
end
end

View file

@ -0,0 +1,29 @@
require 'test_helper'
require 'stubs/test_connection'
require 'stubs/room'
class ActionCable::Channel::BroadcastingTest < ActiveSupport::TestCase
class ChatChannel < ActionCable::Channel::Base
end
setup do
@connection = TestConnection.new
end
test "broadcasts_to" do
ActionCable.stubs(:server).returns mock().tap { |m| m.expects(:broadcast).with('action_cable:channel:broadcasting_test:chat:Room#1-Campfire', "Hello World") }
ChatChannel.broadcast_to(Room.new(1), "Hello World")
end
test "broadcasting_for with an object" do
assert_equal "Room#1-Campfire", ChatChannel.broadcasting_for(Room.new(1))
end
test "broadcasting_for with an array" do
assert_equal "Room#1-Campfire:Room#2-Campfire", ChatChannel.broadcasting_for([ Room.new(1), Room.new(2) ])
end
test "broadcasting_for with a string" do
assert_equal "hello", ChatChannel.broadcasting_for("hello")
end
end

View file

@ -0,0 +1,10 @@
require 'test_helper'
class ActionCable::Channel::NamingTest < ActiveSupport::TestCase
class ChatChannel < ActionCable::Channel::Base
end
test "channel_name" do
assert_equal "action_cable:channel:naming_test:chat", ChatChannel.channel_name
end
end

View file

@ -0,0 +1,40 @@
require 'test_helper'
require 'stubs/test_connection'
require 'stubs/room'
class ActionCable::Channel::PeriodicTimersTest < ActiveSupport::TestCase
class ChatChannel < ActionCable::Channel::Base
periodically -> { ping }, every: 5
periodically :send_updates, every: 1
private
def ping
end
end
setup do
@connection = TestConnection.new
end
test "periodic timers definition" do
timers = ChatChannel.periodic_timers
assert_equal 2, timers.size
first_timer = timers[0]
assert_kind_of Proc, first_timer[0]
assert_equal 5, first_timer[1][:every]
second_timer = timers[1]
assert_equal :send_updates, second_timer[0]
assert_equal 1, second_timer[1][:every]
end
test "timer start and stop" do
EventMachine::PeriodicTimer.expects(:new).times(2).returns(true)
channel = ChatChannel.new @connection, "{id: 1}", { id: 1 }
channel.expects(:stop_periodic_timers).once
channel.unsubscribe_from_channel
end
end

View file

@ -0,0 +1,25 @@
require 'test_helper'
require 'stubs/test_connection'
require 'stubs/room'
class ActionCable::Channel::RejectionTest < ActiveSupport::TestCase
class SecretChannel < ActionCable::Channel::Base
def subscribed
reject if params[:id] > 0
end
end
setup do
@user = User.new "lifo"
@connection = TestConnection.new(@user)
end
test "subscription rejection" do
@connection.expects(:subscriptions).returns mock().tap { |m| m.expects(:remove_subscription).with instance_of(SecretChannel) }
@channel = SecretChannel.new @connection, "{id: 1}", { id: 1 }
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "reject_subscription"
assert_equal expected, @connection.last_transmission
end
end

View file

@ -0,0 +1,80 @@
require 'test_helper'
require 'stubs/test_connection'
require 'stubs/room'
class ActionCable::Channel::StreamTest < ActionCable::TestCase
class ChatChannel < ActionCable::Channel::Base
def subscribed
if params[:id]
@room = Room.new params[:id]
stream_from "test_room_#{@room.id}"
end
end
def send_confirmation
transmit_subscription_confirmation
end
end
test "streaming start and stop" do
run_in_eventmachine do
connection = TestConnection.new
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) }
channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) }
channel.unsubscribe_from_channel
end
end
test "stream_for" do
run_in_eventmachine do
connection = TestConnection.new
EM.next_tick do
connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) }
end
channel = ChatChannel.new connection, ""
channel.stream_for Room.new(1)
end
end
test "stream_from subscription confirmation" do
EM.run do
connection = TestConnection.new
connection.expects(:pubsub).returns EM::Hiredis.connect.pubsub
channel = ChatChannel.new connection, "{id: 1}", { id: 1 }
assert_nil connection.last_transmission
EM::Timer.new(0.1) do
expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription"
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s"
EM.run_deferred_callbacks
EM.stop
end
end
end
test "subscription confirmation should only be sent out once" do
EM.run do
connection = TestConnection.new
connection.stubs(:pubsub).returns EM::Hiredis.connect.pubsub
channel = ChatChannel.new connection, "test_channel"
channel.send_confirmation
channel.send_confirmation
EM.run_deferred_callbacks
expected = ActiveSupport::JSON.encode "identifier" => "test_channel", "type" => "confirm_subscription"
assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation"
assert_equal 1, connection.transmissions.size
EM.stop
end
end
end

View file

@ -0,0 +1,32 @@
require 'test_helper'
require 'stubs/test_server'
class ActionCable::Connection::AuthorizationTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
attr_reader :websocket
def connect
reject_unauthorized_connection
end
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
test "unauthorized connection" do
run_in_eventmachine do
server = TestServer.new
server.config.allowed_request_origins = %w( http://rubyonrails.com )
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
'HTTP_ORIGIN' => 'http://rubyonrails.com'
connection = Connection.new(server, env)
connection.websocket.expects(:close)
connection.process
end
end
end

View file

@ -0,0 +1,118 @@
require 'test_helper'
require 'stubs/test_server'
class ActionCable::Connection::BaseTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
attr_reader :websocket, :subscriptions, :message_buffer, :connected
def connect
@connected = true
end
def disconnect
@connected = false
end
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
setup do
@server = TestServer.new
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
end
test "making a connection with invalid headers" do
run_in_eventmachine do
connection = ActionCable::Connection::Base.new(@server, Rack::MockRequest.env_for("/test"))
response = connection.process
assert_equal 404, response[0]
end
end
test "websocket connection" do
run_in_eventmachine do
connection = open_connection
connection.process
assert connection.websocket.possible?
assert connection.websocket.alive?
end
end
test "rack response" do
run_in_eventmachine do
connection = open_connection
response = connection.process
assert_equal [ -1, {}, [] ], response
end
end
test "on connection open" do
run_in_eventmachine do
connection = open_connection
connection.process
connection.websocket.expects(:transmit).with(regexp_matches(/\_ping/))
connection.message_buffer.expects(:process!)
# Allow EM to run on_open callback
EM.next_tick do
assert_equal [ connection ], @server.connections
assert connection.connected
end
end
end
test "on connection close" do
run_in_eventmachine do
connection = open_connection
connection.process
# Setup the connection
EventMachine.stubs(:add_periodic_timer).returns(true)
connection.send :on_open
assert connection.connected
connection.subscriptions.expects(:unsubscribe_from_all)
connection.send :on_close
assert ! connection.connected
assert_equal [], @server.connections
end
end
test "connection statistics" do
run_in_eventmachine do
connection = open_connection
connection.process
statistics = connection.statistics
assert statistics[:identifier].blank?
assert_kind_of Time, statistics[:started_at]
assert_equal [], statistics[:subscriptions]
end
end
test "explicitly closing a connection" do
run_in_eventmachine do
connection = open_connection
connection.process
connection.websocket.expects(:close)
connection.close
end
end
private
def open_connection
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket',
'HTTP_ORIGIN' => 'http://rubyonrails.com'
Connection.new(@server, env)
end
end

View file

@ -0,0 +1,82 @@
require 'test_helper'
require 'stubs/test_server'
class ActionCable::Connection::CrossSiteForgeryTest < ActionCable::TestCase
HOST = 'rubyonrails.com'
class Connection < ActionCable::Connection::Base
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
setup do
@server = TestServer.new
@server.config.allowed_request_origins = %w( http://rubyonrails.com )
end
teardown do
@server.config.disable_request_forgery_protection = false
@server.config.allowed_request_origins = []
end
test "disable forgery protection" do
@server.config.disable_request_forgery_protection = true
assert_origin_allowed 'http://rubyonrails.com'
assert_origin_allowed 'http://hax.com'
end
test "explicitly specified a single allowed origin" do
@server.config.allowed_request_origins = 'http://hax.com'
assert_origin_not_allowed 'http://rubyonrails.com'
assert_origin_allowed 'http://hax.com'
end
test "explicitly specified multiple allowed origins" do
@server.config.allowed_request_origins = %w( http://rubyonrails.com http://www.rubyonrails.com )
assert_origin_allowed 'http://rubyonrails.com'
assert_origin_allowed 'http://www.rubyonrails.com'
assert_origin_not_allowed 'http://hax.com'
end
test "explicitly specified a single regexp allowed origin" do
@server.config.allowed_request_origins = /.*ha.*/
assert_origin_not_allowed 'http://rubyonrails.com'
assert_origin_allowed 'http://hax.com'
end
test "explicitly specified multiple regexp allowed origins" do
@server.config.allowed_request_origins = [/http:\/\/ruby.*/, /.*rai.s.*com/, 'string' ]
assert_origin_allowed 'http://rubyonrails.com'
assert_origin_allowed 'http://www.rubyonrails.com'
assert_origin_not_allowed 'http://hax.com'
assert_origin_not_allowed 'http://rails.co.uk'
end
private
def assert_origin_allowed(origin)
response = connect_with_origin origin
assert_equal -1, response[0]
end
def assert_origin_not_allowed(origin)
response = connect_with_origin origin
assert_equal 404, response[0]
end
def connect_with_origin(origin)
response = nil
run_in_eventmachine do
response = Connection.new(@server, env_for_origin(origin)).process
end
response
end
def env_for_origin(origin)
Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket', 'SERVER_NAME' => HOST,
'HTTP_ORIGIN' => origin
end
end

View file

@ -0,0 +1,77 @@
require 'test_helper'
require 'stubs/test_server'
require 'stubs/user'
class ActionCable::Connection::IdentifierTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
identified_by :current_user
attr_reader :websocket
public :process_internal_message
def connect
self.current_user = User.new "lifo"
end
end
test "connection identifier" do
run_in_eventmachine do
open_connection_with_stubbed_pubsub
assert_equal "User#lifo", @connection.connection_identifier
end
end
test "should subscribe to internal channel on open and unsubscribe on close" do
run_in_eventmachine do
pubsub = mock('pubsub')
pubsub.expects(:subscribe).with('action_cable/User#lifo')
pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc))
server = TestServer.new
server.stubs(:pubsub).returns(pubsub)
open_connection server: server
close_connection
end
end
test "processing disconnect message" do
run_in_eventmachine do
open_connection_with_stubbed_pubsub
@connection.websocket.expects(:close)
message = ActiveSupport::JSON.encode('type' => 'disconnect')
@connection.process_internal_message message
end
end
test "processing invalid message" do
run_in_eventmachine do
open_connection_with_stubbed_pubsub
@connection.websocket.expects(:close).never
message = ActiveSupport::JSON.encode('type' => 'unknown')
@connection.process_internal_message message
end
end
protected
def open_connection_with_stubbed_pubsub
server = TestServer.new
server.stubs(:pubsub).returns(stub_everything('pubsub'))
open_connection server: server
end
def open_connection(server:)
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(server, env)
@connection.process
@connection.send :on_open
end
def close_connection
@connection.send :on_close
end
end

View file

@ -0,0 +1,41 @@
require 'test_helper'
require 'stubs/test_server'
require 'stubs/user'
class ActionCable::Connection::MultipleIdentifiersTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
identified_by :current_user, :current_room
def connect
self.current_user = User.new "lifo"
self.current_room = Room.new "my", "room"
end
end
test "multiple connection identifiers" do
run_in_eventmachine do
open_connection_with_stubbed_pubsub
assert_equal "Room#my-room:User#lifo", @connection.connection_identifier
end
end
protected
def open_connection_with_stubbed_pubsub
server = TestServer.new
server.stubs(:pubsub).returns(stub_everything('pubsub'))
open_connection server: server
end
def open_connection(server:)
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(server, env)
@connection.process
@connection.send :on_open
end
def close_connection
@connection.send :on_close
end
end

View file

@ -0,0 +1,44 @@
require 'test_helper'
require 'stubs/test_server'
class ActionCable::Connection::StringIdentifierTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
identified_by :current_token
def connect
self.current_token = "random-string"
end
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
test "connection identifier" do
run_in_eventmachine do
open_connection_with_stubbed_pubsub
assert_equal "random-string", @connection.connection_identifier
end
end
protected
def open_connection_with_stubbed_pubsub
@server = TestServer.new
@server.stubs(:pubsub).returns(stub_everything('pubsub'))
open_connection
end
def open_connection
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(@server, env)
@connection.process
@connection.send :on_open
end
def close_connection
@connection.send :on_close
end
end

View file

@ -0,0 +1,116 @@
require 'test_helper'
class ActionCable::Connection::SubscriptionsTest < ActionCable::TestCase
class Connection < ActionCable::Connection::Base
attr_reader :websocket
def send_async(method, *args)
# Bypass Celluloid
send method, *args
end
end
class ChatChannel < ActionCable::Channel::Base
attr_reader :room, :lines
def subscribed
@room = Room.new params[:id]
@lines = []
end
def speak(data)
@lines << data
end
end
setup do
@server = TestServer.new
@server.stubs(:channel_classes).returns(ChatChannel.name => ChatChannel)
@chat_identifier = ActiveSupport::JSON.encode(id: 1, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel')
end
test "subscribe command" do
run_in_eventmachine do
setup_connection
channel = subscribe_to_chat_channel
assert_kind_of ChatChannel, channel
assert_equal 1, channel.room.id
end
end
test "subscribe command without an identifier" do
run_in_eventmachine do
setup_connection
@subscriptions.execute_command 'command' => 'subscribe'
assert @subscriptions.identifiers.empty?
end
end
test "unsubscribe command" do
run_in_eventmachine do
setup_connection
subscribe_to_chat_channel
channel = subscribe_to_chat_channel
channel.expects(:unsubscribe_from_channel)
@subscriptions.execute_command 'command' => 'unsubscribe', 'identifier' => @chat_identifier
assert @subscriptions.identifiers.empty?
end
end
test "unsubscribe command without an identifier" do
run_in_eventmachine do
setup_connection
@subscriptions.execute_command 'command' => 'unsubscribe'
assert @subscriptions.identifiers.empty?
end
end
test "message command" do
run_in_eventmachine do
setup_connection
channel = subscribe_to_chat_channel
data = { 'content' => 'Hello World!', 'action' => 'speak' }
@subscriptions.execute_command 'command' => 'message', 'identifier' => @chat_identifier, 'data' => ActiveSupport::JSON.encode(data)
assert_equal [ data ], channel.lines
end
end
test "unsubscrib from all" do
run_in_eventmachine do
setup_connection
channel1 = subscribe_to_chat_channel
channel2_id = ActiveSupport::JSON.encode(id: 2, channel: 'ActionCable::Connection::SubscriptionsTest::ChatChannel')
channel2 = subscribe_to_chat_channel(channel2_id)
channel1.expects(:unsubscribe_from_channel)
channel2.expects(:unsubscribe_from_channel)
@subscriptions.unsubscribe_from_all
end
end
private
def subscribe_to_chat_channel(identifier = @chat_identifier)
@subscriptions.execute_command 'command' => 'subscribe', 'identifier' => identifier
assert_equal identifier, @subscriptions.identifiers.last
@subscriptions.send :find, 'identifier' => identifier
end
def setup_connection
env = Rack::MockRequest.env_for "/test", 'HTTP_CONNECTION' => 'upgrade', 'HTTP_UPGRADE' => 'websocket'
@connection = Connection.new(@server, env)
@subscriptions = ActionCable::Connection::Subscriptions.new(@connection)
end
end

View file

@ -0,0 +1,8 @@
class GlobalID
attr_reader :uri
delegate :to_param, :to_s, to: :uri
def initialize(gid, options = {})
@uri = gid
end
end

View file

@ -0,0 +1,16 @@
class Room
attr_reader :id, :name
def initialize(id, name='Campfire')
@id = id
@name = name
end
def to_global_id
GlobalID.new("Room##{id}-#{name}")
end
def to_gid_param
to_global_id.to_param
end
end

View file

@ -0,0 +1,21 @@
require 'stubs/user'
class TestConnection
attr_reader :identifiers, :logger, :current_user, :transmissions
def initialize(user = User.new("lifo"))
@identifiers = [ :current_user ]
@current_user = user
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
@transmissions = []
end
def transmit(data)
@transmissions << data
end
def last_transmission
@transmissions.last
end
end

View file

@ -0,0 +1,15 @@
require 'ostruct'
class TestServer
include ActionCable::Server::Connections
attr_reader :logger, :config
def initialize
@logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new)
@config = OpenStruct.new(log_tags: [])
end
def send_async
end
end

View file

@ -0,0 +1,15 @@
class User
attr_reader :name
def initialize(name)
@name = name
end
def to_global_id
GlobalID.new("User##{name}")
end
def to_gid_param
to_global_id.to_param
end
end

View file

@ -0,0 +1,42 @@
require File.expand_path('../../../load_paths', __FILE__)
require 'action_cable'
require 'active_support/testing/autorun'
require 'puma'
require 'em-hiredis'
require 'mocha/setup'
require 'rack/mock'
# Require all the stubs and models
Dir[File.dirname(__FILE__) + '/stubs/*.rb'].each {|file| require file }
require 'celluloid'
$CELLULOID_DEBUG = false
$CELLULOID_TEST = false
Celluloid.logger = Logger.new(StringIO.new)
require 'faye/websocket'
class << Faye::WebSocket
remove_method :ensure_reactor_running
# We don't want Faye to start the EM reactor in tests because it makes testing much harder.
# We want to be able to start and stop EM loop in tests to make things simpler.
def ensure_reactor_running
# no-op
end
end
class ActionCable::TestCase < ActiveSupport::TestCase
def run_in_eventmachine
EM.run do
yield
EM.run_deferred_callbacks
EM.stop
end
end
end

View file

@ -0,0 +1,49 @@
require 'test_helper'
class WorkerTest < ActiveSupport::TestCase
class Receiver
attr_accessor :last_action
def run
@last_action = :run
end
def process(message)
@last_action = [ :process, message ]
end
def connection
end
end
setup do
Celluloid.boot
@worker = ActionCable::Server::Worker.new
@receiver = Receiver.new
end
teardown do
@receiver.last_action = nil
end
test "invoke" do
@worker.invoke @receiver, :run
assert_equal :run, @receiver.last_action
end
test "invoke with arguments" do
@worker.invoke @receiver, :process, "Hello"
assert_equal [ :process, "Hello" ], @receiver.last_action
end
test "running periodic timers with a proc" do
@worker.run_periodic_timer @receiver, @receiver.method(:run)
assert_equal :run, @receiver.last_action
end
test "running periodic timers with a method" do
@worker.run_periodic_timer @receiver, :run
assert_equal :run, @receiver.last_action
end
end

View file

@ -25,6 +25,7 @@ Gem::Specification.new do |s|
s.add_dependency 'activerecord', version
s.add_dependency 'actionmailer', version
s.add_dependency 'activejob', version
s.add_dependency 'actioncable', version
s.add_dependency 'railties', version
s.add_dependency 'bundler', '>= 1.3.0', '< 2.0'