mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Add WebSocket decorator
This commit is contained in:
parent
78f3c88d69
commit
321d04ff56
4 changed files with 36 additions and 22 deletions
|
@ -5,6 +5,7 @@ module ActionCable
|
||||||
autoload :Identification, 'action_cable/connection/identification'
|
autoload :Identification, 'action_cable/connection/identification'
|
||||||
autoload :InternalChannel, 'action_cable/connection/internal_channel'
|
autoload :InternalChannel, 'action_cable/connection/internal_channel'
|
||||||
autoload :MessageBuffer, 'action_cable/connection/message_buffer'
|
autoload :MessageBuffer, 'action_cable/connection/message_buffer'
|
||||||
|
autoload :WebSocket, 'action_cable/connection/web_socket'
|
||||||
autoload :Subscriptions, 'action_cable/connection/subscriptions'
|
autoload :Subscriptions, 'action_cable/connection/subscriptions'
|
||||||
autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy'
|
autoload :TaggedLoggerProxy, 'action_cable/connection/tagged_logger_proxy'
|
||||||
end
|
end
|
||||||
|
|
|
@ -14,6 +14,7 @@ module ActionCable
|
||||||
|
|
||||||
@logger = initialize_tagged_logger
|
@logger = initialize_tagged_logger
|
||||||
|
|
||||||
|
@websocket = ActionCable::Connection::WebSocket.new(env)
|
||||||
@heartbeat = ActionCable::Connection::Heartbeat.new(self)
|
@heartbeat = ActionCable::Connection::Heartbeat.new(self)
|
||||||
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
|
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
|
||||||
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
|
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
|
||||||
|
@ -21,12 +22,10 @@ module ActionCable
|
||||||
@started_at = Time.now
|
@started_at = Time.now
|
||||||
end
|
end
|
||||||
|
|
||||||
def process
|
def response
|
||||||
logger.info started_request_message
|
logger.info started_request_message
|
||||||
|
|
||||||
if websocket_request?
|
if websocket.possible?
|
||||||
websocket_initialization
|
|
||||||
|
|
||||||
websocket.on(:open) { |event| send_async :on_open }
|
websocket.on(:open) { |event| send_async :on_open }
|
||||||
websocket.on(:message) { |event| on_message event.data }
|
websocket.on(:message) { |event| on_message event.data }
|
||||||
websocket.on(:close) { |event| send_async :on_close }
|
websocket.on(:close) { |event| send_async :on_close }
|
||||||
|
@ -38,7 +37,7 @@ module ActionCable
|
||||||
end
|
end
|
||||||
|
|
||||||
def receive(data_in_json)
|
def receive(data_in_json)
|
||||||
if websocket_alive?
|
if websocket.alive?
|
||||||
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
|
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
|
||||||
else
|
else
|
||||||
logger.error "Received data without a live websocket (#{data.inspect})"
|
logger.error "Received data without a live websocket (#{data.inspect})"
|
||||||
|
@ -46,7 +45,7 @@ module ActionCable
|
||||||
end
|
end
|
||||||
|
|
||||||
def transmit(data)
|
def transmit(data)
|
||||||
websocket.send data
|
websocket.transmit data
|
||||||
end
|
end
|
||||||
|
|
||||||
def close
|
def close
|
||||||
|
@ -78,19 +77,6 @@ module ActionCable
|
||||||
attr_reader :websocket
|
attr_reader :websocket
|
||||||
attr_reader :heartbeat, :subscriptions, :message_buffer
|
attr_reader :heartbeat, :subscriptions, :message_buffer
|
||||||
|
|
||||||
def websocket_initialization
|
|
||||||
@websocket = Faye::WebSocket.new(@env)
|
|
||||||
end
|
|
||||||
|
|
||||||
def websocket_alive?
|
|
||||||
websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
|
|
||||||
end
|
|
||||||
|
|
||||||
def websocket_request?
|
|
||||||
@is_websocket ||= Faye::WebSocket.websocket?(@env)
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
def on_open
|
def on_open
|
||||||
server.add_connection(self)
|
server.add_connection(self)
|
||||||
|
|
||||||
|
@ -134,7 +120,7 @@ module ActionCable
|
||||||
'Started %s "%s"%s for %s at %s' % [
|
'Started %s "%s"%s for %s at %s' % [
|
||||||
request.request_method,
|
request.request_method,
|
||||||
request.filtered_path,
|
request.filtered_path,
|
||||||
websocket_request? ? ' [Websocket]' : '',
|
websocket.possible? ? ' [Websocket]' : '',
|
||||||
request.ip,
|
request.ip,
|
||||||
Time.now.to_default_s ]
|
Time.now.to_default_s ]
|
||||||
end
|
end
|
||||||
|
@ -142,7 +128,7 @@ module ActionCable
|
||||||
def finished_request_message
|
def finished_request_message
|
||||||
'Finished "%s"%s for %s at %s' % [
|
'Finished "%s"%s for %s at %s' % [
|
||||||
request.filtered_path,
|
request.filtered_path,
|
||||||
websocket_request? ? ' [Websocket]' : '',
|
websocket.possible? ? ' [Websocket]' : '',
|
||||||
request.ip,
|
request.ip,
|
||||||
Time.now.to_default_s ]
|
Time.now.to_default_s ]
|
||||||
end
|
end
|
||||||
|
|
27
lib/action_cable/connection/web_socket.rb
Normal file
27
lib/action_cable/connection/web_socket.rb
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
module ActionCable
|
||||||
|
module Connection
|
||||||
|
# Decorate the Faye::WebSocket with helpers we need.
|
||||||
|
class WebSocket
|
||||||
|
delegate :rack_response, :close, :on, to: :websocket
|
||||||
|
|
||||||
|
def initialize(env)
|
||||||
|
@websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
|
||||||
|
end
|
||||||
|
|
||||||
|
def possible?
|
||||||
|
websocket
|
||||||
|
end
|
||||||
|
|
||||||
|
def alive?
|
||||||
|
websocket && websocket.ready_state == Faye::WebSocket::API::OPEN
|
||||||
|
end
|
||||||
|
|
||||||
|
def transmit(data)
|
||||||
|
websocket.send data
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
attr_reader :websocket
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -17,7 +17,7 @@ module ActionCable
|
||||||
end
|
end
|
||||||
|
|
||||||
def call(env)
|
def call(env)
|
||||||
@connection_class.new(self, env).process
|
@connection_class.new(self, env).response
|
||||||
end
|
end
|
||||||
|
|
||||||
def worker_pool
|
def worker_pool
|
||||||
|
|
Loading…
Reference in a new issue