diff --git a/libsql/src/local/connection.rs b/libsql/src/local/connection.rs index 56ff743ee5..000112c999 100644 --- a/libsql/src/local/connection.rs +++ b/libsql/src/local/connection.rs @@ -495,7 +495,7 @@ impl Connection { Ok(buf) } - pub(crate) fn wal_insert_begin(&self) -> Result<()> { + fn wal_insert_begin(&self) -> Result<()> { let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_begin(self.handle()) }; if rc != 0 { return Err(crate::errors::Error::SqliteFailure( @@ -506,7 +506,7 @@ impl Connection { Ok(()) } - pub(crate) fn wal_insert_end(&self) -> Result<()> { + fn wal_insert_end(&self) -> Result<()> { let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_end(self.handle()) }; if rc != 0 { return Err(crate::errors::Error::SqliteFailure( @@ -517,7 +517,7 @@ impl Connection { Ok(()) } - pub(crate) fn wal_insert_frame(&self, frame: &[u8]) -> Result<()> { + fn wal_insert_frame(&self, frame: &[u8]) -> Result<()> { let rc = unsafe { libsql_sys::ffi::libsql_wal_insert_frame( self.handle(), @@ -534,6 +534,30 @@ impl Connection { } Ok(()) } + + pub(crate) fn wal_insert_handle(&self) -> Result> { + self.wal_insert_begin()?; + Ok(WalInsertHandle { conn: self }) + } +} + +pub(crate) struct WalInsertHandle<'a> { + conn: &'a Connection, +} + +impl WalInsertHandle<'_> { + pub fn insert(&self, frame: &[u8]) -> Result<()> { + self.conn.wal_insert_frame(frame) + } +} + +impl Drop for WalInsertHandle<'_> { + fn drop(&mut self) { + if let Err(err) = self.conn.wal_insert_end() { + tracing::error!("{:?}", err); + Err(err).unwrap() + } + } } impl fmt::Debug for Connection { diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index ce8bd7aa82..48170c58ab 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -478,9 +478,7 @@ impl Database { Err(Error::Sync(err)) => { // Retry the sync because we are ahead of the server and we need to push some older // frames. - if let Some(SyncError::InvalidPushFrameNoLow(_, _)) = - err.downcast_ref::() - { + if let Some(SyncError::InvalidPushFrameNoLow(_, _)) = err.downcast_ref() { tracing::debug!("got InvalidPushFrameNo, retrying push"); self.try_push(&mut sync_ctx, &conn).await } else { @@ -492,6 +490,22 @@ impl Database { } else { self.try_pull(&mut sync_ctx, &conn).await } + .or_else(|err| { + let Error::Sync(err) = err else { + return Err(err); + }; + + // TODO(levy): upcasting should be done *only* at the API boundary, doing this in + // internal code just sucks. + let Some(SyncError::HttpDispatch(_)) = err.downcast_ref() else { + return Err(Error::Sync(err)); + }; + + Ok(crate::database::Replicated { + frame_no: None, + frames_synced: 0, + }) + }) } #[cfg(feature = "sync")] @@ -557,37 +571,29 @@ impl Database { ) -> Result { let generation = sync_ctx.generation(); let mut frame_no = sync_ctx.durable_frame_num() + 1; - conn.wal_insert_begin()?; - let mut err = None; + let insert_handle = conn.wal_insert_handle()?; loop { match sync_ctx.pull_one_frame(generation, frame_no).await { Ok(Some(frame)) => { - conn.wal_insert_frame(&frame)?; + insert_handle.insert(&frame)?; frame_no += 1; } Ok(None) => { - break; + sync_ctx.write_metadata().await?; + return Ok(crate::database::Replicated { + frame_no: None, + frames_synced: 1, + }); } - Err(e) => { - tracing::debug!("pull_one_frame error: {:?}", e); - err.replace(e); - break; + Err(err) => { + tracing::debug!("pull_one_frame error: {:?}", err); + sync_ctx.write_metadata().await?; + return Err(err); } } } - conn.wal_insert_end()?; - sync_ctx.write_metadata().await?; - - if let Some(err) = err { - Err(err) - } else { - Ok(crate::database::Replicated { - frame_no: None, - frames_synced: 1, - }) - } } pub(crate) fn path(&self) -> &str {