diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk.rs b/crates/matrix-sdk/src/event_cache/linked_chunk.rs
index 6bc5b4ba25a..1e9d39af641 100644
--- a/crates/matrix-sdk/src/event_cache/linked_chunk.rs
+++ b/crates/matrix-sdk/src/event_cache/linked_chunk.rs
@@ -15,7 +15,7 @@
 #![allow(dead_code)]

 use std::{
-    cmp::min,
+    collections::HashMap,
     fmt,
     marker::PhantomData,
     ops::Not,
@@ -139,21 +139,64 @@ pub struct LinkedChunkUpdates<Item, Gap> {
     inner: Arc<RwLock<LinkedChunkUpdatesInner<Item, Gap>>>,
 }

+/// A token used to represent readers that read the updates in
+/// [`LinkedChunkUpdatesInner`].
+type ReaderToken = usize;
+
+/// Inner type for [`LinkedChunkUpdates`].
+///
+/// The particularity of this type is that multiple readers can read the
+/// updates. A reader has a [`ReaderToken`]. The public API (i.e.
+/// [`LinkedChunkUpdates`]) is considered to be the _main reader_ (it has the
+/// token [`Self::MAIN_READER_TOKEN`]).
+///
+/// An update that has been read by all readers is garbage collected and
+/// removed from memory. An update will never be read twice by the same
+/// reader.
+///
+/// Why do we need multiple readers? The public API reads the updates with
+/// [`LinkedChunkUpdates::take`], but the private API must also read the
+/// updates, for example with [`LinkedChunkUpdatesSubscriber`]. Of course,
+/// there can be multiple `LinkedChunkUpdatesSubscriber`s at the same time.
+/// Hence the need to support multiple readers.
 struct LinkedChunkUpdatesInner<Item, Gap> {
-    /// All the updates that have not been peeked nor taken.
+    /// All the updates that have not been read by all readers.
     updates: Vec<LinkedChunkUpdate<Item, Gap>>,

-    /// The last index used by the last call of [`Self::take`].
-    last_taken_index: usize,
+    /// Updates are stored in [`Self::updates`]. Multiple readers can read
+    /// them. A reader is identified by a [`ReaderToken`].
+    ///
+    /// Each reader token is associated with an index that represents the
+    /// position of its last read. It is used to never return the same update
+    /// twice.
+    last_index_per_reader: HashMap<ReaderToken, usize>,

-    /// The last index used by the last call of [`Self::peek`].
-    last_peeked_index: usize,
+    /// The last generated token. This is useful to generate new tokens.
+    last_token: ReaderToken,

-    /// Pending wakers for [`LinkedChunkUpdateSubscriber`]s.
+    /// Pending wakers for [`LinkedChunkUpdateSubscriber`]s. A waker is
+    /// removed every time it is called.
     wakers: Vec<Waker>,
 }

 impl<Item, Gap> LinkedChunkUpdatesInner<Item, Gap> {
+    /// The token used by the main reader. See [`Self::take`] to learn more.
+    const MAIN_READER_TOKEN: ReaderToken = 0;
+
+    /// Create a new [`Self`].
+    fn new() -> Self {
+        Self {
+            updates: Vec::with_capacity(8),
+            last_index_per_reader: {
+                let mut map = HashMap::with_capacity(2);
+                map.insert(Self::MAIN_READER_TOKEN, 0);
+
+                map
+            },
+            last_token: Self::MAIN_READER_TOKEN,
+            wakers: Vec::with_capacity(2),
+        }
+    }
+
     /// Push a new update.
     fn push(&mut self, update: LinkedChunkUpdate<Item, Gap>) {
         self.updates.push(update);
@@ -164,44 +207,38 @@ impl<Item, Gap> LinkedChunkUpdatesInner<Item, Gap> {
         }
     }

-    /// Take new updates.
+    /// Take new updates; this considers the caller to be the main reader,
+    /// i.e. it will use the [`Self::MAIN_READER_TOKEN`].
     ///
-    /// Updates that have been taken will not be read again.
+    /// Updates that have been read will never be read again by the current
+    /// reader.
+    ///
+    /// Learn more by reading [`Self::take_with_token`].
     fn take(&mut self) -> &[LinkedChunkUpdate<Item, Gap>] {
-        // Let's garbage collect unused updates.
-        self.garbage_collect();
-
-        // Read new updates and update the `Self::last_taken_index`.
-        let slice = &self.updates[self.last_taken_index..];
-        self.last_taken_index = self.updates.len();
-
-        slice
+        self.take_with_token(Self::MAIN_READER_TOKEN)
     }

-    /// Peek updates.
-    ///
-    /// That's another channel to read the updates. The difference with
-    /// [`Self::take`] is that peeking updates doesn't consume them. However,
-    /// updates that have been peeked cannot be peeked again.
+    /// Take new updates with a particular reader token.
     ///
-    /// Basically, [`Self::take`] is intended to be used by the public API,
-    /// whilst [`Self::peek`] is for internal usage that must not conflict with
-    /// [`Self::take`].
-    fn peek(&mut self) -> &[LinkedChunkUpdate<Item, Gap>] {
+    /// Updates are stored in [`Self::updates`]. Multiple readers can read
+    /// them. A reader is identified by a [`ReaderToken`]. Every reader can
+    /// take/read/consume each update only once. An internal index is stored
+    /// per reader token to know where to start reading updates the next time
+    /// this method is called.
+    fn take_with_token(&mut self, token: ReaderToken) -> &[LinkedChunkUpdate<Item, Gap>] {
         // Let's garbage collect unused updates.
         self.garbage_collect();

-        // Read new updates and update the `Self::last_peeked_index`.
-        let slice = &self.updates[self.last_peeked_index..];
-        self.last_peeked_index = self.updates.len();
+        let index = self
+            .last_index_per_reader
+            .get_mut(&token)
+            .expect("Given `ReaderToken` does not map to any index");

-        slice
-    }
+        // Read new updates, and update the index.
+        let slice = &self.updates[*index..];
+        *index = self.updates.len();

-    /// Return `true` if there is new update that can be read with
-    /// [`Self::peek`].
-    fn has_new_peekable_updates(&self) -> bool {
-        self.last_peeked_index < self.updates.len()
+        slice
     }

     /// Return the number of updates in the buffer.
@@ -210,19 +247,20 @@
     }

-    /// Garbage collect unused updates. An update is considered unused when it's
-    /// been read by `Self::take` **and** by `Self::peek`.
+    /// Garbage collect unused updates. An update is considered unused when
+    /// it's been read by all readers.
     ///
-    /// Find the smallest index between `Self::last_taken_index` and
-    /// `Self::last_peeked_index`, and clear from 0 to that index.
+    /// Basically, it reduces to finding the smallest last index of all
+    /// readers, and clearing from 0 to that index.
     fn garbage_collect(&mut self) {
-        let min_index = min(self.last_taken_index, self.last_peeked_index);
+        let min_index = self.last_index_per_reader.values().min().map(|min| *min).unwrap_or(0);

         if min_index > 0 {
             let _ = self.updates.drain(0..min_index);

-            // Let's shift the index to the left by `min_index` to preserve them.
-            self.last_taken_index -= min_index;
-            self.last_peeked_index -= min_index;
+            // Let's shift the indices to the left by `min_index` to preserve
+            // them.
+            for index in self.last_index_per_reader.values_mut() {
+                *index -= min_index;
+            }
         }
     }
 }
@@ -230,14 +268,7 @@ impl<Item, Gap> LinkedChunkUpdatesInner<Item, Gap> {
 impl<Item, Gap> LinkedChunkUpdates<Item, Gap> {
     /// Create a new [`Self`].
     fn new() -> Self {
-        Self {
-            inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner {
-                updates: Vec::new(),
-                last_taken_index: 0,
-                last_peeked_index: 0,
-                wakers: Vec::new(),
-            })),
-        }
+        Self { inner: Arc::new(RwLock::new(LinkedChunkUpdatesInner::new())) }
     }

     /// Push a new update.
@@ -256,21 +287,20 @@
         self.inner.write().unwrap().take().to_owned()
     }

-    /// Return `true` if there is new updates that can be read with
-    /// [`Self::take`].
-    pub fn has_new_takable_updates(&self) -> bool {
-        let inner = self.inner.read().unwrap();
+    /// Subscribe to updates by using a [`Stream`].
+    fn subscribe(&mut self) -> LinkedChunkUpdatesSubscriber<Item, Gap> {
+        // A subscriber is a new update reader; it needs its own token.
+        let token = {
+            let mut inner = self.inner.write().unwrap();
+            inner.last_token += 1;

-        inner.last_taken_index < inner.updates.len()
-    }
+            let last_token = inner.last_token;
+            inner.last_index_per_reader.insert(last_token, 0);

-    /// Subscribe to updates by using a [`Stream`].
-    ///
-    /// TODO: only one subscriber must exist so far because multiple concurrent
-    /// subscriber would conflict on the garbage collector. It's not complex to
-    /// fix, I will do it.
-    fn subscribe(&self) -> LinkedChunkUpdatesSubscriber<Item, Gap> {
-        LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner) }
+            last_token
+        };
+
+        LinkedChunkUpdatesSubscriber { updates: Arc::downgrade(&self.inner), token }
     }
 }

@@ -282,6 +312,9 @@ struct LinkedChunkUpdatesSubscriber<Item, Gap> {
     /// Using a weak reference allows [`LinkedChunkUpdates`] to be dropped
     /// freely even if a subscriber exists.
     updates: Weak<RwLock<LinkedChunkUpdatesInner<Item, Gap>>>,
+
+    /// The token to read the updates.
+    token: ReaderToken,
 }

 impl<Item, Gap> Stream for LinkedChunkUpdatesSubscriber<Item, Gap>
 where
@@ -298,9 +331,10 @@
         };

         let mut updates = updates.write().unwrap();
+        let the_updates = updates.take_with_token(self.token);

-        // No updates to peek.
-        if updates.has_new_peekable_updates().not() {
+        // No updates.
+        if the_updates.is_empty() {
             // Let's register the waker.
             updates.wakers.push(context.waker().clone());
@@ -308,8 +342,8 @@
             // The stream is pending.
             return Poll::Pending;
         }

-        // There is updates to peek! Let's forward them in this stream.
-        return Poll::Ready(Some(updates.peek().to_owned()));
+        // There are updates! Let's forward them in this stream.
+        Poll::Ready(Some(the_updates.to_owned()))
     }
 }

@@ -1405,8 +1439,9 @@ mod tests {

     use super::{
         Chunk, ChunkContent, ChunkIdentifier, ChunkIdentifierGenerator, LinkedChunk,
-        LinkedChunkError, Not, Position, Stream,
+        LinkedChunkError, Position, Stream,
     };
+    use crate::event_cache::linked_chunk::LinkedChunkUpdatesInner;

     /// A macro to test the items and the gap of a `LinkedChunk`.
     /// A chunk is delimited by `[` and `]`. An item chunk has the form `[a, b,
@@ -1515,138 +1550,230 @@
     }

     #[test]
-    fn test_updates_take_and_peek() {
+    fn test_updates_take_and_garbage_collector() {
         use super::LinkedChunkUpdate::*;

-        let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history();
+        let mut linked_chunk = LinkedChunk::<10, char, ()>::new_with_update_history();
+
+        // Simulate another updates “reader”; it could be a subscriber.
+        let main_token = LinkedChunkUpdatesInner::<char, ()>::MAIN_READER_TOKEN;
+        let other_token = {
+            let updates = linked_chunk.updates().unwrap();
+            let mut inner = updates.inner.write().unwrap();
+            inner.last_token += 1;

-        // There is no new update.
+            let other_token = inner.last_token;
+            inner.last_index_per_reader.insert(other_token, 0);
+
+            other_token
+        };
+
+        // There is no new update yet.
         {
             let updates = linked_chunk.updates().unwrap();

-            assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
-            assert!(updates.has_new_takable_updates().not());
-            assert!(updates.inner.write().unwrap().peek().is_empty());
             assert!(updates.take().is_empty());
+            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
         }

         linked_chunk.push_items_back(['a']);
+        linked_chunk.push_items_back(['b']);
+        linked_chunk.push_items_back(['c']);

-        // Let's `peek` only the new update.
+        // Scenario 1: “main” takes the new updates, “other” doesn't take the
+        // new updates.
+        //
+        // 0   1   2   3
+        // +---+---+---+
+        // | a | b | c |
+        // +---+---+---+
+        //
+        // “main” will move its index from 0 to 3.
+        // “other” won't move its index.
         {
             let updates = linked_chunk.updates().unwrap();

             {
                 // Inspect number of updates in memory.
-                assert_eq!(updates.inner.read().unwrap().len(), 1);
+                assert_eq!(updates.inner.read().unwrap().len(), 3);
             }

-            // Peek the update.
-            assert!(updates.has_new_takable_updates());
-            assert!(updates.inner.read().unwrap().has_new_peekable_updates());
             assert_eq!(
-                updates.inner.write().unwrap().peek(),
-                &[InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }]
+                updates.take(),
+                &[
+                    InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
+                ]
             );

-            // No more update to peek.
-            assert!(updates.has_new_takable_updates());
-            assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
-            assert!(updates.inner.write().unwrap().peek().is_empty());
-
             {
+                let inner = updates.inner.read().unwrap();
+
                 // Inspect number of updates in memory.
-                assert_eq!(updates.inner.read().unwrap().len(), 1);
+                // It must be the same number as before, as the garbage
+                // collector wasn't able to remove any unused updates.
+                assert_eq!(inner.len(), 3);
+
+                // Inspect the indices.
+                let indices = &inner.last_index_per_reader;
+
+                assert_eq!(indices.get(&main_token), Some(&3));
+                assert_eq!(indices.get(&other_token), Some(&0));
             }
         }

-        linked_chunk.push_items_back(['b']);
+        linked_chunk.push_items_back(['d']);
+        linked_chunk.push_items_back(['e']);
+        linked_chunk.push_items_back(['f']);

-        // Let's `peek` then `take` the new update.
+        // Scenario 2: “other” takes the new updates, “main” doesn't take the
+        // new updates.
+        //
+        // 0   1   2   3   4   5   6
+        // +---+---+---+---+---+---+
+        // | a | b | c | d | e | f |
+        // +---+---+---+---+---+---+
+        //
+        // “main” won't move its index.
+        // “other” will move its index from 0 to 6.
         {
             let updates = linked_chunk.updates().unwrap();

-            // Inspect number of updates in memory.
-            assert_eq!(updates.inner.read().unwrap().len(), 2);
-
-            // Peek the update…
-            assert!(updates.has_new_takable_updates());
-            assert!(updates.inner.read().unwrap().has_new_peekable_updates());
-            assert_eq!(
-                updates.inner.write().unwrap().peek(),
-                &[InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },]
-            );
-
-            {
-                // Inspect number of updates in memory.
-                assert_eq!(updates.inner.read().unwrap().len(), 2);
-            }
-
-            // … and take the update.
             assert_eq!(
-                updates.take(),
+                updates.inner.write().unwrap().take_with_token(other_token),
                 &[
                     InsertItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] },
                     InsertItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
                 ]
             );

             {
+                let inner = updates.inner.read().unwrap();
+
                 // Inspect number of updates in memory.
-                assert_eq!(updates.inner.read().unwrap().len(), 2);
-            }
+                // It must be the same number as before: the garbage collector
+                // will only be able to remove unused updates at the next
+                // call…
+                assert_eq!(inner.len(), 6);

-            // No more update to peek or to take.
-            assert!(updates.has_new_takable_updates().not());
-            assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
-            assert!(updates.inner.write().unwrap().peek().is_empty());
-            assert!(updates.take().is_empty());
+                // Inspect the indices.
+                let indices = &inner.last_index_per_reader;

-            {
-                // Inspect number of updates in memory.
-                // The updates have been garbage collected.
-                assert_eq!(updates.inner.read().unwrap().len(), 0);
+                assert_eq!(indices.get(&main_token), Some(&3));
+                assert_eq!(indices.get(&other_token), Some(&6));
             }
         }

-        linked_chunk.push_items_back(['c']);
-
-        // Let's `take` then `peek` the new update.
+        // Scenario 3: “other” takes new updates, but there are none; “main”
+        // doesn't take new updates. The garbage collector will run and
+        // collect unused updates.
+        //
+        // 0   1   2   3
+        // +---+---+---+
+        // | d | e | f |
+        // +---+---+---+
+        //
+        // “main” will have its index updated from 3 to 0.
+        // “other” will have its index updated from 6 to 3.
         {
             let updates = linked_chunk.updates().unwrap();

+            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());
+
             {
+                let inner = updates.inner.read().unwrap();
+
                 // Inspect number of updates in memory.
-                assert_eq!(updates.inner.read().unwrap().len(), 1);
+                // The garbage collector has removed unused updates.
+                assert_eq!(inner.len(), 3);
+
+                // Inspect the indices. They must have been adjusted.
+                let indices = &inner.last_index_per_reader;
+
+                assert_eq!(indices.get(&main_token), Some(&0));
+                assert_eq!(indices.get(&other_token), Some(&3));
             }
+        }
+
+        linked_chunk.push_items_back(['g']);
+        linked_chunk.push_items_back(['h']);
+        linked_chunk.push_items_back(['i']);
+
+        // Scenario 4: both “main” and “other” take the new updates.
+        //
+        // 0   1   2   3   4   5   6
+        // +---+---+---+---+---+---+
+        // | d | e | f | g | h | i |
+        // +---+---+---+---+---+---+
+        //
+        // “main” will move its index from 0 to 6. When “other” reads, the
+        // garbage collector first removes the 3 updates read by everyone and
+        // shifts the indices (“main”: 6 to 3, “other”: 3 to 0); “other” then
+        // moves its index from 0 to 3.
+        {
+            let updates = linked_chunk.updates().unwrap();

-            // Take and peek the update.
-            assert!(updates.has_new_takable_updates());
-            assert!(updates.inner.read().unwrap().has_new_peekable_updates());
             assert_eq!(
                 updates.take(),
-                &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },]
+                &[
+                    InsertItems { at: Position(ChunkIdentifier(0), 3), items: vec!['d'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 4), items: vec!['e'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 5), items: vec!['f'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
+                ]
             );
             assert_eq!(
-                updates.inner.write().unwrap().peek(),
-                &[InsertItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] },]
+                updates.inner.write().unwrap().take_with_token(other_token),
+                &[
+                    InsertItems { at: Position(ChunkIdentifier(0), 6), items: vec!['g'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 7), items: vec!['h'] },
+                    InsertItems { at: Position(ChunkIdentifier(0), 8), items: vec!['i'] },
+                ]
             );

             {
+                let inner = updates.inner.read().unwrap();
+
                 // Inspect number of updates in memory.
-                assert_eq!(updates.inner.read().unwrap().len(), 1);
+                // The garbage collector had a chance to collect the first 3
+                // updates.
+                assert_eq!(inner.len(), 3);
+
+                // Inspect the indices.
+                let indices = &inner.last_index_per_reader;
+
+                assert_eq!(indices.get(&main_token), Some(&3));
+                assert_eq!(indices.get(&other_token), Some(&3));
             }
+        }
+
+        // Scenario 5: there are no more updates, but both readers try to
+        // take new updates. The garbage collector will collect all updates,
+        // as all of them have been read already.
+        //
+        // “main” will have its index updated from 3 to 0.
+        // “other” will have its index updated from 3 to 0.
+        {
+            let updates = linked_chunk.updates().unwrap();

-            // No more update to peek or to take.
-            assert!(updates.has_new_takable_updates().not());
-            assert!(updates.inner.read().unwrap().has_new_peekable_updates().not());
-            assert!(updates.inner.write().unwrap().peek().is_empty());
             assert!(updates.take().is_empty());
+            assert!(updates.inner.write().unwrap().take_with_token(other_token).is_empty());

             {
+                let inner = updates.inner.read().unwrap();
+
                 // Inspect number of updates in memory.
-                // The update has been garbage collected.
-                assert_eq!(updates.inner.read().unwrap().len(), 0);
+                // The garbage collector had a chance to collect all updates.
+                assert_eq!(inner.len(), 0);
+
+                // Inspect the indices.
+                let indices = &inner.last_index_per_reader;
+
+                assert_eq!(indices.get(&main_token), Some(&0));
+                assert_eq!(indices.get(&other_token), Some(&0));
             }
         }
     }
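
Editor's note (illustration, not part of the patch): the mechanism this diff introduces — one shared buffer of updates, a per-reader cursor keyed by a token, and a garbage collector that drops everything already seen by the slowest reader — can be sketched in isolation. The following self-contained Rust program is a minimal sketch under stated assumptions: the names `UpdatesBuffer` and `new_reader` are hypothetical and do not exist in matrix-sdk, and the real implementation additionally wraps this state in `Arc<RwLock<…>>` and manages wakers for the `Stream` subscribers, which is omitted here.

    use std::collections::HashMap;

    /// Identifies one reader of the shared update buffer.
    type ReaderToken = usize;

    /// A simplified multiple-reader update buffer (hypothetical type, for
    /// illustration only): every reader consumes each update exactly once,
    /// and updates that have been read by *all* readers are garbage
    /// collected.
    struct UpdatesBuffer<U> {
        updates: Vec<U>,
        last_index_per_reader: HashMap<ReaderToken, usize>,
        last_token: ReaderToken,
    }

    impl<U: Clone> UpdatesBuffer<U> {
        /// The token of the implicit "main" reader, registered at creation.
        const MAIN_READER_TOKEN: ReaderToken = 0;

        fn new() -> Self {
            Self {
                updates: Vec::new(),
                last_index_per_reader: HashMap::from([(Self::MAIN_READER_TOKEN, 0)]),
                last_token: Self::MAIN_READER_TOKEN,
            }
        }

        /// Register a new reader. It starts at index 0, i.e. it will see
        /// every update that is still in the buffer.
        fn new_reader(&mut self) -> ReaderToken {
            self.last_token += 1;
            self.last_index_per_reader.insert(self.last_token, 0);

            self.last_token
        }

        fn push(&mut self, update: U) {
            self.updates.push(update);
        }

        /// Return the updates that `token` has not read yet, and garbage
        /// collect the updates that every reader has already read.
        fn take_with_token(&mut self, token: ReaderToken) -> Vec<U> {
            self.garbage_collect();

            let index = self
                .last_index_per_reader
                .get_mut(&token)
                .expect("unknown `ReaderToken`");
            let new_updates = self.updates[*index..].to_vec();
            *index = self.updates.len();

            new_updates
        }

        /// Drop every update below the smallest per-reader index, then shift
        /// all the indices to the left so they keep pointing at the same
        /// updates.
        fn garbage_collect(&mut self) {
            let min_index = self.last_index_per_reader.values().copied().min().unwrap_or(0);

            if min_index > 0 {
                let _ = self.updates.drain(0..min_index);

                for index in self.last_index_per_reader.values_mut() {
                    *index -= min_index;
                }
            }
        }
    }

    fn main() {
        let mut buffer = UpdatesBuffer::new();
        let other = buffer.new_reader();

        buffer.push('a');
        buffer.push('b');

        // The main reader consumes both updates; nothing is collected yet,
        // because `other` has not read them.
        assert_eq!(buffer.take_with_token(UpdatesBuffer::<char>::MAIN_READER_TOKEN), ['a', 'b']);
        assert_eq!(buffer.updates.len(), 2);

        // `other` consumes them too; the collection happens on the *next*
        // read, exactly as in scenario 3 of the test above.
        assert_eq!(buffer.take_with_token(other), ['a', 'b']);
        assert!(buffer.take_with_token(other).is_empty());
        assert_eq!(buffer.updates.len(), 0);
    }

One design consequence worth noting: garbage collection only runs inside `take_with_token`, so a registered reader that never reads keeps the smallest index at 0 and pins every update in memory. This is why each subscriber reads with its own token on every poll rather than peeking without consuming.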