From 692bce870c1b6016e85a9492c666386deaffeb96 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 11 Dec 2024 13:21:48 +0200 Subject: [PATCH] libsql: Fix WAL pull logic on last frame The WAL sync protocol only allows you to fetch frames for a range. While the protocol does not have mechanism to determine what is the latest frame on the server for pull, it does signal with HTTP status code 400 when you attempt to read a frame that does not exists. Let's use this signal to fix WAL pull logic not to fail always. In the future, we might want to consider extending the protocol to allow clients to proble for latest frame. That, however, will make the protocol a bit more chatty, so I am trying to avoid that as long as I can. --- libsql/src/local/database.rs | 5 ++++- libsql/src/sync.rs | 19 +++++++++++++------ 2 files changed, 17 insertions(+), 7 deletions(-) diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 8f836daef3..72b4c5d0e5 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -476,10 +476,13 @@ impl Database { loop { match sync_ctx.pull_one_frame(generation, frame_no).await { - Ok(frame) => { + Ok(Some(frame)) => { conn.wal_insert_frame(&frame)?; frame_no += 1; } + Ok(None) => { + break; + } Err(e) => { tracing::debug!("pull_one_frame error: {:?}", e); err.replace(e); diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index f876295f95..ed625f7b68 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -107,7 +107,7 @@ impl SyncContext { } #[tracing::instrument(skip(self))] - pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result { + pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result> { let uri = format!( "{}/sync/{}/{}/{}", self.sync_url, @@ -116,9 +116,13 @@ impl SyncContext { frame_no + 1 ); tracing::debug!("pulling frame"); - let frame = self.pull_with_retry(uri, self.max_retries).await?; - self.durable_frame_num = frame_no; - Ok(frame) + match self.pull_with_retry(uri, self.max_retries).await? { + Some(frame) => { + self.durable_frame_num = frame_no; + Ok(Some(frame)) + } + None => Ok(None), + } } #[tracing::instrument(skip(self, frame))] @@ -232,7 +236,7 @@ impl SyncContext { } } - async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result { + async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result> { let mut nr_retries = 0; loop { let mut req = http::Request::builder().method("GET").uri(uri.clone()); @@ -256,7 +260,10 @@ impl SyncContext { let frame = hyper::body::to_bytes(res.into_body()) .await .map_err(SyncError::HttpBody)?; - return Ok(frame); + return Ok(Some(frame)); + } + if res.status() == StatusCode::BAD_REQUEST { + return Ok(None); } // If we've retried too many times or the error is not a server error, // return the error.