Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MatrixRTC: refactor MatrixRTCSession MemberManager API #4610

56 changes: 25 additions & 31 deletions spec/unit/matrixrtc/MatrixRTCSession.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ limitations under the License.
import { encodeBase64, EventType, MatrixClient, MatrixError, MatrixEvent, Room } from "../../../src";
import { KnownMembership } from "../../../src/@types/membership";
import { DEFAULT_EXPIRE_DURATION, SessionMembershipData } from "../../../src/matrixrtc/CallMembership";
import { MembershipManager } from "../../../src/matrixrtc/MembershipManager";
import { MatrixRTCSession, MatrixRTCSessionEvent } from "../../../src/matrixrtc/MatrixRTCSession";
import { EncryptionKeysEventContent } from "../../../src/matrixrtc/types";
import { randomString } from "../../../src/randomstring";
import { flushPromises } from "../../test-utils/flushPromises";
import { makeMockRoom, makeMockRoomState, membershipTemplate } from "./mocks";

const mockFocus = { type: "mock" };
Expand Down Expand Up @@ -236,16 +236,15 @@ describe("MatrixRTCSession", () => {
});

async function testSession(membershipData: SessionMembershipData): Promise<void> {
const makeNewMembershipSpy = jest.spyOn(MembershipManager.prototype as any, "makeNewMembership");
sess = MatrixRTCSession.roomSessionForRoom(client, makeMockRoom(membershipData));

sess.joinRoomSession([mockFocus], mockFocus, joinSessionConfig);
await Promise.race([sentStateEvent, new Promise((resolve) => setTimeout(resolve, 500))]);

expect(makeNewMembershipSpy).toHaveBeenCalledTimes(1);
expect(sendStateEventMock).toHaveBeenCalledTimes(1);

await Promise.race([sentDelayedState, new Promise((resolve) => setTimeout(resolve, 500))]);
expect(client._unstable_sendDelayedStateEvent).toHaveBeenCalledTimes(1);
expect(sendDelayedStateMock).toHaveBeenCalledTimes(1);
}

it("sends events", async () => {
Expand Down Expand Up @@ -323,9 +322,11 @@ describe("MatrixRTCSession", () => {
let sendStateEventMock: jest.Mock;
let sendDelayedStateMock: jest.Mock;
let sendEventMock: jest.Mock;
let updateDelayedEventMock: jest.Mock;

let sentStateEvent: Promise<void>;
let sentDelayedState: Promise<void>;
let updatedDelayedEvent: Promise<void>;

beforeEach(() => {
sentStateEvent = new Promise((resolve) => {
Expand All @@ -339,12 +340,15 @@ describe("MatrixRTCSession", () => {
};
});
});
updatedDelayedEvent = new Promise((r) => {
updateDelayedEventMock = jest.fn(r);
});
sendEventMock = jest.fn();
client.sendStateEvent = sendStateEventMock;
client._unstable_sendDelayedStateEvent = sendDelayedStateMock;
client.sendEvent = sendEventMock;

client._unstable_updateDelayedEvent = jest.fn();
client._unstable_updateDelayedEvent = updateDelayedEventMock;

mockRoom = makeMockRoom([]);
sess = MatrixRTCSession.roomSessionForRoom(client, mockRoom);
Expand Down Expand Up @@ -482,19 +486,7 @@ describe("MatrixRTCSession", () => {
membershipServerSideExpiryTimeout: 9000,
});

// needed to advance the mock timers properly
// depends on myMembershipManager being created
const scheduledDelayDisconnection = new Promise<void>((resolve) => {
const membershipManager = (sess as any).membershipManager;
const originalFn: () => void = membershipManager.scheduleDelayDisconnection;
membershipManager.scheduleDelayDisconnection = jest.fn(() => {
originalFn.call(membershipManager);
resolve();
});
});

await sendDelayedStateExceedAttempt.then(); // needed to resolve after the send attempt catches

await sendDelayedStateAttempt;
const callProps = (d: number) => {
return [mockRoom!.roomId, { delay: d }, "org.matrix.msc3401.call.member", {}, userStateKey];
Expand Down Expand Up @@ -525,11 +517,13 @@ describe("MatrixRTCSession", () => {
await sentDelayedState;

// should have prepared the heartbeat to keep delaying the leave event while still connected
await scheduledDelayDisconnection;
// should have tried updating the delayed leave to test that it wasn't replaced by own state
await updatedDelayedEvent;
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(1);
// should update delayed disconnect

// ensures that we reach the code that schedules the timeout for the next delay update before we advance the timers.
await flushPromises();
jest.advanceTimersByTime(5000);
// should update delayed disconnect
expect(client._unstable_updateDelayedEvent).toHaveBeenCalledTimes(2);

jest.useRealTimers();
Expand Down Expand Up @@ -561,7 +555,7 @@ describe("MatrixRTCSession", () => {

const onMembershipsChanged = jest.fn();
sess.on(MatrixRTCSessionEvent.MembershipsChanged, onMembershipsChanged);
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

expect(onMembershipsChanged).not.toHaveBeenCalled();
});
Expand All @@ -574,7 +568,7 @@ describe("MatrixRTCSession", () => {
sess.on(MatrixRTCSessionEvent.MembershipsChanged, onMembershipsChanged);

mockRoom.getLiveTimeline().getState = jest.fn().mockReturnValue(makeMockRoomState([], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

expect(onMembershipsChanged).toHaveBeenCalled();
});
Expand Down Expand Up @@ -763,7 +757,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

// member2 re-joins which should trigger an immediate re-send
const keysSentPromise2 = new Promise<EncryptionKeysEventContent>((resolve) => {
Expand All @@ -772,7 +766,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
// but, that immediate resend is throttled so we need to wait a bit
jest.advanceTimersByTime(1000);
const { keys } = await keysSentPromise2;
Expand Down Expand Up @@ -825,7 +819,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

await keysSentPromise2;

Expand Down Expand Up @@ -879,7 +873,7 @@ describe("MatrixRTCSession", () => {
sendEventMock.mockClear();

// these should be a no-op:
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
expect(sendEventMock).toHaveBeenCalledTimes(0);
expect(sess!.statistics.counters.roomEventEncryptionKeysSent).toEqual(1);
} finally {
Expand Down Expand Up @@ -933,7 +927,7 @@ describe("MatrixRTCSession", () => {
sendEventMock.mockClear();

// this should be a no-op:
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();
expect(sendEventMock).toHaveBeenCalledTimes(0);

// advance time to avoid key throttling
Expand All @@ -947,7 +941,7 @@ describe("MatrixRTCSession", () => {
});

// this should re-send the key
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

await keysSentPromise2;

Expand Down Expand Up @@ -1010,7 +1004,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

jest.advanceTimersByTime(10000);

Expand Down Expand Up @@ -1055,7 +1049,7 @@ describe("MatrixRTCSession", () => {
);
}

sess!.onMembershipUpdate();
sess!.onRTCSessionMemberUpdate();

// advance time to avoid key throttling
jest.advanceTimersByTime(10000);
Expand Down Expand Up @@ -1096,7 +1090,7 @@ describe("MatrixRTCSession", () => {
mockRoom.getLiveTimeline().getState = jest
.fn()
.mockReturnValue(makeMockRoomState([membershipTemplate, member2], mockRoom.roomId));
sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

await new Promise((resolve) => {
realSetTimeout(resolve);
Expand Down
71 changes: 50 additions & 21 deletions src/matrixrtc/MatrixRTCSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import { decodeBase64, encodeUnpaddedBase64 } from "../base64.ts";
import { KnownMembership } from "../@types/membership.ts";
import { MatrixError, safeGetRetryAfterMs } from "../http-api/errors.ts";
import { MatrixEvent } from "../models/event.ts";
import { MembershipManager } from "./MembershipManager.ts";
import { LegacyMembershipManager, IMembershipManager } from "./MembershipManager.ts";

const logger = rootLogger.getChild("MatrixRTCSession");

Expand Down Expand Up @@ -132,7 +132,7 @@ export type JoinSessionConfig = MembershipConfig & EncryptionConfig;
* This class doesn't deal with media at all, just membership & properties of a session.
*/
export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, MatrixRTCSessionEventHandlerMap> {
private membershipManager?: MembershipManager;
private membershipManager?: IMembershipManager;

// The session Id of the call, this is the call_id of the call Member event.
private _callId: string | undefined;
Expand Down Expand Up @@ -283,7 +283,8 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
super();
this._callId = memberships[0]?.callId;
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
roomState?.on(RoomStateEvent.Members, this.onMembershipUpdate);
// TODO: double check if this is actually needed. Should be covered by refreshRoom in MatrixRTCSessionManager
roomState?.on(RoomStateEvent.Members, this.onRoomMemberUpdate);
this.setExpiryTimer();
}

Expand All @@ -299,14 +300,13 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
* Performs cleanup & removes timers for client shutdown
*/
public async stop(): Promise<void> {
await this.membershipManager?.leaveRoomSession(1000);
await this.membershipManager?.leave(1000);
if (this.expiryTimeout) {
clearTimeout(this.expiryTimeout);
this.expiryTimeout = undefined;
}
this.membershipManager?.stop();
const roomState = this.room.getLiveTimeline().getState(EventTimeline.FORWARDS);
roomState?.off(RoomStateEvent.Members, this.onMembershipUpdate);
roomState?.off(RoomStateEvent.Members, this.onRoomMemberUpdate);
}

/**
Expand All @@ -324,24 +324,21 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
* @param joinConfig - Additional configuration for the joined session.
*/
public joinRoomSession(fociPreferred: Focus[], fociActive?: Focus, joinConfig?: JoinSessionConfig): void {
this.joinConfig = joinConfig;
if (this.isJoined()) {
logger.info(`Already joined to session in room ${this.room.roomId}: ignoring join call`);
return;
} else {
this.membershipManager = new MembershipManager(joinConfig, this.room, this.client, () =>
this.membershipManager = new LegacyMembershipManager(joinConfig, this.room, this.client, () =>
this.getOldestMembership(),
);
}
this.joinConfig = joinConfig;
this.membershipManager!.join(fociPreferred, fociActive);
this.manageMediaKeys = joinConfig?.manageMediaKeys ?? this.manageMediaKeys;
// TODO: it feels wrong to be doing `setJoined()` and then `joinRoomSession()` non-atomically
// A new api between MembershipManager and the session will need to be defined.
this.membershipManager.setJoined(fociPreferred, fociActive);
if (joinConfig?.manageMediaKeys) {
this.makeNewSenderKey();
this.requestSendCurrentKey();
}
this.membershipManager.joinRoomSession();
this.emit(MatrixRTCSessionEvent.JoinStateChanged, true);
}

Expand Down Expand Up @@ -383,12 +380,17 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M

logger.info(`Leaving call session in room ${this.room.roomId}`);
this.joinConfig = undefined;
this.membershipManager!.setLeft();
this.manageMediaKeys = false;
const leavePromise = this.membershipManager!.leave(timeout);
this.emit(MatrixRTCSessionEvent.JoinStateChanged, false);
return await this.membershipManager!.leaveRoomSession(timeout);
return await leavePromise;
}

/**
* Get the active focus from the current CallMemberState event
* @returns The focus that is currently in use to connect to this session. This is undefined
* if the client is not connected to this session.
*/
public getActiveFocus(): Focus | undefined {
return this.membershipManager?.getActiveFocus();
}
Expand Down Expand Up @@ -650,14 +652,21 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
}

if (soonestExpiry != undefined) {
this.expiryTimeout = setTimeout(this.onMembershipUpdate, soonestExpiry);
this.expiryTimeout = setTimeout(this.onRTCSessionMemberUpdate, soonestExpiry);
}
}

public getOldestMembership(): CallMembership | undefined {
return this.memberships[0];
}

/**
* This method is used when the user is not yet connected to the Session but wants to know what focus
* the users in the session are using to make a decision how it wants/should connect.
*
* See also `getActiveFocus`
* @returns The focus which should be used when joining this session.
*/
public getFocusInUse(): Focus | undefined {
const oldestMembership = this.getOldestMembership();
if (oldestMembership?.getFocusSelection() === "oldest_membership") {
Expand Down Expand Up @@ -746,11 +755,35 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
m.sender === this.client.getUserId() && m.deviceId === this.client.getDeviceId();

/**
* @deprecated use onRoomMemberUpdate or onRTCSessionMemberUpdate instead. this should be called when any membership in the call is updated
* the old name might have implied to only need to call this when your own membership changes.
*/
public onMembershipUpdate = (): void => {
this.recalculateSessionMembers();
};

/**
* Call this when the room members have changed.
toger5 marked this conversation as resolved.
Show resolved Hide resolved
*/
public onRoomMemberUpdate = (): void => {
this.recalculateSessionMembers();
};

/**
* Call this when sth changed that impacts the current rtc members in this session.
toger5 marked this conversation as resolved.
Show resolved Hide resolved
*/
public onRTCSessionMemberUpdate = (): void => {
this.recalculateSessionMembers();
};

/**
* Call this when anything that could impact rtc memberships has changed: Room Members or RTC members.
*
* Examines the latest call memberships and handles any encryption key sending or rotation that is needed.
*
* This function should be called when the room members or call memberships might have changed.
*/
public onMembershipUpdate = (): void => {
private recalculateSessionMembers = (): void => {
const oldMemberships = this.memberships;
this.memberships = MatrixRTCSession.callMembershipsForRoom(this.room);

Expand All @@ -764,11 +797,7 @@ export class MatrixRTCSession extends TypedEventEmitter<MatrixRTCSessionEvent, M
logger.info(`Memberships for call in room ${this.room.roomId} have changed: emitting`);
this.emit(MatrixRTCSessionEvent.MembershipsChanged, oldMemberships, this.memberships);

if (this.isJoined() && !this.memberships.some(this.isMyMembership)) {
logger.warn("Missing own membership: force re-join");
// TODO: Should this be awaited? And is there anything to tell the focus?
this.membershipManager?.triggerCallMembershipEventUpdate();
}
this.membershipManager?.onRTCSessionMemberUpdate(this.memberships);
}

if (this.manageMediaKeys && this.isJoined()) {
Expand Down
2 changes: 1 addition & 1 deletion src/matrixrtc/MatrixRTCSessionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ export class MatrixRTCSessionManager extends TypedEventEmitter<MatrixRTCSessionM

const wasActiveAndKnown = sess.memberships.length > 0 && !isNewSession;

sess.onMembershipUpdate();
sess.onRTCSessionMemberUpdate();

const nowActive = sess.memberships.length > 0;

Expand Down
Loading
Loading