Skip to content

Commit

Permalink
replicated
Browse files Browse the repository at this point in the history
  • Loading branch information
penberg committed Nov 13, 2024
1 parent aebd7de commit 7a78841
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 29 deletions.
27 changes: 25 additions & 2 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,29 @@ cfg_core! {
}
}


#[derive(Debug)]
pub struct Replicated {
pub(crate) frame_no: Option<FrameNo>,
pub(crate) frames_synced: usize,
}

impl Replicated {
/// The currently synced frame number. This can be used to track
/// where in the log you might be. Beware that this value can be reset to a lower value by the
/// server in certain situations. Please use `frames_synced` if you want to track the amount of
/// work a sync has done.
pub fn frame_no(&self) -> Option<FrameNo> {
self.frame_no
}

/// The count of frames synced during this call of `sync`. A frame is a 4kB frame from the
/// libsql write ahead log.
pub fn frames_synced(&self) -> usize {
self.frames_synced
}
}

enum DbType {
#[cfg(feature = "core")]
Memory { db: crate::local::Database },
Expand Down Expand Up @@ -336,7 +359,7 @@ cfg_replication! {

/// Sync database from remote, and returns the committed frame_no after syncing, if
/// applicable.
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
pub async fn sync(&self) -> Result<Replicated> {
match &self.db_type {
#[cfg(feature = "replication")]
DbType::Sync { db, encryption_config: _ } => db.sync().await,
Expand All @@ -348,7 +371,7 @@ cfg_replication! {

/// Sync database from remote until it gets to a given replication_index or further,
/// and returns the committed frame_no after syncing, if applicable.
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<Replicated> {
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
db.sync_until(replication_index).await
} else {
Expand Down
13 changes: 7 additions & 6 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ cfg_sync! {
use crate::{database::OpenFlags, local::connection::Connection};
use crate::{Error::ConnectionFailed, Result};
use libsql_sys::ffi;
use crate::database::Replicated;

// A libSQL database.
pub struct Database {
Expand Down Expand Up @@ -288,7 +289,7 @@ impl Database {
#[cfg(feature = "replication")]
/// Perform a sync step, returning the new replication index, or None, if the nothing was
/// replicated yet
pub async fn sync_oneshot(&self) -> Result<crate::replication::Replicated> {
pub async fn sync_oneshot(&self) -> Result<Replicated> {
if let Some(ctx) = &self.replication_ctx {
ctx.replicator.sync_oneshot().await
} else {
Expand All @@ -301,7 +302,7 @@ impl Database {

#[cfg(feature = "replication")]
/// Sync with primary
pub async fn sync(&self) -> Result<crate::replication::Replicated> {
pub async fn sync(&self) -> Result<Replicated> {
Ok(self.sync_oneshot().await?)
}

Expand All @@ -321,7 +322,7 @@ impl Database {

#[cfg(feature = "replication")]
/// Sync with primary at least to a given replication index
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<Replicated> {
if let Some(ctx) = &self.replication_ctx {
let mut frame_no: Option<FrameNo> = ctx.replicator.committed_frame_no().await;
let mut frames_synced: usize = 0;
Expand All @@ -330,7 +331,7 @@ impl Database {
frame_no = res.frame_no();
frames_synced += res.frames_synced();
}
Ok(crate::replication::Replicated {
Ok(Replicated {
frame_no,
frames_synced,
})
Expand Down Expand Up @@ -380,7 +381,7 @@ impl Database {

#[cfg(feature = "sync")]
/// Push WAL frames to remote.
pub async fn push(&self) -> Result<crate::replication::Replicated> {
pub async fn push(&self) -> Result<Replicated> {
let sync_ctx = self.sync_ctx.as_ref().unwrap();
let conn = self.connect()?;

Expand All @@ -403,7 +404,7 @@ impl Database {
}

let frame_count = end_frame_no - start_frame_no + 1;
Ok(crate::replication::Replicated{
Ok(Replicated{
frame_no: None,
frames_synced: frame_count as usize,
})
Expand Down
22 changes: 1 addition & 21 deletions libsql/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,27 +34,7 @@ mod connection;
pub(crate) mod local_client;
pub(crate) mod remote_client;

#[derive(Debug)]
pub struct Replicated {
pub(crate) frame_no: Option<FrameNo>,
pub(crate) frames_synced: usize,
}

impl Replicated {
/// The currently synced frame number. This can be used to track
/// where in the log you might be. Beware that this value can be reset to a lower value by the
/// server in certain situations. Please use `frames_synced` if you want to track the amount of
/// work a sync has done.
pub fn frame_no(&self) -> Option<FrameNo> {
self.frame_no
}

/// The count of frames synced during this call of `sync`. A frame is a 4kB frame from the
/// libsql write ahead log.
pub fn frames_synced(&self) -> usize {
self.frames_synced
}
}
use crate::database::Replicated;

/// A set of rames to be injected via `sync_frames`.
pub enum Frames {
Expand Down

0 comments on commit 7a78841

Please sign in to comment.