From 3da2a8bd2b1bc5e93354611f2758165d7ab1a8a6 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Tue, 13 Aug 2024 20:12:09 +0200 Subject: [PATCH 1/6] libsql: Add max_write_replication_index field to Database This field will store replication index of the latest write performed using any connection created with this Database object. This will allow a client to know what is a minimal replication index they have to use to see all the writes they performed so far. Signed-off-by: Piotr Jastrzebski --- libsql/src/database.rs | 17 ++++++++++++++--- libsql/src/database/builder.rs | 5 +++++ 2 files changed, 19 insertions(+), 3 deletions(-) diff --git a/libsql/src/database.rs b/libsql/src/database.rs index e87def367d..9bfa727703 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -7,9 +7,10 @@ pub use builder::Builder; #[cfg(feature = "core")] pub use libsql_sys::{Cipher, EncryptionConfig}; -use std::fmt; - use crate::{Connection, Result}; +use std::fmt; +use std::sync::atomic::AtomicU64; +use std::sync::Arc; cfg_core! { bitflags::bitflags! { @@ -76,6 +77,9 @@ impl fmt::Debug for DbType { /// not do much work until the [`Database::connect`] fn is called. pub struct Database { db_type: DbType, + /// The maximum replication index returned from a write performed using any connection created using this Database object. + #[allow(dead_code)] + max_write_replication_index: Arc, } cfg_core! { @@ -87,6 +91,7 @@ cfg_core! { Ok(Database { db_type: DbType::Memory { db }, + max_write_replication_index: Default::default(), }) } @@ -105,6 +110,7 @@ cfg_core! { flags, encryption_config: None, }, + max_write_replication_index: Default::default(), }) } } @@ -130,6 +136,7 @@ cfg_replication! { Ok(Database { db_type: DbType::Sync { db, encryption_config }, + max_write_replication_index: Default::default(), }) } @@ -191,6 +198,7 @@ cfg_replication! { Ok(Database { db_type: DbType::Sync { db, encryption_config }, + max_write_replication_index: Default::default(), }) } @@ -317,6 +325,7 @@ cfg_replication! { Ok(Database { db_type: DbType::Sync { db, encryption_config }, + max_write_replication_index: Default::default(), }) } @@ -372,7 +381,8 @@ cfg_replication! { DbType::Sync { db, .. } => { let path = db.path().to_string(); Ok(Database { - db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None} + db_type: DbType::File { path, flags: OpenFlags::default(), encryption_config: None}, + max_write_replication_index: Default::default(), }) } t => Err(Error::FreezeNotSupported(format!("{:?}", t))) @@ -445,6 +455,7 @@ cfg_remote! { connector: crate::util::ConnectorService::new(svc), version, }, + max_write_replication_index: Default::default(), }) } } diff --git a/libsql/src/database/builder.rs b/libsql/src/database/builder.rs index 8749b6452b..35cd93f899 100644 --- a/libsql/src/database/builder.rs +++ b/libsql/src/database/builder.rs @@ -135,6 +135,7 @@ cfg_core! { let db = crate::local::Database::open(":memory:", crate::OpenFlags::default())?; Database { db_type: DbType::Memory { db } , + max_write_replication_index: Default::default(), } } else { let path = self @@ -150,6 +151,7 @@ cfg_core! { flags: self.inner.flags, encryption_config: self.inner.encryption_config, }, + max_write_replication_index: Default::default(), } }; @@ -291,6 +293,7 @@ cfg_replication! { Ok(Database { db_type: DbType::Sync { db, encryption_config }, + max_write_replication_index: Default::default(), }) } } @@ -360,6 +363,7 @@ cfg_replication! { Ok(Database { db_type: DbType::Sync { db, encryption_config }, + max_write_replication_index: Default::default(), }) } } @@ -414,6 +418,7 @@ cfg_remote! { connector, version, }, + max_write_replication_index: Default::default(), }) } } From 8fb997e77c3c6fd66ab5e33f614dc09fce78a031 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Tue, 13 Aug 2024 20:26:49 +0200 Subject: [PATCH 2/6] libsql: Add max_write_replication_index method to Database Signed-off-by: Piotr Jastrzebski --- libsql/src/database.rs | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/libsql/src/database.rs b/libsql/src/database.rs index 9bfa727703..0109cab4bd 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -388,6 +388,18 @@ cfg_replication! { t => Err(Error::FreezeNotSupported(format!("{:?}", t))) } } + + /// Get the maximum replication index returned from a write performed using any connection created using this Database object. + pub fn max_write_replication_index(&self) -> Option { + let index = self + .max_write_replication_index + .load(std::sync::atomic::Ordering::SeqCst); + if index == 0 { + None + } else { + Some(index) + } + } } } From 6c5193db6465069c9d4366259206335f2504cb2c Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Tue, 13 Aug 2024 20:32:43 +0200 Subject: [PATCH 3/6] libsql: Add max_write_replication_index field to RemoteConnection Signed-off-by: Piotr Jastrzebski --- libsql/src/database.rs | 6 +++++- libsql/src/replication/connection.rs | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/libsql/src/database.rs b/libsql/src/database.rs index 0109cab4bd..44c606001e 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -575,7 +575,11 @@ impl Database { let local = LibsqlConnection { conn }; let writer = local.conn.new_connection_writer(); - let remote = crate::replication::RemoteConnection::new(local, writer); + let remote = crate::replication::RemoteConnection::new( + local, + writer, + self.max_write_replication_index.clone(), + ); let conn = std::sync::Arc::new(remote); Ok(Connection { conn }) diff --git a/libsql/src/replication/connection.rs b/libsql/src/replication/connection.rs index c720838798..78b41cb8e8 100644 --- a/libsql/src/replication/connection.rs +++ b/libsql/src/replication/connection.rs @@ -2,7 +2,7 @@ use std::str::FromStr; use std::sync::Arc; - +use std::sync::atomic::AtomicU64; use libsql_replication::rpc::proxy::{ describe_result, query_result::RowResult, Cond, DescribeResult, ExecuteResults, NotCond, OkCond, Positional, Query, ResultRows, State as RemoteState, Step, @@ -28,6 +28,8 @@ pub struct RemoteConnection { pub(self) local: LibsqlConnection, writer: Option, inner: Arc>, + #[allow(dead_code)] + max_write_replication_index: Arc, } #[derive(Default, Debug)] @@ -166,12 +168,13 @@ impl From for State { } impl RemoteConnection { - pub(crate) fn new(local: LibsqlConnection, writer: Option) -> Self { + pub(crate) fn new(local: LibsqlConnection, writer: Option, max_write_replication_index: Arc) -> Self { let state = Arc::new(Mutex::new(Inner::default())); Self { local, writer, inner: state, + max_write_replication_index, } } From 710ae01402fc39da93c02ba53c6d5a8293715cb3 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Tue, 13 Aug 2024 20:57:22 +0200 Subject: [PATCH 4/6] libsql: Update max_write_replication_index after every write Signed-off-by: Piotr Jastrzebski --- libsql/src/replication/connection.rs | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/libsql/src/replication/connection.rs b/libsql/src/replication/connection.rs index 78b41cb8e8..593bd634a1 100644 --- a/libsql/src/replication/connection.rs +++ b/libsql/src/replication/connection.rs @@ -28,7 +28,6 @@ pub struct RemoteConnection { pub(self) local: LibsqlConnection, writer: Option, inner: Arc>, - #[allow(dead_code)] max_write_replication_index: Arc, } @@ -178,6 +177,18 @@ impl RemoteConnection { } } + fn update_max_write_replication_index(&self, index: Option) { + if let Some(index) = index { + let mut current = self.max_write_replication_index.load(std::sync::atomic::Ordering::SeqCst); + while index > current { + match self.max_write_replication_index.compare_exchange(current, index, std::sync::atomic::Ordering::SeqCst, std::sync::atomic::Ordering::SeqCst) { + Ok(_) => break, + Err(new_current) => current = new_current, + } + } + } + } + fn is_state_init(&self) -> bool { matches!(self.inner.lock().state, State::Init) } @@ -204,6 +215,8 @@ impl RemoteConnection { .into(); } + self.update_max_write_replication_index(res.current_frame_no); + if let Some(replicator) = writer.replicator() { replicator.sync_oneshot().await?; } @@ -229,6 +242,8 @@ impl RemoteConnection { .into(); } + self.update_max_write_replication_index(res.current_frame_no); + if let Some(replicator) = writer.replicator() { replicator.sync_oneshot().await?; } From 75d04cf61cb2225086720d202d1155bb66162a91 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Tue, 13 Aug 2024 21:23:50 +0200 Subject: [PATCH 5/6] libsql: Add Database::sync_until Signed-off-by: Piotr Jastrzebski --- libsql/src/database.rs | 10 ++++++++++ libsql/src/local/database.rs | 23 +++++++++++++++++++++++ libsql/src/replication/mod.rs | 4 ++-- 3 files changed, 35 insertions(+), 2 deletions(-) diff --git a/libsql/src/database.rs b/libsql/src/database.rs index 44c606001e..d14cf2e42c 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -340,6 +340,16 @@ cfg_replication! { } } + /// Sync database from remote until it gets to a given replication_index or further, + /// and returns the committed frame_no after syncing, if applicable. + pub async fn sync_until(&self, replication_index: FrameNo) -> Result { + if let DbType::Sync { db, encryption_config: _ } = &self.db_type { + db.sync_until(replication_index).await + } else { + Err(Error::SyncNotSupported(format!("{:?}", self.db_type))) + } + } + /// Apply a set of frames to the database and returns the committed frame_no after syncing, if /// applicable. pub async fn sync_frames(&self, frames: crate::replication::Frames) -> Result> { diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index 2892d809cc..3453a777c9 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -277,6 +277,29 @@ impl Database { Ok(self.sync_oneshot().await?) } + #[cfg(feature = "replication")] + /// Sync with primary at least to a given replication index + pub async fn sync_until(&self, replication_index: FrameNo) -> Result { + if let Some(ctx) = &self.replication_ctx { + let mut frame_no: Option = ctx.replicator.committed_frame_no().await; + let mut frames_synced: usize = 0; + while frame_no.unwrap_or(0) < replication_index { + let res = ctx.replicator.sync_oneshot().await?; + frame_no = res.frame_no(); + frames_synced += res.frames_synced(); + } + Ok(crate::replication::Replicated { + frame_no, + frames_synced, + }) + } else { + Err(crate::errors::Error::Misuse( + "No replicator available. Use Database::with_replicator() to enable replication" + .to_string(), + )) + } + } + #[cfg(feature = "replication")] pub async fn sync_frames(&self, frames: Frames) -> Result> { if let Some(ref ctx) = self.replication_ctx { diff --git a/libsql/src/replication/mod.rs b/libsql/src/replication/mod.rs index 2f4e9b49c0..116839a54f 100644 --- a/libsql/src/replication/mod.rs +++ b/libsql/src/replication/mod.rs @@ -36,8 +36,8 @@ pub(crate) mod remote_client; #[derive(Debug)] pub struct Replicated { - frame_no: Option, - frames_synced: usize, + pub(crate) frame_no: Option, + pub(crate) frames_synced: usize, } impl Replicated { From 2cf34946e775e1e448dd0c302ae841848d2bd870 Mon Sep 17 00:00:00 2001 From: Piotr Jastrzebski Date: Tue, 13 Aug 2024 22:11:14 +0200 Subject: [PATCH 6/6] tests: Add checks for max_write_replication_index Signed-off-by: Piotr Jastrzebski --- libsql-server/tests/embedded_replica/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/libsql-server/tests/embedded_replica/mod.rs b/libsql-server/tests/embedded_replica/mod.rs index 78a46ad996..2d5f8c0de0 100644 --- a/libsql-server/tests/embedded_replica/mod.rs +++ b/libsql-server/tests/embedded_replica/mod.rs @@ -179,6 +179,8 @@ fn execute_batch() { conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ()) .await?; + assert_eq!(db.max_write_replication_index(), Some(1)); + let n = db.sync().await?.frame_no(); assert_eq!(n, Some(1)); @@ -231,6 +233,7 @@ fn stream() { conn.execute("CREATE TABLE user (id INTEGER NOT NULL PRIMARY KEY)", ()) .await?; + assert_eq!(db.max_write_replication_index(), Some(1)); let n = db.sync().await?.frame_no(); assert_eq!(n, Some(1)); @@ -244,8 +247,10 @@ fn stream() { ", ) .await?; + let replication_index = db.max_write_replication_index(); - db.sync().await.unwrap(); + let synced_replication_index = db.sync().await.unwrap().frame_no(); + assert_eq!(synced_replication_index, replication_index); let rows = conn.query("select * from user", ()).await.unwrap();