Skip to content

Commit

Permalink
Merge branch 'develop' into madlittlemods/msc3575-sliding-sync-0.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
MadLittleMods committed May 29, 2024
2 parents 8bf5a62 + 466f344 commit 34d67fd
Show file tree
Hide file tree
Showing 29 changed files with 504 additions and 523 deletions.
1 change: 1 addition & 0 deletions changelog.d/17083.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Improve DB usage when fetching related events.
1 change: 1 addition & 0 deletions changelog.d/17226.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Move towards using `MultiWriterIdGenerator` everywhere.
1 change: 1 addition & 0 deletions changelog.d/17238.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Change the `allow_unsafe_locale` config option to also apply when setting up new databases.
1 change: 1 addition & 0 deletions changelog.d/17239.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix errors in logs about closing incorrect logging contexts when media gets rejected by a module.
1 change: 1 addition & 0 deletions changelog.d/17240.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Ignore attempts to send to-device messages to bad users, to avoid log spam when we try to connect to the bad server.
1 change: 1 addition & 0 deletions changelog.d/17241.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix handling of duplicate concurrent uploading of device one-time-keys.
11 changes: 5 additions & 6 deletions docs/postgres.md
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,11 @@ host all all ::1/128 ident
### Fixing incorrect `COLLATE` or `CTYPE`
Synapse will refuse to set up a new database if it has the wrong values of
`COLLATE` and `CTYPE` set. Synapse will also refuse to start an existing database with incorrect values
of `COLLATE` and `CTYPE` unless the config flag `allow_unsafe_locale`, found in the
`database` section of the config, is set to true. Using different locales can cause issues if the locale library is updated from
underneath the database, or if a different version of the locale is used on any
replicas.
Synapse will refuse to start when using a database with incorrect values of
`COLLATE` and `CTYPE` unless the config flag `allow_unsafe_locale`, found in the
`database` section of the config, is set to true. Using different locales can
cause issues if the locale library is updated from underneath the database, or
if a different version of the locale is used on any replicas.
If you have a database with an unsafe locale, the safest way to fix the issue is to dump the database and recreate it with
the correct locale parameter (as shown above). It is also possible to change the
Expand Down
7 changes: 7 additions & 0 deletions synapse/handlers/devicemessage.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,13 @@ async def send_device_message(
local_messages = {}
remote_messages: Dict[str, Dict[str, Dict[str, JsonDict]]] = {}
for user_id, by_device in messages.items():
if not UserID.is_valid(user_id):
logger.warning(
"Ignoring attempt to send device message to invalid user: %r",
user_id,
)
continue

# add an opentracing log entry for each message
for device_id, message_content in by_device.items():
log_kv(
Expand Down
78 changes: 45 additions & 33 deletions synapse/handlers/e2e_keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@
logger = logging.getLogger(__name__)


ONE_TIME_KEY_UPLOAD = "one_time_key_upload_lock"


class E2eKeysHandler:
def __init__(self, hs: "HomeServer"):
self.config = hs.config
Expand All @@ -62,6 +65,7 @@ def __init__(self, hs: "HomeServer"):
self._appservice_handler = hs.get_application_service_handler()
self.is_mine = hs.is_mine
self.clock = hs.get_clock()
self._worker_lock_handler = hs.get_worker_locks_handler()

federation_registry = hs.get_federation_registry()

Expand Down Expand Up @@ -855,45 +859,53 @@ async def upload_keys_for_user(
async def _upload_one_time_keys_for_user(
self, user_id: str, device_id: str, time_now: int, one_time_keys: JsonDict
) -> None:
logger.info(
"Adding one_time_keys %r for device %r for user %r at %d",
one_time_keys.keys(),
device_id,
user_id,
time_now,
)
# We take out a lock so that we don't have to worry about a client
# sending duplicate requests.
lock_key = f"{user_id}_{device_id}"
async with self._worker_lock_handler.acquire_lock(
ONE_TIME_KEY_UPLOAD, lock_key
):
logger.info(
"Adding one_time_keys %r for device %r for user %r at %d",
one_time_keys.keys(),
device_id,
user_id,
time_now,
)

# make a list of (alg, id, key) tuples
key_list = []
for key_id, key_obj in one_time_keys.items():
algorithm, key_id = key_id.split(":")
key_list.append((algorithm, key_id, key_obj))
# make a list of (alg, id, key) tuples
key_list = []
for key_id, key_obj in one_time_keys.items():
algorithm, key_id = key_id.split(":")
key_list.append((algorithm, key_id, key_obj))

# First we check if we have already persisted any of the keys.
existing_key_map = await self.store.get_e2e_one_time_keys(
user_id, device_id, [k_id for _, k_id, _ in key_list]
)
# First we check if we have already persisted any of the keys.
existing_key_map = await self.store.get_e2e_one_time_keys(
user_id, device_id, [k_id for _, k_id, _ in key_list]
)

new_keys = [] # Keys that we need to insert. (alg, id, json) tuples.
for algorithm, key_id, key in key_list:
ex_json = existing_key_map.get((algorithm, key_id), None)
if ex_json:
if not _one_time_keys_match(ex_json, key):
raise SynapseError(
400,
(
"One time key %s:%s already exists. "
"Old key: %s; new key: %r"
new_keys = [] # Keys that we need to insert. (alg, id, json) tuples.
for algorithm, key_id, key in key_list:
ex_json = existing_key_map.get((algorithm, key_id), None)
if ex_json:
if not _one_time_keys_match(ex_json, key):
raise SynapseError(
400,
(
"One time key %s:%s already exists. "
"Old key: %s; new key: %r"
)
% (algorithm, key_id, ex_json, key),
)
% (algorithm, key_id, ex_json, key),
else:
new_keys.append(
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)
else:
new_keys.append(
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
)

log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
await self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
await self.store.add_e2e_one_time_keys(
user_id, device_id, time_now, new_keys
)

async def upload_signing_keys_for_user(
self, user_id: str, keys: JsonDict
Expand Down
2 changes: 1 addition & 1 deletion synapse/handlers/relations.py
Original file line number Diff line number Diff line change
Expand Up @@ -393,9 +393,9 @@ async def _get_threads_for_events(

# Attempt to find another event to use as the latest event.
potential_events, _ = await self._main_store.get_relations_for_event(
room_id,
event_id,
event,
room_id,
RelationTypes.THREAD,
direction=Direction.FORWARDS,
)
Expand Down
11 changes: 2 additions & 9 deletions synapse/media/media_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,7 @@ async def _download_remote_file(

file_info = FileInfo(server_name=server_name, file_id=file_id)

with self.media_storage.store_into_file(file_info) as (f, fname, finish):
async with self.media_storage.store_into_file(file_info) as (f, fname):
try:
length, headers = await self.client.download_media(
server_name,
Expand Down Expand Up @@ -693,8 +693,6 @@ async def _download_remote_file(
)
raise SynapseError(502, "Failed to fetch remote media")

await finish()

if b"Content-Type" in headers:
media_type = headers[b"Content-Type"][0].decode("ascii")
else:
Expand Down Expand Up @@ -1045,14 +1043,9 @@ async def _generate_thumbnails(
),
)

with self.media_storage.store_into_file(file_info) as (
f,
fname,
finish,
):
async with self.media_storage.store_into_file(file_info) as (f, fname):
try:
await self.media_storage.write_to_file(t_byte_source, f)
await finish()
finally:
t_byte_source.close()

Expand Down
102 changes: 37 additions & 65 deletions synapse/media/media_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
IO,
TYPE_CHECKING,
Any,
Awaitable,
AsyncIterator,
BinaryIO,
Callable,
Generator,
Optional,
Sequence,
Tuple,
Expand Down Expand Up @@ -97,11 +96,9 @@ async def store_file(self, source: IO, file_info: FileInfo) -> str:
the file path written to in the primary media store
"""

with self.store_into_file(file_info) as (f, fname, finish_cb):
async with self.store_into_file(file_info) as (f, fname):
# Write to the main media repository
await self.write_to_file(source, f)
# Write to the other storage providers
await finish_cb()

return fname

Expand All @@ -111,32 +108,27 @@ async def write_to_file(self, source: IO, output: IO) -> None:
await defer_to_thread(self.reactor, _write_file_synchronously, source, output)

@trace_with_opname("MediaStorage.store_into_file")
@contextlib.contextmanager
def store_into_file(
@contextlib.asynccontextmanager
async def store_into_file(
self, file_info: FileInfo
) -> Generator[Tuple[BinaryIO, str, Callable[[], Awaitable[None]]], None, None]:
"""Context manager used to get a file like object to write into, as
) -> AsyncIterator[Tuple[BinaryIO, str]]:
"""Async Context manager used to get a file like object to write into, as
described by file_info.
Actually yields a 3-tuple (file, fname, finish_cb), where file is a file-like
object that can be written to, fname is the absolute path of the file
on disk, and finish_cb is a function that returns an awaitable.
Actually yields a 2-tuple (file, fname), where file is a file-like
object that can be written to and fname is the absolute path of the file
on disk.
fname can be used to read the contents from after upload, e.g. to
generate thumbnails.
finish_cb must be called and waited on after the file has successfully been
written to. Should not be called if there was an error. Checks for spam and
stores the file into the configured storage providers.
Args:
file_info: Info about the file to store
Example:
with media_storage.store_into_file(info) as (f, fname, finish_cb):
async with media_storage.store_into_file(info) as (f, fname,):
# .. write into f ...
await finish_cb()
"""

path = self._file_info_to_path(file_info)
Expand All @@ -145,62 +137,42 @@ def store_into_file(
dirname = os.path.dirname(fname)
os.makedirs(dirname, exist_ok=True)

finished_called = [False]

main_media_repo_write_trace_scope = start_active_span(
"writing to main media repo"
)
main_media_repo_write_trace_scope.__enter__()

try:
with open(fname, "wb") as f:

async def finish() -> None:
# When someone calls finish, we assume they are done writing to the main media repo
main_media_repo_write_trace_scope.__exit__(None, None, None)

with start_active_span("writing to other storage providers"):
# Ensure that all writes have been flushed and close the
# file.
f.flush()
f.close()

spam_check = await self._spam_checker_module_callbacks.check_media_file_for_spam(
ReadableFileWrapper(self.clock, fname), file_info
)
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
logger.info("Blocking media due to spam checker")
# Note that we'll delete the stored media, due to the
# try/except below. The media also won't be stored in
# the DB.
# We currently ignore any additional field returned by
# the spam-check API.
raise SpamMediaException(errcode=spam_check[0])

for provider in self.storage_providers:
with start_active_span(str(provider)):
await provider.store_file(path, file_info)

finished_called[0] = True

yield f, fname, finish
except Exception as e:
with main_media_repo_write_trace_scope:
try:
main_media_repo_write_trace_scope.__exit__(
type(e), None, e.__traceback__
)
os.remove(fname)
except Exception:
pass
with open(fname, "wb") as f:
yield f, fname

raise e from None
except Exception as e:
try:
os.remove(fname)
except Exception:
pass

if not finished_called:
exc = Exception("Finished callback not called")
main_media_repo_write_trace_scope.__exit__(
type(exc), None, exc.__traceback__
raise e from None

with start_active_span("writing to other storage providers"):
spam_check = (
await self._spam_checker_module_callbacks.check_media_file_for_spam(
ReadableFileWrapper(self.clock, fname), file_info
)
)
raise exc
if spam_check != self._spam_checker_module_callbacks.NOT_SPAM:
logger.info("Blocking media due to spam checker")
# Note that we'll delete the stored media, due to the
# try/except below. The media also won't be stored in
# the DB.
# We currently ignore any additional field returned by
# the spam-check API.
raise SpamMediaException(errcode=spam_check[0])

for provider in self.storage_providers:
with start_active_span(str(provider)):
await provider.store_file(path, file_info)

async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
"""Attempts to fetch media described by file_info from the local cache
Expand Down
4 changes: 1 addition & 3 deletions synapse/media/url_previewer.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,7 +592,7 @@ async def _handle_url(

file_info = FileInfo(server_name=None, file_id=file_id, url_cache=True)

with self.media_storage.store_into_file(file_info) as (f, fname, finish):
async with self.media_storage.store_into_file(file_info) as (f, fname):
if url.startswith("data:"):
if not allow_data_urls:
raise SynapseError(
Expand All @@ -603,8 +603,6 @@ async def _handle_url(
else:
download_result = await self._download_url(url, f)

await finish()

try:
time_now_ms = self.clock.time_msec()

Expand Down
Loading

0 comments on commit 34d67fd

Please sign in to comment.