Skip to content

Commit

Permalink
fix: create version scan iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
arriqaaq committed Jan 13, 2025
1 parent de71a25 commit 79cb1bb
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 143 deletions.
95 changes: 93 additions & 2 deletions src/iter.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::{
cmp::Ordering, collections::btree_map, iter::Peekable, marker::PhantomData, ops::RangeBounds,
cmp::Ordering,
collections::{btree_map, VecDeque},
iter::Peekable,
marker::PhantomData,
ops::RangeBounds,
};

use bytes::Bytes;
Expand All @@ -8,7 +12,7 @@ use vart::VariableSizeKey;
use crate::{
indexer::IndexValue,
store::Core,
transaction::{ReadSet, ReadSetEntry, ScanResult, WriteSet, WriteSetEntry},
transaction::{ReadSet, ReadSetEntry, ScanResult, ScanVersionResult, WriteSet, WriteSetEntry},
util::convert_range_bounds_bytes,
Result,
};
Expand Down Expand Up @@ -268,3 +272,90 @@ where
result
}
}

/// An iterator that returns all versions of keys within a given range.
/// For each key, it returns all versions before moving to the next key.
/// Optionally limits the number of unique keys returned.
pub struct VersionScanIterator<'a, I: Iterator> {
snap_iter: Peekable<I>,
current_key_versions: VecDeque<(&'a [u8], Vec<u8>, u64, bool)>,
unique_keys_count: usize,
current_key: Option<&'a [u8]>, // Track current key to detect changes
limit: Option<usize>,
core: &'a Core,
}

impl<'a, I: Iterator> VersionScanIterator<'a, I>
where
I: Iterator<Item = (&'a [u8], &'a IndexValue, u64, u64)>,
{
pub(crate) fn new(core: &'a Core, snap_iter: I, limit: Option<usize>) -> Self {
Self {
snap_iter: snap_iter.peekable(),
current_key_versions: VecDeque::new(),
unique_keys_count: 0,
current_key: None,
limit,
core,
}
}
}

impl<'a, I> Iterator for VersionScanIterator<'a, I>
where
I: Iterator<Item = (&'a [u8], &'a IndexValue, u64, u64)>,
{
type Item = Result<ScanVersionResult<'a>>;

fn next(&mut self) -> Option<Self::Item> {
// Return any pending versions first
if let Some(version) = self.current_key_versions.pop_front() {
return Some(Ok(version));
}

// Get next item
let next_item = self.snap_iter.next()?;
let (key, value, _, ts) = next_item;

// Check if this is a new key
if self.current_key != Some(key) {
self.current_key = Some(key);
self.unique_keys_count += 1;

// Check limit
if let Some(limit) = self.limit {
if self.unique_keys_count > limit {
return None;
}
}
}

// Process version
let is_deleted = value.metadata().is_some_and(|md| md.is_tombstone());
match value.resolve(self.core) {
Ok(v) => {
// Add first version
self.current_key_versions
.push_back((key, v, ts, is_deleted));

// Collect all other versions for this key
while let Some(&(next_key, value, _, ts)) = self.snap_iter.peek() {
if next_key != key {
break;
}
// Consume the peeked item
self.snap_iter.next();
let is_deleted = value.metadata().is_some_and(|md| md.is_tombstone());
if let Ok(v) = value.resolve(self.core) {
self.current_key_versions
.push_back((key, v, ts, is_deleted));
}
}

// The next iteration will handle returning the first version
self.next()
}
Err(e) => Some(Err(e)),
}
}
}
8 changes: 4 additions & 4 deletions src/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1598,9 +1598,9 @@ mod tests {
txn2.commit().await.unwrap();

let txn3 = store.begin().unwrap();
let versions = txn3
let versions: Vec<_> = txn3
.scan_all_versions(key.as_ref()..=key.as_ref(), None)
.unwrap();
.collect();
assert!(versions.is_empty());

store.close().await.unwrap();
Expand Down Expand Up @@ -1629,9 +1629,9 @@ mod tests {
txn2.commit().await.unwrap();

let txn3 = store.begin().unwrap();
let versions = txn3
let versions: Vec<_> = txn3
.scan_all_versions(key.as_ref()..=key.as_ref(), None)
.unwrap();
.collect();
assert!(versions.is_empty());

store.close().await.unwrap();
Expand Down
Loading

0 comments on commit 79cb1bb

Please sign in to comment.