From 12bc2ed981cf0bb77f1ab65771e658cd935c457f Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Mon, 6 May 2024 15:31:18 +0200 Subject: [PATCH] feat(sdk) Add the `LinkedChunkUpdates::peek`. This patch adds the `LinkedChunkUpdates::peek` method. This is a new channel to read the updates without consuming them, like `LinkedChunkUpdates::take` does. The complexity is: when do we clear/drop the updates then? We don't want to keep them in memory forever. Initially `take` was clearing the updates, but now that we can read them with `peek` too, who's responsible to clear them? Enter `garbage_collect`. First off, we already need to maintain 2 index, resp. `last_taken_index` and `last_peeked_index` so that `take` and `peek` don't return already returned updates. They respectively know the index of the last update that has been read. We can use this information to know which updates must be garbage collected: that's all updates below the two index. Tadaa. Simple. The only _drawback_ (if it can be considered as such) is that the garbage collection happens on the next call to `take` or `peek` (because of the borrow checker). That's not really a big deal in practise. We could make it happens immediately when calling `take` or `peek` but it needs more pointer arithmetic and a less straighforward code. --- .../src/event_cache/linked_chunk.rs | 288 +++++++++++++++--- 1 file changed, 248 insertions(+), 40 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs index 447fad9ec6b..bc8e8247c12 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs @@ -15,12 +15,12 @@ #![allow(dead_code)] use std::{ + cmp::min, fmt, marker::PhantomData, ops::Not, ptr::NonNull, sync::atomic::{AtomicU64, Ordering}, - vec::Drain, }; /// Errors of [`LinkedChunk`]. @@ -85,21 +85,91 @@ pub enum LinkedChunkUpdate { }, } +/// A collection of [`LinkedChunkUpdate`]. +/// +/// Get a value for this type with [`LinkedChunk::updates`]. pub struct LinkedChunkUpdates { + /// All the updates that have not been peeked nor taken. updates: Vec>, + + /// The last index used by the last call of [`Self::take`]. + last_taken_index: usize, + + /// The last index used by the last call of [`Self::peek`]. + last_peeked_index: usize, } impl LinkedChunkUpdates { + /// Create a new [`Self`]. fn new() -> Self { - Self { updates: Vec::new() } + Self { updates: Vec::new(), last_taken_index: 0, last_peeked_index: 0 } } - pub fn push(&mut self, update: LinkedChunkUpdate) { + /// Push a new update. + fn push(&mut self, update: LinkedChunkUpdate) { self.updates.push(update); } - pub fn take(&mut self) -> Drain> { - self.updates.drain(..) + /// Take new updates. + /// + /// Updates that have been taken will not be read again. + pub fn take(&mut self) -> &[LinkedChunkUpdate] { + // Let's garbage collect unused updates. + self.garbage_collect(); + + // Read new updates and update the `Self::last_taken_index`. + let slice = &self.updates[self.last_taken_index..]; + self.last_taken_index = self.updates.len(); + + slice + } + + /// Peek updates. + /// + /// That's another channel to read the updates. The difference with + /// [`Self::take`] is that peeking updates doesn't consume them. However, + /// updates that have been peeked cannot be peeked again. + /// + /// Basically, [`Self::take`] is intended to be used by the public API, + /// whilst [`Self::peek`] is for internal usage that must not conflict with + /// [`Self::take`]. + fn peek(&mut self) -> &[LinkedChunkUpdate] { + // Let's garbage collect unused updates. + self.garbage_collect(); + + // Read new updates and update the `Self::last_peeked_index`. + let slice = &self.updates[self.last_peeked_index..]; + self.last_peeked_index = self.updates.len(); + + slice + } + + /// Return `true` if there is new update that can be read with + /// [`Self::peek`]. + fn has_new_peekable_updates(&self) -> bool { + self.last_peeked_index < self.updates.len() + } + + /// Garbage collect unused updates. An update is considered unused when it's + /// been read by `Self::take` **and** by `Self::peek`. + /// + /// Find the smallest index between `Self::last_taken_index` and + /// `Self::last_peeked_index`, and clear from 0 to that index. + fn garbage_collect(&mut self) { + let min_index = min(self.last_taken_index, self.last_peeked_index); + + if min_index > 0 { + let _ = self.updates.drain(0..min_index); + + // Let's shift the index to the left by `min_index` to preserve them. + self.last_taken_index -= min_index; + self.last_peeked_index -= min_index; + } + } + + /// Return the number of updates in the buffer. + fn len(&self) -> usize { + self.updates.len() } } @@ -195,6 +265,10 @@ impl LinkedChunk { } /// Create a new [`Self`] with a history of updates. + /// + /// When [`Self`] is built with update history, the + /// [`LinkedChunkUpdates::take`] method must be called to consume and clean + /// the updates. See [`Self::updates`]. pub fn new_with_update_history() -> Self { Self { links: LinkedChunkEnds { @@ -670,13 +744,15 @@ impl LinkedChunk { .skip(position.index())) } - /// Get a mutable reference to the `LinkedChunk` updates. + /// Get a mutable reference to the `LinkedChunk` updates, aka + /// [`LinkedChunkUpdates`]. /// - /// The caller is responsible to drain/empty these updates. + /// The caller is responsible to clear these updates. /// /// If the `Option` becomes `None`, it will disable update history. Thus, be /// careful when you want to empty the update history: do not use - /// `Option::take()` directly but rather `Vec::drain` for example. + /// `Option::take()` directly but rather [`LinkedChunkUpdates::take`] for + /// example. pub fn updates(&mut self) -> Option<&mut LinkedChunkUpdates> { self.updates.as_mut() } @@ -1179,6 +1255,8 @@ where #[cfg(test)] mod tests { + use std::ops::Not; + use assert_matches::assert_matches; use super::{ @@ -1287,11 +1365,141 @@ mod tests { } #[test] - fn test_update_history() { + fn test_updates() { assert!(LinkedChunk::<3, char, ()>::new().updates().is_none()); assert!(LinkedChunk::<3, char, ()>::new_with_update_history().updates().is_some()); } + #[test] + fn test_updates_take_and_peek() { + use super::LinkedChunkUpdate::*; + + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + + // There is no new update. + { + let updates = linked_chunk.updates().unwrap(); + + assert!(updates.has_new_peekable_updates().not()); + assert!(updates.peek().is_empty()); + assert!(updates.take().is_empty()); + } + + linked_chunk.push_items_back(['a']); + + // Let's `peek` only the new update. + { + let updates = linked_chunk.updates().unwrap(); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 1); + } + + // Peek the update. + assert!(updates.has_new_peekable_updates()); + assert_eq!( + updates.peek(), + &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] + ); + + // No more update to peek. + assert!(updates.has_new_peekable_updates().not()); + assert!(updates.peek().is_empty()); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 1); + } + } + + linked_chunk.push_items_back(['b']); + + // Let's `peek` then `take` the new update. + { + let updates = linked_chunk.updates().unwrap(); + + // Inspect number of updates in memory. + assert_eq!(updates.len(), 2); + + // Peek the update… + assert!(updates.has_new_peekable_updates()); + assert_eq!( + updates.peek(), + &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },] + ); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 2); + } + + // … and take the update. + assert_eq!( + updates.take(), + &[ + InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }, + InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + ] + ); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 2); + } + + // No more update to peek or to take. + assert!(updates.has_new_peekable_updates().not()); + assert!(updates.peek().is_empty()); + assert!(updates.take().is_empty()); + + { + // Inspect number of updates in memory. + // The updates have been garbage collected. + assert_eq!(updates.len(), 0); + } + } + + linked_chunk.push_items_back(['c']); + + // Let's `take` then `peek` the new update. + { + let updates = linked_chunk.updates().unwrap(); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 1); + } + + // Take and peek the update. + assert!(updates.has_new_peekable_updates()); + assert_eq!( + updates.take(), + &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },] + ); + assert_eq!( + updates.peek(), + &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },] + ); + + { + // Inspect number of updates in memory. + assert_eq!(updates.len(), 1); + } + + // No more update to peek or to take. + assert!(updates.has_new_peekable_updates().not()); + assert!(updates.peek().is_empty()); + assert!(updates.take().is_empty()); + + { + // Inspect number of updates in memory. + // The update has been garbage collected. + assert_eq!(updates.len(), 0); + } + } + } + #[test] fn test_push_items() { use super::LinkedChunkUpdate::*; @@ -1301,21 +1509,21 @@ mod tests { assert_items_eq!(linked_chunk, ['a']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); linked_chunk.push_items_back(['b', 'c']); assert_items_eq!(linked_chunk, ['a', 'b', 'c']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b', 'c'] }] ); linked_chunk.push_items_back(['d', 'e']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(0)), @@ -1329,7 +1537,7 @@ mod tests { linked_chunk.push_items_back(['f', 'g', 'h', 'i', 'j']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f'] ['g', 'h', 'i'] ['j']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(1), 2), items: vec!['f'] }, NewItemsChunk { @@ -1358,14 +1566,14 @@ mod tests { linked_chunk.push_items_back(['a']); assert_items_eq!(linked_chunk, ['a']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] ); linked_chunk.push_gap_back(()); assert_items_eq!(linked_chunk, ['a'] [-]); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(0)), new: ChunkIdentifier(1), @@ -1377,7 +1585,7 @@ mod tests { linked_chunk.push_items_back(['b', 'c', 'd', 'e']); assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(1)), @@ -1398,7 +1606,7 @@ mod tests { linked_chunk.push_gap_back(()); // why not assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-]); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewGapChunk { previous: Some(ChunkIdentifier(3)), @@ -1418,7 +1626,7 @@ mod tests { linked_chunk.push_items_back(['f', 'g', 'h', 'i']); assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c', 'd'] ['e'] [-] [-] ['f', 'g', 'h'] ['i']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(5)), @@ -1659,7 +1867,7 @@ mod tests { linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] }, NewItemsChunk { @@ -1685,7 +1893,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 10); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(1), length: 1 }, InsertItems { at: Position(ChunkIdentifier(1), 1), items: vec!['w', 'x'] }, @@ -1717,7 +1925,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 14); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 0 }, InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['l', 'm', 'n'] }, @@ -1749,7 +1957,7 @@ mod tests { ); assert_eq!(linked_chunk.len(), 16); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(5), length: 0 }, InsertItems { at: Position(ChunkIdentifier(5), 0), items: vec!['r', 's'] }, @@ -1770,7 +1978,7 @@ mod tests { ['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[InsertItems { at: Position(ChunkIdentifier(3), 1), items: vec!['p', 'q'] },] ); assert_eq!(linked_chunk.len(), 18); @@ -1782,7 +1990,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a chunk that exists, but at an item that does not exist. @@ -1791,7 +1999,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), Err(LinkedChunkError::InvalidItemIndex { index: 128 }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a gap. @@ -1803,7 +2011,7 @@ mod tests { ['l', 'm', 'n'] ['o', 'a', 'b'] ['r', 's', 'c'] ['d', 'w', 'x'] ['y', 'z', 'e'] ['f', 'p', 'q'] [-] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(6), @@ -1831,7 +2039,7 @@ mod tests { linked_chunk.push_items_back(['a', 'b', 'c', 'd', 'e', 'f']); assert_items_eq!(linked_chunk, ['a', 'b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b', 'c'] }, NewItemsChunk { @@ -1850,7 +2058,7 @@ mod tests { assert_items_eq!(linked_chunk, ['a'] [-] ['b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 1 }, NewGapChunk { @@ -1877,7 +2085,7 @@ mod tests { // A new empty chunk is created as the first chunk. assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ TruncateItems { chunk: ChunkIdentifier(0), length: 0 }, NewGapChunk { @@ -1906,7 +2114,7 @@ mod tests { // space. assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(6), @@ -1923,7 +2131,7 @@ mod tests { linked_chunk.insert_gap_at((), position_of_first_empty_chunk), Err(LinkedChunkError::InvalidItemIndex { index: 0 }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in an empty chunk. @@ -1934,7 +2142,7 @@ mod tests { assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(6)), @@ -1949,7 +2157,7 @@ mod tests { assert_items_eq!(linked_chunk, [] [-] ['a'] [-] ['b', 'c'] [-] [] ['d', 'e', 'f']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(3)), new: ChunkIdentifier(8), @@ -1965,7 +2173,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(128), 0)), Err(LinkedChunkError::InvalidChunkIdentifier { identifier: ChunkIdentifier(128) }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in a chunk that exists, but at an item that does not exist. @@ -1974,7 +2182,7 @@ mod tests { linked_chunk.insert_items_at(['u', 'v'], Position(ChunkIdentifier(0), 128)), Err(LinkedChunkError::InvalidItemIndex { index: 128 }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } // Insert in an existing gap. @@ -1986,7 +2194,7 @@ mod tests { linked_chunk.insert_gap_at((), position_of_a_gap), Err(LinkedChunkError::ChunkIsAGap { identifier: ChunkIdentifier(4) }) ); - assert!(linked_chunk.updates().unwrap().take().collect::>().is_empty()); + assert!(linked_chunk.updates().unwrap().take().is_empty()); } assert_eq!(linked_chunk.len(), 6); @@ -2004,7 +2212,7 @@ mod tests { linked_chunk.push_items_back(['l', 'm']); assert_items_eq!(linked_chunk, ['a', 'b'] [-] ['l', 'm']); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a', 'b'] }, NewGapChunk { @@ -2035,7 +2243,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(1)), @@ -2062,7 +2270,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] [-] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[NewGapChunk { previous: Some(ChunkIdentifier(2)), new: ChunkIdentifier(5), @@ -2081,7 +2289,7 @@ mod tests { ['a', 'b'] ['d', 'e', 'f'] ['g', 'h'] ['l', 'm'] ['w', 'x', 'y'] ['z'] ); assert_eq!( - linked_chunk.updates().unwrap().take().collect::>(), + linked_chunk.updates().unwrap().take(), &[ NewItemsChunk { previous: Some(ChunkIdentifier(5)),