Skip to content

Commit

Permalink
Merge pull request #1830 from tursodatabase/lucio/cache-server-frame
Browse files Browse the repository at this point in the history
libsql: add `durable_frame_num` caching and metadata file
  • Loading branch information
LucioFranco authored Nov 22, 2024
2 parents 49e6393 + 11cce47 commit 4a5f373
Show file tree
Hide file tree
Showing 10 changed files with 687 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ fallible-iterator = { version = "0.3", optional = true }
libsql_replication = { version = "0.6", path = "../libsql-replication", optional = true }
async-stream = { version = "0.3.5", optional = true }

crc32fast = { version = "1", optional = true }
chrono = { version = "0.4", optional = true }

[dev-dependencies]
criterion = { version = "0.5", features = ["html_reports", "async", "async_futures", "async_tokio"] }
pprof = { version = "0.12.1", features = ["criterion", "flamegraph"] }
Expand Down Expand Up @@ -105,6 +108,10 @@ sync = [
"dep:tokio",
"dep:futures",
"dep:serde_json",
"dep:crc32fast",
"dep:chrono",
"dep:uuid",
"tokio/fs"
]
hrana = [
"parser",
Expand Down
7 changes: 7 additions & 0 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,12 @@ cfg_core! {
}

cfg_replication_or_sync! {

pub type FrameNo = u64;

#[derive(Debug)]
// TODO(lucio): remove this once we use these fields in our sync code
#[allow(dead_code)]
pub struct Replicated {
pub(crate) frame_no: Option<FrameNo>,
pub(crate) frames_synced: usize,
Expand All @@ -47,12 +50,16 @@ cfg_replication_or_sync! {
/// where in the log you might be. Beware that this value can be reset to a lower value by the
/// server in certain situations. Please use `frames_synced` if you want to track the amount of
/// work a sync has done.
// TODO(lucio): remove this once we use these fields in our sync code
#[allow(dead_code)]
pub fn frame_no(&self) -> Option<FrameNo> {
self.frame_no
}

/// The count of frames synced during this call of `sync`. A frame is a 4kB frame from the
/// libsql write ahead log.
// TODO(lucio): remove this once we use these fields in our sync code
#[allow(dead_code)]
pub fn frames_synced(&self) -> usize {
self.frames_synced
}
Expand Down
4 changes: 2 additions & 2 deletions libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl Builder<()> {
}
}

cfg_replication_or_remote! {
cfg_replication_or_remote_or_sync! {
/// Remote configuration type used in [`Builder`].
pub struct Remote {
url: String,
Expand Down Expand Up @@ -505,7 +505,7 @@ cfg_remote! {
}
}

cfg_replication_or_remote! {
cfg_replication_or_remote_or_sync! {
impl Remote {
fn connector<C>(mut self, connector: C) -> Remote
where
Expand Down
9 changes: 9 additions & 0 deletions libsql/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ pub enum Error {
TransactionalBatchError(String),
#[error("Invalid blob size, expected {0}")]
InvalidBlobSize(usize),
#[error("sync error: {0}")]
Sync(crate::BoxError),
}

#[cfg(feature = "hrana")]
Expand All @@ -64,6 +66,13 @@ impl From<crate::hrana::HranaError> for Error {
}
}

#[cfg(feature = "sync")]
impl From<crate::sync::SyncError> for Error {
fn from(e: crate::sync::SyncError) -> Self {
Error::Sync(e.into())
}
}

impl From<std::convert::Infallible> for Error {
fn from(_: std::convert::Infallible) -> Self {
unreachable!()
Expand Down
18 changes: 11 additions & 7 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,11 +144,11 @@ impl Database {
endpoint
};
let mut db = Database::open(&db_path, flags)?;
db.sync_ctx = Some(tokio::sync::Mutex::new(SyncContext::new(
connector,
endpoint,
Some(auth_token),
)));

let sync_ctx =
SyncContext::new(connector, db_path.into(), endpoint, Some(auth_token)).await?;
db.sync_ctx = Some(tokio::sync::Mutex::new(sync_ctx));

Ok(db)
}

Expand Down Expand Up @@ -388,7 +388,7 @@ impl Database {
#[cfg(feature = "sync")]
/// Push WAL frames to remote.
pub async fn push(&self) -> Result<crate::database::Replicated> {
let sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let mut sync_ctx = self.sync_ctx.as_ref().unwrap().lock().await;
let conn = self.connect()?;

let page_size = {
Expand All @@ -402,7 +402,7 @@ impl Database {

let max_frame_no = conn.wal_frame_count();

let generation = 1; // TODO: Probe from WAL.
let generation = sync_ctx.generation(); // TODO: Probe from WAL.
let start_frame_no = sync_ctx.durable_frame_num() + 1;
let end_frame_no = max_frame_no;

Expand All @@ -423,6 +423,10 @@ impl Database {
frame_no += 1;
}

sync_ctx.write_metadata().await?;

// TODO(lucio): this can underflow if the server previously returned a higher max_frame_no
// than what we have stored here.
let frame_count = end_frame_no - start_frame_no + 1;
Ok(crate::database::Replicated {
frame_no: None,
Expand Down
2 changes: 1 addition & 1 deletion libsql/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ macro_rules! cfg_core {
}
}

macro_rules! cfg_replication_or_remote {
macro_rules! cfg_replication_or_remote_or_sync {
($($item:item)*) => {
$(
#[cfg(any(feature = "replication", feature = "sync", feature = "remote"))]
Expand Down
Loading

0 comments on commit 4a5f373

Please sign in to comment.