diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs similarity index 86% rename from crates/matrix-sdk/src/event_cache/linked_chunk.rs rename to crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs index 87bd4047559..71a35d53d1f 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs @@ -14,6 +14,8 @@ #![allow(dead_code)] +mod updates; + use std::{ fmt, marker::PhantomData, @@ -22,9 +24,11 @@ use std::{ sync::atomic::{AtomicU64, Ordering}, }; +use updates::*; + /// Errors of [`LinkedChunk`]. #[derive(thiserror::Error, Debug)] -pub enum LinkedChunkError { +pub enum Error { #[error("The chunk identifier is invalid: `{identifier:?}`")] InvalidChunkIdentifier { identifier: ChunkIdentifier }, @@ -38,64 +42,18 @@ pub enum LinkedChunkError { InvalidItemIndex { index: usize }, } -/// Represent the updates that have happened inside a [`LinkedChunk`]. -/// -/// To retrieve the updates, use [`LinkedChunk::updates`]. -/// -/// These updates are useful to store a `LinkedChunk` in another form of -/// storage, like a database or something similar. -#[derive(Debug, PartialEq)] -pub enum LinkedChunkUpdate { - /// A new chunk of kind Items has been created. - NewItemsChunk { - /// The identifier of the previous chunk of this new chunk. - previous: Option, - /// The identifier of the new chunk. - new: ChunkIdentifier, - /// The identifier of the next chunk of this new chunk. - next: Option, - }, - /// A new chunk of kind Gap has been created. - NewGapChunk { - /// The identifier of the previous chunk of this new chunk. - previous: Option, - /// The identifier of the new chunk. - new: ChunkIdentifier, - /// The identifier of the next chunk of this new chunk. - next: Option, - /// The content of the chunk. - gap: Gap, - }, - /// A chunk has been removed. - RemoveChunk(ChunkIdentifier), - /// Items are inserted inside a chunk of kind Items. - InsertItems { - /// [`Position`] of the items. - at: Position, - /// The items. - items: Vec, - }, - /// A chunk of kind Items has been truncated. - TruncateItems { - /// The identifier of the chunk. - chunk: ChunkIdentifier, - /// The new length of the chunk. - length: usize, - }, -} - /// Links of a `LinkedChunk`, i.e. the first and last [`Chunk`]. /// /// This type was introduced to avoid borrow checking errors when mutably /// referencing a subset of fields of a `LinkedChunk`. -struct LinkedChunkEnds { +struct Ends { /// The first chunk. first: NonNull>, /// The last chunk. last: Option>>, } -impl LinkedChunkEnds { +impl Ends { /// Get the first chunk, as an immutable reference. fn first_chunk(&self) -> &Chunk { unsafe { self.first.as_ref() } @@ -146,15 +104,19 @@ impl LinkedChunkEnds { /// entirely full. A chunk can represents a `Gap` between other chunks. pub struct LinkedChunk { /// The links to the chunks, i.e. the first and the last chunk. - links: LinkedChunkEnds, + links: Ends, + /// The number of items hold by this linked chunk. length: usize, + /// The generator of chunk identifiers. chunk_identifier_generator: ChunkIdentifierGenerator, + /// All updates that have been made on this `LinkedChunk`. If this field is /// `Some(…)`, update history is enabled, otherwise, if it's `None`, update /// history is disabled. - update_history: Option>>, + updates: Option>, + /// Marker. marker: PhantomData>>, } @@ -163,29 +125,32 @@ impl LinkedChunk { /// Create a new [`Self`]. pub fn new() -> Self { Self { - links: LinkedChunkEnds { + links: Ends { // INVARIANT: The first chunk must always be an Items, not a Gap. first: Chunk::new_items_leaked(ChunkIdentifierGenerator::FIRST_IDENTIFIER), last: None, }, length: 0, chunk_identifier_generator: ChunkIdentifierGenerator::new_from_scratch(), - update_history: None, + updates: None, marker: PhantomData, } } /// Create a new [`Self`] with a history of updates. + /// + /// When [`Self`] is built with update history, the [`Updates::take`] method + /// must be called to consume and clean the updates. See [`Self::updates`]. pub fn new_with_update_history() -> Self { Self { - links: LinkedChunkEnds { + links: Ends { // INVARIANT: The first chunk must always be an Items, not a Gap. first: Chunk::new_items_leaked(ChunkIdentifierGenerator::FIRST_IDENTIFIER), last: None, }, length: 0, chunk_identifier_generator: ChunkIdentifierGenerator::new_from_scratch(), - update_history: Some(Vec::new()), + updates: Some(Updates::new()), marker: PhantomData, } } @@ -213,11 +178,8 @@ impl LinkedChunk { let last_chunk = self.links.latest_chunk_mut(); // Push the items. - let last_chunk = last_chunk.push_items( - items, - &self.chunk_identifier_generator, - &mut self.update_history, - ); + let last_chunk = + last_chunk.push_items(items, &self.chunk_identifier_generator, &mut self.updates); debug_assert!(last_chunk.is_last_chunk(), "`last_chunk` must be… the last chunk"); @@ -242,7 +204,7 @@ impl LinkedChunk { let last_chunk = self.links.latest_chunk_mut(); last_chunk.insert_next( Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content), - &mut self.update_history, + &mut self.updates, ); self.links.last = last_chunk.next; @@ -252,11 +214,7 @@ impl LinkedChunk { /// /// Because the `position` can be invalid, this method returns a /// `Result`. - pub fn insert_items_at( - &mut self, - items: I, - position: Position, - ) -> Result<(), LinkedChunkError> + pub fn insert_items_at(&mut self, items: I, position: Position) -> Result<(), Error> where Item: Clone, Gap: Clone, @@ -269,18 +227,18 @@ impl LinkedChunk { let chunk = self .links .chunk_mut(chunk_identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?; + .ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?; let (chunk, number_of_items) = match &mut chunk.content { ChunkContent::Gap(..) => { - return Err(LinkedChunkError::ChunkIsAGap { identifier: chunk_identifier }) + return Err(Error::ChunkIsAGap { identifier: chunk_identifier }) } ChunkContent::Items(current_items) => { let current_items_length = current_items.len(); if item_index > current_items_length { - return Err(LinkedChunkError::InvalidItemIndex { index: item_index }); + return Err(Error::InvalidItemIndex { index: item_index }); } // Prepare the items to be pushed. @@ -292,16 +250,12 @@ impl LinkedChunk { if item_index == current_items_length { chunk // Push the new items. - .push_items( - items, - &self.chunk_identifier_generator, - &mut self.update_history, - ) + .push_items(items, &self.chunk_identifier_generator, &mut self.updates) } // Insert inside the current items. else { - if let Some(updates) = self.update_history.as_mut() { - updates.push(LinkedChunkUpdate::TruncateItems { + if let Some(updates) = self.updates.as_mut() { + updates.push(Update::TruncateItems { chunk: chunk_identifier, length: item_index, }); @@ -312,16 +266,12 @@ impl LinkedChunk { chunk // Push the new items. - .push_items( - items, - &self.chunk_identifier_generator, - &mut self.update_history, - ) + .push_items(items, &self.chunk_identifier_generator, &mut self.updates) // Finally, push the items that have been detached. .push_items( detached_items.into_iter(), &self.chunk_identifier_generator, - &mut self.update_history, + &mut self.updates, ) }, number_of_items, @@ -346,11 +296,7 @@ impl LinkedChunk { /// /// Because the `position` can be invalid, this method returns a /// `Result`. - pub fn insert_gap_at( - &mut self, - content: Gap, - position: Position, - ) -> Result<(), LinkedChunkError> + pub fn insert_gap_at(&mut self, content: Gap, position: Position) -> Result<(), Error> where Item: Clone, Gap: Clone, @@ -361,7 +307,7 @@ impl LinkedChunk { let chunk = self .links .chunk_mut(chunk_identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?; + .ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?; // If `item_index` is 0, we don't want to split the current items chunk to // insert a new gap chunk, otherwise it would create an empty current items @@ -378,7 +324,7 @@ impl LinkedChunk { previous_chunk.insert_next( Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content), - &mut self.update_history, + &mut self.updates, ); // We don't need to update `self.last` because we have inserted a new chunk @@ -389,18 +335,18 @@ impl LinkedChunk { let chunk = match &mut chunk.content { ChunkContent::Gap(..) => { - return Err(LinkedChunkError::ChunkIsAGap { identifier: chunk_identifier }); + return Err(Error::ChunkIsAGap { identifier: chunk_identifier }); } ChunkContent::Items(current_items) => { let current_items_length = current_items.len(); if item_index >= current_items_length { - return Err(LinkedChunkError::InvalidItemIndex { index: item_index }); + return Err(Error::InvalidItemIndex { index: item_index }); } - if let Some(updates) = self.update_history.as_mut() { - updates.push(LinkedChunkUpdate::TruncateItems { + if let Some(updates) = self.updates.as_mut() { + updates.push(Update::TruncateItems { chunk: chunk_identifier, length: item_index, }); @@ -413,18 +359,18 @@ impl LinkedChunk { // Insert a new gap chunk. .insert_next( Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content), - &mut self.update_history, + &mut self.updates, ) // Insert a new items chunk. .insert_next( Chunk::new_items_leaked(self.chunk_identifier_generator.next()), - &mut self.update_history, + &mut self.updates, ) // Finally, push the items that have been detached. .push_items( detached_items.into_iter(), &self.chunk_identifier_generator, - &mut self.update_history, + &mut self.updates, ) } }; @@ -451,7 +397,7 @@ impl LinkedChunk { &mut self, items: I, chunk_identifier: ChunkIdentifier, - ) -> Result<&Chunk, LinkedChunkError> + ) -> Result<&Chunk, Error> where Item: Clone, Gap: Clone, @@ -465,7 +411,7 @@ impl LinkedChunk { let chunk = self .links .chunk_mut(chunk_identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?; + .ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?; debug_assert!(chunk.is_first_chunk().not(), "A gap cannot be the first chunk"); @@ -478,14 +424,10 @@ impl LinkedChunk { // Insert a new items chunk… .insert_next( Chunk::new_items_leaked(self.chunk_identifier_generator.next()), - &mut self.update_history, + &mut self.updates, ) // … and insert the items. - .push_items( - items, - &self.chunk_identifier_generator, - &mut self.update_history, - ); + .push_items(items, &self.chunk_identifier_generator, &mut self.updates); ( last_inserted_chunk.is_last_chunk().then(|| last_inserted_chunk.as_ptr()), @@ -493,7 +435,7 @@ impl LinkedChunk { ) } ChunkContent::Items(..) => { - return Err(LinkedChunkError::ChunkIsItems { identifier: chunk_identifier }) + return Err(Error::ChunkIsItems { identifier: chunk_identifier }) } }; @@ -503,7 +445,7 @@ impl LinkedChunk { .unwrap(); // Now that new items have been pushed, we can unlink the gap chunk. - chunk.unlink(&mut self.update_history); + chunk.unlink(&mut self.updates); // Get the pointer to `chunk`. chunk_ptr = chunk.as_ptr(); @@ -551,15 +493,15 @@ impl LinkedChunk { /// Iterate over the chunks, backwards. /// /// It iterates from the last to the first chunk. - pub fn rchunks(&self) -> LinkedChunkIterBackward<'_, CAP, Item, Gap> { - LinkedChunkIterBackward::new(self.links.latest_chunk()) + pub fn rchunks(&self) -> IterBackward<'_, CAP, Item, Gap> { + IterBackward::new(self.links.latest_chunk()) } /// Iterate over the chunks, forward. /// /// It iterates from the first to the last chunk. - pub fn chunks(&self) -> LinkedChunkIter<'_, CAP, Item, Gap> { - LinkedChunkIter::new(self.links.first_chunk()) + pub fn chunks(&self) -> Iter<'_, CAP, Item, Gap> { + Iter::new(self.links.first_chunk()) } /// Iterate over the chunks, starting from `identifier`, backward. @@ -569,11 +511,9 @@ impl LinkedChunk { pub fn rchunks_from( &self, identifier: ChunkIdentifier, - ) -> Result, LinkedChunkError> { - Ok(LinkedChunkIterBackward::new( - self.links - .chunk(identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier })?, + ) -> Result, Error> { + Ok(IterBackward::new( + self.links.chunk(identifier).ok_or(Error::InvalidChunkIdentifier { identifier })?, )) } @@ -584,11 +524,9 @@ impl LinkedChunk { pub fn chunks_from( &self, identifier: ChunkIdentifier, - ) -> Result, LinkedChunkError> { - Ok(LinkedChunkIter::new( - self.links - .chunk(identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier })?, + ) -> Result, Error> { + Ok(Iter::new( + self.links.chunk(identifier).ok_or(Error::InvalidChunkIdentifier { identifier })?, )) } @@ -616,7 +554,7 @@ impl LinkedChunk { pub fn ritems_from( &self, position: Position, - ) -> Result, LinkedChunkError> { + ) -> Result, Error> { Ok(self .rchunks_from(position.chunk_identifier())? .filter_map(|chunk| match &chunk.content { @@ -647,7 +585,7 @@ impl LinkedChunk { pub fn items_from( &self, position: Position, - ) -> Result, LinkedChunkError> { + ) -> Result, Error> { Ok(self .chunks_from(position.chunk_identifier())? .filter_map(|chunk| match &chunk.content { @@ -666,15 +604,15 @@ impl LinkedChunk { .skip(position.index())) } - /// Get a mutable reference to the `LinkedChunk` updates. + /// Get a mutable reference to the `LinkedChunk` updates, aka [`Updates`]. /// - /// The caller is responsible to drain/empty these updates. + /// The caller is responsible to clear these updates. /// /// If the `Option` becomes `None`, it will disable update history. Thus, be /// careful when you want to empty the update history: do not use - /// `Option::take()` directly but rather `Vec::drain` for example. - pub fn updates(&mut self) -> Option<&mut Vec>> { - self.update_history.as_mut() + /// `Option::take()` directly but rather [`Updates::take`] for example. + pub fn updates(&mut self) -> Option<&mut Updates> { + self.updates.as_mut() } } @@ -789,18 +727,18 @@ impl Position { /// An iterator over a [`LinkedChunk`] that traverses the chunk in backward /// direction (i.e. it calls `previous` on each chunk to make progress). -pub struct LinkedChunkIterBackward<'a, const CAP: usize, Item, Gap> { +pub struct IterBackward<'a, const CAP: usize, Item, Gap> { chunk: Option<&'a Chunk>, } -impl<'a, const CAP: usize, Item, Gap> LinkedChunkIterBackward<'a, CAP, Item, Gap> { +impl<'a, const CAP: usize, Item, Gap> IterBackward<'a, CAP, Item, Gap> { /// Create a new [`LinkedChunkIter`] from a particular [`Chunk`]. fn new(from_chunk: &'a Chunk) -> Self { Self { chunk: Some(from_chunk) } } } -impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIterBackward<'a, CAP, Item, Gap> { +impl<'a, const CAP: usize, Item, Gap> Iterator for IterBackward<'a, CAP, Item, Gap> { type Item = &'a Chunk; fn next(&mut self) -> Option { @@ -814,18 +752,18 @@ impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIterBackward<'a, C /// An iterator over a [`LinkedChunk`] that traverses the chunk in forward /// direction (i.e. it calls `next` on each chunk to make progress). -pub struct LinkedChunkIter<'a, const CAP: usize, Item, Gap> { +pub struct Iter<'a, const CAP: usize, Item, Gap> { chunk: Option<&'a Chunk>, } -impl<'a, const CAP: usize, Item, Gap> LinkedChunkIter<'a, CAP, Item, Gap> { +impl<'a, const CAP: usize, Item, Gap> Iter<'a, CAP, Item, Gap> { /// Create a new [`LinkedChunkIter`] from a particular [`Chunk`]. fn new(from_chunk: &'a Chunk) -> Self { Self { chunk: Some(from_chunk) } } } -impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIter<'a, CAP, Item, Gap> { +impl<'a, const CAP: usize, Item, Gap> Iterator for Iter<'a, CAP, Item, Gap> { type Item = &'a Chunk; fn next(&mut self) -> Option { @@ -973,7 +911,7 @@ impl Chunk { &mut self, mut new_items: I, chunk_identifier_generator: &ChunkIdentifierGenerator, - updates: &mut Option>>, + updates: &mut Option>, ) -> &mut Self where I: Iterator + ExactSizeIterator, @@ -1012,7 +950,7 @@ impl Chunk { items.extend(new_items); if let Some(updates) = updates.as_mut() { - updates.push(LinkedChunkUpdate::InsertItems { + updates.push(Update::InsertItems { at: Position(identifier, start), items: items[start..].to_vec(), }); @@ -1027,7 +965,7 @@ impl Chunk { items.extend(new_items.by_ref().take(free_space)); if let Some(updates) = updates.as_mut() { - updates.push(LinkedChunkUpdate::InsertItems { + updates.push(Update::InsertItems { at: Position(identifier, start), items: items[start..].to_vec(), }); @@ -1055,7 +993,7 @@ impl Chunk { fn insert_next( &mut self, mut new_chunk_ptr: NonNull, - updates: &mut Option>>, + updates: &mut Option>, ) -> &mut Self where Gap: Clone, @@ -1082,15 +1020,12 @@ impl Chunk { let next = new_chunk.next().map(Chunk::identifier); match new_chunk.content() { - ChunkContent::Gap(gap) => updates.push(LinkedChunkUpdate::NewGapChunk { - previous, - new, - next, - gap: gap.clone(), - }), + ChunkContent::Gap(gap) => { + updates.push(Update::NewGapChunk { previous, new, next, gap: gap.clone() }) + } ChunkContent::Items(..) => { - updates.push(LinkedChunkUpdate::NewItemsChunk { previous, new, next }) + updates.push(Update::NewItemsChunk { previous, new, next }) } } } @@ -1102,7 +1037,7 @@ impl Chunk { /// /// Be careful: `self` won't belong to `LinkedChunk` anymore, and should be /// dropped appropriately. - fn unlink(&mut self, updates: &mut Option>>) { + fn unlink(&mut self, updates: &mut Option>) { let previous_ptr = self.previous; let next_ptr = self.next; @@ -1115,7 +1050,7 @@ impl Chunk { } if let Some(updates) = updates.as_mut() { - updates.push(LinkedChunkUpdate::RemoveChunk(self.identifier())); + updates.push(Update::RemoveChunk(self.identifier())); } } @@ -1178,8 +1113,8 @@ mod tests { use assert_matches::assert_matches; use super::{ - Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunk, - LinkedChunkError, Position, + Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, Error, LinkedChunk, + Position, }; /// A macro to test the items and the gap of a `LinkedChunk`. @@ -1283,35 +1218,35 @@ mod tests { } #[test] - fn test_update_history() { + fn test_updates() { assert!(LinkedChunk::<3, char, ()>::new().updates().is_none()); assert!(LinkedChunk::<3, char, ()>::new_with_update_history().updates().is_some()); } #[test] fn test_push_items() { - use super::LinkedChunkUpdate::*; + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a']); assert_items_eq!(linked_chunk, ['a']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); linked_chunk.push_items_back(['b', 'c']); assert_items_eq!(linked_chunk, ['a', 'b', 'c']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b', 'c'] }] ); linked_chunk.push_items_back(['d', 'e']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(0)), @@ -1325,7 +1260,7 @@ mod tests { linked_chunk.push_items_back(['f', 'g', 'h', 'i', 'j']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f'] ['g', 'h', 'i'] ['j']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(1), 2), items: vec!['f'] }, NewItemsChunk { @@ -1348,20 +1283,20 @@ mod tests { #[test] fn test_push_gap() { - use super::LinkedChunkUpdate::*; + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a']); assert_items_eq!(linked_chunk, ['a']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); linked_chunk.push_gap_back(()); assert_items_eq!(linked_chunk, ['a'] [-]); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(0)), new: ChunkIdentifier(1), @@ -1373,7 +1308,7 @@ mod tests { linked_chunk.push_items_back(['b', 'c', 'd', 'e']); assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(1)), @@ -1394,7 +1329,7 @@ mod tests { linked_chunk.push_gap_back(()); // why not assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-]); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewGapChunk { previous: Some(ChunkIdentifier(3)), @@ -1414,7 +1349,7 @@ mod tests { linked_chunk.push_items_back(['f', 'g', 'h', 'i']); assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-] ['f', 'g', 'h'] ['i']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(5)), @@ -1518,7 +1453,7 @@ mod tests { } #[test] - fn test_rchunks_from() -> Result<(), LinkedChunkError> { + fn test_rchunks_from() -> Result<(), Error> { let mut linked_chunk = LinkedChunk::<2, char, ()>::new(); linked_chunk.push_items_back(['a', 'b']); linked_chunk.push_gap_back(()); @@ -1550,7 +1485,7 @@ mod tests { } #[test] - fn test_chunks_from() -> Result<(), LinkedChunkError> { + fn test_chunks_from() -> Result<(), Error> { let mut linked_chunk = LinkedChunk::<2, char, ()>::new(); linked_chunk.push_items_back(['a', 'b']); linked_chunk.push_gap_back(()); @@ -1612,7 +1547,7 @@ mod tests { } #[test] - fn test_ritems_from() -> Result<(), LinkedChunkError> { + fn test_ritems_from() -> Result<(), Error> { let mut linked_chunk = LinkedChunk::<2, char, ()>::new(); linked_chunk.push_items_back(['a', 'b']); linked_chunk.push_gap_back(()); @@ -1630,7 +1565,7 @@ mod tests { } #[test] - fn test_items_from() -> Result<(), LinkedChunkError> { + fn test_items_from() -> Result<(), Error> { let mut linked_chunk = LinkedChunk::<2, char, ()>::new(); linked_chunk.push_items_back(['a', 'b']); linked_chunk.push_gap_back(()); @@ -1648,14 +1583,14 @@ mod tests { } #[test] - fn test_insert_items_at() -> Result<(), LinkedChunkError> { - use super::LinkedChunkUpdate::*; + fn test_insert_items_at() -> Result<(), Error> { + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] }, NewItemsChunk { @@ -1681,7 +1616,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 10); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(1), length: 1 }, InsertItems { at: Position(ChunkIdentifier(1), 1), items: vec!['w', 'x'] }, @@ -1713,7 +1648,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 14); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 0 }, InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['l', 'm', 'n'] }, @@ -1745,7 +1680,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 16); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(5), length: 0 }, InsertItems { at: Position(ChunkIdentifier(5), 0), items: vec!['r', 's'] }, @@ -1766,7 +1701,7 @@ mod tests { ['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(3), 1), items: vec!['p', 'q'] },] ); assert_eq!(linked_chunk.len(), 18); @@ -1776,18 +1711,18 @@ mod tests { { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), - Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) + Err(Error::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a chunk that exists, but at an item that does not exist. { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), - Err(LinkedChunkError::InvalidItemIndex { index: 128 }) + Err(Error::InvalidItemIndex { index: 128 }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a gap. @@ -1799,7 +1734,7 @@ mod tests { ['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] [-] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(6), @@ -1810,7 +1745,7 @@ mod tests { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(6), 0)), - Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(6) }) + Err(Error::ChunkIsAGap { identifier: ChunkIdentifier(6) }) ); } @@ -1820,14 +1755,14 @@ mod tests { } #[test] - fn test_insert_gap_at() -> Result<(), LinkedChunkError> { - use super::LinkedChunkUpdate::*; + fn test_insert_gap_at() -> Result<(), Error> { + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] }, NewItemsChunk { @@ -1846,7 +1781,7 @@ mod tests { assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 1 }, NewGapChunk { @@ -1873,7 +1808,7 @@ mod tests { // A new empty chunk is created as the first chunk. assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 0 }, NewGapChunk { @@ -1902,7 +1837,7 @@ mod tests { // space. assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(6), @@ -1917,9 +1852,9 @@ mod tests { let position_of_first_empty_chunk = Position(ChunkIdentifier(0), 0); assert_matches!( linked_chunk.insert_gap_at((), position_of_first_empty_chunk), - Err(LinkedChunkError::InvalidItemIndex { index: 0 }) + Err(Error::InvalidItemIndex { index: 0 }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in an empty chunk. @@ -1930,7 +1865,7 @@ mod tests { assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(6)), @@ -1945,7 +1880,7 @@ mod tests { assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] [] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(8), @@ -1959,18 +1894,18 @@ mod tests { { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), - Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) + Err(Error::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a chunk that exists, but at an item that does not exist. { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), - Err(LinkedChunkError::InvalidItemIndex { index: 128 }) + Err(Error::InvalidItemIndex { index: 128 }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in an existing gap. @@ -1980,9 +1915,9 @@ mod tests { let position_of_a_gap = Position(ChunkIdentifier(4), 0); assert_matches!( linked_chunk.insert_gap_at((), position_of_a_gap), - Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(4) }) + Err(Error::ChunkIsAGap { identifier: ChunkIdentifier(4) }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } assert_eq!(linked_chunk.len(), 6); @@ -1991,8 +1926,8 @@ mod tests { } #[test] - fn test_replace_gap_at() -> Result<(), LinkedChunkError> { - use super::LinkedChunkUpdate::*; + fn test_replace_gap_at() -> Result<(), Error> { + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a', 'b']); @@ -2000,7 +1935,7 @@ mod tests { linked_chunk.push_items_back(['l', 'm']); assert_items_eq!(linked_chunk, ['a', 'b'] [-] ['l', 'm']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b'] }, NewGapChunk { @@ -2031,7 +1966,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(1)), @@ -2058,7 +1993,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] [-] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(2)), new: ChunkIdentifier(5), @@ -2077,7 +2012,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ['w', 'x', 'y'] ['z'] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(5)), diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs new file mode 100644 index 00000000000..2112a9edfcf --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs @@ -0,0 +1,644 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::HashMap, + pin::Pin, + sync::{Arc, RwLock, Weak}, + task::{Context, Poll, Waker}, +}; + +use futures_core::Stream; + +use super::{ChunkIdentifier, Position}; + +/// Represent the updates that have happened inside a [`LinkedChunk`]. +/// +/// To retrieve the updates, use [`LinkedChunk::updates`]. +/// +/// These updates are useful to store a `LinkedChunk` in another form of +/// storage, like a database or something similar. +#[derive(Debug, PartialEq)] +pub enum Update { + /// A new chunk of kind Items has been created. + NewItemsChunk { + /// The identifier of the previous chunk of this new chunk. + previous: Option, + + /// The identifier of the new chunk. + new: ChunkIdentifier, + + /// The identifier of the next chunk of this new chunk. + next: Option, + }, + + /// A new chunk of kind Gap has been created. + NewGapChunk { + /// The identifier of the previous chunk of this new chunk. + previous: Option, + + /// The identifier of the new chunk. + new: ChunkIdentifier, + + /// The identifier of the next chunk of this new chunk. + next: Option, + + /// The content of the chunk. + gap: Gap, + }, + + /// A chunk has been removed. + RemoveChunk(ChunkIdentifier), + + /// Items are inserted inside a chunk of kind Items. + InsertItems { + /// [`Position`] of the items. + at: Position, + + /// The items. + items: Vec, + }, + + /// A chunk of kind Items has been truncated. + TruncateItems { + /// The identifier of the chunk. + chunk: ChunkIdentifier, + + /// The new length of the chunk. + length: usize, + }, +} + +impl Clone for Update +where + Item: Clone, + Gap: Clone, +{ + fn clone(&self) -> Self { + match self { + Self::NewItemsChunk { previous, new, next } => { + Self::NewItemsChunk { previous: *previous, new: *new, next: *next } + } + Self::NewGapChunk { previous, new, next, gap } => { + Self::NewGapChunk { previous: *previous, new: *new, next: *next, gap: gap.clone() } + } + Self::RemoveChunk(identifier) => Self::RemoveChunk(*identifier), + Self::InsertItems { at, items } => Self::InsertItems { at: *at, items: items.clone() }, + Self::TruncateItems { chunk, length } => { + Self::TruncateItems { chunk: *chunk, length: *length } + } + } + } +} + +/// A collection of [`Update`]. +/// +/// Get a value for this type with [`LinkedChunk::updates`]. +pub struct Updates { + inner: Arc>>, +} + +/// A token used to represent readers that read the updates in +/// [`UpdatesInner`]. +type ReaderToken = usize; + +/// Inner type for [`Updates`]. +/// +/// The particularity of this type is that multiple readers can read the +/// updates. A reader has a [`ReaderToken`]. The public API (i.e. +/// [`Updates`]) is considered to be the _main reader_ (it has the token +/// [`Self::MAIN_READER_TOKEN`]). +/// +/// An update that have been read by all readers are garbage collected to be +/// removed from the memory. An update will never be read twice by the same +/// reader. +/// +/// Why do we need multiple readers? The public API reads the updates with +/// [`Updates::take`], but the private API must also read the updates for +/// example with [`UpdatesSubscriber`]. Of course, they can be multiple +/// `UpdatesSubscriber`s at the same time. Hence the need of supporting multiple +/// readers. +struct UpdatesInner { + /// All the updates that have not been read by all readers. + updates: Vec>, + + /// Updates are stored in [`Self::updates`]. Multiple readers can read them. + /// A reader is identified by a [`ReaderToken`]. + /// + /// To each reader token is associated an index that represents the index of + /// the last reading. It is used to never return the same update twice. + last_index_per_reader: HashMap, + + /// The last generated token. This is useful to generate new token. + last_token: ReaderToken, + + /// Pending wakers for [`UpdateSubscriber`]s. A waker is removed + /// everytime it is called. + wakers: Vec, +} + +impl UpdatesInner { + /// The token used by the main reader. See [`Self::take`] to learn more. + const MAIN_READER_TOKEN: ReaderToken = 0; + + /// Create a new [`Self`]. + fn new() -> Self { + Self { + updates: Vec::with_capacity(8), + last_index_per_reader: { + let mut map = HashMap::with_capacity(2); + map.insert(Self::MAIN_READER_TOKEN, 0); + + map + }, + last_token: Self::MAIN_READER_TOKEN, + wakers: Vec::with_capacity(2), + } + } + + /// Push a new update. + fn push(&mut self, update: Update) { + self.updates.push(update); + + // Wake them up \o/. + for waker in self.wakers.drain(..) { + waker.wake(); + } + } + + /// Take new updates; it considers the caller is the main reader, i.e. it + /// will use the [`Self::MAIN_READER_TOKEN`]. + /// + /// Updates that have been read will never be read again by the current + /// reader. + /// + /// Learn more by reading [`Self::take_with_token`]. + fn take(&mut self) -> &[Update] { + self.take_with_token(Self::MAIN_READER_TOKEN) + } + + /// Take new updates with a particular reader token. + /// + /// Updates are stored in [`Self::updates`]. Multiple readers can read them. + /// A reader is identified by a [`ReaderToken`]. Every reader can + /// take/read/consume each update only once. An internal index is stored + /// per reader token to know where to start reading updates next time this + /// method is called. + fn take_with_token(&mut self, token: ReaderToken) -> &[Update] { + // Let's garbage collect unused updates. + self.garbage_collect(); + + let index = self + .last_index_per_reader + .get_mut(&token) + .expect("Given `UpdatesToken` does not map to any index"); + + // Read new updates, and update the index. + let slice = &self.updates[*index..]; + *index = self.updates.len(); + + slice + } + + /// Return the number of updates in the buffer. + fn len(&self) -> usize { + self.updates.len() + } + + /// Garbage collect unused updates. An update is considered unused when it's + /// been read by all readers. + /// + /// Basically, it reduces to finding the smallest last index for all + /// readers, and clear from 0 to that index. + fn garbage_collect(&mut self) { + let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0); + + if min_index > 0 { + let _ = self.updates.drain(0..min_index); + + // Let's shift the indices to the left by `min_index` to preserve them. + for index in self.last_index_per_reader.values_mut() { + *index -= min_index; + } + } + } +} + +impl Updates { + /// Create a new [`Self`]. + pub(super) fn new() -> Self { + Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) } + } + + /// Push a new update. + pub(super) fn push(&mut self, update: Update) { + self.inner.write().unwrap().push(update); + } + + /// Take new updates. + /// + /// Updates that have been taken will not be read again. + pub(super) fn take(&mut self) -> Vec> + where + Item: Clone, + Gap: Clone, + { + self.inner.write().unwrap().take().to_owned() + } + + /// Subscribe to updates by using a [`Stream`]. + fn subscribe(&mut self) -> UpdatesSubscriber { + // A subscriber is a new update reader, it needs its own token. + let token = { + let mut inner = self.inner.write().unwrap(); + inner.last_token += 1; + + let last_token = inner.last_token; + inner.last_index_per_reader.insert(last_token, 0); + + last_token + }; + + UpdatesSubscriber { updates: Arc::downgrade(&self.inner), token } + } +} + +/// A subscriber to [`Updates`]. It is helpful to receive updates via a +/// [`Stream`]. +struct UpdatesSubscriber { + /// Weak reference to [`UpdatesInner`]. + /// + /// Using a weak reference allows [`Updates`] to be dropped + /// freely even if a subscriber exists. + updates: Weak>>, + + /// The token to read the updates. + token: ReaderToken, +} + +impl Stream for UpdatesSubscriber +where + Item: Clone, + Gap: Clone, +{ + type Item = Vec>; + + fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { + let Some(updates) = self.updates.upgrade() else { + // The `Updates` has been dropped. It's time to close this stream. + return Poll::Ready(None); + }; + + let mut updates = updates.write().unwrap(); + let the_updates = updates.take_with_token(self.token); + + // No updates. + if the_updates.is_empty() { + // Let's register the waker. + updates.wakers.push(context.waker().clone()); + + // The stream is pending. + return Poll::Pending; + } + + // There is updates! Let's forward them in this stream. + Poll::Ready(Some(the_updates.to_owned())) + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::{Arc, Mutex}, + task::{Context, Poll, Wake}, + }; + + use assert_matches::assert_matches; + use futures_util::pin_mut; + + use super::{super::LinkedChunk, ChunkIdentifier, Position, Stream, UpdatesInner}; + + #[test] + fn test_updates_take_and_garbage_collector() { + use super::Update::*; + + let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history(); + + // Simulate another updates “reader”, it can a subscriber. + let main_token = UpdatesInner::::MAIN_READER_TOKEN; + let other_token = { + let updates = linked_chunk.updates().unwrap(); + let mut inner = updates.inner.write().unwrap(); + inner.last_token += 1; + + let other_token = inner.last_token; + inner.last_index_per_reader.insert(other_token, 0); + + other_token + }; + + // There is no new update yet. + { + let updates = linked_chunk.updates().unwrap(); + + assert!(updates.take().is_empty()); + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); + } + + linked_chunk.push_items_back(['a']); + linked_chunk.push_items_back(['b']); + linked_chunk.push_items_back(['c']); + + // Scenario 1: “main” takes the new updates, “other” doesn't take the new + // updates. + // + // 0 1 2 3 + // +---+---+---+ + // | a | b | c | + // +---+---+---+ + // + // “main” will move its index from 0 to 3. + // “other” won't move its index. + { + let updates = linked_chunk.updates().unwrap(); + + { + // Inspect number of updates in memory. + assert_eq!(updates.inner.read().unwrap().len(), 3); + } + + assert_eq!( + updates.take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // It must be the same number as before as the garbage collector weren't not + // able to remove any unused updates. + assert_eq!(inner.len(), 3); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&3)); + assert_eq!(indices.get(&other_token), Some(&0)); + } + } + + linked_chunk.push_items_back(['d']); + linked_chunk.push_items_back(['e']); + linked_chunk.push_items_back(['f']); + + // Scenario 2: “other“ takes the new updates, “main” doesn't take the + // new updates. + // + // 0 1 2 3 4 5 6 + // +---+---+---+---+---+---+ + // | a | b | c | d | e | f | + // +---+---+---+---+---+---+ + // + // “main” won't move its index. + // “other” will move its index from 0 to 6. + { + let updates = linked_chunk.updates().unwrap(); + + assert_eq!( + updates.inner.write().unwrap().take_with_token(other_token), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] }, + InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] }, + InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] }, + ] + ); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // It must be the same number as before as the garbage collector will be able to + // remove unused updates but at the next call… + assert_eq!(inner.len(), 6); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&3)); + assert_eq!(indices.get(&other_token), Some(&6)); + } + } + + // Scenario 3: “other” take new updates, but there is none, “main” + // doesn't take new updates. The garbage collector will run and collect + // unused updates. + // + // 0 1 2 3 + // +---+---+---+ + // | d | e | f | + // +---+---+---+ + // + // “main” will have its index updated from 3 to 0. + // “other” will have its index updated from 6 to 3. + { + let updates = linked_chunk.updates().unwrap(); + + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // The garbage collector has removed unused updates. + assert_eq!(inner.len(), 3); + + // Inspect the indices. They must have been adjusted. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&0)); + assert_eq!(indices.get(&other_token), Some(&3)); + } + } + + linked_chunk.push_items_back(['g']); + linked_chunk.push_items_back(['h']); + linked_chunk.push_items_back(['i']); + + // Scenario 4: both “main” and “other” take the new updates. + // + // 0 1 2 3 4 5 6 + // +---+---+---+---+---+---+ + // | d | e | f | g | h | i | + // +---+---+---+---+---+---+ + // + // “main” will have its index updated from 3 to 0. + // “other” will have its index updated from 6 to 3. + { + let updates = linked_chunk.updates().unwrap(); + + assert_eq!( + updates.take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] }, + InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] }, + InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] }, + InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] }, + InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] }, + InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] }, + ] + ); + assert_eq!( + updates.inner.write().unwrap().take_with_token(other_token), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] }, + InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] }, + InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] }, + ] + ); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // The garbage collector had a chance to collect the first 3 updates. + assert_eq!(inner.len(), 3); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&3)); + assert_eq!(indices.get(&other_token), Some(&3)); + } + } + + // Scenario 5: no more updates but they both try to take new updates. + // The garbage collector will collect all updates as all of them as + // been read already. + // + // “main” will have its index updated from 0 to 0. + // “other” will have its index updated from 3 to 0. + { + let updates = linked_chunk.updates().unwrap(); + + assert!(updates.take().is_empty()); + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // The garbage collector had a chance to collect all updates. + assert_eq!(inner.len(), 0); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&0)); + assert_eq!(indices.get(&other_token), Some(&0)); + } + } + } + + #[test] + fn test_updates_stream() { + use super::Update::*; + + struct CounterWaker { + number_of_wakeup: Mutex, + } + + impl Wake for CounterWaker { + fn wake(self: Arc) { + *self.number_of_wakeup.lock().unwrap() += 1; + } + } + + let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) }); + let waker = counter_waker.clone().into(); + let mut context = Context::from_waker(&waker); + + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + + let updates_subscriber = linked_chunk.updates().unwrap().subscribe(); + pin_mut!(updates_subscriber); + + // No update, stream is pending. + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0); + + // Let's generate an update. + linked_chunk.push_items_back(['a']); + + // The waker must have been called. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1); + + // There is an update! Right after that, the stream is pending again. + assert_matches!( + updates_subscriber.as_mut().poll_next(&mut context), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] + ); + } + ); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + + // Let's generate two other updates. + linked_chunk.push_items_back(['b']); + linked_chunk.push_items_back(['c']); + + // The waker must have been called only once for the two updates. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); + + // We can consume the updates without the stream, but the stream continues to + // know it has updates. + assert_eq!( + linked_chunk.updates().unwrap().take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + assert_matches!( + updates_subscriber.as_mut().poll_next(&mut context), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[ + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + } + ); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + + // When dropping the `LinkedChunk`, it closes the stream. + drop(linked_chunk); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None)); + + // Wakers calls have not changed. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); + } +} diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index 842207354fe..fce7c29709c 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -17,8 +17,7 @@ use std::{fmt, iter::once}; use matrix_sdk_common::deserialized_responses::SyncTimelineEvent; use super::linked_chunk::{ - Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter, - LinkedChunkIterBackward, Position, + Chunk, ChunkIdentifier, Error, Iter, IterBackward, LinkedChunk, Position, }; /// A newtype wrapper for a pagination token returned by a /messages response. @@ -82,11 +81,7 @@ impl RoomEvents { } /// Insert events at a specified position. - pub fn insert_events_at( - &mut self, - events: I, - position: Position, - ) -> Result<(), LinkedChunkError> + pub fn insert_events_at(&mut self, events: I, position: Position) -> Result<(), Error> where I: IntoIterator, I::IntoIter: ExactSizeIterator, @@ -95,7 +90,7 @@ impl RoomEvents { } /// Insert a gap at a specified position. - pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), LinkedChunkError> { + pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), Error> { self.chunks.insert_gap_at(gap, position) } @@ -110,7 +105,7 @@ impl RoomEvents { &mut self, events: I, gap_identifier: ChunkIdentifier, - ) -> Result<&Chunk, LinkedChunkError> + ) -> Result<&Chunk, Error> where I: IntoIterator, I::IntoIter: ExactSizeIterator, @@ -137,16 +132,14 @@ impl RoomEvents { /// Iterate over the chunks, backward. /// /// The most recent chunk comes first. - pub fn rchunks( - &self, - ) -> LinkedChunkIterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { + pub fn rchunks(&self) -> IterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { self.chunks.rchunks() } /// Iterate over the chunks, forward. /// /// The oldest chunk comes first. - pub fn chunks(&self) -> LinkedChunkIter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { + pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { self.chunks.chunks() } @@ -154,10 +147,7 @@ impl RoomEvents { pub fn rchunks_from( &self, identifier: ChunkIdentifier, - ) -> Result< - LinkedChunkIterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, - LinkedChunkError, - > { + ) -> Result, Error> { self.chunks.rchunks_from(identifier) } @@ -166,8 +156,7 @@ impl RoomEvents { pub fn chunks_from( &self, identifier: ChunkIdentifier, - ) -> Result, LinkedChunkError> - { + ) -> Result, Error> { self.chunks.chunks_from(identifier) } @@ -189,7 +178,7 @@ impl RoomEvents { pub fn revents_from( &self, position: Position, - ) -> Result, LinkedChunkError> { + ) -> Result, Error> { self.chunks.ritems_from(position) } @@ -198,7 +187,7 @@ impl RoomEvents { pub fn events_from( &self, position: Position, - ) -> Result, LinkedChunkError> { + ) -> Result, Error> { self.chunks.items_from(position) } }