Skip to content

Commit

Permalink
add compactor bin
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Sep 9, 2024
1 parent 59f5fe5 commit 00682fd
Show file tree
Hide file tree
Showing 2 changed files with 231 additions and 0 deletions.
6 changes: 6 additions & 0 deletions libsql-wal/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,15 @@ shell-bin = [
"dep:inquire",
"s3",
"dep:tracing-subscriber",
"dep:anyhow",
]

[[bin]]
name = "shell"
path = "src/bins/shell/main.rs"
required-features = ["shell-bin"]

[[bin]]
name = "compactor"
path = "src/bins/compactor/main.rs"
required-features = ["shell-bin"]
225 changes: 225 additions & 0 deletions libsql-wal/src/bins/compactor/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
use std::path::PathBuf;

use anyhow::Context;
use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_sdk_s3::config::SharedCredentialsProvider;
use clap::{Parser, ValueEnum};
use libsql_wal::io::StdIO;
use libsql_wal::storage::backend::s3::S3Backend;
use libsql_wal::storage::compaction::strategy::{
identity::IdentityStrategy, log_strategy::LogReductionStrategy, PartitionStrategy,
};
use libsql_wal::storage::compaction::Compactor;

#[derive(Debug, clap::Args)]
struct S3Args {
#[arg(long, requires = "S3Args")]
enable_s3: bool,
#[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
cluster_id: Option<String>,
#[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
s3_url: Option<String>,
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
s3_access_key: Option<String>,
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
s3_access_key_id: Option<String>,
#[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
s3_bucket: Option<String>,
#[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
s3_region_id: Option<String>,
}

#[derive(Clone, Debug, ValueEnum)]
enum CompactStrategy {
Logarithmic,
CompactAll,
}

#[derive(Debug, clap::Parser)]
struct Command {
#[arg(long, short, default_value = "compactor")]
path: PathBuf,
#[command(flatten)]
s3_args: S3Args,
#[command(subcommand)]
subcommand: Subcommand,
}

#[derive(Debug, clap::Subcommand)]
enum Subcommand {
/// Register namespaces to monitor
Monitor { namespace: String },
/// Analyze segments for a namespaces
Analyze {
/// list all segments
#[clap(long)]
list_all: bool,
namespace: String,
},
/// Compact segments into bigger segments
Compact {
// compaction strategy
#[clap(long, short)]
strategy: CompactStrategy,
/// prints the compaction plan, but doesn't perform it.
#[clap(long)]
dry_run: bool,
namespace: String,
},
/// Sync namespace metadata from remote storage
Sync {
/// When performing a full sync, all the segment space is scanned again. By default, only
/// segments with frame_no greated that the last frame_no are retrieved.
#[clap(long)]
full: bool,
/// unless this is specified, all monitored namespaces are synced
namespace: Option<String>,
},
/// Restore namespace
Restore {
#[clap(long)]
verify: bool,
namespace: String,
out: PathBuf,
},
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
let cmd: Command = Command::parse();

let backend = setup_storage(&cmd.s3_args).await?;
tokio::fs::create_dir_all(&cmd.path).await?;
let mut compactor = Compactor::new(backend.into(), &cmd.path)?;
match cmd.subcommand {
Subcommand::Monitor { namespace } => {
let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
compactor.monitor(&namespace).await?;
println!("monitoring {namespace}");
}
Subcommand::Analyze {
namespace,
list_all,
} => {
let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
let analysis = compactor.analyze(&namespace)?;
println!("stats for {namespace}:");
println!("- segment count: {}", analysis.segment_count());
println!("- last frame_no: {}", analysis.last_frame_no());
let set = analysis.shortest_restore_path();
println!("- shortest restore path len: {}", set.len());
if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
println!(
"- oldest segment: {}-{} ({})",
first.key.start_frame_no, first.key.end_frame_no, first.created_at
);
println!(
"- most recent segment: {}-{} ({})",
last.key.start_frame_no, last.key.end_frame_no, last.created_at
);
}

if list_all {
println!("segments:");
compactor.list_all(&namespace, |info| {
println!(
"- {}-{} ({})",
info.key.start_frame_no, info.key.end_frame_no, info.created_at
);
})?;
}
}
Subcommand::Compact {
strategy,
dry_run,
namespace,
} => {
let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
let analysis = compactor.analyze(&namespace)?;
let strat: Box<dyn PartitionStrategy> = match strategy {
CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
CompactStrategy::CompactAll => Box::new(IdentityStrategy),
};
let set = analysis.shortest_restore_path();
let partition = strat.partition(&set);

println!("initial shortest restore path len: {}", set.len());
println!("compacting into {} segments", partition.len());
for set in partition.iter() {
println!("- {:?}", set.range().unwrap());
}
if dry_run {
println!("dry run: stopping");
} else {
println!("performing compaction");
let part_len = partition.len();
for (idx, set) in partition.into_iter().enumerate() {
let Some((start, end)) = set.range() else {
continue;
};
println!("compacting {start}-{end} ({}/{})", idx + 1, part_len);
// TODO: we can compact in conccurently
compactor.compact(set).await?;
}
}
}
Subcommand::Sync { full, namespace } => match namespace {
Some(_ns) => {
todo!()
}
None if full => {
compactor.sync_full().await?;
println!("all monitored namespace fully up to date.");
}
_ => todo!(),
},
Subcommand::Restore {
namespace,
out,
verify,
} => {
let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
let analysis = compactor.analyze(&namespace)?;
let set = analysis.shortest_restore_path();
compactor.restore(set, &out).await?;
if verify {
let conn = libsql_sys::rusqlite::Connection::open(&out)?;
conn.pragma_query(None, "integrity_check", |r| {
println!("{r:?}");
Ok(())
})?;
}
}
}

Ok(())
}

async fn setup_storage(opt: &S3Args) -> anyhow::Result<S3Backend<StdIO>> {
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;

let mut builder = config.into_builder();
builder.set_endpoint_url(opt.s3_url.clone());
builder.set_retry_config(RetryConfig::standard().with_max_attempts(10).into());
builder.set_region(Region::new(
opt.s3_region_id.clone().expect("expected aws region"),
));
let cred = Credentials::new(
opt.s3_access_key_id.as_ref().unwrap(),
opt.s3_access_key.as_ref().unwrap(),
None,
None,
"Static",
);
builder.set_credentials_provider(Some(SharedCredentialsProvider::new(cred)));
let config = builder.build();
let backend = S3Backend::from_sdk_config(
config,
opt.s3_bucket.clone().context("missing bucket id")?,
opt.cluster_id.clone().context("missing cluster id")?,
)
.await?;

Ok(backend)
}

0 comments on commit 00682fd

Please sign in to comment.