Skip to content

Commit

Permalink
Fix federation sender shard tests
Browse files Browse the repository at this point in the history
  • Loading branch information
MadLittleMods committed Jan 18, 2025
1 parent 545f22d commit 0b31100
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 55 deletions.
91 changes: 55 additions & 36 deletions tests/handlers/test_presence.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,11 @@

from synapse.api.constants import EventTypes, Membership, PresenceState
from synapse.api.presence import UserDevicePresenceState, UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.api.room_versions import (
RoomVersion,
)
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import make_event_from_dict
from synapse.events import EventBase, make_event_from_dict
from synapse.federation.sender import FederationSender
from synapse.handlers.presence import (
BUSY_ONLINE_TIMEOUT,
Expand Down Expand Up @@ -1947,7 +1949,51 @@ def _add_new_user(self, room_id: str, user_id: str) -> None:

hostname = get_domain_from_id(user_id)

room_version = self.get_success(self.store.get_room_version_id(room_id))
room_version = self.get_success(self.store.get_room_version(room_id))

state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)

# Figure out what the forward extremities in the room are (the most recent
# events that aren't tied into the DAG)
forward_extremity_event_ids = self.get_success(
self.hs.get_datastores().main.get_latest_event_ids_in_room(room_id)
)

event = self.create_fake_event_from_remote_server(
remote_server_name=hostname,
event_dict={
"room_id": room_id,
"sender": user_id,
"type": EventTypes.Member,
"state_key": user_id,
"depth": 1000,
"origin_server_ts": 1,
"content": {"membership": Membership.JOIN},
"auth_events": [
state_map[(EventTypes.Create, "")].event_id,
state_map[(EventTypes.JoinRules, "")].event_id,
],
"prev_events": list(forward_extremity_event_ids),
},
room_version=room_version,
)

self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))

# Check that it was successfully persisted.
self.get_success(self.store.get_event(event.event_id))
self.get_success(self.store.get_event(event.event_id))

def create_fake_event_from_remote_server(
self, remote_server_name: str, event_dict: JsonDict, room_version: RoomVersion
) -> EventBase:
"""
This is similar to what `FederatingHomeserverTestCase` is doing but we don't
need all of the extra baggage and we want to be able to create an event from
many remote servers.
"""

# poke the other server's signing key into the key store, so that we don't
# make requests for it
Expand All @@ -1957,8 +2003,8 @@ def _add_new_user(self, room_id: str, user_id: str) -> None:

self.get_success(
self.hs.get_datastores().main.store_server_keys_response(
hostname,
from_server=hostname,
remote_server_name,
from_server=remote_server_name,
ts_added_ms=self.clock.time_msec(),
verify_keys={
verify_key_id: FetchKeyResult(
Expand All @@ -1974,42 +2020,15 @@ def _add_new_user(self, room_id: str, user_id: str) -> None:
)
)

state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)

# Figure out what the forward extremities in the room are (the most recent
# events that aren't tied into the DAG)
forward_extremity_event_ids = self.get_success(
self.hs.get_datastores().main.get_latest_event_ids_in_room(room_id)
)
event_dict = {
"room_id": room_id,
"sender": user_id,
"state_key": user_id,
"depth": 1000,
"origin_server_ts": 1,
"type": EventTypes.Member,
"content": {"membership": Membership.JOIN},
"auth_events": [
state_map[(EventTypes.Create, "")].event_id,
state_map[(EventTypes.JoinRules, "")].event_id,
],
"prev_events": list(forward_extremity_event_ids),
}
add_hashes_and_signatures(
room_version=KNOWN_ROOM_VERSIONS[room_version],
room_version=room_version,
event_dict=event_dict,
signature_name=hostname,
signature_name=remote_server_name,
signing_key=other_server_signature_key,
)
event = make_event_from_dict(
event_dict,
room_version=KNOWN_ROOM_VERSIONS[room_version],
room_version=room_version,
)

self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))

# Check that it was successfully persisted.
self.get_success(self.store.get_event(event.event_id))
self.get_success(self.store.get_event(event.event_id))
return event
110 changes: 91 additions & 19 deletions tests/replication/test_federation_sender_shard.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,26 @@
from unittest.mock import AsyncMock, Mock

from netaddr import IPSet
from signedjson.key import (
encode_verify_key_base64,
generate_signing_key,
get_verify_key,
)

from twisted.test.proto_helpers import MemoryReactor

from synapse.api.constants import EventTypes, Membership
from synapse.events.builder import EventBuilderFactory
from synapse.api.room_versions import RoomVersion
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events import EventBase, make_event_from_dict
from synapse.handlers.typing import TypingWriterHandler
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.rest.admin import register_servlets_for_client_rest_resource
from synapse.rest.client import login, room
from synapse.types import UserID, create_requester
from synapse.server import HomeServer
from synapse.storage.keys import FetchKeyResult
from synapse.types import JsonDict, UserID, create_requester
from synapse.util import Clock

from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.server import get_clock
Expand Down Expand Up @@ -63,6 +75,9 @@ def setUp(self) -> None:
ip_blocklist=IPSet(),
)

def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
self.storage_controllers = hs.get_storage_controllers()

def test_send_event_single_sender(self) -> None:
"""Test that using a single federation sender worker correctly sends a
new event.
Expand Down Expand Up @@ -243,35 +258,92 @@ def test_send_typing_sharded(self) -> None:
self.assertTrue(sent_on_1)
self.assertTrue(sent_on_2)

def create_fake_event_from_remote_server(
self, remote_server_name: str, event_dict: JsonDict, room_version: RoomVersion
) -> EventBase:
"""
This is similar to what `FederatingHomeserverTestCase` is doing but we don't
need all of the extra baggage and we want to be able to create an event from
many remote servers.
"""

# poke the other server's signing key into the key store, so that we don't
# make requests for it
other_server_signature_key = generate_signing_key("test")
verify_key = get_verify_key(other_server_signature_key)
verify_key_id = "%s:%s" % (verify_key.alg, verify_key.version)

self.get_success(
self.hs.get_datastores().main.store_server_keys_response(
remote_server_name,
from_server=remote_server_name,
ts_added_ms=self.clock.time_msec(),
verify_keys={
verify_key_id: FetchKeyResult(
verify_key=verify_key,
valid_until_ts=self.clock.time_msec() + 10000,
),
},
response_json={
"verify_keys": {
verify_key_id: {"key": encode_verify_key_base64(verify_key)}
}
},
)
)

add_hashes_and_signatures(
room_version=room_version,
event_dict=event_dict,
signature_name=remote_server_name,
signing_key=other_server_signature_key,
)
event = make_event_from_dict(
event_dict,
room_version=room_version,
)

return event

def create_room_with_remote_server(
self, user: str, token: str, remote_server: str = "other_server"
) -> str:
room = self.helper.create_room_as(user, tok=token)
room_id = self.helper.create_room_as(user, tok=token)
store = self.hs.get_datastores().main
federation = self.hs.get_federation_event_handler()

prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
room_version = self.get_success(store.get_room_version(room))
room_version = self.get_success(store.get_room_version(room_id))

factory = EventBuilderFactory(self.hs)
factory.hostname = remote_server
state_map = self.get_success(
self.storage_controllers.state.get_current_state(room_id)
)

# Figure out what the forward extremities in the room are (the most recent
# events that aren't tied into the DAG)
prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room_id))

user_id = UserID("user", remote_server).to_string()

event_dict = {
"type": EventTypes.Member,
"state_key": user_id,
"content": {"membership": Membership.JOIN},
"sender": user_id,
"room_id": room,
}

builder = factory.for_room_version(room_version, event_dict)
join_event = self.get_success(
builder.build(prev_event_ids=list(prev_event_ids), auth_event_ids=None)
join_event = self.create_fake_event_from_remote_server(
remote_server_name=remote_server,
event_dict={
"room_id": room_id,
"sender": user_id,
"type": EventTypes.Member,
"state_key": user_id,
"depth": 1000,
"origin_server_ts": 1,
"content": {"membership": Membership.JOIN},
"auth_events": [
state_map[(EventTypes.Create, "")].event_id,
state_map[(EventTypes.JoinRules, "")].event_id,
],
"prev_events": list(prev_event_ids),
},
room_version=room_version,
)

self.get_success(federation.on_send_membership_event(remote_server, join_event))
self.replicate()

return room
return room_id

0 comments on commit 0b31100

Please sign in to comment.