From d6214f17a2623a0760840b03ec47b2a8cdcb8a96 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Fri, 13 Sep 2024 19:08:21 +0200 Subject: [PATCH] fix replica snapshot isolation bug --- libsql-wal/src/segment/current.rs | 76 ++----------------------------- libsql-wal/src/shared_wal.rs | 10 +++- libsql-wal/src/transaction.rs | 26 ++--------- 3 files changed, 14 insertions(+), 98 deletions(-) diff --git a/libsql-wal/src/segment/current.rs b/libsql-wal/src/segment/current.rs index 9b3b4ce5ac..541d2a5e47 100644 --- a/libsql-wal/src/segment/current.rs +++ b/libsql-wal/src/segment/current.rs @@ -370,7 +370,7 @@ impl CurrentSegment { } // not a write tx, or page is not in write tx, look into the segment - self.index.locate(page_no, tx.max_frame_no) + self.index.locate(page_no, tx.max_offset) } /// reads the page conainted in frame at offset into buf @@ -644,13 +644,13 @@ impl SegmentIndex { } } - fn locate(&self, page_no: u32, max_frame_no: u64) -> Option { + fn locate(&self, page_no: u32, max_offset: u64) -> Option { let offsets = self.index.get(&page_no)?; let offsets = offsets.value().read(); offsets .iter() .rev() - .find(|fno| self.start_frame_no + **fno as u64 <= max_frame_no) + .find(|fno| **fno as u64 <= max_offset) .copied() } @@ -672,42 +672,6 @@ impl SegmentIndex { Ok(()) } - /// returns an iterator over (page_no, offset, frame_no), where the returned offset is the most - /// recent version of the page contained in start_frame_no..end_frame_no - /// This method assumes that the current segment is ordered. - pub(crate) fn iter( - &self, - start_frame_no: u64, - end_frame_no: u64, - ) -> impl Iterator + '_ { - // todo: assert segment is sorted - let mut entry = self.index.front(); - let mut fused = false; - let start_offset = (start_frame_no - self.start_frame_no) as u32; - let end_offset = (end_frame_no - self.start_frame_no) as u32; - std::iter::from_fn(move || loop { - if fused { - return None; - } - let entry = entry.as_mut()?; - let ret = { - let offsets = entry.value(); - let offsets = offsets.read(); - if offsets[0] > end_offset || *offsets.last().unwrap() < start_offset { - drop(offsets); - fused = !entry.move_next(); - continue; - } - let offset = *offsets.iter().rev().find(|x| **x <= end_offset).unwrap(); - Some((*entry.key(), offset, self.start_frame_no + offset as u64)) - }; - - fused = !entry.move_next(); - - return ret; - }) - } - pub(crate) fn insert(&self, page_no: u32, offset: u32) { let entry = self.index.get_or_insert(page_no, Default::default()); let mut offsets = entry.value().write(); @@ -734,40 +698,6 @@ mod test { use super::*; - #[test] - fn index_iter() { - let index = SegmentIndex::new(42); - index.insert(1, 0); - index.insert(1, 3); - index.insert(2, 1); - index.insert(2, 2); - index.insert(3, 5); - index.insert(3, 15); - let mut iter = index.iter(42, 50); - assert_eq!(iter.next(), Some((1, 3, 42 + 3))); - assert_eq!(iter.next(), Some((2, 2, 42 + 2))); - assert_eq!(iter.next(), Some((3, 5, 42 + 5))); - assert_eq!(iter.next(), None); - - let mut iter = index.iter(42, 100); - assert_eq!(iter.next(), Some((1, 3, 42 + 3))); - assert_eq!(iter.next(), Some((2, 2, 42 + 2))); - assert_eq!(iter.next(), Some((3, 15, 42 + 15))); - assert_eq!(iter.next(), None); - } - - #[should_panic] - #[test] - fn index_iter_out_of_bounds() { - let index = SegmentIndex::new(42); - index.insert(1, 0); - index.insert(1, 3); - index.insert(2, 1); - index.insert(2, 2); - assert_eq!(index.iter(1, 41).count(), 0); - assert_eq!(index.iter(43, 72).count(), 0); - } - #[tokio::test] async fn current_stream_frames() { let env = TestEnv::new(); diff --git a/libsql-wal/src/shared_wal.rs b/libsql-wal/src/shared_wal.rs index f96fe3f741..e0edf51b4d 100644 --- a/libsql-wal/src/shared_wal.rs +++ b/libsql-wal/src/shared_wal.rs @@ -106,8 +106,13 @@ impl SharedWal { // is not sealed. If the segment is sealed, retry with the current segment let current = self.current.load(); current.inc_reader_count(); - let (max_frame_no, db_size) = - current.with_header(|header| (header.last_committed(), header.size_after())); + let (max_frame_no, db_size, max_offset) = current.with_header(|header| { + ( + header.last_committed(), + header.size_after(), + header.frame_count() as u64, + ) + }); let id = self.wal_lock.next_tx_id.fetch_add(1, Ordering::Relaxed); ReadTransaction { id, @@ -119,6 +124,7 @@ impl SharedWal { pages_read: 0, namespace: self.namespace.clone(), checkpoint_notifier: self.checkpoint_notifier.clone(), + max_offset, } } diff --git a/libsql-wal/src/transaction.rs b/libsql-wal/src/transaction.rs index 93fb3fe16d..a01a7fcff6 100644 --- a/libsql-wal/src/transaction.rs +++ b/libsql-wal/src/transaction.rs @@ -79,6 +79,8 @@ pub struct ReadTransaction { pub id: u64, /// Max frame number that this transaction can read pub max_frame_no: u64, + // max offset that can be read from the current log + pub max_offset: u64, pub db_size: u32, /// The segment to which we have a read lock pub current: Arc>, @@ -105,6 +107,7 @@ impl Clone for ReadTransaction { pages_read: self.pages_read, namespace: self.namespace.clone(), checkpoint_notifier: self.checkpoint_notifier.clone(), + max_offset: self.max_offset, } } } @@ -360,26 +363,3 @@ impl DerefMut for WriteTransaction { &mut self.read_tx } } - -#[cfg(test)] -mod test { - use std::collections::BTreeMap; - - use crate::segment::current::SegmentIndex; - - use super::merge_savepoints; - - #[test] - fn test_merge_savepoints() { - let first = [(1, 1), (3, 2)].into_iter().collect::>(); - let second = [(1, 3), (4, 6)].into_iter().collect::>(); - - let out = SegmentIndex::new(0); - merge_savepoints([first, second].iter().rev(), &out); - - let mut iter = out.iter(0, 100); - assert_eq!(iter.next(), Some((1, 3, 3))); - assert_eq!(iter.next(), Some((3, 2, 2))); - assert_eq!(iter.next(), Some((4, 6, 6))); - } -}