From eb59a043fd057dd1f6db57a9c1afb2f8f670a8e8 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Wed, 2 Oct 2024 12:03:34 +0200 Subject: [PATCH] correctly report error on compaction failure --- libsql-wal/src/storage/compaction/mod.rs | 73 +++++++++++++++--------- 1 file changed, 46 insertions(+), 27 deletions(-) diff --git a/libsql-wal/src/storage/compaction/mod.rs b/libsql-wal/src/storage/compaction/mod.rs index 8157f81f98..e801c13ad4 100644 --- a/libsql-wal/src/storage/compaction/mod.rs +++ b/libsql-wal/src/storage/compaction/mod.rs @@ -207,7 +207,7 @@ impl Compactor { out_index: Option<&'a mut MapBuilder>>, mut progress: impl FnMut(u32, u32) + 'a, ) -> ( - impl Stream> + 'a, + impl Stream> + 'a, CompactedSegmentHeader, ) where @@ -363,7 +363,7 @@ impl Compactor { let mut builder = MapBuilder::new(Vec::new()).unwrap(); let (sender, mut receiver) = tokio::sync::mpsc::channel::>(1); - let handle: JoinHandle> = match out_path { + let mut handle: JoinHandle> = match out_path { Some(path) => { let path = path.join(&format!("{new_key}.seg")); let mut data_file = tokio::fs::File::create(path).await?; @@ -397,37 +397,56 @@ impl Compactor { } }; - let (stream, segment_header) = self - .dedup_stream(set.clone(), Some(&mut builder), progress) - .await; + let send_fut = async { + let (stream, segment_header) = self + .dedup_stream(set.clone(), Some(&mut builder), progress) + .await; - sender - .send(Ok(Bytes::copy_from_slice(segment_header.as_bytes()))) - .await - .unwrap(); + if sender + .send(Ok(Bytes::copy_from_slice(segment_header.as_bytes()))) + .await + .is_err() + { + return; + } - { - tokio::pin!(stream); - loop { - match stream.next().await { - Some(Ok((frame_header, frame_data))) => { - sender - .send(Ok(Bytes::copy_from_slice(frame_header.as_bytes()))) - .await - .unwrap(); - sender.send(Ok(frame_data)).await.unwrap(); - } - Some(Err(_e)) => { - panic!() - // sender.send(Err(e.into())).await.unwrap(); + { + tokio::pin!(stream); + loop { + match stream.next().await { + Some(Ok((frame_header, frame_data))) => { + if sender + .send(Ok(Bytes::copy_from_slice(frame_header.as_bytes()))) + .await + .is_err() + { + return; + } + if sender.send(Ok(frame_data)).await.is_err() { + return; + } + } + Some(Err(e)) => { + sender.send(Err(e.into())).await.unwrap(); + return; + } + None => break, } - None => break, } + + drop(sender); } - drop(sender); - } + }; + + tokio::select! { + res = &mut handle => { + res.unwrap()?; + }, + _ = send_fut => { + handle.await.unwrap()?; - handle.await.unwrap()?; + } + } let index = builder.into_inner().unwrap(); match out_path {