From 508d72506dc9075eff5452d272aa1ef821c48bc6 Mon Sep 17 00:00:00 2001 From: Lucio Franco Date: Thu, 14 Nov 2024 16:18:56 -0500 Subject: [PATCH] libsql: rework sync v2 structure --- libsql/src/local/database.rs | 78 ++++++++++++++++++++++++++++-------- libsql/src/sync.rs | 60 +++++++++++++++++++++++++-- 2 files changed, 118 insertions(+), 20 deletions(-) diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index f70debf3b7..fb51f6500b 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -33,7 +33,7 @@ pub struct Database { #[cfg(feature = "replication")] pub replication_ctx: Option, #[cfg(feature = "sync")] - pub sync_ctx: Option, + pub sync_ctx: Option>, } impl Database { @@ -143,7 +143,7 @@ impl Database { endpoint }; let mut db = Database::open(&db_path, flags)?; - db.sync_ctx = Some(SyncContext::new(endpoint, Some(auth_token))); + db.sync_ctx = Some(SyncContext::new(endpoint, Some(auth_token), &db_path)); Ok(db) } @@ -320,7 +320,10 @@ impl Database { #[cfg(feature = "replication")] /// Sync with primary at least to a given replication index - pub async fn sync_until(&self, replication_index: FrameNo) -> Result { + pub async fn sync_until( + &self, + replication_index: FrameNo, + ) -> Result { if let Some(ctx) = &self.replication_ctx { let mut frame_no: Option = ctx.replicator.committed_frame_no().await; let mut frames_synced: usize = 0; @@ -384,7 +387,9 @@ impl Database { let conn = self.connect()?; let page_size = { - let rows = conn.query("PRAGMA page_size", crate::params::Params::None)?.unwrap(); + let rows = conn + .query("PRAGMA page_size", crate::params::Params::None)? + .unwrap(); let row = rows.next()?.unwrap(); let page_size = row.get::(0)?; page_size @@ -392,17 +397,21 @@ impl Database { let mut max_frame_no: std::os::raw::c_uint = 0; unsafe { libsql_sys::ffi::libsql_wal_frame_count(conn.handle(), &mut max_frame_no) }; - + let generation = 1; // TODO: Probe from WAL. let start_frame_no = sync_ctx.durable_frame_num + 1; let end_frame_no = max_frame_no; + // let max_frame_no = sync_ctx.max_frame_no(); + let mut frame_no = start_frame_no; while frame_no <= end_frame_no { // The server returns its maximum frame number. To avoid resending // frames the server already knows about, we need to update the // frame number to the one returned by the server. - let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?; + let max_frame_no = self + .push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size) + .await?; if max_frame_no > frame_no { frame_no = max_frame_no; } @@ -410,36 +419,68 @@ impl Database { } let frame_count = end_frame_no - start_frame_no + 1; - Ok(crate::database::Replicated{ + Ok(crate::database::Replicated { frame_no: None, frames_synced: frame_count as usize, }) } #[cfg(feature = "sync")] - async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result { - let frame_size: usize = 24+page_size as usize; + async fn push_one_frame( + &self, + conn: &Connection, + sync_url: &str, + sync_auth_token: &Option, + sync_max_retries: usize, + generation: u32, + frame_no: u32, + page_size: u32, + ) -> Result { + let frame_size: usize = 24 + page_size as usize; let frame = vec![0; frame_size]; let rc = unsafe { - libsql_sys::ffi::libsql_wal_get_frame(conn.handle(), frame_no, frame.as_ptr() as *mut _, frame_size as u32) + libsql_sys::ffi::libsql_wal_get_frame( + conn.handle(), + frame_no, + frame.as_ptr() as *mut _, + frame_size as u32, + ) }; if rc != 0 { - return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no))); + return Err(crate::errors::Error::SqliteFailure( + rc as std::ffi::c_int, + format!("Failed to get frame: {}", frame_no), + )); } - let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1); - let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?; + let uri = format!( + "{}/sync/{}/{}/{}", + &sync_url, + generation, + frame_no, + frame_no + 1 + ); + let max_frame_no = self + .push_with_retry(uri, &sync_auth_token, frame.to_vec(), sync_max_retries) + .await?; Ok(max_frame_no) } #[cfg(feature = "sync")] - async fn push_with_retry(&self, uri: String, auth_token: &Option, frame: Vec, max_retries: usize) -> Result { + async fn push_with_retry( + &self, + uri: String, + auth_token: &Option, + frame: Vec, + max_retries: usize, + ) -> Result { let mut nr_retries = 0; loop { let client = reqwest::Client::new(); let mut builder = client.post(uri.to_owned()); - match auth_token { + match auth_token { Some(ref auth_token) => { - builder = builder.header("Authorization", format!("Bearer {}", auth_token.to_owned())); + builder = builder + .header("Authorization", format!("Bearer {}", auth_token.to_owned())); } None => {} } @@ -450,7 +491,10 @@ impl Database { return Ok(max_frame_no as u32); } if nr_retries > max_retries { - return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status()))); + return Err(crate::errors::Error::ConnectionFailed(format!( + "Failed to push frame: {}", + res.status() + ))); } let delay = std::time::Duration::from_millis(100 * (1 << nr_retries)); tokio::time::sleep(delay).await; diff --git a/libsql/src/sync.rs b/libsql/src/sync.rs index bd8a241ded..1973e030e3 100644 --- a/libsql/src/sync.rs +++ b/libsql/src/sync.rs @@ -1,18 +1,72 @@ const DEFAULT_MAX_RETRIES: usize = 5; + +use crate::Result; + pub struct SyncContext { pub sync_url: String, pub auth_token: Option, pub max_retries: usize, pub durable_frame_num: u32, + max_frame_no: u32, + db_path: String, } impl SyncContext { - pub fn new(sync_url: String, auth_token: Option) -> Self { - Self { + pub fn new(sync_url: String, auth_token: Option, db_path: impl Into) -> Self { + let mut ctx = Self { sync_url, auth_token, durable_frame_num: 0, max_retries: DEFAULT_MAX_RETRIES, - } + max_frame_no: 0, + db_path: db_path.into(), + }; + + ctx.read_and_update_metadata().unwrap(); + + ctx + } + + pub(crate) fn max_frame_no(&self) -> u32 { + self.max_frame_no } + + pub(crate) fn set_max_frame_no(&mut self, max_frame_no: u32) -> Result<()> { + // TODO: check if max_frame_no is larger than current known max_frame_no + self.max_frame_no = max_frame_no; + + self.update_metadata()?; + + Ok(()) + } + + fn update_metadata(&self) -> Result<()> { + let path = format!("{}-info", self.db_path); + + let contents = serde_json::to_vec(&MetadataJson { + max_frame_no: self.max_frame_no, + }) + .unwrap(); + + std::fs::write(path, contents).unwrap(); + + Ok(()) + } + + fn read_and_update_metadata(&mut self) -> Result<()> { + let path = format!("{}-info", self.db_path); + + let contents = std::fs::read(&path).unwrap(); + + let metadata = serde_json::from_slice::(&contents[..]).unwrap(); + + self.max_frame_no = metadata.max_frame_no; + + Ok(()) + } +} + +#[derive(serde::Serialize, serde::Deserialize)] +struct MetadataJson { + max_frame_no: u32, }