libsql wal restore (#1569)
* make copy_to_file take ref to file

* turn compacted segment frame_count to u32

* add size after to compacted segment header

* make durable frame_no async

* pass namespace by ref

* add backend to async storage

* add restore to Storage

* add restore to shell

* fix shell bug

* error handling

* fix tests
MarinPostma authored Jul 22, 2024
1 parent 759ffe0 commit 9db13fe
Showing 13 changed files with 432 additions and 210 deletions.
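Taken together, the restore bullets add an end-to-end path from backed-up segments to a fresh database file. Below is a minimal sketch of the new surface, assuming an already-constructed `Storage` implementation; the call shapes mirror `handle_restore` in the shell diff that follows:

use std::fs::OpenOptions;
use std::path::Path;

use libsql_sys::name::NamespaceName;
use libsql_wal::storage::{RestoreOptions, Storage};

// Sketch: restore the latest durable state of a namespace into a new db file.
// `storage` can be the S3-backed `AsyncStorage` built by the shell below.
async fn restore_latest<S: Storage>(storage: S, db_path: &Path) {
    let namespace = NamespaceName::from_string("my-namespace".into());
    // `create_new` refuses to overwrite an existing database file.
    let file = OpenOptions::new()
        .create_new(true)
        .write(true)
        .open(db_path)
        .unwrap();
    // The new `Storage::restore` entry point; the trailing `None` is the
    // per-namespace config override, unused here just as in the shell.
    storage
        .restore(file, &namespace, RestoreOptions::Latest, None)
        .await
        .unwrap();
}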
165 changes: 98 additions & 67 deletions libsql-wal/src/bins/shell/main.rs
@@ -1,27 +1,25 @@
 use std::fs::OpenOptions;
 use std::path::{Path, PathBuf};
 use std::sync::Arc;

 use aws_config::{BehaviorVersion, Region};
 use aws_credential_types::Credentials;
 use aws_sdk_s3::config::SharedCredentialsProvider;
-use clap::Parser;
-use libsql_wal::storage::backend::Backend;
+use clap::{Parser, ValueEnum};
 use tokio::task::{block_in_place, JoinSet};

 use libsql_sys::name::NamespaceName;
-use libsql_sys::rusqlite::{OpenFlags, OptionalExtension};
+use libsql_sys::rusqlite::OpenFlags;
 use libsql_wal::io::StdIO;
 use libsql_wal::registry::WalRegistry;
 use libsql_wal::segment::sealed::SealedSegment;
 use libsql_wal::storage::async_storage::{AsyncStorage, AsyncStorageInitConfig};
-use libsql_wal::storage::backend::s3::{S3Backend, S3Config};
+use libsql_wal::storage::backend::s3::S3Backend;
 use libsql_wal::storage::Storage;
 use libsql_wal::wal::LibsqlWalManager;

 #[derive(Debug, clap::Parser)]
 struct Cli {
-    #[arg(long, short = 'p')]
-    db_path: PathBuf,
     #[command(flatten)]
     s3_args: S3Args,
     #[arg(long, short = 'n')]
@@ -58,10 +56,24 @@ struct S3Args {
     s3_region_id: Option<String>,
 }

+#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
+enum RestoreOptions {
+    Latest,
+}
+
 #[derive(Debug, clap::Subcommand)]
 enum Subcommand {
-    Shell,
+    Shell {
+        #[arg(long, short = 'p')]
+        db_path: PathBuf,
+    },
     Infos,
+    Restore {
+        #[arg(long)]
+        from: RestoreOptions,
+        #[arg(long, short)]
+        path: PathBuf,
+    },
 }

 #[tokio::main]
@@ -70,63 +82,73 @@ async fn main() {
     let mut join_set = JoinSet::new();

     if cli.s3_args.enable_s3 {
-        let registry = setup_s3_registry(
-            &cli.db_path,
-            &cli.s3_args.s3_bucket.as_ref().unwrap(),
-            &cli.s3_args.cluster_id.as_ref().unwrap(),
-            &cli.s3_args.s3_url.as_ref().unwrap(),
-            &cli.s3_args.s3_region_id.as_ref().unwrap(),
-            &cli.s3_args.s3_access_key_id.as_ref().unwrap(),
-            &cli.s3_args.s3_access_key.as_ref().unwrap(),
-            &mut join_set,
-        )
-        .await;
-
-        handle(registry, &cli).await;
+        let storage = setup_s3_storage(&cli, &mut join_set).await;
+        handle(&cli, storage).await;
     } else {
         todo!()
     }

     while join_set.join_next().await.is_some() {}
 }

-async fn handle<S, B>(env: Env<S, B>, cli: &Cli)
+async fn handle<S>(cli: &Cli, storage: S)
 where
     S: Storage<Segment = SealedSegment<std::fs::File>>,
-    B: Backend,
 {
-    match cli.subcommand {
-        Subcommand::Shell => {
-            let path = cli.db_path.join("dbs").join(&cli.namespace);
+    match &cli.subcommand {
+        Subcommand::Shell { db_path } => {
+            let registry = WalRegistry::new(db_path.clone(), storage).unwrap();
             run_shell(
-                env.registry,
-                &path,
+                registry,
+                &db_path,
                 NamespaceName::from_string(cli.namespace.clone()),
             )
-            .await
+            .await;
         }
+        Subcommand::Infos => handle_infos(&cli.namespace, storage).await,
+        Subcommand::Restore { from, path } => {
+            let namespace = NamespaceName::from_string(cli.namespace.clone());
+            handle_restore(&namespace, storage, *from, path).await
+        }
-        Subcommand::Infos => handle_infos(&cli.namespace, env).await,
     }
 }

-async fn handle_infos<B, S>(namespace: &str, env: Env<S, B>)
-where
-    B: Backend,
-    S: Storage,
-{
-    let namespace = NamespaceName::from_string(namespace.to_owned());
-    let meta = env
-        .backend
-        .meta(&env.backend.default_config(), namespace.clone())
-        .await
-        .unwrap();
-    println!("namespace: {namespace}");
-    println!("max durable frame: {}", meta.max_frame_no);
-}
+async fn handle_restore<S>(
+    namespace: &NamespaceName,
+    storage: S,
+    _from: RestoreOptions,
+    db_path: &Path,
+) where
+    S: Storage,
+{
+    let options = libsql_wal::storage::RestoreOptions::Latest;
+    let file = OpenOptions::new()
+        .create_new(true)
+        .write(true)
+        .open(db_path)
+        .unwrap();
+    storage
+        .restore(file, &namespace, options, None)
+        .await
+        .unwrap();
+}
+
+async fn handle_infos<S>(namespace: &str, storage: S)
+where
+    S: Storage,
+{
+    let namespace = NamespaceName::from_string(namespace.to_owned());
+    let durable = storage.durable_frame_no(&namespace, None).await;
+    println!("namespace: {namespace}");
+    println!("max durable frame: {durable}");
+}

 async fn run_shell<S>(registry: WalRegistry<StdIO, S>, db_path: &Path, namespace: NamespaceName)
 where
     S: Storage<Segment = SealedSegment<std::fs::File>>,
 {
+    let db_path = db_path.join("dbs").join(namespace.as_str());
     tokio::fs::create_dir_all(&db_path).await.unwrap();
     let registry = Arc::new(registry);
     let resolver = move |path: &Path| {
         NamespaceName::from_string(
@@ -163,14 +185,27 @@ where
                     continue;
                 }

-                if let Err(e) = block_in_place(|| {
-                    conn.query_row(&q, (), |row| {
-                        println!("{row:?}");
-                        Ok(())
-                    })
-                    .optional()
-                }) {
-                    println!("error: {e}");
-                }
+                match block_in_place(|| conn.prepare(&q)) {
+                    Ok(mut stmt) => {
+                        match block_in_place(|| {
+                            stmt.query_map((), |row| {
+                                println!("{row:?}");
+                                Ok(())
+                            })
+                        }) {
+                            Ok(rows) => block_in_place(|| {
+                                rows.for_each(|_| ());
+                            }),
+                            Err(e) => {
+                                println!("error: {e}");
+                                continue;
+                            }
+                        }
+                    }
+                    Err(e) => {
+                        println!("error: {e}");
+                        continue;
+                    }
+                }
             }
             Err(_) => {
@@ -204,33 +239,30 @@ async fn handle_builtin<S>(
     false
 }

-struct Env<S, B: Backend> {
-    registry: WalRegistry<StdIO, S>,
-    backend: Arc<B>,
-}

-async fn setup_s3_registry(
-    db_path: &Path,
-    bucket_name: &str,
-    cluster_id: &str,
-    url: &str,
-    region_id: &str,
-    access_key_id: &str,
-    secret_access_key: &str,
+async fn setup_s3_storage(
+    cli: &Cli,
     join_set: &mut JoinSet<()>,
-) -> Env<AsyncStorage<S3Config, SealedSegment<std::fs::File>>, S3Backend<StdIO>> {
-    let cred = Credentials::new(access_key_id, secret_access_key, None, None, "");
+) -> AsyncStorage<S3Backend<StdIO>, SealedSegment<std::fs::File>> {
+    let cred = Credentials::new(
+        cli.s3_args.s3_access_key_id.as_ref().unwrap(),
+        cli.s3_args.s3_access_key.as_ref().unwrap(),
+        None,
+        None,
+        "",
+    );
     let config = aws_config::SdkConfig::builder()
         .behavior_version(BehaviorVersion::latest())
-        .region(Region::new(region_id.to_string()))
+        .region(Region::new(
+            cli.s3_args.s3_region_id.as_ref().unwrap().to_string(),
+        ))
         .credentials_provider(SharedCredentialsProvider::new(cred))
-        .endpoint_url(url)
+        .endpoint_url(cli.s3_args.s3_url.as_ref().unwrap())
         .build();
     let backend = Arc::new(
         S3Backend::from_sdk_config(
             config.clone(),
-            bucket_name.to_string(),
-            cluster_id.to_string(),
+            cli.s3_args.s3_bucket.as_ref().unwrap().to_string(),
+            cli.s3_args.cluster_id.as_ref().unwrap().to_string(),
         )
         .await
         .unwrap(),
@@ -244,7 +276,6 @@ async fn setup_s3_registry(
     join_set.spawn(async move {
         storage_loop.run().await;
     });
-    let path = db_path.join("wals");
-    let registry = WalRegistry::new(path, storage).unwrap();
-    Env { registry, backend }
+
+    storage
 }
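The rewritten query loop above is the "fix shell bug" bullet: `query_row` returns at most one row (hence the old `optional()` dance), while `prepare` plus `query_map` streams every row of the result set. A standalone rusqlite comparison (hypothetical table, not from this diff):

use libsql_sys::rusqlite::{Connection, Result};

fn print_all(conn: &Connection) -> Result<()> {
    // Old shape: prints at most the first row, even for multi-row results.
    // conn.query_row("SELECT x FROM t", (), |row| { println!("{row:?}"); Ok(()) }).optional()?;

    // New shape: iterate the full result set.
    let mut stmt = conn.prepare("SELECT x FROM t")?;
    let rows = stmt.query_map((), |row| row.get::<_, i64>(0))?;
    for x in rows {
        println!("{:?}", x?);
    }
    Ok(())
}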
2 changes: 1 addition & 1 deletion libsql-wal/src/io/compat.rs
@@ -8,7 +8,7 @@ use super::FileExt;

 /// Copy from src that implements AsyncRead to the destination file, returning how many bytes have
 /// been copied
-pub async fn copy_to_file<R, F>(mut src: R, dst: F) -> io::Result<usize>
+pub async fn copy_to_file<R, F>(mut src: R, dst: &F) -> io::Result<usize>
 where
     F: FileExt,
     R: AsyncRead + Unpin,
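Borrowing `dst` rather than consuming it lets a caller keep using the handle after the copy — which a restore needs, since it writes the image into the file it just opened. A hypothetical caller, assuming `std::fs::File` implements `FileExt` (as its use in the shell suggests):

use tokio::io::AsyncRead;

// Hypothetical: stream any AsyncRead source into a freshly created file,
// then hand the same handle back to the caller.
async fn fill_db_file<R: AsyncRead + Unpin>(src: R) -> std::io::Result<std::fs::File> {
    let file = std::fs::File::create("data")?;
    let n = copy_to_file(src, &file).await?; // borrows `file` instead of moving it
    println!("copied {n} bytes");
    Ok(file) // still owned here; before this change the handle was consumed
}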
15 changes: 9 additions & 6 deletions libsql-wal/src/lib.rs
@@ -56,23 +56,26 @@ pub mod test {

         pub fn shared(&self, namespace: &str) -> Arc<SharedWal<StdIO>> {
             let path = self.tmp.path().join(namespace).join("data");
-            self.registry
-                .clone()
-                .open(path.as_ref(), &NamespaceName::from_string(namespace.into()))
-                .unwrap()
+            let registry = self.registry.clone();
+            let namespace = NamespaceName::from_string(namespace.into());
+            registry.clone().open(path.as_ref(), &namespace).unwrap()
         }

         pub fn db_path(&self, namespace: &str) -> PathBuf {
             self.tmp.path().join(namespace)
         }

-        pub fn open_conn(&self, namespace: &str) -> libsql_sys::Connection<LibsqlWal<StdIO>> {
+        pub fn open_conn(
+            &self,
+            namespace: &'static str,
+        ) -> libsql_sys::Connection<LibsqlWal<StdIO>> {
             let path = self.db_path(namespace);
+            let wal = self.wal.clone();
             std::fs::create_dir_all(&path).unwrap();
             libsql_sys::Connection::open(
                 path.join("data"),
                 OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,
-                self.wal.clone(),
+                wal,
                 100000,
                 None,
             )
3 changes: 2 additions & 1 deletion libsql-wal/src/registry.rs
@@ -245,7 +245,8 @@
         let (new_frame_notifier, _) = tokio::sync::watch::channel(next_frame_no.get() - 1);

         // TODO: pass config override here
-        let durable_frame_no = self.storage.durable_frame_no(&namespace, None).into();
+        let max_frame_no = self.storage.durable_frame_no_sync(&namespace, None);
+        let durable_frame_no = max_frame_no.into();

         let shared = Arc::new(SharedWal {
             current,
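This pairs with the "make durable frame_no async" bullet: the shell's `handle_infos` awaits `Storage::durable_frame_no`, while the registry's synchronous open path now calls a `durable_frame_no_sync` variant instead. A toy illustration of the assumed pairing (not the actual trait definition):

// Toy storage with the assumed pair of accessors.
struct ToyStorage {
    durable: u64,
}

impl ToyStorage {
    // Async variant: a real implementation may ask the backend (e.g. S3
    // metadata) before answering.
    async fn durable_frame_no(&self, _namespace: &str) -> u64 {
        self.durable
    }

    // Sync variant for non-async callers such as `WalRegistry::open`; a real
    // implementation might block on the async variant or consult a cache.
    fn durable_frame_no_sync(&self, _namespace: &str) -> u64 {
        self.durable
    }
}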
3 changes: 2 additions & 1 deletion libsql-wal/src/segment/compacted.rs
@@ -4,10 +4,11 @@ use zerocopy::{AsBytes, FromBytes, FromZeroes};
 #[derive(Debug, AsBytes, FromZeroes, FromBytes)]
 #[repr(C)]
 pub struct CompactedSegmentDataHeader {
-    pub(crate) frame_count: lu64,
+    pub(crate) frame_count: lu32,
     pub(crate) segment_id: lu128,
     pub(crate) start_frame_no: lu64,
     pub(crate) end_frame_no: lu64,
+    pub(crate) size_after: lu32,
 }

 #[derive(Debug, AsBytes, FromZeroes, FromBytes)]
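Assuming `lu32`/`lu64`/`lu128` alias zerocopy's little-endian integers (alignment 1, so the `#[repr(C)]` struct carries no padding), the two changes cancel out in size: `frame_count` gives up four bytes and the new `size_after` takes them, leaving the header at 40 bytes. The same layout spelled with zerocopy's types:

use zerocopy::byteorder::little_endian::{U128, U32, U64};

#[repr(C)]
struct Layout {
    frame_count: U32,    // was U64: 8 bytes down to 4
    segment_id: U128,    // 16 bytes
    start_frame_no: U64, // 8 bytes
    end_frame_no: U64,   // 8 bytes
    size_after: U32,     // new: 4 bytes (db size after the segment is applied)
}

// The little-endian wrappers have alignment 1, so there is no padding:
const _: () = assert!(std::mem::size_of::<Layout>() == 40);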
3 changes: 2 additions & 1 deletion libsql-wal/src/segment/sealed.rs
@@ -82,10 +82,11 @@ where
         let mut hasher = crc32fast::Hasher::new();

         let header = CompactedSegmentDataHeader {
-            frame_count: (self.index().len() as u64).into(),
+            frame_count: (self.index().len() as u32).into(),
             segment_id: id.as_u128().into(),
             start_frame_no: self.header().start_frame_no,
             end_frame_no: self.header().last_commited_frame_no,
+            size_after: self.header.size_after,
         };

         hasher.update(header.as_bytes());