diff --git a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts index df998c2d1ac..96b1db9b031 100644 --- a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts +++ b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts @@ -472,14 +472,54 @@ describe("MatrixRTCSession", () => { const activeFocus = { type: "livekit", focus_selection: "oldest_membership" }; async function testJoin(useOwnedStateEvents: boolean): Promise { - const realSetTimeout = setTimeout; if (useOwnedStateEvents) { - mockRoom.getVersion = jest.fn().mockReturnValue("org.matrix.msc3779.default"); + mockRoom.getVersion = jest.fn().mockReturnValue("org.matrix.msc3757.default"); } jest.useFakeTimers(); + + // preparing the delayed disconnect should handle ratelimiting + const sendDelayedStateAttempt = new Promise((resolve) => { + const error = new MatrixError({ errcode: "M_LIMIT_EXCEEDED" }); + sendDelayedStateMock.mockImplementationOnce(() => { + resolve(); + return Promise.reject(error); + }); + }); + + // setting the membership state should handle ratelimiting (also with a retry-after value) + const sendStateEventAttempt = new Promise((resolve) => { + const error = new MatrixError( + { errcode: "M_LIMIT_EXCEEDED" }, + 429, + undefined, + undefined, + new Headers({ "Retry-After": "1" }), + ); + sendStateEventMock.mockImplementationOnce(() => { + resolve(); + return Promise.reject(error); + }); + }); + + // needed to advance the mock timers properly + const scheduledDelayDisconnection = new Promise((resolve) => { + const originalFn: () => void = (sess as any).scheduleDelayDisconnection; + (sess as any).scheduleDelayDisconnection = jest.fn(() => { + originalFn.call(sess); + resolve(); + }); + }); + sess!.joinRoomSession([activeFocusConfig], activeFocus, { useLegacyMemberEvents: false }); - await Promise.race([sentStateEvent, new Promise((resolve) => realSetTimeout(resolve, 500))]); + + await sendDelayedStateAttempt; + jest.advanceTimersByTime(5000); + + await sendStateEventAttempt.then(); // needed to resolve after resendIfRateLimited catches + jest.advanceTimersByTime(1000); + + await sentStateEvent; expect(client.sendStateEvent).toHaveBeenCalledWith( mockRoom!.roomId, EventType.GroupCallMemberPrefix, @@ -493,9 +533,10 @@ describe("MatrixRTCSession", () => { } satisfies SessionMembershipData, `${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`, ); - await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]); - expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); + await sentDelayedState; + // should have prepared the heartbeat to keep delaying the leave event while still connected + await scheduledDelayDisconnection; // should have tried updating the delayed leave to test that it wasn't replaced by own state expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(1); // should update delayed disconnect diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index 52258ea53a2..a317495a95d 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -34,7 +34,7 @@ import { randomString, secureRandomBase64Url } from "../randomstring.ts"; import { EncryptionKeysEventContent } from "./types.ts"; import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts"; import { KnownMembership } from "../@types/membership.ts"; -import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts"; +import { HTTPError, MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts"; import { MatrixEvent } from "../models/event.ts"; import { isLivekitFocusActive } from "./LivekitFocus.ts"; import { ExperimentalGroupCallRoomMemberState } from "../webrtc/groupCall.ts"; @@ -1031,39 +1031,39 @@ export class MatrixRTCSession extends TypedEventEmitter => { try { // TODO: If delayed event times out, re-join! - const res = await this.client._unstable_sendDelayedStateEvent( - this.room.roomId, - { - delay: 8000, - }, - EventType.GroupCallMemberPrefix, - {}, // leave event - stateKey, + const res = await resendIfRateLimited(() => + this.client._unstable_sendDelayedStateEvent( + this.room.roomId, + { + delay: 8000, + }, + EventType.GroupCallMemberPrefix, + {}, // leave event + stateKey, + ), ); this.disconnectDelayId = res.delay_id; } catch (e) { - // TODO: Retry if rate-limited logger.error("Failed to prepare delayed disconnection event:", e); } }; await prepareDelayedDisconnection(); // Send join event _after_ preparing the delayed disconnection event - await this.client.sendStateEvent( - this.room.roomId, - EventType.GroupCallMemberPrefix, - newContent, - stateKey, + await resendIfRateLimited(() => + this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey), ); // If sending state cancels your own delayed state, prepare another delayed state // TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state if (this.disconnectDelayId !== undefined) { try { - await this.client._unstable_updateDelayedEvent( - this.disconnectDelayId, - UpdateDelayedEventAction.Restart, + const knownDisconnectDelayId = this.disconnectDelayId; + await resendIfRateLimited(() => + this.client._unstable_updateDelayedEvent( + knownDisconnectDelayId, + UpdateDelayedEventAction.Restart, + ), ); } catch (e) { - // TODO: Make embedded client include errcode, and retry only if not M_NOT_FOUND (or rate-limited) logger.warn("Failed to update delayed disconnection event, prepare it again:", e); this.disconnectDelayId = undefined; await prepareDelayedDisconnection(); @@ -1076,23 +1076,27 @@ export class MatrixRTCSession extends TypedEventEmitter + this.client._unstable_updateDelayedEvent( + knownDisconnectDelayId, + UpdateDelayedEventAction.Send, + ), ); sentDelayedDisconnect = true; } catch (e) { - // TODO: Retry if rate-limited logger.error("Failed to send our delayed disconnection event:", e); } this.disconnectDelayId = undefined; } if (!sentDelayedDisconnect) { - await this.client.sendStateEvent( - this.room.roomId, - EventType.GroupCallMemberPrefix, - {}, - this.makeMembershipStateKey(localUserId, localDeviceId), + await resendIfRateLimited(() => + this.client.sendStateEvent( + this.room.roomId, + EventType.GroupCallMemberPrefix, + {}, + this.makeMembershipStateKey(localUserId, localDeviceId), + ), ); } } @@ -1111,10 +1115,12 @@ export class MatrixRTCSession extends TypedEventEmitter => { try { - await this.client._unstable_updateDelayedEvent(this.disconnectDelayId!, UpdateDelayedEventAction.Restart); + const knownDisconnectDelayId = this.disconnectDelayId!; + await resendIfRateLimited(() => + this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart), + ); this.scheduleDelayDisconnection(); } catch (e) { - // TODO: Retry if rate-limited logger.error("Failed to delay our disconnection event:", e); } }; @@ -1162,3 +1168,31 @@ export class MatrixRTCSession extends TypedEventEmitter(func: () => Promise, numRetriesAllowed: number = 1): Promise { + // eslint-disable-next-line no-constant-condition + while (true) { + try { + return await func(); + } catch (e) { + if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) { + numRetriesAllowed--; + let resendDelay: number; + const defaultMs = 5000; + try { + resendDelay = e.getRetryAfterMs() ?? defaultMs; + logger.info(`Rate limited by server, retrying in ${resendDelay}ms`); + } catch (e) { + logger.warn( + `Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`, + e, + ); + resendDelay = defaultMs; + } + await sleep(resendDelay); + } else { + throw e; + } + } + } +}