Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent reporting HttpDispatch errors in sync_offline #1923

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions libsql/src/local/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ impl Connection {
Ok(buf)
}

pub(crate) fn wal_insert_begin(&self) -> Result<()> {
fn wal_insert_begin(&self) -> Result<()> {
let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_begin(self.handle()) };
if rc != 0 {
return Err(crate::errors::Error::SqliteFailure(
Expand All @@ -506,7 +506,7 @@ impl Connection {
Ok(())
}

pub(crate) fn wal_insert_end(&self) -> Result<()> {
fn wal_insert_end(&self) -> Result<()> {
let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_end(self.handle()) };
if rc != 0 {
return Err(crate::errors::Error::SqliteFailure(
Expand All @@ -517,7 +517,7 @@ impl Connection {
Ok(())
}

pub(crate) fn wal_insert_frame(&self, frame: &[u8]) -> Result<()> {
fn wal_insert_frame(&self, frame: &[u8]) -> Result<()> {
let rc = unsafe {
libsql_sys::ffi::libsql_wal_insert_frame(
self.handle(),
Expand All @@ -534,6 +534,30 @@ impl Connection {
}
Ok(())
}

pub(crate) fn wal_insert_handle(&self) -> Result<WalInsertHandle<'_>> {
self.wal_insert_begin()?;
Ok(WalInsertHandle { conn: self })
}
}

pub(crate) struct WalInsertHandle<'a> {
conn: &'a Connection,
}

impl WalInsertHandle<'_> {
pub fn insert(&self, frame: &[u8]) -> Result<()> {
self.conn.wal_insert_frame(frame)
}
}

impl Drop for WalInsertHandle<'_> {
fn drop(&mut self) {
if let Err(err) = self.conn.wal_insert_end() {
tracing::error!("{:?}", err);
Err(err).unwrap()
}
}
}

impl fmt::Debug for Connection {
Expand Down
50 changes: 28 additions & 22 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -478,9 +478,7 @@ impl Database {
Err(Error::Sync(err)) => {
// Retry the sync because we are ahead of the server and we need to push some older
// frames.
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) =
err.downcast_ref::<SyncError>()
{
if let Some(SyncError::InvalidPushFrameNoLow(_, _)) = err.downcast_ref() {
tracing::debug!("got InvalidPushFrameNo, retrying push");
self.try_push(&mut sync_ctx, &conn).await
} else {
Expand All @@ -492,6 +490,22 @@ impl Database {
} else {
self.try_pull(&mut sync_ctx, &conn).await
}
.or_else(|err| {
let Error::Sync(err) = err else {
return Err(err);
};

// TODO(levy): upcasting should be done *only* at the API boundary, doing this in
// internal code just sucks.
let Some(SyncError::HttpDispatch(_)) = err.downcast_ref() else {
return Err(Error::Sync(err));
};

Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 0,
})
})
}

#[cfg(feature = "sync")]
Expand Down Expand Up @@ -557,37 +571,29 @@ impl Database {
) -> Result<crate::database::Replicated> {
let generation = sync_ctx.generation();
let mut frame_no = sync_ctx.durable_frame_num() + 1;
conn.wal_insert_begin()?;

let mut err = None;
let insert_handle = conn.wal_insert_handle()?;

loop {
match sync_ctx.pull_one_frame(generation, frame_no).await {
Ok(Some(frame)) => {
conn.wal_insert_frame(&frame)?;
insert_handle.insert(&frame)?;
frame_no += 1;
}
Ok(None) => {
break;
sync_ctx.write_metadata().await?;
return Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 1,
});
}
Err(e) => {
tracing::debug!("pull_one_frame error: {:?}", e);
err.replace(e);
break;
Err(err) => {
tracing::debug!("pull_one_frame error: {:?}", err);
sync_ctx.write_metadata().await?;
return Err(err);
}
}
}
conn.wal_insert_end()?;
sync_ctx.write_metadata().await?;

if let Some(err) = err {
Err(err)
} else {
Ok(crate::database::Replicated {
frame_no: None,
frames_synced: 1,
})
}
}

pub(crate) fn path(&self) -> &str {
Expand Down
Loading