Simplify internal metadata class. #16762

Merged
merged 5 commits on Jan 5, 2024
Changes from 4 commits
1 change: 1 addition & 0 deletions changelog.d/16762.misc
@@ -0,0 +1 @@
+Simplify event internal metadata class.
9 changes: 1 addition & 8 deletions synapse/events/__init__.py
@@ -42,7 +42,7 @@

 from synapse.api.constants import RelationTypes
 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
-from synapse.types import JsonDict, RoomStreamToken, StrCollection
+from synapse.types import JsonDict, StrCollection
 from synapse.util.caches import intern_dict
 from synapse.util.frozenutils import freeze
 from synapse.util.stringutils import strtobool
@@ -211,13 +211,6 @@ def __init__(self, internal_metadata_dict: JsonDict):
     device_id: DictProperty[str] = DictProperty("device_id")
     """The device ID of the user who sent this event, if any."""

-    # XXX: These are set by StreamWorkerStore._set_before_and_after.
-    # I'm pretty sure that these are never persisted to the database, so shouldn't
-    # be here
-    before: DictProperty[RoomStreamToken] = DictProperty("before")
-    after: DictProperty[RoomStreamToken] = DictProperty("after")
-    order: DictProperty[Tuple[int, int]] = DictProperty("order")
-
     def get_dict(self) -> JsonDict:
         return dict(self._dict)

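The three properties removed above were, per the deleted comment, only ever set by `StreamWorkerStore._set_before_and_after` and never persisted, so callers can recover the same positions from `stream_ordering`, which is set on every persisted event. A minimal sketch of that pattern, assuming a persisted event (the helper name is hypothetical, not something this PR adds):

```python
from typing import Tuple

from synapse.events import EventBase
from synapse.types import RoomStreamToken


def stream_tokens_for(event: EventBase) -> Tuple[RoomStreamToken, RoomStreamToken]:
    # Hypothetical helper illustrating what the handler changes below do inline:
    # build the "before" and "after" stream positions from stream_ordering
    # instead of reading the removed internal_metadata properties.
    stream = event.internal_metadata.stream_ordering
    # Persisted events always have a stream ordering; the assert narrows
    # Optional[int] for mypy, mirroring the asserts added in this PR.
    assert stream is not None

    before = RoomStreamToken(stream=stream - 1)
    after = RoomStreamToken(stream=stream)
    return before, after
```

The handler changes below follow this pattern inline; only the admin export keeps a topological component (taken from the event's depth).
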
7 changes: 6 additions & 1 deletion synapse/handlers/admin.py
@@ -208,7 +208,12 @@ async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") ->
                 if not events:
                     break

-                from_key = events[-1].internal_metadata.after
+                last_event = events[-1]
+                assert last_event.internal_metadata.stream_ordering
+                from_key = RoomStreamToken(
+                    stream=last_event.internal_metadata.stream_ordering,
+                    topological=last_event.depth,
+                )

                 events = await filter_events_for_client(
                     self._storage_controllers, user_id, events
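
In the export path the removed `after` property also carried the row's topological ordering, so its replacement passes `last_event.depth` (the value Synapse stores as an event's topological ordering) alongside the stream position. A simplified, hypothetical version of the surrounding pagination loop, with a synchronous `fetch_batch` stand-in for the real store call:

```python
from typing import Callable, List

from synapse.events import EventBase
from synapse.types import RoomStreamToken


def drain_room(
    fetch_batch: Callable[[RoomStreamToken], List[EventBase]],
    from_key: RoomStreamToken,
) -> List[EventBase]:
    # Hypothetical loop, loosely modelled on export_user_data: fetch until a
    # batch comes back empty, moving from_key to the position of the last
    # event seen each time. (The real store calls are async.)
    exported: List[EventBase] = []
    while True:
        events = fetch_batch(from_key)
        if not events:
            break

        last_event = events[-1]
        # Persisted events always have a stream ordering; the assert narrows
        # the Optional for mypy, as in the PR.
        assert last_event.internal_metadata.stream_ordering is not None
        from_key = RoomStreamToken(
            stream=last_event.internal_metadata.stream_ordering,
            topological=last_event.depth,
        )
        exported.extend(events)
    return exported
```
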
10 changes: 8 additions & 2 deletions synapse/handlers/room.py
@@ -1742,13 +1742,19 @@ async def get_new_events(
         events = list(room_events)
         events.extend(e for evs, _ in room_to_events.values() for e in evs)

-        events.sort(key=lambda e: e.internal_metadata.order)
+        # We know stream_ordering must be not None here, as its been
+        # persisted, but mypy doesn't know that
+        events.sort(key=lambda e: e.internal_metadata.stream_ordering or 0)

         if limit:
             events[:] = events[:limit]

         if events:
-            end_key = events[-1].internal_metadata.after
+            last_event = events[-1]
+            assert last_event.internal_metadata.stream_ordering
+            end_key = RoomStreamToken(
+                stream=last_event.internal_metadata.stream_ordering,
+            )
         else:
             end_key = to_key

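The old sort key, `internal_metadata.order`, was a `(topological, stream)` tuple, but on the code paths feeding `get_new_events` it was populated with `topo_order=False` (see the calls removed from `stream.py` below), so it reduced to ordering by stream position; sorting on `stream_ordering` is therefore equivalent here, and the `or 0` only exists to satisfy mypy. A condensed sketch of the new sort-and-end-token step, under those assumptions (the helper is illustrative, not a real function):

```python
from typing import List, Tuple

from synapse.events import EventBase
from synapse.types import RoomStreamToken


def sort_and_end_key(
    events: List[EventBase], to_key: RoomStreamToken
) -> Tuple[List[EventBase], RoomStreamToken]:
    # Persisted events always have a stream ordering, so the `or 0` fallback
    # never fires in practice; it only narrows Optional[int] for mypy.
    events.sort(key=lambda e: e.internal_metadata.stream_ordering or 0)

    if events:
        last_event = events[-1]
        assert last_event.internal_metadata.stream_ordering is not None
        end_key = RoomStreamToken(stream=last_event.internal_metadata.stream_ordering)
    else:
        end_key = to_key
    return events, end_key
```
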
10 changes: 8 additions & 2 deletions synapse/handlers/sync.py
@@ -601,7 +601,10 @@ async def _load_filtered_recents(
         if not limited or block_all_timeline:
             prev_batch_token = upto_token
             if recents:
-                room_key = recents[0].internal_metadata.before
+                assert recents[0].internal_metadata.stream_ordering
+                room_key = RoomStreamToken(
+                    stream=recents[0].internal_metadata.stream_ordering - 1
+                )
                 prev_batch_token = upto_token.copy_and_replace(
                     StreamKeyType.ROOM, room_key
                 )
@@ -689,7 +692,10 @@ async def _load_filtered_recents(
             if len(recents) > timeline_limit:
                 limited = True
                 recents = recents[-timeline_limit:]
-                room_key = recents[0].internal_metadata.before
+                assert recents[0].internal_metadata.stream_ordering
+                room_key = RoomStreamToken(
+                    stream=recents[0].internal_metadata.stream_ordering - 1
+                )

             prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)

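The sync timeline's `prev_batch` token needs to point just before the oldest event returned, hence `stream_ordering - 1`: the removed `before` property was computed as `RoomStreamToken(topological=topo, stream=stream - 1)`, and the replacement keeps the stream component while dropping the topological one. A minimal sketch of the pattern (the helper name is hypothetical):

```python
from synapse.events import EventBase
from synapse.types import RoomStreamToken


def prev_batch_key(oldest_event: EventBase) -> RoomStreamToken:
    # A token that sorts just before the oldest event in the timeline,
    # matching the stream part of the old internal_metadata.before property.
    stream = oldest_event.internal_metadata.stream_ordering
    assert stream is not None  # persisted events always have one
    return RoomStreamToken(stream=stream - 1)
```

Used with `upto_token.copy_and_replace(StreamKeyType.ROOM, ...)`, this reproduces what the two hunks above now do inline.
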
33 changes: 0 additions & 33 deletions synapse/storage/databases/main/stream.py
@@ -705,8 +705,6 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
             [r.event_id for r in rows], get_prev_content=True
         )

-        self._set_before_and_after(ret, rows, topo_order=False)
-
         if order.lower() == "desc":
             ret.reverse()

@@ -793,8 +791,6 @@ def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
             [r.event_id for r in rows], get_prev_content=True
         )

-        self._set_before_and_after(ret, rows, topo_order=False)
-
         return ret

     async def get_recent_events_for_room(
@@ -820,8 +816,6 @@ async def get_recent_events_for_room(
             [r.event_id for r in rows], get_prev_content=True
         )

-        self._set_before_and_after(events, rows)
-
         return events, token

     async def get_recent_event_ids_for_room(
@@ -1094,31 +1088,6 @@ def _get_max_topological_txn(self, txn: LoggingTransaction, room_id: str) -> int
         # `[(None,)]`
         return rows[0][0] if rows[0][0] is not None else 0

-    @staticmethod
-    def _set_before_and_after(
-        events: List[EventBase], rows: List[_EventDictReturn], topo_order: bool = True
-    ) -> None:
-        """Inserts ordering information to events' internal metadata from
-        the DB rows.
-
-        Args:
-            events
-            rows
-            topo_order: Whether the events were ordered topologically or by stream
-                ordering. If true then all rows should have a non null
-                topological_ordering.
-        """
-        for event, row in zip(events, rows):
-            stream = row.stream_ordering
-            if topo_order and row.topological_ordering:
-                topo: Optional[int] = row.topological_ordering
-            else:
-                topo = None
-            internal = event.internal_metadata
-            internal.before = RoomStreamToken(topological=topo, stream=stream - 1)
-            internal.after = RoomStreamToken(topological=topo, stream=stream)
-            internal.order = (int(topo) if topo else 0, int(stream))
-
     async def get_events_around(
         self,
         room_id: str,
@@ -1559,8 +1528,6 @@ async def paginate_room_events(
             [r.event_id for r in rows], get_prev_content=True
         )

-        self._set_before_and_after(events, rows)
-
         return events, token

     @cached()
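
For reference, the deleted helper above was the only writer of the removed `before`/`after`/`order` metadata. The sketch below restates what it computed, with comments noting where each value's replacement now lives; `stream` and `topo` stand for the row's stream and topological orderings, as in the deleted code:

```python
from typing import Optional, Tuple

from synapse.types import RoomStreamToken


def old_before_after_order(
    stream: int, topo: Optional[int]
) -> Tuple[RoomStreamToken, RoomStreamToken, Tuple[int, int]]:
    # What the deleted _set_before_and_after attached to each event:
    # - "before": now rebuilt inline in sync.py as
    #   RoomStreamToken(stream=stream_ordering - 1), without the topological part.
    # - "after": now rebuilt inline in admin.py (with topological=event.depth)
    #   and in room.py (stream only).
    # - "order": now replaced by sorting directly on stream_ordering in room.py.
    before = RoomStreamToken(topological=topo, stream=stream - 1)
    after = RoomStreamToken(topological=topo, stream=stream)
    order = (int(topo) if topo else 0, int(stream))
    return before, after, order
```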