diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index f70debf3b7..f1703912b6 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -320,7 +320,13 @@ impl Database { #[cfg(feature = "replication")] /// Sync with primary at least to a given replication index - pub async fn sync_until(&self, replication_index: FrameNo) -> Result { + pub async fn sync_until( + &self, + + + &self, + replication_index: FrameNo, + ) -> Result { if let Some(ctx) = &self.replication_ctx { let mut frame_no: Option = ctx.replicator.committed_frame_no().await; let mut frames_synced: usize = 0; @@ -381,65 +387,146 @@ impl Database { /// Push WAL frames to remote. pub async fn push(&self) -> Result { let sync_ctx = self.sync_ctx.as_ref().unwrap(); - let conn = self.connect()?; + let conn = self.con + nect()?; + let page_size = { - let rows = conn.query("PRAGMA page_size", crate::params::Params::None)?.unwrap(); + let rows = conn + .query("PRAGMA page_size", crate::params::Params::None)? + .unwrap(); let row = rows.next()?.unwrap(); let page_size = row.get::(0)?; - page_size + page_size }; let mut max_frame_no: std::os::raw::c_uint = 0; unsafe { libsql_sys::ffi::libsql_wal_frame_count(conn.handle(), &mut max_frame_no) }; - + let generation = 1; // TODO: Probe from WAL. let start_frame_no = sync_ctx.durable_frame_num + 1; let end_frame_no = max_frame_no; - let mut frame_no = start_frame_no; + let mut frame_no = start_fr + ame_no; + while frame_no <= end_frame_no { // The server returns its maximum frame number. To avoid resending // frames the server already knows about, we need to update the // frame number to the one returned by the server. - let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?; - if max_frame_no > frame_no { + let max_frame_no = self + .push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size) + .await?; + if max_frame_no > frame_no { frame_no = max_frame_no; } frame_no += 1; } let frame_count = end_frame_no - start_frame_no + 1; - Ok(crate::database::Replicated{ - frame_no: None, + Ok(crate::database:: + &self, + ated {, + + + + + + frame_no: None, + frames_synced: frame_count as usize, }) - } + }, + conn.handle(), + frame_no, + + + #[cfg(feature = "sync")] - async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result { - let frame_size: usize = 24+page_size as usize; + async fn push_one_frame(, + + + + &self, + conn: &Connection, + + + generation, + frame_no, + frame_no + 1 + + sync_ctx: &SyncContext, + + uri, + &sync_ctx.auth_token, + frame.to_vec(), + sync_ctx.max_retries, + ) + + generation: u32, + frame_no: u32, + page_size: u32, + ) -> Result { + let frame_size: usize + &self, + + page_size a + s usize;, + + + let frame = vec![0; frame_size]; let rc = unsafe { - libsql_sys::ffi::libsql_wal_get_frame(conn.handle(), frame_no, frame.as_ptr() as *mut _, frame_size as u32) + libsql_sys::ffi::libsql_wal_get_frame( + conn.handle(), + frame_no, + frame.as_ptr() as *mut _, + frame_size as u32, + + ) }; if rc != 0 { - return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no))); + return Err(crate::errors::Error::SqliteFailure( + rc as std::ffi::c_int, + format!("Failed to get frame: {}", frame_no), + )); } - let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1); - let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?; + let uri = format!( + "{}/sync/{}/{}/{}", + sync_ctx.sync_url, + + res.status() + + generation, + frame_no, + frame_no + 1 + ); + let max_frame_no = self + .push_with_retry( + uri, + &sync_ctx.auth_token, + frame.to_vec(), + sync_ctx.max_retries, + ) + .await?; Ok(max_frame_no) } #[cfg(feature = "sync")] - async fn push_with_retry(&self, uri: String, auth_token: &Option, frame: Vec, max_retries: usize) -> Result { + async fn push_with_retry( + &self, + uri: String, + auth_token: &Option, + frame: Vec, + max_retries: usize, + ) -> Result { let mut nr_retries = 0; loop { let client = reqwest::Client::new(); let mut builder = client.post(uri.to_owned()); - match auth_token { + match auth_token { Some(ref auth_token) => { - builder = builder.header("Authorization", format!("Bearer {}", auth_token.to_owned())); + builder = builder + .header("Authorization", format!("Bearer {}", auth_token.to_owned())); } None => {} } @@ -450,7 +537,10 @@ impl Database { return Ok(max_frame_no as u32); } if nr_retries > max_retries { - return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status()))); + return Err(crate::errors::Error::ConnectionFailed(format!( + "Failed to push frame: {}", + res.status() + ))); } let delay = std::time::Duration::from_millis(100 * (1 << nr_retries)); tokio::time::sleep(delay).await;