Skip to content

Commit

Permalink
Merge pull request #1665 from tursodatabase/replication_index
Browse files Browse the repository at this point in the history
libsql: Add max_write_replication_index and sync_until in Database
  • Loading branch information
haaawk authored Aug 13, 2024
2 parents 4023a3a + 2cf3494 commit ef0ad61
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 9 deletions.
7 changes: 6 additions & 1 deletion libsql-server/tests/embedded_replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ fn execute_batch() {
conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
.await?;

assert_eq!(db.max_write_replication_index(), Some(1));

let n = db.sync().await?.frame_no();
assert_eq!(n, Some(1));

Expand Down Expand Up @@ -231,6 +233,7 @@ fn stream() {

conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ())
.await?;
assert_eq!(db.max_write_replication_index(), Some(1));

let n = db.sync().await?.frame_no();
assert_eq!(n, Some(1));
Expand All @@ -244,8 +247,10 @@ fn stream() {
",
)
.await?;
let replication_index = db.max_write_replication_index();

db.sync().await.unwrap();
let synced_replication_index = db.sync().await.unwrap().frame_no();
assert_eq!(synced_replication_index, replication_index);

let rows = conn.query("select * from user", ()).await.unwrap();

Expand Down
45 changes: 41 additions & 4 deletions libsql/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ pub use builder::Builder;
#[cfg(feature = "core")]
pub use libsql_sys::{Cipher, EncryptionConfig};

use std::fmt;

use crate::{Connection, Result};
use std::fmt;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;

cfg_core! {
bitflags::bitflags! {
Expand Down Expand Up @@ -76,6 +77,9 @@ impl fmt::Debug for DbType {
/// not do much work until the [`Database::connect`] fn is called.
pub struct Database {
db_type: DbType,
/// The maximum replication index returned from a write performed using any connection created using this Database object.
#[allow(dead_code)]
max_write_replication_index: Arc<AtomicU64>,
}

cfg_core! {
Expand All @@ -87,6 +91,7 @@ cfg_core! {

Ok(Database {
db_type: DbType::Memory { db },
max_write_replication_index: Default::default(),
})
}

Expand All @@ -105,6 +110,7 @@ cfg_core! {
flags,
encryption_config: None,
},
max_write_replication_index: Default::default(),
})
}
}
Expand All @@ -130,6 +136,7 @@ cfg_replication! {

Ok(Database {
db_type: DbType::Sync { db, encryption_config },
max_write_replication_index: Default::default(),
})
}

Expand Down Expand Up @@ -191,6 +198,7 @@ cfg_replication! {

Ok(Database {
db_type: DbType::Sync { db, encryption_config },
max_write_replication_index: Default::default(),
})
}

Expand Down Expand Up @@ -317,6 +325,7 @@ cfg_replication! {

Ok(Database {
db_type: DbType::Sync { db, encryption_config },
max_write_replication_index: Default::default(),
})
}

Expand All @@ -331,6 +340,16 @@ cfg_replication! {
}
}

/// Sync database from remote until it gets to a given replication_index or further,
/// and returns the committed frame_no after syncing, if applicable.
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {
if let DbType::Sync { db, encryption_config: _ } = &self.db_type {
db.sync_until(replication_index).await
} else {
Err(Error::SyncNotSupported(format!("{:?}", self.db_type)))
}
}

/// Apply a set of frames to the database and returns the committed frame_no after syncing, if
/// applicable.
pub async fn sync_frames(&self, frames: crate::replication::Frames) -> Result<Option<FrameNo>> {
Expand Down Expand Up @@ -372,12 +391,25 @@ cfg_replication! {
DbType::Sync { db, .. } => {
let path = db.path().to_string();
Ok(Database {
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None}
db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None},
max_write_replication_index: Default::default(),
})
}
t => Err(Error::FreezeNotSupported(format!("{:?}", t)))
}
}

/// Get the maximum replication index returned from a write performed using any connection created using this Database object.
pub fn max_write_replication_index(&self) -> Option<FrameNo> {
let index = self
.max_write_replication_index
.load(std::sync::atomic::Ordering::SeqCst);
if index == 0 {
None
} else {
Some(index)
}
}
}
}

Expand Down Expand Up @@ -445,6 +477,7 @@ cfg_remote! {
connector: crate::util::ConnectorService::new(svc),
version,
},
max_write_replication_index: Default::default(),
})
}
}
Expand Down Expand Up @@ -552,7 +585,11 @@ impl Database {

let local = LibsqlConnection { conn };
let writer = local.conn.new_connection_writer();
let remote = crate::replication::RemoteConnection::new(local, writer);
let remote = crate::replication::RemoteConnection::new(
local,
writer,
self.max_write_replication_index.clone(),
);
let conn = std::sync::Arc::new(remote);

Ok(Connection { conn })
Expand Down
5 changes: 5 additions & 0 deletions libsql/src/database/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ cfg_core! {
let db = crate::local::Database::open(":memory:", crate::OpenFlags::default())?;
Database {
db_type: DbType::Memory { db } ,
max_write_replication_index: Default::default(),
}
} else {
let path = self
Expand All @@ -150,6 +151,7 @@ cfg_core! {
flags: self.inner.flags,
encryption_config: self.inner.encryption_config,
},
max_write_replication_index: Default::default(),
}
};

Expand Down Expand Up @@ -291,6 +293,7 @@ cfg_replication! {

Ok(Database {
db_type: DbType::Sync { db, encryption_config },
max_write_replication_index: Default::default(),
})
}
}
Expand Down Expand Up @@ -360,6 +363,7 @@ cfg_replication! {

Ok(Database {
db_type: DbType::Sync { db, encryption_config },
max_write_replication_index: Default::default(),
})
}
}
Expand Down Expand Up @@ -414,6 +418,7 @@ cfg_remote! {
connector,
version,
},
max_write_replication_index: Default::default(),
})
}
}
Expand Down
23 changes: 23 additions & 0 deletions libsql/src/local/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,29 @@ impl Database {
Ok(self.sync_oneshot().await?)
}

#[cfg(feature = "replication")]
/// Sync with primary at least to a given replication index
pub async fn sync_until(&self, replication_index: FrameNo) -> Result<crate::replication::Replicated> {
if let Some(ctx) = &self.replication_ctx {
let mut frame_no: Option<FrameNo> = ctx.replicator.committed_frame_no().await;
let mut frames_synced: usize = 0;
while frame_no.unwrap_or(0) < replication_index {
let res = ctx.replicator.sync_oneshot().await?;
frame_no = res.frame_no();
frames_synced += res.frames_synced();
}
Ok(crate::replication::Replicated {
frame_no,
frames_synced,
})
} else {
Err(crate::errors::Error::Misuse(
"No replicator available. Use Database::with_replicator() to enable replication"
.to_string(),
))
}
}

#[cfg(feature = "replication")]
pub async fn sync_frames(&self, frames: Frames) -> Result<Option<FrameNo>> {
if let Some(ref ctx) = self.replication_ctx {
Expand Down
22 changes: 20 additions & 2 deletions libsql/src/replication/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

use std::str::FromStr;
use std::sync::Arc;

use std::sync::atomic::AtomicU64;
use libsql_replication::rpc::proxy::{
describe_result, query_result::RowResult, Cond, DescribeResult, ExecuteResults, NotCond,
OkCond, Positional, Query, ResultRows, State as RemoteState, Step,
Expand All @@ -28,6 +28,7 @@ pub struct RemoteConnection {
pub(self) local: LibsqlConnection,
writer: Option<Writer>,
inner: Arc<Mutex<Inner>>,
max_write_replication_index: Arc<AtomicU64>,
}

#[derive(Default, Debug)]
Expand Down Expand Up @@ -166,12 +167,25 @@ impl From<RemoteState> for State {
}

impl RemoteConnection {
pub(crate) fn new(local: LibsqlConnection, writer: Option<Writer>) -> Self {
pub(crate) fn new(local: LibsqlConnection, writer: Option<Writer>, max_write_replication_index: Arc<AtomicU64>) -> Self {
let state = Arc::new(Mutex::new(Inner::default()));
Self {
local,
writer,
inner: state,
max_write_replication_index,
}
}

fn update_max_write_replication_index(&self, index: Option<u64>) {
if let Some(index) = index {
let mut current = self.max_write_replication_index.load(std::sync::atomic::Ordering::SeqCst);
while index > current {
match self.max_write_replication_index.compare_exchange(current, index, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst) {
Ok(_) => break,
Err(new_current) => current = new_current,
}
}
}
}

Expand Down Expand Up @@ -201,6 +215,8 @@ impl RemoteConnection {
.into();
}

self.update_max_write_replication_index(res.current_frame_no);

if let Some(replicator) = writer.replicator() {
replicator.sync_oneshot().await?;
}
Expand All @@ -226,6 +242,8 @@ impl RemoteConnection {
.into();
}

self.update_max_write_replication_index(res.current_frame_no);

if let Some(replicator) = writer.replicator() {
replicator.sync_oneshot().await?;
}
Expand Down
4 changes: 2 additions & 2 deletions libsql/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ pub(crate) mod remote_client;

#[derive(Debug)]
pub struct Replicated {
frame_no: Option<FrameNo>,
frames_synced: usize,
pub(crate) frame_no: Option<FrameNo>,
pub(crate) frames_synced: usize,
}

impl Replicated {
Expand Down

0 comments on commit ef0ad61

Please sign in to comment.