1
0
Fork 0
mirror of https://github.com/rails/rails.git synced 2022-11-09 12:12:34 -05:00

Client ensures subscribe command is confirmed. (#41581)

A SubscriptionGuarantor maintains a set of pending subscriptions,
resending the subscribe command unless and until the subscription
is confirmed or rejected by the server or cancelled client-side.

A race condition in the ActionCable server - where an unsubscribe
is sent, followed rapidly by a subscribe, but handled in the reverse
order - necessitates this enhancement.  Indeed, the subscriptions created
and torn down by Turbo Streams amplifies the existence of this race
condition.
This commit is contained in:
Dan Spinosa 2021-09-26 13:06:27 -04:00 committed by GitHub
parent c04cf690f1
commit 6d7c12274e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 270 additions and 9 deletions

View file

@ -1,3 +1,15 @@
* The Action Cable client now ensures successful channel subscriptions:
* The client maintains a set of pending subscriptions until either
the server confirms the subscription or the channel is torn down.
* Rectifies the race condition where an unsubscribe is rapidly followed
by a subscribe (on the same channel identifier) and the requests are
handled out of order by the ActionCable server, thereby ignoring the
subscribe command.
*Daniel Spinosa*
## Rails 7.0.0.alpha2 (September 15, 2021) ##
* No changes.

View file

@ -246,6 +246,7 @@
return this.monitor.recordPing();
case message_types.confirmation:
this.subscriptions.confirmSubscription(identifier);
return this.subscriptions.notify(identifier, "connected");
case message_types.rejection:
@ -310,9 +311,46 @@
return this.consumer.subscriptions.remove(this);
}
}
class SubscriptionGuarantor {
constructor(subscriptions) {
this.subscriptions = subscriptions;
this.pendingSubscriptions = [];
}
guarantee(subscription) {
if (this.pendingSubscriptions.indexOf(subscription) == -1) {
logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`);
this.pendingSubscriptions.push(subscription);
} else {
logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`);
}
this.startGuaranteeing();
}
forget(subscription) {
logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`);
this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription));
}
startGuaranteeing() {
this.stopGuaranteeing();
this.retrySubscribing();
}
stopGuaranteeing() {
clearTimeout(this.retryTimeout);
}
retrySubscribing() {
this.retryTimeout = setTimeout((() => {
if (this.subscriptions && typeof this.subscriptions.subscribe === "function") {
this.pendingSubscriptions.map((subscription => {
logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`);
this.subscriptions.subscribe(subscription);
}));
}
}), 500);
}
}
class Subscriptions {
constructor(consumer) {
this.consumer = consumer;
this.guarantor = new SubscriptionGuarantor(this);
this.subscriptions = [];
}
create(channelName, mixin) {
@ -327,7 +365,7 @@
this.subscriptions.push(subscription);
this.consumer.ensureActiveConnection();
this.notify(subscription, "initialized");
this.sendCommand(subscription, "subscribe");
this.subscribe(subscription);
return subscription;
}
remove(subscription) {
@ -345,6 +383,7 @@
}));
}
forget(subscription) {
this.guarantor.forget(subscription);
this.subscriptions = this.subscriptions.filter((s => s !== subscription));
return subscription;
}
@ -352,7 +391,7 @@
return this.subscriptions.filter((s => s.identifier === identifier));
}
reload() {
return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe")));
return this.subscriptions.map((subscription => this.subscribe(subscription)));
}
notifyAll(callbackName, ...args) {
return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args)));
@ -366,6 +405,15 @@
}
return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined));
}
subscribe(subscription) {
if (this.sendCommand(subscription, "subscribe")) {
this.guarantor.guarantee(subscription);
}
}
confirmSubscription(identifier) {
logger.log(`Subscription confirmed ${identifier}`);
this.findAll(identifier).map((subscription => this.guarantor.forget(subscription)));
}
sendCommand(subscription, command) {
const {identifier: identifier} = subscription;
return this.consumer.send({
@ -429,6 +477,7 @@
exports.Consumer = Consumer;
exports.INTERNAL = INTERNAL;
exports.Subscription = Subscription;
exports.SubscriptionGuarantor = SubscriptionGuarantor;
exports.Subscriptions = Subscriptions;
exports.adapters = adapters;
exports.createConsumer = createConsumer;

View file

@ -254,6 +254,7 @@ Connection.prototype.events = {
return this.monitor.recordPing();
case message_types.confirmation:
this.subscriptions.confirmSubscription(identifier);
return this.subscriptions.notify(identifier, "connected");
case message_types.rejection:
@ -321,9 +322,47 @@ class Subscription {
}
}
class SubscriptionGuarantor {
constructor(subscriptions) {
this.subscriptions = subscriptions;
this.pendingSubscriptions = [];
}
guarantee(subscription) {
if (this.pendingSubscriptions.indexOf(subscription) == -1) {
logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`);
this.pendingSubscriptions.push(subscription);
} else {
logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`);
}
this.startGuaranteeing();
}
forget(subscription) {
logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`);
this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription));
}
startGuaranteeing() {
this.stopGuaranteeing();
this.retrySubscribing();
}
stopGuaranteeing() {
clearTimeout(this.retryTimeout);
}
retrySubscribing() {
this.retryTimeout = setTimeout((() => {
if (this.subscriptions && typeof this.subscriptions.subscribe === "function") {
this.pendingSubscriptions.map((subscription => {
logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`);
this.subscriptions.subscribe(subscription);
}));
}
}), 500);
}
}
class Subscriptions {
constructor(consumer) {
this.consumer = consumer;
this.guarantor = new SubscriptionGuarantor(this);
this.subscriptions = [];
}
create(channelName, mixin) {
@ -338,7 +377,7 @@ class Subscriptions {
this.subscriptions.push(subscription);
this.consumer.ensureActiveConnection();
this.notify(subscription, "initialized");
this.sendCommand(subscription, "subscribe");
this.subscribe(subscription);
return subscription;
}
remove(subscription) {
@ -356,6 +395,7 @@ class Subscriptions {
}));
}
forget(subscription) {
this.guarantor.forget(subscription);
this.subscriptions = this.subscriptions.filter((s => s !== subscription));
return subscription;
}
@ -363,7 +403,7 @@ class Subscriptions {
return this.subscriptions.filter((s => s.identifier === identifier));
}
reload() {
return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe")));
return this.subscriptions.map((subscription => this.subscribe(subscription)));
}
notifyAll(callbackName, ...args) {
return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args)));
@ -377,6 +417,15 @@ class Subscriptions {
}
return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined));
}
subscribe(subscription) {
if (this.sendCommand(subscription, "subscribe")) {
this.guarantor.guarantee(subscription);
}
}
confirmSubscription(identifier) {
logger.log(`Subscription confirmed ${identifier}`);
this.findAll(identifier).map((subscription => this.guarantor.forget(subscription)));
}
sendCommand(subscription, command) {
const {identifier: identifier} = subscription;
return this.consumer.send({
@ -439,4 +488,4 @@ function getConfig(name) {
}
}
export { Connection, ConnectionMonitor, Consumer, INTERNAL, Subscription, Subscriptions, adapters, createConsumer, createWebSocketURL, getConfig, logger };
export { Connection, ConnectionMonitor, Consumer, INTERNAL, Subscription, SubscriptionGuarantor, Subscriptions, adapters, createConsumer, createWebSocketURL, getConfig, logger };

View file

@ -246,6 +246,7 @@
return this.monitor.recordPing();
case message_types.confirmation:
this.subscriptions.confirmSubscription(identifier);
return this.subscriptions.notify(identifier, "connected");
case message_types.rejection:
@ -310,9 +311,46 @@
return this.consumer.subscriptions.remove(this);
}
}
class SubscriptionGuarantor {
constructor(subscriptions) {
this.subscriptions = subscriptions;
this.pendingSubscriptions = [];
}
guarantee(subscription) {
if (this.pendingSubscriptions.indexOf(subscription) == -1) {
logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`);
this.pendingSubscriptions.push(subscription);
} else {
logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`);
}
this.startGuaranteeing();
}
forget(subscription) {
logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`);
this.pendingSubscriptions = this.pendingSubscriptions.filter((s => s !== subscription));
}
startGuaranteeing() {
this.stopGuaranteeing();
this.retrySubscribing();
}
stopGuaranteeing() {
clearTimeout(this.retryTimeout);
}
retrySubscribing() {
this.retryTimeout = setTimeout((() => {
if (this.subscriptions && typeof this.subscriptions.subscribe === "function") {
this.pendingSubscriptions.map((subscription => {
logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`);
this.subscriptions.subscribe(subscription);
}));
}
}), 500);
}
}
class Subscriptions {
constructor(consumer) {
this.consumer = consumer;
this.guarantor = new SubscriptionGuarantor(this);
this.subscriptions = [];
}
create(channelName, mixin) {
@ -327,7 +365,7 @@
this.subscriptions.push(subscription);
this.consumer.ensureActiveConnection();
this.notify(subscription, "initialized");
this.sendCommand(subscription, "subscribe");
this.subscribe(subscription);
return subscription;
}
remove(subscription) {
@ -345,6 +383,7 @@
}));
}
forget(subscription) {
this.guarantor.forget(subscription);
this.subscriptions = this.subscriptions.filter((s => s !== subscription));
return subscription;
}
@ -352,7 +391,7 @@
return this.subscriptions.filter((s => s.identifier === identifier));
}
reload() {
return this.subscriptions.map((subscription => this.sendCommand(subscription, "subscribe")));
return this.subscriptions.map((subscription => this.subscribe(subscription)));
}
notifyAll(callbackName, ...args) {
return this.subscriptions.map((subscription => this.notify(subscription, callbackName, ...args)));
@ -366,6 +405,15 @@
}
return subscriptions.map((subscription => typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined));
}
subscribe(subscription) {
if (this.sendCommand(subscription, "subscribe")) {
this.guarantor.guarantee(subscription);
}
}
confirmSubscription(identifier) {
logger.log(`Subscription confirmed ${identifier}`);
this.findAll(identifier).map((subscription => this.guarantor.forget(subscription)));
}
sendCommand(subscription, command) {
const {identifier: identifier} = subscription;
return this.consumer.send({
@ -428,6 +476,7 @@
exports.Consumer = Consumer;
exports.INTERNAL = INTERNAL;
exports.Subscription = Subscription;
exports.SubscriptionGuarantor = SubscriptionGuarantor;
exports.Subscriptions = Subscriptions;
exports.adapters = adapters;
exports.createConsumer = createConsumer;

View file

@ -132,6 +132,7 @@ Connection.prototype.events = {
case message_types.ping:
return this.monitor.recordPing()
case message_types.confirmation:
this.subscriptions.confirmSubscription(identifier)
return this.subscriptions.notify(identifier, "connected")
case message_types.rejection:
return this.subscriptions.reject(identifier)

View file

@ -4,6 +4,7 @@ import Consumer, { createWebSocketURL } from "./consumer"
import INTERNAL from "./internal"
import Subscription from "./subscription"
import Subscriptions from "./subscriptions"
import SubscriptionGuarantor from "./subscription_guarantor"
import adapters from "./adapters"
import logger from "./logger"
@ -14,6 +15,7 @@ export {
INTERNAL,
Subscription,
Subscriptions,
SubscriptionGuarantor,
adapters,
createWebSocketURL,
logger,

View file

@ -0,0 +1,50 @@
import logger from "./logger"
// Responsible for ensuring channel subscribe command is confirmed, retrying until confirmation is received.
// Internal class, not intended for direct user manipulation.
class SubscriptionGuarantor {
constructor(subscriptions) {
this.subscriptions = subscriptions
this.pendingSubscriptions = []
}
guarantee(subscription) {
if(this.pendingSubscriptions.indexOf(subscription) == -1){
logger.log(`SubscriptionGuarantor guaranteeing ${subscription.identifier}`)
this.pendingSubscriptions.push(subscription)
}
else {
logger.log(`SubscriptionGuarantor already guaranteeing ${subscription.identifier}`)
}
this.startGuaranteeing()
}
forget(subscription) {
logger.log(`SubscriptionGuarantor forgetting ${subscription.identifier}`)
this.pendingSubscriptions = (this.pendingSubscriptions.filter((s) => s !== subscription))
}
startGuaranteeing() {
this.stopGuaranteeing()
this.retrySubscribing()
}
stopGuaranteeing() {
clearTimeout(this.retryTimeout)
}
retrySubscribing() {
this.retryTimeout = setTimeout(() => {
if (this.subscriptions && typeof(this.subscriptions.subscribe) === "function") {
this.pendingSubscriptions.map((subscription) => {
logger.log(`SubscriptionGuarantor resubscribing ${subscription.identifier}`)
this.subscriptions.subscribe(subscription)
})
}
}
, 500)
}
}
export default SubscriptionGuarantor

View file

@ -1,4 +1,6 @@
import Subscription from "./subscription"
import SubscriptionGuarantor from "./subscription_guarantor"
import logger from "./logger"
// Collection class for creating (and internally managing) channel subscriptions.
// The only method intended to be triggered by the user is ActionCable.Subscriptions#create,
@ -13,6 +15,7 @@ import Subscription from "./subscription"
export default class Subscriptions {
constructor(consumer) {
this.consumer = consumer
this.guarantor = new SubscriptionGuarantor(this)
this.subscriptions = []
}
@ -29,7 +32,7 @@ export default class Subscriptions {
this.subscriptions.push(subscription)
this.consumer.ensureActiveConnection()
this.notify(subscription, "initialized")
this.sendCommand(subscription, "subscribe")
this.subscribe(subscription)
return subscription
}
@ -50,6 +53,7 @@ export default class Subscriptions {
}
forget(subscription) {
this.guarantor.forget(subscription)
this.subscriptions = (this.subscriptions.filter((s) => s !== subscription))
return subscription
}
@ -60,7 +64,7 @@ export default class Subscriptions {
reload() {
return this.subscriptions.map((subscription) =>
this.sendCommand(subscription, "subscribe"))
this.subscribe(subscription))
}
notifyAll(callbackName, ...args) {
@ -80,6 +84,18 @@ export default class Subscriptions {
(typeof subscription[callbackName] === "function" ? subscription[callbackName](...args) : undefined))
}
subscribe(subscription) {
if (this.sendCommand(subscription, "subscribe")) {
this.guarantor.guarantee(subscription)
}
}
confirmSubscription(identifier) {
logger.log(`Subscription confirmed ${identifier}`)
this.findAll(identifier).map((subscription) =>
this.guarantor.forget(subscription))
}
sendCommand(subscription, command) {
const {identifier} = subscription
return this.consumer.send({command, identifier})

View file

@ -5,3 +5,4 @@ import "./unit/connection_monitor_test"
import "./unit/consumer_test"
import "./unit/subscription_test"
import "./unit/subscriptions_test"
import "./unit/subscription_guarantor_test"

View file

@ -0,0 +1,32 @@
import * as ActionCable from "../../../../app/javascript/action_cable/index"
const {module, test} = QUnit
module("ActionCable.SubscriptionGuarantor", hooks => {
let guarantor
hooks.beforeEach(() => guarantor = new ActionCable.SubscriptionGuarantor({}))
module("#guarantee", () => {
test("guarantees subscription only once", assert => {
const sub = {}
assert.equal(guarantor.pendingSubscriptions.length, 0)
guarantor.guarantee(sub)
assert.equal(guarantor.pendingSubscriptions.length, 1)
guarantor.guarantee(sub)
assert.equal(guarantor.pendingSubscriptions.length, 1)
})
}),
module("#forget", () => {
test("removes subscription", assert => {
const sub = {}
assert.equal(guarantor.pendingSubscriptions.length, 0)
guarantor.guarantee(sub)
assert.equal(guarantor.pendingSubscriptions.length, 1)
guarantor.forget(sub)
assert.equal(guarantor.pendingSubscriptions.length, 0)
})
})
})