From d2d5240e0411052e514b73342c1aaf6398b044a4 Mon Sep 17 00:00:00 2001 From: Ivan Enderlin Date: Wed, 9 Oct 2024 17:39:13 +0200 Subject: [PATCH] feat(sdk): Dropping a `UpdatesSubscriber` release the reader token for the GC. The event cache stores its events in a linked chunk. The linked chunk supports updates (`ObservableUpdates`) via `LinkedChunk::updates()`. This `ObservableUpdates` receives all updates that are happening inside the `LinkedChunk`. An `ObservableUpdates` wraps `UpdatesInner`, which is the real logic to handle multiple update readers. Each reader has a unique `ReaderToken`. `UpdatesInner` has a garbage collector that drops all updates that are read by all readers. And here comes the problem. A category of readers are `UpdatesSubscriber`, returned by `ObservableUpdates::subscribe()`. When an `UpdatesSubscriber` is dropped, its reader token was still alive, thus preventing the garbage collector to clear all its pending updates: they were kept in memory for the eternity. This patch implements `Drop` for `UpdatesSubscriber` to correctly remove its `ReaderToken` from `UpdatesInner`. This patch also adds a test that runs multiple subscribers, and when one is dropped, its pending updates are collected by the garbage collector. --- .../src/event_cache/linked_chunk/updates.rs | 193 +++++++++++++++++- 1 file changed, 183 insertions(+), 10 deletions(-) diff --git a/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs index d6ccc5a0ef9..257c4eaaec1 100644 --- a/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs +++ b/crates/matrix-sdk/src/event_cache/linked_chunk/updates.rs @@ -322,6 +322,22 @@ where } } +impl Drop for UpdatesSubscriber { + fn drop(&mut self) { + // Remove `Self::token` from `UpdatesInner::last_index_per_reader`. + // This is important so that the garbage collector can do its jobs correctly + // without a dead dangling reader token. + if let Some(updates) = self.updates.upgrade() { + let mut updates = updates.write().unwrap(); + + // Remove the reader token from `UpdatesInner`. + // It's safe to ignore the result of `remove` here: `None` means the token was + // already removed (note: it should be unreachable). + let _ = updates.last_index_per_reader.remove(&self.token); + } + } +} + #[cfg(test)] mod tests { use std::{ @@ -563,19 +579,19 @@ mod tests { } } - #[test] - fn test_updates_stream() { - use super::Update::*; + struct CounterWaker { + number_of_wakeup: Mutex, + } - struct CounterWaker { - number_of_wakeup: Mutex, + impl Wake for CounterWaker { + fn wake(self: Arc) { + *self.number_of_wakeup.lock().unwrap() += 1; } + } - impl Wake for CounterWaker { - fn wake(self: Arc) { - *self.number_of_wakeup.lock().unwrap() += 1; - } - } + #[test] + fn test_updates_stream() { + use super::Update::*; let counter_waker = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) }); let waker = counter_waker.clone().into(); @@ -646,4 +662,161 @@ mod tests { // Wakers calls have not changed. assert_eq!(*counter_waker.number_of_wakeup.lock().unwrap(), 2); } + + #[test] + fn test_updates_multiple_streams() { + use super::Update::*; + + let counter_waker1 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) }); + let counter_waker2 = Arc::new(CounterWaker { number_of_wakeup: Mutex::new(0) }); + + let waker1 = counter_waker1.clone().into(); + let waker2 = counter_waker2.clone().into(); + + let mut context1 = Context::from_waker(&waker1); + let mut context2 = Context::from_waker(&waker2); + + let mut linked_chunk = LinkedChunk::<3, char, ()>::new_with_update_history(); + + let updates_subscriber1 = linked_chunk.updates().unwrap().subscribe(); + pin_mut!(updates_subscriber1); + + // Scope for `updates_subscriber2`. + let updates_subscriber2_token = { + let updates_subscriber2 = linked_chunk.updates().unwrap().subscribe(); + pin_mut!(updates_subscriber2); + + // No update, streams are pending. + assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending); + assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 0); + assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending); + assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 0); + + // Let's generate an update. + linked_chunk.push_items_back(['a']); + + // The wakers must have been called. + assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 1); + assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 1); + + // There is an update! Right after that, the streams are pending again. + assert_matches!( + updates_subscriber1.as_mut().poll_next(&mut context1), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] + ); + } + ); + assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending); + assert_matches!( + updates_subscriber2.as_mut().poll_next(&mut context2), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[PushItems { at: Position(ChunkIdentifier(0), 0), items: vec!['a'] }] + ); + } + ); + assert_matches!(updates_subscriber2.as_mut().poll_next(&mut context2), Poll::Pending); + + // Let's generate two other updates. + linked_chunk.push_items_back(['b']); + linked_chunk.push_items_back(['c']); + + // The waker must have been called only once for the two updates. + assert_eq!(*counter_waker1.number_of_wakeup.lock().unwrap(), 2); + assert_eq!(*counter_waker2.number_of_wakeup.lock().unwrap(), 2); + + // Let's poll `updates_subscriber1` only. + assert_matches!( + updates_subscriber1.as_mut().poll_next(&mut context1), + Poll::Ready(Some(items)) => { + assert_eq!( + items, + &[ + PushItems { at: Position(ChunkIdentifier(0), 1), items: vec!['b'] }, + PushItems { at: Position(ChunkIdentifier(0), 2), items: vec!['c'] }, + ] + ); + } + ); + assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending); + + // For the sake of this test, we also need to advance the main reader token. + let _ = linked_chunk.updates().unwrap().take(); + let _ = linked_chunk.updates().unwrap().take(); + + // If we inspect the garbage collector state, `a`, `b` and `c` should still be + // present because not all of them have been consumed by `updates_subscriber2` + // yet. + { + let updates = linked_chunk.updates().unwrap(); + + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + // We get 2 because the garbage collector runs before data are taken, not after: + // `updates_subscriber2` has read `a` only, so `b` and `c` remain. + assert_eq!(inner.len(), 2); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&updates_subscriber1.token), Some(&2)); + assert_eq!(indices.get(&updates_subscriber2.token), Some(&0)); + } + + // Poll `updates_subscriber1` again: there is no new update so it must be + // pending. + assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending); + + // The state of the garbage collector is unchanged: `a`, `b` and `c` are still + // in memory. + { + let updates = linked_chunk.updates().unwrap(); + + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. Value is unchanged. + assert_eq!(inner.len(), 2); + + // Inspect the indices. They are unchanged. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&updates_subscriber1.token), Some(&2)); + assert_eq!(indices.get(&updates_subscriber2.token), Some(&0)); + } + + updates_subscriber2.token.clone() + // Drop `updates_subscriber2`! + }; + + // `updates_subscriber2` has been dropped. Poll `updates_subscriber1` again: + // still no new update, but it will run the garbage collector again, and this + // time `updates_subscriber2` is not “retaining” `b` and `c`. The garbage + // collector must be empty. + assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Pending); + + // Inspect the garbage collector. + { + let updates = linked_chunk.updates().unwrap(); + + let inner = updates.inner.read().unwrap(); + + // Inspect number of updates in memory. + assert_eq!(inner.len(), 0); + + // Inspect the indices. + let indices = &inner.last_index_per_reader; + + assert_eq!(indices.get(&updates_subscriber1.token), Some(&0)); + assert_eq!(indices.get(&updates_subscriber2_token), None); // token is unknown! + } + + // When dropping the `LinkedChunk`, it closes the stream. + drop(linked_chunk); + assert_matches!(updates_subscriber1.as_mut().poll_next(&mut context1), Poll::Ready(None)); + } }