diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 90991031aa8..c1b0b2153a1 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -769,26 +769,29 @@ async def get_room_sync_data( and rooms_for_user_membership_at_to_token.membership == Membership.JOIN ): newly_joined = ( - rooms_for_user_membership_at_to_token.event_pos.stream - > from_token.room_key.get_stream_pos_for_instance( - rooms_for_user_membership_at_to_token.event_pos.instance_name + rooms_for_user_membership_at_to_token.event_pos.persisted_after( + from_token.room_key ) ) + # We should return historical messages (before token range) in the + # following cases because we want clients to be able to show a basic + # screen of information: + # - Initial sync (because no `from_token` to limit us anyway) + # - When users `newly_joined` + # - TODO: For an incremental sync where we haven't sent it down this + # connection before + should_limit_timeline_to_token_range = ( + from_token is not None and not newly_joined + ) + timeline_events, new_room_key = await self.store.paginate_room_events( room_id=room_id, # We're going to paginate backwards from the `to_token` from_key=to_token.room_key, - # We should return historical messages (before token range) in the - # following cases because we want clients to be able to show a basic - # screen of information: - # - Initial sync (because no `from_token` to limit us anyway) - # - When users `newly_joined` - # - TODO: For an incremental sync where we haven't sent it down this - # connection before to_key=( from_token.room_key - if from_token is not None and not newly_joined + if should_limit_timeline_to_token_range else None ), direction=Direction.BACKWARDS, @@ -832,7 +835,7 @@ async def get_room_sync_data( # old events in the timeline) num_live = 0 if from_token is not None: - for timeline_event in timeline_events: + for timeline_event in reversed(timeline_events): # This fields should be present for all persisted events assert timeline_event.internal_metadata.stream_ordering is not None assert timeline_event.internal_metadata.instance_name is not None @@ -843,6 +846,12 @@ async def get_room_sync_data( ) if persisted_position.persisted_after(from_token.room_key): num_live += 1 + else: + # Since we're iterating over the timeline events in + # reverse-chronological order, we can break once we hit an event + # that's not live. In the future, we could potentially optimize + # this more with a binary search (bisect). + break prev_batch_token = prev_batch_token.copy_and_replace( StreamKeyType.ROOM, new_room_key diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index db447738249..434eaa4789a 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -785,7 +785,7 @@ class SlidingSyncRestServlet(RestServlet): Response JSON:: { - "next_pos": "s58_224_0_13_10_1_1_16_0_1", + "pos": "s58_224_0_13_10_1_1_16_0_1", "lists": { "foo-list": { "count": 1337, @@ -824,7 +824,8 @@ class SlidingSyncRestServlet(RestServlet): "joined_count": 41, "invited_count": 1, "notification_count": 1, - "highlight_count": 0 + "highlight_count": 0, + "num_live": 2" }, // rooms from list "!foo:bar": { @@ -849,7 +850,8 @@ class SlidingSyncRestServlet(RestServlet): "joined_count": 4, "invited_count": 0, "notification_count": 54, - "highlight_count": 3 + "highlight_count": 3, + "num_live": 1, }, // ... 99 more items }, @@ -927,7 +929,7 @@ async def encode_response( ) -> JsonDict: response: JsonDict = defaultdict(dict) - response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store) + response["pos"] = await sliding_sync_result.next_pos.to_string(self.store) serialized_lists = self.encode_lists(sliding_sync_result.lists) if serialized_lists: response["lists"] = serialized_lists diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 151658df534..b52236d6029 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1078,6 +1078,9 @@ class PersistedPosition: stream: int def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool: + """ + Checks whether this position happened after the token + """ return token.get_stream_pos_for_instance(self.instance_name) < self.stream diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index 5b611cd0962..d538716e5ac 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -19,6 +19,7 @@ # # import json +import logging from typing import List from parameterized import parameterized, parameterized_class @@ -35,7 +36,7 @@ ) from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.server import HomeServer -from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID from synapse.util import Clock from tests import unittest @@ -44,6 +45,8 @@ ) from tests.server import TimedOutException +logger = logging.getLogger(__name__) + class FilterTestCase(unittest.HomeserverTestCase): user_id = "@apple:test" @@ -1379,11 +1382,9 @@ def test_wait_for_sync_token(self) -> None: channel.await_result(timeout_ms=200) self.assertEqual(channel.code, 200, channel.json_body) - # We expect the `next_pos` in the result to be the same as what we requested + # We expect the next `pos` in the result to be the same as what we requested # with because we weren't able to find anything new yet. - self.assertEqual( - channel.json_body["next_pos"], future_position_token_serialized - ) + self.assertEqual(channel.json_body["pos"], future_position_token_serialized) def test_filter_list(self) -> None: """ @@ -1602,7 +1603,15 @@ def test_rooms_limited_initial_sync(self) -> None: f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}", ) - def test_not_limited_initial_sync(self) -> None: + # With no `from_token` (initial sync), it's all historical since there is no + # "current" range + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_not_limited_initial_sync(self) -> None: """ Test that we mark `rooms` as `limited=False` when there are no more events to paginate to. @@ -1619,6 +1628,7 @@ def test_not_limited_initial_sync(self) -> None: self.helper.join(room_id1, user1_id, tok=user1_tok) # Make the Sliding Sync request + timeline_limit = 100 channel = self.make_request( "POST", self.sync_endpoint, @@ -1627,7 +1637,7 @@ def test_not_limited_initial_sync(self) -> None: "foo-list": { "ranges": [[0, 1]], "required_state": [], - "timeline_limit": 100, + "timeline_limit": timeline_limit, } } }, @@ -1642,9 +1652,257 @@ def test_not_limited_initial_sync(self) -> None: False, channel.json_body["rooms"][room_id1], ) + expected_number_of_events = 9 # We're just looking to make sure we got all of the events before hitting the `timeline_limit` self.assertEqual( len(channel.json_body["rooms"][room_id1]["timeline"]), - 9, + expected_number_of_events, channel.json_body["rooms"][room_id1]["timeline"], ) + self.assertLessEqual(expected_number_of_events, timeline_limit) + + # With no `from_token` (initial sync), it's all historical since there is no + # "live" token range. + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 0, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_incremental_sync(self) -> None: + """ + Test that `rooms` data during an incremental sync after an initial sync. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.send(room_id1, "activity before initial sync1", tok=user2_tok) + + # Make an initial Sliding Sync request to grab a token. This is also a sanity + # check that we can go from initial to incremental sync. + sync_params = { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + } + channel = self.make_request( + "POST", + self.sync_endpoint, + sync_params, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + next_pos = channel.json_body["pos"] + + # Send some events but don't send enough to saturate the `timeline_limit`. + # We want to later test that we only get the new events since the `next_pos` + event_response2 = self.helper.send(room_id1, "activity after2", tok=user2_tok) + event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok) + + # Make an incremental Sliding Sync request (what we're trying to test) + channel = self.make_request( + "POST", + self.sync_endpoint + f"?pos={next_pos}", + sync_params, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We only expect to see the new events since the last sync which isn't enough to + # fill up the `timeline_limit`. + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + False, + f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} ' + + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' + + str(channel.json_body["rooms"][room_id1]), + ) + # Check to make sure the latest events are returned + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response2["event_id"], + event_response3["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # All events are "live" + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 2, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_newly_joined_incremental_sync(self) -> None: + """ + Test that when we make an incremental sync with a `newly_joined` `rooms`, we are + able to see some historical events before the `from_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before token1", tok=user2_tok) + event_response2 = self.helper.send( + room_id1, "activity before token2", tok=user2_tok + ) + + from_token = self.event_sources.get_current_token() + + # Join the room after the `from_token` which will make us consider this room as + # `newly_joined`. + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Send some events but don't send enough to saturate the `timeline_limit`. + # We want to later test that we only get the new events since the `next_pos` + event_response3 = self.helper.send( + room_id1, "activity after token3", tok=user2_tok + ) + event_response4 = self.helper.send( + room_id1, "activity after token4", tok=user2_tok + ) + + # The `timeline_limit` is set to 4 so we can at least see one historical event + # before the `from_token`. We should see historical events because this is a + # `newly_joined` room. + timeline_limit = 4 + # Make an incremental Sliding Sync request (what we're trying to test) + channel = self.make_request( + "POST", + self.sync_endpoint + + f"?pos={self.get_success( + from_token.to_string(self.store) + )}", + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": timeline_limit, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # We should see the new events and the rest should be filled with historical + # events which will make us `limited=True` since there are more to paginate to. + self.assertEqual( + channel.json_body["rooms"][room_id1]["limited"], + True, + f"Our `timeline_limit` was {timeline_limit} " + + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. ' + + str(channel.json_body["rooms"][room_id1]), + ) + # Check to make sure that the "live" and historical events are returned + self.assertEqual( + [ + event["event_id"] + for event in channel.json_body["rooms"][room_id1]["timeline"] + ], + [ + event_response2["event_id"], + user1_join_response["event_id"], + event_response3["event_id"], + event_response4["event_id"], + ], + channel.json_body["rooms"][room_id1]["timeline"], + ) + + # Only events after the `from_token` are "live" (join, event3, event4) + self.assertEqual( + channel.json_body["rooms"][room_id1]["num_live"], + 3, + channel.json_body["rooms"][room_id1], + ) + + def test_rooms_invite_sync(self) -> None: + """ + Test that `rooms` we are invited to have some stripped `invite_state` and that + we can't see any timeline events because we haven't joined the room yet. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user1 = UserID.from_string(user1_id) + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + user2 = UserID.from_string(user2_id) + + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.send(room_id1, "activity before1", tok=user2_tok) + self.helper.send(room_id1, "activity before2", tok=user2_tok) + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.send(room_id1, "activity after3", tok=user2_tok) + self.helper.send(room_id1, "activity after4", tok=user2_tok) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 1]], + "required_state": [], + "timeline_limit": 3, + } + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Should not see anything (except maybe the invite event) because we haven't + # joined yet (`filter_events_for_client(...)` is doing the work here) + self.assertEqual( + channel.json_body["rooms"][room_id1]["timeline"], + [], + channel.json_body["rooms"][room_id1]["timeline"], + ) + # We should have some stripped state so the potential joiner can identify the + # room (we don't care about the order). + self.assertCountEqual( + channel.json_body["rooms"][room_id1]["invite_state"], + [ + { + "content": {"creator": user2_id, "room_version": "10"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.create", + }, + { + "content": {"join_rule": "public"}, + "sender": user2_id, + "state_key": "", + "type": "m.room.join_rules", + }, + { + "content": {"displayname": user2.localpart, "membership": "join"}, + "sender": user2_id, + "state_key": user2_id, + "type": "m.room.member", + }, + { + "content": {"displayname": user1.localpart, "membership": "invite"}, + "sender": user2_id, + "state_key": user1_id, + "type": "m.room.member", + }, + ], + channel.json_body["rooms"][room_id1]["invite_state"], + )