Skip to content

Commit

Permalink
libsql: Fix WAL pull logic on last frame
Browse files Browse the repository at this point in the history
The WAL sync protocol only allows you to fetch frames for a range.
While the protocol does not have mechanism to determine what is the
latest frame on the server for pull, it does signal with HTTP status
code 400 when you attempt to read a frame that does not exists.

Let's use this signal to fix WAL pull logic not to fail always. In the
future, we might want to consider extending the protocol to allow
clients to proble for latest frame. That, however, will make the
protocol a bit more chatty, so I am trying to avoid that as long as I
can.
  • Loading branch information
penberg committed Dec 11, 2024
1 parent 3f5ccae commit be0fcbc
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
5 changes: 4 additions & 1 deletion libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -476,10 +476,13 @@ impl Database {

loop {
match sync_ctx.pull_one_frame(generation, frame_no).await {
Ok(frame) => {
Ok(Some(frame)) => {
conn.wal_insert_frame(&frame)?;
frame_no += 1;
}
Ok(None) => {
break;
}
Err(e) => {
tracing::debug!("pull_one_frame error: {:?}", e);
err.replace(e);
Expand Down
9 changes: 6 additions & 3 deletions libsql/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl SyncContext {
}

#[tracing::instrument(skip(self))]
pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result<Bytes> {
pub(crate) async fn pull_one_frame(&mut self, generation: u32, frame_no: u32) -> Result<Option<Bytes>> {
let uri = format!(
"{}/sync/{}/{}/{}",
self.sync_url,
Expand Down Expand Up @@ -232,7 +232,7 @@ impl SyncContext {
}
}

async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Bytes> {
async fn pull_with_retry(&self, uri: String, max_retries: usize) -> Result<Option<Bytes>> {
let mut nr_retries = 0;
loop {
let mut req = http::Request::builder().method("GET").uri(uri.clone());
Expand All @@ -256,7 +256,10 @@ impl SyncContext {
let frame = hyper::body::to_bytes(res.into_body())
.await
.map_err(SyncError::HttpBody)?;
return Ok(frame);
return Ok(Some(frame));
}
if res.status() == StatusCode::BAD_REQUEST {
return Ok(None);
}
// If we've retried too many times or the error is not a server error,
// return the error.
Expand Down

0 comments on commit be0fcbc

Please sign in to comment.