From 9e2e28d57a085b768e7858ed44597553e013af58 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 6 May 2024 14:22:51 +0200 Subject: [PATCH 1/9] feat(sdk) Change the update history from a `Vec` to a new `LinkedChunkUpdates` type. This patch updates the `LinkedChunk::update_history` field from a simple `Vec>` type to the new `LinkedChunkUpdates` type (note the plural). This is going to be helpul for the next patches. --- .../src/event_cache/linked_chunk.rs | 142 +++++++++--------- 1 file changed, 73 insertions(+), 69 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index 87bd4047559..447fad9ec6b 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -20,6 +20,7 @@ use std::{ ops::Not, ptr::NonNull, sync::atomic::{AtomicU64, Ordering}, + vec::Drain, }; /// Errors of [`LinkedChunk`]. @@ -84,6 +85,24 @@ pub enum LinkedChunkUpdate { }, } +pub struct LinkedChunkUpdates { + updates: Vec>, +} + +impl LinkedChunkUpdates { + fn new() -> Self { + Self { updates: Vec::new() } + } + + pub fn push(&mut self, update: LinkedChunkUpdate) { + self.updates.push(update); + } + + pub fn take(&mut self) -> Drain> { + self.updates.drain(..) + } +} + /// Links of a `LinkedChunk`, i.e. the first and last [`Chunk`]. /// /// This type was introduced to avoid borrow checking errors when mutably @@ -154,7 +173,7 @@ pub struct LinkedChunk { /// All updates that have been made on this `LinkedChunk`. If this field is /// `Some(…)`, update history is enabled, otherwise, if it's `None`, update /// history is disabled. - update_history: Option>>, + updates: Option>, /// Marker. 
marker: PhantomData>>, } @@ -170,7 +189,7 @@ impl LinkedChunk { }, length: 0, chunk_identifier_generator: ChunkIdentifierGenerator::new_from_scratch(), - update_history: None, + updates: None, marker: PhantomData, } } @@ -185,7 +204,7 @@ impl LinkedChunk { }, length: 0, chunk_identifier_generator: ChunkIdentifierGenerator::new_from_scratch(), - update_history: Some(Vec::new()), + updates: Some(LinkedChunkUpdates::new()), marker: PhantomData, } } @@ -213,11 +232,8 @@ impl LinkedChunk { let last_chunk = self.links.latest_chunk_mut(); // Push the items. - let last_chunk = last_chunk.push_items( - items, - &self.chunk_identifier_generator, - &mut self.update_history, - ); + let last_chunk = + last_chunk.push_items(items, &self.chunk_identifier_generator, &mut self.updates); debug_assert!(last_chunk.is_last_chunk(), "`last_chunk` must be… the last chunk"); @@ -242,7 +258,7 @@ impl LinkedChunk { let last_chunk = self.links.latest_chunk_mut(); last_chunk.insert_next( Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content), - &mut self.update_history, + &mut self.updates, ); self.links.last = last_chunk.next; @@ -292,15 +308,11 @@ impl LinkedChunk { if item_index == current_items_length { chunk // Push the new items. - .push_items( - items, - &self.chunk_identifier_generator, - &mut self.update_history, - ) + .push_items(items, &self.chunk_identifier_generator, &mut self.updates) } // Insert inside the current items. else { - if let Some(updates) = self.update_history.as_mut() { + if let Some(updates) = self.updates.as_mut() { updates.push(LinkedChunkUpdate::TruncateItems { chunk: chunk_identifier, length: item_index, @@ -312,16 +324,12 @@ impl LinkedChunk { chunk // Push the new items. - .push_items( - items, - &self.chunk_identifier_generator, - &mut self.update_history, - ) + .push_items(items, &self.chunk_identifier_generator, &mut self.updates) // Finally, push the items that have been detached. 
.push_items( detached_items.into_iter(), &self.chunk_identifier_generator, - &mut self.update_history, + &mut self.updates, ) }, number_of_items, @@ -378,7 +386,7 @@ impl LinkedChunk { previous_chunk.insert_next( Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content), - &mut self.update_history, + &mut self.updates, ); // We don't need to update `self.last` because we have inserted a new chunk @@ -399,7 +407,7 @@ impl LinkedChunk { return Err(LinkedChunkError::InvalidItemIndex { index: item_index }); } - if let Some(updates) = self.update_history.as_mut() { + if let Some(updates) = self.updates.as_mut() { updates.push(LinkedChunkUpdate::TruncateItems { chunk: chunk_identifier, length: item_index, @@ -413,18 +421,18 @@ impl LinkedChunk { // Insert a new gap chunk. .insert_next( Chunk::new_gap_leaked(self.chunk_identifier_generator.next(), content), - &mut self.update_history, + &mut self.updates, ) // Insert a new items chunk. .insert_next( Chunk::new_items_leaked(self.chunk_identifier_generator.next()), - &mut self.update_history, + &mut self.updates, ) // Finally, push the items that have been detached. .push_items( detached_items.into_iter(), &self.chunk_identifier_generator, - &mut self.update_history, + &mut self.updates, ) } }; @@ -478,14 +486,10 @@ impl LinkedChunk { // Insert a new items chunk… .insert_next( Chunk::new_items_leaked(self.chunk_identifier_generator.next()), - &mut self.update_history, + &mut self.updates, ) // … and insert the items. - .push_items( - items, - &self.chunk_identifier_generator, - &mut self.update_history, - ); + .push_items(items, &self.chunk_identifier_generator, &mut self.updates); ( last_inserted_chunk.is_last_chunk().then(|| last_inserted_chunk.as_ptr()), @@ -503,7 +507,7 @@ impl LinkedChunk { .unwrap(); // Now that new items have been pushed, we can unlink the gap chunk. - chunk.unlink(&mut self.update_history); + chunk.unlink(&mut self.updates); // Get the pointer to `chunk`. 
chunk_ptr = chunk.as_ptr(); @@ -673,8 +677,8 @@ impl LinkedChunk { /// If the `Option` becomes `None`, it will disable update history. Thus, be /// careful when you want to empty the update history: do not use /// `Option::take()` directly but rather `Vec::drain` for example. - pub fn updates(&mut self) -> Option<&mut Vec>> { - self.update_history.as_mut() + pub fn updates(&mut self) -> Option<&mut LinkedChunkUpdates> { + self.updates.as_mut() } } @@ -973,7 +977,7 @@ impl Chunk { &mut self, mut new_items: I, chunk_identifier_generator: &ChunkIdentifierGenerator, - updates: &mut Option>>, + updates: &mut Option>, ) -> &mut Self where I: Iterator + ExactSizeIterator, @@ -1055,7 +1059,7 @@ impl Chunk { fn insert_next( &mut self, mut new_chunk_ptr: NonNull, - updates: &mut Option>>, + updates: &mut Option>, ) -> &mut Self where Gap: Clone, @@ -1102,7 +1106,7 @@ impl Chunk { /// /// Be careful: `self` won't belong to `LinkedChunk` anymore, and should be /// dropped appropriately. - fn unlink(&mut self, updates: &mut Option>>) { + fn unlink(&mut self, updates: &mut Option>) { let previous_ptr = self.previous; let next_ptr = self.next; @@ -1297,21 +1301,21 @@ mod tests { assert_items_eq!(linked_chunk, ['a']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); linked_chunk.push_items_back(['b', 'c']); assert_items_eq!(linked_chunk, ['a', 'b', 'c']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b', 'c'] }] ); linked_chunk.push_items_back(['d', 'e']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ NewItemsChunk { previous: 
Some(ChunkIdentifier(0)), @@ -1325,7 +1329,7 @@ mod tests { linked_chunk.push_items_back(['f', 'g', 'h', 'i', 'j']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f'] ['g', 'h', 'i'] ['j']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ InsertItems { at: Position(ChunkIdentifier(1), 2), items: vec!['f'] }, NewItemsChunk { @@ -1354,14 +1358,14 @@ mod tests { linked_chunk.push_items_back(['a']); assert_items_eq!(linked_chunk, ['a']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); linked_chunk.push_gap_back(()); assert_items_eq!(linked_chunk, ['a'] [-]); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[NewGapChunk { previous: Some(ChunkIdentifier(0)), new: ChunkIdentifier(1), @@ -1373,7 +1377,7 @@ mod tests { linked_chunk.push_items_back(['b', 'c', 'd', 'e']); assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(1)), @@ -1394,7 +1398,7 @@ mod tests { linked_chunk.push_gap_back(()); // why not assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-]); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ NewGapChunk { previous: Some(ChunkIdentifier(3)), @@ -1414,7 +1418,7 @@ mod tests { linked_chunk.push_items_back(['f', 'g', 'h', 'i']); assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-] ['f', 'g', 'h'] ['i']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ NewItemsChunk { 
previous: Some(ChunkIdentifier(5)), @@ -1655,7 +1659,7 @@ mod tests { linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] }, NewItemsChunk { @@ -1681,7 +1685,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 10); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ TruncateItems { chunk: ChunkIdentifier(1), length: 1 }, InsertItems { at: Position(ChunkIdentifier(1), 1), items: vec!['w', 'x'] }, @@ -1713,7 +1717,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 14); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 0 }, InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['l', 'm', 'n'] }, @@ -1745,7 +1749,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 16); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ TruncateItems { chunk: ChunkIdentifier(5), length: 0 }, InsertItems { at: Position(ChunkIdentifier(5), 0), items: vec!['r', 's'] }, @@ -1766,7 +1770,7 @@ mod tests { ['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[InsertItems { at: Position(ChunkIdentifier(3), 1), items: vec!['p', 'q'] },] ); assert_eq!(linked_chunk.len(), 18); @@ -1778,7 +1782,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); 
- assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); } // Insert in a chunk that exists, but at an item that does not exist. @@ -1787,7 +1791,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), Err(LinkedChunkError::InvalidItemIndex { index: 128 }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); } // Insert in a gap. @@ -1799,7 +1803,7 @@ mod tests { ['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] [-] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(6), @@ -1827,7 +1831,7 @@ mod tests { linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] }, NewItemsChunk { @@ -1846,7 +1850,7 @@ mod tests { assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 1 }, NewGapChunk { @@ -1873,7 +1877,7 @@ mod tests { // A new empty chunk is created as the first chunk. 
assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 0 }, NewGapChunk { @@ -1902,7 +1906,7 @@ mod tests { // space. assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(6), @@ -1919,7 +1923,7 @@ mod tests { linked_chunk.insert_gap_at((), position_of_first_empty_chunk), Err(LinkedChunkError::InvalidItemIndex { index: 0 }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); } // Insert in an empty chunk. @@ -1930,7 +1934,7 @@ mod tests { assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(6)), @@ -1945,7 +1949,7 @@ mod tests { assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] [] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(8), @@ -1961,7 +1965,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); } // Insert in a chunk that exists, but at an item that does not exist. 
@@ -1970,7 +1974,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), Err(LinkedChunkError::InvalidItemIndex { index: 128 }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); } // Insert in an existing gap. @@ -1982,7 +1986,7 @@ mod tests { linked_chunk.insert_gap_at((), position_of_a_gap), Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(4) }) ); - assert!(linked_chunk.updates().unwrap().drain(..).collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); } assert_eq!(linked_chunk.len(), 6); @@ -2000,7 +2004,7 @@ mod tests { linked_chunk.push_items_back(['l', 'm']); assert_items_eq!(linked_chunk, ['a', 'b'] [-] ['l', 'm']); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b'] }, NewGapChunk { @@ -2031,7 +2035,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(1)), @@ -2058,7 +2062,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] [-] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[NewGapChunk { previous: Some(ChunkIdentifier(2)), new: ChunkIdentifier(5), @@ -2077,7 +2081,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ['w', 'x', 'y'] ['z'] ); assert_eq!( - linked_chunk.updates().unwrap().drain(..).collect::>(), + linked_chunk.updates().unwrap().take().collect::>(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(5)), From d6915793c13c5400d3145b3dd19137392db3c8f1 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: 
Mon, 6 May 2024 15:31:18 +0200 Subject: [PATCH 2/9] feat(sdk) Add the `LinkedChunkUpdates::peek`. This patch adds the `LinkedChunkUpdates::peek` method. This is a new channel to read the updates without consuming them, like `LinkedChunkUpdates::take` does. The complexity is: when do we clear/drop the updates then? We don't want to keep them in memory forever. Initially `take` was clearing the updates, but now that we can read them with `peek` too, who's responsible to clear them? Enter `garbage_collect`. First off, we already need to maintain 2 index, resp. `last_taken_index` and `last_peeked_index` so that `take` and `peek` don't return already returned updates. They respectively know the index of the last update that has been read. We can use this information to know which updates must be garbage collected: that's all updates below the two index. Tadaa. Simple. The only _drawback_ (if it can be considered as such) is that the garbage collection happens on the next call to `take` or `peek` (because of the borrow checker). That's not really a big deal in practise. We could make it happens immediately when calling `take` or `peek` but it needs more pointer arithmetic and a less straighforward code. --- .../src/event_cache/linked_chunk.rs | 301 +++++++++++++++--- 1 file changed, 261 insertions(+), 40 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index 447fad9ec6b..6c034d5d362 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -15,12 +15,12 @@ #![allow(dead_code)] use std::{ + cmp::min, fmt, marker::PhantomData, ops::Not, ptr::NonNull, sync::atomic::{AtomicU64, Ordering}, - vec::Drain, }; /// Errors of [`LinkedChunk`]. @@ -85,21 +85,97 @@ pub enum LinkedChunkUpdate { }, } +/// A collection of [`LinkedChunkUpdate`]. +/// +/// Get a value for this type with [`LinkedChunk::updates`]. 
pub struct LinkedChunkUpdates { + /// All the updates that have not been peeked nor taken. updates: Vec>, + + /// The last index used by the last call of [`Self::take`]. + last_taken_index: usize, + + /// The last index used by the last call of [`Self::peek`]. + last_peeked_index: usize, } impl LinkedChunkUpdates { + /// Create a new [`Self`]. fn new() -> Self { - Self { updates: Vec::new() } + Self { updates: Vec::new(), last_taken_index: 0, last_peeked_index: 0 } } - pub fn push(&mut self, update: LinkedChunkUpdate) { + /// Push a new update. + fn push(&mut self, update: LinkedChunkUpdate) { self.updates.push(update); } - pub fn take(&mut self) -> Drain> { - self.updates.drain(..) + /// Take new updates. + /// + /// Updates that have been taken will not be read again. + pub fn take(&mut self) -> &[LinkedChunkUpdate] { + // Let's garbage collect unused updates. + self.garbage_collect(); + + // Read new updates and update the `Self::last_taken_index`. + let slice = &self.updates[self.last_taken_index..]; + self.last_taken_index = self.updates.len(); + + slice + } + + /// Peek updates. + /// + /// That's another channel to read the updates. The difference with + /// [`Self::take`] is that peeking updates doesn't consume them. However, + /// updates that have been peeked cannot be peeked again. + /// + /// Basically, [`Self::take`] is intended to be used by the public API, + /// whilst [`Self::peek`] is for internal usage that must not conflict with + /// [`Self::take`]. + fn peek(&mut self) -> &[LinkedChunkUpdate] { + // Let's garbage collect unused updates. + self.garbage_collect(); + + // Read new updates and update the `Self::last_peeked_index`. + let slice = &self.updates[self.last_peeked_index..]; + self.last_peeked_index = self.updates.len(); + + slice + } + + /// Return `true` if there is new updates that can be read with + /// [`Self::take`]. 
+ fn has_new_takable_updates(&self) -> bool { + self.last_taken_index < self.updates.len() + } + + /// Return `true` if there is new update that can be read with + /// [`Self::peek`]. + fn has_new_peekable_updates(&self) -> bool { + self.last_peeked_index < self.updates.len() + } + + /// Garbage collect unused updates. An update is considered unused when it's + /// been read by `Self::take` **and** by `Self::peek`. + /// + /// Find the smallest index between `Self::last_taken_index` and + /// `Self::last_peeked_index`, and clear from 0 to that index. + fn garbage_collect(&mut self) { + let min_index = min(self.last_taken_index, self.last_peeked_index); + + if min_index > 0 { + let _ = self.updates.drain(0..min_index); + + // Let's shift the index to the left by `min_index` to preserve them. + self.last_taken_index -= min_index; + self.last_peeked_index -= min_index; + } + } + + /// Return the number of updates in the buffer. + fn len(&self) -> usize { + self.updates.len() } } @@ -195,6 +271,10 @@ impl LinkedChunk { } /// Create a new [`Self`] with a history of updates. + /// + /// When [`Self`] is built with update history, the + /// [`LinkedChunkUpdates::take`] method must be called to consume and clean + /// the updates. See [`Self::updates`]. pub fn new_with_update_history() -> Self { Self { links: LinkedChunkEnds { @@ -670,13 +750,15 @@ impl LinkedChunk { .skip(position.index())) } - /// Get a mutable reference to the `LinkedChunk` updates. + /// Get a mutable reference to the `LinkedChunk` updates, aka + /// [`LinkedChunkUpdates`]. /// - /// The caller is responsible to drain/empty these updates. + /// The caller is responsible to clear these updates. /// /// If the `Option` becomes `None`, it will disable update history. Thus, be /// careful when you want to empty the update history: do not use - /// `Option::take()` directly but rather `Vec::drain` for example. + /// `Option::take()` directly but rather [`LinkedChunkUpdates::take`] for + /// example. 
pub fn updates(&mut self) -> Option<&mut LinkedChunkUpdates> { self.updates.as_mut() } @@ -1179,6 +1261,8 @@ where #[cfg(test)] mod tests { + use std::ops::Not; + use assert_matches::assert_matches; use super::{ @@ -1287,11 +1371,148 @@ mod tests { } #[test] - fn test_update_history() { + fn test_updates() { assert!(LinkedChunk::<3, char, ()>::new().updates().is_none()); assert!(LinkedChunk::<3, char, ()>::new_with_update_history().updates().is_some()); } + #[test] + fn test_updates_take_and_peek() { + use super::LinkedChunkUpdate::*; + + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + + // There is no new update. + { + let updates = linked_chunk.updates().unwrap(); + + assert!(updates.has_new_peekable_updates().not()); + assert!(updates.has_new_takable_updates().not()); + assert!(updates.peek().is_empty()); + assert!(updates.take().is_empty()); + } + + linked_chunk.push_items_back(['a']); + + // Let's `peek` only the new update. + { + let updates = linked_chunk.updates().unwrap(); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 1); + } + + // Peek the update. + assert!(updates.has_new_takable_updates()); + assert!(updates.has_new_peekable_updates()); + assert_eq!( + updates.peek(), + &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] + ); + + // No more update to peek. + assert!(updates.has_new_takable_updates()); + assert!(updates.has_new_peekable_updates().not()); + assert!(updates.peek().is_empty()); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 1); + } + } + + linked_chunk.push_items_back(['b']); + + // Let's `peek` then `take` the new update. + { + let updates = linked_chunk.updates().unwrap(); + + // Inspect number of updates in memory. 
+ assert_eq!(updates.len(), 2); + + // Peek the update… + assert!(updates.has_new_takable_updates()); + assert!(updates.has_new_peekable_updates()); + assert_eq!( + updates.peek(), + &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },] + ); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 2); + } + + // … and take the update. + assert_eq!( + updates.take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + ] + ); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 2); + } + + // No more update to peek or to take. + assert!(updates.has_new_takable_updates().not()); + assert!(updates.has_new_peekable_updates().not()); + assert!(updates.peek().is_empty()); + assert!(updates.take().is_empty()); + + { + // Inspect number of updates in memory. + // The updates have been garbage collected. + assert_eq!(updates.len(), 0); + } + } + + linked_chunk.push_items_back(['c']); + + // Let's `take` then `peek` the new update. + { + let updates = linked_chunk.updates().unwrap(); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 1); + } + + // Take and peek the update. + assert!(updates.has_new_takable_updates()); + assert!(updates.has_new_peekable_updates()); + assert_eq!( + updates.take(), + &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },] + ); + assert_eq!( + updates.peek(), + &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },] + ); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 1); + } + + // No more update to peek or to take. + assert!(updates.has_new_takable_updates().not()); + assert!(updates.has_new_peekable_updates().not()); + assert!(updates.peek().is_empty()); + assert!(updates.take().is_empty()); + + { + // Inspect number of updates in memory. + // The update has been garbage collected. 
+ assert_eq!(updates.len(), 0); + } + } + } + #[test] fn test_push_items() { use super::LinkedChunkUpdate::*; @@ -1301,21 +1522,21 @@ mod tests { assert_items_eq!(linked_chunk, ['a']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); linked_chunk.push_items_back(['b', 'c']); assert_items_eq!(linked_chunk, ['a', 'b', 'c']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b', 'c'] }] ); linked_chunk.push_items_back(['d', 'e']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(0)), @@ -1329,7 +1550,7 @@ mod tests { linked_chunk.push_items_back(['f', 'g', 'h', 'i', 'j']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f'] ['g', 'h', 'i'] ['j']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(1), 2), items: vec!['f'] }, NewItemsChunk { @@ -1358,14 +1579,14 @@ mod tests { linked_chunk.push_items_back(['a']); assert_items_eq!(linked_chunk, ['a']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); linked_chunk.push_gap_back(()); assert_items_eq!(linked_chunk, ['a'] [-]); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(0)), new: ChunkIdentifier(1), @@ -1377,7 +1598,7 @@ mod tests { linked_chunk.push_items_back(['b', 'c', 'd', 'e']); assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] 
['e']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(1)), @@ -1398,7 +1619,7 @@ mod tests { linked_chunk.push_gap_back(()); // why not assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-]); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewGapChunk { previous: Some(ChunkIdentifier(3)), @@ -1418,7 +1639,7 @@ mod tests { linked_chunk.push_items_back(['f', 'g', 'h', 'i']); assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-] ['f', 'g', 'h'] ['i']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(5)), @@ -1659,7 +1880,7 @@ mod tests { linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] }, NewItemsChunk { @@ -1685,7 +1906,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 10); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(1), length: 1 }, InsertItems { at: Position(ChunkIdentifier(1), 1), items: vec!['w', 'x'] }, @@ -1717,7 +1938,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 14); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 0 }, InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['l', 'm', 'n'] }, @@ -1749,7 +1970,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 16); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + 
linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(5), length: 0 }, InsertItems { at: Position(ChunkIdentifier(5), 0), items: vec!['r', 's'] }, @@ -1770,7 +1991,7 @@ mod tests { ['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(3), 1), items: vec!['p', 'q'] },] ); assert_eq!(linked_chunk.len(), 18); @@ -1782,7 +2003,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a chunk that exists, but at an item that does not exist. @@ -1791,7 +2012,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), Err(LinkedChunkError::InvalidItemIndex { index: 128 }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a gap. 
@@ -1803,7 +2024,7 @@ mod tests { ['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] [-] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(6), @@ -1831,7 +2052,7 @@ mod tests { linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] }, NewItemsChunk { @@ -1850,7 +2071,7 @@ mod tests { assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 1 }, NewGapChunk { @@ -1877,7 +2098,7 @@ mod tests { // A new empty chunk is created as the first chunk. assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 0 }, NewGapChunk { @@ -1906,7 +2127,7 @@ mod tests { // space. assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(6), @@ -1923,7 +2144,7 @@ mod tests { linked_chunk.insert_gap_at((), position_of_first_empty_chunk), Err(LinkedChunkError::InvalidItemIndex { index: 0 }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in an empty chunk. 
@@ -1934,7 +2155,7 @@ mod tests { assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(6)), @@ -1949,7 +2170,7 @@ mod tests { assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] [] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(8), @@ -1965,7 +2186,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a chunk that exists, but at an item that does not exist. @@ -1974,7 +2195,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), Err(LinkedChunkError::InvalidItemIndex { index: 128 }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in an existing gap. 
@@ -1986,7 +2207,7 @@ mod tests { linked_chunk.insert_gap_at((), position_of_a_gap), Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(4) }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } assert_eq!(linked_chunk.len(), 6); @@ -2004,7 +2225,7 @@ mod tests { linked_chunk.push_items_back(['l', 'm']); assert_items_eq!(linked_chunk, ['a', 'b'] [-] ['l', 'm']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b'] }, NewGapChunk { @@ -2035,7 +2256,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(1)), @@ -2062,7 +2283,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] [-] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(2)), new: ChunkIdentifier(5), @@ -2081,7 +2302,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ['w', 'x', 'y'] ['z'] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(5)), From 3a00271af0c362afd4ab202e93e17b1e6f322044 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 6 May 2024 20:15:35 +0200 Subject: [PATCH 3/9] chore(sdk): Hmmmm. 
--- crates/matrix-sdk/src/event_cache/linked_chunk.rs | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index 6c034d5d362..a6899f0df55 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -51,35 +51,46 @@ pub enum LinkedChunkUpdate { NewItemsChunk { /// The identifier of the previous chunk of this new chunk. previous: Option, + /// The identifier of the new chunk. new: ChunkIdentifier, + /// The identifier of the next chunk of this new chunk. next: Option, }, + /// A new chunk of kind Gap has been created. NewGapChunk { /// The identifier of the previous chunk of this new chunk. previous: Option, + /// The identifier of the new chunk. new: ChunkIdentifier, + /// The identifier of the next chunk of this new chunk. next: Option, + /// The content of the chunk. gap: Gap, }, + /// A chunk has been removed. RemoveChunk(ChunkIdentifier), + /// Items are inserted inside a chunk of kind Items. InsertItems { /// [`Position`] of the items. at: Position, + /// The items. items: Vec, }, + /// A chunk of kind Items has been truncated. TruncateItems { /// The identifier of the chunk. chunk: ChunkIdentifier, + /// The new length of the chunk. length: usize, }, From 76210686c4e5997df611114ca9b04254d62fa06a Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 6 May 2024 22:22:32 +0200 Subject: [PATCH 4/9] feat(sdk): Implement `Clone` on `LinkedChunkUpdate`. `LinkedChunkUpdate` implements `Clone` if and only if `Item` and `Gap` both implement `Clone`. 
--- .../src/event_cache/linked_chunk.rs | 29 +++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index a6899f0df55..f743af3abbf 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -96,6 +96,35 @@ pub enum LinkedChunkUpdate { }, } +impl Clone for LinkedChunkUpdate +where + Item: Clone, + Gap: Clone, +{ + fn clone(&self) -> Self { + match self { + Self::NewItemsChunk { previous, new, next } => Self::NewItemsChunk { + previous: previous.clone(), + new: new.clone(), + next: next.clone(), + }, + Self::NewGapChunk { previous, new, next, gap } => Self::NewGapChunk { + previous: previous.clone(), + new: new.clone(), + next: next.clone(), + gap: gap.clone(), + }, + Self::RemoveChunk(identifier) => Self::RemoveChunk(identifier.clone()), + Self::InsertItems { at, items } => { + Self::InsertItems { at: at.clone(), items: items.clone() } + } + Self::TruncateItems { chunk, length } => { + Self::TruncateItems { chunk: chunk.clone(), length: length.clone() } + } + } + } +} + /// A collection of [`LinkedChunkUpdate`]. /// /// Get a value for this type with [`LinkedChunk::updates`]. From 73ae1cc6daac887dfb3cd865abb5910124167363 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 6 May 2024 22:23:57 +0200 Subject: [PATCH 5/9] feat(sdk): Add `LinkedChunkUpdatesSubscriber`. This patch implements `LinkedChunkUpdates::subscribe` and `LinkedChunkUpdateSubscriber`, which itself implements `Stream`. This patch splits `LinkedChunkUpdates` into `LinkedChunkUpdatesInner`, so that the latter can be shared with `LinkedChunkUpdatesSubscriber`. 
--- .../src/event_cache/linked_chunk.rs | 266 +++++++++++++++--- 1 file changed, 224 insertions(+), 42 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index f743af3abbf..6bc5b4ba25a 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -19,10 +19,17 @@ use std::{ fmt, marker::PhantomData, ops::Not, + pin::Pin, ptr::NonNull, - sync::atomic::{AtomicU64, Ordering}, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, RwLock, Weak, + }, + task::{Context, Poll, Waker}, }; +use futures_core::Stream; + /// Errors of [`LinkedChunk`]. #[derive(thiserror::Error, Debug)] pub enum LinkedChunkError { @@ -129,6 +136,10 @@ where /// /// Get a value for this type with [`LinkedChunk::updates`]. pub struct LinkedChunkUpdates { + inner: Arc>>, +} + +struct LinkedChunkUpdatesInner { /// All the updates that have not been peeked nor taken. updates: Vec>, @@ -137,23 +148,26 @@ pub struct LinkedChunkUpdates { /// The last index used by the last call of [`Self::peek`]. last_peeked_index: usize, -} -impl LinkedChunkUpdates { - /// Create a new [`Self`]. - fn new() -> Self { - Self { updates: Vec::new(), last_taken_index: 0, last_peeked_index: 0 } - } + /// Pending wakers for [`LinkedChunkUpdateSubscriber`]s. + wakers: Vec, +} +impl LinkedChunkUpdatesInner { /// Push a new update. fn push(&mut self, update: LinkedChunkUpdate) { self.updates.push(update); + + // Wake them up \o/. + for waker in self.wakers.drain(..) { + waker.wake(); + } } /// Take new updates. /// /// Updates that have been taken will not be read again. - pub fn take(&mut self) -> &[LinkedChunkUpdate] { + fn take(&mut self) -> &[LinkedChunkUpdate] { // Let's garbage collect unused updates. self.garbage_collect(); @@ -184,18 +198,17 @@ impl LinkedChunkUpdates { slice } - /// Return `true` if there is new updates that can be read with - /// [`Self::take`]. 
- fn has_new_takable_updates(&self) -> bool { - self.last_taken_index < self.updates.len() - } - /// Return `true` if there is new update that can be read with /// [`Self::peek`]. fn has_new_peekable_updates(&self) -> bool { self.last_peeked_index < self.updates.len() } + /// Return the number of updates in the buffer. + fn len(&self) -> usize { + self.updates.len() + } + /// Garbage collect unused updates. An update is considered unused when it's /// been read by `Self::take` **and** by `Self::peek`. /// @@ -212,10 +225,91 @@ impl LinkedChunkUpdates { self.last_peeked_index -= min_index; } } +} - /// Return the number of updates in the buffer. - fn len(&self) -> usize { - self.updates.len() +impl LinkedChunkUpdates { + /// Create a new [`Self`]. + fn new() -> Self { + Self { + inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner { + updates: Vec::new(), + last_taken_index: 0, + last_peeked_index: 0, + wakers: Vec::new(), + })), + } + } + + /// Push a new update. + fn push(&mut self, update: LinkedChunkUpdate) { + self.inner.write().unwrap().push(update); + } + + /// Take new updates. + /// + /// Updates that have been taken will not be read again. + pub fn take(&mut self) -> Vec> + where + Item: Clone, + Gap: Clone, + { + self.inner.write().unwrap().take().to_owned() + } + + /// Return `true` if there is new updates that can be read with + /// [`Self::take`]. + pub fn has_new_takable_updates(&self) -> bool { + let inner = self.inner.read().unwrap(); + + inner.last_taken_index < inner.updates.len() + } + + /// Subscribe to updates by using a [`Stream`]. + /// + /// TODO: only one subscriber must exist so far because multiple concurrent + /// subscriber would conflict on the garbage collector. It's not complex to + /// fix, I will do it. + fn subscribe(&self) -> LinkedChunkUpdatesSubscriber { + LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner) } + } +} + +/// A subscriber to [`LinkedChunkUpdates`]. 
It is helpful to receive updates via +/// a [`Stream`]. +struct LinkedChunkUpdatesSubscriber { + /// Weak reference to [`LinkedChunkUpdatesInner`]. + /// + /// Using a weak reference allows [`LinkedChunkUpdates`] to be dropped + /// freely even if a subscriber exists. + updates: Weak>>, +} + +impl Stream for LinkedChunkUpdatesSubscriber +where + Item: Clone, + Gap: Clone, +{ + type Item = Vec>; + + fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { + let Some(updates) = self.updates.upgrade() else { + // The `LinkedChunkUpdates` has been dropped. It's time to close this stream. + return Poll::Ready(None); + }; + + let mut updates = updates.write().unwrap(); + + // No updates to peek. + if updates.has_new_peekable_updates().not() { + // Let's register the waker. + updates.wakers.push(context.waker().clone()); + + // The stream is pending. + return Poll::Pending; + } + + // There is updates to peek! Let's forward them in this stream. + return Poll::Ready(Some(updates.peek().to_owned())); } } @@ -1301,13 +1395,17 @@ where #[cfg(test)] mod tests { - use std::ops::Not; + use std::{ + sync::{Arc, Mutex}, + task::{Context, Poll, Wake}, + }; use assert_matches::assert_matches; + use futures_util::pin_mut; use super::{ Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunk, - LinkedChunkError, Position, + LinkedChunkError, Not, Position, Stream, }; /// A macro to test the items and the gap of a `LinkedChunk`. @@ -1426,9 +1524,9 @@ mod tests { { let updates = linked_chunk.updates().unwrap(); - assert!(updates.has_new_peekable_updates().not()); + assert!(updates.inner.read().unwrap().has_new_peekable_updates().not()); assert!(updates.has_new_takable_updates().not()); - assert!(updates.peek().is_empty()); + assert!(updates.inner.write().unwrap().peek().is_empty()); assert!(updates.take().is_empty()); } @@ -1440,25 +1538,25 @@ mod tests { { // Inspect number of updates in memory. 
- assert_eq!(updates.len(), 1); + assert_eq!(updates.inner.read().unwrap().len(), 1); } // Peek the update. assert!(updates.has_new_takable_updates()); - assert!(updates.has_new_peekable_updates()); + assert!(updates.inner.read().unwrap().has_new_peekable_updates()); assert_eq!( - updates.peek(), + updates.inner.write().unwrap().peek(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); // No more update to peek. assert!(updates.has_new_takable_updates()); - assert!(updates.has_new_peekable_updates().not()); - assert!(updates.peek().is_empty()); + assert!(updates.inner.read().unwrap().has_new_peekable_updates().not()); + assert!(updates.inner.write().unwrap().peek().is_empty()); { // Inspect number of updates in memory. - assert_eq!(updates.len(), 1); + assert_eq!(updates.inner.read().unwrap().len(), 1); } } @@ -1469,19 +1567,19 @@ mod tests { let updates = linked_chunk.updates().unwrap(); // Inspect number of updates in memory. - assert_eq!(updates.len(), 2); + assert_eq!(updates.inner.read().unwrap().len(), 2); // Peek the update… assert!(updates.has_new_takable_updates()); - assert!(updates.has_new_peekable_updates()); + assert!(updates.inner.read().unwrap().has_new_peekable_updates()); assert_eq!( - updates.peek(), + updates.inner.write().unwrap().peek(), &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },] ); { // Inspect number of updates in memory. - assert_eq!(updates.len(), 2); + assert_eq!(updates.inner.read().unwrap().len(), 2); } // … and take the update. @@ -1495,19 +1593,19 @@ mod tests { { // Inspect number of updates in memory. - assert_eq!(updates.len(), 2); + assert_eq!(updates.inner.read().unwrap().len(), 2); } // No more update to peek or to take. 
assert!(updates.has_new_takable_updates().not()); - assert!(updates.has_new_peekable_updates().not()); - assert!(updates.peek().is_empty()); + assert!(updates.inner.read().unwrap().has_new_peekable_updates().not()); + assert!(updates.inner.write().unwrap().peek().is_empty()); assert!(updates.take().is_empty()); { // Inspect number of updates in memory. // The updates have been garbage collected. - assert_eq!(updates.len(), 0); + assert_eq!(updates.inner.read().unwrap().len(), 0); } } @@ -1519,40 +1617,124 @@ mod tests { { // Inspect number of updates in memory. - assert_eq!(updates.len(), 1); + assert_eq!(updates.inner.read().unwrap().len(), 1); } // Take and peek the update. assert!(updates.has_new_takable_updates()); - assert!(updates.has_new_peekable_updates()); + assert!(updates.inner.read().unwrap().has_new_peekable_updates()); assert_eq!( updates.take(), &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },] ); assert_eq!( - updates.peek(), + updates.inner.write().unwrap().peek(), &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },] ); { // Inspect number of updates in memory. - assert_eq!(updates.len(), 1); + assert_eq!(updates.inner.read().unwrap().len(), 1); } // No more update to peek or to take. assert!(updates.has_new_takable_updates().not()); - assert!(updates.has_new_peekable_updates().not()); - assert!(updates.peek().is_empty()); + assert!(updates.inner.read().unwrap().has_new_peekable_updates().not()); + assert!(updates.inner.write().unwrap().peek().is_empty()); assert!(updates.take().is_empty()); { // Inspect number of updates in memory. // The update has been garbage collected. 
- assert_eq!(updates.len(), 0); + assert_eq!(updates.inner.read().unwrap().len(), 0); } } } + #[test] + fn test_updates_stream() { + use super::LinkedChunkUpdate::*; + + struct CounterWaker { + number_of_wakeup: Mutex, + } + + impl Wake for CounterWaker { + fn wake(self: Arc) { + *self.number_of_wakeup.lock().unwrap() += 1; + } + } + + let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) }); + let waker = counter_waker.clone().into(); + let mut context = Context::from_waker(&waker); + + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + + let updates_subscriber = linked_chunk.updates().unwrap().subscribe(); + pin_mut!(updates_subscriber); + + // No update, stream is pending. + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0); + + // Let's generate an update. + linked_chunk.push_items_back(['a']); + + // The waker must have been called. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1); + + // There is an update! Right after that, the stream is pending again. + assert_matches!( + updates_subscriber.as_mut().poll_next(&mut context), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] + ); + } + ); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + + // Let's generate two other updates. + linked_chunk.push_items_back(['b']); + linked_chunk.push_items_back(['c']); + + // The waker must have been called only once for the two updates. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); + + // We can consume the updates without the stream, but the stream continues to + // know it has updates. 
+ assert_eq!( + linked_chunk.updates().unwrap().take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + assert_matches!( + updates_subscriber.as_mut().poll_next(&mut context), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[ + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + } + ); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + + // When dropping the `LinkedChunk`, it closes the stream. + drop(linked_chunk); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None)); + + // Wakers calls have not changed. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); + } + #[test] fn test_push_items() { use super::LinkedChunkUpdate::*; From 9531a4041e29ed697a4ffdee8f0fd892288519b3 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 May 2024 12:29:14 +0200 Subject: [PATCH 6/9] feat(sdk): Allow `LinkedChunkUpdatesInner` to have multiple readers. This patch removes the notion of `take` vs. `peek` from `LinkedChunkUpdatesInner` and widens the problem to a more general approach: `LinkedChunkUpdatesInner` must support multiple readers, not only two (`take` was the first reader, `peek` was the second reader, kind of). Why do we need multiple readers? `LinkedChunkUpdates::take` is clearly the first reader, it's part of the public API. But the private API needs to read the updates too, without consuming them, like `LinkedChunkUpdatesSubscriber`. `peek` was nice for that, but it's possible to have multiple `LinkedChunkUpdatesSubscriber` at the same time! Hence the need to widen the approach from 2 readers to many readers. This patch introduces a `ReaderToken` to identify readers. 
The last indexes are now all stored in a `HashMap`. The rest of the modifications are the consequence of that. The test `test_updates_take_and_peek` has been entirely rewritten to be `test_updates_take_and_garbage_collector` where it tests 2 readers and see how the garbage collector reacts to that. --- .../src/event_cache/linked_chunk.rs | 397 ++++++++++++------ 1 file changed, 262 insertions(+), 135 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index 6bc5b4ba25a..1e9d39af641 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -15,7 +15,7 @@ #![allow(dead_code)] use std::{ - cmp::min, + collections::HashMap, fmt, marker::PhantomData, ops::Not, @@ -139,21 +139,64 @@ pub struct LinkedChunkUpdates { inner: Arc>>, } +/// A token used to represent readers that read the updates in +/// [`LinkedChunkUpdatesInner`]. +type ReaderToken = usize; + +/// Inner type for [`LinkedChunkUpdates`]. +/// +/// The particularity of this type is that multiple readers can read the +/// updates. A reader has a [`ReaderToken`]. The public API (i.e. +/// [`LinkedChunkUpdates`]) is considered to be the _main reader_ (it has the +/// token [`Self::MAIN_READER_TOKEN`]). +/// +/// An update that have been read by all readers are garbage collected to be +/// removed from the memory. An update will never be read twice by the same +/// reader. +/// +/// Why do we need multiple readers? The public API reads the updates with +/// [`LinkedChunkUpdates::take`], but the private API must also read the updates +/// for example with [`LinkedChunkUpdatesSubscriber`]. Of course, they can be +/// multiple `LinkedChunkUpdatesSubscriber`s at the same time. Hence the need of +/// supporting multiple readers. struct LinkedChunkUpdatesInner { - /// All the updates that have not been peeked nor taken. + /// All the updates that have not been read by all readers. 
updates: Vec>, - /// The last index used by the last call of [`Self::take`]. - last_taken_index: usize, + /// Updates are stored in [`Self::updates`]. Multiple readers can read them. + /// A reader is identified by a [`ReaderToken`]. + /// + /// To each reader token is associated an index that represents the index of + /// the last reading. It is used to never return the same update twice. + last_index_per_reader: HashMap, - /// The last index used by the last call of [`Self::peek`]. - last_peeked_index: usize, + /// The last generated token. This is useful to generate new token. + last_token: ReaderToken, - /// Pending wakers for [`LinkedChunkUpdateSubscriber`]s. + /// Pending wakers for [`LinkedChunkUpdateSubscriber`]s. A waker is removed + /// everytime it is called. wakers: Vec, } impl LinkedChunkUpdatesInner { + /// The token used by the main reader. See [`Self::take`] to learn more. + const MAIN_READER_TOKEN: ReaderToken = 0; + + /// Create a new [`Self`]. + fn new() -> Self { + Self { + updates: Vec::with_capacity(8), + last_index_per_reader: { + let mut map = HashMap::with_capacity(2); + map.insert(Self::MAIN_READER_TOKEN, 0); + + map + }, + last_token: Self::MAIN_READER_TOKEN, + wakers: Vec::with_capacity(2), + } + } + /// Push a new update. fn push(&mut self, update: LinkedChunkUpdate) { self.updates.push(update); @@ -164,44 +207,38 @@ impl LinkedChunkUpdatesInner { } } - /// Take new updates. + /// Take new updates; it considers the caller is the main reader, i.e. it + /// will use the [`Self::MAIN_READER_TOKEN`]. /// - /// Updates that have been taken will not be read again. + /// Updates that have been read will never be read again by the current + /// reader. + /// + /// Learn more by reading [`Self::take_with_token`]. fn take(&mut self) -> &[LinkedChunkUpdate] { - // Let's garbage collect unused updates. - self.garbage_collect(); - - // Read new updates and update the `Self::last_taken_index`. 
- let slice = &self.updates[self.last_taken_index..]; - self.last_taken_index = self.updates.len(); - - slice + self.take_with_token(Self::MAIN_READER_TOKEN) } - /// Peek updates. - /// - /// That's another channel to read the updates. The difference with - /// [`Self::take`] is that peeking updates doesn't consume them. However, - /// updates that have been peeked cannot be peeked again. + /// Take new updates with a particular reader token. /// - /// Basically, [`Self::take`] is intended to be used by the public API, - /// whilst [`Self::peek`] is for internal usage that must not conflict with - /// [`Self::take`]. - fn peek(&mut self) -> &[LinkedChunkUpdate] { + /// Updates are stored in [`Self::updates`]. Multiple readers can read them. + /// A reader is identified by a [`ReaderToken`]. Every reader can + /// take/read/consume each update only once. An internal index is stored + /// per reader token to know where to start reading updates next time this + /// method is called. + fn take_with_token(&mut self, token: ReaderToken) -> &[LinkedChunkUpdate] { // Let's garbage collect unused updates. self.garbage_collect(); - // Read new updates and update the `Self::last_peeked_index`. - let slice = &self.updates[self.last_peeked_index..]; - self.last_peeked_index = self.updates.len(); + let index = self + .last_index_per_reader + .get_mut(&token) + .expect("Given `UpdatesToken` does not map to any index"); - slice - } + // Read new updates, and update the index. + let slice = &self.updates[*index..]; + *index = self.updates.len(); - /// Return `true` if there is new update that can be read with - /// [`Self::peek`]. - fn has_new_peekable_updates(&self) -> bool { - self.last_peeked_index < self.updates.len() + slice } /// Return the number of updates in the buffer. @@ -210,19 +247,20 @@ impl LinkedChunkUpdatesInner { } /// Garbage collect unused updates. An update is considered unused when it's - /// been read by `Self::take` **and** by `Self::peek`. 
+ /// been read by all readers. /// - /// Find the smallest index between `Self::last_taken_index` and - /// `Self::last_peeked_index`, and clear from 0 to that index. + /// Basically, it reduces to finding the smallest last index for all + /// readers, and clear from 0 to that index. fn garbage_collect(&mut self) { - let min_index = min(self.last_taken_index, self.last_peeked_index); + let min_index = self.last_index_per_reader.values().min().map(|min| *min).unwrap_or(0); if min_index > 0 { let _ = self.updates.drain(0..min_index); - // Let's shift the index to the left by `min_index` to preserve them. - self.last_taken_index -= min_index; - self.last_peeked_index -= min_index; + // Let's shift the indices to the left by `min_index` to preserve them. + for index in self.last_index_per_reader.values_mut() { + *index -= min_index; + } } } } @@ -230,14 +268,7 @@ impl LinkedChunkUpdatesInner { impl LinkedChunkUpdates { /// Create a new [`Self`]. fn new() -> Self { - Self { - inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner { - updates: Vec::new(), - last_taken_index: 0, - last_peeked_index: 0, - wakers: Vec::new(), - })), - } + Self { inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner::new())) } } /// Push a new update. @@ -256,21 +287,20 @@ impl LinkedChunkUpdates { self.inner.write().unwrap().take().to_owned() } - /// Return `true` if there is new updates that can be read with - /// [`Self::take`]. - pub fn has_new_takable_updates(&self) -> bool { - let inner = self.inner.read().unwrap(); + /// Subscribe to updates by using a [`Stream`]. + fn subscribe(&mut self) -> LinkedChunkUpdatesSubscriber { + // A subscriber is a new update reader, it needs its own token. + let token = { + let mut inner = self.inner.write().unwrap(); + inner.last_token += 1; - inner.last_taken_index < inner.updates.len() - } + let last_token = inner.last_token; + inner.last_index_per_reader.insert(last_token, 0); - /// Subscribe to updates by using a [`Stream`]. 
- /// - /// TODO: only one subscriber must exist so far because multiple concurrent - /// subscriber would conflict on the garbage collector. It's not complex to - /// fix, I will do it. - fn subscribe(&self) -> LinkedChunkUpdatesSubscriber { - LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner) } + last_token + }; + + LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner), token } } } @@ -282,6 +312,9 @@ struct LinkedChunkUpdatesSubscriber { /// Using a weak reference allows [`LinkedChunkUpdates`] to be dropped /// freely even if a subscriber exists. updates: Weak>>, + + /// The token to read the updates. + token: ReaderToken, } impl Stream for LinkedChunkUpdatesSubscriber @@ -298,9 +331,10 @@ where }; let mut updates = updates.write().unwrap(); + let the_updates = updates.take_with_token(self.token); - // No updates to peek. - if updates.has_new_peekable_updates().not() { + // No updates. + if the_updates.is_empty() { // Let's register the waker. updates.wakers.push(context.waker().clone()); @@ -308,8 +342,8 @@ where return Poll::Pending; } - // There is updates to peek! Let's forward them in this stream. - return Poll::Ready(Some(updates.peek().to_owned())); + // There is updates! Let's forward them in this stream. + Poll::Ready(Some(the_updates.to_owned())) } } @@ -1405,8 +1439,9 @@ mod tests { use super::{ Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunk, - LinkedChunkError, Not, Position, Stream, + LinkedChunkError, Position, Stream, }; + use crate::event_cache::linked_chunk::LinkedChunkUpdatesInner; /// A macro to test the items and the gap of a `LinkedChunk`. /// A chunk is delimited by `[` and `]`. 
An item chunk has the form `[a, b, @@ -1515,138 +1550,230 @@ mod tests { } #[test] - fn test_updates_take_and_peek() { + fn test_updates_take_and_garbage_collector() { use super::LinkedChunkUpdate::*; - let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history(); + + // Simulate another updates “reader”, it can a subscriber. + let main_token = LinkedChunkUpdatesInner::::MAIN_READER_TOKEN; + let other_token = { + let updates = linked_chunk.updates().unwrap(); + let mut inner = updates.inner.write().unwrap(); + inner.last_token += 1; - // There is no new update. + let other_token = inner.last_token; + inner.last_index_per_reader.insert(other_token, 0); + + other_token + }; + + // There is no new update yet. { let updates = linked_chunk.updates().unwrap(); - assert!(updates.inner.read().unwrap().has_new_peekable_updates().not()); - assert!(updates.has_new_takable_updates().not()); - assert!(updates.inner.write().unwrap().peek().is_empty()); assert!(updates.take().is_empty()); + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); } linked_chunk.push_items_back(['a']); + linked_chunk.push_items_back(['b']); + linked_chunk.push_items_back(['c']); - // Let's `peek` only the new update. + // Scenario 1: “main” takes the new updates, “other” doesn't take the new + // updates. + // + // 0 1 2 3 + // +---+---+---+ + // | a | b | c | + // +---+---+---+ + // + // “main” will move its index from 0 to 3. + // “other” won't move its index. { let updates = linked_chunk.updates().unwrap(); { // Inspect number of updates in memory. - assert_eq!(updates.inner.read().unwrap().len(), 1); + assert_eq!(updates.inner.read().unwrap().len(), 3); } - // Peek the update. 
- assert!(updates.has_new_takable_updates()); - assert!(updates.inner.read().unwrap().has_new_peekable_updates()); assert_eq!( - updates.inner.write().unwrap().peek(), - &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] + updates.take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] ); - // No more update to peek. - assert!(updates.has_new_takable_updates()); - assert!(updates.inner.read().unwrap().has_new_peekable_updates().not()); - assert!(updates.inner.write().unwrap().peek().is_empty()); - { + let inner = updates.inner.read().unwrap(); + // Inspect number of updates in memory. - assert_eq!(updates.inner.read().unwrap().len(), 1); + // It must be the same number as before as the garbage collector weren't not + // able to remove any unused updates. + assert_eq!(inner.len(), 3); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&3)); + assert_eq!(indices.get(&other_token), Some(&0)); } } - linked_chunk.push_items_back(['b']); + linked_chunk.push_items_back(['d']); + linked_chunk.push_items_back(['e']); + linked_chunk.push_items_back(['f']); - // Let's `peek` then `take` the new update. + // Scenario 2: “other“ takes the new updates, “main” doesn't take the + // new updates. + // + // 0 1 2 3 4 5 6 + // +---+---+---+---+---+---+ + // | a | b | c | d | e | f | + // +---+---+---+---+---+---+ + // + // “main” won't move its index. + // “other” will move its index from 0 to 6. { let updates = linked_chunk.updates().unwrap(); - // Inspect number of updates in memory. 
- assert_eq!(updates.inner.read().unwrap().len(), 2); - - // Peek the update… - assert!(updates.has_new_takable_updates()); - assert!(updates.inner.read().unwrap().has_new_peekable_updates()); - assert_eq!( - updates.inner.write().unwrap().peek(), - &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },] - ); - - { - // Inspect number of updates in memory. - assert_eq!(updates.inner.read().unwrap().len(), 2); - } - - // … and take the update. assert_eq!( - updates.take(), + updates.inner.write().unwrap().take_with_token(other_token), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] }, + InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] }, + InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] }, ] ); { + let inner = updates.inner.read().unwrap(); + // Inspect number of updates in memory. - assert_eq!(updates.inner.read().unwrap().len(), 2); - } + // It must be the same number as before as the garbage collector will be able to + // remove unused updates but at the next call… + assert_eq!(inner.len(), 6); - // No more update to peek or to take. - assert!(updates.has_new_takable_updates().not()); - assert!(updates.inner.read().unwrap().has_new_peekable_updates().not()); - assert!(updates.inner.write().unwrap().peek().is_empty()); - assert!(updates.take().is_empty()); + // Inspect the indices. + let indices = &inner.last_index_per_reader; - { - // Inspect number of updates in memory. - // The updates have been garbage collected. - assert_eq!(updates.inner.read().unwrap().len(), 0); + assert_eq!(indices.get(&main_token), Some(&3)); + assert_eq!(indices.get(&other_token), Some(&6)); } } - linked_chunk.push_items_back(['c']); - - // Let's `take` then `peek` the new update. 
+ // Scenario 3: “other” take new updates, but there is none, “main” + // doesn't take new updates. The garbage collector will run and collect + // unused updates. + // + // 0 1 2 3 + // +---+---+---+ + // | d | e | f | + // +---+---+---+ + // + // “main” will have its index updated from 3 to 0. + // “other” will have its index updated from 6 to 3. { let updates = linked_chunk.updates().unwrap(); + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); + { + let inner = updates.inner.read().unwrap(); + // Inspect number of updates in memory. - assert_eq!(updates.inner.read().unwrap().len(), 1); + // The garbage collector has removed unused updates. + assert_eq!(inner.len(), 3); + + // Inspect the indices. They must have been adjusted. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&0)); + assert_eq!(indices.get(&other_token), Some(&3)); } + } + + linked_chunk.push_items_back(['g']); + linked_chunk.push_items_back(['h']); + linked_chunk.push_items_back(['i']); + + // Scenario 4: both “main” and “other” take the new updates. + // + // 0 1 2 3 4 5 6 + // +---+---+---+---+---+---+ + // | d | e | f | g | h | i | + // +---+---+---+---+---+---+ + // + // “main” will have its index updated from 3 to 0. + // “other” will have its index updated from 6 to 3. + { + let updates = linked_chunk.updates().unwrap(); - // Take and peek the update. 
- assert!(updates.has_new_takable_updates()); - assert!(updates.inner.read().unwrap().has_new_peekable_updates()); assert_eq!( updates.take(), - &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },] + &[ + InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] }, + InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] }, + InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] }, + InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] }, + InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] }, + InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] }, + ] ); assert_eq!( - updates.inner.write().unwrap().peek(), - &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },] + updates.inner.write().unwrap().take_with_token(other_token), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] }, + InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] }, + InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] }, + ] ); { + let inner = updates.inner.read().unwrap(); + // Inspect number of updates in memory. - assert_eq!(updates.inner.read().unwrap().len(), 1); + // The garbage collector had a chance to collect the first 3 updates. + assert_eq!(inner.len(), 3); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&3)); + assert_eq!(indices.get(&other_token), Some(&3)); } + } + + // Scenario 5: no more updates but they both try to take new updates. + // The garbage collector will collect all updates as all of them as + // been read already. + // + // “main” will have its index updated from 0 to 0. + // “other” will have its index updated from 3 to 0. + { + let updates = linked_chunk.updates().unwrap(); - // No more update to peek or to take. 
- assert!(updates.has_new_takable_updates().not()); - assert!(updates.inner.read().unwrap().has_new_peekable_updates().not()); - assert!(updates.inner.write().unwrap().peek().is_empty()); assert!(updates.take().is_empty()); + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); { + let inner = updates.inner.read().unwrap(); + // Inspect number of updates in memory. - // The update has been garbage collected. - assert_eq!(updates.inner.read().unwrap().len(), 0); + // The garbage collector had a chance to collect all updates. + assert_eq!(inner.len(), 0); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&0)); + assert_eq!(indices.get(&other_token), Some(&0)); } } } From 1493dc2c6a1feae2af10c5a17e115a880b379d62 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 May 2024 12:42:43 +0200 Subject: [PATCH 7/9] chore(sdk): Remove the `LinkedChunk` prefix from type names. --- .../src/event_cache/linked_chunk.rs | 250 +++++++++--------- crates/matrix-sdk/src/event_cache/store.rs | 31 +-- 2 files changed, 128 insertions(+), 153 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index 1e9d39af641..1b5fe1948b7 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -32,7 +32,7 @@ use futures_core::Stream; /// Errors of [`LinkedChunk`]. #[derive(thiserror::Error, Debug)] -pub enum LinkedChunkError { +pub enum Error { #[error("The chunk identifier is invalid: `{identifier:?}`")] InvalidChunkIdentifier { identifier: ChunkIdentifier }, @@ -53,7 +53,7 @@ pub enum LinkedChunkError { /// These updates are useful to store a `LinkedChunk` in another form of /// storage, like a database or something similar. #[derive(Debug, PartialEq)] -pub enum LinkedChunkUpdate { +pub enum Update { /// A new chunk of kind Items has been created. 
NewItemsChunk { /// The identifier of the previous chunk of this new chunk. @@ -103,7 +103,7 @@ pub enum LinkedChunkUpdate { }, } -impl Clone for LinkedChunkUpdate +impl Clone for Update where Item: Clone, Gap: Clone, @@ -132,36 +132,36 @@ where } } -/// A collection of [`LinkedChunkUpdate`]. +/// A collection of [`Update`]. /// /// Get a value for this type with [`LinkedChunk::updates`]. -pub struct LinkedChunkUpdates { - inner: Arc>>, +pub struct Updates { + inner: Arc>>, } /// A token used to represent readers that read the updates in -/// [`LinkedChunkUpdatesInner`]. +/// [`UpdatesInner`]. type ReaderToken = usize; -/// Inner type for [`LinkedChunkUpdates`]. +/// Inner type for [`Updates`]. /// /// The particularity of this type is that multiple readers can read the /// updates. A reader has a [`ReaderToken`]. The public API (i.e. -/// [`LinkedChunkUpdates`]) is considered to be the _main reader_ (it has the -/// token [`Self::MAIN_READER_TOKEN`]). +/// [`Updates`]) is considered to be the _main reader_ (it has the token +/// [`Self::MAIN_READER_TOKEN`]). /// /// An update that have been read by all readers are garbage collected to be /// removed from the memory. An update will never be read twice by the same /// reader. /// /// Why do we need multiple readers? The public API reads the updates with -/// [`LinkedChunkUpdates::take`], but the private API must also read the updates -/// for example with [`LinkedChunkUpdatesSubscriber`]. Of course, they can be -/// multiple `LinkedChunkUpdatesSubscriber`s at the same time. Hence the need of -/// supporting multiple readers. -struct LinkedChunkUpdatesInner { +/// [`Updates::take`], but the private API must also read the updates for +/// example with [`UpdatesSubscriber`]. Of course, they can be multiple +/// `UpdatesSubscriber`s at the same time. Hence the need of supporting multiple +/// readers. +struct UpdatesInner { /// All the updates that have not been read by all readers. 
- updates: Vec>, + updates: Vec>, /// Updates are stored in [`Self::updates`]. Multiple readers can read them. /// A reader is identified by a [`ReaderToken`]. @@ -173,12 +173,12 @@ struct LinkedChunkUpdatesInner { /// The last generated token. This is useful to generate new token. last_token: ReaderToken, - /// Pending wakers for [`LinkedChunkUpdateSubscriber`]s. A waker is removed + /// Pending wakers for [`UpdateSubscriber`]s. A waker is removed /// everytime it is called. wakers: Vec, } -impl LinkedChunkUpdatesInner { +impl UpdatesInner { /// The token used by the main reader. See [`Self::take`] to learn more. const MAIN_READER_TOKEN: ReaderToken = 0; @@ -198,7 +198,7 @@ impl LinkedChunkUpdatesInner { } /// Push a new update. - fn push(&mut self, update: LinkedChunkUpdate) { + fn push(&mut self, update: Update) { self.updates.push(update); // Wake them up \o/. @@ -214,7 +214,7 @@ impl LinkedChunkUpdatesInner { /// reader. /// /// Learn more by reading [`Self::take_with_token`]. - fn take(&mut self) -> &[LinkedChunkUpdate] { + fn take(&mut self) -> &[Update] { self.take_with_token(Self::MAIN_READER_TOKEN) } @@ -225,7 +225,7 @@ impl LinkedChunkUpdatesInner { /// take/read/consume each update only once. An internal index is stored /// per reader token to know where to start reading updates next time this /// method is called. - fn take_with_token(&mut self, token: ReaderToken) -> &[LinkedChunkUpdate] { + fn take_with_token(&mut self, token: ReaderToken) -> &[Update] { // Let's garbage collect unused updates. self.garbage_collect(); @@ -265,21 +265,21 @@ impl LinkedChunkUpdatesInner { } } -impl LinkedChunkUpdates { +impl Updates { /// Create a new [`Self`]. fn new() -> Self { - Self { inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner::new())) } + Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) } } /// Push a new update. 
- fn push(&mut self, update: LinkedChunkUpdate) { + fn push(&mut self, update: Update) { self.inner.write().unwrap().push(update); } /// Take new updates. /// /// Updates that have been taken will not be read again. - pub fn take(&mut self) -> Vec> + pub fn take(&mut self) -> Vec> where Item: Clone, Gap: Clone, @@ -288,7 +288,7 @@ impl LinkedChunkUpdates { } /// Subscribe to updates by using a [`Stream`]. - fn subscribe(&mut self) -> LinkedChunkUpdatesSubscriber { + fn subscribe(&mut self) -> UpdatesSubscriber { // A subscriber is a new update reader, it needs its own token. let token = { let mut inner = self.inner.write().unwrap(); @@ -300,33 +300,33 @@ impl LinkedChunkUpdates { last_token }; - LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner), token } + UpdatesSubscriber { updates: Arc::downgrade(&self.inner), token } } } -/// A subscriber to [`LinkedChunkUpdates`]. It is helpful to receive updates via -/// a [`Stream`]. -struct LinkedChunkUpdatesSubscriber { - /// Weak reference to [`LinkedChunkUpdatesInner`]. +/// A subscriber to [`Updates`]. It is helpful to receive updates via a +/// [`Stream`]. +struct UpdatesSubscriber { + /// Weak reference to [`UpdatesInner`]. /// - /// Using a weak reference allows [`LinkedChunkUpdates`] to be dropped + /// Using a weak reference allows [`Updates`] to be dropped /// freely even if a subscriber exists. - updates: Weak>>, + updates: Weak>>, /// The token to read the updates. token: ReaderToken, } -impl Stream for LinkedChunkUpdatesSubscriber +impl Stream for UpdatesSubscriber where Item: Clone, Gap: Clone, { - type Item = Vec>; + type Item = Vec>; fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { let Some(updates) = self.updates.upgrade() else { - // The `LinkedChunkUpdates` has been dropped. It's time to close this stream. + // The `Updates` has been dropped. It's time to close this stream. 
return Poll::Ready(None); }; @@ -351,14 +351,14 @@ where /// /// This type was introduced to avoid borrow checking errors when mutably /// referencing a subset of fields of a `LinkedChunk`. -struct LinkedChunkEnds { +struct Ends { /// The first chunk. first: NonNull>, /// The last chunk. last: Option>>, } -impl LinkedChunkEnds { +impl Ends { /// Get the first chunk, as an immutable reference. fn first_chunk(&self) -> &Chunk { unsafe { self.first.as_ref() } @@ -409,15 +409,19 @@ impl LinkedChunkEnds { /// entirely full. A chunk can represents a `Gap` between other chunks. pub struct LinkedChunk { /// The links to the chunks, i.e. the first and the last chunk. - links: LinkedChunkEnds, + links: Ends, + /// The number of items hold by this linked chunk. length: usize, + /// The generator of chunk identifiers. chunk_identifier_generator: ChunkIdentifierGenerator, + /// All updates that have been made on this `LinkedChunk`. If this field is /// `Some(…)`, update history is enabled, otherwise, if it's `None`, update /// history is disabled. - updates: Option>, + updates: Option>, + /// Marker. marker: PhantomData>>, } @@ -426,7 +430,7 @@ impl LinkedChunk { /// Create a new [`Self`]. pub fn new() -> Self { Self { - links: LinkedChunkEnds { + links: Ends { // INVARIANT: The first chunk must always be an Items, not a Gap. first: Chunk::new_items_leaked(ChunkIdentifierGenerator::FIRST_IDENTIFIER), last: None, @@ -440,19 +444,18 @@ impl LinkedChunk { /// Create a new [`Self`] with a history of updates. /// - /// When [`Self`] is built with update history, the - /// [`LinkedChunkUpdates::take`] method must be called to consume and clean - /// the updates. See [`Self::updates`]. + /// When [`Self`] is built with update history, the [`Updates::take`] method + /// must be called to consume and clean the updates. See [`Self::updates`]. 
pub fn new_with_update_history() -> Self { Self { - links: LinkedChunkEnds { + links: Ends { // INVARIANT: The first chunk must always be an Items, not a Gap. first: Chunk::new_items_leaked(ChunkIdentifierGenerator::FIRST_IDENTIFIER), last: None, }, length: 0, chunk_identifier_generator: ChunkIdentifierGenerator::new_from_scratch(), - updates: Some(LinkedChunkUpdates::new()), + updates: Some(Updates::new()), marker: PhantomData, } } @@ -516,11 +519,7 @@ impl LinkedChunk { /// /// Because the `position` can be invalid, this method returns a /// `Result`. - pub fn insert_items_at( - &mut self, - items: I, - position: Position, - ) -> Result<(), LinkedChunkError> + pub fn insert_items_at(&mut self, items: I, position: Position) -> Result<(), Error> where Item: Clone, Gap: Clone, @@ -533,18 +532,18 @@ impl LinkedChunk { let chunk = self .links .chunk_mut(chunk_identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?; + .ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?; let (chunk, number_of_items) = match &mut chunk.content { ChunkContent::Gap(..) => { - return Err(LinkedChunkError::ChunkIsAGap { identifier: chunk_identifier }) + return Err(Error::ChunkIsAGap { identifier: chunk_identifier }) } ChunkContent::Items(current_items) => { let current_items_length = current_items.len(); if item_index > current_items_length { - return Err(LinkedChunkError::InvalidItemIndex { index: item_index }); + return Err(Error::InvalidItemIndex { index: item_index }); } // Prepare the items to be pushed. @@ -561,7 +560,7 @@ impl LinkedChunk { // Insert inside the current items. else { if let Some(updates) = self.updates.as_mut() { - updates.push(LinkedChunkUpdate::TruncateItems { + updates.push(Update::TruncateItems { chunk: chunk_identifier, length: item_index, }); @@ -602,11 +601,7 @@ impl LinkedChunk { /// /// Because the `position` can be invalid, this method returns a /// `Result`. 
- pub fn insert_gap_at( - &mut self, - content: Gap, - position: Position, - ) -> Result<(), LinkedChunkError> + pub fn insert_gap_at(&mut self, content: Gap, position: Position) -> Result<(), Error> where Item: Clone, Gap: Clone, @@ -617,7 +612,7 @@ impl LinkedChunk { let chunk = self .links .chunk_mut(chunk_identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?; + .ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?; // If `item_index` is 0, we don't want to split the current items chunk to // insert a new gap chunk, otherwise it would create an empty current items @@ -645,18 +640,18 @@ impl LinkedChunk { let chunk = match &mut chunk.content { ChunkContent::Gap(..) => { - return Err(LinkedChunkError::ChunkIsAGap { identifier: chunk_identifier }); + return Err(Error::ChunkIsAGap { identifier: chunk_identifier }); } ChunkContent::Items(current_items) => { let current_items_length = current_items.len(); if item_index >= current_items_length { - return Err(LinkedChunkError::InvalidItemIndex { index: item_index }); + return Err(Error::InvalidItemIndex { index: item_index }); } if let Some(updates) = self.updates.as_mut() { - updates.push(LinkedChunkUpdate::TruncateItems { + updates.push(Update::TruncateItems { chunk: chunk_identifier, length: item_index, }); @@ -707,7 +702,7 @@ impl LinkedChunk { &mut self, items: I, chunk_identifier: ChunkIdentifier, - ) -> Result<&Chunk, LinkedChunkError> + ) -> Result<&Chunk, Error> where Item: Clone, Gap: Clone, @@ -721,7 +716,7 @@ impl LinkedChunk { let chunk = self .links .chunk_mut(chunk_identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier: chunk_identifier })?; + .ok_or(Error::InvalidChunkIdentifier { identifier: chunk_identifier })?; debug_assert!(chunk.is_first_chunk().not(), "A gap cannot be the first chunk"); @@ -745,7 +740,7 @@ impl LinkedChunk { ) } ChunkContent::Items(..) 
=> { - return Err(LinkedChunkError::ChunkIsItems { identifier: chunk_identifier }) + return Err(Error::ChunkIsItems { identifier: chunk_identifier }) } }; @@ -803,15 +798,15 @@ impl LinkedChunk { /// Iterate over the chunks, backwards. /// /// It iterates from the last to the first chunk. - pub fn rchunks(&self) -> LinkedChunkIterBackward<'_, CAP, Item, Gap> { - LinkedChunkIterBackward::new(self.links.latest_chunk()) + pub fn rchunks(&self) -> IterBackward<'_, CAP, Item, Gap> { + IterBackward::new(self.links.latest_chunk()) } /// Iterate over the chunks, forward. /// /// It iterates from the first to the last chunk. - pub fn chunks(&self) -> LinkedChunkIter<'_, CAP, Item, Gap> { - LinkedChunkIter::new(self.links.first_chunk()) + pub fn chunks(&self) -> Iter<'_, CAP, Item, Gap> { + Iter::new(self.links.first_chunk()) } /// Iterate over the chunks, starting from `identifier`, backward. @@ -821,11 +816,9 @@ impl LinkedChunk { pub fn rchunks_from( &self, identifier: ChunkIdentifier, - ) -> Result, LinkedChunkError> { - Ok(LinkedChunkIterBackward::new( - self.links - .chunk(identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier })?, + ) -> Result, Error> { + Ok(IterBackward::new( + self.links.chunk(identifier).ok_or(Error::InvalidChunkIdentifier { identifier })?, )) } @@ -836,11 +829,9 @@ impl LinkedChunk { pub fn chunks_from( &self, identifier: ChunkIdentifier, - ) -> Result, LinkedChunkError> { - Ok(LinkedChunkIter::new( - self.links - .chunk(identifier) - .ok_or(LinkedChunkError::InvalidChunkIdentifier { identifier })?, + ) -> Result, Error> { + Ok(Iter::new( + self.links.chunk(identifier).ok_or(Error::InvalidChunkIdentifier { identifier })?, )) } @@ -868,7 +859,7 @@ impl LinkedChunk { pub fn ritems_from( &self, position: Position, - ) -> Result, LinkedChunkError> { + ) -> Result, Error> { Ok(self .rchunks_from(position.chunk_identifier())? 
.filter_map(|chunk| match &chunk.content { @@ -899,7 +890,7 @@ impl LinkedChunk { pub fn items_from( &self, position: Position, - ) -> Result, LinkedChunkError> { + ) -> Result, Error> { Ok(self .chunks_from(position.chunk_identifier())? .filter_map(|chunk| match &chunk.content { @@ -918,16 +909,14 @@ impl LinkedChunk { .skip(position.index())) } - /// Get a mutable reference to the `LinkedChunk` updates, aka - /// [`LinkedChunkUpdates`]. + /// Get a mutable reference to the `LinkedChunk` updates, aka [`Updates`]. /// /// The caller is responsible to clear these updates. /// /// If the `Option` becomes `None`, it will disable update history. Thus, be /// careful when you want to empty the update history: do not use - /// `Option::take()` directly but rather [`LinkedChunkUpdates::take`] for - /// example. - pub fn updates(&mut self) -> Option<&mut LinkedChunkUpdates> { + /// `Option::take()` directly but rather [`Updates::take`] for example. + pub fn updates(&mut self) -> Option<&mut Updates> { self.updates.as_mut() } } @@ -1043,18 +1032,18 @@ impl Position { /// An iterator over a [`LinkedChunk`] that traverses the chunk in backward /// direction (i.e. it calls `previous` on each chunk to make progress). -pub struct LinkedChunkIterBackward<'a, const CAP: usize, Item, Gap> { +pub struct IterBackward<'a, const CAP: usize, Item, Gap> { chunk: Option<&'a Chunk>, } -impl<'a, const CAP: usize, Item, Gap> LinkedChunkIterBackward<'a, CAP, Item, Gap> { +impl<'a, const CAP: usize, Item, Gap> IterBackward<'a, CAP, Item, Gap> { /// Create a new [`LinkedChunkIter`] from a particular [`Chunk`]. 
fn new(from_chunk: &'a Chunk) -> Self { Self { chunk: Some(from_chunk) } } } -impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIterBackward<'a, CAP, Item, Gap> { +impl<'a, const CAP: usize, Item, Gap> Iterator for IterBackward<'a, CAP, Item, Gap> { type Item = &'a Chunk; fn next(&mut self) -> Option { @@ -1068,18 +1057,18 @@ impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIterBackward<'a, C /// An iterator over a [`LinkedChunk`] that traverses the chunk in forward /// direction (i.e. it calls `next` on each chunk to make progress). -pub struct LinkedChunkIter<'a, const CAP: usize, Item, Gap> { +pub struct Iter<'a, const CAP: usize, Item, Gap> { chunk: Option<&'a Chunk>, } -impl<'a, const CAP: usize, Item, Gap> LinkedChunkIter<'a, CAP, Item, Gap> { +impl<'a, const CAP: usize, Item, Gap> Iter<'a, CAP, Item, Gap> { /// Create a new [`LinkedChunkIter`] from a particular [`Chunk`]. fn new(from_chunk: &'a Chunk) -> Self { Self { chunk: Some(from_chunk) } } } -impl<'a, const CAP: usize, Item, Gap> Iterator for LinkedChunkIter<'a, CAP, Item, Gap> { +impl<'a, const CAP: usize, Item, Gap> Iterator for Iter<'a, CAP, Item, Gap> { type Item = &'a Chunk; fn next(&mut self) -> Option { @@ -1227,7 +1216,7 @@ impl Chunk { &mut self, mut new_items: I, chunk_identifier_generator: &ChunkIdentifierGenerator, - updates: &mut Option>, + updates: &mut Option>, ) -> &mut Self where I: Iterator + ExactSizeIterator, @@ -1266,7 +1255,7 @@ impl Chunk { items.extend(new_items); if let Some(updates) = updates.as_mut() { - updates.push(LinkedChunkUpdate::InsertItems { + updates.push(Update::InsertItems { at: Position(identifier, start), items: items[start..].to_vec(), }); @@ -1281,7 +1270,7 @@ impl Chunk { items.extend(new_items.by_ref().take(free_space)); if let Some(updates) = updates.as_mut() { - updates.push(LinkedChunkUpdate::InsertItems { + updates.push(Update::InsertItems { at: Position(identifier, start), items: items[start..].to_vec(), }); @@ -1309,7 +1298,7 @@ 
impl Chunk { fn insert_next( &mut self, mut new_chunk_ptr: NonNull, - updates: &mut Option>, + updates: &mut Option>, ) -> &mut Self where Gap: Clone, @@ -1336,15 +1325,12 @@ impl Chunk { let next = new_chunk.next().map(Chunk::identifier); match new_chunk.content() { - ChunkContent::Gap(gap) => updates.push(LinkedChunkUpdate::NewGapChunk { - previous, - new, - next, - gap: gap.clone(), - }), + ChunkContent::Gap(gap) => { + updates.push(Update::NewGapChunk { previous, new, next, gap: gap.clone() }) + } ChunkContent::Items(..) => { - updates.push(LinkedChunkUpdate::NewItemsChunk { previous, new, next }) + updates.push(Update::NewItemsChunk { previous, new, next }) } } } @@ -1356,7 +1342,7 @@ impl Chunk { /// /// Be careful: `self` won't belong to `LinkedChunk` anymore, and should be /// dropped appropriately. - fn unlink(&mut self, updates: &mut Option>) { + fn unlink(&mut self, updates: &mut Option>) { let previous_ptr = self.previous; let next_ptr = self.next; @@ -1369,7 +1355,7 @@ impl Chunk { } if let Some(updates) = updates.as_mut() { - updates.push(LinkedChunkUpdate::RemoveChunk(self.identifier())); + updates.push(Update::RemoveChunk(self.identifier())); } } @@ -1438,10 +1424,10 @@ mod tests { use futures_util::pin_mut; use super::{ - Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunk, - LinkedChunkError, Position, Stream, + Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, Error, LinkedChunk, + Position, Stream, }; - use crate::event_cache::linked_chunk::LinkedChunkUpdatesInner; + use crate::event_cache::linked_chunk::UpdatesInner; /// A macro to test the items and the gap of a `LinkedChunk`. /// A chunk is delimited by `[` and `]`. 
An item chunk has the form `[a, b, @@ -1551,12 +1537,12 @@ mod tests { #[test] fn test_updates_take_and_garbage_collector() { - use super::LinkedChunkUpdate::*; + use super::Update::*; let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history(); // Simulate another updates “reader”, it can a subscriber. - let main_token = LinkedChunkUpdatesInner::::MAIN_READER_TOKEN; + let main_token = UpdatesInner::::MAIN_READER_TOKEN; let other_token = { let updates = linked_chunk.updates().unwrap(); let mut inner = updates.inner.write().unwrap(); @@ -1780,7 +1766,7 @@ mod tests { #[test] fn test_updates_stream() { - use super::LinkedChunkUpdate::*; + use super::Update::*; struct CounterWaker { number_of_wakeup: Mutex, @@ -1864,7 +1850,7 @@ mod tests { #[test] fn test_push_items() { - use super::LinkedChunkUpdate::*; + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a']); @@ -1922,7 +1908,7 @@ mod tests { #[test] fn test_push_gap() { - use super::LinkedChunkUpdate::*; + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a']); @@ -2092,7 +2078,7 @@ mod tests { } #[test] - fn test_rchunks_from() -> Result<(), LinkedChunkError> { + fn test_rchunks_from() -> Result<(), Error> { let mut linked_chunk = LinkedChunk::<2, char, ()>::new(); linked_chunk.push_items_back(['a', 'b']); linked_chunk.push_gap_back(()); @@ -2124,7 +2110,7 @@ mod tests { } #[test] - fn test_chunks_from() -> Result<(), LinkedChunkError> { + fn test_chunks_from() -> Result<(), Error> { let mut linked_chunk = LinkedChunk::<2, char, ()>::new(); linked_chunk.push_items_back(['a', 'b']); linked_chunk.push_gap_back(()); @@ -2186,7 +2172,7 @@ mod tests { } #[test] - fn test_ritems_from() -> Result<(), LinkedChunkError> { + fn test_ritems_from() -> Result<(), Error> { let mut linked_chunk = LinkedChunk::<2, char, ()>::new(); 
linked_chunk.push_items_back(['a', 'b']); linked_chunk.push_gap_back(()); @@ -2204,7 +2190,7 @@ mod tests { } #[test] - fn test_items_from() -> Result<(), LinkedChunkError> { + fn test_items_from() -> Result<(), Error> { let mut linked_chunk = LinkedChunk::<2, char, ()>::new(); linked_chunk.push_items_back(['a', 'b']); linked_chunk.push_gap_back(()); @@ -2222,8 +2208,8 @@ mod tests { } #[test] - fn test_insert_items_at() -> Result<(), LinkedChunkError> { - use super::LinkedChunkUpdate::*; + fn test_insert_items_at() -> Result<(), Error> { + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); @@ -2350,7 +2336,7 @@ mod tests { { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), - Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) + Err(Error::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); assert!(linked_chunk.updates().unwrap().take().is_empty()); } @@ -2359,7 +2345,7 @@ mod tests { { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), - Err(LinkedChunkError::InvalidItemIndex { index: 128 }) + Err(Error::InvalidItemIndex { index: 128 }) ); assert!(linked_chunk.updates().unwrap().take().is_empty()); } @@ -2384,7 +2370,7 @@ mod tests { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(6), 0)), - Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(6) }) + Err(Error::ChunkIsAGap { identifier: ChunkIdentifier(6) }) ); } @@ -2394,8 +2380,8 @@ mod tests { } #[test] - fn test_insert_gap_at() -> Result<(), LinkedChunkError> { - use super::LinkedChunkUpdate::*; + fn test_insert_gap_at() -> Result<(), Error> { + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); @@ -2491,7 +2477,7 @@ 
mod tests { let position_of_first_empty_chunk = Position(ChunkIdentifier(0), 0); assert_matches!( linked_chunk.insert_gap_at((), position_of_first_empty_chunk), - Err(LinkedChunkError::InvalidItemIndex { index: 0 }) + Err(Error::InvalidItemIndex { index: 0 }) ); assert!(linked_chunk.updates().unwrap().take().is_empty()); } @@ -2533,7 +2519,7 @@ mod tests { { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), - Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) + Err(Error::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); assert!(linked_chunk.updates().unwrap().take().is_empty()); } @@ -2542,7 +2528,7 @@ mod tests { { assert_matches!( linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), - Err(LinkedChunkError::InvalidItemIndex { index: 128 }) + Err(Error::InvalidItemIndex { index: 128 }) ); assert!(linked_chunk.updates().unwrap().take().is_empty()); } @@ -2554,7 +2540,7 @@ mod tests { let position_of_a_gap = Position(ChunkIdentifier(4), 0); assert_matches!( linked_chunk.insert_gap_at((), position_of_a_gap), - Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(4) }) + Err(Error::ChunkIsAGap { identifier: ChunkIdentifier(4) }) ); assert!(linked_chunk.updates().unwrap().take().is_empty()); } @@ -2565,8 +2551,8 @@ mod tests { } #[test] - fn test_replace_gap_at() -> Result<(), LinkedChunkError> { - use super::LinkedChunkUpdate::*; + fn test_replace_gap_at() -> Result<(), Error> { + use super::Update::*; let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); linked_chunk.push_items_back(['a', 'b']); diff --git a/crates/matrix-sdk/src/event_cache/store.rs b/crates/matrix-sdk/src/event_cache/store.rs index 842207354fe..fce7c29709c 100644 --- a/crates/matrix-sdk/src/event_cache/store.rs +++ b/crates/matrix-sdk/src/event_cache/store.rs @@ -17,8 +17,7 @@ use std::{fmt, iter::once}; use 
matrix_sdk_common::deserialized_responses::SyncTimelineEvent; use super::linked_chunk::{ - Chunk, ChunkIdentifier, LinkedChunk, LinkedChunkError, LinkedChunkIter, - LinkedChunkIterBackward, Position, + Chunk, ChunkIdentifier, Error, Iter, IterBackward, LinkedChunk, Position, }; /// A newtype wrapper for a pagination token returned by a /messages response. @@ -82,11 +81,7 @@ impl RoomEvents { } /// Insert events at a specified position. - pub fn insert_events_at( - &mut self, - events: I, - position: Position, - ) -> Result<(), LinkedChunkError> + pub fn insert_events_at(&mut self, events: I, position: Position) -> Result<(), Error> where I: IntoIterator, I::IntoIter: ExactSizeIterator, @@ -95,7 +90,7 @@ impl RoomEvents { } /// Insert a gap at a specified position. - pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), LinkedChunkError> { + pub fn insert_gap_at(&mut self, gap: Gap, position: Position) -> Result<(), Error> { self.chunks.insert_gap_at(gap, position) } @@ -110,7 +105,7 @@ impl RoomEvents { &mut self, events: I, gap_identifier: ChunkIdentifier, - ) -> Result<&Chunk, LinkedChunkError> + ) -> Result<&Chunk, Error> where I: IntoIterator, I::IntoIter: ExactSizeIterator, @@ -137,16 +132,14 @@ impl RoomEvents { /// Iterate over the chunks, backward. /// /// The most recent chunk comes first. - pub fn rchunks( - &self, - ) -> LinkedChunkIterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { + pub fn rchunks(&self) -> IterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { self.chunks.rchunks() } /// Iterate over the chunks, forward. /// /// The oldest chunk comes first. 
- pub fn chunks(&self) -> LinkedChunkIter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { + pub fn chunks(&self) -> Iter<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap> { self.chunks.chunks() } @@ -154,10 +147,7 @@ impl RoomEvents { pub fn rchunks_from( &self, identifier: ChunkIdentifier, - ) -> Result< - LinkedChunkIterBackward<'_, DEFAULT_CHUNK_CAPACITY, SyncTimelineEvent, Gap>, - LinkedChunkError, - > { + ) -> Result, Error> { self.chunks.rchunks_from(identifier) } @@ -166,8 +156,7 @@ impl RoomEvents { pub fn chunks_from( &self, identifier: ChunkIdentifier, - ) -> Result, LinkedChunkError> - { + ) -> Result, Error> { self.chunks.chunks_from(identifier) } @@ -189,7 +178,7 @@ impl RoomEvents { pub fn revents_from( &self, position: Position, - ) -> Result, LinkedChunkError> { + ) -> Result, Error> { self.chunks.ritems_from(position) } @@ -198,7 +187,7 @@ impl RoomEvents { pub fn events_from( &self, position: Position, - ) -> Result, LinkedChunkError> { + ) -> Result, Error> { self.chunks.items_from(position) } } From 44a78ba9f6e77adb69845a50e11b48b3e2b87ef8 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 May 2024 13:58:49 +0200 Subject: [PATCH 8/9] chore(sdk): Thanks Clippy. 
--- .../src/event_cache/linked_chunk.rs | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index 1b5fe1948b7..90fd3fd943b 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -110,23 +110,16 @@ where { fn clone(&self) -> Self { match self { - Self::NewItemsChunk { previous, new, next } => Self::NewItemsChunk { - previous: previous.clone(), - new: new.clone(), - next: next.clone(), - }, - Self::NewGapChunk { previous, new, next, gap } => Self::NewGapChunk { - previous: previous.clone(), - new: new.clone(), - next: next.clone(), - gap: gap.clone(), - }, - Self::RemoveChunk(identifier) => Self::RemoveChunk(identifier.clone()), - Self::InsertItems { at, items } => { - Self::InsertItems { at: at.clone(), items: items.clone() } + Self::NewItemsChunk { previous, new, next } => { + Self::NewItemsChunk { previous: *previous, new: *new, next: *next } + } + Self::NewGapChunk { previous, new, next, gap } => { + Self::NewGapChunk { previous: *previous, new: *new, next: *next, gap: gap.clone() } } + Self::RemoveChunk(identifier) => Self::RemoveChunk(*identifier), + Self::InsertItems { at, items } => Self::InsertItems { at: *at, items: items.clone() }, Self::TruncateItems { chunk, length } => { - Self::TruncateItems { chunk: chunk.clone(), length: length.clone() } + Self::TruncateItems { chunk: *chunk, length: *length } } } } @@ -252,7 +245,7 @@ impl UpdatesInner { /// Basically, it reduces to finding the smallest last index for all /// readers, and clear from 0 to that index. 
fn garbage_collect(&mut self) { - let min_index = self.last_index_per_reader.values().min().map(|min| *min).unwrap_or(0); + let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0); if min_index > 0 { let _ = self.updates.drain(0..min_index); From 1f26822a64e1b56c87e39ba26c67f827fb53789d Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 8 May 2024 14:47:45 +0200 Subject: [PATCH 9/9] chore(sdk): Split `linked_chunk` into 2 modules. --- .../{linked_chunk.rs => linked_chunk/mod.rs} | 628 +---------------- .../src/event_cache/linked_chunk/updates.rs | 644 ++++++++++++++++++ 2 files changed, 649 insertions(+), 623 deletions(-) rename crates/matrix-sdk/src/event_cache/{linked_chunk.rs => linked_chunk/mod.rs} (77%) create mode 100644 crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs similarity index 77% rename from crates/matrix-sdk/src/event_cache/linked_chunk.rs rename to crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs index 90fd3fd943b..71a35d53d1f 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/mod.rs @@ -14,21 +14,17 @@ #![allow(dead_code)] +mod updates; + use std::{ - collections::HashMap, fmt, marker::PhantomData, ops::Not, - pin::Pin, ptr::NonNull, - sync::{ - atomic::{AtomicU64, Ordering}, - Arc, RwLock, Weak, - }, - task::{Context, Poll, Waker}, + sync::atomic::{AtomicU64, Ordering}, }; -use futures_core::Stream; +use updates::*; /// Errors of [`LinkedChunk`]. #[derive(thiserror::Error, Debug)] @@ -46,300 +42,6 @@ pub enum Error { InvalidItemIndex { index: usize }, } -/// Represent the updates that have happened inside a [`LinkedChunk`]. -/// -/// To retrieve the updates, use [`LinkedChunk::updates`]. -/// -/// These updates are useful to store a `LinkedChunk` in another form of -/// storage, like a database or something similar. 
-#[derive(Debug, PartialEq)] -pub enum Update { - /// A new chunk of kind Items has been created. - NewItemsChunk { - /// The identifier of the previous chunk of this new chunk. - previous: Option, - - /// The identifier of the new chunk. - new: ChunkIdentifier, - - /// The identifier of the next chunk of this new chunk. - next: Option, - }, - - /// A new chunk of kind Gap has been created. - NewGapChunk { - /// The identifier of the previous chunk of this new chunk. - previous: Option, - - /// The identifier of the new chunk. - new: ChunkIdentifier, - - /// The identifier of the next chunk of this new chunk. - next: Option, - - /// The content of the chunk. - gap: Gap, - }, - - /// A chunk has been removed. - RemoveChunk(ChunkIdentifier), - - /// Items are inserted inside a chunk of kind Items. - InsertItems { - /// [`Position`] of the items. - at: Position, - - /// The items. - items: Vec, - }, - - /// A chunk of kind Items has been truncated. - TruncateItems { - /// The identifier of the chunk. - chunk: ChunkIdentifier, - - /// The new length of the chunk. - length: usize, - }, -} - -impl Clone for Update -where - Item: Clone, - Gap: Clone, -{ - fn clone(&self) -> Self { - match self { - Self::NewItemsChunk { previous, new, next } => { - Self::NewItemsChunk { previous: *previous, new: *new, next: *next } - } - Self::NewGapChunk { previous, new, next, gap } => { - Self::NewGapChunk { previous: *previous, new: *new, next: *next, gap: gap.clone() } - } - Self::RemoveChunk(identifier) => Self::RemoveChunk(*identifier), - Self::InsertItems { at, items } => Self::InsertItems { at: *at, items: items.clone() }, - Self::TruncateItems { chunk, length } => { - Self::TruncateItems { chunk: *chunk, length: *length } - } - } - } -} - -/// A collection of [`Update`]. -/// -/// Get a value for this type with [`LinkedChunk::updates`]. -pub struct Updates { - inner: Arc>>, -} - -/// A token used to represent readers that read the updates in -/// [`UpdatesInner`]. 
-type ReaderToken = usize; - -/// Inner type for [`Updates`]. -/// -/// The particularity of this type is that multiple readers can read the -/// updates. A reader has a [`ReaderToken`]. The public API (i.e. -/// [`Updates`]) is considered to be the _main reader_ (it has the token -/// [`Self::MAIN_READER_TOKEN`]). -/// -/// An update that have been read by all readers are garbage collected to be -/// removed from the memory. An update will never be read twice by the same -/// reader. -/// -/// Why do we need multiple readers? The public API reads the updates with -/// [`Updates::take`], but the private API must also read the updates for -/// example with [`UpdatesSubscriber`]. Of course, they can be multiple -/// `UpdatesSubscriber`s at the same time. Hence the need of supporting multiple -/// readers. -struct UpdatesInner { - /// All the updates that have not been read by all readers. - updates: Vec>, - - /// Updates are stored in [`Self::updates`]. Multiple readers can read them. - /// A reader is identified by a [`ReaderToken`]. - /// - /// To each reader token is associated an index that represents the index of - /// the last reading. It is used to never return the same update twice. - last_index_per_reader: HashMap, - - /// The last generated token. This is useful to generate new token. - last_token: ReaderToken, - - /// Pending wakers for [`UpdateSubscriber`]s. A waker is removed - /// everytime it is called. - wakers: Vec, -} - -impl UpdatesInner { - /// The token used by the main reader. See [`Self::take`] to learn more. - const MAIN_READER_TOKEN: ReaderToken = 0; - - /// Create a new [`Self`]. - fn new() -> Self { - Self { - updates: Vec::with_capacity(8), - last_index_per_reader: { - let mut map = HashMap::with_capacity(2); - map.insert(Self::MAIN_READER_TOKEN, 0); - - map - }, - last_token: Self::MAIN_READER_TOKEN, - wakers: Vec::with_capacity(2), - } - } - - /// Push a new update. 
- fn push(&mut self, update: Update) { - self.updates.push(update); - - // Wake them up \o/. - for waker in self.wakers.drain(..) { - waker.wake(); - } - } - - /// Take new updates; it considers the caller is the main reader, i.e. it - /// will use the [`Self::MAIN_READER_TOKEN`]. - /// - /// Updates that have been read will never be read again by the current - /// reader. - /// - /// Learn more by reading [`Self::take_with_token`]. - fn take(&mut self) -> &[Update] { - self.take_with_token(Self::MAIN_READER_TOKEN) - } - - /// Take new updates with a particular reader token. - /// - /// Updates are stored in [`Self::updates`]. Multiple readers can read them. - /// A reader is identified by a [`ReaderToken`]. Every reader can - /// take/read/consume each update only once. An internal index is stored - /// per reader token to know where to start reading updates next time this - /// method is called. - fn take_with_token(&mut self, token: ReaderToken) -> &[Update] { - // Let's garbage collect unused updates. - self.garbage_collect(); - - let index = self - .last_index_per_reader - .get_mut(&token) - .expect("Given `UpdatesToken` does not map to any index"); - - // Read new updates, and update the index. - let slice = &self.updates[*index..]; - *index = self.updates.len(); - - slice - } - - /// Return the number of updates in the buffer. - fn len(&self) -> usize { - self.updates.len() - } - - /// Garbage collect unused updates. An update is considered unused when it's - /// been read by all readers. - /// - /// Basically, it reduces to finding the smallest last index for all - /// readers, and clear from 0 to that index. - fn garbage_collect(&mut self) { - let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0); - - if min_index > 0 { - let _ = self.updates.drain(0..min_index); - - // Let's shift the indices to the left by `min_index` to preserve them. 
- for index in self.last_index_per_reader.values_mut() { - *index -= min_index; - } - } - } -} - -impl Updates { - /// Create a new [`Self`]. - fn new() -> Self { - Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) } - } - - /// Push a new update. - fn push(&mut self, update: Update) { - self.inner.write().unwrap().push(update); - } - - /// Take new updates. - /// - /// Updates that have been taken will not be read again. - pub fn take(&mut self) -> Vec> - where - Item: Clone, - Gap: Clone, - { - self.inner.write().unwrap().take().to_owned() - } - - /// Subscribe to updates by using a [`Stream`]. - fn subscribe(&mut self) -> UpdatesSubscriber { - // A subscriber is a new update reader, it needs its own token. - let token = { - let mut inner = self.inner.write().unwrap(); - inner.last_token += 1; - - let last_token = inner.last_token; - inner.last_index_per_reader.insert(last_token, 0); - - last_token - }; - - UpdatesSubscriber { updates: Arc::downgrade(&self.inner), token } - } -} - -/// A subscriber to [`Updates`]. It is helpful to receive updates via a -/// [`Stream`]. -struct UpdatesSubscriber { - /// Weak reference to [`UpdatesInner`]. - /// - /// Using a weak reference allows [`Updates`] to be dropped - /// freely even if a subscriber exists. - updates: Weak>>, - - /// The token to read the updates. - token: ReaderToken, -} - -impl Stream for UpdatesSubscriber -where - Item: Clone, - Gap: Clone, -{ - type Item = Vec>; - - fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { - let Some(updates) = self.updates.upgrade() else { - // The `Updates` has been dropped. It's time to close this stream. - return Poll::Ready(None); - }; - - let mut updates = updates.write().unwrap(); - let the_updates = updates.take_with_token(self.token); - - // No updates. - if the_updates.is_empty() { - // Let's register the waker. - updates.wakers.push(context.waker().clone()); - - // The stream is pending. 
- return Poll::Pending; - } - - // There is updates! Let's forward them in this stream. - Poll::Ready(Some(the_updates.to_owned())) - } -} - /// Links of a `LinkedChunk`, i.e. the first and last [`Chunk`]. /// /// This type was introduced to avoid borrow checking errors when mutably @@ -1408,19 +1110,12 @@ where #[cfg(test)] mod tests { - use std::{ - sync::{Arc, Mutex}, - task::{Context, Poll, Wake}, - }; - use assert_matches::assert_matches; - use futures_util::pin_mut; use super::{ Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, Error, LinkedChunk, - Position, Stream, + Position, }; - use crate::event_cache::linked_chunk::UpdatesInner; /// A macro to test the items and the gap of a `LinkedChunk`. /// A chunk is delimited by `[` and `]`. An item chunk has the form `[a, b, @@ -1528,319 +1223,6 @@ mod tests { assert!(LinkedChunk::<3, char, ()>::new_with_update_history().updates().is_some()); } - #[test] - fn test_updates_take_and_garbage_collector() { - use super::Update::*; - - let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history(); - - // Simulate another updates “reader”, it can a subscriber. - let main_token = UpdatesInner::::MAIN_READER_TOKEN; - let other_token = { - let updates = linked_chunk.updates().unwrap(); - let mut inner = updates.inner.write().unwrap(); - inner.last_token += 1; - - let other_token = inner.last_token; - inner.last_index_per_reader.insert(other_token, 0); - - other_token - }; - - // There is no new update yet. - { - let updates = linked_chunk.updates().unwrap(); - - assert!(updates.take().is_empty()); - assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); - } - - linked_chunk.push_items_back(['a']); - linked_chunk.push_items_back(['b']); - linked_chunk.push_items_back(['c']); - - // Scenario 1: “main” takes the new updates, “other” doesn't take the new - // updates. 
- // - // 0 1 2 3 - // +---+---+---+ - // | a | b | c | - // +---+---+---+ - // - // “main” will move its index from 0 to 3. - // “other” won't move its index. - { - let updates = linked_chunk.updates().unwrap(); - - { - // Inspect number of updates in memory. - assert_eq!(updates.inner.read().unwrap().len(), 3); - } - - assert_eq!( - updates.take(), - &[ - InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, - InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, - InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, - ] - ); - - { - let inner = updates.inner.read().unwrap(); - - // Inspect number of updates in memory. - // It must be the same number as before as the garbage collector weren't not - // able to remove any unused updates. - assert_eq!(inner.len(), 3); - - // Inspect the indices. - let indices = &inner.last_index_per_reader; - - assert_eq!(indices.get(&main_token), Some(&3)); - assert_eq!(indices.get(&other_token), Some(&0)); - } - } - - linked_chunk.push_items_back(['d']); - linked_chunk.push_items_back(['e']); - linked_chunk.push_items_back(['f']); - - // Scenario 2: “other“ takes the new updates, “main” doesn't take the - // new updates. - // - // 0 1 2 3 4 5 6 - // +---+---+---+---+---+---+ - // | a | b | c | d | e | f | - // +---+---+---+---+---+---+ - // - // “main” won't move its index. - // “other” will move its index from 0 to 6. 
- { - let updates = linked_chunk.updates().unwrap(); - - assert_eq!( - updates.inner.write().unwrap().take_with_token(other_token), - &[ - InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, - InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, - InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, - InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] }, - InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] }, - InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] }, - ] - ); - - { - let inner = updates.inner.read().unwrap(); - - // Inspect number of updates in memory. - // It must be the same number as before as the garbage collector will be able to - // remove unused updates but at the next call… - assert_eq!(inner.len(), 6); - - // Inspect the indices. - let indices = &inner.last_index_per_reader; - - assert_eq!(indices.get(&main_token), Some(&3)); - assert_eq!(indices.get(&other_token), Some(&6)); - } - } - - // Scenario 3: “other” take new updates, but there is none, “main” - // doesn't take new updates. The garbage collector will run and collect - // unused updates. - // - // 0 1 2 3 - // +---+---+---+ - // | d | e | f | - // +---+---+---+ - // - // “main” will have its index updated from 3 to 0. - // “other” will have its index updated from 6 to 3. - { - let updates = linked_chunk.updates().unwrap(); - - assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); - - { - let inner = updates.inner.read().unwrap(); - - // Inspect number of updates in memory. - // The garbage collector has removed unused updates. - assert_eq!(inner.len(), 3); - - // Inspect the indices. They must have been adjusted. 
- let indices = &inner.last_index_per_reader; - - assert_eq!(indices.get(&main_token), Some(&0)); - assert_eq!(indices.get(&other_token), Some(&3)); - } - } - - linked_chunk.push_items_back(['g']); - linked_chunk.push_items_back(['h']); - linked_chunk.push_items_back(['i']); - - // Scenario 4: both “main” and “other” take the new updates. - // - // 0 1 2 3 4 5 6 - // +---+---+---+---+---+---+ - // | d | e | f | g | h | i | - // +---+---+---+---+---+---+ - // - // “main” will have its index updated from 3 to 0. - // “other” will have its index updated from 6 to 3. - { - let updates = linked_chunk.updates().unwrap(); - - assert_eq!( - updates.take(), - &[ - InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] }, - InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] }, - InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] }, - InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] }, - InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] }, - InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] }, - ] - ); - assert_eq!( - updates.inner.write().unwrap().take_with_token(other_token), - &[ - InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] }, - InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] }, - InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] }, - ] - ); - - { - let inner = updates.inner.read().unwrap(); - - // Inspect number of updates in memory. - // The garbage collector had a chance to collect the first 3 updates. - assert_eq!(inner.len(), 3); - - // Inspect the indices. - let indices = &inner.last_index_per_reader; - - assert_eq!(indices.get(&main_token), Some(&3)); - assert_eq!(indices.get(&other_token), Some(&3)); - } - } - - // Scenario 5: no more updates but they both try to take new updates. - // The garbage collector will collect all updates as all of them as - // been read already. 
- // - // “main” will have its index updated from 0 to 0. - // “other” will have its index updated from 3 to 0. - { - let updates = linked_chunk.updates().unwrap(); - - assert!(updates.take().is_empty()); - assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); - - { - let inner = updates.inner.read().unwrap(); - - // Inspect number of updates in memory. - // The garbage collector had a chance to collect all updates. - assert_eq!(inner.len(), 0); - - // Inspect the indices. - let indices = &inner.last_index_per_reader; - - assert_eq!(indices.get(&main_token), Some(&0)); - assert_eq!(indices.get(&other_token), Some(&0)); - } - } - } - - #[test] - fn test_updates_stream() { - use super::Update::*; - - struct CounterWaker { - number_of_wakeup: Mutex, - } - - impl Wake for CounterWaker { - fn wake(self: Arc) { - *self.number_of_wakeup.lock().unwrap() += 1; - } - } - - let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) }); - let waker = counter_waker.clone().into(); - let mut context = Context::from_waker(&waker); - - let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); - - let updates_subscriber = linked_chunk.updates().unwrap().subscribe(); - pin_mut!(updates_subscriber); - - // No update, stream is pending. - assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); - assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0); - - // Let's generate an update. - linked_chunk.push_items_back(['a']); - - // The waker must have been called. - assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1); - - // There is an update! Right after that, the stream is pending again. 
- assert_matches!( - updates_subscriber.as_mut().poll_next(&mut context), - Poll::Ready(Some(items)) => { - assert_eq!( - items, - &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] - ); - } - ); - assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); - - // Let's generate two other updates. - linked_chunk.push_items_back(['b']); - linked_chunk.push_items_back(['c']); - - // The waker must have been called only once for the two updates. - assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); - - // We can consume the updates without the stream, but the stream continues to - // know it has updates. - assert_eq!( - linked_chunk.updates().unwrap().take(), - &[ - InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, - InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, - InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, - ] - ); - assert_matches!( - updates_subscriber.as_mut().poll_next(&mut context), - Poll::Ready(Some(items)) => { - assert_eq!( - items, - &[ - InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, - InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, - ] - ); - } - ); - assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); - - // When dropping the `LinkedChunk`, it closes the stream. - drop(linked_chunk); - assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None)); - - // Wakers calls have not changed. 
- assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); - } - #[test] fn test_push_items() { use super::Update::*; diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs new file mode 100644 index 00000000000..2112a9edfcf --- /dev/null +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs @@ -0,0 +1,644 @@ +// Copyright 2024 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{ + collections::HashMap, + pin::Pin, + sync::{Arc, RwLock, Weak}, + task::{Context, Poll, Waker}, +}; + +use futures_core::Stream; + +use super::{ChunkIdentifier, Position}; + +/// Represent the updates that have happened inside a [`LinkedChunk`]. +/// +/// To retrieve the updates, use [`LinkedChunk::updates`]. +/// +/// These updates are useful to store a `LinkedChunk` in another form of +/// storage, like a database or something similar. +#[derive(Debug, PartialEq)] +pub enum Update { + /// A new chunk of kind Items has been created. + NewItemsChunk { + /// The identifier of the previous chunk of this new chunk. + previous: Option, + + /// The identifier of the new chunk. + new: ChunkIdentifier, + + /// The identifier of the next chunk of this new chunk. + next: Option, + }, + + /// A new chunk of kind Gap has been created. + NewGapChunk { + /// The identifier of the previous chunk of this new chunk. 
+ previous: Option, + + /// The identifier of the new chunk. + new: ChunkIdentifier, + + /// The identifier of the next chunk of this new chunk. + next: Option, + + /// The content of the chunk. + gap: Gap, + }, + + /// A chunk has been removed. + RemoveChunk(ChunkIdentifier), + + /// Items are inserted inside a chunk of kind Items. + InsertItems { + /// [`Position`] of the items. + at: Position, + + /// The items. + items: Vec, + }, + + /// A chunk of kind Items has been truncated. + TruncateItems { + /// The identifier of the chunk. + chunk: ChunkIdentifier, + + /// The new length of the chunk. + length: usize, + }, +} + +impl Clone for Update +where + Item: Clone, + Gap: Clone, +{ + fn clone(&self) -> Self { + match self { + Self::NewItemsChunk { previous, new, next } => { + Self::NewItemsChunk { previous: *previous, new: *new, next: *next } + } + Self::NewGapChunk { previous, new, next, gap } => { + Self::NewGapChunk { previous: *previous, new: *new, next: *next, gap: gap.clone() } + } + Self::RemoveChunk(identifier) => Self::RemoveChunk(*identifier), + Self::InsertItems { at, items } => Self::InsertItems { at: *at, items: items.clone() }, + Self::TruncateItems { chunk, length } => { + Self::TruncateItems { chunk: *chunk, length: *length } + } + } + } +} + +/// A collection of [`Update`]. +/// +/// Get a value for this type with [`LinkedChunk::updates`]. +pub struct Updates { + inner: Arc>>, +} + +/// A token used to represent readers that read the updates in +/// [`UpdatesInner`]. +type ReaderToken = usize; + +/// Inner type for [`Updates`]. +/// +/// The particularity of this type is that multiple readers can read the +/// updates. A reader has a [`ReaderToken`]. The public API (i.e. +/// [`Updates`]) is considered to be the _main reader_ (it has the token +/// [`Self::MAIN_READER_TOKEN`]). +/// +/// An update that have been read by all readers are garbage collected to be +/// removed from the memory. 
An update will never be read twice by the same
+/// reader.
+///
+/// Why do we need multiple readers? The public API reads the updates with
+/// [`Updates::take`], but the private API must also read the updates, for
+/// example with [`UpdatesSubscriber`]. Of course, there can be multiple
+/// `UpdatesSubscriber`s at the same time. Hence the need to support multiple
+/// readers.
+struct UpdatesInner {
+    /// All the updates that have not been read by all readers.
+    updates: Vec>,
+
+    /// Updates are stored in [`Self::updates`]. Multiple readers can read them.
+    /// A reader is identified by a [`ReaderToken`].
+    ///
+    /// To each reader token is associated an index that represents the index of
+    /// the last reading. It is used to never return the same update twice.
+    last_index_per_reader: HashMap,
+
+    /// The last generated token. This is useful to generate new tokens.
+    last_token: ReaderToken,
+
+    /// Pending wakers for [`UpdatesSubscriber`]s. A waker is removed
+    /// every time it is called.
+    wakers: Vec,
+}
+
+impl UpdatesInner {
+    /// The token used by the main reader. See [`Self::take`] to learn more.
+    const MAIN_READER_TOKEN: ReaderToken = 0;
+
+    /// Create a new [`Self`].
+    fn new() -> Self {
+        Self {
+            updates: Vec::with_capacity(8),
+            last_index_per_reader: {
+                let mut map = HashMap::with_capacity(2);
+                map.insert(Self::MAIN_READER_TOKEN, 0);
+
+                map
+            },
+            last_token: Self::MAIN_READER_TOKEN,
+            wakers: Vec::with_capacity(2),
+        }
+    }
+
+    /// Push a new update.
+    fn push(&mut self, update: Update) {
+        self.updates.push(update);
+
+        // Wake them up \o/.
+        for waker in self.wakers.drain(..) {
+            waker.wake();
+        }
+    }
+
+    /// Take new updates; it considers the caller is the main reader, i.e. it
+    /// will use the [`Self::MAIN_READER_TOKEN`].
+    ///
+    /// Updates that have been read will never be read again by the current
+    /// reader.
+    ///
+    /// Learn more by reading [`Self::take_with_token`].
+ fn take(&mut self) -> &[Update] { + self.take_with_token(Self::MAIN_READER_TOKEN) + } + + /// Take new updates with a particular reader token. + /// + /// Updates are stored in [`Self::updates`]. Multiple readers can read them. + /// A reader is identified by a [`ReaderToken`]. Every reader can + /// take/read/consume each update only once. An internal index is stored + /// per reader token to know where to start reading updates next time this + /// method is called. + fn take_with_token(&mut self, token: ReaderToken) -> &[Update] { + // Let's garbage collect unused updates. + self.garbage_collect(); + + let index = self + .last_index_per_reader + .get_mut(&token) + .expect("Given `UpdatesToken` does not map to any index"); + + // Read new updates, and update the index. + let slice = &self.updates[*index..]; + *index = self.updates.len(); + + slice + } + + /// Return the number of updates in the buffer. + fn len(&self) -> usize { + self.updates.len() + } + + /// Garbage collect unused updates. An update is considered unused when it's + /// been read by all readers. + /// + /// Basically, it reduces to finding the smallest last index for all + /// readers, and clear from 0 to that index. + fn garbage_collect(&mut self) { + let min_index = self.last_index_per_reader.values().min().copied().unwrap_or(0); + + if min_index > 0 { + let _ = self.updates.drain(0..min_index); + + // Let's shift the indices to the left by `min_index` to preserve them. + for index in self.last_index_per_reader.values_mut() { + *index -= min_index; + } + } + } +} + +impl Updates { + /// Create a new [`Self`]. + pub(super) fn new() -> Self { + Self { inner: Arc::new(RwLock::new(UpdatesInner::new())) } + } + + /// Push a new update. + pub(super) fn push(&mut self, update: Update) { + self.inner.write().unwrap().push(update); + } + + /// Take new updates. + /// + /// Updates that have been taken will not be read again. 
+ pub(super) fn take(&mut self) -> Vec> + where + Item: Clone, + Gap: Clone, + { + self.inner.write().unwrap().take().to_owned() + } + + /// Subscribe to updates by using a [`Stream`]. + fn subscribe(&mut self) -> UpdatesSubscriber { + // A subscriber is a new update reader, it needs its own token. + let token = { + let mut inner = self.inner.write().unwrap(); + inner.last_token += 1; + + let last_token = inner.last_token; + inner.last_index_per_reader.insert(last_token, 0); + + last_token + }; + + UpdatesSubscriber { updates: Arc::downgrade(&self.inner), token } + } +} + +/// A subscriber to [`Updates`]. It is helpful to receive updates via a +/// [`Stream`]. +struct UpdatesSubscriber { + /// Weak reference to [`UpdatesInner`]. + /// + /// Using a weak reference allows [`Updates`] to be dropped + /// freely even if a subscriber exists. + updates: Weak>>, + + /// The token to read the updates. + token: ReaderToken, +} + +impl Stream for UpdatesSubscriber +where + Item: Clone, + Gap: Clone, +{ + type Item = Vec>; + + fn poll_next(self: Pin<&mut Self>, context: &mut Context<'_>) -> Poll> { + let Some(updates) = self.updates.upgrade() else { + // The `Updates` has been dropped. It's time to close this stream. + return Poll::Ready(None); + }; + + let mut updates = updates.write().unwrap(); + let the_updates = updates.take_with_token(self.token); + + // No updates. + if the_updates.is_empty() { + // Let's register the waker. + updates.wakers.push(context.waker().clone()); + + // The stream is pending. + return Poll::Pending; + } + + // There is updates! Let's forward them in this stream. 
+ Poll::Ready(Some(the_updates.to_owned())) + } +} + +#[cfg(test)] +mod tests { + use std::{ + sync::{Arc, Mutex}, + task::{Context, Poll, Wake}, + }; + + use assert_matches::assert_matches; + use futures_util::pin_mut; + + use super::{super::LinkedChunk, ChunkIdentifier, Position, Stream, UpdatesInner}; + + #[test] + fn test_updates_take_and_garbage_collector() { + use super::Update::*; + + let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history(); + + // Simulate another updates “reader”, it can a subscriber. + let main_token = UpdatesInner::::MAIN_READER_TOKEN; + let other_token = { + let updates = linked_chunk.updates().unwrap(); + let mut inner = updates.inner.write().unwrap(); + inner.last_token += 1; + + let other_token = inner.last_token; + inner.last_index_per_reader.insert(other_token, 0); + + other_token + }; + + // There is no new update yet. + { + let updates = linked_chunk.updates().unwrap(); + + assert!(updates.take().is_empty()); + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); + } + + linked_chunk.push_items_back(['a']); + linked_chunk.push_items_back(['b']); + linked_chunk.push_items_back(['c']); + + // Scenario 1: “main” takes the new updates, “other” doesn't take the new + // updates. + // + // 0 1 2 3 + // +---+---+---+ + // | a | b | c | + // +---+---+---+ + // + // “main” will move its index from 0 to 3. + // “other” won't move its index. + { + let updates = linked_chunk.updates().unwrap(); + + { + // Inspect number of updates in memory. + assert_eq!(updates.inner.read().unwrap().len(), 3); + } + + assert_eq!( + updates.take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. 
+            // It must be the same number as before as the garbage collector wasn't
+            // able to remove any unused updates.
+ // + // 0 1 2 3 + // +---+---+---+ + // | d | e | f | + // +---+---+---+ + // + // “main” will have its index updated from 3 to 0. + // “other” will have its index updated from 6 to 3. + { + let updates = linked_chunk.updates().unwrap(); + + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // The garbage collector has removed unused updates. + assert_eq!(inner.len(), 3); + + // Inspect the indices. They must have been adjusted. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&0)); + assert_eq!(indices.get(&other_token), Some(&3)); + } + } + + linked_chunk.push_items_back(['g']); + linked_chunk.push_items_back(['h']); + linked_chunk.push_items_back(['i']); + + // Scenario 4: both “main” and “other” take the new updates. + // + // 0 1 2 3 4 5 6 + // +---+---+---+---+---+---+ + // | d | e | f | g | h | i | + // +---+---+---+---+---+---+ + // + // “main” will have its index updated from 3 to 0. + // “other” will have its index updated from 6 to 3. 
+ { + let updates = linked_chunk.updates().unwrap(); + + assert_eq!( + updates.take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] }, + InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] }, + InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] }, + InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] }, + InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] }, + InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] }, + ] + ); + assert_eq!( + updates.inner.write().unwrap().take_with_token(other_token), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] }, + InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] }, + InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] }, + ] + ); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // The garbage collector had a chance to collect the first 3 updates. + assert_eq!(inner.len(), 3); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&3)); + assert_eq!(indices.get(&other_token), Some(&3)); + } + } + + // Scenario 5: no more updates but they both try to take new updates. + // The garbage collector will collect all updates as all of them as + // been read already. + // + // “main” will have its index updated from 0 to 0. + // “other” will have its index updated from 3 to 0. + { + let updates = linked_chunk.updates().unwrap(); + + assert!(updates.take().is_empty()); + assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty()); + + { + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // The garbage collector had a chance to collect all updates. + assert_eq!(inner.len(), 0); + + // Inspect the indices. 
+ let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&main_token), Some(&0)); + assert_eq!(indices.get(&other_token), Some(&0)); + } + } + } + + #[test] + fn test_updates_stream() { + use super::Update::*; + + struct CounterWaker { + number_of_wakeup: Mutex, + } + + impl Wake for CounterWaker { + fn wake(self: Arc) { + *self.number_of_wakeup.lock().unwrap() += 1; + } + } + + let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) }); + let waker = counter_waker.clone().into(); + let mut context = Context::from_waker(&waker); + + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + + let updates_subscriber = linked_chunk.updates().unwrap().subscribe(); + pin_mut!(updates_subscriber); + + // No update, stream is pending. + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 0); + + // Let's generate an update. + linked_chunk.push_items_back(['a']); + + // The waker must have been called. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 1); + + // There is an update! Right after that, the stream is pending again. + assert_matches!( + updates_subscriber.as_mut().poll_next(&mut context), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] + ); + } + ); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + + // Let's generate two other updates. + linked_chunk.push_items_back(['b']); + linked_chunk.push_items_back(['c']); + + // The waker must have been called only once for the two updates. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); + + // We can consume the updates without the stream, but the stream continues to + // know it has updates. 
+ assert_eq!( + linked_chunk.updates().unwrap().take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + assert_matches!( + updates_subscriber.as_mut().poll_next(&mut context), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[ + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + } + ); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Pending); + + // When dropping the `LinkedChunk`, it closes the stream. + drop(linked_chunk); + assert_matches!(updates_subscriber.as_mut().poll_next(&mut context), Poll::Ready(None)); + + // Wakers calls have not changed. + assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); + } +}