Merge pull request #1723 from tursodatabase/storage-compactor
Segment compactor
MarinPostma authored Sep 10, 2024
2 parents 82df117 + 00682fd commit bdb9f20
Showing 16 changed files with 1,054 additions and 26 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

9 changes: 9 additions & 0 deletions libsql-wal/Cargo.toml
@@ -44,6 +44,9 @@ tracing-subscriber = { version = "0.3.18", optional = true }
 aws-credential-types = { version = "1.2.0", optional = true }
 dashmap = "6.0.1"
 rand = "0.8.5"
+aws-smithy-types-convert = { version = "0.60.8", features = ["convert-chrono"] }
+petgraph = "0.6.5"
+anyhow = { version = "1.0.86", optional = true }
 
 [dev-dependencies]
 criterion = "0.5.1"
@@ -78,9 +81,15 @@ shell-bin = [
     "dep:inquire",
     "s3",
     "dep:tracing-subscriber",
+    "dep:anyhow",
 ]
 
 [[bin]]
 name = "shell"
 path = "src/bins/shell/main.rs"
 required-features = ["shell-bin"]
+
+[[bin]]
+name = "compactor"
+path = "src/bins/compactor/main.rs"
+required-features = ["shell-bin"]
225 changes: 225 additions & 0 deletions libsql-wal/src/bins/compactor/main.rs
@@ -0,0 +1,225 @@
use std::path::PathBuf;

use anyhow::Context;
use aws_config::{retry::RetryConfig, BehaviorVersion, Region};
use aws_credential_types::Credentials;
use aws_sdk_s3::config::SharedCredentialsProvider;
use clap::{Parser, ValueEnum};
use libsql_wal::io::StdIO;
use libsql_wal::storage::backend::s3::S3Backend;
use libsql_wal::storage::compaction::strategy::{
    identity::IdentityStrategy, log_strategy::LogReductionStrategy, PartitionStrategy,
};
use libsql_wal::storage::compaction::Compactor;

#[derive(Debug, clap::Args)]
struct S3Args {
    #[arg(long, requires = "S3Args")]
    enable_s3: bool,
    #[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
    cluster_id: Option<String>,
    #[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
    s3_url: Option<String>,
    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
    s3_access_key: Option<String>,
    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
    s3_access_key_id: Option<String>,
    #[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
    s3_bucket: Option<String>,
    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
    s3_region_id: Option<String>,
}

#[derive(Clone, Debug, ValueEnum)]
enum CompactStrategy {
    Logarithmic,
    CompactAll,
}

#[derive(Debug, clap::Parser)]
struct Command {
    #[arg(long, short, default_value = "compactor")]
    path: PathBuf,
    #[command(flatten)]
    s3_args: S3Args,
    #[command(subcommand)]
    subcommand: Subcommand,
}

#[derive(Debug, clap::Subcommand)]
enum Subcommand {
    /// Register namespaces to monitor
    Monitor { namespace: String },
    /// Analyze segments for a namespace
    Analyze {
        /// List all segments
        #[clap(long)]
        list_all: bool,
        namespace: String,
    },
    /// Compact segments into bigger segments
    Compact {
        /// Compaction strategy
        #[clap(long, short)]
        strategy: CompactStrategy,
        /// Prints the compaction plan but doesn't perform it.
        #[clap(long)]
        dry_run: bool,
        namespace: String,
    },
    /// Sync namespace metadata from remote storage
    Sync {
        /// When performing a full sync, all the segment space is scanned again. By default, only
        /// segments with a frame_no greater than the last seen frame_no are retrieved.
        #[clap(long)]
        full: bool,
        /// Unless a namespace is specified, all monitored namespaces are synced
        namespace: Option<String>,
    },
    /// Restore namespace
    Restore {
        #[clap(long)]
        verify: bool,
        namespace: String,
        out: PathBuf,
    },
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let cmd: Command = Command::parse();

    let backend = setup_storage(&cmd.s3_args).await?;
    tokio::fs::create_dir_all(&cmd.path).await?;
    let mut compactor = Compactor::new(backend.into(), &cmd.path)?;
    match cmd.subcommand {
        Subcommand::Monitor { namespace } => {
            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
            compactor.monitor(&namespace).await?;
            println!("monitoring {namespace}");
        }
        Subcommand::Analyze {
            namespace,
            list_all,
        } => {
            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
            let analysis = compactor.analyze(&namespace)?;
            println!("stats for {namespace}:");
            println!("- segment count: {}", analysis.segment_count());
            println!("- last frame_no: {}", analysis.last_frame_no());
            let set = analysis.shortest_restore_path();
            println!("- shortest restore path len: {}", set.len());
            if let Some((first, last)) = compactor.get_segment_range(&namespace)? {
                println!(
                    "- oldest segment: {}-{} ({})",
                    first.key.start_frame_no, first.key.end_frame_no, first.created_at
                );
                println!(
                    "- most recent segment: {}-{} ({})",
                    last.key.start_frame_no, last.key.end_frame_no, last.created_at
                );
            }

            if list_all {
                println!("segments:");
                compactor.list_all(&namespace, |info| {
                    println!(
                        "- {}-{} ({})",
                        info.key.start_frame_no, info.key.end_frame_no, info.created_at
                    );
                })?;
            }
        }
        Subcommand::Compact {
            strategy,
            dry_run,
            namespace,
        } => {
            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
            let analysis = compactor.analyze(&namespace)?;
            let strat: Box<dyn PartitionStrategy> = match strategy {
                CompactStrategy::Logarithmic => Box::new(LogReductionStrategy),
                CompactStrategy::CompactAll => Box::new(IdentityStrategy),
            };
            let set = analysis.shortest_restore_path();
            let partition = strat.partition(&set);

            println!("initial shortest restore path len: {}", set.len());
            println!("compacting into {} segments", partition.len());
            for set in partition.iter() {
                println!("- {:?}", set.range().unwrap());
            }
            if dry_run {
                println!("dry run: stopping");
            } else {
                println!("performing compaction");
                let part_len = partition.len();
                for (idx, set) in partition.into_iter().enumerate() {
                    let Some((start, end)) = set.range() else {
                        continue;
                    };
                    println!("compacting {start}-{end} ({}/{})", idx + 1, part_len);
                    // TODO: we could compact concurrently
                    compactor.compact(set).await?;
                }
            }
        }
        Subcommand::Sync { full, namespace } => match namespace {
            Some(_ns) => {
                todo!()
            }
            None if full => {
                compactor.sync_full().await?;
                println!("all monitored namespaces fully up to date.");
            }
            _ => todo!(),
        },
        Subcommand::Restore {
            namespace,
            out,
            verify,
        } => {
            let namespace = libsql_sys::name::NamespaceName::from_string(namespace);
            let analysis = compactor.analyze(&namespace)?;
            let set = analysis.shortest_restore_path();
            compactor.restore(set, &out).await?;
            if verify {
                let conn = libsql_sys::rusqlite::Connection::open(&out)?;
                conn.pragma_query(None, "integrity_check", |r| {
                    println!("{r:?}");
                    Ok(())
                })?;
            }
        }
    }

    Ok(())
}

async fn setup_storage(opt: &S3Args) -> anyhow::Result<S3Backend<StdIO>> {
    let config = aws_config::load_defaults(BehaviorVersion::latest()).await;

    let mut builder = config.into_builder();
    builder.set_endpoint_url(opt.s3_url.clone());
    builder.set_retry_config(RetryConfig::standard().with_max_attempts(10).into());
    builder.set_region(Region::new(
        opt.s3_region_id.clone().expect("expected aws region"),
    ));
    let cred = Credentials::new(
        opt.s3_access_key_id.as_ref().unwrap(),
        opt.s3_access_key.as_ref().unwrap(),
        None,
        None,
        "Static",
    );
    builder.set_credentials_provider(Some(SharedCredentialsProvider::new(cred)));
    let config = builder.build();
    let backend = S3Backend::from_sdk_config(
        config,
        opt.s3_bucket.clone().context("missing bucket id")?,
        opt.cluster_id.clone().context("missing cluster id")?,
    )
    .await?;

    Ok(backend)
}
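The `Compact` arm above picks a partition strategy at runtime through trait-object dispatch: `analyze` yields the shortest restore path, the chosen `PartitionStrategy` splits it into groups, and each group is compacted into a single segment. Below is a minimal sketch of that dispatch pattern under simplified assumptions: the `partition` signature, the `u64` segment stand-ins, and the neighbor-pairing rule are illustrative, not the crate's actual API.

trait PartitionStrategy {
    /// Split a run of segments into groups; each group is compacted
    /// into a single segment.
    fn partition(&self, set: &[u64]) -> Vec<Vec<u64>>;
}

/// Compact everything into a single segment (what `CompactAll` selects).
struct IdentityStrategy;

impl PartitionStrategy for IdentityStrategy {
    fn partition(&self, set: &[u64]) -> Vec<Vec<u64>> {
        vec![set.to_vec()]
    }
}

/// Shrink the restore path by grouping segments (what `Logarithmic`
/// selects); the toy rule here just pairs neighbors.
struct LogReductionStrategy;

impl PartitionStrategy for LogReductionStrategy {
    fn partition(&self, set: &[u64]) -> Vec<Vec<u64>> {
        set.chunks(2).map(|c| c.to_vec()).collect()
    }
}

fn main() {
    let segments: Vec<u64> = (1..=8).collect();
    let strat: Box<dyn PartitionStrategy> = Box::new(LogReductionStrategy);
    for group in strat.partition(&segments) {
        println!("compact {group:?} into one segment");
    }
}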
13 changes: 7 additions & 6 deletions libsql-wal/src/bins/shell/main.rs
@@ -40,20 +40,21 @@ struct Cli {
     "s3_bucket",
     "cluster_id",
 ])]
+
 struct S3Args {
     #[arg(long, requires = "S3Args")]
     enable_s3: bool,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_DATABASE_ID")]
     cluster_id: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_ENDPOINT")]
     s3_url: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY")]
     s3_access_key: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID")]
     s3_access_key_id: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
     s3_bucket: Option<String>,
-    #[arg(long)]
+    #[arg(long, env = "LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION")]
     s3_region_id: Option<String>,
 }
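The change to `S3Args` above threads the same `LIBSQL_BOTTOMLESS_*` environment variables through clap's `env` attribute, so each flag falls back to its variable when omitted on the command line. A minimal sketch of that fallback, assuming clap 4 with its `env` cargo feature enabled (the struct and flag here are illustrative):

use clap::Parser;

#[derive(Debug, Parser)]
struct Args {
    /// Read from $LIBSQL_BOTTOMLESS_BUCKET when --s3-bucket is absent.
    #[arg(long, env = "LIBSQL_BOTTOMLESS_BUCKET")]
    s3_bucket: Option<String>,
}

fn main() {
    // `LIBSQL_BOTTOMLESS_BUCKET=my-bucket ./app` and `./app --s3-bucket my-bucket`
    // now produce the same value.
    let args = Args::parse();
    println!("{:?}", args.s3_bucket);
}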
4 changes: 4 additions & 0 deletions libsql-wal/src/io/buf.rs
@@ -132,6 +132,10 @@ impl<T> ZeroCopyBoxIoBuf<T> {
         }
     }
 
+    pub fn new_uninit(inner: Box<T>) -> Self {
+        Self { init: 0, inner }
+    }
+
     fn is_init(&self) -> bool {
         self.init == size_of::<T>()
     }
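`new_uninit` exists so a freshly zeroed value can be handed to a read that fills it: the buffer tracks how many of the boxed value's bytes hold data, and starting `init` at 0 tells the I/O layer the whole buffer still needs filling. A standalone sketch of that init-tracking idea; this is not the crate's real `ZeroCopyBoxIoBuf`, and its `new` constructor here is an assumption:

use std::mem::size_of;

struct BoxIoBuf<T> {
    inner: Box<T>,
    /// Bytes of `inner` that hold valid data from I/O.
    init: usize,
}

impl<T> BoxIoBuf<T> {
    /// Wrap a value whose bytes are already valid.
    fn new(inner: Box<T>) -> Self {
        Self { init: size_of::<T>(), inner }
    }

    /// Wrap a value that a subsequent read must fill in full.
    fn new_uninit(inner: Box<T>) -> Self {
        Self { init: 0, inner }
    }

    fn is_init(&self) -> bool {
        self.init == size_of::<T>()
    }
}

fn main() {
    let pending = BoxIoBuf::new_uninit(Box::new([0u8; 64]));
    assert!(!pending.is_init());
    let ready = BoxIoBuf::new(Box::new([1u8; 64]));
    assert!(ready.is_init());
}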
4 changes: 3 additions & 1 deletion libsql-wal/src/replication/storage.rs
@@ -7,6 +7,7 @@ use roaring::RoaringBitmap;
 use tokio_stream::Stream;
 use zerocopy::FromZeroes;
 
+use crate::io::buf::ZeroCopyBoxIoBuf;
 use crate::segment::Frame;
 use crate::storage::Storage;
 
@@ -61,8 +62,9 @@
                 },
             };
 
-            let (frame, ret) = segment.read_frame(Frame::new_box_zeroed(), offset as u32).await;
+            let (frame, ret) = segment.read_frame(ZeroCopyBoxIoBuf::new_uninit(Frame::new_box_zeroed()), offset as u32).await;
             ret?;
+            let frame = frame.into_inner();
             debug_assert_eq!(frame.header().size_after(), 0, "all frames in a compacted segment should have size_after set to 0");
             if frame.header().frame_no() >= until {
                 yield frame;
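At this call site the pattern is wrap, read, unwrap: ownership of the zeroed `Frame` moves into the buffer for the duration of the read, then `into_inner` takes the box back once `ret?` has confirmed success. A condensed, synchronous sketch of that ownership flow with stub types; only the dance itself mirrors the diff:

struct Frame {
    bytes: [u8; 8],
}

struct BoxIoBuf {
    inner: Box<Frame>,
}

impl BoxIoBuf {
    fn new_uninit(inner: Box<Frame>) -> Self {
        Self { inner }
    }
    fn into_inner(self) -> Box<Frame> {
        self.inner
    }
}

/// Stub standing in for `CompactedSegment::read_frame`: fills the buffer
/// and returns it along with the I/O result.
fn read_frame(mut buf: BoxIoBuf) -> (BoxIoBuf, std::io::Result<()>) {
    buf.inner.bytes = *b"frame-00";
    (buf, Ok(()))
}

fn main() -> std::io::Result<()> {
    let (buf, ret) = read_frame(BoxIoBuf::new_uninit(Box::new(Frame { bytes: [0; 8] })));
    ret?;
    let frame = buf.into_inner();
    println!("{}", String::from_utf8_lossy(&frame.bytes));
    Ok(())
}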
32 changes: 26 additions & 6 deletions libsql-wal/src/segment/compacted.rs
@@ -4,8 +4,9 @@ use std::mem::size_of;
 use zerocopy::little_endian::{U128 as lu128, U16 as lu16, U32 as lu32, U64 as lu64};
 use zerocopy::{AsBytes, FromBytes, FromZeroes};
 
-use crate::io::buf::{ZeroCopyBoxIoBuf, ZeroCopyBuf};
+use crate::io::buf::{IoBufMut, ZeroCopyBuf};
 use crate::io::FileExt;
+use crate::segment::FrameHeader;
 use crate::{LIBSQL_MAGIC, LIBSQL_PAGE_SIZE, LIBSQL_WAL_VERSION};
 
 use super::{Frame, Result};
@@ -82,14 +83,33 @@ impl<F: FileExt> CompactedSegment<F> {
         Ok(Self { file, header })
     }
 
-    pub(crate) async fn read_frame(
+    pub(crate) fn from_parts(file: F, header: CompactedSegmentDataHeader) -> Self {
+        Self { header, file }
+    }
+
+    pub(crate) async fn read_frame<B: IoBufMut + Send + 'static>(
         &self,
-        frame: Box<Frame>,
+        buf: B,
         offset: u32,
-    ) -> (Box<Frame>, io::Result<()>) {
+    ) -> (B, io::Result<()>) {
+        assert_eq!(buf.bytes_init(), 0);
+        assert_eq!(buf.bytes_total(), size_of::<Frame>());
         let offset = size_of::<CompactedSegmentDataHeader>() + size_of::<Frame>() * offset as usize;
-        let buf = ZeroCopyBoxIoBuf::new(frame);
         let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;
-        (buf.into_inner(), ret)
+        (buf, ret)
     }
+
+    pub(crate) async fn read_page<B: IoBufMut + Send + 'static>(
+        &self,
+        buf: B,
+        offset: u32,
+    ) -> (B, io::Result<()>) {
+        assert_eq!(buf.bytes_init(), 0);
+        assert_eq!(buf.bytes_total(), LIBSQL_PAGE_SIZE as usize);
+        let offset = size_of::<CompactedSegmentDataHeader>()
+            + size_of::<Frame>() * offset as usize
+            + size_of::<FrameHeader>();
+        let (buf, ret) = self.file.read_exact_at_async(buf, offset as u64).await;
+        (buf, ret)
+    }
 }
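The offset arithmetic in `read_frame` and `read_page` encodes the compacted-segment layout: a data header followed by fixed-size frames, each frame being a frame header followed by exactly one page, so `read_page` lands on the page bytes by additionally skipping one `FrameHeader`. A worked sketch of that layout math; the header sizes and the 4 KiB page are assumed stand-ins (the real code uses `size_of::<CompactedSegmentDataHeader>()`, `size_of::<FrameHeader>()`, and `LIBSQL_PAGE_SIZE`):

const DATA_HEADER: u64 = 64; // stand-in for size_of::<CompactedSegmentDataHeader>()
const FRAME_HEADER: u64 = 40; // stand-in for size_of::<FrameHeader>()
const PAGE_SIZE: u64 = 4096; // assumed 4 KiB page (LIBSQL_PAGE_SIZE)
const FRAME: u64 = FRAME_HEADER + PAGE_SIZE; // size_of::<Frame>(): header + page

/// Byte offset of the `idx`-th frame, as computed in `read_frame`.
fn frame_offset(idx: u64) -> u64 {
    DATA_HEADER + FRAME * idx
}

/// Byte offset of the `idx`-th page, skipping its frame header,
/// as computed in `read_page`.
fn page_offset(idx: u64) -> u64 {
    frame_offset(idx) + FRAME_HEADER
}

fn main() {
    assert_eq!(frame_offset(0), 64);
    assert_eq!(page_offset(0), 64 + 40);
    assert_eq!(page_offset(2), 64 + 2 * (40 + 4096) + 40);
    println!("frame 2 at byte {}, its page at byte {}", frame_offset(2), page_offset(2));
}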