Skip to content

Commit

Permalink
perf: remove heap allocation in snapshot point read path
Browse files Browse the repository at this point in the history
+ also not using MvccStream because we just need to terminate on first matching item
  • Loading branch information
marvin-j97 committed Dec 14, 2024
1 parent 26847b3 commit ab28434
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 35 deletions.
11 changes: 3 additions & 8 deletions src/segment/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,6 @@ impl Segment {
key: K,
seqno: Option<SeqNo>,
) -> crate::Result<Option<InternalValue>> {
use crate::mvcc_stream::MvccStream;
use block_index::BlockIndex;
use value_block::{CachePolicy, ValueBlock};
use value_block_consumer::ValueBlockConsumer;
Expand Down Expand Up @@ -379,11 +378,7 @@ impl Segment {
None,
);
reader.lo_block_size = block.header.data_length.into();
reader.lo_block_items = Some(ValueBlockConsumer::with_bounds(
block,
&Some(key.into()), // TODO: this may cause a heap alloc
&None,
));
reader.lo_block_items = Some(ValueBlockConsumer::with_bounds(block, Some(key), None));
reader.lo_initialized = true;

// NOTE: For finding a specific seqno,
Expand All @@ -403,7 +398,7 @@ impl Segment {
// unfortunately is in the next block
//
// Also because of weak tombstones, we may have to look further than the first item we encounter
let reader = reader.filter(|x| {
let mut reader = reader.filter(|x| {
match x {
Ok(entry) => {
// Check for seqno if needed
Expand All @@ -417,7 +412,7 @@ impl Segment {
}
});

let Some(entry) = MvccStream::new(reader).next().transpose()? else {
let Some(entry) = reader.next().transpose()? else {
return Ok(None);
};

Expand Down
6 changes: 5 additions & 1 deletion src/segment/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ impl Reader {
Ok(Some((
block.header.data_length.into(),
block.header.previous_block_offset,
ValueBlockConsumer::with_bounds(block, &self.start_key, &self.end_key),
ValueBlockConsumer::with_bounds(
block,
self.start_key.as_deref(),
self.end_key.as_deref(),
),
)))
})
}
Expand Down
41 changes: 15 additions & 26 deletions src/segment/value_block_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
// (found in the LICENSE-* files in the repository)

use super::value_block::ValueBlock;
use crate::{value::InternalValue, UserKey};
use crate::value::InternalValue;
use std::sync::Arc;

pub struct ValueBlockConsumer {
Expand All @@ -15,14 +15,14 @@ pub struct ValueBlockConsumer {
impl ValueBlockConsumer {
#[must_use]
pub fn new(inner: Arc<ValueBlock>) -> Self {
Self::with_bounds(inner, &None, &None)
Self::with_bounds(inner, None, None)
}

#[must_use]
pub fn with_bounds(
inner: Arc<ValueBlock>,
start_key: &Option<UserKey>,
end_key: &Option<UserKey>,
start_key: Option<&[u8]>,
end_key: Option<&[u8]>,
) -> Self {
let mut lo = start_key.as_ref().map_or(0, |key| {
inner.items.partition_point(|x| &*x.key.user_key < *key)
Expand Down Expand Up @@ -90,12 +90,9 @@ impl DoubleEndedIterator for ValueBlockConsumer {
#[allow(clippy::expect_used)]
mod tests {
use super::*;
use crate::{
segment::{
block::{checksum::Checksum, header::Header},
value_block::BlockOffset,
},
Slice,
use crate::segment::{
block::{checksum::Checksum, header::Header},
value_block::BlockOffset,
};
use test_log::test;

Expand Down Expand Up @@ -227,15 +224,13 @@ mod tests {
InternalValue::from_components(*b"e", vec![], 0, crate::ValueType::Value),
]);

let mut iter =
ValueBlockConsumer::with_bounds(block.clone().into(), &Some(Slice::from(*b"c")), &None);
let mut iter = ValueBlockConsumer::with_bounds(block.clone().into(), Some(b"c"), None);
assert_eq!(*b"c", &*iter.next().expect("should exist").key.user_key);
assert_eq!(*b"d", &*iter.next().expect("should exist").key.user_key);
assert_eq!(*b"e", &*iter.next().expect("should exist").key.user_key);
iter_closed!(iter);

let mut iter =
ValueBlockConsumer::with_bounds(block.into(), &Some(Slice::from(*b"c")), &None);
let mut iter = ValueBlockConsumer::with_bounds(block.into(), Some(b"c"), None);
assert_eq!(
*b"e",
&*iter.next_back().expect("should exist").key.user_key
Expand All @@ -261,15 +256,13 @@ mod tests {
InternalValue::from_components(*b"e", vec![], 0, crate::ValueType::Value),
]);

let mut iter =
ValueBlockConsumer::with_bounds(block.clone().into(), &None, &Some(Slice::from(*b"c")));
let mut iter = ValueBlockConsumer::with_bounds(block.clone().into(), None, Some(b"c"));
assert_eq!(*b"a", &*iter.next().expect("should exist").key.user_key);
assert_eq!(*b"b", &*iter.next().expect("should exist").key.user_key);
assert_eq!(*b"c", &*iter.next().expect("should exist").key.user_key);
iter_closed!(iter);

let mut iter =
ValueBlockConsumer::with_bounds(block.into(), &None, &Some(Slice::from(*b"c")));
let mut iter = ValueBlockConsumer::with_bounds(block.into(), None, Some(b"c"));
assert_eq!(
*b"c",
&*iter.next_back().expect("should exist").key.user_key
Expand All @@ -294,12 +287,10 @@ mod tests {
InternalValue::from_components(*b"e", vec![], 0, crate::ValueType::Value),
]);

let mut iter =
ValueBlockConsumer::with_bounds(block.clone().into(), &None, &Some(Slice::from(*b"a")));
let mut iter = ValueBlockConsumer::with_bounds(block.clone().into(), None, Some(b"a"));
iter_closed!(iter);

let mut iter =
ValueBlockConsumer::with_bounds(block.into(), &None, &Some(Slice::from(*b"a"))).rev();
let mut iter = ValueBlockConsumer::with_bounds(block.into(), None, Some(b"a")).rev();
iter_closed!(iter);
}

Expand All @@ -313,12 +304,10 @@ mod tests {
InternalValue::from_components(*b"e", vec![], 0, crate::ValueType::Value),
]);

let mut iter =
ValueBlockConsumer::with_bounds(block.clone().into(), &Some(Slice::from(*b"f")), &None);
let mut iter = ValueBlockConsumer::with_bounds(block.clone().into(), Some(b"f"), None);
iter_closed!(iter);

let mut iter =
ValueBlockConsumer::with_bounds(block.into(), &Some(Slice::from(*b"f")), &None).rev();
let mut iter = ValueBlockConsumer::with_bounds(block.into(), Some(b"f"), None).rev();
iter_closed!(iter);
}
}

0 comments on commit ab28434

Please sign in to comment.