diff --git a/libsql-wal/src/replication/storage.rs b/libsql-wal/src/replication/storage.rs index 9b3bb15963..e759f08ae4 100644 --- a/libsql-wal/src/replication/storage.rs +++ b/libsql-wal/src/replication/storage.rs @@ -9,6 +9,7 @@ use zerocopy::FromZeroes; use crate::io::buf::ZeroCopyBoxIoBuf; use crate::segment::Frame; +use crate::storage::backend::FindSegmentReq; use crate::storage::Storage; use super::Result; @@ -45,7 +46,7 @@ where ) -> Pin>> + Send + 'a>> { Box::pin(async_stream::try_stream! { loop { - let key = self.storage.find_segment(&self.namespace, current, None).await?; + let key = self.storage.find_segment(&self.namespace, FindSegmentReq::EndFrameNoLessThan(current), None).await?; let index = self.storage.fetch_segment_index(&self.namespace, &key, None).await?; let mut pages = index.into_stream(); let mut maybe_seg = None; diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs index d02703376c..3fc62fa56d 100644 --- a/libsql-wal/src/storage/async_storage.rs +++ b/libsql-wal/src/storage/async_storage.rs @@ -13,7 +13,7 @@ use crate::io::{FileExt, Io, StdIO}; use crate::segment::compacted::CompactedSegment; use crate::segment::Segment; -use super::backend::Backend; +use super::backend::{Backend, FindSegmentReq}; use super::scheduler::Scheduler; use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest}; @@ -228,18 +228,11 @@ where async fn find_segment( &self, namespace: &NamespaceName, - frame_no: u64, + req: FindSegmentReq, config_override: Option, ) -> super::Result { let config = config_override.unwrap_or_else(|| self.backend.default_config()); - let key = self - .backend - .find_segment( - &config, - namespace, - super::backend::FindSegmentReq::Frame(frame_no), - ) - .await?; + let key = self.backend.find_segment(&config, namespace, req).await?; Ok(key) } diff --git a/libsql-wal/src/storage/backend/mod.rs b/libsql-wal/src/storage/backend/mod.rs index fb9c1ba8ba..e9ceb743ea 100644 --- a/libsql-wal/src/storage/backend/mod.rs +++ b/libsql-wal/src/storage/backend/mod.rs @@ -1,6 +1,6 @@ #![allow(dead_code)] +use std::future::Future; use std::sync::Arc; -use std::{future::Future, path::Path}; use chrono::{DateTime, Utc}; use fst::Map; @@ -31,9 +31,10 @@ pub struct DbMeta { pub max_frame_no: u64, } +#[derive(Debug, Clone, Copy)] pub enum FindSegmentReq { /// returns a segment containing this frame - Frame(u64), + EndFrameNoLessThan(u64), /// Returns the segment with closest timestamp less than or equal to the requested timestamp Timestamp(DateTime), } @@ -83,15 +84,6 @@ pub trait Backend: Send + Sync + 'static { key: SegmentKey, ) -> impl Future> + Send; - // /// Fetch a segment for `namespace` containing `frame_no`, and writes it to `dest`. - async fn fetch_segment( - &self, - config: &Self::Config, - namespace: &NamespaceName, - frame_no: u64, - dest_path: &Path, - ) -> Result>>; - /// Fetch meta for `namespace` fn meta( &self, @@ -132,18 +124,6 @@ impl Backend for Arc { .store(config, meta, segment_data, segment_index) } - async fn fetch_segment( - &self, - config: &Self::Config, - namespace: &NamespaceName, - frame_no: u64, - dest_path: &Path, - ) -> Result>> { - self.as_ref() - .fetch_segment(config, namespace, frame_no, dest_path) - .await - } - async fn meta(&self, config: &Self::Config, namespace: &NamespaceName) -> Result { self.as_ref().meta(config, namespace).await } diff --git a/libsql-wal/src/storage/backend/s3.rs b/libsql-wal/src/storage/backend/s3.rs index 738595cc00..81f7dfcbad 100644 --- a/libsql-wal/src/storage/backend/s3.rs +++ b/libsql-wal/src/storage/backend/s3.rs @@ -39,7 +39,6 @@ pub struct S3Backend { default_config: Arc, io: IO, } - impl S3Backend { pub async fn from_sdk_config( aws_config: SdkConfig, @@ -210,7 +209,7 @@ impl S3Backend { frame_no: u64, ) -> Result> { let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key); - let lookup_key = s3_segment_index_lookup_key(&folder_key, frame_no); + let lookup_key = s3_segment_index_ends_before_lookup_key(&folder_key, frame_no); let objects = self .client @@ -527,7 +526,8 @@ impl fmt::Display for SegmentIndexLookupKey<'_> { } } -fn s3_segment_index_lookup_key<'a>( +/// return the biggest segment whose end frame number is less than frame_no +fn s3_segment_index_ends_before_lookup_key<'a>( folder_key: &'a FolderKey, frame_no: u64, ) -> SegmentIndexLookupKey<'a> { @@ -580,35 +580,6 @@ where Ok(()) } - async fn fetch_segment( - &self, - config: &Self::Config, - namespace: &NamespaceName, - frame_no: u64, - dest_path: &Path, - ) -> Result>> { - let folder_key = FolderKey { - cluster_id: &config.cluster_id, - namespace: &namespace, - }; - - let Some(segment_key) = self - .find_segment_by_frame_no(config, &folder_key, frame_no) - .await? - else { - return Err(Error::FrameNotFound(frame_no)); - }; - - if segment_key.includes(frame_no) { - // TODO: make open async - let file = self.io.open(false, false, true, dest_path)?; - self.fetch_segment_from_key(config, &folder_key, &segment_key, &file) - .await - } else { - return Err(Error::FrameNotFound(frame_no)); - } - } - async fn meta( &self, config: &Self::Config, @@ -658,14 +629,14 @@ where }; match req { - FindSegmentReq::Frame(frame_no) => self + FindSegmentReq::EndFrameNoLessThan(frame_no) => self .find_segment_by_frame_no(config, &folder_key, frame_no) .await? - .ok_or_else(|| Error::FrameNotFound(frame_no)), + .ok_or_else(|| Error::SegmentNotFound(req)), FindSegmentReq::Timestamp(ts) => self .find_segment_by_timestamp(config, &folder_key, ts) .await? - .ok_or_else(|| Error::SegmentNotFoundTimestamp(ts)), + .ok_or_else(|| Error::SegmentNotFound(req)), } } @@ -831,7 +802,6 @@ mod tests { use fst::MapBuilder; use s3s::auth::SimpleAuth; use s3s::service::{S3ServiceBuilder, SharedS3Service}; - use tempfile::NamedTempFile; use uuid::Uuid; use crate::io::StdIO; @@ -901,7 +871,7 @@ mod tests { SegmentMeta { namespace: ns.clone(), segment_id: Uuid::new_v4(), - start_frame_no: 0u64.into(), + start_frame_no: 1u64.into(), end_frame_no: 64u64.into(), segment_timestamp: Utc::now(), }, @@ -936,30 +906,17 @@ mod tests { let db_meta = storage.meta(&s3_config, &ns).await.unwrap(); assert_eq!(db_meta.max_frame_no, 128); - let tmp = NamedTempFile::new().unwrap(); - - let index = storage - .fetch_segment(&s3_config, &ns, 1, tmp.path()) + let key = storage + .find_segment(&s3_config, &ns, FindSegmentReq::EndFrameNoLessThan(65)) .await .unwrap(); - assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42); + assert_eq!(key.start_frame_no, 1); + assert_eq!(key.end_frame_no, 64); let index = storage - .fetch_segment(&s3_config, &ns, 63, tmp.path()) + .fetch_segment_index(&s3_config, &ns, &key) .await .unwrap(); assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42); - - let index = storage - .fetch_segment(&s3_config, &ns, 64, tmp.path()) - .await - .unwrap(); - assert_eq!(index.get(44u32.to_be_bytes()).unwrap(), 44); - - let index = storage - .fetch_segment(&s3_config, &ns, 65, tmp.path()) - .await - .unwrap(); - assert_eq!(index.get(44u32.to_be_bytes()).unwrap(), 44); } } diff --git a/libsql-wal/src/storage/compaction/mod.rs b/libsql-wal/src/storage/compaction/mod.rs index 86053d3a6e..dc6b24584f 100644 --- a/libsql-wal/src/storage/compaction/mod.rs +++ b/libsql-wal/src/storage/compaction/mod.rs @@ -392,7 +392,6 @@ impl AnalyzedSegments { end_frame_no, timestamp, }; - dbg!(&key); segments.push(key); } } diff --git a/libsql-wal/src/storage/error.rs b/libsql-wal/src/storage/error.rs index f4b2f48930..6c9b0cf04a 100644 --- a/libsql-wal/src/storage/error.rs +++ b/libsql-wal/src/storage/error.rs @@ -1,6 +1,6 @@ use std::panic::Location; -use chrono::{DateTime, Utc}; +use super::backend::FindSegmentReq; #[derive(thiserror::Error, Debug)] pub enum Error { @@ -10,10 +10,8 @@ pub enum Error { Store(String), #[error("error compacting segment: {0}")] Compact(#[from] crate::error::Error), - #[error("frame not {0} found")] - FrameNotFound(u64), - #[error("No satisfying segment found for timestamp {0}")] - SegmentNotFoundTimestamp(DateTime), + #[error("segment not found for request {0:?}")] + SegmentNotFound(FindSegmentReq), #[error("unhandled storage error: {error}, in {context}")] UnhandledStorageError { error: Box, diff --git a/libsql-wal/src/storage/job.rs b/libsql-wal/src/storage/job.rs index 8f4fd9e393..603fe4a431 100644 --- a/libsql-wal/src/storage/job.rs +++ b/libsql-wal/src/storage/job.rs @@ -266,16 +266,6 @@ mod test { Ok(std::fs::File::open("").unwrap()) } - async fn fetch_segment( - &self, - _config: &Self::Config, - _namespace: &NamespaceName, - _frame_no: u64, - _dest_path: &std::path::Path, - ) -> Result>> { - todo!() - } - fn list_segments<'a>( &'a self, _config: Self::Config, diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index 8e746280a4..86f09abaae 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -18,7 +18,7 @@ use crate::io::{FileExt, Io, StdIO}; use crate::segment::compacted::CompactedSegment; use crate::segment::{sealed::SealedSegment, Segment}; -use self::backend::SegmentMeta; +use self::backend::{FindSegmentReq, SegmentMeta}; pub use self::error::Error; pub mod async_storage; @@ -142,10 +142,10 @@ impl FromStr for SegmentKey { type Err = (); fn from_str(s: &str) -> std::result::Result { - let (rev_start_fno, s) = s.split_at(20); - let start_frame_no = u64::MAX - rev_start_fno.parse::().map_err(|_| ())?; - let (rev_end_fno, timestamp) = s[1..].split_at(20); + let (rev_end_fno, s) = s.split_at(20); let end_frame_no = u64::MAX - rev_end_fno.parse::().map_err(|_| ())?; + let (start_fno, timestamp) = s[1..].split_at(20); + let start_frame_no = start_fno.parse::().map_err(|_| ())?; let timestamp = timestamp[1..].parse().map_err(|_| ())?; Ok(Self { start_frame_no, @@ -160,8 +160,8 @@ impl fmt::Display for SegmentKey { write!( f, "{:020}-{:020}-{:020}", - u64::MAX - self.start_frame_no, u64::MAX - self.end_frame_no, + self.start_frame_no, self.timestamp, ) } @@ -207,7 +207,7 @@ pub trait Storage: Send + Sync + 'static { fn find_segment( &self, namespace: &NamespaceName, - frame_no: u64, + frame_no: FindSegmentReq, config_override: Option, ) -> impl Future> + Send; @@ -306,7 +306,7 @@ where fn find_segment( &self, namespace: &NamespaceName, - frame_no: u64, + frame_no: FindSegmentReq, config_override: Option, ) -> impl Future> + Send { async move { @@ -415,7 +415,7 @@ impl Storage for NoStorage { async fn find_segment( &self, _namespace: &NamespaceName, - _frame_no: u64, + _frame_no: FindSegmentReq, _config_override: Option, ) -> Result { unimplemented!() @@ -564,14 +564,17 @@ impl Storage for TestStorage { async fn find_segment( &self, namespace: &NamespaceName, - frame_no: u64, + req: FindSegmentReq, _config_override: Option, ) -> Result { let inner = self.inner.lock().await; if inner.store { + let FindSegmentReq::EndFrameNoLessThan(fno) = req else { + panic!("unsupported lookup by ts") + }; if let Some(segs) = inner.stored.get(namespace) { - let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(frame_no)) else { - return Err(Error::FrameNotFound(frame_no)); + let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(fno)) else { + return Err(Error::SegmentNotFound(req)); }; return Ok(*key); } else {