Skip to content

Commit

Permalink
chore: run ide fmt on local/database.rs
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco committed Nov 18, 2024
1 parent 9e1f196 commit fa55295
Showing 1 changed file with 111 additions and 21 deletions.
132 changes: 111 additions & 21 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,13 @@ impl Database {

#[cfg(feature = "replication")]
/// Sync with primary at least to a given replication index
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::database::Replicated> {
pub async fn sync_until(
&self,


&self,
replication_index: FrameNo,
) -> Result<crate::database::Replicated> {
if let Some(ctx) = &self.replication_ctx {
let mut frame_no: Option<FrameNo> = ctx.replicator.committed_frame_no().await;
let mut frames_synced: usize = 0;
Expand Down Expand Up @@ -381,65 +387,146 @@ impl Database {
/// Push WAL frames to remote.
pub async fn push(&self) -> Result<crate::database::Replicated> {
let sync_ctx = self.sync_ctx.as_ref().unwrap();
let conn = self.connect()?;
let conn = self.con
nect()?;


let page_size = {
let rows = conn.query("PRAGMA page_size", crate::params::Params::None)?.unwrap();
let rows = conn
.query("PRAGMA page_size", crate::params::Params::None)?
.unwrap();
let row = rows.next()?.unwrap();
let page_size = row.get::<u32>(0)?;
page_size
page_size
};

let mut max_frame_no: std::os::raw::c_uint = 0;
unsafe { libsql_sys::ffi::libsql_wal_frame_count(conn.handle(), &mut max_frame_no) };

let generation = 1; // TODO: Probe from WAL.
let start_frame_no = sync_ctx.durable_frame_num + 1;
let end_frame_no = max_frame_no;

let mut frame_no = start_frame_no;
let mut frame_no = start_fr
ame_no;

while frame_no <= end_frame_no {
// The server returns its maximum frame number. To avoid resending
// frames the server already knows about, we need to update the
// frame number to the one returned by the server.
let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
if max_frame_no > frame_no {
let max_frame_no = self
.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size)
.await?;
if max_frame_no > frame_no {
frame_no = max_frame_no;
}
frame_no += 1;
}

let frame_count = end_frame_no - start_frame_no + 1;
Ok(crate::database::Replicated{
frame_no: None,
Ok(crate::database::
&self,
ated {,





frame_no: None, +
frames_synced: frame_count as usize,
})
}
},
conn.handle(),
frame_no,




#[cfg(feature = "sync")]
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<u32> {
let frame_size: usize = 24+page_size as usize;
async fn push_one_frame(,



&self,
conn: &Connection,


generation,
frame_no,
frame_no + 1

sync_ctx: &SyncContext,

uri,
&sync_ctx.auth_token,
frame.to_vec(),
sync_ctx.max_retries,
)

generation: u32,
frame_no: u32,
page_size: u32,
) -> Result<u32> {
let frame_size: usize
&self,
+ page_size a
s usize;,



let frame = vec![0; frame_size];
let rc = unsafe {
libsql_sys::ffi::libsql_wal_get_frame(conn.handle(), frame_no, frame.as_ptr() as *mut _, frame_size as u32)
libsql_sys::ffi::libsql_wal_get_frame(
conn.handle(),
frame_no,
frame.as_ptr() as *mut _,
frame_size as u32,

)
};
if rc != 0 {
return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no)));
return Err(crate::errors::Error::SqliteFailure(
rc as std::ffi::c_int,
format!("Failed to get frame: {}", frame_no),
));
}
let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1);
let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
let uri = format!(
"{}/sync/{}/{}/{}",
sync_ctx.sync_url,

res.status()

generation,
frame_no,
frame_no + 1
);
let max_frame_no = self
.push_with_retry(
uri,
&sync_ctx.auth_token,
frame.to_vec(),
sync_ctx.max_retries,
)
.await?;
Ok(max_frame_no)
}

#[cfg(feature = "sync")]
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<u32> {
async fn push_with_retry(
&self,
uri: String,
auth_token: &Option<String>,
frame: Vec<u8>,
max_retries: usize,
) -> Result<u32> {
let mut nr_retries = 0;
loop {
let client = reqwest::Client::new();
let mut builder = client.post(uri.to_owned());
match auth_token {
match auth_token {
Some(ref auth_token) => {
builder = builder.header("Authorization", format!("Bearer {}", auth_token.to_owned()));
builder = builder
.header("Authorization", format!("Bearer {}", auth_token.to_owned()));
}
None => {}
}
Expand All @@ -450,7 +537,10 @@ impl Database {
return Ok(max_frame_no as u32);
}
if nr_retries > max_retries {
return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status())));
return Err(crate::errors::Error::ConnectionFailed(format!(
"Failed to push frame: {}",
res.status()
)));
}
let delay = std::time::Duration::from_millis(100 * (1 << nr_retries));
tokio::time::sleep(delay).await;
Expand Down

0 comments on commit fa55295

Please sign in to comment.