Skip to content

Commit

Permalink
Merge pull request #1824 from tursodatabase/lucio/run-ide-fmt
Browse files Browse the repository at this point in the history
chore: run ide fmt on local/database.rs
  • Loading branch information
LucioFranco authored Nov 18, 2024
2 parents 9e1f196 + fddf6db commit 909b8af
Showing 1 changed file with 60 additions and 15 deletions.
75 changes: 60 additions & 15 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,10 @@ impl Database {

#[cfg(feature = "replication")]
/// Sync with primary at least to a given replication index
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::database::Replicated> {
pub async fn sync_until(
&self,
replication_index: FrameNo,
) -> Result<crate::database::Replicated> {
if let Some(ctx) = &self.replication_ctx {
let mut frame_no: Option<FrameNo> = ctx.replicator.committed_frame_no().await;
let mut frames_synced: usize = 0;
Expand Down Expand Up @@ -384,15 +387,17 @@ impl Database {
let conn = self.connect()?;

let page_size = {
let rows = conn.query("PRAGMA page_size", crate::params::Params::None)?.unwrap();
let rows = conn
.query("PRAGMA page_size", crate::params::Params::None)?
.unwrap();
let row = rows.next()?.unwrap();
let page_size = row.get::<u32>(0)?;
page_size
};

let mut max_frame_no: std::os::raw::c_uint = 0;
unsafe { libsql_sys::ffi::libsql_wal_frame_count(conn.handle(), &mut max_frame_no) };

let generation = 1; // TODO: Probe from WAL.
let start_frame_no = sync_ctx.durable_frame_num + 1;
let end_frame_no = max_frame_no;
Expand All @@ -402,44 +407,81 @@ impl Database {
// The server returns its maximum frame number. To avoid resending
// frames the server already knows about, we need to update the
// frame number to the one returned by the server.
let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
let max_frame_no = self
.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size)
.await?;
if max_frame_no > frame_no {
frame_no = max_frame_no;
}
frame_no += 1;
}

let frame_count = end_frame_no - start_frame_no + 1;
Ok(crate::database::Replicated{
Ok(crate::database::Replicated {
frame_no: None,
frames_synced: frame_count as usize,
})
}

#[cfg(feature = "sync")]
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<u32> {
let frame_size: usize = 24+page_size as usize;
async fn push_one_frame(
&self,
conn: &Connection,
sync_ctx: &SyncContext,
generation: u32,
frame_no: u32,
page_size: u32,
) -> Result<u32> {
let frame_size: usize = 24 + page_size as usize;
let frame = vec![0; frame_size];
let rc = unsafe {
libsql_sys::ffi::libsql_wal_get_frame(conn.handle(), frame_no, frame.as_ptr() as *mut _, frame_size as u32)
libsql_sys::ffi::libsql_wal_get_frame(
conn.handle(),
frame_no,
frame.as_ptr() as *mut _,
frame_size as u32,
)
};
if rc != 0 {
return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no)));
return Err(crate::errors::Error::SqliteFailure(
rc as std::ffi::c_int,
format!("Failed to get frame: {}", frame_no),
));
}
let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1);
let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
let uri = format!(
"{}/sync/{}/{}/{}",
sync_ctx.sync_url,
generation,
frame_no,
frame_no + 1
);
let max_frame_no = self
.push_with_retry(
uri,
&sync_ctx.auth_token,
frame.to_vec(),
sync_ctx.max_retries,
)
.await?;
Ok(max_frame_no)
}

#[cfg(feature = "sync")]
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<u32> {
async fn push_with_retry(
&self,
uri: String,
auth_token: &Option<String>,
frame: Vec<u8>,
max_retries: usize,
) -> Result<u32> {
let mut nr_retries = 0;
loop {
let client = reqwest::Client::new();
let mut builder = client.post(uri.to_owned());
match auth_token {
match auth_token {
Some(ref auth_token) => {
builder = builder.header("Authorization", format!("Bearer {}", auth_token.to_owned()));
builder = builder
.header("Authorization", format!("Bearer {}", auth_token.to_owned()));
}
None => {}
}
Expand All @@ -450,7 +492,10 @@ impl Database {
return Ok(max_frame_no as u32);
}
if nr_retries > max_retries {
return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status())));
return Err(crate::errors::Error::ConnectionFailed(format!(
"Failed to push frame: {}",
res.status()
)));
}
let delay = std::time::Duration::from_millis(100 * (1 << nr_retries));
tokio::time::sleep(delay).await;
Expand Down

0 comments on commit 909b8af

Please sign in to comment.