Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(base) Implement RelationalLinkedChunk #4298

Merged
merged 9 commits into from
Nov 25, 2024
65 changes: 45 additions & 20 deletions crates/matrix-sdk-base/src/event_cache/store/memory_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ use std::{collections::HashMap, num::NonZeroUsize, sync::RwLock as StdRwLock, ti

use async_trait::async_trait;
use matrix_sdk_common::{
linked_chunk::Update, ring_buffer::RingBuffer,
linked_chunk::{relational::RelationalLinkedChunk, Update},
ring_buffer::RingBuffer,
store_locks::memory_store_helper::try_take_leased_lock,
};
use ruma::{MxcUri, OwnedMxcUri};
use ruma::{MxcUri, OwnedMxcUri, RoomId};

use super::{EventCacheStore, EventCacheStoreError, Result};
use crate::{
Expand All @@ -33,8 +34,14 @@ use crate::{
#[allow(clippy::type_complexity)]
#[derive(Debug)]
pub struct MemoryStore {
media: StdRwLock<RingBuffer<(OwnedMxcUri, String /* unique key */, Vec<u8>)>>,
leases: StdRwLock<HashMap<String, (String, Instant)>>,
inner: StdRwLock<MemoryStoreInner>,
}

#[derive(Debug)]
struct MemoryStoreInner {
media: RingBuffer<(OwnedMxcUri, String /* unique key */, Vec<u8>)>,
leases: HashMap<String, (String, Instant)>,
events: RelationalLinkedChunk<Event, Gap>,
}

// SAFETY: `new_unchecked` is safe because 20 is not zero.
Expand All @@ -43,8 +50,11 @@ const NUMBER_OF_MEDIAS: NonZeroUsize = unsafe { NonZeroUsize::new_unchecked(20)
impl Default for MemoryStore {
fn default() -> Self {
Self {
media: StdRwLock::new(RingBuffer::new(NUMBER_OF_MEDIAS)),
leases: Default::default(),
inner: StdRwLock::new(MemoryStoreInner {
media: RingBuffer::new(NUMBER_OF_MEDIAS),
leases: Default::default(),
events: RelationalLinkedChunk::new(),
}),
}
}
}
Expand All @@ -67,14 +77,20 @@ impl EventCacheStore for MemoryStore {
key: &str,
holder: &str,
) -> Result<bool, Self::Error> {
Ok(try_take_leased_lock(&self.leases, lease_duration_ms, key, holder))
let mut inner = self.inner.write().unwrap();

Ok(try_take_leased_lock(&mut inner.leases, lease_duration_ms, key, holder))
}

async fn handle_linked_chunk_updates(
&self,
_updates: &[Update<Event, Gap>],
room_id: &RoomId,
updates: Vec<Update<Event, Gap>>,
) -> Result<(), Self::Error> {
todo!()
let mut inner = self.inner.write().unwrap();
inner.events.apply_updates(room_id, updates);

Ok(())
}

async fn add_media_content(
Expand All @@ -84,8 +100,10 @@ impl EventCacheStore for MemoryStore {
) -> Result<()> {
// Avoid duplication. Let's try to remove it first.
self.remove_media_content(request).await?;

// Now, let's add it.
self.media.write().unwrap().push((request.uri().to_owned(), request.unique_key(), data));
let mut inner = self.inner.write().unwrap();
inner.media.push((request.uri().to_owned(), request.unique_key(), data));

Ok(())
}
Expand All @@ -97,8 +115,10 @@ impl EventCacheStore for MemoryStore {
) -> Result<(), Self::Error> {
let expected_key = from.unique_key();

let mut medias = self.media.write().unwrap();
if let Some((mxc, key, _)) = medias.iter_mut().find(|(_, key, _)| *key == expected_key) {
let mut inner = self.inner.write().unwrap();

if let Some((mxc, key, _)) = inner.media.iter_mut().find(|(_, key, _)| *key == expected_key)
{
*mxc = to.uri().to_owned();
*key = to.unique_key();
}
Expand All @@ -109,32 +129,37 @@ impl EventCacheStore for MemoryStore {
async fn get_media_content(&self, request: &MediaRequestParameters) -> Result<Option<Vec<u8>>> {
let expected_key = request.unique_key();

let media = self.media.read().unwrap();
Ok(media.iter().find_map(|(_media_uri, media_key, media_content)| {
let inner = self.inner.read().unwrap();

Ok(inner.media.iter().find_map(|(_media_uri, media_key, media_content)| {
(media_key == &expected_key).then(|| media_content.to_owned())
}))
}

async fn remove_media_content(&self, request: &MediaRequestParameters) -> Result<()> {
let expected_key = request.unique_key();

let mut media = self.media.write().unwrap();
let Some(index) = media
let mut inner = self.inner.write().unwrap();

let Some(index) = inner
.media
.iter()
.position(|(_media_uri, media_key, _media_content)| media_key == &expected_key)
else {
return Ok(());
};

media.remove(index);
inner.media.remove(index);

Ok(())
}

async fn remove_media_content_for_uri(&self, uri: &MxcUri) -> Result<()> {
let mut media = self.media.write().unwrap();
let mut inner = self.inner.write().unwrap();

let expected_key = uri.to_owned();
let positions = media
let positions = inner
.media
.iter()
.enumerate()
.filter_map(|(position, (media_uri, _media_key, _media_content))| {
Expand All @@ -144,7 +169,7 @@ impl EventCacheStore for MemoryStore {

// Iterate in reverse-order so that positions stay valid after first removals.
for position in positions.into_iter().rev() {
media.remove(position);
inner.media.remove(position);
}

Ok(())
Expand Down
10 changes: 6 additions & 4 deletions crates/matrix-sdk-base/src/event_cache/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::{fmt, sync::Arc};

use async_trait::async_trait;
use matrix_sdk_common::{linked_chunk::Update, AsyncTraitDeps};
use ruma::MxcUri;
use ruma::{MxcUri, RoomId};

use super::EventCacheStoreError;
use crate::{
Expand Down Expand Up @@ -45,7 +45,8 @@ pub trait EventCacheStore: AsyncTraitDeps {
/// in-memory. This method aims at forwarding this update inside this store.
async fn handle_linked_chunk_updates(
&self,
updates: &[Update<Event, Gap>],
room_id: &RoomId,
updates: Vec<Update<Event, Gap>>,
) -> Result<(), Self::Error>;

/// Add a media file's content in the media store.
Expand Down Expand Up @@ -144,9 +145,10 @@ impl<T: EventCacheStore> EventCacheStore for EraseEventCacheStoreError<T> {

async fn handle_linked_chunk_updates(
&self,
updates: &[Update<Event, Gap>],
room_id: &RoomId,
updates: Vec<Update<Event, Gap>>,
) -> Result<(), Self::Error> {
self.0.handle_linked_chunk_updates(updates).await.map_err(Into::into)
self.0.handle_linked_chunk_updates(room_id, updates).await.map_err(Into::into)
}

async fn add_media_content(
Expand Down
32 changes: 30 additions & 2 deletions crates/matrix-sdk-common/src/linked_chunk/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ macro_rules! assert_items_eq {
}

mod as_vector;
pub mod relational;
mod updates;

use std::{
Expand Down Expand Up @@ -871,12 +872,12 @@ impl<const CAP: usize, Item, Gap> Drop for LinkedChunk<CAP, Item, Gap> {
}

/// A [`LinkedChunk`] can be safely sent over thread boundaries if `Item: Send`
/// and `Gap: Send`. The only unsafe part if around the `NonNull`, but the API
/// and `Gap: Send`. The only unsafe part is around the `NonNull`, but the API
/// and the lifetimes to deref them are designed safely.
unsafe impl<const CAP: usize, Item: Send, Gap: Send> Send for LinkedChunk<CAP, Item, Gap> {}

/// A [`LinkedChunk`] can be safely shared between threads if `Item: Sync` and
/// `Gap: Sync`. The only unsafe part if around the `NonNull`, but the API and
/// `Gap: Sync`. The only unsafe part is around the `NonNull`, but the API and
/// the lifetimes to deref them are designed safely.
unsafe impl<const CAP: usize, Item: Sync, Gap: Sync> Sync for LinkedChunk<CAP, Item, Gap> {}

Expand Down Expand Up @@ -935,6 +936,18 @@ impl ChunkIdentifierGenerator {
#[repr(transparent)]
pub struct ChunkIdentifier(u64);

impl ChunkIdentifier {
/// Create a new [`ChunkIdentifier`].
pub(super) fn new(identifier: u64) -> Self {
Self(identifier)
}

/// Get the underlying identifier.
fn index(&self) -> u64 {
self.0
}
}

impl PartialEq<u64> for ChunkIdentifier {
fn eq(&self, other: &u64) -> bool {
self.0 == *other
Expand All @@ -948,6 +961,11 @@ impl PartialEq<u64> for ChunkIdentifier {
pub struct Position(ChunkIdentifier, usize);

impl Position {
/// Create a new [`Position`].
pub(super) fn new(chunk_identifier: ChunkIdentifier, index: usize) -> Self {
Self(chunk_identifier, index)
}

/// Get the chunk identifier of the item.
pub fn chunk_identifier(&self) -> ChunkIdentifier {
self.0
Expand All @@ -966,6 +984,16 @@ impl Position {
/// Decrement the index part (see [`Self::index`]), i.e. subtract 1.
///
/// # Panic
///
/// This method will panic if the index is already 0, as the index cannot
/// become negative.
pub fn decrement_index(&mut self) {
self.1 = self.1.checked_sub(1).expect("Cannot decrement the index because it's already 0");
}

/// Increment the index part (see [`Self::index`]), i.e. add 1.
///
/// # Panic
///
/// This method will panic if it would overflow, i.e. if the index is
/// `usize::MAX` (an index can never be larger than `usize::MAX`, so the
/// only overflowing case is equality).
pub fn increment_index(&mut self) {
self.1 = self.1.checked_add(1).expect("Cannot increment the index because it's too large");
}
}

/// An iterator over a [`LinkedChunk`] that traverses the chunk in backward
Expand Down
Loading
Loading