From 9e34f3e075ff62ef17daf8dbf1b0823d99f0fb7e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Dec 2023 15:39:19 +0000 Subject: [PATCH 1/5] Remove 'order' field --- synapse/events/__init__.py | 1 - synapse/handlers/room.py | 4 +++- synapse/storage/databases/main/stream.py | 1 - 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 80e17f0fb0d..a5a5c04782b 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -216,7 +216,6 @@ def __init__(self, internal_metadata_dict: JsonDict): # be here before: DictProperty[RoomStreamToken] = DictProperty("before") after: DictProperty[RoomStreamToken] = DictProperty("after") - order: DictProperty[Tuple[int, int]] = DictProperty("order") def get_dict(self) -> JsonDict: return dict(self._dict) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index e78e598d5eb..fc5151957e2 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1742,7 +1742,9 @@ async def get_new_events( events = list(room_events) events.extend(e for evs, _ in room_to_events.values() for e in evs) - events.sort(key=lambda e: e.internal_metadata.order) + # We know stream_ordering must be not None here, as its been + # persisted, but mypy doesn't know that + events.sort(key=lambda e: e.internal_metadata.stream_ordering or 0) if limit: events[:] = events[:limit] diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 1b2a65bed2b..ae1c5f634ab 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1117,7 +1117,6 @@ def _set_before_and_after( internal = event.internal_metadata internal.before = RoomStreamToken(topological=topo, stream=stream - 1) internal.after = RoomStreamToken(topological=topo, stream=stream) - internal.order = (int(topo) if topo else 0, int(stream)) async def get_events_around( self, From de020cb7c8c834b67a7bad97727f8f877a7f356f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Dec 2023 15:44:49 +0000 Subject: [PATCH 2/5] Remove 'before' and 'after' fields --- synapse/events/__init__.py | 8 +----- synapse/handlers/admin.py | 7 +++++- synapse/handlers/room.py | 7 +++++- synapse/handlers/sync.py | 10 ++++++-- synapse/storage/databases/main/stream.py | 32 ------------------------ 5 files changed, 21 insertions(+), 43 deletions(-) diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index a5a5c04782b..c52e7266617 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -42,7 +42,7 @@ from synapse.api.constants import RelationTypes from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions -from synapse.types import JsonDict, RoomStreamToken, StrCollection +from synapse.types import JsonDict, StrCollection from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze from synapse.util.stringutils import strtobool @@ -211,12 +211,6 @@ def __init__(self, internal_metadata_dict: JsonDict): device_id: DictProperty[str] = DictProperty("device_id") """The device ID of the user who sent this event, if any.""" - # XXX: These are set by StreamWorkerStore._set_before_and_after. - # I'm pretty sure that these are never persisted to the database, so shouldn't - # be here - before: DictProperty[RoomStreamToken] = DictProperty("before") - after: DictProperty[RoomStreamToken] = DictProperty("after") - def get_dict(self) -> JsonDict: return dict(self._dict) diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 9a4af3c45fe..db80345b945 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -208,7 +208,12 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> if not events: break - from_key = events[-1].internal_metadata.after + last_event = events[-1] + assert last_event.internal_metadata.stream_ordering + from_key = RoomStreamToken( + stream=last_event.internal_metadata.stream_ordering, + topological=last_event.depth, + ) events = await filter_events_for_client( self._storage_controllers, user_id, events diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index fc5151957e2..160f983777b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1750,7 +1750,12 @@ async def get_new_events( events[:] = events[:limit] if events: - end_key = events[-1].internal_metadata.after + last_event = events[-1] + assert last_event.internal_metadata.stream_ordering + end_key = RoomStreamToken( + stream=last_event.internal_metadata.stream_ordering, + topological=last_event.depth, + ) else: end_key = to_key diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 1152c0158f7..0385c04bc24 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -601,7 +601,10 @@ async def _load_filtered_recents( if not limited or block_all_timeline: prev_batch_token = upto_token if recents: - room_key = recents[0].internal_metadata.before + assert recents[0].internal_metadata.stream_ordering + room_key = RoomStreamToken( + stream=recents[0].internal_metadata.stream_ordering - 1 + ) prev_batch_token = upto_token.copy_and_replace( StreamKeyType.ROOM, room_key ) @@ -689,7 +692,10 @@ async def _load_filtered_recents( if len(recents) > timeline_limit: limited = True recents = recents[-timeline_limit:] - room_key = recents[0].internal_metadata.before + assert recents[0].internal_metadata.stream_ordering + room_key = RoomStreamToken( + stream=recents[0].internal_metadata.stream_ordering - 1 + ) prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index ae1c5f634ab..aeeb74b46d5 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -705,8 +705,6 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]: [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(ret, rows, topo_order=False) - if order.lower() == "desc": ret.reverse() @@ -793,8 +791,6 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]: [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(ret, rows, topo_order=False) - return ret async def get_recent_events_for_room( @@ -820,8 +816,6 @@ async def get_recent_events_for_room( [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(events, rows) - return events, token async def get_recent_event_ids_for_room( @@ -1094,30 +1088,6 @@ def _get_max_topological_txn(self, txn: LoggingTransaction, room_id: str) -> int # `[(None,)]` return rows[0][0] if rows[0][0] is not None else 0 - @staticmethod - def _set_before_and_after( - events: List[EventBase], rows: List[_EventDictReturn], topo_order: bool = True - ) -> None: - """Inserts ordering information to events' internal metadata from - the DB rows. - - Args: - events - rows - topo_order: Whether the events were ordered topologically or by stream - ordering. If true then all rows should have a non null - topological_ordering. - """ - for event, row in zip(events, rows): - stream = row.stream_ordering - if topo_order and row.topological_ordering: - topo: Optional[int] = row.topological_ordering - else: - topo = None - internal = event.internal_metadata - internal.before = RoomStreamToken(topological=topo, stream=stream - 1) - internal.after = RoomStreamToken(topological=topo, stream=stream) - async def get_events_around( self, room_id: str, @@ -1558,8 +1528,6 @@ async def paginate_room_events( [r.event_id for r in rows], get_prev_content=True ) - self._set_before_and_after(events, rows) - return events, token @cached() From 78183d74b502c33a491fed2687ef786ac6118bc2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 22 Dec 2023 16:00:16 +0000 Subject: [PATCH 3/5] Newsfile --- changelog.d/16762.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/16762.misc diff --git a/changelog.d/16762.misc b/changelog.d/16762.misc new file mode 100644 index 00000000000..c49dc2085e0 --- /dev/null +++ b/changelog.d/16762.misc @@ -0,0 +1 @@ +Simplify event internal metadata class. From d42000a2fac72a236df32efcf29d8507f6e3ffa7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 23 Dec 2023 11:51:37 +0000 Subject: [PATCH 4/5] Should not be a topological token --- synapse/handlers/room.py | 1 - 1 file changed, 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 160f983777b..97239b1b491 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1754,7 +1754,6 @@ async def get_new_events( assert last_event.internal_metadata.stream_ordering end_key = RoomStreamToken( stream=last_event.internal_metadata.stream_ordering, - topological=last_event.depth, ) else: end_key = to_key From 208c28791e00c20dccd3d2788a74de04bebb261b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 5 Jan 2024 13:01:15 +0000 Subject: [PATCH 5/5] Update synapse/handlers/room.py Co-authored-by: reivilibre --- synapse/handlers/room.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 97239b1b491..41b00a5cf7b 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1744,7 +1744,7 @@ async def get_new_events( # We know stream_ordering must be not None here, as its been # persisted, but mypy doesn't know that - events.sort(key=lambda e: e.internal_metadata.stream_ordering or 0) + events.sort(key=lambda e: cast(int, e.internal_metadata.stream_ordering)) if limit: events[:] = events[:limit]