From be6330f34e972a6e1e86da1eeb119b6a16ab23ab Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 20 Nov 2023 18:40:10 +0100 Subject: [PATCH] review edits --- .../src/replication/primary/logger.rs | 2 +- libsql-server/src/replication/snapshot.rs | 28 ++++++++++++------- 2 files changed, 19 insertions(+), 11 deletions(-) diff --git a/libsql-server/src/replication/primary/logger.rs b/libsql-server/src/replication/primary/logger.rs index 01f0afc8c3..928f354423 100644 --- a/libsql-server/src/replication/primary/logger.rs +++ b/libsql-server/src/replication/primary/logger.rs @@ -625,7 +625,7 @@ impl LogFile { tracing::info!("performing log compaction"); // To perform the compaction, we create a new, empty file in the `to_compact` directory. // We will then atomically swap that file with the current log file. - // I case of crash, when filling the compactor job queue, if we find that we find a log + // In case of a crash, when filling the compactor job queue, if we find a log // file that doesn't contains only a header, we can safely assume that it was from a // previous crash that happenned in the middle of this operation. 
let to_compact_id = Uuid::new_v4(); diff --git a/libsql-server/src/replication/snapshot.rs b/libsql-server/src/replication/snapshot.rs index de1bc4bcbe..1393a7c233 100644 --- a/libsql-server/src/replication/snapshot.rs +++ b/libsql-server/src/replication/snapshot.rs @@ -5,11 +5,11 @@ use std::path::{Path, PathBuf}; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; +use std::future::Future; use anyhow::{bail, Context}; use bytemuck::bytes_of; use futures::TryStreamExt; -use futures_core::Future; use libsql_replication::frame::FrameMut; use libsql_replication::snapshot::{SnapshotFile, SnapshotFileHeader}; use once_cell::sync::Lazy; @@ -17,6 +17,7 @@ use regex::Regex; use tokio::io::{AsyncSeekExt, AsyncWriteExt}; use tokio::sync::mpsc; use tokio_stream::{Stream, StreamExt}; +use tokio_util::sync::ReusableBoxFuture; use uuid::Uuid; use crate::namespace::NamespaceName; @@ -277,27 +278,28 @@ impl SnapshotMerger { ) -> anyhow::Result<()> { let mut snapshots = Self::init_snapshot_info_list(db_path).await?; let mut working = false; - let mut job: Pin> + Sync + Send>> = - Box::pin(std::future::pending()); + let mut job = ReusableBoxFuture::>::new(std::future::pending()); + let db_path: Arc = db_path.to_path_buf().into(); loop { tokio::select! { Some((name, size, db_page_count)) = receiver.recv() => { snapshots.push((name, size)); if !working && Self::should_compact(&snapshots, db_page_count) { let snapshots = std::mem::take(&mut snapshots); - let fut = async move { + let db_path = db_path.clone(); + let handle = tokio::spawn(async move { let compacted_snapshot_info = - Self::merge_snapshots(snapshots, db_path, log_id).await?; - Ok(compacted_snapshot_info) - }; - job = Box::pin(fut); + Self::merge_snapshots(snapshots, db_path.as_ref(), log_id).await?; + anyhow::Result::<_, anyhow::Error>::Ok(compacted_snapshot_info) + }); + job.set(async move { Ok(handle.await?) 
}); working = true; } } ret = &mut job, if working => { working = false; - job = Box::pin(std::future::pending()); - let ret = ret?; + job.set(std::future::pending()); + let ret = ret??; // the new merged snapshot is prepended to the snapshot list snapshots.insert(0, ret); } @@ -515,6 +517,12 @@ async fn perform_compaction( file_to_compact: LogFile, log_id: Uuid, ) -> anyhow::Result<(String, u64, u32)> { + let header = file_to_compact.header(); + tracing::info!("attempting to compact {} frames from logfile {}, starting at frame no {}", + header.frame_count, + Uuid::from_u128(header.log_id), + header.start_frame_no, + ); let mut builder = SnapshotBuilder::new(db_path, log_id).await?; builder .append_frames(file_to_compact.into_rev_stream_mut())