diff --git a/libsql-server/src/wal_toolkit.rs b/libsql-server/src/wal_toolkit.rs index 6f8e9d9e6d..5297b932da 100644 --- a/libsql-server/src/wal_toolkit.rs +++ b/libsql-server/src/wal_toolkit.rs @@ -4,14 +4,18 @@ use anyhow::Context as _; use aws_config::{retry::RetryConfig, BehaviorVersion, Region}; use aws_sdk_s3::config::{Credentials, SharedCredentialsProvider}; use chrono::DateTime; +use hashbrown::HashSet; +use libsql_sys::name::NamespaceName; use libsql_wal::io::StdIO; use libsql_wal::storage::backend::s3::S3Backend; +use libsql_wal::storage::backend::Backend; use libsql_wal::storage::compaction::strategy::identity::IdentityStrategy; use libsql_wal::storage::compaction::strategy::log_strategy::LogReductionStrategy; use libsql_wal::storage::compaction::strategy::PartitionStrategy; use libsql_wal::storage::compaction::Compactor; +use rusqlite::OpenFlags; -#[derive(Clone, Debug, clap::ValueEnum)] +#[derive(Clone, Debug, clap::ValueEnum, Copy)] pub enum CompactStrategy { Logarithmic, CompactAll, @@ -20,7 +24,20 @@ pub enum CompactStrategy { #[derive(Debug, clap::Subcommand)] pub enum WalToolkit { /// Register namespaces to monitor - Monitor { namespace: String }, + Monitor { + /// list monitored namespaces + #[clap(long, short)] + list: bool, + /// Monitor the passed namespace + #[clap(long, short)] + add: Option, + /// Unmonitor the passed namespace + #[clap(long, short)] + delete: Option, + /// Sync namespaces from a sqld meta-store + #[clap(long)] + from_db: Option, + }, /// Analyze segments for a namespaces Analyze { /// list all segments @@ -36,7 +53,11 @@ pub enum WalToolkit { /// prints the compaction plan, but doesn't perform it. #[clap(long)] dry_run: bool, - namespace: String, + /// only compact if it takes more than `threshold` segments to restore + #[clap(long, short, default_value = "1")] + threshold: usize, + /// namespace to compact, otherwise, all namespaces are compacted + namespace: Option, }, /// Sync namespace metadata from remote storage Sync { @@ -62,109 +83,51 @@ impl WalToolkit { tokio::fs::create_dir_all(compact_path).await?; let mut compactor = Compactor::new(backend.into(), compact_path)?; match self { - Self::Monitor { namespace } => { - let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string()); - compactor.monitor(&namespace).await?; - println!("monitoring {namespace}"); + Self::Monitor { + add, + list, + from_db, + delete, + } => { + handle_monitor( + *list, + &mut compactor, + add.as_deref(), + delete.as_deref(), + from_db.as_deref(), + ) + .await?; } Self::Analyze { namespace, list_all, } => { - let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string()); - let analysis = compactor.analyze(&namespace)?; - println!("stats for {namespace}:"); - println!("- segment count: {}", analysis.segment_count()); - println!("- last frame_no: {}", analysis.last_frame_no()); - let set = analysis.shortest_restore_path(); - println!("- shortest restore path len: {}", set.len()); - if let Some((first, last)) = compactor.get_segment_range(&namespace)? { - println!( - "- oldest segment: {}-{} ({})", - first.key.start_frame_no, - first.key.end_frame_no, - DateTime::from_timestamp_millis(first.key.timestamp as _).unwrap() - ); - println!( - "- most recent segment: {}-{} ({})", - last.key.start_frame_no, - last.key.end_frame_no, - DateTime::from_timestamp_millis(last.key.timestamp as _).unwrap() - ); - } - - if *list_all { - println!("segments:"); - compactor.list_all(&namespace, |info| { - println!( - "- {}-{} ({})", - info.key.start_frame_no, - info.key.end_frame_no, - DateTime::from_timestamp_millis(info.key.timestamp as _).unwrap() - ); - })?; - } + handle_analyze(namespace, &compactor, *list_all)?; } Self::Compact { strategy, dry_run, namespace, + threshold, } => { - let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string()); - let analysis = compactor.analyze(&namespace)?; - let strat: Box = match strategy { - CompactStrategy::Logarithmic => Box::new(LogReductionStrategy), - CompactStrategy::CompactAll => Box::new(IdentityStrategy), - }; - let set = analysis.shortest_restore_path(); - let partition = strat.partition(&set); - - println!("initial shortest restore path len: {}", set.len()); - println!("compacting into {} segments", partition.len()); - for set in partition.iter() { - println!("- {:?}", set.range().unwrap()); - } - if *dry_run { - println!("dry run: stopping"); - } else { - println!("performing compaction"); - let part_len = partition.len(); - for (idx, set) in partition.into_iter().enumerate() { - let Some((start, end)) = set.range() else { - continue; - }; - println!("compacting {start}-{end} ({}/{})", idx + 1, part_len); - // TODO: we can compact in conccurently - compactor.compact(set).await?; - } - } + handle_compact( + namespace.as_deref(), + &mut compactor, + *threshold, + *strategy, + *dry_run, + ) + .await? + } + Self::Sync { full, namespace } => { + handle_sync(namespace.as_deref(), &mut compactor, full).await? } - Self::Sync { full, namespace } => match namespace { - Some(_ns) => { - todo!() - } - None if *full => { - compactor.sync_full().await?; - println!("all monitored namespace fully up to date."); - } - _ => todo!(), - }, Self::Restore { namespace, out, verify, } => { - let namespace = libsql_sys::name::NamespaceName::from_string(namespace.to_string()); - let analysis = compactor.analyze(&namespace)?; - let set = analysis.shortest_restore_path(); - compactor.restore(set, &out).await?; - if *verify { - let conn = libsql_sys::rusqlite::Connection::open(&out)?; - conn.pragma_query(None, "integrity_check", |r| { - println!("{r:?}"); - Ok(()) - })?; - } + handle_restore(namespace, compactor, out, *verify).await?; } } @@ -172,6 +135,208 @@ impl WalToolkit { } } +async fn handle_restore( + namespace: &str, + compactor: Compactor>, + out: &Path, + verify: bool, +) -> Result<(), anyhow::Error> { + let namespace = NamespaceName::from_string(namespace.to_string()); + let analysis = compactor.analyze(&namespace)?; + let set = analysis.shortest_restore_path(); + compactor.restore(set, &out).await?; + Ok(if verify { + let conn = libsql_sys::rusqlite::Connection::open(&out)?; + conn.pragma_query(None, "integrity_check", |r| { + println!("{r:?}"); + Ok(()) + })?; + }) +} + +async fn handle_sync( + namespace: Option<&str>, + compactor: &mut Compactor>, + full: &bool, +) -> Result<(), anyhow::Error> { + Ok(match namespace { + Some(ns) => { + let namespace = NamespaceName::from_string(ns.to_string()); + compactor.sync_one(&namespace, *full).await?; + println!("`{namespace}` fully up to date."); + } + None => { + compactor.sync_all(*full).await?; + println!("all monitored namespace fully up to date."); + } + }) +} + +async fn handle_compact( + namespace: Option<&str>, + compactor: &mut Compactor>, + threshold: usize, + strategy: CompactStrategy, + dry_run: bool, +) -> Result<(), anyhow::Error> { + Ok(match namespace { + Some(namespace) => { + let namespace = NamespaceName::from_string(namespace.to_string()); + compact_namespace(compactor, &namespace, threshold, strategy, dry_run).await?; + } + None => { + let mut out = Vec::new(); + compactor.list_monitored_namespaces(|ns| { + out.push(ns); + })?; + + for ns in &out { + compact_namespace(compactor, ns, threshold, strategy, dry_run).await?; + } + } + }) +} + +fn handle_analyze( + namespace: &str, + compactor: &Compactor>, + list_all: bool, +) -> Result<(), anyhow::Error> { + let namespace = NamespaceName::from_string(namespace.to_string()); + let analysis = compactor.analyze(&namespace)?; + println!("stats for {namespace}:"); + println!("- segment count: {}", analysis.segment_count()); + println!("- last frame_no: {}", analysis.last_frame_no()); + let set = analysis.shortest_restore_path(); + println!("- shortest restore path len: {}", set.len()); + if let Some((first, last)) = compactor.get_segment_range(&namespace)? { + println!( + "- oldest segment: {}-{} ({})", + first.key.start_frame_no, + first.key.end_frame_no, + DateTime::from_timestamp_millis(first.key.timestamp as _).unwrap() + ); + println!( + "- most recent segment: {}-{} ({})", + last.key.start_frame_no, + last.key.end_frame_no, + DateTime::from_timestamp_millis(last.key.timestamp as _).unwrap() + ); + } + Ok(if list_all { + println!("segments:"); + compactor.list_all_segments(&namespace, |info| { + println!( + "- {}-{} ({})", + info.key.start_frame_no, + info.key.end_frame_no, + DateTime::from_timestamp_millis(info.key.timestamp as _).unwrap() + ); + })?; + }) +} + +async fn handle_monitor( + list: bool, + compactor: &mut Compactor>, + add: Option<&str>, + delete: Option<&str>, + from_db: Option<&Path>, +) -> Result<(), anyhow::Error> { + if list { + compactor.list_monitored_namespaces(|ns| { + println!("{ns}"); + })?; + } else if let Some(namespace) = add { + let namespace = NamespaceName::from_string(namespace.to_string()); + compactor.monitor(&namespace).await?; + println!("monitoring {namespace}"); + } + Ok(if let Some(namespace) = delete { + let namespace = NamespaceName::from_string(namespace.to_string()); + compactor.unmonitor(&namespace)?; + println!("{namespace} is unmonitored"); + } else if let Some(path) = from_db { + let metastore_path = path.join("metastore").join("data"); + let conn = rusqlite::Connection::open_with_flags( + metastore_path, + OpenFlags::SQLITE_OPEN_READ_ONLY, + )?; + let mut stmt = conn.prepare("SELECT namespace FROM namespace_configs")?; + let metastore_namespaces = stmt + .query(())? + .mapped(|r| Ok(NamespaceName::from_string(r.get(0)?))) + .collect::, _>>()?; + + let mut monitored_namespace = HashSet::new(); + compactor.list_monitored_namespaces(|n| { + monitored_namespace.insert(n); + })?; + + let to_remove = monitored_namespace.difference(&metastore_namespaces); + for ns in to_remove { + println!("- {ns}"); + compactor.unmonitor(ns)?; + } + + let to_add = metastore_namespaces.difference(&monitored_namespace); + for ns in to_add { + println!("+ {ns}"); + compactor.monitor(&ns).await?; + } + }) +} + +async fn compact_namespace( + compactor: &mut Compactor, + namespace: &NamespaceName, + threshold: usize, + strategy: CompactStrategy, + dry_run: bool, +) -> anyhow::Result<()> { + let analysis = compactor.analyze(&namespace)?; + let strat: Box = match strategy { + CompactStrategy::Logarithmic => Box::new(LogReductionStrategy), + CompactStrategy::CompactAll => Box::new(IdentityStrategy), + }; + let set = analysis.shortest_restore_path(); + if set.len() <= threshold { + println!( + "skipping {namespace}: shortest restore path is {}, and threshold is {threshold}", + set.len() + ); + return Ok(()); + } + let partition = strat.partition(&set); + + println!("compacting {namespace}:"); + println!("-> initial shortest restore path len: {}", set.len()); + println!("-> compacting into {} segments", partition.len()); + for set in partition.iter() { + println!("\t- {:?}", set.range().unwrap()); + } + + if dry_run { + println!("dry run: stopping"); + } else { + println!("performing compaction"); + let part_len = partition.len(); + for (idx, set) in partition.into_iter().enumerate() { + let Some((start, end)) = set.range() else { + continue; + }; + println!("compacting {start}-{end} ({}/{})", idx + 1, part_len); + // TODO: we can compact in conccurently + compactor.compact(set).await?; + + // sync back the new segments + compactor.sync_one(&namespace, false).await?; + } + } + + Ok(()) +} + async fn setup_storage(opt: &S3Args) -> anyhow::Result> { let config = aws_config::load_defaults(BehaviorVersion::latest()).await; diff --git a/libsql-wal/src/storage/compaction/mod.rs b/libsql-wal/src/storage/compaction/mod.rs index dc6b24584f..01a66a1516 100644 --- a/libsql-wal/src/storage/compaction/mod.rs +++ b/libsql-wal/src/storage/compaction/mod.rs @@ -15,7 +15,7 @@ use uuid::Uuid; use zerocopy::AsBytes; use crate::io::buf::ZeroCopyBuf; -use crate::io::FileExt as _; +use crate::io::FileExt; use crate::segment::compacted::CompactedSegment; use crate::segment::compacted::CompactedSegmentDataFooter; use crate::segment::compacted::CompactedSegmentDataHeader; @@ -52,16 +52,17 @@ impl Compactor { pub fn new(backend: Arc, compactor_path: &Path) -> Result { let meta = rusqlite::Connection::open(compactor_path.join("meta.db"))?; // todo! set pragmas: wal + foreign key check - meta.execute("CREATE TABLE IF NOT EXISTS monitored_namespaces (id INTEGER PRIMARY KEY AUTOINCREMENT, namespace_name BLOB NOT NULL)", ()).unwrap(); + meta.pragma_update(None, "journal_mode", "wal")?; + meta.execute(r#"CREATE TABLE IF NOT EXISTS monitored_namespaces (id INTEGER PRIMARY KEY AUTOINCREMENT, namespace_name BLOB NOT NULL, UNIQUE(namespace_name))"#, ()).unwrap(); meta.execute( - "CREATE TABLE IF NOT EXISTS segments ( + r#"CREATE TABLE IF NOT EXISTS segments ( start_frame_no INTEGER, end_frame_no INTEGER, timestamp DATE, size INTEGER, - namespace_id INTEGER, - PRIMARY KEY (start_frame_no, end_frame_no), - FOREIGN KEY(namespace_id) REFERENCES monitored_namespaces(id))", + namespace_id INTEGER REFERENCES monitored_namespaces(id) ON DELETE CASCADE, + PRIMARY KEY (start_frame_no, end_frame_no)) + "#, (), )?; @@ -79,10 +80,13 @@ impl Compactor { let tx = self.meta.transaction()?; let id = { let mut stmt = tx.prepare_cached("INSERT OR IGNORE INTO monitored_namespaces(namespace_name) VALUES (?) RETURNING id")?; - stmt.query_row([namespace.as_str()], |r| r.get(0))? + stmt.query_row([namespace.as_str()], |r| r.get(0)) + .optional()? }; - sync_one(self.backend.as_ref(), namespace, id, &tx).await?; + if let Some(id) = id { + sync_one(self.backend.as_ref(), namespace, id, &tx, true).await?; + } tx.commit()?; @@ -139,7 +143,7 @@ impl Compactor { } /// sync all segments from storage with local cache - pub async fn sync_full(&mut self) -> Result<()> + pub async fn sync_all(&mut self, full: bool) -> Result<()> where B: Backend, { @@ -147,13 +151,35 @@ impl Compactor { .meta .transaction_with_behavior(TransactionBehavior::Immediate)?; { - tx.execute("DELETE FROM segments", ())?; let mut stmt = tx.prepare("SELECT namespace_name, id FROM monitored_namespaces")?; let mut namespace_rows = stmt.query(())?; while let Some(row) = namespace_rows.next()? { let namespace = NamespaceName::from_string(row.get::<_, String>(0)?); let id = row.get::<_, u64>(1)?; - sync_one(self.backend.as_ref(), &namespace, id, &tx).await?; + sync_one(self.backend.as_ref(), &namespace, id, &tx, full).await?; + } + } + + tx.commit()?; + + Ok(()) + } + + pub async fn sync_one(&mut self, namespace: &NamespaceName, full: bool) -> Result<()> + where + B: Backend, + { + let tx = self + .meta + .transaction_with_behavior(TransactionBehavior::Immediate)?; + { + let mut stmt = + tx.prepare_cached("SELECT id FROM monitored_namespaces WHERE namespace_name = ?")?; + let id = stmt + .query_row([namespace.as_str()], |row| row.get(0)) + .optional()?; + if let Some(id) = id { + sync_one(self.backend.as_ref(), &namespace, id, &tx, full).await?; } } @@ -294,6 +320,9 @@ impl Compactor { ret?; let (start, end) = set.range().expect("non-empty set"); + let timestamp = DateTime::from_timestamp_millis(set.last().unwrap().timestamp as _) + .unwrap() + .to_utc(); self.backend .store( &self.backend.default_config(), @@ -302,11 +331,7 @@ impl Compactor { segment_id: Uuid::new_v4(), start_frame_no: start, end_frame_no: end, - segment_timestamp: DateTime::from_timestamp_millis( - set.last().unwrap().timestamp as _, - ) - .unwrap() - .to_utc(), + segment_timestamp: timestamp, }, out_file, out_index.into_inner().unwrap(), @@ -352,9 +377,21 @@ impl Compactor { Ok(()) } - pub fn list_all(&self, namespace: &NamespaceName, f: impl FnMut(SegmentInfo)) -> Result<()> { + pub fn list_all_segments( + &self, + namespace: &NamespaceName, + f: impl FnMut(SegmentInfo), + ) -> Result<()> { list_segments(&self.meta, namespace, f) } + + pub fn list_monitored_namespaces(&self, f: impl FnMut(NamespaceName)) -> Result<()> { + list_namespace(&self.meta, f) + } + + pub fn unmonitor(&self, ns: &NamespaceName) -> Result<()> { + unmonitor(&self.meta, ns) + } } pub struct AnalyzedSegments { @@ -442,13 +479,25 @@ async fn sync_one( namespace: &NamespaceName, id: u64, conn: &rusqlite::Connection, + full: bool, ) -> Result<()> { + let until = if full { + get_last_frame_no(conn, id)? + } else { + None + }; + let segs = backend.list_segments(backend.default_config(), &namespace, 0); tokio::pin!(segs); while let Some(info) = segs.next().await { let info = info.unwrap(); - register_segment_info(&conn, info, id)?; + register_segment_info(&conn, &info, id)?; + if let Some(until) = until { + if info.key.start_frame_no <= until { + break; + } + } } Ok(()) @@ -489,9 +538,25 @@ fn list_segments<'a>( Ok(()) } +fn list_namespace<'a>( + conn: &'a rusqlite::Connection, + mut f: impl FnMut(NamespaceName), +) -> Result<()> { + let mut stmt = conn.prepare_cached(r#"SELECT namespace_name FROM monitored_namespaces"#)?; + + stmt.query_map((), |r| { + let n = NamespaceName::from_string(r.get(0)?); + f(n); + Ok(()) + })? + .try_for_each(|c| c)?; + + Ok(()) +} + fn register_segment_info( conn: &rusqlite::Connection, - info: SegmentInfo, + info: &SegmentInfo, namespace_id: u64, ) -> Result<()> { let mut stmt = conn.prepare_cached( @@ -567,3 +632,17 @@ fn segments_range( Ok(first.zip(last)) } + +fn get_last_frame_no(conn: &rusqlite::Connection, namespace_id: u64) -> Result> { + let mut stmt = + conn.prepare_cached("SELECT MAX(end_frame_no) FROM segments WHERE namespace_id = ?")?; + Ok(stmt.query_row([namespace_id], |row| row.get(0))?) +} + +fn unmonitor(conn: &rusqlite::Connection, namespace: &NamespaceName) -> Result<()> { + conn.execute( + "DELETE FROM monitored_namespaces WHERE namespace_name = ?", + [namespace.as_str()], + )?; + Ok(()) +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index b09cebf85b..bdb353e007 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,3 +1,3 @@ [toolchain] profile = "default" -channel = "1.80.0" +channel = "1.81.0"