diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs
index e1b346bb5c..46fd5c768e 100644
--- a/libsql-server/src/lib.rs
+++ b/libsql-server/src/lib.rs
@@ -115,6 +115,7 @@ mod stats;
 #[cfg(test)]
 mod test;
 mod utils;
+pub mod wal_toolkit;

 const DB_CREATE_TIMEOUT: Duration = Duration::from_secs(1);
 const DEFAULT_AUTO_CHECKPOINT: u32 = 1000;
diff --git a/libsql-server/src/main.rs b/libsql-server/src/main.rs
index 171467bdb1..f2b2d76d70 100644
--- a/libsql-server/src/main.rs
+++ b/libsql-server/src/main.rs
@@ -9,6 +9,7 @@ use bytesize::ByteSize;
 use clap::Parser;
 use hyper::client::HttpConnector;
 use libsql_server::auth::{parse_http_basic_auth_arg, parse_jwt_keys, user_auth_strategies, Auth};
+use libsql_server::wal_toolkit::{S3Args, WalToolkit};
 use tokio::sync::Notify;
 use tokio::time::Duration;
 use tracing_subscriber::util::SubscriberInitExt;
@@ -317,6 +318,14 @@ enum UtilsSubcommands {
         #[clap(long)]
         auth: Option<String>,
     },
+    WalToolkit {
+        #[arg(long, short, default_value = ".compactor")]
+        path: PathBuf,
+        #[clap(flatten)]
+        s3_args: S3Args,
+        #[clap(subcommand)]
+        command: WalToolkit,
+    },
 }

 impl Cli {
@@ -736,6 +745,13 @@ async fn main() -> Result<()> {
                 client.run_namespace(ns).await?;
             }
         }
+        UtilsSubcommands::WalToolkit {
+            command,
+            path,
+            s3_args,
+        } => {
+            command.run(path, s3_args).await?;
+        }
     }

     return Ok(());
diff --git a/libsql-server/src/wal_toolkit.rs b/libsql-server/src/wal_toolkit.rs
new file mode 100644
index 0000000000..6b9dbc141d
--- /dev/null
+++ b/libsql-server/src/wal_toolkit.rs
@@ -0,0 +1,212 @@
+use std::path::{Path, PathBuf};
+
+use anyhow::Context as _;
+use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
+use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider};
+use libsql_wal::io::StdIO;
+use libsql_wal::storage::backend::s3::S3Backend;
+use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy;
+use libsql_wal::storage::compaction::strategy::log_strategy::LogReductionStrategy;
+use libsql_wal::storage::compaction::strategy::PartitionStrategy;
+use libsql_wal::storage::compaction::Compactor;
+
+#[derive(Clone, Debug, clap::ValueEnum)]
+pub enum CompactStrategy {
+    Logarithmic,
+    CompactAll,
+}
+
+#[derive(Debug, clap::Subcommand)]
+pub enum WalToolkit {
+    /// Register namespaces to monitor
+    Monitor { namespace: String },
+    /// Analyze segments for a namespace
+    Analyze {
+        /// List all segments
+        #[clap(long)]
+        list_all: bool,
+        namespace: String,
+    },
+    /// Compact segments into bigger segments
+    Compact {
+        /// Compaction strategy
+        #[clap(long, short)]
+        strategy: CompactStrategy,
+        /// Prints the compaction plan, but doesn't perform it.
+        #[clap(long)]
+        dry_run: bool,
+        namespace: String,
+    },
+    /// Sync namespace metadata from remote storage
+    Sync {
+        /// When performing a full sync, all the segment space is scanned again. By default, only
+        /// segments with frame_no greater than the last frame_no are retrieved.
+        #[clap(long)]
+        full: bool,
+        /// Unless this is specified, all monitored namespaces are synced
+        namespace: Option<String>,
+    },
+    /// Restore namespace
+    Restore {
+        #[clap(long)]
+        verify: bool,
+        namespace: String,
+        out: PathBuf,
+    },
+}
+
+impl WalToolkit {
+    pub async fn run(&self, compact_path: &Path, s3_args: &S3Args) -> anyhow::Result<()> {
+        let backend = setup_storage(s3_args).await?;
+        tokio::fs::create_dir_all(compact_path).await?;
+        let mut compactor = Compactor::new(backend.into(), compact_path)?;
+        match self {
+            Self::Monitor { namespace } => {
+                let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
+                compactor.monitor(&namespace).await?;
+                println!("monitoring {namespace}");
+            }
+            Self::Analyze {
+                namespace,
+                list_all,
+            } => {
+                let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
+                let analysis = compactor.analyze(&namespace)?;
+                println!("stats for {namespace}:");
+                println!("- segment count: {}", analysis.segment_count());
+                println!("- last frame_no: {}", analysis.last_frame_no());
+                let set = analysis.shortest_restore_path();
+                println!("- shortest restore path len: {}", set.len());
+                if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
+                    println!(
+                        "- oldest segment: {}-{} ({})",
+                        first.key.start_frame_no, first.key.end_frame_no, first.created_at
+                    );
+                    println!(
+                        "- most recent segment: {}-{} ({})",
+                        last.key.start_frame_no, last.key.end_frame_no, last.created_at
+                    );
+                }
+
+                if *list_all {
+                    println!("segments:");
+                    compactor.list_all(&namespace, |info| {
+                        println!(
+                            "- {}-{} ({})",
+                            info.key.start_frame_no, info.key.end_frame_no, info.created_at
+                        );
+                    })?;
+                }
+            }
+            Self::Compact {
+                strategy,
+                dry_run,
+                namespace,
+            } => {
+                let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
+                let analysis = compactor.analyze(&namespace)?;
+                let strat: Box<dyn PartitionStrategy> = match strategy {
+                    CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
+                    CompactStrategy::CompactAll => Box::new(IdentityStrategy),
+                };
+                let set = analysis.shortest_restore_path();
+                let partition = strat.partition(&set);
+
+                println!("initial shortest restore path len: {}", set.len());
+                println!("compacting into {} segments", partition.len());
+                for set in partition.iter() {
+                    println!("- {:?}", set.range().unwrap());
+                }
+                if *dry_run {
+                    println!("dry run: stopping");
+                } else {
+                    println!("performing compaction");
+                    let part_len = partition.len();
+                    for (idx, set) in partition.into_iter().enumerate() {
+                        let Some((start, end)) = set.range() else {
+                            continue;
+                        };
+                        println!("compacting {start}-{end} ({}/{})", idx + 1, part_len);
+                        // TODO: we could compact concurrently
+                        compactor.compact(set).await?;
+                    }
+                }
+            }
+            Self::Sync { full, namespace } => match namespace {
+                Some(_ns) => {
+                    todo!()
+                }
+                None if *full => {
+                    compactor.sync_full().await?;
+                    println!("all monitored namespaces fully up to date.");
+                }
+                _ => todo!(),
+            },
+            Self::Restore {
+                namespace,
+                out,
+                verify,
+            } => {
+                let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string());
+                let analysis = compactor.analyze(&namespace)?;
+                let set = analysis.shortest_restore_path();
+                compactor.restore(set, &out).await?;
+                if *verify {
+                    let conn = libsql_sys::rusqlite::Connection::open(&out)?;
+                    conn.pragma_query(None, "integrity_check", |r| {
+                        println!("{r:?}");
+                        Ok(())
+                    })?;
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+async fn setup_storage(opt: &S3Args) -> anyhow::Result<S3Backend<StdIO>> {
+    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
+
+    let mut builder = config.into_builder();
+    builder.set_endpoint_url(opt.s3_url.clone());
+    builder.set_retry_config(RetryConfig::standard().with_max_attempts(10).into());
+    builder.set_region(Region::new(
+        opt.s3_region_id.clone().expect("expected aws region"),
+    ));
+    let cred = Credentials::new(
+        opt.s3_access_key_id.as_ref().unwrap(),
+        opt.s3_access_key.as_ref().unwrap(),
+        None,
+        None,
+        "Static",
+    );
+    builder.set_credentials_provider(Some(SharedCredentialsProvider::new(cred)));
+    let config = builder.build();
+    let backend = S3Backend::from_sdk_config(
+        config,
+        opt.s3_bucket.clone().context("missing bucket id")?,
+        opt.cluster_id.clone().context("missing cluster id")?,
+    )
+    .await?;
+
+    Ok(backend)
+}
+
+#[derive(Debug, clap::Args)]
+pub struct S3Args {
+    #[arg(long, requires = "S3Args")]
+    enable_s3: bool,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
+    cluster_id: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
+    s3_url: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
+    s3_access_key: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
+    s3_access_key_id: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
+    s3_bucket: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
+    s3_region_id: Option<String>,
+}
diff --git a/libsql-wal/Cargo.toml b/libsql-wal/Cargo.toml
index f8870f01e9..ca32f34f32 100644
--- a/libsql-wal/Cargo.toml
+++ b/libsql-wal/Cargo.toml
@@ -68,7 +68,7 @@ name = "benchmarks"
 harness = false

 [features]
-default = ["s3", "shell-bin"]
+default = ["s3"]
 s3 = [
     "dep:hyper",
     "dep:aws-smithy-runtime",
@@ -76,20 +76,3 @@ s3 = [
     "dep:aws-config",
     "dep:aws-credential-types",
 ]
-shell-bin = [
-    "dep:clap",
-    "dep:inquire",
-    "s3",
-    "dep:tracing-subscriber",
-    "dep:anyhow",
-]
-
-[[bin]]
-name = "shell"
-path = "src/bins/shell/main.rs"
-required-features = ["shell-bin"]
-
-[[bin]]
-name = "compactor"
-path = "src/bins/compactor/main.rs"
-required-features = ["shell-bin"]
diff --git a/libsql-wal/src/bins/compactor/main.rs b/libsql-wal/src/bins/compactor/main.rs
deleted file mode 100644
index 1818ec3e37..0000000000
--- a/libsql-wal/src/bins/compactor/main.rs
+++ /dev/null
@@ -1,225 +0,0 @@
-use std::path::PathBuf;
-
-use anyhow::Context;
-use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
-use aws_credential_types::Credentials;
-use aws_sdk_s3::config::SharedCredentialsProvider;
-use clap::{Parser, ValueEnum};
-use libsql_wal::io::StdIO;
-use libsql_wal::storage::backend::s3::S3Backend;
-use libsql_wal::storage::compaction::strategy::{
-    identity::IdentityStrategy, log_strategy::LogReductionStrategy, PartitionStrategy,
-};
-use libsql_wal::storage::compaction::Compactor;
-
-#[derive(Debug, clap::Args)]
-struct S3Args {
-    #[arg(long, requires = "S3Args")]
-    enable_s3: bool,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
-    cluster_id: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
-    s3_url: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
-    s3_access_key: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
-    s3_access_key_id: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
-    s3_bucket: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
-    s3_region_id: Option<String>,
-}
-
-#[derive(Clone, Debug, ValueEnum)]
-enum CompactStrategy {
-    Logarithmic,
-    CompactAll,
-}
-
-#[derive(Debug, clap::Parser)]
-struct Command {
-    #[arg(long, short, default_value = "compactor")]
-    path: PathBuf,
-    #[command(flatten)]
-    s3_args: S3Args,
-    #[command(subcommand)]
-    subcommand: Subcommand,
-}
-
-#[derive(Debug, clap::Subcommand)]
-enum Subcommand {
-    /// Register namespaces to monitor
-    Monitor { namespace: String },
-    /// Analyze segments for a namespaces
-    Analyze {
-        /// list all segments
-        #[clap(long)]
-        list_all: bool,
-        namespace: String,
-    },
-    /// Compact segments into bigger segments
-    Compact {
-        // compaction strategy
-        #[clap(long, short)]
-        strategy: CompactStrategy,
-        /// prints the compaction plan, but doesn't perform it.
-        #[clap(long)]
-        dry_run: bool,
-        namespace: String,
-    },
-    /// Sync namespace metadata from remote storage
-    Sync {
-        /// When performing a full sync, all the segment space is scanned again. By default, only
-        /// segments with frame_no greated that the last frame_no are retrieved.
-        #[clap(long)]
-        full: bool,
-        /// unless this is specified, all monitored namespaces are synced
-        namespace: Option<String>,
-    },
-    /// Restore namespace
-    Restore {
-        #[clap(long)]
-        verify: bool,
-        namespace: String,
-        out: PathBuf,
-    },
-}
-
-#[tokio::main]
-async fn main() -> anyhow::Result<()> {
-    let cmd: Command = Command::parse();
-
-    let backend = setup_storage(&cmd.s3_args).await?;
-    tokio::fs::create_dir_all(&cmd.path).await?;
-    let mut compactor = Compactor::new(backend.into(), &cmd.path)?;
-    match cmd.subcommand {
-        Subcommand::Monitor { namespace } => {
-            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
-            compactor.monitor(&namespace).await?;
-            println!("monitoring {namespace}");
-        }
-        Subcommand::Analyze {
-            namespace,
-            list_all,
-        } => {
-            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
-            let analysis = compactor.analyze(&namespace)?;
-            println!("stats for {namespace}:");
-            println!("- segment count: {}", analysis.segment_count());
-            println!("- last frame_no: {}", analysis.last_frame_no());
-            let set = analysis.shortest_restore_path();
-            println!("- shortest restore path len: {}", set.len());
-            if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
-                println!(
-                    "- oldest segment: {}-{} ({})",
-                    first.key.start_frame_no, first.key.end_frame_no, first.created_at
-                );
-                println!(
-                    "- most recent segment: {}-{} ({})",
-                    last.key.start_frame_no, last.key.end_frame_no, last.created_at
-                );
-            }
-
-            if list_all {
-                println!("segments:");
-                compactor.list_all(&namespace, |info| {
-                    println!(
-                        "- {}-{} ({})",
-                        info.key.start_frame_no, info.key.end_frame_no, info.created_at
-                    );
-                })?;
-            }
-        }
-        Subcommand::Compact {
-            strategy,
-            dry_run,
-            namespace,
-        } => {
-            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
-            let analysis = compactor.analyze(&namespace)?;
-            let strat: Box<dyn PartitionStrategy> = match strategy {
-                CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
-                CompactStrategy::CompactAll => Box::new(IdentityStrategy),
-            };
-            let set = analysis.shortest_restore_path();
-            let partition = strat.partition(&set);
-
-            println!("initial shortest restore path len: {}", set.len());
-            println!("compacting into {} segments", partition.len());
-            for set in partition.iter() {
-                println!("- {:?}", set.range().unwrap());
-            }
-            if dry_run {
-                println!("dry run: stopping");
-            } else {
-                println!("performing compaction");
-                let part_len = partition.len();
-                for (idx, set) in partition.into_iter().enumerate() {
-                    let Some((start, end)) = set.range() else {
-                        continue;
-                    };
-                    println!("compacting {start}-{end} ({}/{})", idx + 1, part_len);
-                    // TODO: we can compact in conccurently
-                    compactor.compact(set).await?;
-                }
-            }
-        }
-        Subcommand::Sync { full, namespace } => match namespace {
-            Some(_ns) => {
-                todo!()
-            }
-            None if full => {
-                compactor.sync_full().await?;
-                println!("all monitored namespace fully up to date.");
-            }
-            _ => todo!(),
-        },
-        Subcommand::Restore {
-            namespace,
-            out,
-            verify,
-        } => {
-            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
-            let analysis = compactor.analyze(&namespace)?;
-            let set = analysis.shortest_restore_path();
-            compactor.restore(set, &out).await?;
-            if verify {
-                let conn = libsql_sys::rusqlite::Connection::open(&out)?;
-                conn.pragma_query(None, "integrity_check", |r| {
-                    println!("{r:?}");
-                    Ok(())
-                })?;
-            }
-        }
-    }
-
-    Ok(())
-}
-
-async fn setup_storage(opt: &S3Args) -> anyhow::Result<S3Backend<StdIO>> {
-    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
-
-    let mut builder = config.into_builder();
-    builder.set_endpoint_url(opt.s3_url.clone());
-    builder.set_retry_config(RetryConfig::standard().with_max_attempts(10).into());
-    builder.set_region(Region::new(
-        opt.s3_region_id.clone().expect("expected aws region"),
-    ));
-    let cred = Credentials::new(
-        opt.s3_access_key_id.as_ref().unwrap(),
-        opt.s3_access_key.as_ref().unwrap(),
-        None,
-        None,
-        "Static",
-    );
-    builder.set_credentials_provider(Some(SharedCredentialsProvider::new(cred)));
-    let config = builder.build();
-    let backend = S3Backend::from_sdk_config(
-        config,
-        opt.s3_bucket.clone().context("missing bucket id")?,
-        opt.cluster_id.clone().context("missing cluster id")?,
-    )
-    .await?;
-
-    Ok(backend)
-}
diff --git a/libsql-wal/src/bins/shell/main.rs b/libsql-wal/src/bins/shell/main.rs
deleted file mode 100644
index 39497f6fe1..0000000000
--- a/libsql-wal/src/bins/shell/main.rs
+++ /dev/null
@@ -1,289 +0,0 @@
-use std::fs::OpenOptions;
-use std::path::{Path, PathBuf};
-use std::sync::Arc;
-
-use aws_config::{BehaviorVersion, Region};
-use aws_credential_types::Credentials;
-use aws_sdk_s3::config::SharedCredentialsProvider;
-use clap::{Parser, ValueEnum};
-use libsql_wal::checkpointer::LibsqlCheckpointer;
-use tokio::task::{block_in_place, JoinSet};
-
-use libsql_sys::name::NamespaceName;
-use libsql_sys::rusqlite::OpenFlags;
-use libsql_wal::io::StdIO;
-use libsql_wal::registry::WalRegistry;
-use libsql_wal::segment::sealed::SealedSegment;
-use libsql_wal::storage::async_storage::{AsyncStorage, AsyncStorageInitConfig};
-use libsql_wal::storage::backend::s3::S3Backend;
-use libsql_wal::storage::Storage;
-use libsql_wal::wal::LibsqlWalManager;
-
-#[derive(Debug, clap::Parser)]
-struct Cli {
-    #[command(flatten)]
-    s3_args: S3Args,
-    #[arg(long, short = 'n')]
-    namespace: String,
-    #[command(subcommand)]
-    subcommand: Subcommand,
-}
-
-#[derive(Debug, clap::Args)]
-#[group(
-    required = false,
-    multiple = true,
-    requires_all = [
-        "s3_url",
-        "s3_access_key",
-        "s3_access_key_id",
-        "s3_bucket",
-        "cluster_id",
-    ])]
-
-struct S3Args {
-    #[arg(long, requires = "S3Args")]
-    enable_s3: bool,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
-    cluster_id: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
-    s3_url: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
-    s3_access_key: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
-    s3_access_key_id: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
-    s3_bucket: Option<String>,
-    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
-    s3_region_id: Option<String>,
-}
-
-#[derive(Copy, Clone, PartialEq, Eq, PartialOrd, Ord, ValueEnum, Debug)]
-enum RestoreOptions {
-    Latest,
-}
-
-#[derive(Debug, clap::Subcommand)]
-enum Subcommand {
-    Shell {
-        #[arg(long, short = 'p')]
-        db_path: PathBuf,
-    },
-    Infos,
-    Restore {
-        #[arg(long)]
-        from: RestoreOptions,
-        #[arg(long, short)]
-        path: PathBuf,
-    },
-}
-
-#[tokio::main]
-async fn main() {
-    let cli: Cli = Cli::parse();
-    let mut join_set = JoinSet::new();
-
-    if cli.s3_args.enable_s3 {
-        let storage = setup_s3_storage(&cli, &mut join_set).await;
-        handle(&cli, storage, &mut join_set).await;
-    } else {
-        todo!()
-    }
-
-    while join_set.join_next().await.is_some() {}
-}
-
-async fn handle<S>(cli: &Cli, storage: S, join_set: &mut JoinSet<()>)
-where
-    S: Storage<Segment = SealedSegment<std::fs::File>>,
-{
-    match &cli.subcommand {
-        Subcommand::Shell { db_path } => {
-            let (sender, receiver) = tokio::sync::mpsc::channel(64);
-            let registry =
-                Arc::new(WalRegistry::new(db_path.clone(), storage.into(), sender).unwrap());
-            let checkpointer = LibsqlCheckpointer::new(registry.clone(), receiver, 64);
-            join_set.spawn(checkpointer.run());
-            run_shell(
-                registry,
-                &db_path,
-                NamespaceName::from_string(cli.namespace.clone()),
-            )
-            .await;
-        }
-        Subcommand::Infos => handle_infos(&cli.namespace, storage).await,
-        Subcommand::Restore { from, path } => {
-            let namespace = NamespaceName::from_string(cli.namespace.clone());
-            handle_restore(&namespace, storage, *from, path).await
-        }
-    }
-}
-
-async fn handle_restore<S>(
-    namespace: &NamespaceName,
-    storage: S,
-    _from: RestoreOptions,
-    db_path: &Path,
-) where
-    S: Storage,
-{
-    let options = libsql_wal::storage::RestoreOptions::Latest;
-    let file = OpenOptions::new()
-        .create_new(true)
-        .write(true)
-        .open(db_path)
-        .unwrap();
-    storage
-        .restore(file, &namespace, options, None)
-        .await
-        .unwrap();
-}
-
-async fn handle_infos<S>(namespace: &str, storage: S)
-where
-    S: Storage,
-{
-    let namespace = NamespaceName::from_string(namespace.to_owned());
-    let durable = storage.durable_frame_no(&namespace, None).await.unwrap();
-    println!("namespace: {namespace}");
-    println!("max durable frame: {durable}");
-}
-
-async fn run_shell<S>(
-    registry: Arc<WalRegistry<StdIO, S>>,
-    db_path: &Path,
-    namespace: NamespaceName,
-) where
-    S: Storage<Segment = SealedSegment<std::fs::File>>,
-{
-    let db_path = db_path.join("dbs").join(namespace.as_str());
-    tokio::fs::create_dir_all(&db_path).await.unwrap();
-    let resolver = move |path: &Path| {
-        NamespaceName::from_string(
-            path.parent()
-                .unwrap()
-                .file_name()
-                .unwrap()
-                .to_str()
-                .unwrap()
-                .to_string(),
-        )
-    };
-    let wal_manager = LibsqlWalManager::new(registry.clone(), Arc::new(resolver));
-    std::fs::create_dir_all(&db_path).unwrap();
-    let path = db_path.join("data");
-    let conn = block_in_place(|| {
-        libsql_sys::Connection::open(
-            path,
-            OpenFlags::SQLITE_OPEN_CREATE | OpenFlags::SQLITE_OPEN_READ_WRITE,
-            wal_manager.clone(),
-            100000,
-            None,
-        )
-    })
-    .unwrap();
-
-    loop {
-        match inquire::Text::new(">").prompt() {
-            Ok(q) => {
-                if q.trim().starts_with(".") {
-                    if handle_builtin(&q, &registry, &namespace).await {
-                        break;
-                    }
-                    continue;
-                }
-
-                match block_in_place(|| conn.prepare(&q)) {
-                    Ok(mut stmt) => {
-                        match block_in_place(|| {
-                            stmt.query_map((), |row| {
-                                println!("{row:?}");
-                                Ok(())
-                            })
-                        }) {
-                            Ok(rows) => block_in_place(|| {
-                                rows.for_each(|_| ());
-                            }),
-                            Err(e) => {
-                                println!("error: {e}");
-                                continue;
-                            }
-                        }
-                    }
-                    Err(e) => {
-                        println!("error: {e}");
-                        continue;
-                    }
-                }
-            }
-            Err(_) => {
-                println!("invalid input")
-            }
-        }
-    }
-
-    drop(conn);
-
-    registry.shutdown().await.unwrap();
-}
-
-async fn handle_builtin<S: Storage>(
-    q: &str,
-    registry: &WalRegistry<StdIO, S>,
-    namespace: &NamespaceName,
-) -> bool {
-    match q {
-        ".quit" => return true,
-        ".seal_current" => match registry.get_async(namespace).await {
-            Some(shared) => {
-                shared.seal_current().unwrap();
-            }
-            None => {
-                println!("wal not yet openned");
-            }
-        },
-        unknown => println!("unknown command: `{unknown}`"),
-    }
-    false
-}
-
-async fn setup_s3_storage(
-    cli: &Cli,
-    join_set: &mut JoinSet<()>,
-) -> AsyncStorage<S3Backend<StdIO>, SealedSegment<std::fs::File>> {
-    let cred = Credentials::new(
-        cli.s3_args.s3_access_key_id.as_ref().unwrap(),
-        cli.s3_args.s3_access_key.as_ref().unwrap(),
-        None,
-        None,
-        "",
-    );
-    let config = aws_config::SdkConfig::builder()
-        .behavior_version(BehaviorVersion::latest())
-        .region(Region::new(
-            cli.s3_args.s3_region_id.as_ref().unwrap().to_string(),
-        ))
-        .credentials_provider(SharedCredentialsProvider::new(cred))
-        .endpoint_url(cli.s3_args.s3_url.as_ref().unwrap())
-        .build();
-    let backend = Arc::new(
-        S3Backend::from_sdk_config(
-            config.clone(),
-            cli.s3_args.s3_bucket.as_ref().unwrap().to_string(),
-            cli.s3_args.cluster_id.as_ref().unwrap().to_string(),
-        )
-        .await
-        .unwrap(),
-    );
-    let config = AsyncStorageInitConfig {
-        backend: backend.clone(),
-        max_in_flight_jobs: 16,
-    };
-    let (storage, storage_loop) = AsyncStorage::new(config).await;
-
-    join_set.spawn(async move {
-        storage_loop.run().await;
-    });
-
-    storage
-}
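
Reviewer note: with the standalone `shell` and `compactor` binaries removed, the compactor is now reached through the new `UtilsSubcommands::WalToolkit` variant in `libsql-server`. A rough invocation sketch follows; the `sqld utils wal-toolkit` spelling assumes clap's default kebab-case naming and that `UtilsSubcommands` is exposed through the server binary's existing utils entry point (both assumptions, verify against `--help`), and the namespace, endpoint, bucket, and credential values are placeholders:

```console
# S3 connection settings come from the env vars declared on S3Args (placeholder values below).
export LIBSQL_BOTTOMLESS_ENDPOINT=http://localhost:9000
export LIBSQL_BOTTOMLESS_BUCKET=wal-segments
export LIBSQL_BOTTOMLESS_DATABASE_ID=cluster-1
export LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID=minioadmin
export LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY=minioadmin
export LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION=us-east-1

# Register a namespace, inspect its segments, then dry-run a logarithmic compaction.
sqld utils wal-toolkit --enable-s3 monitor my-ns
sqld utils wal-toolkit --enable-s3 analyze --list-all my-ns
sqld utils wal-toolkit --enable-s3 compact --strategy logarithmic --dry-run my-ns

# Restore into a local file and run integrity_check on the result
# (--path holds compactor state and defaults to .compactor).
sqld utils wal-toolkit --enable-s3 restore --verify my-ns out.db
```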