Skip to content

Commit

Permalink
Use receipts event_stream_ordering instead of joins (element-hq#17032)
Browse files Browse the repository at this point in the history
Resurrecting matrix-org/synapse#13918.

This should reduce IOPs incurred by joining to the events table to
lookup stream ordering, which happens in many receipt handling code
paths. Like the previous PR I believe sufficient time has passed between
the original migration in DB schema 72 and now to merge this as-is. It's
highly unlikely that both the migration is still ongoing AND (active)
users still have any receipts prior to that date.

In the unlikely event there is a receipt without a populated
`event_stream_ordering` synapse will behave just as it does now when
receipts exist for events that don't (yet): for push action calculation
the receipts are just ignored.

I've removed the validation on event IDs as this is already covered
here:

https://github.com/element-hq/synapse/blob/59ceabcb9798793cd4312fdbcced4e612aeda84d/synapse/handlers/receipts.py#L189-L192
  • Loading branch information
Fizzadar authored Apr 12, 2024
1 parent 3a30846 commit fe4719a
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 19 deletions.
1 change: 1 addition & 0 deletions changelog.d/17032.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Use new receipts column to optimise receipt and push action SQL queries. Contributed by Nick @ Beeper (@fizzadar).
22 changes: 8 additions & 14 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ def _get_unread_counts_by_room_for_user_txn(
WITH all_receipts AS (
SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
{receipt_types_clause}
AND user_id = ?
Expand Down Expand Up @@ -621,13 +620,12 @@ def _get_thread(thread_id: str) -> NotifCounts:
SELECT notif_count, COALESCE(unread_count, 0), thread_id
FROM event_push_summary
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
Expand Down Expand Up @@ -659,13 +657,12 @@ def _get_thread(thread_id: str) -> NotifCounts:
sql = f"""
SELECT COUNT(*), thread_id FROM event_push_actions
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
Expand Down Expand Up @@ -738,13 +735,12 @@ def _get_thread(thread_id: str) -> NotifCounts:
thread_id
FROM event_push_actions
LEFT JOIN (
SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
SELECT thread_id, MAX(event_stream_ordering) AS threaded_receipt_stream_ordering
FROM receipts_linearized
LEFT JOIN events USING (room_id, event_id)
WHERE
user_id = ?
AND room_id = ?
AND stream_ordering > ?
AND event_stream_ordering > ?
AND {receipt_types_clause}
GROUP BY thread_id
) AS receipts USING (thread_id)
Expand Down Expand Up @@ -910,9 +906,8 @@ def _get_receipts_for_room_and_threads_txn(
# given this function generally gets called with only one room and
# thread ID.
sql = f"""
SELECT room_id, thread_id, MAX(stream_ordering)
SELECT room_id, thread_id, MAX(event_stream_ordering)
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
WHERE {receipt_types_clause}
AND {thread_ids_clause}
AND {room_ids_clause}
Expand Down Expand Up @@ -1442,9 +1437,8 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
)

sql = """
SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, r.event_stream_ordering
FROM receipts_linearized AS r
INNER JOIN events AS e USING (event_id)
WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
ORDER BY r.stream_id ASC
LIMIT ?
Expand Down
8 changes: 3 additions & 5 deletions synapse/storage/databases/main/receipts.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,14 +178,13 @@ def get_last_unthreaded_receipt_for_user_txn(
)

sql = f"""
SELECT event_id, stream_ordering
SELECT event_id, event_stream_ordering
FROM receipts_linearized
INNER JOIN events USING (room_id, event_id)
WHERE {clause}
AND user_id = ?
AND room_id = ?
AND thread_id IS NULL
ORDER BY stream_ordering DESC
ORDER BY event_stream_ordering DESC
LIMIT 1
"""

Expand Down Expand Up @@ -736,8 +735,7 @@ def _insert_linearized_receipt_txn(
thread_args = (thread_id,)

sql = f"""
SELECT stream_ordering, event_id FROM events
INNER JOIN receipts_linearized AS r USING (event_id, room_id)
SELECT r.event_stream_ordering, r.event_id FROM receipts_linearized AS r
WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ? AND {thread_clause}
"""
txn.execute(
Expand Down

0 comments on commit fe4719a

Please sign in to comment.