Skip to content

Commit

Permalink
Merge pull request #1737 from tursodatabase/fix-s3-keyspace
Browse files Browse the repository at this point in the history
libsql-wal: fix SegmentKey
  • Loading branch information
MarinPostma authored Sep 16, 2024
2 parents 6e0daad + c5359a6 commit 4c0cfda
Show file tree
Hide file tree
Showing 8 changed files with 37 additions and 116 deletions.
3 changes: 2 additions & 1 deletion libsql-wal/src/replication/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use zerocopy::FromZeroes;

use crate::io::buf::ZeroCopyBoxIoBuf;
use crate::segment::Frame;
use crate::storage::backend::FindSegmentReq;
use crate::storage::Storage;

use super::Result;
Expand Down Expand Up @@ -45,7 +46,7 @@ where
) -> Pin<Box<dyn Stream<Item = Result<Box<Frame>>> + Send + 'a>> {
Box::pin(async_stream::try_stream! {
loop {
let key = self.storage.find_segment(&self.namespace, current, None).await?;
let key = self.storage.find_segment(&self.namespace, FindSegmentReq::EndFrameNoLessThan(current), None).await?;
let index = self.storage.fetch_segment_index(&self.namespace, &key, None).await?;
let mut pages = index.into_stream();
let mut maybe_seg = None;
Expand Down
13 changes: 3 additions & 10 deletions libsql-wal/src/storage/async_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::io::{FileExt, Io, StdIO};
use crate::segment::compacted::CompactedSegment;
use crate::segment::Segment;

use super::backend::Backend;
use super::backend::{Backend, FindSegmentReq};
use super::scheduler::Scheduler;
use super::{OnStoreCallback, RestoreOptions, Storage, StoreSegmentRequest};

Expand Down Expand Up @@ -228,18 +228,11 @@ where
async fn find_segment(
&self,
namespace: &NamespaceName,
frame_no: u64,
req: FindSegmentReq,
config_override: Option<Self::Config>,
) -> super::Result<super::SegmentKey> {
let config = config_override.unwrap_or_else(|| self.backend.default_config());
let key = self
.backend
.find_segment(
&config,
namespace,
super::backend::FindSegmentReq::Frame(frame_no),
)
.await?;
let key = self.backend.find_segment(&config, namespace, req).await?;
Ok(key)
}

Expand Down
26 changes: 3 additions & 23 deletions libsql-wal/src/storage/backend/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
#![allow(dead_code)]
use std::future::Future;
use std::sync::Arc;
use std::{future::Future, path::Path};

use chrono::{DateTime, Utc};
use fst::Map;
Expand Down Expand Up @@ -31,9 +31,10 @@ pub struct DbMeta {
pub max_frame_no: u64,
}

#[derive(Debug, Clone, Copy)]
pub enum FindSegmentReq {
/// returns a segment containing this frame
Frame(u64),
EndFrameNoLessThan(u64),
/// Returns the segment with closest timestamp less than or equal to the requested timestamp
Timestamp(DateTime<Utc>),
}
Expand Down Expand Up @@ -83,15 +84,6 @@ pub trait Backend: Send + Sync + 'static {
key: SegmentKey,
) -> impl Future<Output = Result<impl FileExt>> + Send;

// /// Fetch a segment for `namespace` containing `frame_no`, and writes it to `dest`.
async fn fetch_segment(
&self,
config: &Self::Config,
namespace: &NamespaceName,
frame_no: u64,
dest_path: &Path,
) -> Result<Map<Arc<[u8]>>>;

/// Fetch meta for `namespace`
fn meta(
&self,
Expand Down Expand Up @@ -132,18 +124,6 @@ impl<T: Backend> Backend for Arc<T> {
.store(config, meta, segment_data, segment_index)
}

async fn fetch_segment(
&self,
config: &Self::Config,
namespace: &NamespaceName,
frame_no: u64,
dest_path: &Path,
) -> Result<fst::Map<Arc<[u8]>>> {
self.as_ref()
.fetch_segment(config, namespace, frame_no, dest_path)
.await
}

async fn meta(&self, config: &Self::Config, namespace: &NamespaceName) -> Result<DbMeta> {
self.as_ref().meta(config, namespace).await
}
Expand Down
67 changes: 12 additions & 55 deletions libsql-wal/src/storage/backend/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ pub struct S3Backend<IO> {
default_config: Arc<S3Config>,
io: IO,
}

impl S3Backend<StdIO> {
pub async fn from_sdk_config(
aws_config: SdkConfig,
Expand Down Expand Up @@ -210,7 +209,7 @@ impl<IO: Io> S3Backend<IO> {
frame_no: u64,
) -> Result<Option<SegmentKey>> {
let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
let lookup_key = s3_segment_index_lookup_key(&folder_key, frame_no);
let lookup_key = s3_segment_index_ends_before_lookup_key(&folder_key, frame_no);

let objects = self
.client
Expand Down Expand Up @@ -527,7 +526,8 @@ impl fmt::Display for SegmentIndexLookupKey<'_> {
}
}

fn s3_segment_index_lookup_key<'a>(
/// return the biggest segment whose end frame number is less than frame_no
fn s3_segment_index_ends_before_lookup_key<'a>(
folder_key: &'a FolderKey,
frame_no: u64,
) -> SegmentIndexLookupKey<'a> {
Expand Down Expand Up @@ -580,35 +580,6 @@ where
Ok(())
}

async fn fetch_segment(
&self,
config: &Self::Config,
namespace: &NamespaceName,
frame_no: u64,
dest_path: &Path,
) -> Result<fst::Map<Arc<[u8]>>> {
let folder_key = FolderKey {
cluster_id: &config.cluster_id,
namespace: &namespace,
};

let Some(segment_key) = self
.find_segment_by_frame_no(config, &folder_key, frame_no)
.await?
else {
return Err(Error::FrameNotFound(frame_no));
};

if segment_key.includes(frame_no) {
// TODO: make open async
let file = self.io.open(false, false, true, dest_path)?;
self.fetch_segment_from_key(config, &folder_key, &segment_key, &file)
.await
} else {
return Err(Error::FrameNotFound(frame_no));
}
}

async fn meta(
&self,
config: &Self::Config,
Expand Down Expand Up @@ -658,14 +629,14 @@ where
};

match req {
FindSegmentReq::Frame(frame_no) => self
FindSegmentReq::EndFrameNoLessThan(frame_no) => self
.find_segment_by_frame_no(config, &folder_key, frame_no)
.await?
.ok_or_else(|| Error::FrameNotFound(frame_no)),
.ok_or_else(|| Error::SegmentNotFound(req)),
FindSegmentReq::Timestamp(ts) => self
.find_segment_by_timestamp(config, &folder_key, ts)
.await?
.ok_or_else(|| Error::SegmentNotFoundTimestamp(ts)),
.ok_or_else(|| Error::SegmentNotFound(req)),
}
}

Expand Down Expand Up @@ -831,7 +802,6 @@ mod tests {
use fst::MapBuilder;
use s3s::auth::SimpleAuth;
use s3s::service::{S3ServiceBuilder, SharedS3Service};
use tempfile::NamedTempFile;
use uuid::Uuid;

use crate::io::StdIO;
Expand Down Expand Up @@ -901,7 +871,7 @@ mod tests {
SegmentMeta {
namespace: ns.clone(),
segment_id: Uuid::new_v4(),
start_frame_no: 0u64.into(),
start_frame_no: 1u64.into(),
end_frame_no: 64u64.into(),
segment_timestamp: Utc::now(),
},
Expand Down Expand Up @@ -936,30 +906,17 @@ mod tests {
let db_meta = storage.meta(&s3_config, &ns).await.unwrap();
assert_eq!(db_meta.max_frame_no, 128);

let tmp = NamedTempFile::new().unwrap();

let index = storage
.fetch_segment(&s3_config, &ns, 1, tmp.path())
let key = storage
.find_segment(&s3_config, &ns, FindSegmentReq::EndFrameNoLessThan(65))
.await
.unwrap();
assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42);
assert_eq!(key.start_frame_no, 1);
assert_eq!(key.end_frame_no, 64);

let index = storage
.fetch_segment(&s3_config, &ns, 63, tmp.path())
.fetch_segment_index(&s3_config, &ns, &key)
.await
.unwrap();
assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42);

let index = storage
.fetch_segment(&s3_config, &ns, 64, tmp.path())
.await
.unwrap();
assert_eq!(index.get(44u32.to_be_bytes()).unwrap(), 44);

let index = storage
.fetch_segment(&s3_config, &ns, 65, tmp.path())
.await
.unwrap();
assert_eq!(index.get(44u32.to_be_bytes()).unwrap(), 44);
}
}
1 change: 0 additions & 1 deletion libsql-wal/src/storage/compaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,6 @@ impl AnalyzedSegments {
end_frame_no,
timestamp,
};
dbg!(&key);
segments.push(key);
}
}
Expand Down
8 changes: 3 additions & 5 deletions libsql-wal/src/storage/error.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::panic::Location;

use chrono::{DateTime, Utc};
use super::backend::FindSegmentReq;

#[derive(thiserror::Error, Debug)]
pub enum Error {
Expand All @@ -10,10 +10,8 @@ pub enum Error {
Store(String),
#[error("error compacting segment: {0}")]
Compact(#[from] crate::error::Error),
#[error("frame not {0} found")]
FrameNotFound(u64),
#[error("No satisfying segment found for timestamp {0}")]
SegmentNotFoundTimestamp(DateTime<Utc>),
#[error("segment not found for request {0:?}")]
SegmentNotFound(FindSegmentReq),
#[error("unhandled storage error: {error}, in {context}")]
UnhandledStorageError {
error: Box<dyn std::error::Error + Send + Sync + 'static>,
Expand Down
10 changes: 0 additions & 10 deletions libsql-wal/src/storage/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,16 +266,6 @@ mod test {
Ok(std::fs::File::open("").unwrap())
}

async fn fetch_segment(
&self,
_config: &Self::Config,
_namespace: &NamespaceName,
_frame_no: u64,
_dest_path: &std::path::Path,
) -> Result<fst::Map<Arc<[u8]>>> {
todo!()
}

fn list_segments<'a>(
&'a self,
_config: Self::Config,
Expand Down
25 changes: 14 additions & 11 deletions libsql-wal/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use crate::io::{FileExt, Io, StdIO};
use crate::segment::compacted::CompactedSegment;
use crate::segment::{sealed::SealedSegment, Segment};

use self::backend::SegmentMeta;
use self::backend::{FindSegmentReq, SegmentMeta};
pub use self::error::Error;

pub mod async_storage;
Expand Down Expand Up @@ -142,10 +142,10 @@ impl FromStr for SegmentKey {
type Err = ();

fn from_str(s: &str) -> std::result::Result<Self, Self::Err> {
let (rev_start_fno, s) = s.split_at(20);
let start_frame_no = u64::MAX - rev_start_fno.parse::<u64>().map_err(|_| ())?;
let (rev_end_fno, timestamp) = s[1..].split_at(20);
let (rev_end_fno, s) = s.split_at(20);
let end_frame_no = u64::MAX - rev_end_fno.parse::<u64>().map_err(|_| ())?;
let (start_fno, timestamp) = s[1..].split_at(20);
let start_frame_no = start_fno.parse::<u64>().map_err(|_| ())?;
let timestamp = timestamp[1..].parse().map_err(|_| ())?;
Ok(Self {
start_frame_no,
Expand All @@ -160,8 +160,8 @@ impl fmt::Display for SegmentKey {
write!(
f,
"{:020}-{:020}-{:020}",
u64::MAX - self.start_frame_no,
u64::MAX - self.end_frame_no,
self.start_frame_no,
self.timestamp,
)
}
Expand Down Expand Up @@ -207,7 +207,7 @@ pub trait Storage: Send + Sync + 'static {
fn find_segment(
&self,
namespace: &NamespaceName,
frame_no: u64,
frame_no: FindSegmentReq,
config_override: Option<Self::Config>,
) -> impl Future<Output = Result<SegmentKey>> + Send;

Expand Down Expand Up @@ -306,7 +306,7 @@ where
fn find_segment(
&self,
namespace: &NamespaceName,
frame_no: u64,
frame_no: FindSegmentReq,
config_override: Option<Self::Config>,
) -> impl Future<Output = Result<SegmentKey>> + Send {
async move {
Expand Down Expand Up @@ -415,7 +415,7 @@ impl Storage for NoStorage {
async fn find_segment(
&self,
_namespace: &NamespaceName,
_frame_no: u64,
_frame_no: FindSegmentReq,
_config_override: Option<Self::Config>,
) -> Result<SegmentKey> {
unimplemented!()
Expand Down Expand Up @@ -564,14 +564,17 @@ impl<IO: Io> Storage for TestStorage<IO> {
async fn find_segment(
&self,
namespace: &NamespaceName,
frame_no: u64,
req: FindSegmentReq,
_config_override: Option<Self::Config>,
) -> Result<SegmentKey> {
let inner = self.inner.lock().await;
if inner.store {
let FindSegmentReq::EndFrameNoLessThan(fno) = req else {
panic!("unsupported lookup by ts")
};
if let Some(segs) = inner.stored.get(namespace) {
let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(frame_no)) else {
return Err(Error::FrameNotFound(frame_no));
let Some((key, _path)) = segs.iter().find(|(k, _)| k.includes(fno)) else {
return Err(Error::SegmentNotFound(req));
};
return Ok(*key);
} else {
Expand Down

0 comments on commit 4c0cfda

Please sign in to comment.