diff --git a/libsql-replication/src/injector/mod.rs b/libsql-replication/src/injector/mod.rs index 78fae2f24e..e619175d67 100644 --- a/libsql-replication/src/injector/mod.rs +++ b/libsql-replication/src/injector/mod.rs @@ -32,6 +32,7 @@ pub struct Injector { /// Injector connection // connection must be dropped before the hook context connection: Arc>>, + biggest_uncommitted_seen: FrameNo, } /// Methods from this trait are called before and after performing a frame injection. @@ -64,6 +65,7 @@ impl Injector { buffer, capacity: buffer_capacity, connection: Arc::new(Mutex::new(connection)), + biggest_uncommitted_seen: 0, }) } @@ -86,6 +88,7 @@ impl Injector { Err(e) => { // something went wrong, rollback the connection to make sure we can retry in a // clean state + self.biggest_uncommitted_seen = 0; let connection = self.connection.lock(); let mut rollback = connection.prepare_cached("ROLLBACK")?; let _ = rollback.execute(()); @@ -112,6 +115,8 @@ impl Injector { } }; + self.biggest_uncommitted_seen = self.biggest_uncommitted_seen.max(last_frame_no); + drop(lock); let connection = self.connection.lock(); @@ -134,7 +139,9 @@ impl Injector { let _ = rollback.execute(()); self.is_txn = false; assert!(self.buffer.lock().is_empty()); - return Ok(Some(last_frame_no)); + let commit_frame_no = self.biggest_uncommitted_seen; + self.biggest_uncommitted_seen = 0; + return Ok(Some(commit_frame_no)); } else if e.extended_code == LIBSQL_INJECT_OK_TXN { self.is_txn = true; assert!(self.buffer.lock().is_empty()); diff --git a/libsql-replication/src/meta.rs b/libsql-replication/src/meta.rs index 8586222634..348e7467d3 100644 --- a/libsql-replication/src/meta.rs +++ b/libsql-replication/src/meta.rs @@ -56,7 +56,7 @@ impl WalIndexMetaData { pub struct WalIndexMeta { file: File, - data: Option, + pub data: Option, } impl WalIndexMeta { diff --git a/libsql-server/tests/cluster/mod.rs b/libsql-server/tests/cluster/mod.rs index b935ad649f..a5f43c16c0 100644 --- 
a/libsql-server/tests/cluster/mod.rs +++ b/libsql-server/tests/cluster/mod.rs @@ -15,6 +15,7 @@ use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector}; use crate::common::{http::Client, net::SimServer, snapshot_metrics}; mod replica_restart; +mod replication; fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) { init_tracing(); diff --git a/libsql-server/tests/cluster/replication.rs b/libsql-server/tests/cluster/replication.rs new file mode 100644 index 0000000000..4f0fd7bbbd --- /dev/null +++ b/libsql-server/tests/cluster/replication.rs @@ -0,0 +1,137 @@ +use std::sync::Arc; +use std::time::Duration; + +use libsql_server::config::{AdminApiConfig, DbConfig, RpcClientConfig, RpcServerConfig}; +use tokio::sync::Notify; + +use crate::common::{ + http::Client, + net::{SimServer, TestServer, TurmoilAcceptor, TurmoilConnector}, +}; + +/// In this test, we first create a primary with a very small max_log_size, and then add a good +/// amount of data to it. This will cause the primary to create a bunch of snapshots large enough +/// to prevent the replica from applying them all at once. We then start the replica, and check +/// that it replicates correctly to the primary's replication index. 
+#[test]
+fn apply_partial_snapshot() {
+    let mut sim = turmoil::Builder::new()
+        .tcp_capacity(4096 * 30)
+        .simulation_duration(Duration::from_secs(3600)) // simulated-time budget for the run
+        .build();
+
+    let prim_tmp = tempfile::tempdir().unwrap();
+    let notify = Arc::new(Notify::new()); // gate that keeps the replica from starting early
+
+    sim.host("primary", {
+        let prim_path = prim_tmp.path().to_path_buf();
+        move || {
+            let prim_path = prim_path.clone();
+            async move {
+                let primary = TestServer {
+                    path: prim_path.into(),
+                    db_config: DbConfig {
+                        max_log_size: 1, // tiny log => many snapshots on the primary
+                        ..Default::default()
+                    },
+                    admin_api_config: Some(AdminApiConfig {
+                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
+                        connector: TurmoilConnector,
+                        disable_metrics: true,
+                    }),
+                    rpc_server_config: Some(RpcServerConfig {
+                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 5050)).await.unwrap(),
+                        tls_config: None,
+                    }),
+                    ..Default::default()
+                };
+
+                primary.start_sim(8080).await.unwrap();
+
+                Ok(())
+            }
+        }
+    });
+
+    sim.host("replica", {
+        let notify = notify.clone();
+        move || {
+            let notify = notify.clone();
+            async move {
+                let tmp = tempfile::tempdir().unwrap();
+                let replica = TestServer {
+                    path: tmp.path().to_path_buf().into(),
+                    db_config: DbConfig {
+                        max_log_size: 1,
+                        ..Default::default()
+                    },
+                    admin_api_config: Some(AdminApiConfig {
+                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
+                        connector: TurmoilConnector,
+                        disable_metrics: true,
+                    }),
+                    rpc_client_config: Some(RpcClientConfig {
+                        remote_url: "http://primary:5050".into(), // the primary's RPC acceptor
+                        tls_config: None,
+                        connector: TurmoilConnector,
+                    }),
+                    ..Default::default()
+                };
+
+                notify.notified().await; // wait until the client has filled the primary
+                replica.start_sim(8080).await.unwrap();
+
+                Ok(())
+            }
+        }
+    });
+
+    sim.client("client", async move {
+        let primary = libsql::Database::open_remote_with_connector(
+            "http://primary:8080",
+            "",
+            TurmoilConnector,
+        )
+        .unwrap();
+        let conn = primary.connect().unwrap();
+        conn.execute("CREATE TABLE TEST (x)", ()).await.unwrap();
+        // We need a sufficiently large snapshot for the test: before the fix, 5000 inserts would
+        // trigger an infinite loop.
+        for _ in 0..5000 {
+            conn.execute("INSERT INTO TEST VALUES (randomblob(6000))", ())
+                .await
+                .unwrap();
+        }
+
+        let client = Client::new();
+        let resp = client
+            .get("http://primary:9090/v1/namespaces/default/stats")
+            .await
+            .unwrap();
+        let stats = resp.json_value().await.unwrap();
+        let primary_replication_index = stats["replication_index"].as_i64().unwrap();
+
+        // primary is populated; `notify_one` stores a permit if the replica is not waiting yet
+        notify.notify_one();
+
+        let client = Client::new();
+        loop { // poll the replica's stats until it reports the primary's replication index
+            let resp = client
+                .get("http://replica:9090/v1/namespaces/default/stats")
+                .await
+                .unwrap();
+            let stats = resp.json_value().await.unwrap();
+            let replication_index = &stats["replication_index"];
+            if !replication_index.is_null()
+                && replication_index.as_i64().unwrap() == primary_replication_index
+            {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(1000)).await; // back off before retrying
+        }
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}