Skip to content

Commit

Permalink
fix replica snapshot isolation bug
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Sep 13, 2024
1 parent 8a1ba79 commit d6214f1
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 98 deletions.
76 changes: 3 additions & 73 deletions libsql-wal/src/segment/current.rs
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ impl<F> CurrentSegment<F> {
}

// not a write tx, or page is not in write tx, look into the segment
self.index.locate(page_no, tx.max_frame_no)
self.index.locate(page_no, tx.max_offset)
}

/// reads the page conainted in frame at offset into buf
Expand Down Expand Up @@ -644,13 +644,13 @@ impl SegmentIndex {
}
}

fn locate(&self, page_no: u32, max_frame_no: u64) -> Option<u32> {
fn locate(&self, page_no: u32, max_offset: u64) -> Option<u32> {
let offsets = self.index.get(&page_no)?;
let offsets = offsets.value().read();
offsets
.iter()
.rev()
.find(|fno| self.start_frame_no + **fno as u64 <= max_frame_no)
.find(|fno| **fno as u64 <= max_offset)
.copied()
}

Expand All @@ -672,42 +672,6 @@ impl SegmentIndex {
Ok(())
}

/// returns an iterator over (page_no, offset, frame_no), where the returned offset is the most
/// recent version of the page contained in start_frame_no..end_frame_no
/// This method assumes that the current segment is ordered.
pub(crate) fn iter(
&self,
start_frame_no: u64,
end_frame_no: u64,
) -> impl Iterator<Item = (u32, u32, u64)> + '_ {
// todo: assert segment is sorted
let mut entry = self.index.front();
let mut fused = false;
let start_offset = (start_frame_no - self.start_frame_no) as u32;
let end_offset = (end_frame_no - self.start_frame_no) as u32;
std::iter::from_fn(move || loop {
if fused {
return None;
}
let entry = entry.as_mut()?;
let ret = {
let offsets = entry.value();
let offsets = offsets.read();
if offsets[0] > end_offset || *offsets.last().unwrap() < start_offset {
drop(offsets);
fused = !entry.move_next();
continue;
}
let offset = *offsets.iter().rev().find(|x| **x <= end_offset).unwrap();
Some((*entry.key(), offset, self.start_frame_no + offset as u64))
};

fused = !entry.move_next();

return ret;
})
}

pub(crate) fn insert(&self, page_no: u32, offset: u32) {
let entry = self.index.get_or_insert(page_no, Default::default());
let mut offsets = entry.value().write();
Expand All @@ -734,40 +698,6 @@ mod test {

use super::*;

#[test]
fn index_iter() {
let index = SegmentIndex::new(42);
index.insert(1, 0);
index.insert(1, 3);
index.insert(2, 1);
index.insert(2, 2);
index.insert(3, 5);
index.insert(3, 15);
let mut iter = index.iter(42, 50);
assert_eq!(iter.next(), Some((1, 3, 42 + 3)));
assert_eq!(iter.next(), Some((2, 2, 42 + 2)));
assert_eq!(iter.next(), Some((3, 5, 42 + 5)));
assert_eq!(iter.next(), None);

let mut iter = index.iter(42, 100);
assert_eq!(iter.next(), Some((1, 3, 42 + 3)));
assert_eq!(iter.next(), Some((2, 2, 42 + 2)));
assert_eq!(iter.next(), Some((3, 15, 42 + 15)));
assert_eq!(iter.next(), None);
}

#[should_panic]
#[test]
fn index_iter_out_of_bounds() {
let index = SegmentIndex::new(42);
index.insert(1, 0);
index.insert(1, 3);
index.insert(2, 1);
index.insert(2, 2);
assert_eq!(index.iter(1, 41).count(), 0);
assert_eq!(index.iter(43, 72).count(), 0);
}

#[tokio::test]
async fn current_stream_frames() {
let env = TestEnv::new();
Expand Down
10 changes: 8 additions & 2 deletions libsql-wal/src/shared_wal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,13 @@ impl<IO: Io> SharedWal<IO> {
// is not sealed. If the segment is sealed, retry with the current segment
let current = self.current.load();
current.inc_reader_count();
let (max_frame_no, db_size) =
current.with_header(|header| (header.last_committed(), header.size_after()));
let (max_frame_no, db_size, max_offset) = current.with_header(|header| {
(
header.last_committed(),
header.size_after(),
header.frame_count() as u64,
)
});
let id = self.wal_lock.next_tx_id.fetch_add(1, Ordering::Relaxed);
ReadTransaction {
id,
Expand All @@ -119,6 +124,7 @@ impl<IO: Io> SharedWal<IO> {
pages_read: 0,
namespace: self.namespace.clone(),
checkpoint_notifier: self.checkpoint_notifier.clone(),
max_offset,
}
}

Expand Down
26 changes: 3 additions & 23 deletions libsql-wal/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub struct ReadTransaction<F> {
pub id: u64,
/// Max frame number that this transaction can read
pub max_frame_no: u64,
// max offset that can be read from the current log
pub max_offset: u64,
pub db_size: u32,
/// The segment to which we have a read lock
pub current: Arc<CurrentSegment<F>>,
Expand All @@ -105,6 +107,7 @@ impl<F> Clone for ReadTransaction<F> {
pages_read: self.pages_read,
namespace: self.namespace.clone(),
checkpoint_notifier: self.checkpoint_notifier.clone(),
max_offset: self.max_offset,
}
}
}
Expand Down Expand Up @@ -360,26 +363,3 @@ impl<F> DerefMut for WriteTransaction<F> {
&mut self.read_tx
}
}

#[cfg(test)]
mod test {
use std::collections::BTreeMap;

use crate::segment::current::SegmentIndex;

use super::merge_savepoints;

#[test]
fn test_merge_savepoints() {
let first = [(1, 1), (3, 2)].into_iter().collect::<BTreeMap<_, _>>();
let second = [(1, 3), (4, 6)].into_iter().collect::<BTreeMap<_, _>>();

let out = SegmentIndex::new(0);
merge_savepoints([first, second].iter().rev(), &out);

let mut iter = out.iter(0, 100);
assert_eq!(iter.next(), Some((1, 3, 3)));
assert_eq!(iter.next(), Some((3, 2, 2)));
assert_eq!(iter.next(), Some((4, 6, 6)));
}
}

0 comments on commit d6214f1

Please sign in to comment.