From 19e78005d5f4f910f1f2c246647111ea1b247055 Mon Sep 17 00:00:00 2001 From: Pekka Enberg Date: Wed, 11 Dec 2024 14:30:54 +0200 Subject: [PATCH] libsql: Improve WAL sync logic This improves the WAL sync to follow this logic: - If there are no local writes, attempt to pull. We know there are no local writes if the cached remote max frame number is greater than or equal to local max frame number. - If there are local writes, attempt to push. We know there are local writes if the cached remote max number is less than lcoal max frame number. This also removes the `offline_writes_pull.rs` example and augments `offline_writes.rs` to sync before attempting to write. --- libsql/examples/offline_writes.rs | 3 ++ libsql/examples/offline_writes_pull.rs | 55 -------------------------- libsql/src/local/database.rs | 54 +++++++++++++------------ 3 files changed, 32 insertions(+), 80 deletions(-) delete mode 100644 libsql/examples/offline_writes_pull.rs diff --git a/libsql/examples/offline_writes.rs b/libsql/examples/offline_writes.rs index ffea6a8fba..a59dc166c4 100644 --- a/libsql/examples/offline_writes.rs +++ b/libsql/examples/offline_writes.rs @@ -39,6 +39,9 @@ async fn main() { let conn = db.connect().unwrap(); + println!("Syncing database from remote..."); + db.sync().await.unwrap(); + conn.execute( r#" CREATE TABLE IF NOT EXISTS guest_book_entries ( diff --git a/libsql/examples/offline_writes_pull.rs b/libsql/examples/offline_writes_pull.rs deleted file mode 100644 index 3505df75c2..0000000000 --- a/libsql/examples/offline_writes_pull.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Example of using a offline writes with libSQL. - -use libsql::Builder; - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - - // The local database path where the data will be stored. - let db_path = std::env::var("LIBSQL_DB_PATH") - .map_err(|_| { - eprintln!( - "Please set the LIBSQL_DB_PATH environment variable to set to local database path." - ) - }) - .unwrap(); - - // The remote sync URL to use. - let sync_url = std::env::var("LIBSQL_SYNC_URL") - .map_err(|_| { - eprintln!( - "Please set the LIBSQL_SYNC_URL environment variable to set to remote sync URL." - ) - }) - .unwrap(); - - // The authentication token to use. - let auth_token = std::env::var("LIBSQL_AUTH_TOKEN").unwrap_or("".to_string()); - - let db_builder = Builder::new_synced_database(db_path, sync_url, auth_token); - - let db = match db_builder.build().await { - Ok(db) => db, - Err(error) => { - eprintln!("Error connecting to remote sync server: {}", error); - return; - } - }; - - println!("Syncing database from remote..."); - db.sync().await.unwrap(); - - let conn = db.connect().unwrap(); - let mut results = conn - .query("SELECT * FROM guest_book_entries", ()) - .await - .unwrap(); - println!("Guest book entries:"); - while let Some(row) = results.next().await.unwrap() { - let text: String = row.get(0).unwrap(); - println!(" {}", text); - } - - println!("Done!"); -} diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 72b4c5d0e5..726bb98a4e 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -391,29 +391,36 @@ impl Database { use crate::sync::SyncError; use crate::Error; - match self.try_push().await { - Ok(rep) => Ok(rep), - Err(Error::Sync(err)) => { - // Retry the sync because we are ahead of the server and we need to push some older - // frames. - if let Some(SyncError::InvalidPushFrameNoLow(_, _)) = - err.downcast_ref::() - { - tracing::debug!("got InvalidPushFrameNo, retrying push"); - self.try_push().await - } else { - Err(Error::Sync(err)) + let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await; + let conn = self.connect()?; + + let durable_frame_no = sync_ctx.durable_frame_num(); + let max_frame_no = conn.wal_frame_count(); + + if max_frame_no > durable_frame_no { + match self.try_push(&mut sync_ctx, &conn).await { + Ok(rep) => Ok(rep), + Err(Error::Sync(err)) => { + // Retry the sync because we are ahead of the server and we need to push some older + // frames. + if let Some(SyncError::InvalidPushFrameNoLow(_, _)) = + err.downcast_ref::() + { + tracing::debug!("got InvalidPushFrameNo, retrying push"); + self.try_push(&mut sync_ctx, &conn).await + } else { + Err(Error::Sync(err)) + } } + Err(e) => Err(e), } - Err(e) => Err(e), + } else { + self.try_pull(&mut sync_ctx, &conn).await } } #[cfg(feature = "sync")] - async fn try_push(&self) -> Result { - let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await; - let conn = self.connect()?; - + async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result { let page_size = { let rows = conn .query("PRAGMA page_size", crate::params::Params::None)? @@ -424,9 +431,11 @@ impl Database { }; let max_frame_no = conn.wal_frame_count(); - if max_frame_no == 0 { - return self.try_pull(&mut sync_ctx).await; + return Ok(crate::database::Replicated { + frame_no: None, + frames_synced: 0, + }); } let generation = sync_ctx.generation(); // TODO: Probe from WAL. @@ -452,10 +461,6 @@ impl Database { sync_ctx.write_metadata().await?; - if start_frame_no > end_frame_no { - return self.try_pull(&mut sync_ctx).await; - } - // TODO(lucio): this can underflow if the server previously returned a higher max_frame_no // than what we have stored here. let frame_count = end_frame_no - start_frame_no + 1; @@ -466,10 +471,9 @@ impl Database { } #[cfg(feature = "sync")] - async fn try_pull(&self, sync_ctx: &mut SyncContext) -> Result { + async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result { let generation = sync_ctx.generation(); let mut frame_no = sync_ctx.durable_frame_num() + 1; - let conn = self.connect()?; conn.wal_insert_begin()?; let mut err = None;