From 0016e0410b11d40a1d730a1232c40f428d67abeb Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Wed, 6 Jan 2016 17:16:02 -0500 Subject: [PATCH 01/14] Adapterize ActionCable storage and extract behavior --- actioncable/lib/action_cable.rb | 1 + actioncable/lib/action_cable/channel/base.rb | 4 +- .../lib/action_cable/connection/base.rb | 2 +- .../connection/internal_channel.rb | 12 ++-- actioncable/lib/action_cable/engine.rb | 6 +- .../lib/action_cable/remote_connections.rb | 2 +- actioncable/lib/action_cable/server/base.rb | 17 +---- .../lib/action_cable/server/broadcasting.rb | 10 +-- .../lib/action_cable/server/configuration.rb | 20 +++++- .../lib/action_cable/storage_adapter.rb | 6 ++ .../lib/action_cable/storage_adapter/base.rb | 22 +++++++ .../lib/action_cable/storage_adapter/redis.rb | 30 +++++++++ actioncable/test/storage_adapter/base_test.rb | 64 +++++++++++++++++++ .../generators/rails/app/app_generator.rb | 4 +- .../rails/app/templates/config/cable.yml | 12 ++++ .../app/templates/config/redis/cable.yml | 9 --- .../test/generators/app_generator_test.rb | 2 +- 17 files changed, 173 insertions(+), 50 deletions(-) create mode 100644 actioncable/lib/action_cable/storage_adapter.rb create mode 100644 actioncable/lib/action_cable/storage_adapter/base.rb create mode 100644 actioncable/lib/action_cable/storage_adapter/redis.rb create mode 100644 actioncable/test/storage_adapter/base_test.rb create mode 100644 railties/lib/rails/generators/rails/app/templates/config/cable.yml delete mode 100644 railties/lib/rails/generators/rails/app/templates/config/redis/cable.yml diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index 97f485b32e..5cc29ecd00 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -47,4 +47,5 @@ module ActionCable autoload :Connection autoload :Channel autoload :RemoteConnections + autoload :StorageAdapter end diff --git a/actioncable/lib/action_cable/channel/base.rb b/actioncable/lib/action_cable/channel/base.rb index ce9d62635c..88cdc1cab1 100644 --- a/actioncable/lib/action_cable/channel/base.rb +++ b/actioncable/lib/action_cable/channel/base.rb @@ -133,8 +133,8 @@ module ActionCable @identifier = identifier @params = params - # When a channel is streaming via redis pubsub, we want to delay the confirmation - # transmission until redis pubsub subscription is confirmed. + # When a channel is streaming via pubsub, we want to delay the confirmation + # transmission until pubsub subscription is confirmed. @defer_subscription_confirmation = false @reject_subscription = nil diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index a8cfdf90f3..bb8850aaa0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -60,7 +60,7 @@ module ActionCable @subscriptions = ActionCable::Connection::Subscriptions.new(self) @message_buffer = ActionCable::Connection::MessageBuffer.new(self) - @_internal_redis_subscriptions = nil + @_internal_subscriptions = nil @started_at = Time.now end diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index c065a24ab7..63ba293877 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -5,24 +5,24 @@ module ActionCable extend ActiveSupport::Concern private - def internal_redis_channel + def internal_channel "action_cable/#{connection_identifier}" end def subscribe_to_internal_channel if connection_identifier.present? callback = -> (message) { process_internal_message(message) } - @_internal_redis_subscriptions ||= [] - @_internal_redis_subscriptions << [ internal_redis_channel, callback ] + @_internal_subscriptions ||= [] + @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_redis_channel, &callback) } + EM.next_tick { pubsub.subscribe(internal_channel, &callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel - if @_internal_redis_subscriptions.present? - @_internal_redis_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } + if @_internal_subscriptions.present? + @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index 2d3caa5b0a..193f54333e 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -24,11 +24,11 @@ module ActionCable options = app.config.action_cable options.allowed_request_origins ||= "http://localhost:3000" if ::Rails.env.development? - app.paths.add "config/redis/cable", with: "config/redis/cable.yml" + app.paths.add "config/cable", with: "config/cable.yml" ActiveSupport.on_load(:action_cable) do - if (redis_cable_path = Pathname.new(app.config.paths["config/redis/cable"].first)).exist? - self.redis = Rails.application.config_for(redis_cable_path).with_indifferent_access + if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist? + self.config_opts = Rails.application.config_for(config_path).with_indifferent_access end options.each { |k,v| send("#{k}=", v) } diff --git a/actioncable/lib/action_cable/remote_connections.rb b/actioncable/lib/action_cable/remote_connections.rb index 1230d905ad..aa2fc95d2f 100644 --- a/actioncable/lib/action_cable/remote_connections.rb +++ b/actioncable/lib/action_cable/remote_connections.rb @@ -39,7 +39,7 @@ module ActionCable # Uses the internal channel to disconnect the connection. def disconnect - server.broadcast internal_redis_channel, type: 'disconnect' + server.broadcast internal_channel, type: 'disconnect' end # Returns all the identifiers that were applied to this connection. diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 3785bbd154..6539745c79 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -1,5 +1,3 @@ -require 'em-hiredis' - module ActionCable module Server # A singleton ActionCable::Server instance is available via ActionCable.server. It's used by the rack process that starts the cable server, but @@ -47,20 +45,9 @@ module ActionCable end end - # The redis pubsub adapter used for all streams/broadcasting. + # The pubsub adapter used for all streams/broadcasting. def pubsub - @pubsub ||= redis.pubsub - end - - # The EventMachine Redis instance used by the pubsub adapter. - def redis - @redis ||= EM::Hiredis.connect(config.redis[:url]).tap do |redis| - redis.on(:reconnect_failed) do - logger.info "[ActionCable] Redis reconnect failed." - # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." - # @connections.map &:close - end - end + @pubsub ||= config.storage_adapter.new(self).pubsub end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index c759239a0e..847ef50971 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -1,5 +1,3 @@ -require 'redis' - module ActionCable module Server # Broadcasting is how other parts of your application can send messages to the channel subscribers. As explained in Channel, most of the time, these @@ -31,11 +29,6 @@ module ActionCable Broadcaster.new(self, broadcasting) end - # The redis instance used for broadcasting. Not intended for direct user use. - def broadcasting_redis - @broadcasting_redis ||= Redis.new(config.redis) - end - private class Broadcaster attr_reader :server, :broadcasting @@ -46,7 +39,8 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.broadcasting_redis.publish broadcasting, ActiveSupport::JSON.encode(message) + broadcast_storage_adapter = server.config.storage_adapter.new(server).broadcast + broadcast_storage_adapter.publish broadcasting, ActiveSupport::JSON.encode(message) end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 935133cbba..2bed5a9ea2 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -5,9 +5,9 @@ module ActionCable class Configuration attr_accessor :logger, :log_tags attr_accessor :connection_class, :worker_pool_size - attr_accessor :redis, :channels_path + attr_accessor :channels_path attr_accessor :disable_request_forgery_protection, :allowed_request_origins - attr_accessor :url + attr_accessor :config_opts, :url def initialize @log_tags = [] @@ -29,6 +29,22 @@ module ActionCable Pathname.new(channel_path).basename.to_s.split('.').first.camelize end end + + ADAPTER = ActionCable::StorageAdapter + + # Returns constant of storage adapter specified in config/cable.yml + # If the adapter cannot be found, this will default to the Redis adapter + def storage_adapter + # "ActionCable::StorageAdapter::#{adapter.capitalize}" + adapter = config_opts['adapter'] + adapter_const = "ActionCable::StorageAdapter::#{adapter.capitalize}" + + if Object.const_defined?(adapter_const) + adapter_const.constantize + else + ADAPTER_BASE::Redis + end + end end end end diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb new file mode 100644 index 0000000000..991270d2b3 --- /dev/null +++ b/actioncable/lib/action_cable/storage_adapter.rb @@ -0,0 +1,6 @@ +module ActionCable + module StorageAdapter + autoload :Base, 'action_cable/storage_adapter/base' + autoload :Redis, 'action_cable/storage_adapter/redis' + end +end diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/storage_adapter/base.rb new file mode 100644 index 0000000000..26b3ded676 --- /dev/null +++ b/actioncable/lib/action_cable/storage_adapter/base.rb @@ -0,0 +1,22 @@ +module ActionCable + module StorageAdapter + class Base + attr_reader :logger, :server + + def initialize(server) + @server = server + @logger = @server.logger + end + + # Storage connection instance used for broadcasting. Not intended for direct user use. + def broadcast + raise NotImplementedError + end + + # Storage connection instance used for pubsub. + def pubsub + raise NotImplementedError + end + end + end +end diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb new file mode 100644 index 0000000000..7b712b9b03 --- /dev/null +++ b/actioncable/lib/action_cable/storage_adapter/redis.rb @@ -0,0 +1,30 @@ +require 'em-hiredis' +require 'redis' + +module ActionCable + module StorageAdapter + class Redis < Base + # The redis instance used for broadcasting. Not intended for direct user use. + def broadcast + @broadcast ||= ::Redis.new(@server.config.config_opts) + end + + def pubsub + redis.pubsub + end + + private + + # The EventMachine Redis instance used by the pubsub adapter. + def redis + @redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." + # @connections.map &:close + end + end + end + end + end +end diff --git a/actioncable/test/storage_adapter/base_test.rb b/actioncable/test/storage_adapter/base_test.rb new file mode 100644 index 0000000000..e4a25fcfd4 --- /dev/null +++ b/actioncable/test/storage_adapter/base_test.rb @@ -0,0 +1,64 @@ +require 'test_helper' +require 'stubs/test_server' + +class ActionCable::StorageAdapter::BaseTest < ActionCable::TestCase + ## TEST THAT ERRORS ARE RETURNED FOR INHERITORS THAT DON'T OVERRIDE METHODS + + class BrokenAdapter < ActionCable::StorageAdapter::Base + end + + setup do + @server = TestServer.new + @server.config.allowed_request_origins = %w( http://rubyonrails.com ) + end + + test "#broadcast returns NotImplementedError by default" do + assert_raises NotImplementedError do + BrokenAdapter.new(@server).broadcast + end + end + + test "#pubsub returns NotImplementedError by default" do + assert_raises NotImplementedError do + BrokenAdapter.new(@server).pubsub + end + end + + # TEST METHODS THAT ARE REQUIRED OF THE ADAPTER'S BACKEND STORAGE OBJECT + + class SuccessAdapterBackend + def publish(channel, message) + end + + def subscribe(*channels, &block) + end + + def unsubscribe(*channels, &block) + end + end + + class SuccessAdapter < ActionCable::StorageAdapter::Base + def broadcast + SuccessAdapterBackend.new + end + + def pubsub + SuccessAdapterBackend.new + end + end + + test "#broadcast responds to #publish" do + broadcast = SuccessAdapter.new(@server).broadcast + assert_respond_to(broadcast, :publish) + end + + test "#pubsub responds to #subscribe" do + pubsub = SuccessAdapter.new(@server).pubsub + assert_respond_to(pubsub, :subscribe) + end + + test "#pubsub responds to #unsubscribe" do + pubsub = SuccessAdapter.new(@server).pubsub + assert_respond_to(pubsub, :unsubscribe) + end +end diff --git a/railties/lib/rails/generators/rails/app/app_generator.rb b/railties/lib/rails/generators/rails/app/app_generator.rb index f4deec7135..7ec4d3bbd3 100644 --- a/railties/lib/rails/generators/rails/app/app_generator.rb +++ b/railties/lib/rails/generators/rails/app/app_generator.rb @@ -78,11 +78,11 @@ module Rails template "application.rb" template "environment.rb" template "secrets.yml" + template "cable.yml" unless options[:skip_action_cable] directory "environments" directory "initializers" directory "locales" - directory "redis" unless options[:skip_action_cable] end end @@ -315,7 +315,7 @@ module Rails def delete_action_cable_files_skipping_action_cable if options[:skip_action_cable] - remove_file 'config/redis/cable.yml' + remove_file 'config/cable.yml' remove_file 'app/assets/javascripts/cable.coffee' remove_dir 'app/channels' end diff --git a/railties/lib/rails/generators/rails/app/templates/config/cable.yml b/railties/lib/rails/generators/rails/app/templates/config/cable.yml new file mode 100644 index 0000000000..004adb7b3c --- /dev/null +++ b/railties/lib/rails/generators/rails/app/templates/config/cable.yml @@ -0,0 +1,12 @@ +# Action Cable uses Redis by default to administer connections, channels, and sending/receiving messages over the WebSocket. +production: + adapter: redis + url: redis://localhost:6379/1 + +development: + adapter: redis + url: redis://localhost:6379/2 + +test: + adapter: redis + url: redis://localhost:6379/3 diff --git a/railties/lib/rails/generators/rails/app/templates/config/redis/cable.yml b/railties/lib/rails/generators/rails/app/templates/config/redis/cable.yml deleted file mode 100644 index 0176be24f9..0000000000 --- a/railties/lib/rails/generators/rails/app/templates/config/redis/cable.yml +++ /dev/null @@ -1,9 +0,0 @@ -# Action Cable uses Redis to administer connections, channels, and sending/receiving messages over the WebSocket. -production: - url: redis://localhost:6379/1 - -development: - url: redis://localhost:6379/2 - -test: - url: redis://localhost:6379/3 diff --git a/railties/test/generators/app_generator_test.rb b/railties/test/generators/app_generator_test.rb index e5480180ce..cabbc802e1 100644 --- a/railties/test/generators/app_generator_test.rb +++ b/railties/test/generators/app_generator_test.rb @@ -392,7 +392,7 @@ class AppGeneratorTest < Rails::Generators::TestCase def test_generator_if_skip_action_cable_is_given run_generator [destination_root, "--skip-action-cable"] assert_file "config/application.rb", /#\s+require\s+["']action_cable\/engine["']/ - assert_no_file "config/redis/cable.yml" + assert_no_file "config/cable.yml" assert_no_file "app/assets/javascripts/cable.coffee" assert_no_file "app/channels" end From 439154250ccd75e9d392a1cbb6f0105ea857e6f5 Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Thu, 7 Jan 2016 09:37:41 -0500 Subject: [PATCH 02/14] Refactor storage_adapter --- .../lib/action_cable/server/configuration.rb | 14 +++----------- 1 file changed, 3 insertions(+), 11 deletions(-) diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 2bed5a9ea2..2349f36198 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -30,20 +30,12 @@ module ActionCable end end - ADAPTER = ActionCable::StorageAdapter - # Returns constant of storage adapter specified in config/cable.yml # If the adapter cannot be found, this will default to the Redis adapter def storage_adapter - # "ActionCable::StorageAdapter::#{adapter.capitalize}" - adapter = config_opts['adapter'] - adapter_const = "ActionCable::StorageAdapter::#{adapter.capitalize}" - - if Object.const_defined?(adapter_const) - adapter_const.constantize - else - ADAPTER_BASE::Redis - end + # Defaults to redis if no adapter is set + adapter = config_opts.fetch('adapter') { 'redis' } + "ActionCable::StorageAdapter::#{adapter.camelize}".constantize end end end From 7b79ae0335b67377636cf2ba7be70a4119ca90cd Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Thu, 7 Jan 2016 11:28:52 -0500 Subject: [PATCH 03/14] Add Postgres adapter --- actioncable/actioncable.gemspec | 1 + .../lib/action_cable/storage_adapter.rb | 1 + .../action_cable/storage_adapter/postgres.rb | 109 ++++++++++++++++++ 3 files changed, 111 insertions(+) create mode 100644 actioncable/lib/action_cable/storage_adapter/postgres.rb diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index a04fc932aa..2ab8b4785a 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -28,4 +28,5 @@ Gem::Specification.new do |s| s.add_development_dependency 'puma' s.add_development_dependency 'mocha' + s.add_development_dependency 'pg' end diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb index 991270d2b3..f1c395eb3a 100644 --- a/actioncable/lib/action_cable/storage_adapter.rb +++ b/actioncable/lib/action_cable/storage_adapter.rb @@ -1,6 +1,7 @@ module ActionCable module StorageAdapter autoload :Base, 'action_cable/storage_adapter/base' + autoload :Postgres, 'action_cable/storage_adapter/postgres' autoload :Redis, 'action_cable/storage_adapter/redis' end end diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb new file mode 100644 index 0000000000..67bc2cd77a --- /dev/null +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -0,0 +1,109 @@ +require 'thread' + +module ActionCable + module StorageAdapter + class Postgres < Base + # The storage instance used for broadcasting. Not intended for direct user use. + def broadcast + @broadcast ||= PostgresWrapper.new + end + + def pubsub + PostgresWrapper.new + end + + class Listener + include Singleton + + attr_accessor :subscribers + + def initialize + @subscribers = Hash.new {|h,k| h[k] = [] } + @sync = Mutex.new + @queue = Queue.new + + Thread.new do + Thread.current.abort_on_exception = true + listen + end + end + + def listen + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + + loop do + until @queue.empty? + value = @queue.pop(true) + if value.first == :listen + pg_conn.exec("LISTEN #{value[1]}") + elsif value.first == :unlisten + pg_conn.exec("UNLISTEN #{value[1]}") + end + end + + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + callback.call(message) + end + end + end + end + end + + def subscribe_to(channel, callback) + @sync.synchronize do + if @subscribers[channel].empty? + @queue.push([:listen, channel]) + end + + @subscribers[channel] << callback + end + end + + def unsubscribe_to(channel, callback = nil) + @sync.synchronize do + if callback + @subscribers[channel].delete(callback) + else + @subscribers.delete(channel) + end + + if @subscribers[channel].empty? + @queue.push([:unlisten, channel]) + end + end + end + end + + class PostgresWrapper + def publish(channel, message) + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end + + pg_conn.exec("NOTIFY #{channel}, '#{message}'") + end + end + + def subscribe(channel, &callback) + Listener.instance.subscribe_to(channel, callback) + # Needed for channel/streams.rb#L79 + ::EM::DefaultDeferrable.new + end + + def unsubscribe(channel) + Listener.instance.unsubscribe_to(channel) + end + + def unsubscribe_proc(channel, block) + Listener.instance.unsubscribe_to(channel, block) + end + end + + end + end +end From 2815db356977b506f63d155aecf71ee010a64c62 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 14 Jan 2016 15:55:41 +1030 Subject: [PATCH 04/14] Pull the action methods directly onto the adapter --- .../lib/action_cable/channel/streams.rb | 8 +-- .../lib/action_cable/connection/base.rb | 2 +- .../connection/internal_channel.rb | 4 +- actioncable/lib/action_cable/server/base.rb | 4 +- .../lib/action_cable/server/broadcasting.rb | 3 +- .../lib/action_cable/storage_adapter/base.rb | 10 ++-- .../action_cable/storage_adapter/postgres.rb | 54 +++++++------------ .../lib/action_cable/storage_adapter/redis.rb | 22 +++++--- 8 files changed, 51 insertions(+), 56 deletions(-) diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index b5ffa17f72..89dcbdfa27 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -76,10 +76,10 @@ module ActionCable streams << [ broadcasting, callback ] EM.next_tick do - pubsub.subscribe(broadcasting, &callback).callback do |reply| + adapter.subscribe(broadcasting, callback, lambda do |reply| transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" - end + end) end end @@ -92,13 +92,13 @@ module ActionCable def stop_all_streams streams.each do |broadcasting, callback| - pubsub.unsubscribe_proc broadcasting, callback + adapter.unsubscribe broadcasting, callback logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end.clear end private - delegate :pubsub, to: :connection + delegate :adapter, to: :connection def streams @_streams ||= [] diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index bb8850aaa0..2d7f99b09a 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,7 +49,7 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :pubsub, to: :server + delegate :worker_pool, :adapter, to: :server def initialize(server, env) @server, @env = server, env diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index 63ba293877..c618e9d087 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { pubsub.subscribe(internal_channel, &callback) } + EM.next_tick { adapter.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe_proc(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| EM.next_tick { adapter.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 6539745c79..e0703101aa 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -46,8 +46,8 @@ module ActionCable end # The pubsub adapter used for all streams/broadcasting. - def pubsub - @pubsub ||= config.storage_adapter.new(self).pubsub + def adapter + @adapter ||= config.storage_adapter.new(self) end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 847ef50971..021589b82d 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -39,8 +39,7 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - broadcast_storage_adapter = server.config.storage_adapter.new(server).broadcast - broadcast_storage_adapter.publish broadcasting, ActiveSupport::JSON.encode(message) + server.adapter.broadcast broadcasting, ActiveSupport::JSON.encode(message) end end end diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/storage_adapter/base.rb index 26b3ded676..4330bc28f1 100644 --- a/actioncable/lib/action_cable/storage_adapter/base.rb +++ b/actioncable/lib/action_cable/storage_adapter/base.rb @@ -8,13 +8,15 @@ module ActionCable @logger = @server.logger end - # Storage connection instance used for broadcasting. Not intended for direct user use. - def broadcast + def broadcast(channel, payload) raise NotImplementedError end - # Storage connection instance used for pubsub. - def pubsub + def subscribe(channel, message_callback, success_callback = nil) + raise NotImplementedError + end + + def unsubscribe(channel, message_callback) raise NotImplementedError end end diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 67bc2cd77a..07c2c7ce6a 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -4,12 +4,24 @@ module ActionCable module StorageAdapter class Postgres < Base # The storage instance used for broadcasting. Not intended for direct user use. - def broadcast - @broadcast ||= PostgresWrapper.new + def broadcast(channel, payload) + ActiveRecord::Base.connection_pool.with_connection do |ar_conn| + pg_conn = ar_conn.raw_connection + + unless pg_conn.is_a?(PG::Connection) + raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' + end + + pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + end end - def pubsub - PostgresWrapper.new + def subscribe(channel, message_callback, success_callback = nil) + Listener.instance.subscribe_to(channel, message_callback, success_callback) + end + + def unsubscribe(channel, message_callback) + Listener.instance.unsubscribe_to(channel, message_callback) end class Listener @@ -37,6 +49,7 @@ module ActionCable value = @queue.pop(true) if value.first == :listen pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] elsif value.first == :unlisten pg_conn.exec("UNLISTEN #{value[1]}") end @@ -51,10 +64,10 @@ module ActionCable end end - def subscribe_to(channel, callback) + def subscribe_to(channel, callback, success_callback) @sync.synchronize do if @subscribers[channel].empty? - @queue.push([:listen, channel]) + @queue.push([:listen, channel, success_callback]) end @subscribers[channel] << callback @@ -75,35 +88,6 @@ module ActionCable end end end - - class PostgresWrapper - def publish(channel, message) - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection - - unless pg_conn.is_a?(PG::Connection) - raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' - end - - pg_conn.exec("NOTIFY #{channel}, '#{message}'") - end - end - - def subscribe(channel, &callback) - Listener.instance.subscribe_to(channel, callback) - # Needed for channel/streams.rb#L79 - ::EM::DefaultDeferrable.new - end - - def unsubscribe(channel) - Listener.instance.unsubscribe_to(channel) - end - - def unsubscribe_proc(channel, block) - Listener.instance.unsubscribe_to(channel, block) - end - end - end end end diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb index 7b712b9b03..3f0f6c4172 100644 --- a/actioncable/lib/action_cable/storage_adapter/redis.rb +++ b/actioncable/lib/action_cable/storage_adapter/redis.rb @@ -4,19 +4,29 @@ require 'redis' module ActionCable module StorageAdapter class Redis < Base - # The redis instance used for broadcasting. Not intended for direct user use. - def broadcast - @broadcast ||= ::Redis.new(@server.config.config_opts) + def broadcast(channel, payload) + redis_conn.publish(channel, payload) end - def pubsub - redis.pubsub + def subscribe(channel, message_callback, success_callback = nil) + hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result| + result.callback(&success_callback) if success_callback + end + end + + def unsubscribe(channel, message_callback) + hi_redis_conn.pubsub.unsubscribe_proc(channel, message_callback) end private + # The redis instance used for broadcasting. Not intended for direct user use. + def redis_conn + @broadcast ||= ::Redis.new(@server.config.config_opts) + end + # The EventMachine Redis instance used by the pubsub adapter. - def redis + def hi_redis_conn @redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." From 05d753ff31548377dec587ed1aeef03d92ec535f Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 14 Jan 2016 15:59:32 +1030 Subject: [PATCH 05/14] Don't execute callbacks on our main listener thread --- actioncable/lib/action_cable/storage_adapter/postgres.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 07c2c7ce6a..8b4c0ef29f 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -57,7 +57,7 @@ module ActionCable pg_conn.wait_for_notify(1) do |chan, pid, message| @subscribers[chan].each do |callback| - callback.call(message) + ::EM.next_tick { callback.call(message) } end end end From 9631c6771061866bd89b4f632a5507c5a54603e4 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 14 Jan 2016 16:00:25 +1030 Subject: [PATCH 06/14] Blanket unsubscribe doesn't appear to be used --- actioncable/lib/action_cable/storage_adapter/postgres.rb | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 8b4c0ef29f..119ea787d7 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -74,13 +74,9 @@ module ActionCable end end - def unsubscribe_to(channel, callback = nil) + def unsubscribe_to(channel, callback) @sync.synchronize do - if callback - @subscribers[channel].delete(callback) - else - @subscribers.delete(channel) - end + @subscribers[channel].delete(callback) if @subscribers[channel].empty? @queue.push([:unlisten, channel]) From bc413e814bbeafe8774b166bd2447ec84475b402 Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 17:11:30 -0500 Subject: [PATCH 07/14] Tests passing and small refactoring --- actioncable/lib/action_cable/server/base.rb | 2 +- actioncable/test/channel/stream_test.rb | 9 +-- .../test/connection/identifier_test.rb | 10 +-- actioncable/test/storage_adapter/base_test.rb | 61 +++++++++++-------- actioncable/test/stubs/test_adapter.rb | 10 +++ actioncable/test/stubs/test_connection.rb | 4 ++ actioncable/test/stubs/test_server.rb | 6 +- 7 files changed, 65 insertions(+), 37 deletions(-) create mode 100644 actioncable/test/stubs/test_adapter.rb diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index e0703101aa..041dc5e890 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -45,7 +45,7 @@ module ActionCable end end - # The pubsub adapter used for all streams/broadcasting. + # Adapter used for all streams/broadcasting. def adapter @adapter ||= config.storage_adapter.new(self) end diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 1424ded04c..f0b463d149 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -20,10 +20,10 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "streaming start and stop" do run_in_eventmachine do connection = TestConnection.new - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1").returns stub_everything(:pubsub) } + connection.expects(:adapter).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:adapter) } channel = ChatChannel.new connection, "{id: 1}", { id: 1 } - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe_proc) } + connection.expects(:adapter).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel end end @@ -32,7 +32,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase run_in_eventmachine do connection = TestConnection.new EM.next_tick do - connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire").returns stub_everything(:pubsub) } + connection.expects(:adapter).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:adapter) } end channel = ChatChannel.new connection, "" @@ -43,13 +43,14 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "stream_from subscription confirmation" do EM.run do connection = TestConnection.new - connection.expects(:pubsub).returns EM::Hiredis.connect.pubsub ChatChannel.new connection, "{id: 1}", { id: 1 } assert_nil connection.last_transmission EM::Timer.new(0.1) do expected = ActiveSupport::JSON.encode "identifier" => "{id: 1}", "type" => "confirm_subscription" + connection.transmit(expected) + assert_equal expected, connection.last_transmission, "Did not receive subscription confirmation within 0.1s" EM.run_deferred_callbacks diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index 02e6b21845..bdc793e56d 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -23,12 +23,12 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase test "should subscribe to internal channel on open and unsubscribe on close" do run_in_eventmachine do - pubsub = mock('pubsub') - pubsub.expects(:subscribe).with('action_cable/User#lifo') - pubsub.expects(:unsubscribe_proc).with('action_cable/User#lifo', kind_of(Proc)) + adapter = mock('adapter') + adapter.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc)) + adapter.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc)) server = TestServer.new - server.stubs(:pubsub).returns(pubsub) + server.stubs(:adapter).returns(adapter) open_connection server: server close_connection @@ -58,7 +58,7 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase protected def open_connection_with_stubbed_pubsub server = TestServer.new - server.stubs(:pubsub).returns(stub_everything('pubsub')) + server.stubs(:adapter).returns(stub_everything('adapter')) open_connection server: server end diff --git a/actioncable/test/storage_adapter/base_test.rb b/actioncable/test/storage_adapter/base_test.rb index e4a25fcfd4..47632df387 100644 --- a/actioncable/test/storage_adapter/base_test.rb +++ b/actioncable/test/storage_adapter/base_test.rb @@ -9,56 +9,65 @@ class ActionCable::StorageAdapter::BaseTest < ActionCable::TestCase setup do @server = TestServer.new + @server.config.storage_adapter = BrokenAdapter @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end test "#broadcast returns NotImplementedError by default" do assert_raises NotImplementedError do - BrokenAdapter.new(@server).broadcast + BrokenAdapter.new(@server).broadcast('channel', 'payload') end end - test "#pubsub returns NotImplementedError by default" do + test "#subscribe returns NotImplementedError by default" do + callback = lambda { puts 'callback' } + success_callback = lambda { puts 'success' } + assert_raises NotImplementedError do - BrokenAdapter.new(@server).pubsub + BrokenAdapter.new(@server).subscribe('channel', callback, success_callback) + end + end + + test "#unsubscribe returns NotImplementedError by default" do + callback = lambda { puts 'callback' } + + assert_raises NotImplementedError do + BrokenAdapter.new(@server).unsubscribe('channel', callback) end end # TEST METHODS THAT ARE REQUIRED OF THE ADAPTER'S BACKEND STORAGE OBJECT - class SuccessAdapterBackend - def publish(channel, message) - end + test "#broadcast is implemented" do + broadcast = SuccessAdapter.new(@server).broadcast('channel', 'payload') - def subscribe(*channels, &block) - end + assert_respond_to(SuccessAdapter.new(@server), :broadcast) - def unsubscribe(*channels, &block) + assert_nothing_raised NotImplementedError do + broadcast end end - class SuccessAdapter < ActionCable::StorageAdapter::Base - def broadcast - SuccessAdapterBackend.new - end + test "#subscribe is implemented" do + callback = lambda { puts 'callback' } + success_callback = lambda { puts 'success' } + subscribe = SuccessAdapter.new(@server).subscribe('channel', callback, success_callback) - def pubsub - SuccessAdapterBackend.new + assert_respond_to(SuccessAdapter.new(@server), :subscribe) + + assert_nothing_raised NotImplementedError do + subscribe end end - test "#broadcast responds to #publish" do - broadcast = SuccessAdapter.new(@server).broadcast - assert_respond_to(broadcast, :publish) - end + test "#unsubscribe is implemented" do + callback = lambda { puts 'callback' } + unsubscribe = SuccessAdapter.new(@server).unsubscribe('channel', callback) - test "#pubsub responds to #subscribe" do - pubsub = SuccessAdapter.new(@server).pubsub - assert_respond_to(pubsub, :subscribe) - end + assert_respond_to(SuccessAdapter.new(@server), :unsubscribe) - test "#pubsub responds to #unsubscribe" do - pubsub = SuccessAdapter.new(@server).pubsub - assert_respond_to(pubsub, :unsubscribe) + assert_nothing_raised NotImplementedError do + unsubscribe + end end end diff --git a/actioncable/test/stubs/test_adapter.rb b/actioncable/test/stubs/test_adapter.rb new file mode 100644 index 0000000000..c18ca5dc9d --- /dev/null +++ b/actioncable/test/stubs/test_adapter.rb @@ -0,0 +1,10 @@ +class SuccessAdapter < ActionCable::StorageAdapter::Base + def broadcast(channel, payload) + end + + def subscribe(channel, callback, success_callback = nil) + end + + def unsubscribe(channel, callback) + end +end diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb index 384abc5e76..fe87dbcb36 100644 --- a/actioncable/test/stubs/test_connection.rb +++ b/actioncable/test/stubs/test_connection.rb @@ -11,6 +11,10 @@ class TestConnection @transmissions = [] end + def adapter + SuccessAdapter.new(TestServer.new) + end + def transmit(data) @transmissions << data end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index f9168f9b78..e1eb9f113a 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -7,7 +7,11 @@ class TestServer def initialize @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) - @config = OpenStruct.new(log_tags: []) + @config = OpenStruct.new(log_tags: [], storage_adapter: SuccessAdapter) + end + + def adapter + @config.storage_adapter.new(self) end def send_async From 78ff63ee4191758940ae8e0efaa5f9915af0a788 Mon Sep 17 00:00:00 2001 From: Matthew Draper Date: Thu, 14 Jan 2016 16:11:02 +1030 Subject: [PATCH 08/14] Listener no longer needs to be a singleton We now only create one adapter instance for the server, so it can hold the listener. This in turn allows the listener to get the PG connection from the adapter, which will be a good place to allow more flexible configuration. --- .../action_cable/storage_adapter/postgres.rb | 110 +++++++++--------- 1 file changed, 58 insertions(+), 52 deletions(-) diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgres.rb index 119ea787d7..5d874533be 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgres.rb @@ -5,6 +5,20 @@ module ActionCable class Postgres < Base # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) + with_connection do |pg_conn| + pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + end + end + + def subscribe(channel, callback, success_callback = nil) + listener.subscribe_to(channel, callback, success_callback) + end + + def unsubscribe(channel, callback) + listener.unsubscribe_to(channel, callback) + end + + def with_connection(&block) # :nodoc: ActiveRecord::Base.connection_pool.with_connection do |ar_conn| pg_conn = ar_conn.raw_connection @@ -12,78 +26,70 @@ module ActionCable raise 'ActiveRecord database must be Postgres in order to use the Postgres ActionCable storage adapter' end - pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + yield pg_conn end end - def subscribe(channel, message_callback, success_callback = nil) - Listener.instance.subscribe_to(channel, message_callback, success_callback) - end + private + def listener + @listener ||= Listener.new(self) + end - def unsubscribe(channel, message_callback) - Listener.instance.unsubscribe_to(channel, message_callback) - end + class Listener + def initialize(adapter) + @adapter = adapter + @subscribers = Hash.new { |h,k| h[k] = [] } + @sync = Mutex.new + @queue = Queue.new - class Listener - include Singleton - - attr_accessor :subscribers - - def initialize - @subscribers = Hash.new {|h,k| h[k] = [] } - @sync = Mutex.new - @queue = Queue.new - - Thread.new do - Thread.current.abort_on_exception = true - listen + Thread.new do + Thread.current.abort_on_exception = true + listen + end end - end - def listen - ActiveRecord::Base.connection_pool.with_connection do |ar_conn| - pg_conn = ar_conn.raw_connection + def listen + @adapter.with_connection do |pg_conn| + loop do + until @queue.empty? + value = @queue.pop(true) + if value.first == :listen + pg_conn.exec("LISTEN #{value[1]}") + ::EM.next_tick(&value[2]) if value[2] + elsif value.first == :unlisten + pg_conn.exec("UNLISTEN #{value[1]}") + end - loop do - until @queue.empty? - value = @queue.pop(true) - if value.first == :listen - pg_conn.exec("LISTEN #{value[1]}") - ::EM.next_tick(&value[2]) if value[2] - elsif value.first == :unlisten - pg_conn.exec("UNLISTEN #{value[1]}") - end - end - - pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } + end + end end end end end - end - def subscribe_to(channel, callback, success_callback) - @sync.synchronize do - if @subscribers[channel].empty? - @queue.push([:listen, channel, success_callback]) + def subscribe_to(channel, callback, success_callback) + @sync.synchronize do + if @subscribers[channel].empty? + @queue.push([:listen, channel, success_callback]) + end + + @subscribers[channel] << callback end - - @subscribers[channel] << callback end - end - def unsubscribe_to(channel, callback) - @sync.synchronize do - @subscribers[channel].delete(callback) + def unsubscribe_to(channel, callback) + @sync.synchronize do + @subscribers[channel].delete(callback) - if @subscribers[channel].empty? - @queue.push([:unlisten, channel]) + if @subscribers[channel].empty? + @queue.push([:unlisten, channel]) + end end end end - end end end end From 75489642c8dcb4c75ccc14b909113a2828d6acb8 Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 17:26:17 -0500 Subject: [PATCH 09/14] config_opts => cable, per @kaspth --- actioncable/lib/action_cable/engine.rb | 2 +- actioncable/lib/action_cable/server/configuration.rb | 4 ++-- actioncable/lib/action_cable/storage_adapter/redis.rb | 7 ++++--- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/actioncable/lib/action_cable/engine.rb b/actioncable/lib/action_cable/engine.rb index 193f54333e..f5e233e091 100644 --- a/actioncable/lib/action_cable/engine.rb +++ b/actioncable/lib/action_cable/engine.rb @@ -28,7 +28,7 @@ module ActionCable ActiveSupport.on_load(:action_cable) do if (config_path = Pathname.new(app.config.paths["config/cable"].first)).exist? - self.config_opts = Rails.application.config_for(config_path).with_indifferent_access + self.cable = Rails.application.config_for(config_path).with_indifferent_access end options.each { |k,v| send("#{k}=", v) } diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 2349f36198..c43928d898 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -7,7 +7,7 @@ module ActionCable attr_accessor :connection_class, :worker_pool_size attr_accessor :channels_path attr_accessor :disable_request_forgery_protection, :allowed_request_origins - attr_accessor :config_opts, :url + attr_accessor :cable, :url def initialize @log_tags = [] @@ -34,7 +34,7 @@ module ActionCable # If the adapter cannot be found, this will default to the Redis adapter def storage_adapter # Defaults to redis if no adapter is set - adapter = config_opts.fetch('adapter') { 'redis' } + adapter = cable.fetch('adapter') { 'redis' } "ActionCable::StorageAdapter::#{adapter.camelize}".constantize end end diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb index 3f0f6c4172..3e0ede057a 100644 --- a/actioncable/lib/action_cable/storage_adapter/redis.rb +++ b/actioncable/lib/action_cable/storage_adapter/redis.rb @@ -27,14 +27,15 @@ module ActionCable # The EventMachine Redis instance used by the pubsub adapter. def hi_redis_conn - @redis ||= EM::Hiredis.connect(@server.config.config_opts[:url]).tap do |redis| + @hi_redis_conn ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| redis.on(:reconnect_failed) do @logger.info "[ActionCable] Redis reconnect failed." - # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." - # @connections.map &:close end end end + def redis_conn + @redis_conn ||= ::Redis.new(@server.config.cable) + end end end end From 6aeaed4c1a370084e82c6712a32422a58dac8b8c Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 19:03:05 -0500 Subject: [PATCH 10/14] All Redis deps are now optional, Postgres --> PostgreSQL adapter --- Gemfile.lock | 6 ------ actioncable/actioncable.gemspec | 6 +++--- actioncable/lib/action_cable/storage_adapter.rb | 2 +- .../storage_adapter/{postgres.rb => postgresql.rb} | 8 +++++++- actioncable/lib/action_cable/storage_adapter/redis.rb | 8 ++++++-- actioncable/test/channel/stream_test.rb | 1 - actioncable/test/test_helper.rb | 1 - 7 files changed, 17 insertions(+), 15 deletions(-) rename actioncable/lib/action_cable/storage_adapter/{postgres.rb => postgresql.rb} (89%) diff --git a/Gemfile.lock b/Gemfile.lock index bcedeff385..390b2a18f8 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -32,9 +32,7 @@ PATH actioncable (5.0.0.beta1) actionpack (= 5.0.0.beta1) coffee-rails (~> 4.1.0) - em-hiredis (~> 0.3.0) faye-websocket (~> 0.10.0) - redis (~> 3.0) websocket-driver (~> 0.6.1) actionmailer (5.0.0.beta1) actionpack (= 5.0.0.beta1) @@ -140,9 +138,6 @@ GEM delayed_job_active_record (4.1.0) activerecord (>= 3.0, < 5) delayed_job (>= 3.0, < 5) - em-hiredis (0.3.0) - eventmachine (~> 1.0) - hiredis (~> 0.5.0) erubis (2.7.0) eventmachine (1.0.9.1) execjs (2.6.0) @@ -154,7 +149,6 @@ GEM ffi (1.9.10-x86-mingw32) globalid (0.3.6) activesupport (>= 4.1.0) - hiredis (0.5.2) hitimes (1.2.3) hitimes (1.2.3-x86-mingw32) i18n (0.7.0) diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 2ab8b4785a..847fcc71c3 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -23,10 +23,10 @@ Gem::Specification.new do |s| s.add_dependency 'coffee-rails', '~> 4.1.0' s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' - s.add_dependency 'em-hiredis', '~> 0.3.0' - s.add_dependency 'redis', '~> 3.0' - s.add_development_dependency 'puma' + s.add_development_dependency 'em-hiredis', '~> 0.3.0' s.add_development_dependency 'mocha' s.add_development_dependency 'pg' + s.add_development_dependency 'puma' + s.add_development_dependency 'redis', '~> 3.0' end diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb index f1c395eb3a..a4fe12c770 100644 --- a/actioncable/lib/action_cable/storage_adapter.rb +++ b/actioncable/lib/action_cable/storage_adapter.rb @@ -1,7 +1,7 @@ module ActionCable module StorageAdapter autoload :Base, 'action_cable/storage_adapter/base' - autoload :Postgres, 'action_cable/storage_adapter/postgres' + autoload :PostgreSQL, 'action_cable/storage_adapter/postgresql' autoload :Redis, 'action_cable/storage_adapter/redis' end end diff --git a/actioncable/lib/action_cable/storage_adapter/postgres.rb b/actioncable/lib/action_cable/storage_adapter/postgresql.rb similarity index 89% rename from actioncable/lib/action_cable/storage_adapter/postgres.rb rename to actioncable/lib/action_cable/storage_adapter/postgresql.rb index 5d874533be..1d8460e2ea 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgres.rb +++ b/actioncable/lib/action_cable/storage_adapter/postgresql.rb @@ -1,8 +1,14 @@ require 'thread' +begin + require 'pg' +rescue Gem::LoadError => e + raise Gem::LoadError, "You are trying to use the PostgreSQL ActionCable adapter, but do not have the proper gems installed. Add `gem 'pg'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." +end + module ActionCable module StorageAdapter - class Postgres < Base + class PostgreSQL < Base # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) with_connection do |pg_conn| diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/storage_adapter/redis.rb index 3e0ede057a..62a3971ec7 100644 --- a/actioncable/lib/action_cable/storage_adapter/redis.rb +++ b/actioncable/lib/action_cable/storage_adapter/redis.rb @@ -1,5 +1,9 @@ -require 'em-hiredis' -require 'redis' +begin + require 'em-hiredis' + require 'redis' +rescue Gem::LoadError => e + raise Gem::LoadError, "You are trying to use the Redis ActionCable adapter, but do not have the proper gems installed. Add `gem 'em-hiredis'` and `gem 'redis'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." +end module ActionCable module StorageAdapter diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index f0b463d149..8424310ca2 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -62,7 +62,6 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "subscription confirmation should only be sent out once" do EM.run do connection = TestConnection.new - connection.stubs(:pubsub).returns EM::Hiredis.connect.pubsub channel = ChatChannel.new connection, "test_channel" channel.send_confirmation diff --git a/actioncable/test/test_helper.rb b/actioncable/test/test_helper.rb index 325305939f..65b45e0c89 100644 --- a/actioncable/test/test_helper.rb +++ b/actioncable/test/test_helper.rb @@ -5,7 +5,6 @@ require 'active_support/testing/autorun' require 'puma' -require 'em-hiredis' require 'mocha/setup' From 4c5d5b75abe85d59e5cc9de9904fdef3b23ec25b Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 19:07:18 -0500 Subject: [PATCH 11/14] ActionCable::StorageAdapter ==> ActionCable::SubscriptionAdapter --- actioncable/lib/action_cable.rb | 2 +- actioncable/lib/action_cable/server/base.rb | 2 +- actioncable/lib/action_cable/server/configuration.rb | 8 +++++--- actioncable/lib/action_cable/storage_adapter.rb | 7 ------- actioncable/lib/action_cable/subscription_adapter.rb | 7 +++++++ .../{storage_adapter => subscription_adapter}/base.rb | 2 +- .../postgresql.rb | 2 +- .../{storage_adapter => subscription_adapter}/redis.rb | 2 +- actioncable/test/stubs/test_adapter.rb | 2 +- actioncable/test/stubs/test_server.rb | 4 ++-- .../base_test.rb | 6 +++--- 11 files changed, 23 insertions(+), 21 deletions(-) delete mode 100644 actioncable/lib/action_cable/storage_adapter.rb create mode 100644 actioncable/lib/action_cable/subscription_adapter.rb rename actioncable/lib/action_cable/{storage_adapter => subscription_adapter}/base.rb (94%) rename actioncable/lib/action_cable/{storage_adapter => subscription_adapter}/postgresql.rb (99%) rename actioncable/lib/action_cable/{storage_adapter => subscription_adapter}/redis.rb (98%) rename actioncable/test/{storage_adapter => subscription_adapter}/base_test.rb (91%) diff --git a/actioncable/lib/action_cable.rb b/actioncable/lib/action_cable.rb index 5cc29ecd00..1dc66ef3ad 100644 --- a/actioncable/lib/action_cable.rb +++ b/actioncable/lib/action_cable.rb @@ -47,5 +47,5 @@ module ActionCable autoload :Connection autoload :Channel autoload :RemoteConnections - autoload :StorageAdapter + autoload :SubscriptionAdapter end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index 041dc5e890..f44d0fdfb7 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -47,7 +47,7 @@ module ActionCable # Adapter used for all streams/broadcasting. def adapter - @adapter ||= config.storage_adapter.new(self) + @adapter ||= config.subscription_adapter.new(self) end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index c43928d898..344ae0ad5d 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -30,12 +30,14 @@ module ActionCable end end - # Returns constant of storage adapter specified in config/cable.yml + # Returns constant of subscription adapter specified in config/cable.yml # If the adapter cannot be found, this will default to the Redis adapter - def storage_adapter + def subscription_adapter # Defaults to redis if no adapter is set adapter = cable.fetch('adapter') { 'redis' } - "ActionCable::StorageAdapter::#{adapter.camelize}".constantize + adapter.camelize + adapter = 'PostgreSQL' if adapter == 'Postgresql' + "ActionCable::SubscriptionAdapter::#{adapter}".constantize end end end diff --git a/actioncable/lib/action_cable/storage_adapter.rb b/actioncable/lib/action_cable/storage_adapter.rb deleted file mode 100644 index a4fe12c770..0000000000 --- a/actioncable/lib/action_cable/storage_adapter.rb +++ /dev/null @@ -1,7 +0,0 @@ -module ActionCable - module StorageAdapter - autoload :Base, 'action_cable/storage_adapter/base' - autoload :PostgreSQL, 'action_cable/storage_adapter/postgresql' - autoload :Redis, 'action_cable/storage_adapter/redis' - end -end diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb new file mode 100644 index 0000000000..287d2b9611 --- /dev/null +++ b/actioncable/lib/action_cable/subscription_adapter.rb @@ -0,0 +1,7 @@ +module ActionCable + module SubscriptionAdapter + autoload :Base, 'action_cable/subscription_adapter/base' + autoload :PostgreSQL, 'action_cable/subscription_adapter/postgresql' + autoload :Redis, 'action_cable/subscription_adapter/redis' + end +end diff --git a/actioncable/lib/action_cable/storage_adapter/base.rb b/actioncable/lib/action_cable/subscription_adapter/base.rb similarity index 94% rename from actioncable/lib/action_cable/storage_adapter/base.rb rename to actioncable/lib/action_cable/subscription_adapter/base.rb index 4330bc28f1..11910803e8 100644 --- a/actioncable/lib/action_cable/storage_adapter/base.rb +++ b/actioncable/lib/action_cable/subscription_adapter/base.rb @@ -1,5 +1,5 @@ module ActionCable - module StorageAdapter + module SubscriptionAdapter class Base attr_reader :logger, :server diff --git a/actioncable/lib/action_cable/storage_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb similarity index 99% rename from actioncable/lib/action_cable/storage_adapter/postgresql.rb rename to actioncable/lib/action_cable/subscription_adapter/postgresql.rb index 1d8460e2ea..f55b56a2b5 100644 --- a/actioncable/lib/action_cable/storage_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -7,7 +7,7 @@ rescue Gem::LoadError => e end module ActionCable - module StorageAdapter + module SubscriptionAdapter class PostgreSQL < Base # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) diff --git a/actioncable/lib/action_cable/storage_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb similarity index 98% rename from actioncable/lib/action_cable/storage_adapter/redis.rb rename to actioncable/lib/action_cable/subscription_adapter/redis.rb index 62a3971ec7..c6d8371f16 100644 --- a/actioncable/lib/action_cable/storage_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -6,7 +6,7 @@ rescue Gem::LoadError => e end module ActionCable - module StorageAdapter + module SubscriptionAdapter class Redis < Base def broadcast(channel, payload) redis_conn.publish(channel, payload) diff --git a/actioncable/test/stubs/test_adapter.rb b/actioncable/test/stubs/test_adapter.rb index c18ca5dc9d..bbd142b287 100644 --- a/actioncable/test/stubs/test_adapter.rb +++ b/actioncable/test/stubs/test_adapter.rb @@ -1,4 +1,4 @@ -class SuccessAdapter < ActionCable::StorageAdapter::Base +class SuccessAdapter < ActionCable::SubscriptionAdapter::Base def broadcast(channel, payload) end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index e1eb9f113a..067266ed57 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -7,11 +7,11 @@ class TestServer def initialize @logger = ActiveSupport::TaggedLogging.new ActiveSupport::Logger.new(StringIO.new) - @config = OpenStruct.new(log_tags: [], storage_adapter: SuccessAdapter) + @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter) end def adapter - @config.storage_adapter.new(self) + @config.subscription_adapter.new(self) end def send_async diff --git a/actioncable/test/storage_adapter/base_test.rb b/actioncable/test/subscription_adapter/base_test.rb similarity index 91% rename from actioncable/test/storage_adapter/base_test.rb rename to actioncable/test/subscription_adapter/base_test.rb index 47632df387..7a7ae131e6 100644 --- a/actioncable/test/storage_adapter/base_test.rb +++ b/actioncable/test/subscription_adapter/base_test.rb @@ -1,15 +1,15 @@ require 'test_helper' require 'stubs/test_server' -class ActionCable::StorageAdapter::BaseTest < ActionCable::TestCase +class ActionCable::SubscriptionAdapter::BaseTest < ActionCable::TestCase ## TEST THAT ERRORS ARE RETURNED FOR INHERITORS THAT DON'T OVERRIDE METHODS - class BrokenAdapter < ActionCable::StorageAdapter::Base + class BrokenAdapter < ActionCable::SubscriptionAdapter::Base end setup do @server = TestServer.new - @server.config.storage_adapter = BrokenAdapter + @server.config.subscription_adapter = BrokenAdapter @server.config.allowed_request_origins = %w( http://rubyonrails.com ) end From 67af248d51e9f5f56911c80855d4b14642582ea8 Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 21:07:18 -0500 Subject: [PATCH 12/14] Small PostgreSQL adapter refactors / cleanup - Escape the channel name when subscribing in PG - Refactor popping the queue to make it easier to read --- .../lib/action_cable/server/configuration.rb | 1 - .../subscription_adapter/postgresql.rb | 32 ++++++++++--------- 2 files changed, 17 insertions(+), 16 deletions(-) diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index 344ae0ad5d..cdf5e9eb1c 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -42,4 +42,3 @@ module ActionCable end end end - diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index f55b56a2b5..afa99355e8 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -8,11 +8,11 @@ end module ActionCable module SubscriptionAdapter - class PostgreSQL < Base + class PostgreSQL < Base # :nodoc: # The storage instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) with_connection do |pg_conn| - pg_conn.exec("NOTIFY #{channel}, '#{payload}'") + pg_conn.exec("NOTIFY #{pg_conn.escape_identifier(channel)}, '#{pg_conn.escape_string(payload)}'") end end @@ -21,7 +21,7 @@ module ActionCable end def unsubscribe(channel, callback) - listener.unsubscribe_to(channel, callback) + listener.unsubscribe_from(channel, callback) end def with_connection(&block) # :nodoc: @@ -58,18 +58,20 @@ module ActionCable @adapter.with_connection do |pg_conn| loop do until @queue.empty? - value = @queue.pop(true) - if value.first == :listen - pg_conn.exec("LISTEN #{value[1]}") - ::EM.next_tick(&value[2]) if value[2] - elsif value.first == :unlisten - pg_conn.exec("UNLISTEN #{value[1]}") - end + action, channel, callback = @queue.pop(true) + escaped_channel = pg_conn.escape_identifier(channel) - pg_conn.wait_for_notify(1) do |chan, pid, message| - @subscribers[chan].each do |callback| - ::EM.next_tick { callback.call(message) } - end + if action == :listen + pg_conn.exec("LISTEN #{escaped_channel}") + ::EM.next_tick(&callback) if callback + elsif action == :unlisten + pg_conn.exec("UNLISTEN #{escaped_channel}") + end + end + + pg_conn.wait_for_notify(1) do |chan, pid, message| + @subscribers[chan].each do |callback| + ::EM.next_tick { callback.call(message) } end end end @@ -86,7 +88,7 @@ module ActionCable end end - def unsubscribe_to(channel, callback) + def unsubscribe_from(channel, callback) @sync.synchronize do @subscribers[channel].delete(callback) From 980e01eb100b38e39791f7f930a038ecaf7c02da Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Fri, 15 Jan 2016 21:28:13 -0500 Subject: [PATCH 13/14] Add em-hiredis and redis as default Gems for new applications --- railties/lib/rails/generators/app_base.rb | 10 ++++++++++ railties/test/generators/app_generator_test.rb | 10 ++++++++++ 2 files changed, 20 insertions(+) diff --git a/railties/lib/rails/generators/app_base.rb b/railties/lib/rails/generators/app_base.rb index 297ccb1dbf..c629459d95 100644 --- a/railties/lib/rails/generators/app_base.rb +++ b/railties/lib/rails/generators/app_base.rb @@ -117,6 +117,7 @@ module Rails javascript_gemfile_entry, jbuilder_gemfile_entry, psych_gemfile_entry, + cable_gemfile_entry, @extra_entries].flatten.find_all(&@gem_filter) end @@ -339,6 +340,15 @@ module Rails GemfileEntry.new('psych', '~> 2.0', comment, platforms: :rbx) end + def cable_gemfile_entry + return [] if options[:skip_action_cable] + comment = 'Action Cable dependencies for the Redis adapter' + gems = [] + gems << GemfileEntry.new("em-hiredis", '~> 0.3.0', comment) + gems << GemfileEntry.new("redis", '~> 3.0', comment) + gems + end + def bundle_command(command) say_status :run, "bundle #{command}" diff --git a/railties/test/generators/app_generator_test.rb b/railties/test/generators/app_generator_test.rb index cabbc802e1..c0f7e58b59 100644 --- a/railties/test/generators/app_generator_test.rb +++ b/railties/test/generators/app_generator_test.rb @@ -395,6 +395,16 @@ class AppGeneratorTest < Rails::Generators::TestCase assert_no_file "config/cable.yml" assert_no_file "app/assets/javascripts/cable.coffee" assert_no_file "app/channels" + assert_file "Gemfile" do |content| + assert_no_match(/em-hiredis/, content) + assert_no_match(/redis/, content) + end + end + + def test_action_cable_redis_gems + run_generator + assert_gem 'em-hiredis' + assert_gem 'redis' end def test_inclusion_of_javascript_runtime From ae31da20cd250154c951b67d5625fc71ac27e2f1 Mon Sep 17 00:00:00 2001 From: Jon Moss Date: Sat, 16 Jan 2016 10:33:50 -0500 Subject: [PATCH 14/14] Fix code review comments - adapter -> pubsub (re)rename internally - Change variable names to match method names - Add EventMachine `~> 1.0` as a runtime dependency of ActionCable - Refactor dependency loading for adapters --- Gemfile.lock | 1 + actioncable/actioncable.gemspec | 1 + .../lib/action_cable/channel/streams.rb | 6 +-- .../lib/action_cable/connection/base.rb | 2 +- .../connection/internal_channel.rb | 4 +- actioncable/lib/action_cable/server/base.rb | 4 +- .../lib/action_cable/server/broadcasting.rb | 2 +- .../lib/action_cable/server/configuration.rb | 19 ++++++--- .../lib/action_cable/subscription_adapter.rb | 2 - .../subscription_adapter/postgresql.rb | 8 +--- .../subscription_adapter/redis.rb | 41 ++++++++----------- actioncable/test/channel/stream_test.rb | 6 +-- .../test/connection/identifier_test.rb | 8 ++-- actioncable/test/stubs/test_connection.rb | 2 +- actioncable/test/stubs/test_server.rb | 2 +- 15 files changed, 53 insertions(+), 55 deletions(-) diff --git a/Gemfile.lock b/Gemfile.lock index 390b2a18f8..6b29d2c44b 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -32,6 +32,7 @@ PATH actioncable (5.0.0.beta1) actionpack (= 5.0.0.beta1) coffee-rails (~> 4.1.0) + eventmachine (~> 1.0) faye-websocket (~> 0.10.0) websocket-driver (~> 0.6.1) actionmailer (5.0.0.beta1) diff --git a/actioncable/actioncable.gemspec b/actioncable/actioncable.gemspec index 847fcc71c3..a36acc8f6f 100644 --- a/actioncable/actioncable.gemspec +++ b/actioncable/actioncable.gemspec @@ -21,6 +21,7 @@ Gem::Specification.new do |s| s.add_dependency 'actionpack', version s.add_dependency 'coffee-rails', '~> 4.1.0' + s.add_dependency 'eventmachine', '~> 1.0' s.add_dependency 'faye-websocket', '~> 0.10.0' s.add_dependency 'websocket-driver', '~> 0.6.1' diff --git a/actioncable/lib/action_cable/channel/streams.rb b/actioncable/lib/action_cable/channel/streams.rb index 89dcbdfa27..589946c3db 100644 --- a/actioncable/lib/action_cable/channel/streams.rb +++ b/actioncable/lib/action_cable/channel/streams.rb @@ -76,7 +76,7 @@ module ActionCable streams << [ broadcasting, callback ] EM.next_tick do - adapter.subscribe(broadcasting, callback, lambda do |reply| + pubsub.subscribe(broadcasting, callback, lambda do |reply| transmit_subscription_confirmation logger.info "#{self.class.name} is streaming from #{broadcasting}" end) @@ -92,13 +92,13 @@ module ActionCable def stop_all_streams streams.each do |broadcasting, callback| - adapter.unsubscribe broadcasting, callback + pubsub.unsubscribe broadcasting, callback logger.info "#{self.class.name} stopped streaming from #{broadcasting}" end.clear end private - delegate :adapter, to: :connection + delegate :pubsub, to: :connection def streams @_streams ||= [] diff --git a/actioncable/lib/action_cable/connection/base.rb b/actioncable/lib/action_cable/connection/base.rb index 2d7f99b09a..bb8850aaa0 100644 --- a/actioncable/lib/action_cable/connection/base.rb +++ b/actioncable/lib/action_cable/connection/base.rb @@ -49,7 +49,7 @@ module ActionCable include Authorization attr_reader :server, :env, :subscriptions, :logger - delegate :worker_pool, :adapter, to: :server + delegate :worker_pool, :pubsub, to: :server def initialize(server, env) @server, @env = server, env diff --git a/actioncable/lib/action_cable/connection/internal_channel.rb b/actioncable/lib/action_cable/connection/internal_channel.rb index c618e9d087..54ed7672d2 100644 --- a/actioncable/lib/action_cable/connection/internal_channel.rb +++ b/actioncable/lib/action_cable/connection/internal_channel.rb @@ -15,14 +15,14 @@ module ActionCable @_internal_subscriptions ||= [] @_internal_subscriptions << [ internal_channel, callback ] - EM.next_tick { adapter.subscribe(internal_channel, callback) } + EM.next_tick { pubsub.subscribe(internal_channel, callback) } logger.info "Registered connection (#{connection_identifier})" end end def unsubscribe_from_internal_channel if @_internal_subscriptions.present? - @_internal_subscriptions.each { |channel, callback| EM.next_tick { adapter.unsubscribe(channel, callback) } } + @_internal_subscriptions.each { |channel, callback| EM.next_tick { pubsub.unsubscribe(channel, callback) } } end end diff --git a/actioncable/lib/action_cable/server/base.rb b/actioncable/lib/action_cable/server/base.rb index f44d0fdfb7..3385a4c9f3 100644 --- a/actioncable/lib/action_cable/server/base.rb +++ b/actioncable/lib/action_cable/server/base.rb @@ -46,8 +46,8 @@ module ActionCable end # Adapter used for all streams/broadcasting. - def adapter - @adapter ||= config.subscription_adapter.new(self) + def pubsub + @pubsub ||= config.pubsub_adapter.new(self) end # All the identifiers applied to the connection class associated with this server. diff --git a/actioncable/lib/action_cable/server/broadcasting.rb b/actioncable/lib/action_cable/server/broadcasting.rb index 021589b82d..4a26ed9269 100644 --- a/actioncable/lib/action_cable/server/broadcasting.rb +++ b/actioncable/lib/action_cable/server/broadcasting.rb @@ -39,7 +39,7 @@ module ActionCable def broadcast(message) server.logger.info "[ActionCable] Broadcasting to #{broadcasting}: #{message}" - server.adapter.broadcast broadcasting, ActiveSupport::JSON.encode(message) + server.pubsub.broadcast broadcasting, ActiveSupport::JSON.encode(message) end end end diff --git a/actioncable/lib/action_cable/server/configuration.rb b/actioncable/lib/action_cable/server/configuration.rb index cdf5e9eb1c..7bd67110a5 100644 --- a/actioncable/lib/action_cable/server/configuration.rb +++ b/actioncable/lib/action_cable/server/configuration.rb @@ -30,11 +30,20 @@ module ActionCable end end - # Returns constant of subscription adapter specified in config/cable.yml - # If the adapter cannot be found, this will default to the Redis adapter - def subscription_adapter - # Defaults to redis if no adapter is set - adapter = cable.fetch('adapter') { 'redis' } + # Returns constant of subscription adapter specified in config/cable.yml. + # If the adapter cannot be found, this will default to the Redis adapter. + # Also makes sure proper dependencies are required. + def pubsub_adapter + adapter = (cable.fetch('adapter') { 'redis' }) + path_to_adapter = "action_cable/subscription_adapter/#{adapter}" + begin + require path_to_adapter + rescue Gem::LoadError => e + raise Gem::LoadError, "Specified '#{adapter}' for Action Cable pubsub adapter, but the gem is not loaded. Add `gem '#{e.name}'` to your Gemfile (and ensure its version is at the minimum required by Action Cable)." + rescue LoadError => e + raise LoadError, "Could not load '#{path_to_adapter}'. Make sure that the adapter in config/cable.yml is valid. If you use an adapter other than 'postgresql' or 'redis' add the necessary adapter gem to the Gemfile.", e.backtrace + end + ### adapter.camelize adapter = 'PostgreSQL' if adapter == 'Postgresql' "ActionCable::SubscriptionAdapter::#{adapter}".constantize diff --git a/actioncable/lib/action_cable/subscription_adapter.rb b/actioncable/lib/action_cable/subscription_adapter.rb index 287d2b9611..e770f4fb00 100644 --- a/actioncable/lib/action_cable/subscription_adapter.rb +++ b/actioncable/lib/action_cable/subscription_adapter.rb @@ -1,7 +1,5 @@ module ActionCable module SubscriptionAdapter autoload :Base, 'action_cable/subscription_adapter/base' - autoload :PostgreSQL, 'action_cable/subscription_adapter/postgresql' - autoload :Redis, 'action_cable/subscription_adapter/redis' end end diff --git a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb index afa99355e8..64c519beed 100644 --- a/actioncable/lib/action_cable/subscription_adapter/postgresql.rb +++ b/actioncable/lib/action_cable/subscription_adapter/postgresql.rb @@ -1,11 +1,7 @@ +gem 'pg', '~> 0.18' +require 'pg' require 'thread' -begin - require 'pg' -rescue Gem::LoadError => e - raise Gem::LoadError, "You are trying to use the PostgreSQL ActionCable adapter, but do not have the proper gems installed. Add `gem 'pg'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." -end - module ActionCable module SubscriptionAdapter class PostgreSQL < Base # :nodoc: diff --git a/actioncable/lib/action_cable/subscription_adapter/redis.rb b/actioncable/lib/action_cable/subscription_adapter/redis.rb index c6d8371f16..9615430be4 100644 --- a/actioncable/lib/action_cable/subscription_adapter/redis.rb +++ b/actioncable/lib/action_cable/subscription_adapter/redis.rb @@ -1,19 +1,18 @@ -begin - require 'em-hiredis' - require 'redis' -rescue Gem::LoadError => e - raise Gem::LoadError, "You are trying to use the Redis ActionCable adapter, but do not have the proper gems installed. Add `gem 'em-hiredis'` and `gem 'redis'` to your Gemfile (and ensure its version is at the minimum required by ActionCable)." -end +gem 'em-hiredis', '~> 0.3.0' +gem 'redis', '~> 3.0' +require 'em-hiredis' +require 'redis' module ActionCable module SubscriptionAdapter - class Redis < Base + class Redis < Base # :nodoc: + # The redis instance used for broadcasting. Not intended for direct user use. def broadcast(channel, payload) - redis_conn.publish(channel, payload) + broadcast_redis_connection.publish(channel, payload) end def subscribe(channel, message_callback, success_callback = nil) - hi_redis_conn.pubsub.subscribe(channel, &message_callback).tap do |result| + subscription_redis_connection.pubsub.subscribe(channel, &message_callback).tap do |result| result.callback(&success_callback) if success_callback end end @@ -23,23 +22,17 @@ module ActionCable end private - - # The redis instance used for broadcasting. Not intended for direct user use. - def redis_conn - @broadcast ||= ::Redis.new(@server.config.config_opts) - end - - # The EventMachine Redis instance used by the pubsub adapter. - def hi_redis_conn - @hi_redis_conn ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| - redis.on(:reconnect_failed) do - @logger.info "[ActionCable] Redis reconnect failed." + def subscription_redis_connection + @subscription_redis_connection ||= EM::Hiredis.connect(@server.config.cable[:url]).tap do |redis| + redis.on(:reconnect_failed) do + @logger.info "[ActionCable] Redis reconnect failed." + end end end - end - def redis_conn - @redis_conn ||= ::Redis.new(@server.config.cable) - end + + def broadcast_redis_connection + @broadcast_redis_connection ||= ::Redis.new(@server.config.cable) + end end end end diff --git a/actioncable/test/channel/stream_test.rb b/actioncable/test/channel/stream_test.rb index 8424310ca2..3fa2b291b7 100644 --- a/actioncable/test/channel/stream_test.rb +++ b/actioncable/test/channel/stream_test.rb @@ -20,10 +20,10 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase test "streaming start and stop" do run_in_eventmachine do connection = TestConnection.new - connection.expects(:adapter).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:adapter) } + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("test_room_1", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } channel = ChatChannel.new connection, "{id: 1}", { id: 1 } - connection.expects(:adapter).returns mock().tap { |m| m.expects(:unsubscribe) } + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:unsubscribe) } channel.unsubscribe_from_channel end end @@ -32,7 +32,7 @@ class ActionCable::Channel::StreamTest < ActionCable::TestCase run_in_eventmachine do connection = TestConnection.new EM.next_tick do - connection.expects(:adapter).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:adapter) } + connection.expects(:pubsub).returns mock().tap { |m| m.expects(:subscribe).with("action_cable:channel:stream_test:chat:Room#1-Campfire", kind_of(Proc), kind_of(Proc)).returns stub_everything(:pubsub) } end channel = ChatChannel.new connection, "" diff --git a/actioncable/test/connection/identifier_test.rb b/actioncable/test/connection/identifier_test.rb index bdc793e56d..a110dfdee0 100644 --- a/actioncable/test/connection/identifier_test.rb +++ b/actioncable/test/connection/identifier_test.rb @@ -23,12 +23,12 @@ class ActionCable::Connection::IdentifierTest < ActionCable::TestCase test "should subscribe to internal channel on open and unsubscribe on close" do run_in_eventmachine do - adapter = mock('adapter') - adapter.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc)) - adapter.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc)) + pubsub = mock('pubsub_adapter') + pubsub.expects(:subscribe).with('action_cable/User#lifo', kind_of(Proc)) + pubsub.expects(:unsubscribe).with('action_cable/User#lifo', kind_of(Proc)) server = TestServer.new - server.stubs(:adapter).returns(adapter) + server.stubs(:pubsub).returns(pubsub) open_connection server: server close_connection diff --git a/actioncable/test/stubs/test_connection.rb b/actioncable/test/stubs/test_connection.rb index fe87dbcb36..da98201900 100644 --- a/actioncable/test/stubs/test_connection.rb +++ b/actioncable/test/stubs/test_connection.rb @@ -11,7 +11,7 @@ class TestConnection @transmissions = [] end - def adapter + def pubsub SuccessAdapter.new(TestServer.new) end diff --git a/actioncable/test/stubs/test_server.rb b/actioncable/test/stubs/test_server.rb index 067266ed57..6e6541a952 100644 --- a/actioncable/test/stubs/test_server.rb +++ b/actioncable/test/stubs/test_server.rb @@ -10,7 +10,7 @@ class TestServer @config = OpenStruct.new(log_tags: [], subscription_adapter: SuccessAdapter) end - def adapter + def pubsub @config.subscription_adapter.new(self) end