From 354018bf9b5f5bf0fbbc6e6efddc719e7523b39d Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Sat, 4 Apr 2015 00:26:14 -0500 Subject: [PATCH 1/8] Separate connection and server classes --- action_cable.gemspec | 1 + lib/action_cable.rb | 4 + lib/action_cable/channel/redis.rb | 7 +- lib/action_cable/connection.rb | 133 ++++++++++++++++++++++++++ lib/action_cable/connections.rb | 17 ++++ lib/action_cable/server.rb | 154 ++---------------------------- 6 files changed, 166 insertions(+), 150 deletions(-) create mode 100644 lib/action_cable/connection.rb create mode 100644 lib/action_cable/connections.rb diff --git a/action_cable.gemspec b/action_cable.gemspec index 1dade2a394..ba5c4159d6 100644 --- a/action_cable.gemspec +++ b/action_cable.gemspec @@ -12,6 +12,7 @@ Gem::Specification.new do |s| s.add_dependency('activesupport', '>= 4.2.0') s.add_dependency('faye-websocket', '~> 0.9.2') s.add_dependency('celluloid', '~> 0.16.0') + s.add_dependency('em-hiredis', '~> 0.3.0') s.files = Dir['README', 'lib/**/*'] s.has_rdoc = false diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 0681b8bdde..62046c7717 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -7,10 +7,12 @@ require 'active_support' require 'active_support/json' require 'active_support/concern' require 'active_support/core_ext/hash/indifferent_access' +require 'active_support/core_ext/module/delegation' require 'active_support/callbacks' require 'faye/websocket' require 'celluloid' +require 'em-hiredis' require 'action_cable/engine' if defined?(Rails) @@ -20,4 +22,6 @@ module ActionCable autoload :Channel, 'action_cable/channel' autoload :Worker, 'action_cable/worker' autoload :Server, 'action_cable/server' + autoload :Connection, 'action_cable/connection' + autoload :Connections, 'action_cable/connections' end diff --git a/lib/action_cable/channel/redis.rb b/lib/action_cable/channel/redis.rb index bdbd3c95b1..2691a3b145 100644 --- a/lib/action_cable/channel/redis.rb +++ b/lib/action_cable/channel/redis.rb @@ -6,11 +6,10 @@ module ActionCable included do on_unsubscribe :unsubscribe_from_redis_channels + delegate :pubsub, to: :connection end def subscribe_to(redis_channel, callback = nil) - raise "`ActionCable::Server.pubsub` class method is not defined" unless connection.class.respond_to?(:pubsub) - callback ||= -> (message) { broadcast ActiveSupport::JSON.decode(message) } @_redis_channels ||= [] @_redis_channels << [ redis_channel, callback ] @@ -24,10 +23,6 @@ module ActionCable @_redis_channels.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } end end - - def pubsub - connection.class.pubsub - end end end diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb new file mode 100644 index 0000000000..00fb8ca817 --- /dev/null +++ b/lib/action_cable/connection.rb @@ -0,0 +1,133 @@ +module ActionCable + class Connection + PING_INTERVAL = 3 + + attr_reader :env, :server + delegate :worker_pool, :pubsub, :logger, to: :server + + def initialize(server, env) + @server = server + @env = env + @accept_messages = false + @pending_messages = [] + end + + def process + if Faye::WebSocket.websocket?(@env) + @subscriptions = {} + + @websocket = Faye::WebSocket.new(@env) + + @websocket.on(:open) do |event| + broadcast_ping_timestamp + @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp } + worker_pool.async.invoke(self, :initialize_client) + end + + @websocket.on(:message) do |event| + message = event.data + + if message.is_a?(String) + if @accept_messages + worker_pool.async.invoke(self, :received_data, message) + else + @pending_messages << message + end + end + end + + @websocket.on(:close) do |event| + worker_pool.async.invoke(self, :cleanup_subscriptions) + worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) + + EventMachine.cancel_timer(@ping_timer) if @ping_timer + end + + @websocket.rack_response + else + invalid_request + end + end + + def received_data(data) + return unless websocket_alive? + + data = ActiveSupport::JSON.decode data + + case data['action'] + when 'subscribe' + subscribe_channel(data) + when 'unsubscribe' + unsubscribe_channel(data) + when 'message' + process_message(data) + end + end + + def cleanup_subscriptions + @subscriptions.each do |id, channel| + channel.unsubscribe + end + end + + def broadcast(data) + logger.info "Sending data: #{data}" + @websocket.send data + end + + def handle_exception + logger.error "[ActionCable] Closing connection" + + @websocket.close + end + + private + def initialize_client + connect if respond_to?(:connect) + @accept_messages = true + + worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? + end + + def broadcast_ping_timestamp + broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + + def subscribe_channel(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } + + if subscription_klass + logger.info "Subscribing to channel: #{id_key}" + @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) + else + logger.error "Unable to subscribe to channel: #{id_key}" + end + end + + def process_message(message) + if @subscriptions[message['identifier']] + @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) + else + logger.error "Unable to process message: #{message}" + end + end + + def unsubscribe_channel(data) + logger.info "Unsubscribing from channel: #{data['identifier']}" + @subscriptions[data['identifier']].unsubscribe + @subscriptions.delete(data['identifier']) + end + + def invalid_request + [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + end + + def websocket_alive? + @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN + end + + end +end diff --git a/lib/action_cable/connections.rb b/lib/action_cable/connections.rb new file mode 100644 index 0000000000..e68cc6e7a4 --- /dev/null +++ b/lib/action_cable/connections.rb @@ -0,0 +1,17 @@ +module ActionCable + module Connections + class << self + def active + end + + def where(identification) + end + end + + def disconnect + end + + def reconnect + end + end +end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index ebf98171c1..6e9265dc06 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,157 +1,23 @@ module ActionCable class Server - class_attribute :registered_channels - self.registered_channels = Set.new - - class_attribute :worker_pool_size - self.worker_pool_size = 100 - cattr_accessor(:logger, instance_reader: true) { Rails.logger } - PING_INTERVAL = 3 + attr_accessor :registered_channels, :worker_pool - class << self - def register_channels(*channel_classes) - self.registered_channels += channel_classes - end - - def call(env) - new(env).process - end - - def worker_pool - @worker_pool ||= ActionCable::Worker.pool(size: worker_pool_size) - end + def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection) + @redis_config = redis_config + @registered_channels = Set.new(channels) + @worker_pool = ActionCable::Worker.pool(size: worker_pool_size) + @connection_class = connection end - attr_reader :env - - def initialize(env) - @env = env - @accept_messages = false - @pending_messages = [] + def call(env) + @connection_class.new(self, env).process end - def process - if Faye::WebSocket.websocket?(@env) - @subscriptions = {} - - @websocket = Faye::WebSocket.new(@env) - - @websocket.on(:open) do |event| - broadcast_ping_timestamp - @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp } - worker_pool.async.invoke(self, :initialize_client) - end - - @websocket.on(:message) do |event| - message = event.data - - if message.is_a?(String) - if @accept_messages - worker_pool.async.invoke(self, :received_data, message) - else - @pending_messages << message - end - end - end - - @websocket.on(:close) do |event| - worker_pool.async.invoke(self, :cleanup_subscriptions) - worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) - - EventMachine.cancel_timer(@ping_timer) if @ping_timer - end - - @websocket.rack_response - else - invalid_request - end + def pubsub + @pubsub ||= EM::Hiredis.connect(@redis_config['url']).pubsub end - def received_data(data) - return unless websocket_alive? - - data = ActiveSupport::JSON.decode data - - case data['action'] - when 'subscribe' - subscribe_channel(data) - when 'unsubscribe' - unsubscribe_channel(data) - when 'message' - process_message(data) - end - end - - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.unsubscribe - end - end - - def broadcast(data) - logger.info "Sending data: #{data}" - @websocket.send data - end - - def worker_pool - self.class.worker_pool - end - - def handle_exception - logger.error "[ActionCable] Closing connection" - - @websocket.close - end - - private - def initialize_client - connect if respond_to?(:connect) - @accept_messages = true - - worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? - end - - def broadcast_ping_timestamp - broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - - def subscribe_channel(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access - - subscription_klass = registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } - - if subscription_klass - logger.info "Subscribing to channel: #{id_key}" - @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) - else - logger.error "Unable to subscribe to channel: #{id_key}" - end - end - - def process_message(message) - if @subscriptions[message['identifier']] - @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) - else - logger.error "Unable to process message: #{message}" - end - end - - def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" - @subscriptions[data['identifier']].unsubscribe - @subscriptions.delete(data['identifier']) - end - - def invalid_request - [404, {'Content-Type' => 'text/plain'}, ['Page not found']] - end - - def websocket_alive? - @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN - end - end end From eec92d0229a7bceb62d49d58c70b5629fe140d7f Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 12:21:22 -0500 Subject: [PATCH 2/8] Add connection identifier and an internal redis channel --- Gemfile.lock | 9 +- lib/action_cable.rb | 2 +- lib/action_cable/connection.rb | 133 +---------------- lib/action_cable/connection/base.rb | 139 ++++++++++++++++++ lib/action_cable/connection/registry.rb | 65 ++++++++ .../{connections.rb => connection_proxy.rb} | 5 +- 6 files changed, 216 insertions(+), 137 deletions(-) create mode 100644 lib/action_cable/connection/base.rb create mode 100644 lib/action_cable/connection/registry.rb rename lib/action_cable/{connections.rb => connection_proxy.rb} (76%) diff --git a/Gemfile.lock b/Gemfile.lock index e767e58784..6fcf4aa39f 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -4,6 +4,7 @@ PATH action_cable (0.0.2) activesupport (>= 4.2.0) celluloid (~> 0.16.0) + em-hiredis (~> 0.3.0) faye-websocket (~> 0.9.2) GEM @@ -17,10 +18,14 @@ GEM tzinfo (~> 1.1) celluloid (0.16.0) timers (~> 4.0.0) + em-hiredis (0.3.0) + eventmachine (~> 1.0) + hiredis (~> 0.5.0) eventmachine (1.0.7) faye-websocket (0.9.2) eventmachine (>= 0.12.0) websocket-driver (>= 0.5.1) + hiredis (0.5.2) hitimes (1.2.2) i18n (0.7.0) json (1.8.2) @@ -34,9 +39,9 @@ GEM hitimes tzinfo (1.2.2) thread_safe (~> 0.1) - websocket-driver (0.5.1) + websocket-driver (0.5.3) websocket-extensions (>= 0.1.0) - websocket-extensions (0.1.1) + websocket-extensions (0.1.2) PLATFORMS ruby diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 62046c7717..fd42dc6cdd 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -23,5 +23,5 @@ module ActionCable autoload :Worker, 'action_cable/worker' autoload :Server, 'action_cable/server' autoload :Connection, 'action_cable/connection' - autoload :Connections, 'action_cable/connections' + autoload :ConnectionProxy, 'action_cable/connection_proxy' end diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 00fb8ca817..102903c6ef 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,133 +1,6 @@ module ActionCable - class Connection - PING_INTERVAL = 3 - - attr_reader :env, :server - delegate :worker_pool, :pubsub, :logger, to: :server - - def initialize(server, env) - @server = server - @env = env - @accept_messages = false - @pending_messages = [] - end - - def process - if Faye::WebSocket.websocket?(@env) - @subscriptions = {} - - @websocket = Faye::WebSocket.new(@env) - - @websocket.on(:open) do |event| - broadcast_ping_timestamp - @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp } - worker_pool.async.invoke(self, :initialize_client) - end - - @websocket.on(:message) do |event| - message = event.data - - if message.is_a?(String) - if @accept_messages - worker_pool.async.invoke(self, :received_data, message) - else - @pending_messages << message - end - end - end - - @websocket.on(:close) do |event| - worker_pool.async.invoke(self, :cleanup_subscriptions) - worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) - - EventMachine.cancel_timer(@ping_timer) if @ping_timer - end - - @websocket.rack_response - else - invalid_request - end - end - - def received_data(data) - return unless websocket_alive? - - data = ActiveSupport::JSON.decode data - - case data['action'] - when 'subscribe' - subscribe_channel(data) - when 'unsubscribe' - unsubscribe_channel(data) - when 'message' - process_message(data) - end - end - - def cleanup_subscriptions - @subscriptions.each do |id, channel| - channel.unsubscribe - end - end - - def broadcast(data) - logger.info "Sending data: #{data}" - @websocket.send data - end - - def handle_exception - logger.error "[ActionCable] Closing connection" - - @websocket.close - end - - private - def initialize_client - connect if respond_to?(:connect) - @accept_messages = true - - worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? - end - - def broadcast_ping_timestamp - broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json) - end - - def subscribe_channel(data) - id_key = data['identifier'] - id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access - - subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } - - if subscription_klass - logger.info "Subscribing to channel: #{id_key}" - @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) - else - logger.error "Unable to subscribe to channel: #{id_key}" - end - end - - def process_message(message) - if @subscriptions[message['identifier']] - @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) - else - logger.error "Unable to process message: #{message}" - end - end - - def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" - @subscriptions[data['identifier']].unsubscribe - @subscriptions.delete(data['identifier']) - end - - def invalid_request - [404, {'Content-Type' => 'text/plain'}, ['Page not found']] - end - - def websocket_alive? - @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN - end - + module Connection + autoload :Base, 'action_cable/connection/base' + autoload :Registry, 'action_cable/connection/Registry' end end diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb new file mode 100644 index 0000000000..c6c9899094 --- /dev/null +++ b/lib/action_cable/connection/base.rb @@ -0,0 +1,139 @@ +module ActionCable + module Connection + class Base + include Registry + + PING_INTERVAL = 3 + + attr_reader :env, :server + delegate :worker_pool, :pubsub, :logger, to: :server + + def initialize(server, env) + @server = server + @env = env + @accept_messages = false + @pending_messages = [] + end + + def process + if Faye::WebSocket.websocket?(@env) + @subscriptions = {} + + @websocket = Faye::WebSocket.new(@env) + + @websocket.on(:open) do |event| + broadcast_ping_timestamp + @ping_timer = EventMachine.add_periodic_timer(PING_INTERVAL) { broadcast_ping_timestamp } + worker_pool.async.invoke(self, :initialize_client) + end + + @websocket.on(:message) do |event| + message = event.data + + if message.is_a?(String) + if @accept_messages + worker_pool.async.invoke(self, :received_data, message) + else + @pending_messages << message + end + end + end + + @websocket.on(:close) do |event| + worker_pool.async.invoke(self, :cleanup_subscriptions) + worker_pool.async.invoke(self, :cleanup_subscriptions) + worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) + + EventMachine.cancel_timer(@ping_timer) if @ping_timer + end + + @websocket.rack_response + else + invalid_request + end + end + + def received_data(data) + return unless websocket_alive? + + data = ActiveSupport::JSON.decode data + + case data['action'] + when 'subscribe' + subscribe_channel(data) + when 'unsubscribe' + unsubscribe_channel(data) + when 'message' + process_message(data) + end + end + + def cleanup_subscriptions + @subscriptions.each do |id, channel| + channel.unsubscribe + end + end + + def broadcast(data) + logger.info "Sending data: #{data}" + @websocket.send data + end + + def handle_exception + logger.error "[ActionCable] Closing connection" + + @websocket.close + end + + private + def initialize_client + connect if respond_to?(:connect) + register_connection + + @accept_messages = true + worker_pool.async.invoke(self, :received_data, @pending_messages.shift) until @pending_messages.empty? + end + + def broadcast_ping_timestamp + broadcast({ identifier: '_ping', message: Time.now.to_i }.to_json) + end + + def subscribe_channel(data) + id_key = data['identifier'] + id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access + + subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } + + if subscription_klass + logger.info "Subscribing to channel: #{id_key}" + @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) + else + logger.error "Unable to subscribe to channel: #{id_key}" + end + end + + def process_message(message) + if @subscriptions[message['identifier']] + @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) + else + logger.error "Unable to process message: #{message}" + end + end + + def unsubscribe_channel(data) + logger.info "Unsubscribing from channel: #{data['identifier']}" + @subscriptions[data['identifier']].unsubscribe + @subscriptions.delete(data['identifier']) + end + + def invalid_request + [404, {'Content-Type' => 'text/plain'}, ['Page not found']] + end + + def websocket_alive? + @websocket && @websocket.ready_state == Faye::WebSocket::API::OPEN + end + + end + end +end diff --git a/lib/action_cable/connection/registry.rb b/lib/action_cable/connection/registry.rb new file mode 100644 index 0000000000..121f87f9f0 --- /dev/null +++ b/lib/action_cable/connection/registry.rb @@ -0,0 +1,65 @@ +module ActionCable + module Connection + module Registry + extend ActiveSupport::Concern + + included do + class_attribute :identifiers + self.identifiers = Set.new + end + + module ClassMethods + def identified_by(*identifiers) + self.identifiers += identifiers + end + end + + def register_connection + if connection_identifier.present? + callback = -> (message) { process_registry_message(message) } + @_internal_redis_subscriptions ||= [] + @_internal_redis_subscriptions << [ internal_redis_channel, callback ] + + pubsub.subscribe(internal_redis_channel, &callback) + logger.info "[ActionCable] Registered connection (#{connection_identifier})" + puts "[ActionCable] Registered connection: #{connection_identifier}(#{internal_redis_channel})" + end + end + + def internal_redis_channel + "action_cable/#{connection_identifier}" + end + + def connection_identifier + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")} + end + + def connection_gid(ids) + ids.map {|o| o.to_global_id.to_s }.sort.join(":") + end + + def cleanup_internal_redis_subscriptions + if @_internal_redis_subscriptions.present? + @_internal_redis_subscriptions.each { |channel, callback| pubsub.unsubscribe_proc(channel, callback) } + end + end + + private + def process_registry_message(message) + message = ActiveSupport::JSON.decode(message) + + case message['type'] + when 'disconnect' + logger.info "[ActionCable] Removing connection (#{connection_identifier})" + @websocket.close + end + rescue Exception => e + logger.error "[ActionCable] There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + handle_exception + end + + end + end +end diff --git a/lib/action_cable/connections.rb b/lib/action_cable/connection_proxy.rb similarity index 76% rename from lib/action_cable/connections.rb rename to lib/action_cable/connection_proxy.rb index e68cc6e7a4..980e037ff3 100644 --- a/lib/action_cable/connections.rb +++ b/lib/action_cable/connection_proxy.rb @@ -1,5 +1,5 @@ module ActionCable - module Connections + module ConnectionProxy class << self def active end @@ -10,8 +10,5 @@ module ActionCable def disconnect end - - def reconnect - end end end From 1c9d82dbf0e743534c5fa9be936eaa46c5b07523 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 13:17:02 -0500 Subject: [PATCH 3/8] Add remote connection to talk over internal redis channel --- action_cable.gemspec | 1 + lib/action_cable.rb | 2 +- lib/action_cable/connection.rb | 3 +- lib/action_cable/connection/identifier.rb | 19 ++++++++++++ lib/action_cable/connection_proxy.rb | 14 --------- lib/action_cable/remote_connection.rb | 38 +++++++++++++++++++++++ lib/action_cable/server.rb | 12 +++++-- 7 files changed, 71 insertions(+), 18 deletions(-) create mode 100644 lib/action_cable/connection/identifier.rb delete mode 100644 lib/action_cable/connection_proxy.rb create mode 100644 lib/action_cable/remote_connection.rb diff --git a/action_cable.gemspec b/action_cable.gemspec index ba5c4159d6..4f252d4b1e 100644 --- a/action_cable.gemspec +++ b/action_cable.gemspec @@ -13,6 +13,7 @@ Gem::Specification.new do |s| s.add_dependency('faye-websocket', '~> 0.9.2') s.add_dependency('celluloid', '~> 0.16.0') s.add_dependency('em-hiredis', '~> 0.3.0') + s.add_dependency('redis', '~> 3.0') s.files = Dir['README', 'lib/**/*'] s.has_rdoc = false diff --git a/lib/action_cable.rb b/lib/action_cable.rb index fd42dc6cdd..159ee2bcc0 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -23,5 +23,5 @@ module ActionCable autoload :Worker, 'action_cable/worker' autoload :Server, 'action_cable/server' autoload :Connection, 'action_cable/connection' - autoload :ConnectionProxy, 'action_cable/connection_proxy' + autoload :RemoteConnection, 'action_cable/remote_connection' end diff --git a/lib/action_cable/connection.rb b/lib/action_cable/connection.rb index 102903c6ef..91fc73713c 100644 --- a/lib/action_cable/connection.rb +++ b/lib/action_cable/connection.rb @@ -1,6 +1,7 @@ module ActionCable module Connection autoload :Base, 'action_cable/connection/base' - autoload :Registry, 'action_cable/connection/Registry' + autoload :Registry, 'action_cable/connection/registry' + autoload :Identifier, 'action_cable/connection/identifier' end end diff --git a/lib/action_cable/connection/identifier.rb b/lib/action_cable/connection/identifier.rb new file mode 100644 index 0000000000..9bfd773ab1 --- /dev/null +++ b/lib/action_cable/connection/identifier.rb @@ -0,0 +1,19 @@ +module ActionCable + module Connection + module Identifier + + def internal_redis_channel + "action_cable/#{connection_identifier}" + end + + def connection_identifier + @connection_identifier ||= connection_gid identifiers.map { |id| instance_variable_get("@#{id}")} + end + + def connection_gid(ids) + ids.map {|o| o.to_global_id.to_s }.sort.join(":") + end + + end + end +end diff --git a/lib/action_cable/connection_proxy.rb b/lib/action_cable/connection_proxy.rb deleted file mode 100644 index 980e037ff3..0000000000 --- a/lib/action_cable/connection_proxy.rb +++ /dev/null @@ -1,14 +0,0 @@ -module ActionCable - module ConnectionProxy - class << self - def active - end - - def where(identification) - end - end - - def disconnect - end - end -end diff --git a/lib/action_cable/remote_connection.rb b/lib/action_cable/remote_connection.rb new file mode 100644 index 0000000000..e2cb2d932c --- /dev/null +++ b/lib/action_cable/remote_connection.rb @@ -0,0 +1,38 @@ +module ActionCable + class RemoteConnection + class InvalidIdentifiersError < StandardError; end + + include Connection::Identifier + + delegate :redis, to: :server + + def initialize(server, ids) + @server = server + set_identifier_instance_vars(ids) + end + + def disconnect + message = { type: 'disconnect' }.to_json + redis.publish(internal_redis_channel, message) + end + + def identifiers + @server.connection_identifiers + end + + def redis + @redis ||= Redis.new(@server.redis_config) + end + + private + def set_identifier_instance_vars(ids) + raise InvalidIdentifiersError unless valid_identifiers?(ids) + ids.each { |k,v| instance_variable_set("@#{k}", v) } + end + + def valid_identifiers?(ids) + keys = ids.keys + identifiers.all? { |id| keys.include?(id) } + end + end +end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 6e9265dc06..51e246c232 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -2,12 +2,12 @@ module ActionCable class Server cattr_accessor(:logger, instance_reader: true) { Rails.logger } - attr_accessor :registered_channels, :worker_pool + attr_accessor :registered_channels, :redis_config def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection) @redis_config = redis_config @registered_channels = Set.new(channels) - @worker_pool = ActionCable::Worker.pool(size: worker_pool_size) + @worker_pool_size = worker_pool_size @connection_class = connection end @@ -15,9 +15,17 @@ module ActionCable @connection_class.new(self, env).process end + def worker_pool + @worker_pool ||= ActionCable::Worker.pool(size: @worker_pool_size) + end + def pubsub @pubsub ||= EM::Hiredis.connect(@redis_config['url']).pubsub end + def connection_identifiers + @connection_class.identifiers + end + end end From 9501dd2f6d2f0ece938e2a15a31429e9b8cf1001 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 13:17:14 -0500 Subject: [PATCH 4/8] Be sure to cleanup internal redis subscriptions on close --- lib/action_cable/connection/base.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index c6c9899094..c5b982acf8 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -41,7 +41,7 @@ module ActionCable @websocket.on(:close) do |event| worker_pool.async.invoke(self, :cleanup_subscriptions) - worker_pool.async.invoke(self, :cleanup_subscriptions) + worker_pool.async.invoke(self, :cleanup_internal_redis_subscriptions) worker_pool.async.invoke(self, :disconnect) if respond_to?(:disconnect) EventMachine.cancel_timer(@ping_timer) if @ping_timer From 0c143c03441cf2a66557ec7ba2f5d3d2f889fb5d Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 13:17:41 -0500 Subject: [PATCH 5/8] Remove a puts message --- lib/action_cable/connection/registry.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/action_cable/connection/registry.rb b/lib/action_cable/connection/registry.rb index 121f87f9f0..03a0bf4fe9 100644 --- a/lib/action_cable/connection/registry.rb +++ b/lib/action_cable/connection/registry.rb @@ -22,7 +22,6 @@ module ActionCable pubsub.subscribe(internal_redis_channel, &callback) logger.info "[ActionCable] Registered connection (#{connection_identifier})" - puts "[ActionCable] Registered connection: #{connection_identifier}(#{internal_redis_channel})" end end From 90566fce53a71660c746e23499f8aa134b457335 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 13:31:17 -0500 Subject: [PATCH 6/8] Remote connections API for the server --- lib/action_cable.rb | 1 + lib/action_cable/remote_connections.rb | 13 +++++++++++++ lib/action_cable/server.rb | 4 ++++ 3 files changed, 18 insertions(+) create mode 100644 lib/action_cable/remote_connections.rb diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 159ee2bcc0..3352453491 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -24,4 +24,5 @@ module ActionCable autoload :Server, 'action_cable/server' autoload :Connection, 'action_cable/connection' autoload :RemoteConnection, 'action_cable/remote_connection' + autoload :RemoteConnections, 'action_cable/remote_connections' end diff --git a/lib/action_cable/remote_connections.rb b/lib/action_cable/remote_connections.rb new file mode 100644 index 0000000000..f9d7c49a27 --- /dev/null +++ b/lib/action_cable/remote_connections.rb @@ -0,0 +1,13 @@ +module ActionCable + class RemoteConnections + attr_reader :server + + def initialize(server) + @server = server + end + + def where(identifier) + RemoteConnection.new(server, identifier) + end + end +end diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 51e246c232..222c77fd51 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -23,6 +23,10 @@ module ActionCable @pubsub ||= EM::Hiredis.connect(@redis_config['url']).pubsub end + def remote_connections + @remote_connections ||= RemoteConnections.new(self) + end + def connection_identifiers @connection_class.identifiers end From 8c4c782c78b45d94e97ee9a26a05c44cae7cd428 Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 15:38:22 -0500 Subject: [PATCH 7/8] Catch exceptions when subscribing to a channel and processing a message --- lib/action_cable/connection/base.rb | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index c5b982acf8..ea5d52e99c 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -105,23 +105,29 @@ module ActionCable subscription_klass = server.registered_channels.detect { |channel_klass| channel_klass.find_name == id_options[:channel] } if subscription_klass - logger.info "Subscribing to channel: #{id_key}" + logger.info "[ActionCable] Subscribing to channel: #{id_key}" @subscriptions[id_key] = subscription_klass.new(self, id_key, id_options) else - logger.error "Unable to subscribe to channel: #{id_key}" + logger.error "[ActionCable] Subscription class not found (#{data.inspect})" end + rescue Exception => e + logger.error "[ActionCable] Could not subscribe to channel (#{data.inspect})" + logger.error e.backtrace.join("\n") end def process_message(message) if @subscriptions[message['identifier']] @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) else - logger.error "Unable to process message: #{message}" + logger.error "[ActionCable] Unable to process message because no subscription found (#{message.inspect})" end + rescue Exception => e + logger.error "[ActionCable] Could not process message (#{data.inspect})" + logger.error e.backtrace.join("\n") end def unsubscribe_channel(data) - logger.info "Unsubscribing from channel: #{data['identifier']}" + logger.info "[ActionCable] Unsubscribing from channel: #{data['identifier']}" @subscriptions[data['identifier']].unsubscribe @subscriptions.delete(data['identifier']) end From fb797ad1f1c3b0d96968c5feef783a2b8fe07eed Mon Sep 17 00:00:00 2001 From: Pratik Naik Date: Mon, 6 Apr 2015 15:43:23 -0500 Subject: [PATCH 8/8] Fix an error message --- lib/action_cable/connection/base.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/action_cable/connection/base.rb b/lib/action_cable/connection/base.rb index ea5d52e99c..4ad1e7d065 100644 --- a/lib/action_cable/connection/base.rb +++ b/lib/action_cable/connection/base.rb @@ -119,7 +119,7 @@ module ActionCable if @subscriptions[message['identifier']] @subscriptions[message['identifier']].receive_data(ActiveSupport::JSON.decode message['data']) else - logger.error "[ActionCable] Unable to process message because no subscription found (#{message.inspect})" + logger.error "[ActionCable] Unable to process message because no subscription was found (#{message.inspect})" end rescue Exception => e logger.error "[ActionCable] Could not process message (#{data.inspect})"