Skip to content

Commit

Permalink
feat(event cache): redact events in the database whenever they're red…
Browse files Browse the repository at this point in the history
…acted
  • Loading branch information
bnjbvr committed Jan 16, 2025
1 parent 6f780a4 commit 5038309
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 18 deletions.
7 changes: 3 additions & 4 deletions crates/matrix-sdk-base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ mod rooms;

pub mod read_receipts;
pub use read_receipts::PreviousEventsProvider;
pub use rooms::RoomMembersUpdate;
pub mod sliding_sync;

pub mod store;
Expand All @@ -56,9 +55,9 @@ pub use http;
pub use matrix_sdk_crypto as crypto;
pub use once_cell;
pub use rooms::{
Room, RoomCreateWithCreatorEventContent, RoomDisplayName, RoomHero, RoomInfo,
RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMember, RoomMemberships, RoomState,
RoomStateFilter,
apply_redaction, Room, RoomCreateWithCreatorEventContent, RoomDisplayName, RoomHero, RoomInfo,
RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons, RoomMember, RoomMembersUpdate,
RoomMemberships, RoomState, RoomStateFilter,
};
pub use store::{
ComposerDraft, ComposerDraftType, QueueWedgeError, StateChanges, StateStore, StateStoreDataKey,
Expand Down
2 changes: 1 addition & 1 deletion crates/matrix-sdk-base/src/rooms/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{
use bitflags::bitflags;
pub use members::RoomMember;
pub use normal::{
Room, RoomHero, RoomInfo, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
apply_redaction, Room, RoomHero, RoomInfo, RoomInfoNotableUpdate, RoomInfoNotableUpdateReasons,
RoomMembersUpdate, RoomState, RoomStateFilter,
};
use regex::Regex;
Expand Down
6 changes: 4 additions & 2 deletions crates/matrix-sdk-base/src/rooms/normal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2018,7 +2018,9 @@ impl RoomInfo {
}
}

fn apply_redaction(
/// Apply a redaction to the given target `event`, given the raw redaction event
/// and the room version.
pub fn apply_redaction(
event: &Raw<AnySyncTimelineEvent>,
raw_redaction: &Raw<SyncRoomRedactionEvent>,
room_version: &RoomVersionId,
Expand All @@ -2044,7 +2046,7 @@ fn apply_redaction(
let redact_result = redact_in_place(&mut event_json, room_version, Some(redacted_because));

if let Err(e) = redact_result {
warn!("Failed to redact latest event: {e}");
warn!("Failed to redact event: {e}");
return None;
}

Expand Down
13 changes: 13 additions & 0 deletions crates/matrix-sdk-common/src/deserialized_responses.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,19 @@ impl SyncTimelineEvent {
self.kind.raw()
}

/// Replace the raw event included in this item by another one.
pub fn replace_raw(&mut self, replacement: Raw<AnyMessageLikeEvent>) {
match &mut self.kind {
TimelineEventKind::Decrypted(decrypted) => decrypted.event = replacement,
TimelineEventKind::UnableToDecrypt { event, .. }
| TimelineEventKind::PlainText { event } => {
// It's safe to cast `AnyMessageLikeEvent` into `AnySyncMessageLikeEvent`,
// because the former contains a superset of the fields included in the latter.
*event = replacement.cast();
}
}
}

/// If the event was a decrypted event that was successfully decrypted, get
/// its encryption info. Otherwise, `None`.
pub fn encryption_info(&self) -> Option<&EncryptionInfo> {
Expand Down
13 changes: 12 additions & 1 deletion crates/matrix-sdk/src/event_cache/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ use ruma::{
AnySyncTimelineEvent,
},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId, RoomId,
EventId, OwnedEventId, OwnedRoomId, RoomId, RoomVersionId,
};
use tokio::sync::{
broadcast::{error::RecvError, Receiver},
Expand Down Expand Up @@ -622,10 +622,21 @@ impl EventCacheInner {
let room_state =
RoomEventCacheState::new(room_id.to_owned(), self.store.clone()).await?;

let room_version = self
.client
.get()
.and_then(|client| client.get_room(room_id))
.map(|room| room.clone_info().room_version_or_default())
.unwrap_or_else(|| {
warn!("unknown room version for {room_id}, using default V1");
RoomVersionId::V1
});

let room_event_cache = RoomEventCache::new(
self.client.clone(),
room_state,
room_id.to_owned(),
room_version,
self.all_events.clone(),
);

Expand Down
11 changes: 7 additions & 4 deletions crates/matrix-sdk/src/event_cache/pagination.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,8 @@ impl RoomPagination {
// (backward). The `RoomEvents` API expects the first event to be the oldest.
.rev()
.cloned()
.map(SyncTimelineEvent::from);
.map(SyncTimelineEvent::from)
.collect::<Vec<_>>();

let first_event_pos = room_events.events().next().map(|(item_pos, _)| item_pos);

Expand All @@ -190,20 +191,20 @@ impl RoomPagination {
// There is a prior gap, let's replace it by new events!
trace!("replaced gap with new events from backpagination");
room_events
.replace_gap_at(sync_events, gap_id)
.replace_gap_at(sync_events.clone(), gap_id)
.expect("gap_identifier is a valid chunk id we read previously")
} else if let Some(pos) = first_event_pos {
// No prior gap, but we had some events: assume we need to prepend events
// before those.
trace!("inserted events before the first known event");
let report = room_events
.insert_events_at(sync_events, pos)
.insert_events_at(sync_events.clone(), pos)
.expect("pos is a valid position we just read above");
(report, Some(pos))
} else {
// No prior gap, and no prior events: push the events.
trace!("pushing events received from back-pagination");
let report = room_events.push_events(sync_events);
let report = room_events.push_events(sync_events.clone());
// A new gap may be inserted before the new events, if there are any.
let next_pos = room_events.events().next().map(|(item_pos, _)| item_pos);
(report, next_pos)
Expand All @@ -228,6 +229,8 @@ impl RoomPagination {
debug!("not storing previous batch token, because we deduplicated all new back-paginated events");
}

room_events.on_new_events(&self.inner.room_version, sync_events.iter());

BackPaginationOutcome { events, reached_start }
})
.await?;
Expand Down
87 changes: 84 additions & 3 deletions crates/matrix-sdk/src/event_cache/room/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@
use std::cmp::Ordering;

use eyeball_im::VectorDiff;
use matrix_sdk_base::event_cache::store::DEFAULT_CHUNK_CAPACITY;
pub use matrix_sdk_base::event_cache::{Event, Gap};
use matrix_sdk_base::{apply_redaction, event_cache::store::DEFAULT_CHUNK_CAPACITY};
use matrix_sdk_common::linked_chunk::{
AsVector, Chunk, ChunkIdentifier, EmptyChunk, Error, Iter, IterBackward, LinkedChunk,
ObservableUpdates, Position,
};
use ruma::OwnedEventId;
use tracing::{debug, error, warn};
use ruma::{
events::{room::redaction::SyncRoomRedactionEvent, AnySyncTimelineEvent},
OwnedEventId, RoomVersionId,
};
use tracing::{debug, error, instrument, trace, warn};

use super::{
super::deduplicator::{Decoration, Deduplicator},
Expand Down Expand Up @@ -90,6 +93,84 @@ impl RoomEvents {
self.chunks.clear();
}

/// If the given event is a redaction, try to retrieve the to-be-redacted
/// event in the chunk, and replace it by the redacted form.
#[instrument(skip_all)]
fn maybe_apply_new_redaction(&mut self, room_version: &RoomVersionId, event: &Event) {
let Ok(AnySyncTimelineEvent::MessageLike(
ruma::events::AnySyncMessageLikeEvent::RoomRedaction(redaction),
)) = event.raw().deserialize()
else {
return;
};

let Some(event_id) = redaction.redacts(room_version) else {
warn!("missing target event id from the redaction event");
return;
};

// Replace the redacted event by a redacted form, if we knew about it.
let mut items = self.chunks.items();

if let Some((pos, target_event)) =
items.find(|(_, item)| item.event_id().as_deref() == Some(event_id))
{
// Don't redact already redacted events.
if let Ok(deserialized) = target_event.raw().deserialize() {
match deserialized {
AnySyncTimelineEvent::MessageLike(ev) => {
if ev.original_content().is_none() {
// Already redacted.
return;
}
}
AnySyncTimelineEvent::State(ev) => {
if ev.original_content().is_none() {
// Already redacted.
return;
}
}
}
}

if let Some(redacted_event) = apply_redaction(
target_event.raw(),
event.raw().cast_ref::<SyncRoomRedactionEvent>(),
room_version,
) {
let mut copy = target_event.clone();

// It's safe to cast `redacted_event` here:
// - either the event was an `AnyTimelineEvent` cast to `AnySyncTimelineEvent`
// when calling .raw(), so it's still one under the hood.
// - or it wasn't, and it's a plain `AnySyncTimelineEvent` in this case.
copy.replace_raw(redacted_event.cast());

// Get rid of the immutable borrow on self.chunks.
drop(items);

self.chunks
.replace_item_at(pos, copy)
.expect("should have been a valid position of an item");
}
} else {
trace!("redacted event is missing from the linked chunk");
}

// TODO: remove all related events too!
}

/// Callback to call whenever we touch events in the database.
pub fn on_new_events<'a>(
&mut self,
room_version: &RoomVersionId,
events: impl Iterator<Item = &'a Event>,
) {
for ev in events {
self.maybe_apply_new_redaction(room_version, ev);
}
}

/// Push events after all events or gaps.
///
/// The last event in `events` is the most recent one.
Expand Down
20 changes: 18 additions & 2 deletions crates/matrix-sdk/src/event_cache/room/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use matrix_sdk_base::{
use ruma::{
events::{relation::RelationType, AnyRoomAccountDataEvent, AnySyncEphemeralRoomEvent},
serde::Raw,
EventId, OwnedEventId, OwnedRoomId,
EventId, OwnedEventId, OwnedRoomId, RoomVersionId,
};
use tokio::sync::{
broadcast::{Receiver, Sender},
Expand Down Expand Up @@ -61,9 +61,18 @@ impl RoomEventCache {
client: WeakClient,
state: RoomEventCacheState,
room_id: OwnedRoomId,
room_version: RoomVersionId,
all_events_cache: Arc<RwLock<AllEventsCache>>,
) -> Self {
Self { inner: Arc::new(RoomEventCacheInner::new(client, state, room_id, all_events_cache)) }
Self {
inner: Arc::new(RoomEventCacheInner::new(
client,
state,
room_id,
room_version,
all_events_cache,
)),
}
}

/// Subscribe to room updates for this room, after getting the initial list
Expand Down Expand Up @@ -189,6 +198,9 @@ pub(super) struct RoomEventCacheInner {
/// The room id for this room.
room_id: OwnedRoomId,

/// The room version for this room.
pub(crate) room_version: RoomVersionId,

/// Sender part for subscribers to this room.
pub sender: Sender<RoomEventCacheUpdate>,

Expand Down Expand Up @@ -222,12 +234,14 @@ impl RoomEventCacheInner {
client: WeakClient,
state: RoomEventCacheState,
room_id: OwnedRoomId,
room_version: RoomVersionId,
all_events_cache: Arc<RwLock<AllEventsCache>>,
) -> Self {
let sender = Sender::new(32);
let weak_room = WeakRoom::new(client, room_id);
Self {
room_id: weak_room.room_id().to_owned(),
room_version,
state: RwLock::new(state),
all_events: all_events_cache,
sender,
Expand Down Expand Up @@ -444,6 +458,8 @@ impl RoomEventCacheInner {
.expect("we obtained the valid position beforehand");
}
}

room_events.on_new_events(&self.room_version, sync_timeline_events.iter());
})
.await?;

Expand Down
Loading

0 comments on commit 5038309

Please sign in to comment.