diff --git a/Cargo.lock b/Cargo.lock
index 6390595f76..cc5ba428fa 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -679,6 +679,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "7f280f434214856abace637b1f944d50ccca216814813acd195cdd7f206ce17f"
 dependencies = [
  "aws-smithy-types",
+ "chrono",
  "time",
 ]
 
@@ -3310,6 +3311,7 @@ dependencies = [
 name = "libsql-wal"
 version = "0.1.0"
 dependencies = [
+ "anyhow",
  "arc-swap",
  "async-lock 3.4.0",
  "async-stream",
@@ -3317,6 +3319,7 @@ dependencies = [
  "aws-credential-types",
  "aws-sdk-s3",
  "aws-smithy-runtime",
+ "aws-smithy-types-convert",
  "bitflags 2.6.0",
  "bytes",
  "chrono",
@@ -3337,6 +3340,7 @@ dependencies = [
  "nix 0.28.0",
  "once_cell",
  "parking_lot",
+ "petgraph",
  "priority-queue 2.0.3",
  "rand",
  "rand_chacha",
diff --git a/libsql-wal/Cargo.toml b/libsql-wal/Cargo.toml
index f24f2e4c59..f8870f01e9 100644
--- a/libsql-wal/Cargo.toml
+++ b/libsql-wal/Cargo.toml
@@ -44,6 +44,9 @@ tracing-subscriber = { version = "0.3.18", optional = true }
 aws-credential-types = { version = "1.2.0", optional = true }
 dashmap = "6.0.1"
 rand = "0.8.5"
+aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
+petgraph = "0.6.5"
+anyhow = { version = "1.0.86", optional = true }
 
 [dev-dependencies]
 criterion = "0.5.1"
@@ -78,9 +81,15 @@ shell-bin = [
     "dep:inquire",
     "s3",
     "dep:tracing-subscriber",
+    "dep:anyhow",
 ]
 
 [[bin]]
 name = "shell"
 path = "src/bins/shell/main.rs"
 required-features = ["shell-bin"]
+
+[[bin]]
+name = "compactor"
+path = "src/bins/compactor/main.rs"
+required-features = ["shell-bin"]
diff --git a/libsql-wal/src/bins/compactor/main.rs b/libsql-wal/src/bins/compactor/main.rs
new file mode 100644
index 0000000000..1818ec3e37
--- /dev/null
+++ b/libsql-wal/src/bins/compactor/main.rs
@@ -0,0 +1,225 @@
+use std::path::PathBuf;
+
+use anyhow::Context;
+use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
+use aws_credential_types::Credentials;
+use aws_sdk_s3::config::SharedCredentialsProvider;
+use clap::{Parser, ValueEnum};
+use libsql_wal::io::StdIO;
+use libsql_wal::storage::backend::s3::S3Backend;
+use libsql_wal::storage::compaction::strategy::{
+    identity::IdentityStrategy, log_strategy::LogReductionStrategy, PartitionStrategy,
+};
+use libsql_wal::storage::compaction::Compactor;
+
+#[derive(Debug, clap::Args)]
+struct S3Args {
+    #[arg(long, requires = "S3Args")]
+    enable_s3: bool,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
+    cluster_id: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
+    s3_url: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
+    s3_access_key: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
+    s3_access_key_id: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
+    s3_bucket: Option<String>,
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
+    s3_region_id: Option<String>,
+}
+
+#[derive(Clone, Debug, ValueEnum)]
+enum CompactStrategy {
+    Logarithmic,
+    CompactAll,
+}
+
+#[derive(Debug, clap::Parser)]
+struct Command {
+    #[arg(long, short, default_value = "compactor")]
+    path: PathBuf,
+    #[command(flatten)]
+    s3_args: S3Args,
+    #[command(subcommand)]
+    subcommand: Subcommand,
+}
+
+#[derive(Debug, clap::Subcommand)]
+enum Subcommand {
+    /// Register namespaces to monitor
+    Monitor { namespace: String },
+    /// Analyze segments for a namespace
+    Analyze {
+        /// list all segments
+        #[clap(long)]
+        list_all: bool,
+        namespace: String,
+    },
+    /// Compact segments into bigger segments
+    Compact {
+        /// compaction strategy
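+        /// `logarithmic` compacts the restore path into logarithmically shrinking sets,
+        /// while `compact-all` folds everything into a single segment (clap derives the
+        /// kebab-case value names from the `CompactStrategy` variants).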
+        #[clap(long, short)]
+        strategy: CompactStrategy,
+        /// prints the compaction plan, but doesn't perform it.
+        #[clap(long)]
+        dry_run: bool,
+        namespace: String,
+    },
+    /// Sync namespace metadata from remote storage
+    Sync {
+        /// When performing a full sync, all the segment space is scanned again. By default, only
+        /// segments with a frame_no greater than the last known frame_no are retrieved.
+        #[clap(long)]
+        full: bool,
+        /// unless this is specified, all monitored namespaces are synced
+        namespace: Option<String>,
+    },
+    /// Restore namespace
+    Restore {
+        #[clap(long)]
+        verify: bool,
+        namespace: String,
+        out: PathBuf,
+    },
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let cmd: Command = Command::parse();
+
+    let backend = setup_storage(&cmd.s3_args).await?;
+    tokio::fs::create_dir_all(&cmd.path).await?;
+    let mut compactor = Compactor::new(backend.into(), &cmd.path)?;
+    match cmd.subcommand {
+        Subcommand::Monitor { namespace } => {
+            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
+            compactor.monitor(&namespace).await?;
+            println!("monitoring {namespace}");
+        }
+        Subcommand::Analyze {
+            namespace,
+            list_all,
+        } => {
+            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
+            let analysis = compactor.analyze(&namespace)?;
+            println!("stats for {namespace}:");
+            println!("- segment count: {}", analysis.segment_count());
+            println!("- last frame_no: {}", analysis.last_frame_no());
+            let set = analysis.shortest_restore_path();
+            println!("- shortest restore path len: {}", set.len());
+            if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
+                println!(
+                    "- oldest segment: {}-{} ({})",
+                    first.key.start_frame_no, first.key.end_frame_no, first.created_at
+                );
+                println!(
+                    "- most recent segment: {}-{} ({})",
+                    last.key.start_frame_no, last.key.end_frame_no, last.created_at
+                );
+            }
+
+            if list_all {
+                println!("segments:");
+                compactor.list_all(&namespace, |info| {
+                    println!(
+                        "- {}-{} ({})",
+                        info.key.start_frame_no, info.key.end_frame_no, info.created_at
+                    );
+                })?;
+            }
+        }
+        Subcommand::Compact {
+            strategy,
+            dry_run,
+            namespace,
+        } => {
+            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
+            let analysis = compactor.analyze(&namespace)?;
+            let strat: Box<dyn PartitionStrategy> = match strategy {
+                CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
+                CompactStrategy::CompactAll => Box::new(IdentityStrategy),
+            };
+            let set = analysis.shortest_restore_path();
+            let partition = strat.partition(&set);
+
+            println!("initial shortest restore path len: {}", set.len());
+            println!("compacting into {} segments", partition.len());
+            for set in partition.iter() {
+                println!("- {:?}", set.range().unwrap());
+            }
+            if dry_run {
+                println!("dry run: stopping");
+            } else {
+                println!("performing compaction");
+                let part_len = partition.len();
+                for (idx, set) in partition.into_iter().enumerate() {
+                    let Some((start, end)) = set.range() else {
+                        continue;
+                    };
+                    println!("compacting {start}-{end} ({}/{})", idx + 1, part_len);
+                    // TODO: we could compact concurrently
+                    compactor.compact(set).await?;
+                }
+            }
+        }
+        Subcommand::Sync { full, namespace } => match namespace {
+            Some(_ns) => {
+                todo!()
+            }
+            None if full => {
+                compactor.sync_full().await?;
+                println!("all monitored namespaces fully up to date.");
+            }
+            _ => todo!(),
+        },
+        Subcommand::Restore {
+            namespace,
+            out,
+            verify,
+        } => {
+            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
+            let analysis = compactor.analyze(&namespace)?;
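+            // Materialize the shortest restore path into `out`, then optionally run
+            // SQLite's integrity_check pragma over the restored database.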
+            let set = analysis.shortest_restore_path();
+            compactor.restore(set, &out).await?;
+            if verify {
+                let conn = libsql_sys::rusqlite::Connection::open(&out)?;
+                conn.pragma_query(None, "integrity_check", |r| {
+                    println!("{r:?}");
+                    Ok(())
+                })?;
+            }
+        }
+    }
+
+    Ok(())
+}
+
+async fn setup_storage(opt: &S3Args) -> anyhow::Result<S3Backend<StdIO>> {
+    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
+
+    let mut builder = config.into_builder();
+    builder.set_endpoint_url(opt.s3_url.clone());
+    builder.set_retry_config(RetryConfig::standard().with_max_attempts(10).into());
+    builder.set_region(Region::new(
+        opt.s3_region_id.clone().expect("expected aws region"),
+    ));
+    let cred = Credentials::new(
+        opt.s3_access_key_id.as_ref().unwrap(),
+        opt.s3_access_key.as_ref().unwrap(),
+        None,
+        None,
+        "Static",
+    );
+    builder.set_credentials_provider(Some(SharedCredentialsProvider::new(cred)));
+    let config = builder.build();
+    let backend = S3Backend::from_sdk_config(
+        config,
+        opt.s3_bucket.clone().context("missing bucket id")?,
+        opt.cluster_id.clone().context("missing cluster id")?,
+    )
+    .await?;
+
+    Ok(backend)
+}
diff --git a/libsql-wal/src/bins/shell/main.rs b/libsql-wal/src/bins/shell/main.rs
index af8eec962b..39497f6fe1 100644
--- a/libsql-wal/src/bins/shell/main.rs
+++ b/libsql-wal/src/bins/shell/main.rs
@@ -40,20 +40,21 @@ struct Cli {
     "s3_bucket",
     "cluster_id",
 ])]
+
 struct S3Args {
     #[arg(long, requires = "S3Args")]
     enable_s3: bool,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
     cluster_id: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
     s3_url: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
     s3_access_key: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
     s3_access_key_id: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
     s3_bucket: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
     s3_region_id: Option<String>,
 }
 
diff --git a/libsql-wal/src/io/buf.rs b/libsql-wal/src/io/buf.rs
index ba51018d74..34ce446409 100644
--- a/libsql-wal/src/io/buf.rs
+++ b/libsql-wal/src/io/buf.rs
@@ -132,6 +132,10 @@ impl ZeroCopyBoxIoBuf {
         }
     }
 
+    pub fn new_uninit(inner: Box<T>) -> Self {
+        Self { init: 0, inner }
+    }
+
     fn is_init(&self) -> bool {
         self.init == size_of::<T>()
     }
diff --git a/libsql-wal/src/replication/storage.rs b/libsql-wal/src/replication/storage.rs
index ae3c13ac97..9b3bb15963 100644
--- a/libsql-wal/src/replication/storage.rs
+++ b/libsql-wal/src/replication/storage.rs
@@ -7,6 +7,7 @@ use roaring::RoaringBitmap;
 use tokio_stream::Stream;
 use zerocopy::FromZeroes;
 
+use crate::io::buf::ZeroCopyBoxIoBuf;
 use crate::segment::Frame;
 use crate::storage::Storage;
 
@@ -61,8 +62,9 @@ where
             },
         };
 
-        let (frame, ret) = segment.read_frame(Frame::new_box_zeroed(), offset as u32).await;
+        let (frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()), offset as u32).await;
         ret?;
+        let frame = frame.into_inner();
         debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0");
         if frame.header().frame_no() >= until {
             yield frame;
diff --git a/libsql-wal/src/segment/compacted.rs b/libsql-wal/src/segment/compacted.rs
index 689964f2b4..4194b34820 100644
--- a/libsql-wal/src/segment/compacted.rs
+++ b/libsql-wal/src/segment/compacted.rs
@@ -4,8 +4,9 @@ use std::mem::size_of;
 use zerocopy::little_endian::{U128 as lu128, U16 as lu16, U32 as lu32, U64 as lu64};
 use zerocopy::{AsBytes, FromBytes, FromZeroes};
 
-use crate::io::buf::{ZeroCopyBoxIoBuf, ZeroCopyBuf};
+use crate::io::buf::{IoBufMut, ZeroCopyBuf};
 use crate::io::FileExt;
+use crate::segment::FrameHeader;
 use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};
 
 use super::{Frame, Result};
@@ -82,14 +83,33 @@ impl<F: FileExt> CompactedSegment<F> {
         Ok(Self { file, header })
     }
 
-    pub(crate) async fn read_frame(
+    pub(crate) fn from_parts(file: F, header: CompactedSegmentDataHeader) -> Self {
+        Self { header, file }
+    }
+
+    pub(crate) async fn read_frame<B: IoBufMut + Send + 'static>(
         &self,
-        frame: Box<Frame>,
+        buf: B,
         offset: u32,
-    ) -> (Box<Frame>, io::Result<()>) {
+    ) -> (B, io::Result<()>) {
+        assert_eq!(buf.bytes_init(), 0);
+        assert_eq!(buf.bytes_total(), size_of::<Frame>());
         let offset = size_of::<CompactedSegmentDataHeader>() + size_of::<Frame>() * offset as usize;
-        let buf = ZeroCopyBoxIoBuf::new(frame);
         let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;
-        (buf.into_inner(), ret)
+        (buf, ret)
+    }
+
+    pub(crate) async fn read_page<B: IoBufMut + Send + 'static>(
+        &self,
+        buf: B,
+        offset: u32,
+    ) -> (B, io::Result<()>) {
+        assert_eq!(buf.bytes_init(), 0);
+        assert_eq!(buf.bytes_total(), LIBSQL_PAGE_SIZE as usize);
+        let offset = size_of::<CompactedSegmentDataHeader>()
+            + size_of::<Frame>() * offset as usize
+            + size_of::<FrameHeader>();
+        let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;
+        (buf, ret)
+    }
 }
diff --git a/libsql-wal/src/storage/async_storage.rs b/libsql-wal/src/storage/async_storage.rs
index 843ca6c920..da4f3a75ab 100644
--- a/libsql-wal/src/storage/async_storage.rs
+++ b/libsql-wal/src/storage/async_storage.rs
@@ -7,6 +7,7 @@ use chrono::Utc;
 use libsql_sys::name::NamespaceName;
 use tokio::sync::{mpsc, oneshot};
 use tokio::task::JoinSet;
+use tokio_stream::Stream;
 
 use crate::io::{FileExt, Io, StdIO};
 use crate::segment::compacted::CompactedSegment;
@@ -267,6 +268,16 @@ where
         let segment = CompactedSegment::open(file).await?;
         Ok(segment)
     }
+
+    fn list_segments<'a>(
+        &'a self,
+        namespace: &'a NamespaceName,
+        until: u64,
+        config_override: Option<Self::Config>,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
+        let config = config_override.unwrap_or_else(|| self.backend.default_config());
+        self.backend.list_segments(config, namespace, until)
+    }
 }
 
 pub struct AsyncStorageInitConfig {
diff --git a/libsql-wal/src/storage/backend/mod.rs b/libsql-wal/src/storage/backend/mod.rs
index 25cc57f1a0..e054dbb8cd 100644
--- a/libsql-wal/src/storage/backend/mod.rs
+++ b/libsql-wal/src/storage/backend/mod.rs
@@ -4,10 +4,12 @@ use std::{future::Future, path::Path};
 
 use chrono::{DateTime, Utc};
 use fst::Map;
+use tokio_stream::Stream;
 use uuid::Uuid;
 
-use super::{RestoreOptions, Result, SegmentKey};
+use super::{RestoreOptions, Result, SegmentInfo, SegmentKey};
 use crate::io::file::FileExt;
+use crate::segment::compacted::CompactedSegmentDataHeader;
 use libsql_sys::name::NamespaceName;
 
 // pub mod fs;
@@ -63,7 +65,7 @@ pub trait Backend: Send + Sync + 'static {
         namespace: &NamespaceName,
         key: &SegmentKey,
         file: &impl FileExt,
-    ) -> Result<()>;
+    ) -> Result<CompactedSegmentDataHeader>;
 
     // this method taking self: Arc<Self> is an unfortunate consequence of the rust type system
    // making the impl FileExt variant with all the arguments impossible to name, with no escape hatch...
@@ -98,6 +100,13 @@ pub trait Backend: Send + Sync + 'static {
         dest: impl FileExt,
     ) -> Result<()>;
 
+    fn list_segments<'a>(
+        &'a self,
+        config: Self::Config,
+        namespace: &'a NamespaceName,
+        until: u64,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a;
+
     /// Returns the default configuration for this storage
     fn default_config(&self) -> Self::Config;
 }
@@ -176,7 +185,7 @@ impl<T: Backend> Backend for Arc<T> {
         namespace: &NamespaceName,
         key: &SegmentKey,
         file: &impl FileExt,
-    ) -> Result<()> {
+    ) -> Result<CompactedSegmentDataHeader> {
         self.as_ref()
             .fetch_segment_data_to_file(config, namespace, key, file)
             .await
@@ -194,4 +203,13 @@ impl<T: Backend> Backend for Arc<T> {
             .fetch_segment_data(config, namespace, key)
             .await
     }
+
+    fn list_segments<'a>(
+        &'a self,
+        config: Self::Config,
+        namespace: &'a NamespaceName,
+        until: u64,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
+        self.as_ref().list_segments(config, namespace, until)
+    }
 }
diff --git a/libsql-wal/src/storage/backend/s3.rs b/libsql-wal/src/storage/backend/s3.rs
index 66aff1f2da..0c4b7635fe 100644
--- a/libsql-wal/src/storage/backend/s3.rs
+++ b/libsql-wal/src/storage/backend/s3.rs
@@ -13,11 +13,13 @@ use aws_sdk_s3::operation::create_bucket::CreateBucketError;
 use aws_sdk_s3::primitives::{ByteStream, SdkBody};
 use aws_sdk_s3::types::CreateBucketConfiguration;
 use aws_sdk_s3::Client;
+use aws_smithy_types_convert::date_time::DateTimeExt;
 use bytes::{Bytes, BytesMut};
 use http_body::{Frame as HttpFrame, SizeHint};
 use libsql_sys::name::NamespaceName;
 use roaring::RoaringBitmap;
-use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
+use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
+use tokio_stream::Stream;
 use tokio_util::sync::ReusableBoxFuture;
 use zerocopy::byteorder::little_endian::{U16 as lu16, U32 as lu32, U64 as lu64};
 use zerocopy::{AsBytes, FromBytes, FromZeroes};
@@ -28,7 +30,7 @@ use crate::io::compat::copy_to_file;
 use crate::io::{FileExt, Io, StdIO};
 use crate::segment::compacted::CompactedSegmentDataHeader;
 use crate::segment::Frame;
-use crate::storage::{Error, RestoreOptions, Result, SegmentKey};
+use crate::storage::{Error, RestoreOptions, Result, SegmentInfo, SegmentKey};
 use crate::LIBSQL_MAGIC;
 
 pub struct S3Backend {
@@ -135,12 +137,17 @@ impl S3Backend {
         folder_key: &FolderKey<'_>,
         segment_key: &SegmentKey,
         file: &impl FileExt,
-    ) -> Result<()> {
+    ) -> Result<CompactedSegmentDataHeader> {
         let reader = self
             .fetch_segment_data_reader(config, folder_key, segment_key)
             .await?;
+        let mut reader = tokio::io::BufReader::with_capacity(8196, reader);
+        while reader.fill_buf().await?.len() < size_of::<CompactedSegmentDataHeader>() {}
+        let header = CompactedSegmentDataHeader::read_from_prefix(reader.buffer()).unwrap();
+
         copy_to_file(reader, file).await?;
-        Ok(())
+
+        Ok(header)
     }
 
     async fn s3_get(&self, config: &S3Config, key: String) -> Result {
@@ -301,6 +308,52 @@ impl S3Backend {
         Ok(index)
     }
+
+    fn list_segments_inner<'a>(
+        &'a self,
+        config: Arc<S3Config>,
+        namespace: &'a NamespaceName,
+        _until: u64,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
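+        // Lazily pages through `list_objects_v2` results: every index key that
+        // parses as a `SegmentKey` is yielded as a `SegmentInfo`, following
+        // continuation tokens until the listing is exhausted.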
+        async_stream::try_stream! {
+            let folder_key = FolderKey { cluster_id: &config.cluster_id, namespace };
+            let lookup_key_prefix = s3_segment_index_lookup_key_prefix(&folder_key);
+
+            let mut continuation_token = None;
+            loop {
+                let objects = self
+                    .client
+                    .list_objects_v2()
+                    .bucket(&config.bucket)
+                    .prefix(lookup_key_prefix.clone())
+                    .set_continuation_token(continuation_token.take())
+                    .send()
+                    .await
+                    .map_err(|e| Error::unhandled(e, "failed to list bucket"))?;
+
+                for entry in objects.contents() {
+                    let key = entry.key().expect("missing key?");
+                    let key_path: &Path = key.as_ref();
+                    let Some(key) = SegmentKey::validate_from_path(key_path, &folder_key.namespace) else { continue };
+
+                    let infos = SegmentInfo {
+                        key,
+                        size: entry.size().unwrap_or(0) as usize,
+                        created_at: entry.last_modified().unwrap().to_chrono_utc().unwrap(),
+                    };
+
+                    yield infos;
+                }
+
+                if objects.is_truncated().unwrap_or(false) {
+                    assert!(objects.next_continuation_token.is_some());
+                    continuation_token = objects.next_continuation_token;
+                } else {
+                    break
+                }
+            }
+        }
+    }
 }
 
 pub struct S3Config {
@@ -487,14 +540,15 @@ where
         namespace: &NamespaceName,
         key: &SegmentKey,
         file: &impl FileExt,
-    ) -> Result<()> {
+    ) -> Result<CompactedSegmentDataHeader> {
         let folder_key = FolderKey {
             cluster_id: &config.cluster_id,
             namespace: &namespace,
         };
-        self.fetch_segment_data_inner(config, &folder_key, key, file)
+        let header = self
+            .fetch_segment_data_inner(config, &folder_key, key, file)
             .await?;
-        Ok(())
+        Ok(header)
     }
 
     async fn fetch_segment_data(
@@ -508,6 +562,15 @@ where
             .fetch_segment_data(config, namespace, key)
             .await?;
         Ok(file)
     }
+
+    fn list_segments<'a>(
+        &'a self,
+        config: Self::Config,
+        namespace: &'a NamespaceName,
+        until: u64,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
+        self.list_segments_inner(config, namespace, until)
+    }
 }
 
 #[derive(Clone, Copy)]
diff --git a/libsql-wal/src/storage/compaction/mod.rs b/libsql-wal/src/storage/compaction/mod.rs
new file mode 100644
index 0000000000..9c46c206be
--- /dev/null
+++ b/libsql-wal/src/storage/compaction/mod.rs
@@ -0,0 +1,553 @@
+use std::mem::size_of;
+use std::ops::Deref;
+use std::path::Path;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use chrono::DateTime;
+use chrono::Utc;
+use fst::map::OpBuilder;
+use fst::Streamer;
+use libsql_sys::name::NamespaceName;
+use libsql_sys::rusqlite::OptionalExtension;
+use libsql_sys::rusqlite::{self, TransactionBehavior};
+use tempfile::tempdir;
+use tokio_stream::StreamExt;
+use uuid::Uuid;
+use zerocopy::AsBytes;
+
+use crate::io::buf::ZeroCopyBuf;
+use crate::io::FileExt as _;
+use crate::segment::compacted::CompactedSegment;
+use crate::segment::compacted::CompactedSegmentDataFooter;
+use crate::segment::compacted::CompactedSegmentDataHeader;
+use crate::segment::Frame;
+use crate::storage::backend::SegmentMeta;
+use crate::LIBSQL_MAGIC;
+use crate::LIBSQL_PAGE_SIZE;
+use crate::LIBSQL_WAL_VERSION;
+
+use super::backend::Backend;
+use super::{SegmentInfo, SegmentKey};
+
+pub mod strategy;
+
+type Result<T, E = Error> = std::result::Result<T, E>;
+
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    #[error("error reading from meta db: {0}")]
+    Meta(#[from] rusqlite::Error),
+    #[error("io error: {0}")]
+    Io(#[from] std::io::Error),
+    #[error("storage error: {0}")]
+    Storage(#[from] crate::storage::Error),
+}
+
+pub struct Compactor<B> {
+    backend: Arc<B>,
+    meta: rusqlite::Connection,
+    path: PathBuf,
+}
+
+impl<B> Compactor<B> {
+    pub fn new(backend: Arc<B>, compactor_path: &Path) -> Result<Self> {
+        let meta = rusqlite::Connection::open(compactor_path.join("meta.db"))?;
+        // TODO: set pragmas: wal + foreign key check
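+        // The meta db keeps one row per monitored namespace, plus one row per known
+        // segment, keyed by its frame range and linked back to its namespace.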
+        meta.execute("CREATE TABLE IF NOT EXISTS monitored_namespaces (id INTEGER PRIMARY KEY AUTOINCREMENT, namespace_name BLOB NOT NULL)", ()).unwrap();
+        meta.execute(
+            "CREATE TABLE IF NOT EXISTS segments (
+                start_frame_no INTEGER,
+                end_frame_no INTEGER,
+                created_at DATE,
+                size INTEGER,
+                namespace_id INTEGER,
+                PRIMARY KEY (start_frame_no, end_frame_no),
+                FOREIGN KEY(namespace_id) REFERENCES monitored_namespaces(id))",
+            (),
+        )?;
+
+        Ok(Self {
+            backend,
+            meta,
+            path: compactor_path.into(),
+        })
+    }
+
+    pub async fn monitor(&mut self, namespace: &NamespaceName) -> Result<()>
+    where
+        B: Backend,
+    {
+        let tx = self.meta.transaction()?;
+        let id = {
+            let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO monitored_namespaces(namespace_name) VALUES (?) RETURNING id")?;
+            stmt.query_row([namespace.as_str()], |r| r.get(0))?
+        };
+
+        sync_one(self.backend.as_ref(), namespace, id, &tx).await?;
+
+        tx.commit()?;
+
+        Ok(())
+    }
+
+    pub fn analyze(&self, namespace: &NamespaceName) -> Result<AnalyzedSegments> {
+        let mut stmt = self.meta.prepare_cached(
+            r#"
+            SELECT start_frame_no, end_frame_no
+            FROM segments as s
+            JOIN monitored_namespaces as m
+            ON m.id = s.namespace_id
+            WHERE m.namespace_name = ?"#,
+        )?;
+        let mut rows = stmt.query([namespace.as_str()])?;
+        let mut graph = petgraph::graphmap::DiGraphMap::new();
+        let mut last_frame_no = 0;
+        while let Some(row) = rows.next()? {
+            let start_frame_no: u64 = row.get(0)?;
+            let end_frame_no: u64 = row.get(1)?;
+            // it's free to go from one end of a segment to the other
+            graph.add_edge(start_frame_no, end_frame_no, 0);
+            if start_frame_no != 1 {
+                // moving from one segment to the next costs us
+                graph.add_edge(start_frame_no - 1, start_frame_no, 1);
+            }
+            last_frame_no = last_frame_no.max(end_frame_no);
+        }
+
+        Ok(AnalyzedSegments {
+            graph,
+            last_frame_no,
+            namespace: namespace.clone(),
+        })
+    }
+
+    pub fn get_segment_range(
+        &self,
+        namespace: &NamespaceName,
+    ) -> Result<Option<(SegmentInfo, SegmentInfo)>> {
+        segments_range(&self.meta, namespace)
+    }
+
+    /// Polls storage for new frames since the last sync
+    #[tracing::instrument(skip(self))]
+    async fn sync_latest(&self) -> Result<()>
+    where
+        B: Backend,
+    {
+        // let tx = self.meta.transaction()?;
+        // let stream = self.storage.list_segments();
+
+        Ok(())
+    }
+
+    /// syncs all segments from storage with the local cache
+    pub async fn sync_full(&mut self) -> Result<()>
+    where
+        B: Backend,
+    {
+        let tx = self
+            .meta
+            .transaction_with_behavior(TransactionBehavior::Immediate)?;
+        {
+            tx.execute("DELETE FROM segments", ())?;
+            let mut stmt = tx.prepare("SELECT namespace_name, id FROM monitored_namespaces")?;
+            let mut namespace_rows = stmt.query(())?;
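+            // Within a single immediate transaction: wipe the local cache, then
+            // re-register every remote segment for each monitored namespace.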
+            while let Some(row) = namespace_rows.next()? {
+                let namespace = NamespaceName::from_string(row.get::<_, String>(0)?);
+                let id = row.get::<_, u64>(1)?;
+                sync_one(self.backend.as_ref(), &namespace, id, &tx).await?;
+            }
+        }
+
+        tx.commit()?;
+
+        Ok(())
+    }
+
+    async fn fetch(
+        &self,
+        set: &SegmentSet,
+        into: &Path,
+    ) -> Result<(
+        Vec<CompactedSegment<std::fs::File>>,
+        Vec<fst::Map<Vec<u8>>>,
+    )>
+    where
+        B: Backend,
+    {
+        let mut indexes = Vec::with_capacity(set.len());
+        let mut segments = Vec::with_capacity(set.len());
+        for key in set.iter() {
+            let file = std::fs::File::options()
+                .create_new(true)
+                .write(true)
+                .read(true)
+                .open(into.join(&format!("{key}.data")))
+                .unwrap();
+            let header = self
+                .backend
+                .fetch_segment_data_to_file(
+                    &self.backend.default_config(),
+                    &set.namespace,
+                    key,
+                    &file,
+                )
+                .await
+                .unwrap();
+            let index = self
+                .backend
+                .fetch_segment_index(&self.backend.default_config(), &set.namespace, key)
+                .await
+                .unwrap();
+            indexes.push(index);
+            segments.push(CompactedSegment::from_parts(file, header));
+        }
+
+        Ok((segments, indexes))
+    }
+
+    pub async fn compact(&self, set: SegmentSet) -> Result<()>
+    where
+        B: Backend,
+    {
+        assert!(!set.is_empty());
+        let tmp = tempdir().unwrap();
+        // FIXME: brute force: we don't necessarily need to download all the segments to cover
+        // all the changes. Iterating backward over the set items and filling the gaps in the
+        // page ranges would, in theory, be sufficient. Another alternative is to download all
+        // the indexes, and lazily download the segment data.
+        // TODO: fetch concurrently
+        let (segments, indexes) = self.fetch(&set, tmp.path()).await?;
+        let last_header = segments.last().expect("non-empty set").header();
+
+        // It's unfortunate that we need to know the number of frames in the final segment ahead
+        // of time, but it's necessary for computing the checksum. This seems like the least
+        // costly method (over recomputing the whole checksum).
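+        // First pass over the union of all segment indexes: the union yields each
+        // page number exactly once, so counting the entries gives the frame count
+        // of the compacted segment.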
+        let mut union = OpBuilder::from_iter(indexes.iter()).union();
+        let mut count = 0;
+        while let Some(_) = union.next() {
+            count += 1;
+        }
+
+        let mut hasher = crc32fast::Hasher::new();
+
+        let out_file = std::fs::File::options()
+            .create_new(true)
+            .write(true)
+            .read(true)
+            .open(tmp.path().join("out"))
+            .unwrap();
+        let header = CompactedSegmentDataHeader {
+            frame_count: (count as u32).into(),
+            segment_id: Uuid::new_v4().to_u128_le().into(),
+            start_frame_no: set.range().expect("non-empty set").0.into(),
+            end_frame_no: set.range().expect("non-empty set").1.into(),
+            size_after: last_header.size_after,
+            version: LIBSQL_WAL_VERSION.into(),
+            magic: LIBSQL_MAGIC.into(),
+            page_size: last_header.page_size,
+        };
+
+        hasher.update(header.as_bytes());
+        let (_, ret) = out_file
+            .write_all_at_async(ZeroCopyBuf::new_init(header), 0)
+            .await;
+        ret?;
+
+        let mut union = OpBuilder::from_iter(indexes.iter()).union();
+        let mut buffer = Box::new(ZeroCopyBuf::<Frame>::new_uninit());
+        let mut out_index = fst::MapBuilder::memory();
+        let mut current_offset = 0;
+
+        while let Some((page_no_bytes, indexed_offsets)) = union.next() {
+            let (index, offset) = indexed_offsets
+                .iter()
+                .max_by_key(|v| v.index)
+                .map(|v| (v.index, v.value))
+                .expect("union returned something, must be non-empty");
+            let segment = &segments[index];
+            let (frame, ret) = segment.read_frame(buffer, offset as u32).await;
+            ret?;
+            hasher.update(&frame.get_ref().as_bytes());
+            let dest_offset =
+                size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
+            let (mut frame, ret) = out_file.write_all_at_async(frame, dest_offset as u64).await;
+            ret?;
+            out_index
+                .insert(page_no_bytes, current_offset as _)
+                .unwrap();
+            current_offset += 1;
+            frame.deinit();
+            buffer = frame;
+        }
+
+        let footer = CompactedSegmentDataFooter {
+            checksum: hasher.finalize().into(),
+        };
+
+        let footer_offset =
+            size_of::<CompactedSegmentDataHeader>() + current_offset * size_of::<Frame>();
+        let (_, ret) = out_file
+            .write_all_at_async(ZeroCopyBuf::new_init(footer), footer_offset as _)
+            .await;
+        ret?;
+
+        let (start, end) = set.range().expect("non-empty set");
+        self.backend
+            .store(
+                &self.backend.default_config(),
+                SegmentMeta {
+                    namespace: set.namespace.clone(),
+                    segment_id: Uuid::new_v4(),
+                    start_frame_no: start,
+                    end_frame_no: end,
+                    created_at: Utc::now(),
+                },
+                out_file,
+                out_index.into_inner().unwrap(),
+            )
+            .await?;
+
+        Ok(())
+    }
+
+    /// Restore a database file from a segment set
+    /// The set must start at frame_no 1
+    pub async fn restore(&self, set: SegmentSet, to: impl AsRef<Path>) -> Result<()>
+    where
+        B: Backend,
+    {
+        if set.is_empty() {
+            return Ok(());
+        }
+        assert_eq!(set.range().unwrap().0, 1);
+        let tmp = tempdir()?;
+        let (segments, indexes) = self.fetch(&set, tmp.path()).await?;
+        let mut union = OpBuilder::from_iter(indexes.iter()).union();
+        let mut buffer = Vec::with_capacity(LIBSQL_PAGE_SIZE as usize);
+        let out_file = std::fs::File::create(to)?;
+
+        while let Some((page_no_bytes, indexed_offsets)) = union.next() {
+            let page_no = u32::from_be_bytes(page_no_bytes.try_into().unwrap());
+            let (index, offset) = indexed_offsets
+                .iter()
+                .max_by_key(|v| v.index)
+                .map(|v| (v.index, v.value as u32))
+                .expect("union returned something, must be non-empty");
+            let segment = &segments[index];
+            let (b, ret) = segment.read_page(buffer, offset).await;
+            ret?;
+            let offset = (page_no as u64 - 1) * LIBSQL_PAGE_SIZE as u64;
+            let (mut b, ret) = out_file.write_all_at_async(b, offset).await;
+            ret?;
+            b.clear();
+            buffer = b;
+        }
+
+        Ok(())
+    }
+
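+    /// Lists the locally-synced segments for `namespace`, calling `f` for each
+    /// entry in frame-range order.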
+    pub fn list_all(&self, namespace: &NamespaceName, f: impl FnMut(SegmentInfo)) -> Result<()> {
+        list_segments(&self.meta, namespace, f)
+    }
+}
+
+pub struct AnalyzedSegments {
+    graph: petgraph::graphmap::DiGraphMap<u64, u64>,
+    last_frame_no: u64,
+    namespace: NamespaceName,
+}
+
+impl AnalyzedSegments {
+    /// returns a list of keys covering frame_no 1 to last_frame_no in the smallest number of segments
+    pub fn shortest_restore_path(&self) -> SegmentSet {
+        let path = petgraph::algo::astar(
+            &self.graph,
+            1,
+            |n| n == self.last_frame_no,
+            |(_, _, &x)| x,
+            |n| self.last_frame_no - n,
+        );
+        let mut segments = Vec::new();
+        match path {
+            Some((_len, nodes)) => {
+                for chunk in nodes.chunks(2) {
+                    let key = SegmentKey {
+                        start_frame_no: chunk[0],
+                        end_frame_no: chunk[1],
+                    };
+                    segments.push(key);
+                }
+            }
+            None => (),
+        }
+        SegmentSet {
+            segments,
+            namespace: self.namespace.clone(),
+        }
+    }
+
+    pub fn last_frame_no(&self) -> u64 {
+        self.last_frame_no
+    }
+
+    pub fn segment_count(&self) -> usize {
+        self.graph.node_count() / 2
+    }
+}
+
+/// A set of segments, with the guarantee that segments are non-overlapping and increasing in
+/// frame_no
+#[derive(Clone)]
+pub struct SegmentSet {
+    namespace: NamespaceName,
+    segments: Vec<SegmentKey>,
+}
+
+impl SegmentSet {
+    pub fn range(&self) -> Option<(u64, u64)> {
+        self.segments
+            .first()
+            .zip(self.segments.last())
+            .map(|(f, l)| (f.start_frame_no, l.end_frame_no))
+    }
+}
+
+impl Deref for SegmentSet {
+    type Target = [SegmentKey];
+
+    fn deref(&self) -> &Self::Target {
+        &self.segments
+    }
+}
+
+async fn sync_one<B: Backend>(
+    backend: &B,
+    namespace: &NamespaceName,
+    id: u64,
+    conn: &rusqlite::Connection,
+) -> Result<()> {
+    let segs = backend.list_segments(backend.default_config(), &namespace, 0);
+    tokio::pin!(segs);
+
+    while let Some(info) = segs.next().await {
+        let info = info.unwrap();
+        register_segment_info(&conn, info, id)?;
+    }
+
+    Ok(())
+}
+
+fn list_segments<'a>(
+    conn: &'a rusqlite::Connection,
+    namespace: &'a NamespaceName,
+    mut f: impl FnMut(SegmentInfo),
+) -> Result<()> {
+    let mut stmt = conn.prepare_cached(
+        r#"
+        SELECT created_at, size, start_frame_no, end_frame_no
+        FROM segments as s
+        JOIN monitored_namespaces as m
+        ON m.id == s.namespace_id
+        WHERE m.namespace_name = ?
+        ORDER BY end_frame_no, start_frame_no
+        "#,
+    )?;
+
+    let iter = stmt.query_map([namespace.as_str()], |r| {
+        Ok(SegmentInfo {
+            key: SegmentKey {
+                start_frame_no: r.get(2)?,
+                end_frame_no: r.get(3)?,
+            },
+            size: r.get(1)?,
+            created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(),
+        })
+    })?;
+
+    for info in iter {
+        let info = info?;
+        f(info);
+    }
+
+    Ok(())
+}
+
+fn register_segment_info(
+    conn: &rusqlite::Connection,
+    info: SegmentInfo,
+    namespace_id: u64,
+) -> Result<()> {
+    let mut stmt = conn.prepare_cached(
+        r#"
+        INSERT OR IGNORE INTO segments (
+            start_frame_no,
+            end_frame_no,
+            created_at,
+            size,
+            namespace_id
+        )
+        VALUES (?, ?, ?, ?, ?)"#,
+    )?;
+    stmt.execute((
+        info.key.start_frame_no,
+        info.key.end_frame_no,
+        info.created_at.timestamp(),
+        info.size,
+        namespace_id,
+    ))?;
+    Ok(())
+}
+
+fn segments_range(
+    conn: &rusqlite::Connection,
+    namespace: &NamespaceName,
+) -> Result<Option<(SegmentInfo, SegmentInfo)>> {
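+    // SQLite's bare-column semantics with min()/max(): the other selected columns
+    // come from the row holding the min/max created_at, so each query returns the
+    // oldest/newest segment row directly.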
+    let mut stmt = conn.prepare_cached(
+        r#"
+        SELECT min(created_at), size, start_frame_no, end_frame_no
+        FROM segments as s
+        JOIN monitored_namespaces as m
+        ON m.id == s.namespace_id
+        WHERE m.namespace_name = ?
+        LIMIT 1
+"#,
+    )?;
+    let first = stmt
+        .query_row([namespace.as_str()], |r| {
+            Ok(SegmentInfo {
+                key: SegmentKey {
+                    start_frame_no: r.get(2)?,
+                    end_frame_no: r.get(3)?,
+                },
+                size: r.get(1)?,
+                created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(),
+            })
+        })
+        .optional()?;
+
+    let mut stmt = conn.prepare_cached(
+        r#"
+        SELECT max(created_at), size, start_frame_no, end_frame_no
+        FROM segments as s
+        JOIN monitored_namespaces as m
+        ON m.id == s.namespace_id
+        WHERE m.namespace_name = ?
+        LIMIT 1
+"#,
+    )?;
+    let last = stmt
+        .query_row([namespace.as_str()], |r| {
+            Ok(SegmentInfo {
+                key: SegmentKey {
+                    start_frame_no: r.get(2)?,
+                    end_frame_no: r.get(3)?,
+                },
+                size: r.get(1)?,
+                created_at: DateTime::from_timestamp(r.get(0)?, 0).unwrap(),
+            })
+        })
+        .optional()?;
+
+    Ok(first.zip(last))
+}
diff --git a/libsql-wal/src/storage/compaction/strategy/identity.rs b/libsql-wal/src/storage/compaction/strategy/identity.rs
new file mode 100644
index 0000000000..d7e61b7e28
--- /dev/null
+++ b/libsql-wal/src/storage/compaction/strategy/identity.rs
@@ -0,0 +1,14 @@
+use crate::storage::compaction::SegmentSet;
+
+use super::PartitionStrategy;
+
+/// partition strategy that doesn't split the passed set
+pub struct IdentityStrategy;
+
+impl PartitionStrategy for IdentityStrategy {
+    fn partition(&self, segments: &SegmentSet) -> Vec<SegmentSet> {
+        let mut out = Vec::with_capacity(1);
+        out.push(segments.clone());
+        out
+    }
+}
diff --git a/libsql-wal/src/storage/compaction/strategy/log_strategy.rs b/libsql-wal/src/storage/compaction/strategy/log_strategy.rs
new file mode 100644
index 0000000000..b430008bd3
--- /dev/null
+++ b/libsql-wal/src/storage/compaction/strategy/log_strategy.rs
@@ -0,0 +1,32 @@
+use std::ops::Deref as _;
+
+use crate::storage::compaction::SegmentSet;
+
+use super::PartitionStrategy;
+
+/// partitions the SegmentSet into logarithmically shrinking sets
+pub struct LogReductionStrategy;
+
+impl PartitionStrategy for LogReductionStrategy {
+    fn partition(&self, segments: &SegmentSet) -> Vec<SegmentSet> {
+        let mut segs = segments.deref();
+        let mut out = Vec::new();
+        while !segs.is_empty() {
+            let (lhs, rhs) = segs.split_at(segs.len() / 2);
+            out.push(SegmentSet {
+                segments: lhs.to_vec(),
+                namespace: segments.namespace.clone(),
+            });
+            segs = rhs;
+            if segs.len() == 1 {
+                out.push(SegmentSet {
+                    segments: rhs.to_vec(),
+                    namespace: segments.namespace.clone(),
+                });
+                break;
+            }
+        }
+
+        out
+    }
+}
diff --git a/libsql-wal/src/storage/compaction/strategy/mod.rs b/libsql-wal/src/storage/compaction/strategy/mod.rs
new file mode 100644
index 0000000000..bb74e95ca8
--- /dev/null
+++ b/libsql-wal/src/storage/compaction/strategy/mod.rs
@@ -0,0 +1,8 @@
+use super::SegmentSet;
+
+pub mod identity;
+pub mod log_strategy;
+
+pub trait PartitionStrategy {
+    fn partition(&self, segments: &SegmentSet) -> Vec<SegmentSet>;
+}
diff --git a/libsql-wal/src/storage/job.rs b/libsql-wal/src/storage/job.rs
index 797170644c..6f1620066d 100644
--- a/libsql-wal/src/storage/job.rs
+++ b/libsql-wal/src/storage/job.rs
@@ -116,6 +116,7 @@ mod test {
     use crate::io::file::FileExt;
     use crate::io::StdIO;
+    use crate::segment::compacted::CompactedSegmentDataHeader;
     use crate::storage::{RestoreOptions, SegmentKey};
     // use crate::registry::WalRegistry;
     // use crate::segment::compacted::CompactedSegmentDataHeader;
@@ -497,7 +498,7 @@ mod test {
             _namespace: &NamespaceName,
             _key: &SegmentKey,
             _file: &impl FileExt,
-        ) -> Result<()> {
+        ) -> Result<CompactedSegmentDataHeader> {
             todo!()
         }
 
@@ -519,6 +520,16 @@ mod test {
         ) -> Result<Map<Vec<u8>>> {
             todo!()
         }
+
+        fn list_segments<'a>(
+            &'a self,
+            _config: Self::Config,
+            _namespace: &'a NamespaceName,
+            _until: u64,
+        ) -> impl tokio_stream::Stream<Item = Result<SegmentInfo>> + 'a {
+            tokio_stream::iter(std::iter::from_fn(|| todo!()))
+        }
     }
 
     let job = Job {
diff --git a/libsql-wal/src/storage/mod.rs b/libsql-wal/src/storage/mod.rs
index 56ed5feff8..5ebff20220 100644
--- a/libsql-wal/src/storage/mod.rs
+++ b/libsql-wal/src/storage/mod.rs
@@ -1,10 +1,9 @@
 use std::collections::BTreeMap;
-use std::fmt;
-use std::future::Future;
 use std::path::{Path, PathBuf};
 use std::pin::Pin;
 use std::str::FromStr;
 use std::sync::Arc;
+use std::{fmt, future::Future};
 
 use chrono::{DateTime, Utc};
 use fst::Map;
@@ -12,6 +11,7 @@ use hashbrown::HashMap;
 use libsql_sys::name::NamespaceName;
 use libsql_sys::wal::either::Either;
 use tempfile::{tempdir, TempDir};
+use tokio_stream::Stream;
 
 use crate::io::{FileExt, Io, StdIO};
 use crate::segment::compacted::CompactedSegment;
@@ -22,6 +22,7 @@ pub use self::error::Error;
 
 pub mod async_storage;
 pub mod backend;
+pub mod compaction;
 pub(crate) mod error;
 mod job;
 mod scheduler;
@@ -205,6 +206,20 @@ pub trait Storage: Send + Sync + 'static {
     fn shutdown(&self) -> impl Future<Output = ()> + Send {
         async { () }
     }
+
+    fn list_segments<'a>(
+        &'a self,
+        namespace: &'a NamespaceName,
+        until: u64,
+        config_override: Option<Self::Config>,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a;
+}
+
+#[derive(Debug)]
+pub struct SegmentInfo {
+    pub key: SegmentKey,
+    pub size: usize,
+    pub created_at: DateTime<Utc>,
 }
 
 /// special zip function for Either storage implementation
@@ -323,6 +338,22 @@ where
             Either::B(b) => b.shutdown().await,
         }
     }
+
+    fn list_segments<'a>(
+        &'a self,
+        namespace: &'a NamespaceName,
+        until: u64,
+        config_override: Option<Self::Config>,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
+        match zip(self, config_override) {
+            Either::A((s, c)) => {
+                tokio_util::either::Either::Left(s.list_segments(namespace, until, c))
+            }
+            Either::B((s, c)) => {
+                tokio_util::either::Either::Right(s.list_segments(namespace, until, c))
+            }
+        }
+    }
 }
 
 /// a placeholder storage that doesn't store segment
@@ -388,6 +419,17 @@ impl Storage for NoStorage {
         #[allow(unreachable_code)]
         Result::<Map<Arc<[u8]>>>::Err(Error::InvalidIndex(""))
     }
+
+    fn list_segments<'a>(
+        &'a self,
+        _namespace: &'a NamespaceName,
+        _until: u64,
+        _config_override: Option<Self::Config>,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
+        unimplemented!("no storage!");
+        #[allow(unreachable_code)]
+        tokio_stream::empty()
+    }
 }
 
 #[doc(hidden)]
@@ -555,6 +597,17 @@ impl Storage for TestStorage {
             panic!("not storing")
         }
     }
+
+    fn list_segments<'a>(
+        &'a self,
+        _namespace: &'a NamespaceName,
+        _until: u64,
+        _config_override: Option<Self::Config>,
+    ) -> impl Stream<Item = Result<SegmentInfo>> + 'a {
+        todo!();
+        #[allow(unreachable_code)]
+        tokio_stream::empty()
+    }
 }
 
 pub struct StoreSegmentRequest {