Skip to content

Commit

Permalink
review edits
Browse files Browse the repository at this point in the history
  • Loading branch information
MarinPostma committed Nov 20, 2023
1 parent 06904dd commit be6330f
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
2 changes: 1 addition & 1 deletion libsql-server/src/replication/primary/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,7 @@ impl LogFile {
tracing::info!("performing log compaction");
// To perform the compaction, we create a new, empty file in the `to_compact` directory.
// We will then atomically swap that file with the current log file.
// I case of crash, when filling the compactor job queue, if we find that we find a log
// In case of a crash, when filling the compactor job queue, if we find that we find a log
// file that doesn't contains only a header, we can safely assume that it was from a
// previous crash that happenned in the middle of this operation.
let to_compact_id = Uuid::new_v4();
Expand Down
28 changes: 18 additions & 10 deletions libsql-server/src/replication/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,19 @@ use std::path::{Path, PathBuf};
use std::pin::Pin;

Check failure on line 5 in libsql-server/src/replication/snapshot.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/replication/snapshot.rs
use std::str::FromStr;
use std::sync::Arc;
use std::future::Future;

use anyhow::{bail, Context};
use bytemuck::bytes_of;
use futures::TryStreamExt;
use futures_core::Future;
use libsql_replication::frame::FrameMut;
use libsql_replication::snapshot::{SnapshotFile, SnapshotFileHeader};
use once_cell::sync::Lazy;
use regex::Regex;
use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio::sync::mpsc;
use tokio_stream::{Stream, StreamExt};
use tokio_util::sync::ReusableBoxFuture;
use uuid::Uuid;

use crate::namespace::NamespaceName;
Expand Down Expand Up @@ -277,27 +278,28 @@ impl SnapshotMerger {
) -> anyhow::Result<()> {
let mut snapshots = Self::init_snapshot_info_list(db_path).await?;
let mut working = false;
let mut job: Pin<Box<dyn Future<Output = anyhow::Result<_>> + Sync + Send>> =
Box::pin(std::future::pending());
let mut job = ReusableBoxFuture::<anyhow::Result<_>>::new(std::future::pending());
let db_path: Arc<Path> = db_path.to_path_buf().into();
loop {
tokio::select! {
Some((name, size, db_page_count)) = receiver.recv() => {
snapshots.push((name, size));
if !working && Self::should_compact(&snapshots, db_page_count) {
let snapshots = std::mem::take(&mut snapshots);
let fut = async move {
let db_path = db_path.clone();
let handle = tokio::spawn(async move {
let compacted_snapshot_info =
Self::merge_snapshots(snapshots, db_path, log_id).await?;
Ok(compacted_snapshot_info)
};
job = Box::pin(fut);
Self::merge_snapshots(snapshots, db_path.as_ref(), log_id).await?;
anyhow::Result::<_, anyhow::Error>::Ok(compacted_snapshot_info)
});
job.set(async move { Ok(handle.await?) });
working = true;
}
}
ret = &mut job, if working => {
working = false;
job = Box::pin(std::future::pending());
let ret = ret?;
job.set(std::future::pending());
let ret = ret??;
// the new merged snapshot is prepended to the snapshot list
snapshots.insert(0, ret);
}
Expand Down Expand Up @@ -515,6 +517,12 @@ async fn perform_compaction(
file_to_compact: LogFile,
log_id: Uuid,

Check failure on line 518 in libsql-server/src/replication/snapshot.rs

View workflow job for this annotation

GitHub Actions / Run Checks

Diff in /home/runner/work/libsql/libsql/libsql-server/src/replication/snapshot.rs
) -> anyhow::Result<(String, u64, u32)> {
let header = file_to_compact.header();
tracing::info!("attempting to compact {} frame from logfile {}, starting at frame no {}",
header.frame_count,
Uuid::from_u128(header.log_id),
header.start_frame_no,
);
let mut builder = SnapshotBuilder::new(db_path, log_id).await?;
builder
.append_frames(file_to_compact.into_rev_stream_mut())
Expand Down

0 comments on commit be6330f

Please sign in to comment.