From 9db13fe48c508ef955ec93189fe1889771f3ddc7 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 22 Jul 2024 11:28:35 +0200 Subject: [PATCH] libsql wal restore (#1569) * make copy_to_file take ref to file * turn compacted segment frame_count to u32 * add size after to compacted segment header * make durable frame_no async * pass namespace by ref * add backend to async storage * add restore to Storage * add restore to shell * fix shell bug * error handling * fix tests --- libsql-wal/src/bins/shell/main.rs | 165 +++++++++++------- libsql-wal/src/io/compat.rs | 2 +- libsql-wal/src/lib.rs | 15 +- libsql-wal/src/registry.rs | 3 +- libsql-wal/src/segment/compacted.rs | 3 +- libsql-wal/src/segment/sealed.rs | 3 +- libsql-wal/src/storage/async_storage.rs | 69 +++++--- libsql-wal/src/storage/backend/mod.rs | 67 +++---- libsql-wal/src/storage/backend/s3.rs | 221 ++++++++++++++++++------ libsql-wal/src/storage/error.rs | 8 + libsql-wal/src/storage/job.rs | 15 +- libsql-wal/src/storage/mod.rs | 63 ++++++- libsql-wal/tests/misc.rs | 8 +- 13 files changed, 432 insertions(+), 210 deletions(-) diff --git a/libsql-wal/src/bins/shell/main.rs b/libsql-wal/src/bins/shell/main.rs index b1d837dd62..99feafdd5a 100644 --- a/libsql-wal/src/bins/shell/main.rs +++ b/libsql-wal/src/bins/shell/main.rs @@ -1,27 +1,25 @@ +use std::fs::OpenOptions; use std::path::{Path, PathBuf}; use std::sync::Arc; use aws_config::{BehaviorVersion, Region}; use aws_credential_types::Credentials; use aws_sdk_s3::config::SharedCredentialsProvider; -use clap::Parser; -use libsql_wal::storage::backend::Backend; +use clap::{Parser, ValueEnum}; use tokio::task::{block_in_place, JoinSet}; use libsql_sys::name::NamespaceName; -use libsql_sys::rusqlite::{OpenFlags, OptionalExtension}; +use libsql_sys::rusqlite::OpenFlags; use libsql_wal::io::StdIO; use libsql_wal::registry::WalRegistry; use libsql_wal::segment::sealed::SealedSegment; use libsql_wal::storage::async_storage::{AsyncStorage, AsyncStorageInitConfig}; -use libsql_wal::storage::backend::s3::{S3Backend, S3Config}; +use libsql_wal::storage::backend::s3::S3Backend; use libsql_wal::storage::Storage; use libsql_wal::wal::LibsqlWalManager; #[derive(Debug, clap::Parser)] struct Cli { - #[arg(long, short = 'p')] - db_path: PathBuf, #[command(flatten)] s3_args: S3Args, #[arg(long, short = 'n')] @@ -58,10 +56,24 @@ struct S3Args { s3_region_id: Option, } +#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)] +enum RestoreOptions { + Latest, +} + #[derive(Debug, clap::Subcommand)] enum Subcommand { - Shell, + Shell { + #[arg(long, short = 'p')] + db_path: PathBuf, + }, Infos, + Restore { + #[arg(long)] + from: RestoreOptions, + #[arg(long, short)] + path: PathBuf, + }, } #[tokio::main] @@ -70,19 +82,8 @@ async fn main() { let mut join_set = JoinSet::new(); if cli.s3_args.enable_s3 { - let registry = setup_s3_registry( - &cli.db_path, - &cli.s3_args.s3_bucket.as_ref().unwrap(), - &cli.s3_args.cluster_id.as_ref().unwrap(), - &cli.s3_args.s3_url.as_ref().unwrap(), - &cli.s3_args.s3_region_id.as_ref().unwrap(), - &cli.s3_args.s3_access_key_id.as_ref().unwrap(), - &cli.s3_args.s3_access_key.as_ref().unwrap(), - &mut join_set, - ) - .await; - - handle(registry, &cli).await; + let storage = setup_s3_storage(&cli, &mut join_set).await; + handle(&cli, storage).await; } else { todo!() } @@ -90,43 +91,64 @@ async fn main() { while join_set.join_next().await.is_some() {} } -async fn handle(env: Env, cli: &Cli) +async fn handle(cli: &Cli, storage: S) where S: Storage>, - B: Backend, { - match cli.subcommand { - Subcommand::Shell => { - let path = cli.db_path.join("dbs").join(&cli.namespace); + match &cli.subcommand { + Subcommand::Shell { db_path } => { + let registry = WalRegistry::new(db_path.clone(), storage).unwrap(); run_shell( - env.registry, - &path, + registry, + &db_path, NamespaceName::from_string(cli.namespace.clone()), ) - .await + .await; + } + Subcommand::Infos => handle_infos(&cli.namespace, storage).await, + Subcommand::Restore { from, path } => { + let namespace = NamespaceName::from_string(cli.namespace.clone()); + handle_restore(&namespace, storage, *from, path).await } - Subcommand::Infos => handle_infos(&cli.namespace, env).await, } } -async fn handle_infos(namespace: &str, env: Env) -where - B: Backend, +async fn handle_restore( + namespace: &NamespaceName, + storage: S, + _from: RestoreOptions, + db_path: &Path, +) where + S: Storage, { - let namespace = NamespaceName::from_string(namespace.to_owned()); - let meta = env - .backend - .meta(&env.backend.default_config(), namespace.clone()) + let options = libsql_wal::storage::RestoreOptions::Latest; + let file = OpenOptions::new() + .create_new(true) + .write(true) + .open(db_path) + .unwrap(); + storage + .restore(file, &namespace, options, None) .await .unwrap(); +} + +async fn handle_infos(namespace: &str, storage: S) +where + S: Storage, +{ + let namespace = NamespaceName::from_string(namespace.to_owned()); + let durable = storage.durable_frame_no(&namespace, None).await; println!("namespace: {namespace}"); - println!("max durable frame: {}", meta.max_frame_no); + println!("max durable frame: {durable}"); } async fn run_shell(registry: WalRegistry, db_path: &Path, namespace: NamespaceName) where S: Storage>, { + let db_path = db_path.join("dbs").join(namespace.as_str()); + tokio::fs::create_dir_all(&db_path).await.unwrap(); let registry = Arc::new(registry); let resolver = move |path: &Path| { NamespaceName::from_string( @@ -163,14 +185,27 @@ where continue; } - if let Err(e) = block_in_place(|| { - conn.query_row(&q, (), |row| { - println!("{row:?}"); - Ok(()) - }) - .optional() - }) { - println!("error: {e}"); + match block_in_place(|| conn.prepare(&q)) { + Ok(mut stmt) => { + match block_in_place(|| { + stmt.query_map((), |row| { + println!("{row:?}"); + Ok(()) + }) + }) { + Ok(rows) => block_in_place(|| { + rows.for_each(|_| ()); + }), + Err(e) => { + println!("error: {e}"); + continue; + } + } + } + Err(e) => { + println!("error: {e}"); + continue; + } } } Err(_) => { @@ -204,33 +239,30 @@ async fn handle_builtin( false } -struct Env { - registry: WalRegistry, - backend: Arc, -} - -async fn setup_s3_registry( - db_path: &Path, - bucket_name: &str, - cluster_id: &str, - url: &str, - region_id: &str, - access_key_id: &str, - secret_access_key: &str, +async fn setup_s3_storage( + cli: &Cli, join_set: &mut JoinSet<()>, -) -> Env>, S3Backend> { - let cred = Credentials::new(access_key_id, secret_access_key, None, None, ""); +) -> AsyncStorage, SealedSegment> { + let cred = Credentials::new( + cli.s3_args.s3_access_key_id.as_ref().unwrap(), + cli.s3_args.s3_access_key.as_ref().unwrap(), + None, + None, + "", + ); let config = aws_config::SdkConfig::builder() .behavior_version(BehaviorVersion::latest()) - .region(Region::new(region_id.to_string())) + .region(Region::new( + cli.s3_args.s3_region_id.as_ref().unwrap().to_string(), + )) .credentials_provider(SharedCredentialsProvider::new(cred)) - .endpoint_url(url) + .endpoint_url(cli.s3_args.s3_url.as_ref().unwrap()) .build(); let backend = Arc::new( S3Backend::from_sdk_config( config.clone(), - bucket_name.to_string(), - cluster_id.to_string(), + cli.s3_args.s3_bucket.as_ref().unwrap().to_string(), + cli.s3_args.cluster_id.as_ref().unwrap().to_string(), ) .await .unwrap(), @@ -244,7 +276,6 @@ async fn setup_s3_registry( join_set.spawn(async move { storage_loop.run().await; }); - let path = db_path.join("wals"); - let registry = WalRegistry::new(path, storage).unwrap(); - Env { registry, backend } + + storage } diff --git a/libsql-wal/src/io/compat.rs b/libsql-wal/src/io/compat.rs index 30fb93bd90..461cbfdc39 100644 --- a/libsql-wal/src/io/compat.rs +++ b/libsql-wal/src/io/compat.rs @@ -8,7 +8,7 @@ use super::FileExt; /// Copy from src that implements AsyncRead to the detination file, returning how many bytes have /// been copied -pub async fn copy_to_file(mut src: R, dst: F) -> io::Result +pub async fn copy_to_file(mut src: R, dst: &F) -> io::Result where F: FileExt, R: AsyncRead + Unpin, diff --git a/libsql-wal/src/lib.rs b/libsql-wal/src/lib.rs index 39bb4546e5..3ad51d5748 100644 --- a/libsql-wal/src/lib.rs +++ b/libsql-wal/src/lib.rs @@ -56,23 +56,26 @@ pub mod test { pub fn shared(&self, namespace: &str) -> Arc> { let path = self.tmp.path().join(namespace).join("data"); - self.registry - .clone() - .open(path.as_ref(), &NamespaceName::from_string(namespace.into())) - .unwrap() + let registry = self.registry.clone(); + let namespace = NamespaceName::from_string(namespace.into()); + registry.clone().open(path.as_ref(), &namespace).unwrap() } pub fn db_path(&self, namespace: &str) -> PathBuf { self.tmp.path().join(namespace) } - pub fn open_conn(&self, namespace: &str) -> libsql_sys::Connection> { + pub fn open_conn( + &self, + namespace: &'static str, + ) -> libsql_sys::Connection> { let path = self.db_path(namespace); + let wal = self.wal.clone(); std::fs::create_dir_all(&path).unwrap(); libsql_sys::Connection::open( path.join("data"), OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE, - self.wal.clone(), + wal, 100000, None, ) diff --git a/libsql-wal/src/registry.rs b/libsql-wal/src/registry.rs index 262026195b..65f6eec04a 100644 --- a/libsql-wal/src/registry.rs +++ b/libsql-wal/src/registry.rs @@ -245,7 +245,8 @@ where let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1); // TODO: pass config override here - let durable_frame_no = self.storage.durable_frame_no(&namespace, None).into(); + let max_frame_no = self.storage.durable_frame_no_sync(&namespace, None); + let durable_frame_no = max_frame_no.into(); let shared = Arc::new(SharedWal { current, diff --git a/libsql-wal/src/segment/compacted.rs b/libsql-wal/src/segment/compacted.rs index 18aac8605e..1964698942 100644 --- a/libsql-wal/src/segment/compacted.rs +++ b/libsql-wal/src/segment/compacted.rs @@ -4,10 +4,11 @@ use zerocopy::{AsBytes, FromBytes, FromZeroes}; #[derive(Debug, AsBytes, FromZeroes, FromBytes)] #[repr(C)] pub struct CompactedSegmentDataHeader { - pub(crate) frame_count: lu64, + pub(crate) frame_count: lu32, pub(crate) segment_id: lu128, pub(crate) start_frame_no: lu64, pub(crate) end_frame_no: lu64, + pub(crate) size_after: lu32, } #[derive(Debug, AsBytes, FromZeroes, FromBytes)] diff --git a/libsql-wal/src/segment/sealed.rs b/libsql-wal/src/segment/sealed.rs index da0383dad1..9e1695bfbd 100644 --- a/libsql-wal/src/segment/sealed.rs +++ b/libsql-wal/src/segment/sealed.rs @@ -82,10 +82,11 @@ where let mut hasher = crc32fast::Hasher::new(); let header = CompactedSegmentDataHeader { - frame_count: (self.index().len() as u64).into(), + frame_count: (self.index().len() as u32).into(), segment_id: id.as_u128().into(), start_frame_no: self.header().start_frame_no, end_frame_no: self.header().last_commited_frame_no, + size_after: self.header.size_after, }; hasher.update(header.as_bytes()); diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs index 5b4975c6eb..292347e62f 100644 --- a/libsql-wal/src/storage/async_storage.rs +++ b/libsql-wal/src/storage/async_storage.rs @@ -13,7 +13,7 @@ use crate::segment::Segment; use super::backend::Backend; use super::scheduler::Scheduler; -use super::{Storage, StoreSegmentRequest}; +use super::{RestoreOptions, Storage, StoreSegmentRequest}; /// Background loop task state. /// @@ -117,7 +117,7 @@ where let backend = self.backend.clone(); let config = config_override.unwrap_or_else(|| backend.default_config()); tokio::spawn(async move { - let meta = backend.meta(&config, namespace).await.unwrap(); + let meta = backend.meta(&config, &namespace).await.unwrap(); let _ = ret.send(meta.max_frame_no); }); } @@ -140,21 +140,22 @@ enum StorageLoopMessage { }, } -pub struct AsyncStorage { +pub struct AsyncStorage { /// send request to the main loop - job_sender: mpsc::UnboundedSender>, + job_sender: mpsc::UnboundedSender>, /// receiver for the current max durable index durable_notifier: mpsc::Receiver<(NamespaceName, u64)>, force_shutdown: oneshot::Sender<()>, + backend: Arc, } -impl Storage for AsyncStorage +impl Storage for AsyncStorage where - C: Send + Sync + 'static, + B: Backend, S: Segment, { type Segment = S; - type Config = C; + type Config = B::Config; fn store( &self, @@ -174,21 +175,36 @@ where .expect("bottomless loop was closed before the handle was dropped"); } - fn durable_frame_no( + async fn durable_frame_no( &self, namespace: &NamespaceName, config_override: Option>, ) -> u64 { - let (ret, rcv) = oneshot::channel(); - self.job_sender - .send(StorageLoopMessage::DurableFrameNoReq { - namespace: namespace.clone(), - ret, - config_override, - }) - .expect("bottomless loop was closed before the handle was dropped"); + let config = config_override.unwrap_or_else(|| self.backend.default_config()); + let meta = self.backend.meta(&config, namespace).await.unwrap(); + meta.max_frame_no + } - rcv.blocking_recv().unwrap() + async fn restore( + &self, + file: impl crate::io::FileExt, + namespace: &NamespaceName, + restore_options: RestoreOptions, + config_override: Option>, + ) -> super::Result<()> { + let config = config_override.unwrap_or_else(|| self.backend.default_config()); + self.backend + .restore(&config, &namespace, restore_options, file) + .await + } + + fn durable_frame_no_sync( + &self, + namespace: &NamespaceName, + config_override: Option>, + ) -> u64 { + tokio::runtime::Handle::current() + .block_on(self.durable_frame_no(namespace, config_override)) } } @@ -197,27 +213,25 @@ pub struct AsyncStorageInitConfig { pub max_in_flight_jobs: usize, } -impl AsyncStorage { - pub async fn new( +impl AsyncStorage { + pub async fn new( config: AsyncStorageInitConfig, - ) -> (AsyncStorage, AsyncStorageLoop) + ) -> (AsyncStorage, AsyncStorageLoop) where - B: Backend, + B: Backend, S: Segment, - C: Send + Sync + 'static, { Self::new_with_io(config, Arc::new(StdIO(()))).await } - pub async fn new_with_io( + pub async fn new_with_io( config: AsyncStorageInitConfig, io: Arc, - ) -> (AsyncStorage, AsyncStorageLoop) + ) -> (AsyncStorage, AsyncStorageLoop) where - B: Backend, + B: Backend, IO: Io, S: Segment, - C: Send + Sync + 'static, { let (job_snd, job_rcv) = tokio::sync::mpsc::unbounded_channel(); let (durable_notifier_snd, durable_notifier_rcv) = tokio::sync::mpsc::channel(16); @@ -226,7 +240,7 @@ impl AsyncStorage { let storage_loop = AsyncStorageLoop { receiver: job_rcv, scheduler, - backend: config.backend, + backend: config.backend.clone(), io, max_in_flight: config.max_in_flight_jobs, force_shutdown: shutdown_rcv, @@ -236,6 +250,7 @@ impl AsyncStorage { job_sender: job_snd, durable_notifier: durable_notifier_rcv, force_shutdown: shutdown_snd, + backend: config.backend, }; (this, storage_loop) diff --git a/libsql-wal/src/storage/backend/mod.rs b/libsql-wal/src/storage/backend/mod.rs index a7aa8e04b4..ecfff4bf6a 100644 --- a/libsql-wal/src/storage/backend/mod.rs +++ b/libsql-wal/src/storage/backend/mod.rs @@ -5,10 +5,9 @@ use std::sync::Arc; use chrono::{DateTime, Utc}; use fst::Map; -use tokio::io::AsyncWrite; use uuid::Uuid; -use super::Result; +use super::{RestoreOptions, Result}; use crate::io::file::FileExt; use libsql_sys::name::NamespaceName; @@ -27,13 +26,6 @@ pub struct SegmentMeta { pub struct RestoreRequest {} -pub struct RestoreOptions { - /// Namespace to restore - namespace: NamespaceName, - /// If provided, will restore up to the most recent segment lesser or equal to `before` - before: Option>, -} - pub struct DbMeta { pub max_frame_no: u64, } @@ -54,43 +46,26 @@ pub trait Backend: Send + Sync + 'static { /// Fetch a segment for `namespace` containing `frame_no`, and writes it to `dest`. async fn fetch_segment( &self, - _config: &Self::Config, - _namespace: NamespaceName, - _frame_no: u64, - _dest_path: &Path, + config: &Self::Config, + namespace: &NamespaceName, + frame_no: u64, + dest_path: &Path, ) -> Result>>; /// Fetch meta for `namespace` fn meta( &self, - _config: &Self::Config, - _namespace: NamespaceName, + config: &Self::Config, + namespace: &NamespaceName, ) -> impl Future> + Send; - /// Fetch meta batch - /// implemented in terms of `meta`, can be specialized if implementation is able to query a - /// batch more efficiently. - async fn meta_batch( - &self, - _config: &Self::Config, - _namespaces: Vec, - ) -> Result> { - todo!() - } - - /// Restore namespace, and return the frame index. - /// The default implementation is implemented in terms of fetch_segment, but it can be - /// overridden for a more specific implementation if available; for example, a remote storage - /// server could directly stream the necessary pages, rather than fetching segments until - /// fully restored. - fn restore( + async fn restore( &self, - _config: &Self::Config, - _restore_options: RestoreOptions, - _dest: impl AsyncWrite, - ) -> Result { - todo!("provide default restore implementation") - } + config: &Self::Config, + namespace: &NamespaceName, + restore_options: RestoreOptions, + dest: impl FileExt, + ) -> Result<()>; /// Returns the default configuration for this storage fn default_config(&self) -> Arc; @@ -113,7 +88,7 @@ impl Backend for Arc { async fn fetch_segment( &self, config: &Self::Config, - namespace: NamespaceName, + namespace: &NamespaceName, frame_no: u64, dest_path: &Path, ) -> Result>> { @@ -122,11 +97,23 @@ impl Backend for Arc { .await } - async fn meta(&self, config: &Self::Config, namespace: NamespaceName) -> Result { + async fn meta(&self, config: &Self::Config, namespace: &NamespaceName) -> Result { self.as_ref().meta(config, namespace).await } fn default_config(&self) -> Arc { self.as_ref().default_config() } + + async fn restore( + &self, + config: &Self::Config, + namespace: &NamespaceName, + restore_options: RestoreOptions, + dest: impl FileExt, + ) -> Result<()> { + self.as_ref() + .restore(config, namespace, restore_options, dest) + .await + } } diff --git a/libsql-wal/src/storage/backend/s3.rs b/libsql-wal/src/storage/backend/s3.rs index 8b1451a5e2..7ed3820dd5 100644 --- a/libsql-wal/src/storage/backend/s3.rs +++ b/libsql-wal/src/storage/backend/s3.rs @@ -13,14 +13,20 @@ use aws_sdk_s3::primitives::{ByteStream, SdkBody}; use aws_sdk_s3::types::CreateBucketConfiguration; use aws_sdk_s3::Client; use bytes::{Bytes, BytesMut}; -use http_body::{Frame, SizeHint}; +use http_body::{Frame as HttpFrame, SizeHint}; use libsql_sys::name::NamespaceName; +use roaring::RoaringBitmap; +use tokio::io::{AsyncRead, AsyncReadExt, BufReader}; use tokio_util::sync::ReusableBoxFuture; +use zerocopy::{AsBytes, FromZeroes}; use super::{Backend, SegmentMeta}; +use crate::io::buf::ZeroCopyBuf; use crate::io::compat::copy_to_file; use crate::io::{FileExt, Io, StdIO}; -use crate::storage::{Error, Result}; +use crate::segment::compacted::CompactedSegmentDataHeader; +use crate::segment::Frame; +use crate::storage::{Error, RestoreOptions, Result}; pub struct S3Backend { client: Client, @@ -91,20 +97,28 @@ impl S3Backend { }) } - async fn fetch_segment_data( + async fn fetch_segment_data_reader( &self, config: &S3Config, folder_key: &FolderKey<'_>, segment_key: &SegmentKey, - dest_path: &Path, - ) -> Result<()> { + ) -> Result { let key = s3_segment_data_key(folder_key, segment_key); let stream = self.s3_get(config, key).await?; - let reader = stream.into_async_read(); - // TODO: make open async - let file = self.io.open(false, false, true, dest_path)?; - copy_to_file(reader, file).await?; + Ok(stream.into_async_read()) + } + async fn fetch_segment_data( + &self, + config: &S3Config, + folder_key: &FolderKey<'_>, + segment_key: &SegmentKey, + file: &impl FileExt, + ) -> Result<()> { + let reader = self + .fetch_segment_data_reader(config, folder_key, segment_key) + .await?; + copy_to_file(reader, file).await?; Ok(()) } @@ -116,10 +130,22 @@ impl S3Backend { .key(key) .send() .await - .unwrap() + .map_err(|e| Error::unhandled(e, "error sending s3 GET request"))? .body) } + async fn s3_put(&self, config: &S3Config, key: String, body: ByteStream) -> Result<()> { + self.client + .put_object() + .bucket(&config.bucket) + .body(body) + .key(key) + .send() + .await + .map_err(|e| Error::unhandled(e, "error sending s3 PUT request"))?; + Ok(()) + } + async fn fetch_segment_index( &self, config: &S3Config, @@ -129,8 +155,12 @@ impl S3Backend { let s3_index_key = s3_segment_index_key(folder_key, segment_key); let stream = self.s3_get(config, s3_index_key).await?; // TODO: parse header, check if too large to fit memory - let bytes = stream.collect().await.unwrap().to_vec(); - let index = fst::Map::new(bytes).unwrap(); + let bytes = stream + .collect() + .await + .map_err(|e| Error::unhandled(e, ""))? + .to_vec(); + let index = fst::Map::new(bytes).map_err(|_| Error::InvalidIndex)?; Ok(index) } @@ -150,23 +180,100 @@ impl S3Backend { .start_after(lookup_key) .send() .await - .unwrap(); + .map_err(|e| Error::unhandled(e, "failed to list bucket"))?; let Some(contents) = objects.contents().first() else { return Ok(None); }; - let key = contents.key().unwrap(); + let key = contents.key().expect("misssing key?"); let key_path: &Path = key.as_ref(); let segment_key: SegmentKey = key_path .file_stem() - .unwrap() + .expect("invalid key") .to_str() - .unwrap() + .expect("invalid key") .parse() - .unwrap(); + .expect("invalid key"); Ok(Some(segment_key)) } + + // This method could probably be optimized a lot by using indexes and only downloading useful + // segments + async fn restore_latest( + &self, + config: &S3Config, + namespace: &NamespaceName, + dest: impl FileExt, + ) -> Result<()> { + let folder_key = FolderKey { + cluster_id: &config.cluster_id, + namespace, + }; + let Some(latest_key) = self.find_segment(config, &folder_key, u64::MAX).await? else { + tracing::info!("nothing to restore for {namespace}"); + return Ok(()); + }; + + let reader = self + .fetch_segment_data_reader(config, &folder_key, &latest_key) + .await?; + let mut reader = BufReader::new(reader); + let mut header: CompactedSegmentDataHeader = CompactedSegmentDataHeader::new_zeroed(); + reader.read_exact(header.as_bytes_mut()).await?; + let db_size = header.size_after.get(); + let mut seen = RoaringBitmap::new(); + let mut frame: Frame = Frame::new_zeroed(); + loop { + for _ in 0..header.frame_count.get() { + reader.read_exact(frame.as_bytes_mut()).await?; + let page_no = frame.header().page_no(); + if !seen.contains(page_no) { + seen.insert(page_no); + let offset = (page_no as u64 - 1) * 4096; + let buf = ZeroCopyBuf::new_init(frame).map_slice(|f| f.get_ref().data()); + let (buf, ret) = dest.write_all_at_async(buf, offset).await; + ret?; + frame = buf.into_inner().into_inner(); + } + } + + // db is restored + if seen.len() == db_size as u64 { + break; + } + + let next_frame_no = header.start_frame_no.get() - 1; + let Some(key) = self + .find_segment(config, &folder_key, next_frame_no) + .await? + else { + todo!("there should be a segment!"); + }; + let r = self + .fetch_segment_data_reader(config, &folder_key, &key) + .await?; + reader = BufReader::new(r); + reader.read_exact(header.as_bytes_mut()).await?; + } + + Ok(()) + } + + async fn fetch_segment_from_key( + &self, + config: &S3Config, + folder_key: &FolderKey<'_>, + segment_key: &SegmentKey, + dest_file: &impl FileExt, + ) -> Result>> { + let (_, index) = tokio::try_join!( + self.fetch_segment_data(config, &folder_key, &segment_key, dest_file), + self.fetch_segment_index(config, &folder_key, &segment_key), + )?; + + Ok(index) + } } pub struct S3Config { @@ -194,12 +301,12 @@ pub struct S3Config { /// let meta = SegmentMeta { start_frame_no: 101, end_frame_no: 1000 }; /// map.insert(SegmentKey(&meta).to_string(), meta); /// -/// dbg!(map.range(format!("{:019}", u64::MAX - 50)..).next()); -/// dbg!(map.range(format!("{:019}", u64::MAX - 0)..).next()); -/// dbg!(map.range(format!("{:019}", u64::MAX - 1)..).next()); -/// dbg!(map.range(format!("{:019}", u64::MAX - 100)..).next()); -/// dbg!(map.range(format!("{:019}", u64::MAX - 101)..).next()); -/// dbg!(map.range(format!("{:019}", u64::MAX - 5000)..).next()); +/// map.range(format!("{:019}", u64::MAX - 50)..).next(); +/// map.range(format!("{:019}", u64::MAX - 0)..).next(); +/// map.range(format!("{:019}", u64::MAX - 1)..).next(); +/// map.range(format!("{:019}", u64::MAX - 100)..).next(); +/// map.range(format!("{:019}", u64::MAX - 101)..).next(); +/// map.range(format!("{:019}", u64::MAX - 5000)..).next(); /// ``` #[derive(Debug, Clone, Copy)] pub struct SegmentKey { @@ -297,28 +404,14 @@ where let body = FileStreamBody::new(segment_data).into_byte_stream(); - self.client - .put_object() - .bucket(&self.default_config.bucket) - .body(body) - .key(s3_data_key) - .send() - .await - .unwrap(); + self.s3_put(config, s3_data_key, body).await?; let s3_index_key = s3_segment_index_key(&folder_key, &segment_key); // TODO: store meta about the index? let body = ByteStream::from(segment_index); - self.client - .put_object() - .bucket(&self.default_config.bucket) - .body(body) - .key(s3_index_key) - .send() - .await - .unwrap(); + self.s3_put(config, s3_index_key, body).await?; Ok(()) } @@ -326,7 +419,7 @@ where async fn fetch_segment( &self, config: &Self::Config, - namespace: NamespaceName, + namespace: &NamespaceName, frame_no: u64, dest_path: &Path, ) -> Result>> { @@ -338,19 +431,22 @@ where let Some(segment_key) = self.find_segment(config, &folder_key, frame_no).await? else { return Err(Error::FrameNotFound(frame_no)); }; - if segment_key.includes(frame_no) { - let (_, index) = tokio::try_join!( - self.fetch_segment_data(config, &folder_key, &segment_key, dest_path), - self.fetch_segment_index(config, &folder_key, &segment_key), - )?; - Ok(index) + if segment_key.includes(frame_no) { + // TODO: make open async + let file = self.io.open(false, false, true, dest_path)?; + self.fetch_segment_from_key(config, &folder_key, &segment_key, &file) + .await } else { - todo!("not found"); + return Err(Error::FrameNotFound(frame_no)); } } - async fn meta(&self, config: &Self::Config, namespace: NamespaceName) -> Result { + async fn meta( + &self, + config: &Self::Config, + namespace: &NamespaceName, + ) -> Result { // request a key bigger than any other to get the last segment let folder_key = FolderKey { cluster_id: &config.cluster_id, @@ -367,6 +463,19 @@ where fn default_config(&self) -> Arc { self.default_config.clone() } + + async fn restore( + &self, + config: &Self::Config, + namespace: &NamespaceName, + restore_options: RestoreOptions, + dest: impl FileExt, + ) -> Result<()> { + match restore_options { + RestoreOptions::Latest => self.restore_latest(config, &namespace, dest).await, + RestoreOptions::Timestamp(_) => todo!(), + } + } } #[derive(Clone, Copy)] @@ -422,7 +531,7 @@ where fn poll_frame( mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>, - ) -> Poll, Self::Error>>> { + ) -> Poll, Self::Error>>> { loop { match self.state { StreamState::Init => { @@ -446,7 +555,7 @@ where } else { self.state = StreamState::Init; self.current_offset += buf.len() as u64; - return Poll::Ready(Some(Ok(Frame::data(buf)))); + return Poll::Ready(Some(Ok(HttpFrame::data(buf)))); } } Poll::Ready(Err(e)) => { @@ -558,7 +667,7 @@ mod tests { .await .unwrap(); - let db_meta = storage.meta(&s3_config, ns.clone()).await.unwrap(); + let db_meta = storage.meta(&s3_config, &ns).await.unwrap(); assert_eq!(db_meta.max_frame_no, 64); let mut builder = MapBuilder::memory(); @@ -580,31 +689,31 @@ mod tests { .await .unwrap(); - let db_meta = storage.meta(&s3_config, ns.clone()).await.unwrap(); + let db_meta = storage.meta(&s3_config, &ns).await.unwrap(); assert_eq!(db_meta.max_frame_no, 128); let tmp = NamedTempFile::new().unwrap(); let index = storage - .fetch_segment(&s3_config, ns.clone(), 1, tmp.path()) + .fetch_segment(&s3_config, &ns, 1, tmp.path()) .await .unwrap(); assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42); let index = storage - .fetch_segment(&s3_config, ns.clone(), 63, tmp.path()) + .fetch_segment(&s3_config, &ns, 63, tmp.path()) .await .unwrap(); assert_eq!(index.get(42u32.to_be_bytes()).unwrap(), 42); let index = storage - .fetch_segment(&s3_config, ns.clone(), 64, tmp.path()) + .fetch_segment(&s3_config, &ns, 64, tmp.path()) .await .unwrap(); assert_eq!(index.get(44u32.to_be_bytes()).unwrap(), 44); let index = storage - .fetch_segment(&s3_config, ns.clone(), 65, tmp.path()) + .fetch_segment(&s3_config, &ns, 65, tmp.path()) .await .unwrap(); assert_eq!(index.get(44u32.to_be_bytes()).unwrap(), 44); diff --git a/libsql-wal/src/storage/error.rs b/libsql-wal/src/storage/error.rs index ac7c5aefe0..509e993ad7 100644 --- a/libsql-wal/src/storage/error.rs +++ b/libsql-wal/src/storage/error.rs @@ -1,3 +1,5 @@ +use std::panic::Location; + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("io error: {0}")] @@ -12,10 +14,15 @@ pub enum Error { UnhandledStorageError { error: Box, context: String, + loc: String, }, + // We may recover from this error, and rebuild the index from the data file. + #[error("invalid index")] + InvalidIndex, } impl Error { + #[track_caller] pub(crate) fn unhandled( e: impl std::error::Error + Send + Sync + 'static, ctx: impl Into, @@ -23,6 +30,7 @@ impl Error { Self::UnhandledStorageError { error: Box::new(e), context: ctx.into(), + loc: Location::caller().to_string(), } } } diff --git a/libsql-wal/src/storage/job.rs b/libsql-wal/src/storage/job.rs index f94dfc8664..4ee027dc27 100644 --- a/libsql-wal/src/storage/job.rs +++ b/libsql-wal/src/storage/job.rs @@ -115,6 +115,7 @@ mod test { use crate::io::file::FileExt; use crate::io::StdIO; + use crate::storage::RestoreOptions; // use crate::registry::WalRegistry; // use crate::segment::compacted::CompactedSegmentDataHeader; // use crate::segment::sealed::SealedSegment; @@ -444,7 +445,7 @@ mod test { async fn fetch_segment( &self, _config: &Self::Config, - _namespace: NamespaceName, + _namespace: &NamespaceName, _frame_no: u64, _dest_path: &Path, ) -> Result>> { @@ -454,7 +455,7 @@ mod test { async fn meta( &self, _config: &Self::Config, - _namespace: NamespaceName, + _namespace: &NamespaceName, ) -> Result { todo!() } @@ -462,6 +463,16 @@ mod test { fn default_config(&self) -> Arc { Arc::new(()) } + + async fn restore( + &self, + _config: &Self::Config, + _namespace: &NamespaceName, + _restore_options: RestoreOptions, + _dest: impl FileExt, + ) -> Result<()> { + todo!() + } } let job = Job { diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs index aac1ae41a5..29fcc159e9 100644 --- a/libsql-wal/src/storage/mod.rs +++ b/libsql-wal/src/storage/mod.rs @@ -18,6 +18,11 @@ mod scheduler; pub type Result = std::result::Result; +pub enum RestoreOptions { + Latest, + Timestamp(DateTime), +} + pub trait Storage: Send + Sync + 'static { type Segment: Segment; type Config; @@ -30,11 +35,25 @@ pub trait Storage: Send + Sync + 'static { config_override: Option>, ); - fn durable_frame_no( + fn durable_frame_no_sync( + &self, + namespace: &NamespaceName, + config_override: Option>, + ) -> u64; + + async fn durable_frame_no( &self, namespace: &NamespaceName, config_override: Option>, ) -> u64; + + async fn restore( + &self, + file: impl FileExt, + namespace: &NamespaceName, + restore_options: RestoreOptions, + config_override: Option>, + ) -> Result<()>; } /// a placeholder storage that doesn't store segment @@ -53,10 +72,28 @@ impl Storage for NoStorage { ) { } - fn durable_frame_no( + async fn durable_frame_no( + &self, + namespace: &NamespaceName, + config: Option>, + ) -> u64 { + self.durable_frame_no_sync(namespace, config) + } + + async fn restore( &self, + _file: impl FileExt, _namespace: &NamespaceName, - _config: Option>, + _restore_options: RestoreOptions, + _config_override: Option>, + ) -> Result<()> { + panic!("can restore from no storage") + } + + fn durable_frame_no_sync( + &self, + _namespace: &NamespaceName, + _config_override: Option>, ) -> u64 { u64::MAX } @@ -90,13 +127,31 @@ impl Storage for TestStorage { ) { } - fn durable_frame_no( + async fn durable_frame_no( &self, _namespace: &NamespaceName, _config: Option>, ) -> u64 { u64::MAX } + + async fn restore( + &self, + _file: impl FileExt, + _namespace: &NamespaceName, + _restore_options: RestoreOptions, + _config_override: Option>, + ) -> Result<()> { + todo!(); + } + + fn durable_frame_no_sync( + &self, + _namespace: &NamespaceName, + _config_override: Option>, + ) -> u64 { + u64::MAX + } } #[derive(Debug)] diff --git a/libsql-wal/tests/misc.rs b/libsql-wal/tests/misc.rs index 30eba38d40..23f9442342 100644 --- a/libsql-wal/tests/misc.rs +++ b/libsql-wal/tests/misc.rs @@ -1,7 +1,7 @@ use libsql_wal::test::TestEnv; -#[test] -fn transaction_rollback() { +#[tokio::test] +async fn transaction_rollback() { let env = TestEnv::new(); let mut conn1 = env.open_conn("test"); let conn2 = env.open_conn("test"); @@ -50,8 +50,8 @@ fn transaction_rollback() { .unwrap(); } -#[test] -fn transaction_savepoints() { +#[tokio::test] +async fn transaction_savepoints() { let env = TestEnv::new(); let mut conn = env.open_conn("test");