diff --git a/libsql/examples/offline_writes.rs b/libsql/examples/offline_writes.rs index ffea6a8fba..a59dc166c4 100644 --- a/libsql/examples/offline_writes.rs +++ b/libsql/examples/offline_writes.rs @@ -39,6 +39,9 @@ async fn main() { let conn = db.connect().unwrap(); + println!("Syncing database from remote..."); + db.sync().await.unwrap(); + conn.execute( r#" CREATE TABLE IF NOT EXISTS guest_book_entries ( diff --git a/libsql/examples/offline_writes_pull.rs b/libsql/examples/offline_writes_pull.rs deleted file mode 100644 index 3505df75c2..0000000000 --- a/libsql/examples/offline_writes_pull.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Example of using a offline writes with libSQL. - -use libsql::Builder; - -#[tokio::main] -async fn main() { - tracing_subscriber::fmt::init(); - - // The local database path where the data will be stored. - let db_path = std::env::var("LIBSQL_DB_PATH") - .map_err(|_| { - eprintln!( - "Please set the LIBSQL_DB_PATH environment variable to set to local database path." - ) - }) - .unwrap(); - - // The remote sync URL to use. - let sync_url = std::env::var("LIBSQL_SYNC_URL") - .map_err(|_| { - eprintln!( - "Please set the LIBSQL_SYNC_URL environment variable to set to remote sync URL." - ) - }) - .unwrap(); - - // The authentication token to use. - let auth_token = std::env::var("LIBSQL_AUTH_TOKEN").unwrap_or("".to_string()); - - let db_builder = Builder::new_synced_database(db_path, sync_url, auth_token); - - let db = match db_builder.build().await { - Ok(db) => db, - Err(error) => { - eprintln!("Error connecting to remote sync server: {}", error); - return; - } - }; - - println!("Syncing database from remote..."); - db.sync().await.unwrap(); - - let conn = db.connect().unwrap(); - let mut results = conn - .query("SELECT * FROM guest_book_entries", ()) - .await - .unwrap(); - println!("Guest book entries:"); - while let Some(row) = results.next().await.unwrap() { - let text: String = row.get(0).unwrap(); - println!(" {}", text); - } - - println!("Done!"); -} diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 72b4c5d0e5..726bb98a4e 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -391,29 +391,36 @@ impl Database { use crate::sync::SyncError; use crate::Error; - match self.try_push().await { - Ok(rep) => Ok(rep), - Err(Error::Sync(err)) => { - // Retry the sync because we are ahead of the server and we need to push some older - // frames. - if let Some(SyncError::InvalidPushFrameNoLow(_, _)) = - err.downcast_ref::() - { - tracing::debug!("got InvalidPushFrameNo, retrying push"); - self.try_push().await - } else { - Err(Error::Sync(err)) + let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await; + let conn = self.connect()?; + + let durable_frame_no = sync_ctx.durable_frame_num(); + let max_frame_no = conn.wal_frame_count(); + + if max_frame_no > durable_frame_no { + match self.try_push(&mut sync_ctx, &conn).await { + Ok(rep) => Ok(rep), + Err(Error::Sync(err)) => { + // Retry the sync because we are ahead of the server and we need to push some older + // frames. + if let Some(SyncError::InvalidPushFrameNoLow(_, _)) = + err.downcast_ref::() + { + tracing::debug!("got InvalidPushFrameNo, retrying push"); + self.try_push(&mut sync_ctx, &conn).await + } else { + Err(Error::Sync(err)) + } } + Err(e) => Err(e), } - Err(e) => Err(e), + } else { + self.try_pull(&mut sync_ctx, &conn).await } } #[cfg(feature = "sync")] - async fn try_push(&self) -> Result { - let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await; - let conn = self.connect()?; - + async fn try_push(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result { let page_size = { let rows = conn .query("PRAGMA page_size", crate::params::Params::None)? @@ -424,9 +431,11 @@ impl Database { }; let max_frame_no = conn.wal_frame_count(); - if max_frame_no == 0 { - return self.try_pull(&mut sync_ctx).await; + return Ok(crate::database::Replicated { + frame_no: None, + frames_synced: 0, + }); } let generation = sync_ctx.generation(); // TODO: Probe from WAL. @@ -452,10 +461,6 @@ impl Database { sync_ctx.write_metadata().await?; - if start_frame_no > end_frame_no { - return self.try_pull(&mut sync_ctx).await; - } - // TODO(lucio): this can underflow if the server previously returned a higher max_frame_no // than what we have stored here. let frame_count = end_frame_no - start_frame_no + 1; @@ -466,10 +471,9 @@ impl Database { } #[cfg(feature = "sync")] - async fn try_pull(&self, sync_ctx: &mut SyncContext) -> Result { + async fn try_pull(&self, sync_ctx: &mut SyncContext, conn: &Connection) -> Result { let generation = sync_ctx.generation(); let mut frame_no = sync_ctx.durable_frame_num() + 1; - let conn = self.connect()?; conn.wal_insert_begin()?; let mut err = None;