From 7697338c7e5762efecc6fde9ed247f24d82aafab Mon Sep 17 00:00:00 2001 From: Timo <16718859+toger5@users.noreply.github.com> Date: Fri, 10 Jan 2025 11:46:28 +0100 Subject: [PATCH] MatrixRTC: move MatrixRTCSession logic into LocalMembershipManager (#4608) * split joinConfig - myMembership related properties get moved into its own interface * Add MyMembershipManager * Remove methods and functions that are from MatrixRTCSession (they now live in MyMembershipManager) * Refactor MatrixRTCSession to use myMembershipManager * fix tests * review * get rid of more memberhsip manager usage in tests * review - fix tests using private membershipManager props * fix circular import --- spec/unit/matrixrtc/MatrixRTCSession.spec.ts | 36 +- src/matrixrtc/MatrixRTCSession.ts | 372 +++---------------- src/matrixrtc/MembershipManager.ts | 336 +++++++++++++++++ 3 files changed, 399 insertions(+), 345 deletions(-) create mode 100644 src/matrixrtc/MembershipManager.ts diff --git a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts index 4bcd23ae8d..fd843a0e85 100644 --- a/spec/unit/matrixrtc/MatrixRTCSession.spec.ts +++ b/spec/unit/matrixrtc/MatrixRTCSession.spec.ts @@ -16,7 +16,8 @@ limitations under the License. import { encodeBase64, EventType, MatrixClient, MatrixError, MatrixEvent, Room } from "../../../src"; import { KnownMembership } from "../../../src/@types/membership"; -import { SessionMembershipData, DEFAULT_EXPIRE_DURATION } from "../../../src/matrixrtc/CallMembership"; +import { DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "../../../src/matrixrtc/CallMembership"; +import { MembershipManager } from "../../../src/matrixrtc/MembershipManager"; import { MatrixRTCSession, MatrixRTCSessionEvent } from "../../../src/matrixrtc/MatrixRTCSession"; import { EncryptionKeysEventContent } from "../../../src/matrixrtc/types"; import { randomString } from "../../../src/randomstring"; @@ -235,14 +236,13 @@ describe("MatrixRTCSession", () => { }); async function testSession(membershipData: SessionMembershipData): Promise { + const makeNewMembershipSpy = jest.spyOn(MembershipManager.prototype as any, "makeNewMembership"); sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData)); - const makeNewMembershipMock = jest.spyOn(sess as any, "makeNewMembership"); - sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig); await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]); - expect(makeNewMembershipMock).toHaveBeenCalledTimes(1); + expect(makeNewMembershipSpy).toHaveBeenCalledTimes(1); await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]); expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1); @@ -422,7 +422,6 @@ describe("MatrixRTCSession", () => { type: "livekit", }, }, - "_@alice:example.org_AAAAAAA", ); await Promise.race([sentDelayedState, new Promise((resolve) => realSetTimeout(resolve, 500))]); @@ -454,6 +453,7 @@ describe("MatrixRTCSession", () => { }); }); + const userStateKey = `${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`; // preparing the delayed disconnect should handle ratelimiting const sendDelayedStateAttempt = new Promise((resolve) => { const error = new MatrixError({ errcode: "M_LIMIT_EXCEEDED" }); @@ -478,24 +478,30 @@ describe("MatrixRTCSession", () => { }); }); + sess!.joinRoomSession([activeFocusConfig], activeFocus, { + membershipServerSideExpiryTimeout: 9000, + }); + // needed to advance the mock timers properly + // depends on myMembershipManager being created const scheduledDelayDisconnection = new Promise((resolve) => { - const originalFn: () => void = (sess as any).scheduleDelayDisconnection; - (sess as any).scheduleDelayDisconnection = jest.fn(() => { - originalFn.call(sess); + const membershipManager = (sess as any).membershipManager; + const originalFn: () => void = membershipManager.scheduleDelayDisconnection; + membershipManager.scheduleDelayDisconnection = jest.fn(() => { + originalFn.call(membershipManager); resolve(); }); }); - sess!.joinRoomSession([activeFocusConfig], activeFocus, { - membershipServerSideExpiryTimeout: 9000, - }); - - expect(sess).toHaveProperty("membershipServerSideExpiryTimeout", 9000); await sendDelayedStateExceedAttempt.then(); // needed to resolve after the send attempt catches - expect(sess).toHaveProperty("membershipServerSideExpiryTimeout", 7500); await sendDelayedStateAttempt; + const callProps = (d: number) => { + return [mockRoom!.roomId, { delay: d }, "org.matrix.msc3401.call.member", {}, userStateKey]; + }; + expect(client._unstable_sendDelayedStateEvent).toHaveBeenNthCalledWith(1, ...callProps(9000)); + expect(client._unstable_sendDelayedStateEvent).toHaveBeenNthCalledWith(2, ...callProps(7500)); + jest.advanceTimersByTime(5000); await sendStateEventAttempt.then(); // needed to resolve after resendIfRateLimited catches @@ -514,7 +520,7 @@ describe("MatrixRTCSession", () => { foci_preferred: [activeFocusConfig], focus_active: activeFocus, } satisfies SessionMembershipData, - `${!useOwnedStateEvents ? "_" : ""}@alice:example.org_AAAAAAA`, + userStateKey, ); await sentDelayedState; diff --git a/src/matrixrtc/MatrixRTCSession.ts b/src/matrixrtc/MatrixRTCSession.ts index c31f3a1763..e8d6e1f430 100644 --- a/src/matrixrtc/MatrixRTCSession.ts +++ b/src/matrixrtc/MatrixRTCSession.ts @@ -20,18 +20,16 @@ import { EventTimeline } from "../models/event-timeline.ts"; import { Room } from "../models/room.ts"; import { MatrixClient } from "../client.ts"; import { EventType } from "../@types/event.ts"; -import { UpdateDelayedEventAction } from "../@types/requests.ts"; -import { CallMembership, DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "./CallMembership.ts"; +import { CallMembership } from "./CallMembership.ts"; import { RoomStateEvent } from "../models/room-state.ts"; import { Focus } from "./focus.ts"; import { secureRandomBase64Url } from "../randomstring.ts"; import { EncryptionKeysEventContent } from "./types.ts"; import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts"; import { KnownMembership } from "../@types/membership.ts"; -import { HTTPError, MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts"; +import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts"; import { MatrixEvent } from "../models/event.ts"; -import { isLivekitFocusActive } from "./LivekitFocus.ts"; -import { sleep } from "../utils.ts"; +import { MembershipManager } from "./MembershipManager.ts"; const logger = rootLogger.getChild("MatrixRTCSession"); @@ -67,14 +65,7 @@ export type MatrixRTCSessionEventHandlerMap = { ) => void; }; -export interface JoinSessionConfig { - /** - * If true, generate and share a media key for this participant, - * and emit MatrixRTCSessionEvent.EncryptionKeyChanged when - * media keys for other participants become available. - */ - manageMediaKeys?: boolean; - +export interface MembershipConfig { /** * The timeout (in milliseconds) after we joined the call, that our membership should expire * unless we have explicitly updated it. @@ -94,24 +85,38 @@ export interface JoinSessionConfig { callMemberEventRetryDelayMinimum?: number; /** - * The jitter (in milliseconds) which is added to callMemberEventRetryDelayMinimum before retrying - * sending the membership event. e.g. if this is set to 1000, then a random delay of between 0 and 1000 - * milliseconds will be added. + * The timeout (in milliseconds) with which the deleayed leave event on the server is configured. + * After this time the server will set the event to the disconnected stat if it has not received a keep-alive from the client. */ - callMemberEventRetryJitter?: number; + membershipServerSideExpiryTimeout?: number; + /** + * The interval (in milliseconds) in which the client will send membership keep-alives to the server. + */ + membershipKeepAlivePeriod?: number; + + /** + * @deprecated It should be possible to make it stable without this. + */ + callMemberEventRetryJitter?: number; +} +export interface EncryptionConfig { + /** + * If true, generate and share a media key for this participant, + * and emit MatrixRTCSessionEvent.EncryptionKeyChanged when + * media keys for other participants become available. + */ + manageMediaKeys?: boolean; /** * The minimum time (in milliseconds) between each attempt to send encryption key(s). * e.g. if this is set to 1000, then we will send at most one key event every second. */ updateEncryptionKeyThrottle?: number; - /** * The delay (in milliseconds) after a member leaves before we create and publish a new key, because people * tend to leave calls at the same time. */ makeKeyDelay?: number; - /** * The delay (in milliseconds) between creating and sending a new key and starting to encrypt with it. This * gives other a chance to receive the new key to minimise the chance they don't get media they can't decrypt. @@ -119,40 +124,22 @@ export interface JoinSessionConfig { * makeKeyDelay + useKeyDelay */ useKeyDelay?: number; - - /** - * The timeout (in milliseconds) after which the server will consider the membership to have expired if it - * has not received a keep-alive from the client. - */ - membershipServerSideExpiryTimeout?: number; - - /** - * The period (in milliseconds) that the client will send membership keep-alives to the server. - */ - membershipKeepAlivePeriod?: number; } +export type JoinSessionConfig = MembershipConfig & EncryptionConfig; /** * A MatrixRTCSession manages the membership & properties of a MatrixRTC session. * This class doesn't deal with media at all, just membership & properties of a session. */ export class MatrixRTCSession extends TypedEventEmitter { + private membershipManager?: MembershipManager; + // The session Id of the call, this is the call_id of the call Member event. private _callId: string | undefined; - private relativeExpiry: number | undefined; - // undefined means not yet joined private joinConfig?: JoinSessionConfig; - private get membershipExpiryTimeout(): number { - return this.joinConfig?.membershipExpiryTimeout ?? DEFAULT_EXPIRE_DURATION; - } - - private get callMemberEventRetryDelayMinimum(): number { - return this.joinConfig?.callMemberEventRetryDelayMinimum ?? 3_000; - } - private get updateEncryptionKeyThrottle(): number { return this.joinConfig?.updateEncryptionKeyThrottle ?? 3_000; } @@ -165,49 +152,16 @@ export class MatrixRTCSession extends TypedEventEmitter; private expiryTimeout?: ReturnType; private keysEventUpdateTimeout?: ReturnType; private makeNewKeyTimeout?: ReturnType; private setNewKeyTimeouts = new Set>(); - // This is a Focus with the specified fields for an ActiveFocus (e.g. LivekitFocusActive for type="livekit") - private ownFocusActive?: Focus; - // This is a Foci array that contains the Focus objects this user is aware of and proposes to use. - private ownFociPreferred?: Focus[]; - - private updateCallMembershipRunning = false; - private needCallMembershipUpdate = false; - private manageMediaKeys = false; // userId:deviceId => array of (key, timestamp) private encryptionKeys = new Map>(); private lastEncryptionKeyUpdateRequest?: number; - private disconnectDelayId: string | undefined; - // We use this to store the last membership fingerprints we saw, so we can proactively re-send encryption keys // if it looks like a membership has been updated. private lastMembershipFingerprints: Set | undefined; @@ -338,22 +292,19 @@ export class MatrixRTCSession extends TypedEventEmitter { - await this.leaveRoomSession(1000); + await this.membershipManager?.leaveRoomSession(1000); if (this.expiryTimeout) { clearTimeout(this.expiryTimeout); this.expiryTimeout = undefined; } - if (this.memberEventTimeout) { - clearTimeout(this.memberEventTimeout); - this.memberEventTimeout = undefined; - } + this.membershipManager?.stop(); const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS); roomState?.off(RoomStateEvent.Members, this.onMembershipUpdate); } @@ -376,22 +327,21 @@ export class MatrixRTCSession extends TypedEventEmitter + this.getOldestMembership(), + ); } - - this.ownFocusActive = fociActive; - this.ownFociPreferred = fociPreferred; this.joinConfig = joinConfig; - this.relativeExpiry = this.membershipExpiryTimeout; this.manageMediaKeys = joinConfig?.manageMediaKeys ?? this.manageMediaKeys; - - logger.info(`Joining call session in room ${this.room.roomId} with manageMediaKeys=${this.manageMediaKeys}`); + // TODO: it feels wrong to be doing `setJoined()` and then `joinRoomSession()` non-atomically + // A new api between MembershipManager and the session will need to be defined. + this.membershipManager.setJoined(fociPreferred, fociActive); if (joinConfig?.manageMediaKeys) { this.makeNewSenderKey(); this.requestSendCurrentKey(); } - // We don't wait for this, mostly because it may fail and schedule a retry, so this - // function returning doesn't really mean anything at all. - this.triggerCallMembershipEventUpdate(); + this.membershipManager.joinRoomSession(); this.emit(MatrixRTCSessionEvent.JoinStateChanged, true); } @@ -433,35 +383,14 @@ export class MatrixRTCSession extends TypedEventEmitter => { - // TODO: Should this await on a shared promise? - if (this.updateCallMembershipRunning) { - this.needCallMembershipUpdate = true; - return; - } - - this.updateCallMembershipRunning = true; - try { - // if anything triggers an update while the update is running, do another update afterwards - do { - this.needCallMembershipUpdate = false; - await this.updateCallMembershipEvent(); - } while (this.needCallMembershipUpdate); - } finally { - this.updateCallMembershipRunning = false; - } - }; - - private async updateCallMembershipEvent(): Promise { - if (this.memberEventTimeout) { - clearTimeout(this.memberEventTimeout); - this.memberEventTimeout = undefined; - } - - const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS); - if (!roomState) throw new Error("Couldn't get room state for room " + this.room.roomId); - - const localUserId = this.client.getUserId(); - const localDeviceId = this.client.getDeviceId(); - if (!localUserId || !localDeviceId) throw new Error("User ID or device ID was null!"); - - let newContent: {} | SessionMembershipData = {}; - // TODO: implement expiry logic to MSC4143 events - // previously we checked here if the event is timed out and scheduled a check if not. - // maybe there is a better way. - newContent = this.makeNewMembership(localDeviceId); - - try { - if (this.isJoined()) { - const stateKey = this.makeMembershipStateKey(localUserId, localDeviceId); - const prepareDelayedDisconnection = async (): Promise => { - try { - const res = await resendIfRateLimited(() => - this.client._unstable_sendDelayedStateEvent( - this.room.roomId, - { - delay: this.membershipServerSideExpiryTimeout, - }, - EventType.GroupCallMemberPrefix, - {}, // leave event - stateKey, - ), - ); - this.disconnectDelayId = res.delay_id; - } catch (e) { - if ( - e instanceof MatrixError && - e.errcode === "M_UNKNOWN" && - e.data["org.matrix.msc4140.errcode"] === "M_MAX_DELAY_EXCEEDED" - ) { - const maxDelayAllowed = e.data["org.matrix.msc4140.max_delay"]; - if ( - typeof maxDelayAllowed === "number" && - this.membershipServerSideExpiryTimeout > maxDelayAllowed - ) { - this.membershipServerSideExpiryTimeoutOverride = maxDelayAllowed; - return prepareDelayedDisconnection(); - } - } - logger.error("Failed to prepare delayed disconnection event:", e); - } - }; - await prepareDelayedDisconnection(); - // Send join event _after_ preparing the delayed disconnection event - await resendIfRateLimited(() => - this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey), - ); - // If sending state cancels your own delayed state, prepare another delayed state - // TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state - if (this.disconnectDelayId !== undefined) { - try { - const knownDisconnectDelayId = this.disconnectDelayId; - await resendIfRateLimited(() => - this.client._unstable_updateDelayedEvent( - knownDisconnectDelayId, - UpdateDelayedEventAction.Restart, - ), - ); - } catch (e) { - if (e instanceof MatrixError && e.errcode === "M_NOT_FOUND") { - // If we get a M_NOT_FOUND we prepare a new delayed event. - // In other error cases we do not want to prepare anything since we do not have the guarantee, that the - // future is not still running. - logger.warn("Failed to update delayed disconnection event, prepare it again:", e); - this.disconnectDelayId = undefined; - await prepareDelayedDisconnection(); - } - } - } - if (this.disconnectDelayId !== undefined) { - this.scheduleDelayDisconnection(); - } - } else { - // Not joined - let sentDelayedDisconnect = false; - if (this.disconnectDelayId !== undefined) { - try { - const knownDisconnectDelayId = this.disconnectDelayId; - await resendIfRateLimited(() => - this.client._unstable_updateDelayedEvent( - knownDisconnectDelayId, - UpdateDelayedEventAction.Send, - ), - ); - sentDelayedDisconnect = true; - } catch (e) { - logger.error("Failed to send our delayed disconnection event:", e); - } - this.disconnectDelayId = undefined; - } - if (!sentDelayedDisconnect) { - await resendIfRateLimited(() => - this.client.sendStateEvent( - this.room.roomId, - EventType.GroupCallMemberPrefix, - {}, - this.makeMembershipStateKey(localUserId, localDeviceId), - ), - ); - } - } - logger.info("Sent updated call member event."); - } catch (e) { - const resendDelay = this.callMemberEventRetryDelayMinimum + Math.random() * this.callMemberEventRetryJitter; - logger.warn(`Failed to send call member event (retrying in ${resendDelay}): ${e}`); - await sleep(resendDelay); - await this.triggerCallMembershipEventUpdate(); - } - } - - private scheduleDelayDisconnection(): void { - this.memberEventTimeout = setTimeout(this.delayDisconnection, this.membershipKeepAlivePeriod); - } - - private readonly delayDisconnection = async (): Promise => { - try { - const knownDisconnectDelayId = this.disconnectDelayId!; - await resendIfRateLimited(() => - this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart), - ); - this.scheduleDelayDisconnection(); - } catch (e) { - logger.error("Failed to delay our disconnection event:", e); - } - }; - - private makeMembershipStateKey(localUserId: string, localDeviceId: string): string { - const stateKey = `${localUserId}_${localDeviceId}`; - if (/^org\.matrix\.msc(3757|3779)\b/.exec(this.room.getVersion())) { - return stateKey; - } else { - return `_${stateKey}`; - } - } - private onRotateKeyTimeout = (): void => { if (!this.manageMediaKeys) return; @@ -1096,31 +836,3 @@ export class MatrixRTCSession extends TypedEventEmitter(func: () => Promise, numRetriesAllowed: number = 1): Promise { - // eslint-disable-next-line no-constant-condition - while (true) { - try { - return await func(); - } catch (e) { - if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) { - numRetriesAllowed--; - let resendDelay: number; - const defaultMs = 5000; - try { - resendDelay = e.getRetryAfterMs() ?? defaultMs; - logger.info(`Rate limited by server, retrying in ${resendDelay}ms`); - } catch (e) { - logger.warn( - `Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`, - e, - ); - resendDelay = defaultMs; - } - await sleep(resendDelay); - } else { - throw e; - } - } - } -} diff --git a/src/matrixrtc/MembershipManager.ts b/src/matrixrtc/MembershipManager.ts new file mode 100644 index 0000000000..04d46b3178 --- /dev/null +++ b/src/matrixrtc/MembershipManager.ts @@ -0,0 +1,336 @@ +import { EventType } from "../@types/event.ts"; +import { UpdateDelayedEventAction } from "../@types/requests.ts"; +import type { MatrixClient } from "../client.ts"; +import { HTTPError, MatrixError } from "../http-api/errors.ts"; +import { logger } from "../logger.ts"; +import { EventTimeline } from "../models/event-timeline.ts"; +import { Room } from "../models/room.ts"; +import { sleep } from "../utils.ts"; +import { CallMembership, DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "./CallMembership.ts"; +import { Focus } from "./focus.ts"; +import { isLivekitFocusActive } from "./LivekitFocus.ts"; +import { MembershipConfig } from "./MatrixRTCSession.ts"; + +/** + * This internal class is used by the MatrixRTCSession to manage the local user's own membership of the session. + * @internal + */ +export class MembershipManager { + private relativeExpiry: number | undefined; + + public constructor( + private joinConfig: MembershipConfig | undefined, + private room: Room, + private client: MatrixClient, + private getOldestMembership: () => CallMembership | undefined, + ) {} + private memberEventTimeout?: ReturnType; + + /** + * This is a Foci array that contains the Focus objects this user is aware of and proposes to use. + */ + private ownFociPreferred?: Focus[]; + /** + * This is a Focus with the specified fields for an ActiveFocus (e.g. LivekitFocusActive for type="livekit") + */ + private ownFocusActive?: Focus; + + private updateCallMembershipRunning = false; + private needCallMembershipUpdate = false; + /** + * If the server disallows the configured {@link membershipServerSideExpiryTimeout}, + * this stores a delay that the server does allow. + */ + private membershipServerSideExpiryTimeoutOverride?: number; + private disconnectDelayId: string | undefined; + + private get callMemberEventRetryDelayMinimum(): number { + return this.joinConfig?.callMemberEventRetryDelayMinimum ?? 3_000; + } + private get membershipExpiryTimeout(): number { + return this.joinConfig?.membershipExpiryTimeout ?? DEFAULT_EXPIRE_DURATION; + } + private get membershipServerSideExpiryTimeout(): number { + return ( + this.membershipServerSideExpiryTimeoutOverride ?? + this.joinConfig?.membershipServerSideExpiryTimeout ?? + 8_000 + ); + } + + private get membershipKeepAlivePeriod(): number { + return this.joinConfig?.membershipKeepAlivePeriod ?? 5_000; + } + + private get callMemberEventRetryJitter(): number { + return this.joinConfig?.callMemberEventRetryJitter ?? 2_000; + } + public joinRoomSession(): void { + // We don't wait for this, mostly because it may fail and schedule a retry, so this + // function returning doesn't really mean anything at all. + this.triggerCallMembershipEventUpdate(); + } + public setJoined(fociPreferred: Focus[], fociActive?: Focus): void { + this.ownFocusActive = fociActive; + this.ownFociPreferred = fociPreferred; + this.relativeExpiry = this.membershipExpiryTimeout; + } + public setLeft(): void { + this.relativeExpiry = undefined; + this.ownFocusActive = undefined; + } + public async leaveRoomSession(timeout: number | undefined = undefined): Promise { + if (timeout) { + // The sleep promise returns the string 'timeout' and the membership update void + // A success implies that the membership update was quicker then the timeout. + const raceResult = await Promise.race([this.triggerCallMembershipEventUpdate(), sleep(timeout, "timeout")]); + return raceResult !== "timeout"; + } else { + await this.triggerCallMembershipEventUpdate(); + return true; + } + } + public stop(): void { + if (this.memberEventTimeout) { + clearTimeout(this.memberEventTimeout); + this.memberEventTimeout = undefined; + } + } + public triggerCallMembershipEventUpdate = async (): Promise => { + // TODO: Should this await on a shared promise? + if (this.updateCallMembershipRunning) { + this.needCallMembershipUpdate = true; + return; + } + + this.updateCallMembershipRunning = true; + try { + // if anything triggers an update while the update is running, do another update afterwards + do { + this.needCallMembershipUpdate = false; + await this.updateCallMembershipEvent(); + } while (this.needCallMembershipUpdate); + } finally { + this.updateCallMembershipRunning = false; + } + }; + private makeNewMembership(deviceId: string): SessionMembershipData | {} { + // If we're joined, add our own + if (this.isJoined()) { + return this.makeMyMembership(deviceId); + } + return {}; + } + /* + * Returns true if we intend to be participating in the MatrixRTC session. + * This is determined by checking if the relativeExpiry has been set. + */ + public isJoined(): boolean { + return this.relativeExpiry !== undefined; + } + /** + * Constructs our own membership + */ + private makeMyMembership(deviceId: string): SessionMembershipData { + return { + call_id: "", + scope: "m.room", + application: "m.call", + device_id: deviceId, + expires: this.relativeExpiry, + focus_active: { type: "livekit", focus_selection: "oldest_membership" }, + foci_preferred: this.ownFociPreferred ?? [], + }; + } + + public getActiveFocus(): Focus | undefined { + if (this.ownFocusActive && isLivekitFocusActive(this.ownFocusActive)) { + // A livekit active focus + if (this.ownFocusActive.focus_selection === "oldest_membership") { + const oldestMembership = this.getOldestMembership(); + return oldestMembership?.getPreferredFoci()[0]; + } + } else { + // We do not understand the membership format (could be legacy). We default to oldestMembership + // Once there are other methods this is a hard error! + const oldestMembership = this.getOldestMembership(); + return oldestMembership?.getPreferredFoci()[0]; + } + } + public async updateCallMembershipEvent(): Promise { + if (this.memberEventTimeout) { + clearTimeout(this.memberEventTimeout); + this.memberEventTimeout = undefined; + } + + const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS); + if (!roomState) throw new Error("Couldn't get room state for room " + this.room.roomId); + + const localUserId = this.client.getUserId(); + const localDeviceId = this.client.getDeviceId(); + if (!localUserId || !localDeviceId) throw new Error("User ID or device ID was null!"); + + let newContent: {} | SessionMembershipData = {}; + // TODO: add back expiary logic to non-legacy events + // previously we checked here if the event is timed out and scheduled a check if not. + // maybe there is a better way. + newContent = this.makeNewMembership(localDeviceId); + + try { + if (this.isJoined()) { + const stateKey = this.makeMembershipStateKey(localUserId, localDeviceId); + const prepareDelayedDisconnection = async (): Promise => { + try { + const res = await resendIfRateLimited(() => + this.client._unstable_sendDelayedStateEvent( + this.room.roomId, + { + delay: this.membershipServerSideExpiryTimeout, + }, + EventType.GroupCallMemberPrefix, + {}, // leave event + stateKey, + ), + ); + logger.log("BEFOER:", this.disconnectDelayId); + this.disconnectDelayId = res.delay_id; + logger.log("AFTER:", this.disconnectDelayId); + } catch (e) { + if ( + e instanceof MatrixError && + e.errcode === "M_UNKNOWN" && + e.data["org.matrix.msc4140.errcode"] === "M_MAX_DELAY_EXCEEDED" + ) { + const maxDelayAllowed = e.data["org.matrix.msc4140.max_delay"]; + if ( + typeof maxDelayAllowed === "number" && + this.membershipServerSideExpiryTimeout > maxDelayAllowed + ) { + this.membershipServerSideExpiryTimeoutOverride = maxDelayAllowed; + return prepareDelayedDisconnection(); + } + } + logger.error("Failed to prepare delayed disconnection event:", e); + } + }; + await prepareDelayedDisconnection(); + // Send join event _after_ preparing the delayed disconnection event + await resendIfRateLimited(() => + this.client.sendStateEvent(this.room.roomId, EventType.GroupCallMemberPrefix, newContent, stateKey), + ); + // If sending state cancels your own delayed state, prepare another delayed state + // TODO: Remove this once MSC4140 is stable & doesn't cancel own delayed state + if (this.disconnectDelayId !== undefined) { + try { + const knownDisconnectDelayId = this.disconnectDelayId; + await resendIfRateLimited(() => + this.client._unstable_updateDelayedEvent( + knownDisconnectDelayId, + UpdateDelayedEventAction.Restart, + ), + ); + } catch (e) { + if (e instanceof MatrixError && e.errcode === "M_NOT_FOUND") { + // If we get a M_NOT_FOUND we prepare a new delayed event. + // In other error cases we do not want to prepare anything since we do not have the guarantee, that the + // future is not still running. + logger.warn("Failed to update delayed disconnection event, prepare it again:", e); + this.disconnectDelayId = undefined; + await prepareDelayedDisconnection(); + } + } + } + if (this.disconnectDelayId !== undefined) { + this.scheduleDelayDisconnection(); + } + } else { + // Not joined + let sentDelayedDisconnect = false; + if (this.disconnectDelayId !== undefined) { + try { + const knownDisconnectDelayId = this.disconnectDelayId; + await resendIfRateLimited(() => + this.client._unstable_updateDelayedEvent( + knownDisconnectDelayId, + UpdateDelayedEventAction.Send, + ), + ); + sentDelayedDisconnect = true; + } catch (e) { + logger.error("Failed to send our delayed disconnection event:", e); + } + this.disconnectDelayId = undefined; + } + if (!sentDelayedDisconnect) { + await resendIfRateLimited(() => + this.client.sendStateEvent( + this.room.roomId, + EventType.GroupCallMemberPrefix, + {}, + this.makeMembershipStateKey(localUserId, localDeviceId), + ), + ); + } + } + logger.info("Sent updated call member event."); + } catch (e) { + const resendDelay = this.callMemberEventRetryDelayMinimum + Math.random() * this.callMemberEventRetryJitter; + logger.warn(`Failed to send call member event (retrying in ${resendDelay}): ${e}`); + await sleep(resendDelay); + await this.triggerCallMembershipEventUpdate(); + } + } + + private scheduleDelayDisconnection(): void { + this.memberEventTimeout = setTimeout(this.delayDisconnection, this.membershipKeepAlivePeriod); + } + + private readonly delayDisconnection = async (): Promise => { + try { + const knownDisconnectDelayId = this.disconnectDelayId!; + await resendIfRateLimited(() => + this.client._unstable_updateDelayedEvent(knownDisconnectDelayId, UpdateDelayedEventAction.Restart), + ); + this.scheduleDelayDisconnection(); + } catch (e) { + logger.error("Failed to delay our disconnection event:", e); + } + }; + + private makeMembershipStateKey(localUserId: string, localDeviceId: string): string { + const stateKey = `${localUserId}_${localDeviceId}`; + if (/^org\.matrix\.msc(3757|3779)\b/.exec(this.room.getVersion())) { + return stateKey; + } else { + return `_${stateKey}`; + } + } +} + +async function resendIfRateLimited(func: () => Promise, numRetriesAllowed: number = 1): Promise { + // eslint-disable-next-line no-constant-condition + while (true) { + try { + return await func(); + } catch (e) { + if (numRetriesAllowed > 0 && e instanceof HTTPError && e.isRateLimitError()) { + numRetriesAllowed--; + let resendDelay: number; + const defaultMs = 5000; + try { + resendDelay = e.getRetryAfterMs() ?? defaultMs; + logger.info(`Rate limited by server, retrying in ${resendDelay}ms`); + } catch (e) { + logger.warn( + `Error while retrieving a rate-limit retry delay, retrying after default delay of ${defaultMs}`, + e, + ); + resendDelay = defaultMs; + } + await sleep(resendDelay); + } else { + throw e; + } + } + } +}