Skip to content

Commit

Permalink
libsql: rework sync v2 structure
Browse files Browse the repository at this point in the history
  • Loading branch information
LucioFranco committed Nov 14, 2024
1 parent 89b3460 commit 508d725
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 20 deletions.
78 changes: 61 additions & 17 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ pub struct Database {
#[cfg(feature = "replication")]
pub replication_ctx: Option<ReplicationContext>,
#[cfg(feature = "sync")]
pub sync_ctx: Option<SyncContext>,
pub sync_ctx: Option<std::sync::Mutex<SyncContext>>,
}

impl Database {
Expand Down Expand Up @@ -143,7 +143,7 @@ impl Database {
endpoint
};
let mut db = Database::open(&db_path, flags)?;
db.sync_ctx = Some(SyncContext::new(endpoint, Some(auth_token)));
db.sync_ctx = Some(SyncContext::new(endpoint, Some(auth_token), &db_path));
Ok(db)
}

Expand Down Expand Up @@ -320,7 +320,10 @@ impl Database {

#[cfg(feature = "replication")]
/// Sync with primary at least to a given replication index
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::database::Replicated> {
pub async fn sync_until(
&self,
replication_index: FrameNo,
) -> Result<crate::database::Replicated> {
if let Some(ctx) = &self.replication_ctx {
let mut frame_no: Option<FrameNo> = ctx.replicator.committed_frame_no().await;
let mut frames_synced: usize = 0;
Expand Down Expand Up @@ -384,62 +387,100 @@ impl Database {
let conn = self.connect()?;

let page_size = {
let rows = conn.query("PRAGMA page_size", crate::params::Params::None)?.unwrap();
let rows = conn
.query("PRAGMA page_size", crate::params::Params::None)?
.unwrap();
let row = rows.next()?.unwrap();
let page_size = row.get::<u32>(0)?;
page_size
};

let mut max_frame_no: std::os::raw::c_uint = 0;
unsafe { libsql_sys::ffi::libsql_wal_frame_count(conn.handle(), &mut max_frame_no) };

let generation = 1; // TODO: Probe from WAL.
let start_frame_no = sync_ctx.durable_frame_num + 1;
let end_frame_no = max_frame_no;

// let max_frame_no = sync_ctx.max_frame_no();

let mut frame_no = start_frame_no;
while frame_no <= end_frame_no {
// The server returns its maximum frame number. To avoid resending
// frames the server already knows about, we need to update the
// frame number to the one returned by the server.
let max_frame_no = self.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size).await?;
let max_frame_no = self
.push_one_frame(&conn, &sync_ctx, generation, frame_no, page_size)
.await?;
if max_frame_no > frame_no {
frame_no = max_frame_no;
}
frame_no += 1;
}

let frame_count = end_frame_no - start_frame_no + 1;
Ok(crate::database::Replicated{
Ok(crate::database::Replicated {
frame_no: None,
frames_synced: frame_count as usize,
})
}

#[cfg(feature = "sync")]
async fn push_one_frame(&self, conn: &Connection, sync_ctx: &SyncContext, generation: u32, frame_no: u32, page_size: u32) -> Result<u32> {
let frame_size: usize = 24+page_size as usize;
async fn push_one_frame(
&self,
conn: &Connection,
sync_url: &str,
sync_auth_token: &Option<String>,
sync_max_retries: usize,
generation: u32,
frame_no: u32,
page_size: u32,
) -> Result<u32> {
let frame_size: usize = 24 + page_size as usize;
let frame = vec![0; frame_size];
let rc = unsafe {
libsql_sys::ffi::libsql_wal_get_frame(conn.handle(), frame_no, frame.as_ptr() as *mut _, frame_size as u32)
libsql_sys::ffi::libsql_wal_get_frame(
conn.handle(),
frame_no,
frame.as_ptr() as *mut _,
frame_size as u32,
)
};
if rc != 0 {
return Err(crate::errors::Error::SqliteFailure(rc as std::ffi::c_int, format!("Failed to get frame: {}", frame_no)));
return Err(crate::errors::Error::SqliteFailure(
rc as std::ffi::c_int,
format!("Failed to get frame: {}", frame_no),
));
}
let uri = format!("{}/sync/{}/{}/{}", sync_ctx.sync_url, generation, frame_no, frame_no+1);
let max_frame_no = self.push_with_retry(uri, &sync_ctx.auth_token, frame.to_vec(), sync_ctx.max_retries).await?;
let uri = format!(
"{}/sync/{}/{}/{}",
&sync_url,
generation,
frame_no,
frame_no + 1
);
let max_frame_no = self
.push_with_retry(uri, &sync_auth_token, frame.to_vec(), sync_max_retries)
.await?;
Ok(max_frame_no)
}

#[cfg(feature = "sync")]
async fn push_with_retry(&self, uri: String, auth_token: &Option<String>, frame: Vec<u8>, max_retries: usize) -> Result<u32> {
async fn push_with_retry(
&self,
uri: String,
auth_token: &Option<String>,
frame: Vec<u8>,
max_retries: usize,
) -> Result<u32> {
let mut nr_retries = 0;
loop {
let client = reqwest::Client::new();
let mut builder = client.post(uri.to_owned());
match auth_token {
match auth_token {
Some(ref auth_token) => {
builder = builder.header("Authorization", format!("Bearer {}", auth_token.to_owned()));
builder = builder
.header("Authorization", format!("Bearer {}", auth_token.to_owned()));
}
None => {}
}
Expand All @@ -450,7 +491,10 @@ impl Database {
return Ok(max_frame_no as u32);
}
if nr_retries > max_retries {
return Err(crate::errors::Error::ConnectionFailed(format!("Failed to push frame: {}", res.status())));
return Err(crate::errors::Error::ConnectionFailed(format!(
"Failed to push frame: {}",
res.status()
)));
}
let delay = std::time::Duration::from_millis(100 * (1 << nr_retries));
tokio::time::sleep(delay).await;
Expand Down
60 changes: 57 additions & 3 deletions libsql/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,72 @@
const DEFAULT_MAX_RETRIES: usize = 5;

use crate::Result;

pub struct SyncContext {
pub sync_url: String,
pub auth_token: Option<String>,
pub max_retries: usize,
pub durable_frame_num: u32,
max_frame_no: u32,
db_path: String,
}

impl SyncContext {
pub fn new(sync_url: String, auth_token: Option<String>) -> Self {
Self {
pub fn new(sync_url: String, auth_token: Option<String>, db_path: impl Into<String>) -> Self {
let mut ctx = Self {
sync_url,
auth_token,
durable_frame_num: 0,
max_retries: DEFAULT_MAX_RETRIES,
}
max_frame_no: 0,
db_path: db_path.into(),
};

ctx.read_and_update_metadata().unwrap();

ctx
}

pub(crate) fn max_frame_no(&self) -> u32 {
self.max_frame_no
}

pub(crate) fn set_max_frame_no(&mut self, max_frame_no: u32) -> Result<()> {
// TODO: check if max_frame_no is larger than current known max_frame_no
self.max_frame_no = max_frame_no;

self.update_metadata()?;

Ok(())
}

fn update_metadata(&self) -> Result<()> {
let path = format!("{}-info", self.db_path);

let contents = serde_json::to_vec(&MetadataJson {
max_frame_no: self.max_frame_no,
})
.unwrap();

std::fs::write(path, contents).unwrap();

Ok(())
}

fn read_and_update_metadata(&mut self) -> Result<()> {
let path = format!("{}-info", self.db_path);

let contents = std::fs::read(&path).unwrap();

let metadata = serde_json::from_slice::<MetadataJson>(&contents[..]).unwrap();

self.max_frame_no = metadata.max_frame_no;

Ok(())
}
}

#[derive(serde::Serialize, serde::Deserialize)]
struct MetadataJson {
max_frame_no: u32,
}

0 comments on commit 508d725

Please sign in to comment.