mirror of
https://github.com/rails/rails.git
synced 2022-11-09 12:12:34 -05:00
Extract connection monitoring and rewrite as a subscriber
This commit is contained in:
parent
d9d7371c56
commit
c846f43d46
4 changed files with 70 additions and 76 deletions
|
@ -4,6 +4,8 @@
|
|||
#= require cable/channel
|
||||
|
||||
class @Cable
|
||||
@PING_IDENTIFIER: "_ping"
|
||||
|
||||
constructor: (@url) ->
|
||||
@subscribers = new Cable.SubscriberManager this
|
||||
@connection = new Cable.Connection this
|
||||
|
|
|
@ -1,10 +1,8 @@
|
|||
class Cable.Connection
|
||||
MAX_CONNECTION_INTERVAL: 5 * 1000
|
||||
PING_STALE_INTERVAL: 8
|
||||
#= require cable/connection_monitor
|
||||
|
||||
class Cable.Connection
|
||||
constructor: (@cable) ->
|
||||
@resetPingTime()
|
||||
@resetConnectionAttemptsCount()
|
||||
new Cable.ConnectionMonitor @cable
|
||||
@connect()
|
||||
|
||||
send: (data) ->
|
||||
|
@ -15,88 +13,40 @@ class Cable.Connection
|
|||
false
|
||||
|
||||
connect: ->
|
||||
@websocket = @createWebSocket()
|
||||
@removeWebsocket()
|
||||
@createWebSocket()
|
||||
|
||||
createWebSocket: ->
|
||||
ws = new WebSocket(@cable.url)
|
||||
ws.onmessage = @onMessage
|
||||
ws.onopen = @onConnect
|
||||
ws.onclose = @onClose
|
||||
ws.onerror = @onError
|
||||
ws
|
||||
@websocket = new WebSocket(@cable.url)
|
||||
@websocket.onmessage = @onMessage
|
||||
@websocket.onopen = @onConnect
|
||||
@websocket.onclose = @onClose
|
||||
@websocket.onerror = @onError
|
||||
@websocket
|
||||
|
||||
onMessage: (message) =>
|
||||
data = JSON.parse message.data
|
||||
|
||||
if data.identifier is '_ping'
|
||||
@pingReceived(data.message)
|
||||
else
|
||||
@cable.subscribers.notify(data.identifier, "received", data.message)
|
||||
|
||||
onConnect: =>
|
||||
@startWaitingForPing()
|
||||
@resetConnectionAttemptsCount()
|
||||
@cable.subscribers.reload()
|
||||
|
||||
onClose: =>
|
||||
@reconnect()
|
||||
|
||||
onError: =>
|
||||
@reconnect()
|
||||
|
||||
isConnected: ->
|
||||
@websocket?.readyState is 1
|
||||
|
||||
disconnect: ->
|
||||
@removeExistingConnection()
|
||||
@resetPingTime()
|
||||
@cable.subscribers.notifyAll("disconnected")
|
||||
|
||||
reconnect: ->
|
||||
@disconnect()
|
||||
|
||||
setTimeout =>
|
||||
@incrementConnectionAttemptsCount()
|
||||
@connect()
|
||||
, @generateReconnectInterval()
|
||||
|
||||
removeExistingConnection: ->
|
||||
removeWebsocket: ->
|
||||
if @websocket?
|
||||
@clearPingWaitTimeout()
|
||||
|
||||
@websocket.onclose = -> # no-op
|
||||
@websocket.onerror = -> # no-op
|
||||
@websocket.close()
|
||||
@websocket = null
|
||||
|
||||
resetConnectionAttemptsCount: ->
|
||||
@connectionAttempts = 1
|
||||
onMessage: (message) =>
|
||||
data = JSON.parse message.data
|
||||
@cable.subscribers.notify(data.identifier, "received", data.message)
|
||||
|
||||
incrementConnectionAttemptsCount: ->
|
||||
@connectionAttempts += 1
|
||||
onConnect: =>
|
||||
@cable.subscribers.reload()
|
||||
|
||||
generateReconnectInterval: () ->
|
||||
interval = (Math.pow(2, @connectionAttempts) - 1) * 1000
|
||||
if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval
|
||||
onClose: =>
|
||||
@disconnect()
|
||||
|
||||
startWaitingForPing: ->
|
||||
@clearPingWaitTimeout()
|
||||
onError: =>
|
||||
@disconnect()
|
||||
|
||||
@waitForPingTimeout = setTimeout =>
|
||||
console.log "Ping took too long to arrive. Reconnecting.."
|
||||
@reconnect()
|
||||
, @PING_STALE_INTERVAL * 1000
|
||||
isConnected: ->
|
||||
@websocket?.readyState is 1
|
||||
|
||||
clearPingWaitTimeout: ->
|
||||
clearTimeout(@waitForPingTimeout)
|
||||
|
||||
resetPingTime: ->
|
||||
@lastPingTime = null
|
||||
|
||||
pingReceived: (timestamp) ->
|
||||
if @lastPingTime? and (timestamp - @lastPingTime) > @PING_STALE_INTERVAL
|
||||
console.log "Websocket connection is stale. Reconnecting.."
|
||||
@reconnect()
|
||||
else
|
||||
@startWaitingForPing()
|
||||
@lastPingTime = timestamp
|
||||
disconnect: ->
|
||||
@cable.subscribers.notifyAll("disconnected")
|
||||
@removeWebsocket()
|
||||
|
|
41
lib/assets/javascripts/cable/connection_monitor.js.coffee
Normal file
41
lib/assets/javascripts/cable/connection_monitor.js.coffee
Normal file
|
@ -0,0 +1,41 @@
|
|||
class Cable.ConnectionMonitor
|
||||
MAX_CONNECTION_INTERVAL: 5 * 1000
|
||||
PING_STALE_INTERVAL: 8 * 1000
|
||||
|
||||
identifier: Cable.PING_IDENTIFIER
|
||||
|
||||
constructor: (@cable) ->
|
||||
@reset()
|
||||
@cable.subscribers.add(this)
|
||||
@pollConnection()
|
||||
|
||||
connected: ->
|
||||
@reset()
|
||||
@pingedAt = now()
|
||||
|
||||
received: ->
|
||||
@pingedAt = now()
|
||||
|
||||
reset: ->
|
||||
@connectionAttempts = 1
|
||||
|
||||
pollConnection: ->
|
||||
setTimeout =>
|
||||
@reconnect() if @connectionIsStale()
|
||||
@pollConnection()
|
||||
, @getPollTimeout()
|
||||
|
||||
getPollTimeout: ->
|
||||
interval = (Math.pow(2, @connectionAttempts) - 1) * 1000
|
||||
if interval > @MAX_CONNECTION_INTERVAL then @MAX_CONNECTION_INTERVAL else interval
|
||||
|
||||
connectionIsStale: ->
|
||||
@pingedAt? and (now() - @pingedAt) > @PING_STALE_INTERVAL
|
||||
|
||||
reconnect: ->
|
||||
console.log "Ping took too long to arrive. Reconnecting.."
|
||||
@connectionAttempts += 1
|
||||
@cable.connection.connect()
|
||||
|
||||
now = ->
|
||||
new Date().getTime()
|
|
@ -30,4 +30,5 @@ class Cable.SubscriberManager
|
|||
subscriber[event]?(args...)
|
||||
|
||||
sendCommand: (command, identifier) ->
|
||||
return true if identifier is Cable.PING_IDENTIFIER
|
||||
@cable.send({command, identifier})
|
||||
|
|
Loading…
Reference in a new issue