Skip to content

Commit

Permalink
fix(JingleSessionPC): call task callbacks upon clearing
Browse files Browse the repository at this point in the history
  • Loading branch information
athouary authored and subhamcyara committed Jul 19, 2024
1 parent 5fe70f0 commit 53a8608
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 1 deletion.
24 changes: 24 additions & 0 deletions modules/util/AsyncQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@ import { queue } from 'async-es';

const logger = getLogger(__filename);

/**
* Error to be passed to a callback of a queued task when the queue is cleared.
*/
export class ClearedQueueError extends Error {
/**
* Creates new instance.
*/
constructor(message) {
super(message);
this.name = 'ClearedQueueError';
}
}

/**
* A queue for async task execution.
*/
Expand All @@ -13,12 +26,20 @@ export default class AsyncQueue {
constructor() {
this._queue = queue(this._processQueueTasks.bind(this), 1);
this._stopped = false;
this._taskCallbacks = new Map();
}

/**
* Removes any pending tasks from the queue.
*/
clear() {
for (const finishedCallback of this._taskCallbacks.values()) {
try {
finishedCallback(new ClearedQueueError('The queue has been cleared'));
} catch (error) {
logger.error('Error in callback while clearing the queue:', error);
}
}
this._queue.kill();
}

Expand All @@ -31,6 +52,8 @@ export default class AsyncQueue {
} catch (error) {
logger.error(`Task failed: ${error?.stack}`);
finishedCallback(error);
} finally {
this._taskCallbacks.delete(task);
}
}

Expand Down Expand Up @@ -64,6 +87,7 @@ export default class AsyncQueue {

return;
}
this._taskCallbacks.set(task, callback);
this._queue.push(task, callback);
}

Expand Down
30 changes: 29 additions & 1 deletion modules/xmpp/JingleSessionPC.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import SDP from '../sdp/SDP';
import SDPDiffer from '../sdp/SDPDiffer';
import SDPUtil from '../sdp/SDPUtil';
import Statistics from '../statistics/statistics';
import AsyncQueue from '../util/AsyncQueue';
import AsyncQueue, { ClearedQueueError } from '../util/AsyncQueue';
import GlobalOnErrorHandler from '../util/GlobalOnErrorHandler';

import browser from './../browser';
Expand Down Expand Up @@ -1320,6 +1320,13 @@ export default class JingleSessionPC extends JingleSession {
workFunction,
error => {
if (error) {
if (error instanceof ClearedQueueError) {
// The session might have been terminated before the task was executed, making it obsolete.
logger.debug(`${this} ICE restart task aborted: session terminated`);
success();

return;
}
logger.error(`${this} ICE restart task failed: ${error}`);
failure(error);
} else {
Expand Down Expand Up @@ -2193,6 +2200,13 @@ export default class JingleSessionPC extends JingleSession {
workFunction,
error => {
if (error) {
if (error instanceof ClearedQueueError) {
// The session might have been terminated before the task was executed, making it obsolete.
logger.debug(`${this} renegotiation after addTrack aborted: session terminated`);
resolve();

return;
}
logger.error(`${this} renegotiation after addTrack error`, error);
reject(error);
} else {
Expand Down Expand Up @@ -2305,6 +2319,13 @@ export default class JingleSessionPC extends JingleSession {
workFunction,
error => {
if (error) {
if (error instanceof ClearedQueueError) {
// The session might have been terminated before the task was executed, making it obsolete.
logger.debug('Replace track aborted: session terminated');
resolve();

return;
}
logger.error(`${this} Replace track error:`, error);
reject(error);
} else {
Expand Down Expand Up @@ -2503,6 +2524,13 @@ export default class JingleSessionPC extends JingleSession {
workFunction,
error => {
if (error) {
if (error instanceof ClearedQueueError) {
// The session might have been terminated before the task was executed, making it obsolete.
logger.debug(`${this} ${operationName} aborted: session terminated`);
resolve();

return;
}
logger.error(`${this} ${operationName} failed`);
reject(error);
} else {
Expand Down
4 changes: 4 additions & 0 deletions types/hand-crafted/modules/util/AsyncQueue.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
export class ClearedQueueError extends Error {
constructor();
}

export default class AsyncQueue {
constructor();
push: ( task: ( callback: ( err?: Error ) => void ) => void, callback?: ( err: Error ) => void ) => void; // TODO: check this
Expand Down

0 comments on commit 53a8608

Please sign in to comment.