diff --git a/actioncable/CHANGELOG.md b/actioncable/CHANGELOG.md index cec4595970..612fbb3cb3 100644 --- a/actioncable/CHANGELOG.md +++ b/actioncable/CHANGELOG.md @@ -1,3 +1,15 @@ +* The Action Cable client now ensures successful channel subscriptions: + + * The client maintains a set of pending subscriptions until either + the server confirms the subscription or the channel is torn down. + * Rectifies the race condition where an unsubscribe is rapidly followed + by a subscribe (on the same channel identifier) and the requests are + handled out of order by the ActionCable server, thereby ignoring the + subscribe command. + + *Daniel Spinosa* + + ## Rails 7.0.0.alpha2 (September 15, 2021) ## * No changes. diff --git a/actioncable/app/assets/javascripts/action_cable.js b/actioncable/app/assets/javascripts/action_cable.js index 371e0ca4f0..d2c264baa4 100644 --- a/actioncable/app/assets/javascripts/action_cable.js +++ b/actioncable/app/assets/javascripts/action_cable.js @@ -246,6 +246,7 @@ return this.monitor.recordPing(); case message_types.confirmation: + this.subscriptions.confirmSubscription(identifier); return this.subscriptions.notify(identifier, "connected"); case message_types.rejection: @@ -310,9 +311,46 @@ return this.consumer.subscriptions.remove(this); } } + class SubscriptionGuarantor { + constructor(subscriptions) { + this.subscriptions = subscriptions; + this.pendingSubscriptions = []; + } + guarantee(subscription) { + if (this.pendingSubscriptions.indexOf(subscription) == -1) { + logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`); + this.pendingSubscriptions.push(subscription); + } else { + logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`); + } + this.startGuaranteeing(); + } + forget(subscription) { + logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`); + this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription)); + } + startGuaranteeing() { + this.stopGuaranteeing(); + this.retrySubscribing(); + } + stopGuaranteeing() { + clearTimeout(this.retryTimeout); + } + retrySubscribing() { + this.retryTimeout = setTimeout((() => { + if (this.subscriptions && typeof this.subscriptions.subscribe === "function") { + this.pendingSubscriptions.map((subscription => { + logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`); + this.subscriptions.subscribe(subscription); + })); + } + }), 500); + } + } class Subscriptions { constructor(consumer) { this.consumer = consumer; + this.guarantor = new SubscriptionGuarantor(this); this.subscriptions = []; } create(channelName, mixin) { @@ -327,7 +365,7 @@ this.subscriptions.push(subscription); this.consumer.ensureActiveConnection(); this.notify(subscription, "initialized"); - this.sendCommand(subscription, "subscribe"); + this.subscribe(subscription); return subscription; } remove(subscription) { @@ -345,6 +383,7 @@ })); } forget(subscription) { + this.guarantor.forget(subscription); this.subscriptions = this.subscriptions.filter((s => s !== subscription)); return subscription; } @@ -352,7 +391,7 @@ return this.subscriptions.filter((s => s.identifier === identifier)); } reload() { - return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe"))); + return this.subscriptions.map((subscription => this.subscribe(subscription))); } notifyAll(callbackName, ...args) { return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args))); @@ -366,6 +405,15 @@ } return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined)); } + subscribe(subscription) { + if (this.sendCommand(subscription, "subscribe")) { + this.guarantor.guarantee(subscription); + } + } + confirmSubscription(identifier) { + logger.log(`Subscription confirmed ${identifier}`); + this.findAll(identifier).map((subscription => this.guarantor.forget(subscription))); + } sendCommand(subscription, command) { const {identifier: identifier} = subscription; return this.consumer.send({ @@ -429,6 +477,7 @@ exports.Consumer = Consumer; exports.INTERNAL = INTERNAL; exports.Subscription = Subscription; + exports.SubscriptionGuarantor = SubscriptionGuarantor; exports.Subscriptions = Subscriptions; exports.adapters = adapters; exports.createConsumer = createConsumer; diff --git a/actioncable/app/assets/javascripts/actioncable.esm.js b/actioncable/app/assets/javascripts/actioncable.esm.js index d69e8ecd89..416448ce6c 100644 --- a/actioncable/app/assets/javascripts/actioncable.esm.js +++ b/actioncable/app/assets/javascripts/actioncable.esm.js @@ -254,6 +254,7 @@ Connection.prototype.events = { return this.monitor.recordPing(); case message_types.confirmation: + this.subscriptions.confirmSubscription(identifier); return this.subscriptions.notify(identifier, "connected"); case message_types.rejection: @@ -321,9 +322,47 @@ class Subscription { } } +class SubscriptionGuarantor { + constructor(subscriptions) { + this.subscriptions = subscriptions; + this.pendingSubscriptions = []; + } + guarantee(subscription) { + if (this.pendingSubscriptions.indexOf(subscription) == -1) { + logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`); + this.pendingSubscriptions.push(subscription); + } else { + logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`); + } + this.startGuaranteeing(); + } + forget(subscription) { + logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`); + this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription)); + } + startGuaranteeing() { + this.stopGuaranteeing(); + this.retrySubscribing(); + } + stopGuaranteeing() { + clearTimeout(this.retryTimeout); + } + retrySubscribing() { + this.retryTimeout = setTimeout((() => { + if (this.subscriptions && typeof this.subscriptions.subscribe === "function") { + this.pendingSubscriptions.map((subscription => { + logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`); + this.subscriptions.subscribe(subscription); + })); + } + }), 500); + } +} + class Subscriptions { constructor(consumer) { this.consumer = consumer; + this.guarantor = new SubscriptionGuarantor(this); this.subscriptions = []; } create(channelName, mixin) { @@ -338,7 +377,7 @@ class Subscriptions { this.subscriptions.push(subscription); this.consumer.ensureActiveConnection(); this.notify(subscription, "initialized"); - this.sendCommand(subscription, "subscribe"); + this.subscribe(subscription); return subscription; } remove(subscription) { @@ -356,6 +395,7 @@ class Subscriptions { })); } forget(subscription) { + this.guarantor.forget(subscription); this.subscriptions = this.subscriptions.filter((s => s !== subscription)); return subscription; } @@ -363,7 +403,7 @@ class Subscriptions { return this.subscriptions.filter((s => s.identifier === identifier)); } reload() { - return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe"))); + return this.subscriptions.map((subscription => this.subscribe(subscription))); } notifyAll(callbackName, ...args) { return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args))); @@ -377,6 +417,15 @@ class Subscriptions { } return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined)); } + subscribe(subscription) { + if (this.sendCommand(subscription, "subscribe")) { + this.guarantor.guarantee(subscription); + } + } + confirmSubscription(identifier) { + logger.log(`Subscription confirmed ${identifier}`); + this.findAll(identifier).map((subscription => this.guarantor.forget(subscription))); + } sendCommand(subscription, command) { const {identifier: identifier} = subscription; return this.consumer.send({ @@ -439,4 +488,4 @@ function getConfig(name) { } } -export { Connection, ConnectionMonitor, Consumer, INTERNAL, Subscription, Subscriptions, adapters, createConsumer, createWebSocketURL, getConfig, logger }; +export { Connection, ConnectionMonitor, Consumer, INTERNAL, Subscription, SubscriptionGuarantor, Subscriptions, adapters, createConsumer, createWebSocketURL, getConfig, logger }; diff --git a/actioncable/app/assets/javascripts/actioncable.js b/actioncable/app/assets/javascripts/actioncable.js index 7bc8ebb5db..e8f48805e0 100644 --- a/actioncable/app/assets/javascripts/actioncable.js +++ b/actioncable/app/assets/javascripts/actioncable.js @@ -246,6 +246,7 @@ return this.monitor.recordPing(); case message_types.confirmation: + this.subscriptions.confirmSubscription(identifier); return this.subscriptions.notify(identifier, "connected"); case message_types.rejection: @@ -310,9 +311,46 @@ return this.consumer.subscriptions.remove(this); } } + class SubscriptionGuarantor { + constructor(subscriptions) { + this.subscriptions = subscriptions; + this.pendingSubscriptions = []; + } + guarantee(subscription) { + if (this.pendingSubscriptions.indexOf(subscription) == -1) { + logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`); + this.pendingSubscriptions.push(subscription); + } else { + logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`); + } + this.startGuaranteeing(); + } + forget(subscription) { + logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`); + this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription)); + } + startGuaranteeing() { + this.stopGuaranteeing(); + this.retrySubscribing(); + } + stopGuaranteeing() { + clearTimeout(this.retryTimeout); + } + retrySubscribing() { + this.retryTimeout = setTimeout((() => { + if (this.subscriptions && typeof this.subscriptions.subscribe === "function") { + this.pendingSubscriptions.map((subscription => { + logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`); + this.subscriptions.subscribe(subscription); + })); + } + }), 500); + } + } class Subscriptions { constructor(consumer) { this.consumer = consumer; + this.guarantor = new SubscriptionGuarantor(this); this.subscriptions = []; } create(channelName, mixin) { @@ -327,7 +365,7 @@ this.subscriptions.push(subscription); this.consumer.ensureActiveConnection(); this.notify(subscription, "initialized"); - this.sendCommand(subscription, "subscribe"); + this.subscribe(subscription); return subscription; } remove(subscription) { @@ -345,6 +383,7 @@ })); } forget(subscription) { + this.guarantor.forget(subscription); this.subscriptions = this.subscriptions.filter((s => s !== subscription)); return subscription; } @@ -352,7 +391,7 @@ return this.subscriptions.filter((s => s.identifier === identifier)); } reload() { - return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe"))); + return this.subscriptions.map((subscription => this.subscribe(subscription))); } notifyAll(callbackName, ...args) { return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args))); @@ -366,6 +405,15 @@ } return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined)); } + subscribe(subscription) { + if (this.sendCommand(subscription, "subscribe")) { + this.guarantor.guarantee(subscription); + } + } + confirmSubscription(identifier) { + logger.log(`Subscription confirmed ${identifier}`); + this.findAll(identifier).map((subscription => this.guarantor.forget(subscription))); + } sendCommand(subscription, command) { const {identifier: identifier} = subscription; return this.consumer.send({ @@ -428,6 +476,7 @@ exports.Consumer = Consumer; exports.INTERNAL = INTERNAL; exports.Subscription = Subscription; + exports.SubscriptionGuarantor = SubscriptionGuarantor; exports.Subscriptions = Subscriptions; exports.adapters = adapters; exports.createConsumer = createConsumer; diff --git a/actioncable/app/javascript/action_cable/connection.js b/actioncable/app/javascript/action_cable/connection.js index 96bac132c1..87584545cc 100644 --- a/actioncable/app/javascript/action_cable/connection.js +++ b/actioncable/app/javascript/action_cable/connection.js @@ -132,6 +132,7 @@ Connection.prototype.events = { case message_types.ping: return this.monitor.recordPing() case message_types.confirmation: + this.subscriptions.confirmSubscription(identifier) return this.subscriptions.notify(identifier, "connected") case message_types.rejection: return this.subscriptions.reject(identifier) diff --git a/actioncable/app/javascript/action_cable/index.js b/actioncable/app/javascript/action_cable/index.js index 848b5631d6..3e650bc120 100644 --- a/actioncable/app/javascript/action_cable/index.js +++ b/actioncable/app/javascript/action_cable/index.js @@ -4,6 +4,7 @@ import Consumer, { createWebSocketURL } from "./consumer" import INTERNAL from "./internal" import Subscription from "./subscription" import Subscriptions from "./subscriptions" +import SubscriptionGuarantor from "./subscription_guarantor" import adapters from "./adapters" import logger from "./logger" @@ -14,6 +15,7 @@ export { INTERNAL, Subscription, Subscriptions, + SubscriptionGuarantor, adapters, createWebSocketURL, logger, diff --git a/actioncable/app/javascript/action_cable/subscription_guarantor.js b/actioncable/app/javascript/action_cable/subscription_guarantor.js new file mode 100644 index 0000000000..7d6ad98fa3 --- /dev/null +++ b/actioncable/app/javascript/action_cable/subscription_guarantor.js @@ -0,0 +1,50 @@ +import logger from "./logger" + +// Responsible for ensuring channel subscribe command is confirmed, retrying until confirmation is received. +// Internal class, not intended for direct user manipulation. + +class SubscriptionGuarantor { + constructor(subscriptions) { + this.subscriptions = subscriptions + this.pendingSubscriptions = [] + } + + guarantee(subscription) { + if(this.pendingSubscriptions.indexOf(subscription) == -1){ + logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`) + this.pendingSubscriptions.push(subscription) + } + else { + logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`) + } + this.startGuaranteeing() + } + + forget(subscription) { + logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`) + this.pendingSubscriptions = (this.pendingSubscriptions.filter((s) => s !== subscription)) + } + + startGuaranteeing() { + this.stopGuaranteeing() + this.retrySubscribing() + } + + stopGuaranteeing() { + clearTimeout(this.retryTimeout) + } + + retrySubscribing() { + this.retryTimeout = setTimeout(() => { + if (this.subscriptions && typeof(this.subscriptions.subscribe) === "function") { + this.pendingSubscriptions.map((subscription) => { + logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`) + this.subscriptions.subscribe(subscription) + }) + } + } + , 500) + } +} + +export default SubscriptionGuarantor \ No newline at end of file diff --git a/actioncable/app/javascript/action_cable/subscriptions.js b/actioncable/app/javascript/action_cable/subscriptions.js index 06ca71cd53..ec41ccbf75 100644 --- a/actioncable/app/javascript/action_cable/subscriptions.js +++ b/actioncable/app/javascript/action_cable/subscriptions.js @@ -1,4 +1,6 @@ import Subscription from "./subscription" +import SubscriptionGuarantor from "./subscription_guarantor" +import logger from "./logger" // Collection class for creating (and internally managing) channel subscriptions. // The only method intended to be triggered by the user is ActionCable.Subscriptions#create, @@ -13,6 +15,7 @@ import Subscription from "./subscription" export default class Subscriptions { constructor(consumer) { this.consumer = consumer + this.guarantor = new SubscriptionGuarantor(this) this.subscriptions = [] } @@ -29,7 +32,7 @@ export default class Subscriptions { this.subscriptions.push(subscription) this.consumer.ensureActiveConnection() this.notify(subscription, "initialized") - this.sendCommand(subscription, "subscribe") + this.subscribe(subscription) return subscription } @@ -50,6 +53,7 @@ export default class Subscriptions { } forget(subscription) { + this.guarantor.forget(subscription) this.subscriptions = (this.subscriptions.filter((s) => s !== subscription)) return subscription } @@ -60,7 +64,7 @@ export default class Subscriptions { reload() { return this.subscriptions.map((subscription) => - this.sendCommand(subscription, "subscribe")) + this.subscribe(subscription)) } notifyAll(callbackName, ...args) { @@ -80,6 +84,18 @@ export default class Subscriptions { (typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined)) } + subscribe(subscription) { + if (this.sendCommand(subscription, "subscribe")) { + this.guarantor.guarantee(subscription) + } + } + + confirmSubscription(identifier) { + logger.log(`Subscription confirmed ${identifier}`) + this.findAll(identifier).map((subscription) => + this.guarantor.forget(subscription)) + } + sendCommand(subscription, command) { const {identifier} = subscription return this.consumer.send({command, identifier}) diff --git a/actioncable/test/javascript/src/test.js b/actioncable/test/javascript/src/test.js index 54529ac10c..938f71a2fa 100644 --- a/actioncable/test/javascript/src/test.js +++ b/actioncable/test/javascript/src/test.js @@ -5,3 +5,4 @@ import "./unit/connection_monitor_test" import "./unit/consumer_test" import "./unit/subscription_test" import "./unit/subscriptions_test" +import "./unit/subscription_guarantor_test" diff --git a/actioncable/test/javascript/src/unit/subscription_guarantor_test.js b/actioncable/test/javascript/src/unit/subscription_guarantor_test.js new file mode 100644 index 0000000000..83665344f5 --- /dev/null +++ b/actioncable/test/javascript/src/unit/subscription_guarantor_test.js @@ -0,0 +1,32 @@ +import * as ActionCable from "../../../../app/javascript/action_cable/index" + +const {module, test} = QUnit + +module("ActionCable.SubscriptionGuarantor", hooks => { + let guarantor + hooks.beforeEach(() => guarantor = new ActionCable.SubscriptionGuarantor({})) + + module("#guarantee", () => { + test("guarantees subscription only once", assert => { + const sub = {} + + assert.equal(guarantor.pendingSubscriptions.length, 0) + guarantor.guarantee(sub) + assert.equal(guarantor.pendingSubscriptions.length, 1) + guarantor.guarantee(sub) + assert.equal(guarantor.pendingSubscriptions.length, 1) + }) + }), + + module("#forget", () => { + test("removes subscription", assert => { + const sub = {} + + assert.equal(guarantor.pendingSubscriptions.length, 0) + guarantor.guarantee(sub) + assert.equal(guarantor.pendingSubscriptions.length, 1) + guarantor.forget(sub) + assert.equal(guarantor.pendingSubscriptions.length, 0) + }) + }) +})