Skip to content

Commit

Permalink
Sliding Sync: Reset forgotten status when membership changes (like …
Browse files Browse the repository at this point in the history
…rejoining a room) (#17835)

Reset `sliding_sync_membership_snapshots` -> `forgotten` status when
membership changes (like rejoining a room).

Fix #17781

### What was the problem before?

Previously, if someone used `/forget` on one of their rooms, it would
update `sliding_sync_membership_snapshots` as expected but when someone
rejoined the room (or had any membership change), the upsert didn't
overwrite and reset the `forgotten` status so it remained `forgotten`
and invisible down the Sliding Sync endpoint.
  • Loading branch information
MadLittleMods authored Oct 22, 2024
1 parent 80ad02e commit a5e16a4
Show file tree
Hide file tree
Showing 10 changed files with 433 additions and 4 deletions.
1 change: 1 addition & 0 deletions changelog.d/17835.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a bug in [MSC4186](https://github.com/matrix-org/matrix-spec-proposals/pull/4186) Sliding Sync that would cause rooms to stay forgotten and hidden even after rejoining.
11 changes: 9 additions & 2 deletions synapse/storage/databases/main/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -1863,10 +1863,10 @@ def _update_current_state_txn(
txn.execute_batch(
f"""
INSERT INTO sliding_sync_membership_snapshots
(room_id, user_id, sender, membership_event_id, membership, event_stream_ordering, event_instance_name
(room_id, user_id, sender, membership_event_id, membership, forgotten, event_stream_ordering, event_instance_name
{("," + ", ".join(sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""})
VALUES (
?, ?, ?, ?, ?,
?, ?, ?, ?, ?, ?,
(SELECT stream_ordering FROM events WHERE event_id = ?),
(SELECT COALESCE(instance_name, 'master') FROM events WHERE event_id = ?)
{("," + ", ".join("?" for _ in sliding_sync_snapshot_values)) if sliding_sync_snapshot_values else ""}
Expand All @@ -1876,6 +1876,7 @@ def _update_current_state_txn(
sender = EXCLUDED.sender,
membership_event_id = EXCLUDED.membership_event_id,
membership = EXCLUDED.membership,
forgotten = EXCLUDED.forgotten,
event_stream_ordering = EXCLUDED.event_stream_ordering
{("," + ", ".join(f"{key} = EXCLUDED.{key}" for key in sliding_sync_snapshot_keys)) if sliding_sync_snapshot_keys else ""}
""",
Expand All @@ -1886,6 +1887,9 @@ def _update_current_state_txn(
membership_info.sender,
membership_info.membership_event_id,
membership_info.membership,
# Since this is a new membership, it isn't forgotten anymore (which
# matches how Synapse currently thinks about the forgotten status)
0,
# XXX: We do not use `membership_info.membership_event_stream_ordering` here
# because it is an unreliable value. See XXX note above.
membership_info.membership_event_id,
Expand Down Expand Up @@ -2901,6 +2905,9 @@ def _store_room_members_txn(
"sender": event.sender,
"membership_event_id": event.event_id,
"membership": event.membership,
# Since this is a new membership, it isn't forgotten anymore (which
# matches how Synapse currently thinks about the forgotten status)
"forgotten": 0,
"event_stream_ordering": event.internal_metadata.stream_ordering,
"event_instance_name": event.internal_metadata.instance_name,
}
Expand Down
118 changes: 118 additions & 0 deletions synapse/storage/databases/main/events_bg_updates.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,12 @@ def __init__(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
self._sliding_sync_membership_snapshots_bg_update,
)
# Add a background update to fix data integrity issue in the
# `sliding_sync_membership_snapshots` -> `forgotten` column
self.db_pool.updates.register_background_update_handler(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE,
self._sliding_sync_membership_snapshots_fix_forgotten_column_bg_update,
)

# We want this to run on the main database at startup before we start processing
# events.
Expand Down Expand Up @@ -2429,6 +2435,118 @@ def _fill_table_txn(txn: LoggingTransaction) -> None:

return len(memberships_to_update_rows)

async def _sliding_sync_membership_snapshots_fix_forgotten_column_bg_update(
self, progress: JsonDict, batch_size: int
) -> int:
"""
Background update to update the `sliding_sync_membership_snapshots` ->
`forgotten` column to be in sync with the `room_memberships` table.
Because of previously flawed code (now fixed); any room that someone has
forgotten and subsequently re-joined or had any new membership on, we need to go
and update the column to match the `room_memberships` table as it has fallen out
of sync.
"""
last_event_stream_ordering = progress.get(
"last_event_stream_ordering", -(1 << 31)
)

def _txn(
txn: LoggingTransaction,
) -> int:
"""
Returns:
The number of rows updated.
"""

# To simplify things, we can just recheck any row in
# `sliding_sync_membership_snapshots` with `forgotten=1`
txn.execute(
"""
SELECT
s.room_id,
s.user_id,
s.membership_event_id,
s.event_stream_ordering,
m.forgotten
FROM sliding_sync_membership_snapshots AS s
INNER JOIN room_memberships AS m ON (s.membership_event_id = m.event_id)
WHERE s.event_stream_ordering > ?
AND s.forgotten = 1
ORDER BY s.event_stream_ordering ASC
LIMIT ?
""",
(last_event_stream_ordering, batch_size),
)

memberships_to_update_rows = cast(
List[Tuple[str, str, str, int, int]],
txn.fetchall(),
)
if not memberships_to_update_rows:
return 0

# Assemble the values to update
#
# (room_id, user_id)
key_values: List[Tuple[str, str]] = []
# (forgotten,)
value_values: List[Tuple[int]] = []
for (
room_id,
user_id,
_membership_event_id,
_event_stream_ordering,
forgotten,
) in memberships_to_update_rows:
key_values.append(
(
room_id,
user_id,
)
)
value_values.append((forgotten,))

# Update all of the rows in one go
self.db_pool.simple_update_many_txn(
txn,
table="sliding_sync_membership_snapshots",
key_names=("room_id", "user_id"),
key_values=key_values,
value_names=("forgotten",),
value_values=value_values,
)

# Update the progress
(
_room_id,
_user_id,
_membership_event_id,
event_stream_ordering,
_forgotten,
) = memberships_to_update_rows[-1]
self.db_pool.updates._background_update_progress_txn(
txn,
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE,
{
"last_event_stream_ordering": event_stream_ordering,
},
)

return len(memberships_to_update_rows)

num_rows = await self.db_pool.runInteraction(
"_sliding_sync_membership_snapshots_fix_forgotten_column_bg_update",
_txn,
)

if not num_rows:
await self.db_pool.updates._end_background_update(
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE
)

return num_rows


def _resolve_stale_data_in_sliding_sync_tables(
txn: LoggingTransaction,
Expand Down
1 change: 1 addition & 0 deletions synapse/storage/databases/main/roommember.py
Original file line number Diff line number Diff line change
Expand Up @@ -1375,6 +1375,7 @@ def f(txn: LoggingTransaction) -> None:
keyvalues={"user_id": user_id, "room_id": room_id},
updatevalues={"forgotten": 1},
)
# Handle updating the `sliding_sync_membership_snapshots` table
self.db_pool.simple_update_txn(
txn,
table="sliding_sync_membership_snapshots",
Expand Down
2 changes: 2 additions & 0 deletions synapse/storage/schema/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,8 @@
Changes in SCHEMA_VERSION = 88
- MSC4140: Add `delayed_events` table that keeps track of events that are to
be posted in response to a resettable timeout or an on-demand action.
- Add background update to fix data integrity issue in the
`sliding_sync_membership_snapshots` -> `forgotten` column
"""


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2024 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

-- Add a background update to update the `sliding_sync_membership_snapshots` ->
-- `forgotten` column to be in sync with the `room_memberships` table.
--
-- For any room that someone has forgotten and subsequently re-joined or had any new
-- membership on, we need to go and update the column to match the `room_memberships`
-- table as it has fallen out of sync.
INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
(8802, 'sliding_sync_membership_snapshots_fix_forgotten_column_bg_update', '{}');
3 changes: 3 additions & 0 deletions synapse/types/storage/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,6 @@ class _BackgroundUpdates:
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE = (
"sliding_sync_membership_snapshots_bg_update"
)
SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_FIX_FORGOTTEN_COLUMN_BG_UPDATE = (
"sliding_sync_membership_snapshots_fix_forgotten_column_bg_update"
)
115 changes: 113 additions & 2 deletions tests/rest/client/sliding_sync/test_sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ def _create_remote_invite_room_for_user(
self,
invitee_user_id: str,
unsigned_invite_room_state: Optional[List[StrippedStateEvent]],
invite_room_id: Optional[str] = None,
) -> str:
"""
Create a fake invite for a remote room and persist it.
Expand All @@ -252,19 +253,23 @@ def _create_remote_invite_room_for_user(
invitee_user_id: The person being invited
unsigned_invite_room_state: List of stripped state events to assist the
receiver in identifying the room.
invite_room_id: Optional remote room ID to be invited to. When unset, we
will generate one.
Returns:
The room ID of the remote invite room
"""
store = self.hs.get_datastores().main

invite_room_id = f"!test_room{self._remote_invite_count}:remote_server"
if invite_room_id is None:
invite_room_id = f"!test_room{self._remote_invite_count}:remote_server"

invite_event_dict = {
"room_id": invite_room_id,
"sender": "@inviter:remote_server",
"state_key": invitee_user_id,
"depth": 1,
# Just keep advancing the depth
"depth": self._remote_invite_count,
"origin_server_ts": 1,
"type": EventTypes.Member,
"content": {"membership": Membership.INVITE},
Expand Down Expand Up @@ -679,6 +684,112 @@ def test_forgotten_up_to_date(self) -> None:
exact=True,
)

def test_rejoin_forgotten_room(self) -> None:
"""
Make sure we can see a forgotten room again if we rejoin (or any new membership
like an invite) (no longer forgotten)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")
user2_id = self.register_user("user2", "pass")
user2_tok = self.login(user2_id, "pass")

room_id = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
# User1 joins the room
self.helper.join(room_id, user1_id, tok=user1_tok)

# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# We should see the room (like normal)
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
{room_id},
exact=True,
)

# Leave and forget the room
self.helper.leave(room_id, user1_id, tok=user1_tok)
# User1 forgets the room
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{room_id}/forget",
content={},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.result)

# Re-join the room
self.helper.join(room_id, user1_id, tok=user1_tok)

# We should see the room again after re-joining
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
{room_id},
exact=True,
)

def test_invited_to_forgotten_remote_room(self) -> None:
"""
Make sure we can see a forgotten room again if we are invited again
(remote/federated out-of-band memberships)
"""
user1_id = self.register_user("user1", "pass")
user1_tok = self.login(user1_id, "pass")

# Create a remote room invite (out-of-band membership)
room_id = self._create_remote_invite_room_for_user(user1_id, None)

# Make the Sliding Sync request
sync_body = {
"lists": {
"foo-list": {
"ranges": [[0, 99]],
"required_state": [],
"timeline_limit": 0,
}
}
}
response_body, from_token = self.do_sync(sync_body, tok=user1_tok)
# We should see the room (like normal)
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
{room_id},
exact=True,
)

# Leave and forget the room
self.helper.leave(room_id, user1_id, tok=user1_tok)
# User1 forgets the room
channel = self.make_request(
"POST",
f"/_matrix/client/r0/rooms/{room_id}/forget",
content={},
access_token=user1_tok,
)
self.assertEqual(channel.code, 200, channel.result)

# Get invited to the room again
# self.helper.join(room_id, user1_id, tok=user1_tok)
self._create_remote_invite_room_for_user(user1_id, None, invite_room_id=room_id)

# We should see the room again after re-joining
response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
self.assertIncludes(
set(response_body["lists"]["foo-list"]["ops"][0]["room_ids"]),
{room_id},
exact=True,
)

def test_ignored_user_invites_initial_sync(self) -> None:
"""
Make sure we ignore invites if they are from one of the `m.ignored_user_list` on
Expand Down
Loading

0 comments on commit a5e16a4

Please sign in to comment.