From 53a860850e6a6496f620d4e0d9bbe0e78fe6e2d4 Mon Sep 17 00:00:00 2001 From: Auxane Thouary <32353355+athouary@users.noreply.github.com> Date: Mon, 23 Oct 2023 13:28:17 +0200 Subject: [PATCH] fix(JingleSessionPC): call task callbacks upon clearing --- modules/util/AsyncQueue.js | 24 +++++++++++++++ modules/xmpp/JingleSessionPC.js | 30 ++++++++++++++++++- .../hand-crafted/modules/util/AsyncQueue.d.ts | 4 +++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/modules/util/AsyncQueue.js b/modules/util/AsyncQueue.js index b7883c3f50..e03b1ddea6 100644 --- a/modules/util/AsyncQueue.js +++ b/modules/util/AsyncQueue.js @@ -3,6 +3,19 @@ import { queue } from 'async-es'; const logger = getLogger(__filename); +/** + * Error to be passed to a callback of a queued task when the queue is cleared. + */ +export class ClearedQueueError extends Error { + /** + * Creates new instance. + */ + constructor(message) { + super(message); + this.name = 'ClearedQueueError'; + } +} + /** * A queue for async task execution. */ @@ -13,12 +26,20 @@ export default class AsyncQueue { constructor() { this._queue = queue(this._processQueueTasks.bind(this), 1); this._stopped = false; + this._taskCallbacks = new Map(); } /** * Removes any pending tasks from the queue. */ clear() { + for (const finishedCallback of this._taskCallbacks.values()) { + try { + finishedCallback(new ClearedQueueError('The queue has been cleared')); + } catch (error) { + logger.error('Error in callback while clearing the queue:', error); + } + } this._queue.kill(); } @@ -31,6 +52,8 @@ export default class AsyncQueue { } catch (error) { logger.error(`Task failed: ${error?.stack}`); finishedCallback(error); + } finally { + this._taskCallbacks.delete(task); } } @@ -64,6 +87,7 @@ export default class AsyncQueue { return; } + this._taskCallbacks.set(task, callback); this._queue.push(task, callback); } diff --git a/modules/xmpp/JingleSessionPC.js b/modules/xmpp/JingleSessionPC.js index f5d24099ae..70b526ca85 100644 --- a/modules/xmpp/JingleSessionPC.js +++ b/modules/xmpp/JingleSessionPC.js @@ -17,7 +17,7 @@ import SDP from '../sdp/SDP'; import SDPDiffer from '../sdp/SDPDiffer'; import SDPUtil from '../sdp/SDPUtil'; import Statistics from '../statistics/statistics'; -import AsyncQueue from '../util/AsyncQueue'; +import AsyncQueue, { ClearedQueueError } from '../util/AsyncQueue'; import GlobalOnErrorHandler from '../util/GlobalOnErrorHandler'; import browser from './../browser'; @@ -1320,6 +1320,13 @@ export default class JingleSessionPC extends JingleSession { workFunction, error => { if (error) { + if (error instanceof ClearedQueueError) { + // The session might have been terminated before the task was executed, making it obsolete. + logger.debug(`${this} ICE restart task aborted: session terminated`); + success(); + + return; + } logger.error(`${this} ICE restart task failed: ${error}`); failure(error); } else { @@ -2193,6 +2200,13 @@ export default class JingleSessionPC extends JingleSession { workFunction, error => { if (error) { + if (error instanceof ClearedQueueError) { + // The session might have been terminated before the task was executed, making it obsolete. + logger.debug(`${this} renegotiation after addTrack aborted: session terminated`); + resolve(); + + return; + } logger.error(`${this} renegotiation after addTrack error`, error); reject(error); } else { @@ -2305,6 +2319,13 @@ export default class JingleSessionPC extends JingleSession { workFunction, error => { if (error) { + if (error instanceof ClearedQueueError) { + // The session might have been terminated before the task was executed, making it obsolete. + logger.debug('Replace track aborted: session terminated'); + resolve(); + + return; + } logger.error(`${this} Replace track error:`, error); reject(error); } else { @@ -2503,6 +2524,13 @@ export default class JingleSessionPC extends JingleSession { workFunction, error => { if (error) { + if (error instanceof ClearedQueueError) { + // The session might have been terminated before the task was executed, making it obsolete. + logger.debug(`${this} ${operationName} aborted: session terminated`); + resolve(); + + return; + } logger.error(`${this} ${operationName} failed`); reject(error); } else { diff --git a/types/hand-crafted/modules/util/AsyncQueue.d.ts b/types/hand-crafted/modules/util/AsyncQueue.d.ts index ac45836be3..fd11370ea7 100644 --- a/types/hand-crafted/modules/util/AsyncQueue.d.ts +++ b/types/hand-crafted/modules/util/AsyncQueue.d.ts @@ -1,3 +1,7 @@ +export class ClearedQueueError extends Error { + constructor(); +} + export default class AsyncQueue { constructor(); push: ( task: ( callback: ( err?: Error ) => void ) => void, callback?: ( err: Error ) => void ) => void; // TODO: check this