diff --git a/lib/action_cable.rb b/lib/action_cable.rb index 26b3980deb..aaf48efa4b 100644 --- a/lib/action_cable.rb +++ b/lib/action_cable.rb @@ -19,10 +19,10 @@ require 'action_cable/engine' if defined?(Rails) module ActionCable VERSION = '0.0.3' - autoload :Channel, 'action_cable/channel' - autoload :Worker, 'action_cable/worker' autoload :Server, 'action_cable/server' autoload :Connection, 'action_cable/connection' + autoload :Channel, 'action_cable/channel' + autoload :RemoteConnection, 'action_cable/remote_connection' autoload :RemoteConnections, 'action_cable/remote_connections' autoload :Broadcaster, 'action_cable/broadcaster' diff --git a/lib/action_cable/server.rb b/lib/action_cable/server.rb index 3a16f51757..e17cf872e0 100644 --- a/lib/action_cable/server.rb +++ b/lib/action_cable/server.rb @@ -1,75 +1,6 @@ module ActionCable - class Server - cattr_accessor(:logger, instance_reader: true) { Rails.logger } - - attr_accessor :registered_channels, :redis_config, :log_tags - - def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ]) - @redis_config = redis_config.with_indifferent_access - @registered_channels = Set.new(channels) - @worker_pool_size = worker_pool_size - @connection_class = connection - @log_tags = log_tags - - @connections = [] - - logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})" - end - - def call(env) - @connection_class.new(self, env).process - end - - def worker_pool - @worker_pool ||= ActionCable::Worker.pool(size: @worker_pool_size) - end - - def pubsub - @pubsub ||= redis.pubsub - end - - def redis - @redis ||= begin - redis = EM::Hiredis.connect(@redis_config[:url]) - redis.on(:reconnect_failed) do - logger.info "[ActionCable] Redis reconnect failed." - # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." - # @connections.map &:close - end - redis - end - end - - def threaded_redis - @threaded_redis ||= Redis.new(redis_config) - end - - def remote_connections - @remote_connections ||= RemoteConnections.new(self) - end - - def broadcaster_for(channel) - Broadcaster.new(self, channel) - end - - def broadcast(channel, message) - broadcaster_for(channel).broadcast(message) - end - - def connection_identifiers - @connection_class.identifiers - end - - def add_connection(connection) - @connections << connection - end - - def remove_connection(connection) - @connections.delete connection - end - - def open_connections_statistics - @connections.map(&:statistics) - end + module Server + autoload :Base, 'action_cable/server/base' + autoload :Worker, 'action_cable/server/worker' end end diff --git a/lib/action_cable/server/base.rb b/lib/action_cable/server/base.rb new file mode 100644 index 0000000000..6abec92dc1 --- /dev/null +++ b/lib/action_cable/server/base.rb @@ -0,0 +1,77 @@ +module ActionCable + module Server + class Base + cattr_accessor(:logger, instance_reader: true) { Rails.logger } + + attr_accessor :registered_channels, :redis_config, :log_tags + + def initialize(redis_config:, channels:, worker_pool_size: 100, connection: Connection, log_tags: [ 'ActionCable' ]) + @redis_config = redis_config.with_indifferent_access + @registered_channels = Set.new(channels) + @worker_pool_size = worker_pool_size + @connection_class = connection + @log_tags = log_tags + + @connections = [] + + logger.info "[ActionCable] Initialized server (redis_config: #{@redis_config.inspect}, worker_pool_size: #{@worker_pool_size})" + end + + def call(env) + @connection_class.new(self, env).process + end + + def worker_pool + @worker_pool ||= ActionCable::Server::Worker.pool(size: @worker_pool_size) + end + + def pubsub + @pubsub ||= redis.pubsub + end + + def redis + @redis ||= begin + redis = EM::Hiredis.connect(@redis_config[:url]) + redis.on(:reconnect_failed) do + logger.info "[ActionCable] Redis reconnect failed." + # logger.info "[ActionCable] Redis reconnected. Closing all the open connections." + # @connections.map &:close + end + redis + end + end + + def threaded_redis + @threaded_redis ||= Redis.new(redis_config) + end + + def remote_connections + @remote_connections ||= RemoteConnections.new(self) + end + + def broadcaster_for(channel) + Broadcaster.new(self, channel) + end + + def broadcast(channel, message) + broadcaster_for(channel).broadcast(message) + end + + def connection_identifiers + @connection_class.identifiers + end + + def add_connection(connection) + @connections << connection + end + + def remove_connection(connection) + @connections.delete connection + end + + def open_connections_statistics + @connections.map(&:statistics) + end + end + end +end \ No newline at end of file diff --git a/lib/action_cable/server/worker.rb b/lib/action_cable/server/worker.rb new file mode 100644 index 0000000000..0491cb9ab0 --- /dev/null +++ b/lib/action_cable/server/worker.rb @@ -0,0 +1,32 @@ +module ActionCable + module Server + class Worker + include ActiveSupport::Callbacks + include Celluloid + + define_callbacks :work + + def invoke(receiver, method, *args) + run_callbacks :work do + receiver.send method, *args + end + rescue Exception => e + logger.error "There was an exception - #{e.class}(#{e.message})" + logger.error e.backtrace.join("\n") + + receiver.handle_exception if receiver.respond_to?(:handle_exception) + end + + def run_periodic_timer(channel, callback) + run_callbacks :work do + callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) + end + end + + private + def logger + ActionCable::Server::Base.logger + end + end + end +end \ No newline at end of file diff --git a/lib/action_cable/worker.rb b/lib/action_cable/worker.rb deleted file mode 100644 index 6800a75d1d..0000000000 --- a/lib/action_cable/worker.rb +++ /dev/null @@ -1,30 +0,0 @@ -module ActionCable - class Worker - include ActiveSupport::Callbacks - include Celluloid - - define_callbacks :work - - def invoke(receiver, method, *args) - run_callbacks :work do - receiver.send method, *args - end - rescue Exception => e - logger.error "There was an exception - #{e.class}(#{e.message})" - logger.error e.backtrace.join("\n") - - receiver.handle_exception if receiver.respond_to?(:handle_exception) - end - - def run_periodic_timer(channel, callback) - run_callbacks :work do - callback.respond_to?(:call) ? channel.instance_exec(&callback) : channel.send(callback) - end - end - - private - def logger - ActionCable::Server.logger - end - end -end diff --git a/test/channel_test.rb b/test/channel_test.rb index ad5fa04356..96987977ea 100644 --- a/test/channel_test.rb +++ b/test/channel_test.rb @@ -8,7 +8,7 @@ class ChannelTest < ActionCableTest end end - class PingServer < ActionCable::Server + class PingServer < ActionCable::Server::Base register_channels PingChannel end diff --git a/test/server_test.rb b/test/server_test.rb index 824875bb99..1e02497f61 100644 --- a/test/server_test.rb +++ b/test/server_test.rb @@ -8,7 +8,7 @@ class ServerTest < ActionCableTest end end - class ChatServer < ActionCable::Server + class ChatServer < ActionCable::Server::Base register_channels ChatChannel end