From 50383098ff8dd6ea00dc988e01b3542d8e63d079 Mon Sep 17 00:00:00 2001 From: Benjamin Bouvier Date: Wed, 15 Jan 2025 16:55:57 +0100 Subject: [PATCH] feat(event cache): redact events in the database whenever they're redacted --- crates/matrix-sdk-base/src/lib.rs | 7 +- crates/matrix-sdk-base/src/rooms/mod.rs | 2 +- crates/matrix-sdk-base/src/rooms/normal.rs | 6 +- .../src/deserialized_responses.rs | 13 ++ crates/matrix-sdk/src/event_cache/mod.rs | 13 +- .../matrix-sdk/src/event_cache/pagination.rs | 11 +- .../matrix-sdk/src/event_cache/room/events.rs | 87 +++++++++- crates/matrix-sdk/src/event_cache/room/mod.rs | 20 ++- .../tests/integration/event_cache.rs | 151 +++++++++++++++++- 9 files changed, 292 insertions(+), 18 deletions(-) diff --git a/crates/matrix-sdk-base/src/lib.rs b/crates/matrix-sdk-base/src/lib.rs index ba165ba8026..cc4e5128403 100644 --- a/crates/matrix-sdk-base/src/lib.rs +++ b/crates/matrix-sdk-base/src/lib.rs @@ -37,7 +37,6 @@ mod rooms; pub mod read_receipts; pub use read_receipts::PreviousEventsProvider; -pub use rooms::RoomMembersUpdate; pub mod sliding_sync; pub mod store; @@ -56,9 +55,9 @@ pub use http; pub use matrix_sdk_crypto as crypto; pub use once_cell; pub use rooms::{ - Room, RoomCreateWithCreatorEventContent, RoomDisplayName, RoomHero, RoomInfo, - RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMember, RoomMemberships, RoomState, - RoomStateFilter, + apply_redaction, Room, RoomCreateWithCreatorEventContent, RoomDisplayName, RoomHero, RoomInfo, + RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMember, RoomMembersUpdate, + RoomMemberships, RoomState, RoomStateFilter, }; pub use store::{ ComposerDraft, ComposerDraftType, QueueWedgeError, StateChanges, StateStore, StateStoreDataKey, diff --git a/crates/matrix-sdk-base/src/rooms/mod.rs b/crates/matrix-sdk-base/src/rooms/mod.rs index d738199ed95..a128a10f17e 100644 --- a/crates/matrix-sdk-base/src/rooms/mod.rs +++ b/crates/matrix-sdk-base/src/rooms/mod.rs @@ -12,7 +12,7 @@ use std::{ use bitflags::bitflags; pub use members::RoomMember; pub use normal::{ - Room, RoomHero, RoomInfo, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, + apply_redaction, Room, RoomHero, RoomInfo, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMembersUpdate, RoomState, RoomStateFilter, }; use regex::Regex; diff --git a/crates/matrix-sdk-base/src/rooms/normal.rs b/crates/matrix-sdk-base/src/rooms/normal.rs index b393ddac9f1..f534ad399e8 100644 --- a/crates/matrix-sdk-base/src/rooms/normal.rs +++ b/crates/matrix-sdk-base/src/rooms/normal.rs @@ -2018,7 +2018,9 @@ impl RoomInfo { } } -fn apply_redaction( +/// Apply a redaction to the given target `event`, given the raw redaction event +/// and the room version. +pub fn apply_redaction( event: &Raw, raw_redaction: &Raw, room_version: &RoomVersionId, @@ -2044,7 +2046,7 @@ fn apply_redaction( let redact_result = redact_in_place(&mut event_json, room_version, Some(redacted_because)); if let Err(e) = redact_result { - warn!("Failed to redact latest event: {e}"); + warn!("Failed to redact event: {e}"); return None; } diff --git a/crates/matrix-sdk-common/src/deserialized_responses.rs b/crates/matrix-sdk-common/src/deserialized_responses.rs index 69458887539..cf27dae0d9c 100644 --- a/crates/matrix-sdk-common/src/deserialized_responses.rs +++ b/crates/matrix-sdk-common/src/deserialized_responses.rs @@ -390,6 +390,19 @@ impl SyncTimelineEvent { self.kind.raw() } + /// Replace the raw event included in this item by another one. + pub fn replace_raw(&mut self, replacement: Raw) { + match &mut self.kind { + TimelineEventKind::Decrypted(decrypted) => decrypted.event = replacement, + TimelineEventKind::UnableToDecrypt { event, .. } + | TimelineEventKind::PlainText { event } => { + // It's safe to cast `AnyMessageLikeEvent` into `AnySyncMessageLikeEvent`, + // because the former contains a superset of the fields included in the latter. + *event = replacement.cast(); + } + } + } + /// If the event was a decrypted event that was successfully decrypted, get /// its encryption info. Otherwise, `None`. pub fn encryption_info(&self) -> Option<&EncryptionInfo> { diff --git a/crates/matrix-sdk/src/event_cache/mod.rs b/crates/matrix-sdk/src/event_cache/mod.rs index e1c58dc0082..0ea2fa05b55 100644 --- a/crates/matrix-sdk/src/event_cache/mod.rs +++ b/crates/matrix-sdk/src/event_cache/mod.rs @@ -52,7 +52,7 @@ use ruma::{ AnySyncTimelineEvent, }, serde::Raw, - EventId, OwnedEventId, OwnedRoomId, RoomId, + EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId, }; use tokio::sync::{ broadcast::{error::RecvError, Receiver}, @@ -622,10 +622,21 @@ impl EventCacheInner { let room_state = RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?; + let room_version = self + .client + .get() + .and_then(|client| client.get_room(room_id)) + .map(|room| room.clone_info().room_version_or_default()) + .unwrap_or_else(|| { + warn!("unknown room version for {room_id}, using default V1"); + RoomVersionId::V1 + }); + let room_event_cache = RoomEventCache::new( self.client.clone(), room_state, room_id.to_owned(), + room_version, self.all_events.clone(), ); diff --git a/crates/matrix-sdk/src/event_cache/pagination.rs b/crates/matrix-sdk/src/event_cache/pagination.rs index 4d135666649..31cdf963301 100644 --- a/crates/matrix-sdk/src/event_cache/pagination.rs +++ b/crates/matrix-sdk/src/event_cache/pagination.rs @@ -181,7 +181,8 @@ impl RoomPagination { // (backward). The `RoomEvents` API expects the first event to be the oldest. .rev() .cloned() - .map(SyncTimelineEvent::from); + .map(SyncTimelineEvent::from) + .collect::>(); let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos); @@ -190,20 +191,20 @@ impl RoomPagination { // There is a prior gap, let's replace it by new events! trace!("replaced gap with new events from backpagination"); room_events - .replace_gap_at(sync_events, gap_id) + .replace_gap_at(sync_events.clone(), gap_id) .expect("gap_identifier is a valid chunk id we read previously") } else if let Some(pos) = first_event_pos { // No prior gap, but we had some events: assume we need to prepend events // before those. trace!("inserted events before the first known event"); let report = room_events - .insert_events_at(sync_events, pos) + .insert_events_at(sync_events.clone(), pos) .expect("pos is a valid position we just read above"); (report, Some(pos)) } else { // No prior gap, and no prior events: push the events. trace!("pushing events received from back-pagination"); - let report = room_events.push_events(sync_events); + let report = room_events.push_events(sync_events.clone()); // A new gap may be inserted before the new events, if there are any. let next_pos = room_events.events().next().map(|(item_pos, _)| item_pos); (report, next_pos) @@ -228,6 +229,8 @@ impl RoomPagination { debug!("not storing previous batch token, because we deduplicated all new back-paginated events"); } + room_events.on_new_events(&self.inner.room_version, sync_events.iter()); + BackPaginationOutcome { events, reached_start } }) .await?; diff --git a/crates/matrix-sdk/src/event_cache/room/events.rs b/crates/matrix-sdk/src/event_cache/room/events.rs index 516432304a6..142594ecb7a 100644 --- a/crates/matrix-sdk/src/event_cache/room/events.rs +++ b/crates/matrix-sdk/src/event_cache/room/events.rs @@ -15,14 +15,17 @@ use std::cmp::Ordering; use eyeball_im::VectorDiff; -use matrix_sdk_base::event_cache::store::DEFAULT_CHUNK_CAPACITY; pub use matrix_sdk_base::event_cache::{Event, Gap}; +use matrix_sdk_base::{apply_redaction, event_cache::store::DEFAULT_CHUNK_CAPACITY}; use matrix_sdk_common::linked_chunk::{ AsVector, Chunk, ChunkIdentifier, EmptyChunk, Error, Iter, IterBackward, LinkedChunk, ObservableUpdates, Position, }; -use ruma::OwnedEventId; -use tracing::{debug, error, warn}; +use ruma::{ + events::{room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent}, + OwnedEventId, RoomVersionId, +}; +use tracing::{debug, error, instrument, trace, warn}; use super::{ super::deduplicator::{Decoration, Deduplicator}, @@ -90,6 +93,84 @@ impl RoomEvents { self.chunks.clear(); } + /// If the given event is a redaction, try to retrieve the to-be-redacted + /// event in the chunk, and replace it by the redacted form. + #[instrument(skip_all)] + fn maybe_apply_new_redaction(&mut self, room_version: &RoomVersionId, event: &Event) { + let Ok(AnySyncTimelineEvent::MessageLike( + ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction), + )) = event.raw().deserialize() + else { + return; + }; + + let Some(event_id) = redaction.redacts(room_version) else { + warn!("missing target event id from the redaction event"); + return; + }; + + // Replace the redacted event by a redacted form, if we knew about it. + let mut items = self.chunks.items(); + + if let Some((pos, target_event)) = + items.find(|(_, item)| item.event_id().as_deref() == Some(event_id)) + { + // Don't redact already redacted events. + if let Ok(deserialized) = target_event.raw().deserialize() { + match deserialized { + AnySyncTimelineEvent::MessageLike(ev) => { + if ev.original_content().is_none() { + // Already redacted. + return; + } + } + AnySyncTimelineEvent::State(ev) => { + if ev.original_content().is_none() { + // Already redacted. + return; + } + } + } + } + + if let Some(redacted_event) = apply_redaction( + target_event.raw(), + event.raw().cast_ref::(), + room_version, + ) { + let mut copy = target_event.clone(); + + // It's safe to cast `redacted_event` here: + // - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent` + // when calling .raw(), so it's still one under the hood. + // - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case. + copy.replace_raw(redacted_event.cast()); + + // Get rid of the immutable borrow on self.chunks. + drop(items); + + self.chunks + .replace_item_at(pos, copy) + .expect("should have been a valid position of an item"); + } + } else { + trace!("redacted event is missing from the linked chunk"); + } + + // TODO: remove all related events too! + } + + /// Callback to call whenever we touch events in the database. + pub fn on_new_events<'a>( + &mut self, + room_version: &RoomVersionId, + events: impl Iterator, + ) { + for ev in events { + self.maybe_apply_new_redaction(room_version, ev); + } + } + /// Push events after all events or gaps. /// /// The last event in `events` is the most recent one. diff --git a/crates/matrix-sdk/src/event_cache/room/mod.rs b/crates/matrix-sdk/src/event_cache/room/mod.rs index de83c5cac8f..5d011d4a906 100644 --- a/crates/matrix-sdk/src/event_cache/room/mod.rs +++ b/crates/matrix-sdk/src/event_cache/room/mod.rs @@ -25,7 +25,7 @@ use matrix_sdk_base::{ use ruma::{ events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent}, serde::Raw, - EventId, OwnedEventId, OwnedRoomId, + EventId, OwnedEventId, OwnedRoomId, RoomVersionId, }; use tokio::sync::{ broadcast::{Receiver, Sender}, @@ -61,9 +61,18 @@ impl RoomEventCache { client: WeakClient, state: RoomEventCacheState, room_id: OwnedRoomId, + room_version: RoomVersionId, all_events_cache: Arc>, ) -> Self { - Self { inner: Arc::new(RoomEventCacheInner::new(client, state, room_id, all_events_cache)) } + Self { + inner: Arc::new(RoomEventCacheInner::new( + client, + state, + room_id, + room_version, + all_events_cache, + )), + } } /// Subscribe to room updates for this room, after getting the initial list @@ -189,6 +198,9 @@ pub(super) struct RoomEventCacheInner { /// The room id for this room. room_id: OwnedRoomId, + /// The room version for this room. + pub(crate) room_version: RoomVersionId, + /// Sender part for subscribers to this room. pub sender: Sender, @@ -222,12 +234,14 @@ impl RoomEventCacheInner { client: WeakClient, state: RoomEventCacheState, room_id: OwnedRoomId, + room_version: RoomVersionId, all_events_cache: Arc>, ) -> Self { let sender = Sender::new(32); let weak_room = WeakRoom::new(client, room_id); Self { room_id: weak_room.room_id().to_owned(), + room_version, state: RwLock::new(state), all_events: all_events_cache, sender, @@ -444,6 +458,8 @@ impl RoomEventCacheInner { .expect("we obtained the valid position beforehand"); } } + + room_events.on_new_events(&self.room_version, sync_timeline_events.iter()); }) .await?; diff --git a/crates/matrix-sdk/tests/integration/event_cache.rs b/crates/matrix-sdk/tests/integration/event_cache.rs index e43fcbb1f2a..a2e2aaf7d44 100644 --- a/crates/matrix-sdk/tests/integration/event_cache.rs +++ b/crates/matrix-sdk/tests/integration/event_cache.rs @@ -22,7 +22,11 @@ use matrix_sdk::{ use matrix_sdk_test::{ async_test, event_factory::EventFactory, GlobalAccountDataTestEvent, JoinedRoomBuilder, }; -use ruma::{event_id, room_id, user_id}; +use ruma::{ + event_id, + events::{AnySyncMessageLikeEvent, AnySyncTimelineEvent}, + room_id, user_id, RoomVersionId, +}; use serde_json::json; use tokio::{spawn, sync::broadcast, time::sleep}; @@ -1453,3 +1457,148 @@ async fn test_dont_delete_gap_that_wasnt_inserted() { // This doesn't cause an update, because nothing changed. assert!(stream.is_empty()); } + +#[async_test] +async fn test_apply_redaction_when_redaction_comes_later() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let event_cache = client.event_cache(); + + // Immediately subscribe the event cache to sync updates. + event_cache.subscribe().unwrap(); + event_cache.enable_storage().unwrap(); + + let room_id = room_id!("!omelette:fromage.fr"); + + let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); + + // Start with a room with two events. + let room = server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id).add_timeline_event( + f.text_msg("inapprops").event_id(event_id!("$1")).into_raw_sync(), + ), + ) + .await; + + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + + // Wait for the first event. + let (events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); + if events.is_empty() { + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { .. }) = subscriber.recv() + ); + } + + // Sync a redaction for the event $1. + server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(f.redaction(event_id!("$1")).into_raw_sync()), + ) + .await; + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + assert_eq!(diffs.len(), 2); + + // First, the redaction event itself. + { + assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]); + assert_eq!(new_events.len(), 1); + let ev = new_events[0].raw().deserialize().unwrap(); + assert_let!( + AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) = ev + ); + assert_eq!(ev.redacts(&RoomVersionId::V1).unwrap(), event_id!("$1")); + } + + // Then, we have an update for the redacted event. + { + assert_let!(VectorDiff::Set { index: 0, value: redacted_event } = &diffs[1]); + let ev = redacted_event.raw().deserialize().unwrap(); + assert_let!( + AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ev)) = ev + ); + // The event has been redacted! + assert_matches!(ev.as_original(), None); + } + + // And done for now. + assert!(subscriber.is_empty()); +} + +#[async_test] +async fn test_apply_redaction_when_redacted_and_redaction_are_in_same_sync() { + let server = MatrixMockServer::new().await; + let client = server.client_builder().build().await; + + let event_cache = client.event_cache(); + + // Immediately subscribe the event cache to sync updates. + event_cache.subscribe().unwrap(); + event_cache.enable_storage().unwrap(); + + let room_id = room_id!("!omelette:fromage.fr"); + let room = server.sync_joined_room(&client, room_id).await; + let (room_event_cache, _drop_handles) = room.event_cache().await.unwrap(); + let (_events, mut subscriber) = room_event_cache.subscribe().await.unwrap(); + + let f = EventFactory::new().room(room_id).sender(user_id!("@a:b.c")); + + // Now include a sync with both the original event *and* the redacted one. + server + .sync_room( + &client, + JoinedRoomBuilder::new(room_id) + .add_timeline_event(f.text_msg("bleh").event_id(event_id!("$2")).into_raw_sync()) + .add_timeline_event(f.redaction(event_id!("$2")).into_raw_sync()), + ) + .await; + + assert_let_timeout!( + Ok(RoomEventCacheUpdate::UpdateTimelineEvents { diffs, .. }) = subscriber.recv() + ); + + assert_eq!(diffs.len(), 2); + + // First, we get an update with all the new events. + { + assert_let!(VectorDiff::Append { values: new_events } = &diffs[0]); + assert_eq!(new_events.len(), 2); + + // The original event. + let ev = new_events[0].raw().deserialize().unwrap(); + assert_let!( + AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ev)) = ev + ); + assert_eq!(ev.as_original().unwrap().content.body(), "bleh"); + + // The redaction. + let ev = new_events[1].raw().deserialize().unwrap(); + assert_let!( + AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomRedaction(ev)) = ev + ); + assert_eq!(ev.redacts(&RoomVersionId::V1).unwrap(), event_id!("$2")); + } + + // Then the redaction of the event happens separately. + { + assert_let!(VectorDiff::Set { index: 0, value: redacted_event } = &diffs[1]); + let ev = redacted_event.raw().deserialize().unwrap(); + assert_let!( + AnySyncTimelineEvent::MessageLike(AnySyncMessageLikeEvent::RoomMessage(ev)) = ev + ); + // The event has been redacted! + assert_matches!(ev.as_original(), None); + } + + // That's all, folks! + assert!(subscriber.is_empty()); +}