From 912ac4e362c2658fe131ebaf840d986f5b82200c Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 13 Nov 2023 15:43:32 +0100 Subject: [PATCH 1/3] Gracefully shutdown http services --- libsql-server/src/http/admin/mod.rs | 3 +++ libsql-server/src/http/user/mod.rs | 4 +++- libsql-server/src/lib.rs | 6 ++++++ 3 files changed, 12 insertions(+), 1 deletion(-) diff --git a/libsql-server/src/http/admin/mod.rs b/libsql-server/src/http/admin/mod.rs index 4e427307c8..75c2443b8e 100644 --- a/libsql-server/src/http/admin/mod.rs +++ b/libsql-server/src/http/admin/mod.rs @@ -13,6 +13,7 @@ use std::io::ErrorKind; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; +use tokio::sync::Notify; use tokio_util::io::ReaderStream; use url::Url; @@ -60,6 +61,7 @@ pub async fn run( namespaces: NamespaceStore, connector: C, disable_metrics: bool, + shutdown: Arc, ) -> anyhow::Result<()> where A: crate::net::Accept, @@ -124,6 +126,7 @@ where hyper::server::Server::builder(acceptor) .serve(router.into_make_service()) + .with_graceful_shutdown(shutdown.notified()) .await .context("Could not bind admin HTTP API server")?; Ok(()) diff --git a/libsql-server/src/http/user/mod.rs b/libsql-server/src/http/user/mod.rs index c6f1390340..bc4edc77dd 100644 --- a/libsql-server/src/http/user/mod.rs +++ b/libsql-server/src/http/user/mod.rs @@ -21,7 +21,7 @@ use hyper::{header, Body, Request, Response, StatusCode}; use serde::de::DeserializeOwned; use serde::Serialize; use serde_json::Number; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{mpsc, oneshot, Notify}; use tokio::task::JoinSet; use tonic::transport::Server; use tower_http::trace::DefaultOnResponse; @@ -237,6 +237,7 @@ pub struct UserApi { pub enable_console: bool, pub self_url: Option, pub path: Arc, + pub shutdown: Arc, } impl UserApi @@ -441,6 +442,7 @@ where join_set.spawn(async move { hyper::server::Server::builder(acceptor) .serve(h2c) + .with_graceful_shutdown(self.shutdown.notified()) .await .context("http server")?; 
Ok(()) diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index a66b95fecd..0f5ffddd11 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -131,6 +131,7 @@ struct Services { db_config: DbConfig, auth: Arc, path: Arc, + shutdown: Arc, } impl Services @@ -156,6 +157,7 @@ where enable_console: self.user_api_config.enable_http_console, self_url: self.user_api_config.self_url, path: self.path.clone(), + shutdown: self.shutdown.clone(), }; let user_http_service = user_http.configure(join_set); @@ -166,12 +168,14 @@ where disable_metrics, }) = self.admin_api_config { + let shutdown = self.shutdown.clone(); join_set.spawn(http::admin::run( acceptor, user_http_service, self.namespaces, connector, disable_metrics, + shutdown, )); } } @@ -398,6 +402,7 @@ where db_config: self.db_config, auth, path: self.path.clone(), + shutdown: self.shutdown.clone(), }; services.configure(&mut join_set); @@ -433,6 +438,7 @@ where db_config: self.db_config, auth, path: self.path.clone(), + shutdown: self.shutdown.clone(), }; services.configure(&mut join_set); From b9b47ee16cf24fa89c68274d4f7af03da55b916f Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 13 Nov 2023 10:47:56 +0100 Subject: [PATCH 2/3] fix namespace reset on imcompatible log --- libsql-replication/src/replicator.rs | 2 +- libsql-server/src/namespace/mod.rs | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/libsql-replication/src/replicator.rs b/libsql-replication/src/replicator.rs index d3b9b982ff..a9fcab3a18 100644 --- a/libsql-replication/src/replicator.rs +++ b/libsql-replication/src/replicator.rs @@ -157,7 +157,7 @@ impl Replicator { self.has_handshake = true; return Ok(()); } - Err(e @ Error::Fatal(_)) => return Err(e), + Err(e @ (Error::Fatal(_) | Error::Meta(_))) => return Err(e), Err(e) if !error_printed => { tracing::error!("error connecting to primary. retrying. 
error: {e}"); error_printed = true; diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 4689324673..70eb5b5091 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -575,7 +575,14 @@ impl Namespace { .await?; // force a handshake now, to retrieve the primary's current replication index - replicator.try_perform_handshake().await?; + match replicator.try_perform_handshake().await { + Err(libsql_replication::replicator::Error::Meta(libsql_replication::meta::Error::LogIncompatible)) => { + tracing::error!("trying to replicate incompatible logs, reseting replica"); + (reset)(ResetOp::Reset(name.clone())); + } + Err(e) => Err(e)?, + Ok(_) => (), + } let primary_current_replicatio_index = replicator.client_mut().primary_replication_index; let mut join_set = JoinSet::new(); From 675f4d99b8b5991996c109dd53f5702a125fa008 Mon Sep 17 00:00:00 2001 From: ad hoc Date: Mon, 13 Nov 2023 16:07:04 +0100 Subject: [PATCH 3/3] test replica restart --- libsql-server/src/http/admin/mod.rs | 1 + libsql-server/src/namespace/mod.rs | 4 +- libsql-server/tests/cluster/mod.rs | 2 + .../tests/cluster/replica_restart.rs | 536 ++++++++++++++++++ 4 files changed, 542 insertions(+), 1 deletion(-) create mode 100644 libsql-server/tests/cluster/replica_restart.rs diff --git a/libsql-server/src/http/admin/mod.rs b/libsql-server/src/http/admin/mod.rs index 75c2443b8e..1dfc1af425 100644 --- a/libsql-server/src/http/admin/mod.rs +++ b/libsql-server/src/http/admin/mod.rs @@ -129,6 +129,7 @@ where .with_graceful_shutdown(shutdown.notified()) .await .context("Could not bind admin HTTP API server")?; + Ok(()) } diff --git a/libsql-server/src/namespace/mod.rs b/libsql-server/src/namespace/mod.rs index 70eb5b5091..b14ac6d056 100644 --- a/libsql-server/src/namespace/mod.rs +++ b/libsql-server/src/namespace/mod.rs @@ -576,7 +576,9 @@ impl Namespace { // force a handshake now, to retrieve the primary's current replication index match 
replicator.try_perform_handshake().await { - Err(libsql_replication::replicator::Error::Meta(libsql_replication::meta::Error::LogIncompatible)) => { + Err(libsql_replication::replicator::Error::Meta( + libsql_replication::meta::Error::LogIncompatible, + )) => { tracing::error!("trying to replicate incompatible logs, reseting replica"); (reset)(ResetOp::Reset(name.clone())); } diff --git a/libsql-server/tests/cluster/mod.rs b/libsql-server/tests/cluster/mod.rs index 0b0b55fe9d..02a39e1c16 100644 --- a/libsql-server/tests/cluster/mod.rs +++ b/libsql-server/tests/cluster/mod.rs @@ -14,6 +14,8 @@ use common::net::{init_tracing, TestServer, TurmoilAcceptor, TurmoilConnector}; use crate::common::{http::Client, net::SimServer, snapshot_metrics}; +mod replica_restart; + fn make_cluster(sim: &mut Sim, num_replica: usize, disable_namespaces: bool) { init_tracing(); let tmp = tempdir().unwrap(); diff --git a/libsql-server/tests/cluster/replica_restart.rs b/libsql-server/tests/cluster/replica_restart.rs new file mode 100644 index 0000000000..40b06edf50 --- /dev/null +++ b/libsql-server/tests/cluster/replica_restart.rs @@ -0,0 +1,536 @@ +use std::sync::Arc; +use std::time::Duration; + +use futures::FutureExt; +use libsql::Database; +use sqld::config::{AdminApiConfig, RpcClientConfig, RpcServerConfig, UserApiConfig}; +use tempfile::tempdir; +use tokio::sync::Notify; +use turmoil::Builder; + +use crate::common::{ + http::Client, + net::{init_tracing, SimServer, TestServer, TurmoilAcceptor, TurmoilConnector}, +}; + +/// In this test, we create a primary and a replica, add some data and sync them. when then shut +/// down and bring back up the replica, and ensure the the replica continue normal mode of +/// operation. 
+#[test]
+fn replica_restart() {
+    let mut sim = Builder::new().build();
+    let tmp = tempdir().unwrap();
+    // Primary host: admin API bound on 9090, replication RPC bound on 4567, and the server
+    // started on 8080 (the port the client connects to below).
+    sim.host("primary", move || {
+        let path = tmp.path().to_path_buf();
+        async move {
+            let server = TestServer {
+                path: path.into(),
+                user_api_config: UserApiConfig {
+                    ..Default::default()
+                },
+                admin_api_config: Some(AdminApiConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await?,
+                    connector: TurmoilConnector,
+                    disable_metrics: true,
+                }),
+                rpc_server_config: Some(RpcServerConfig {
+                    acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await?,
+                    tls_config: None,
+                }),
+                ..Default::default()
+            };
+
+            server.start_sim(8080).await?;
+
+            Ok(())
+        }
+    });
+
+    // `notify` is shared between the client and the replica host: the client uses it to tell
+    // the replica when to restart.
+    let notify = Arc::new(Notify::new());
+    let tmp = tempdir().unwrap();
+    let notify_clone = notify.clone();
+    sim.host("replica", move || {
+        let path = tmp.path().to_path_buf();
+        let notify = notify_clone.clone();
+        async move {
+            // Builds a fresh replica server on the same data directory; called once per start.
+            let make_server = || {
+                let path = path.clone();
+                async {
+                    TestServer {
+                        path: path.into(),
+                        user_api_config: UserApiConfig {
+                            ..Default::default()
+                        },
+                        admin_api_config: Some(AdminApiConfig {
+                            acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
+                            connector: TurmoilConnector,
+                            disable_metrics: true,
+                        }),
+                        rpc_client_config: Some(RpcClientConfig {
+                            remote_url: "http://primary:4567".into(),
+                            connector: TurmoilConnector,
+                            tls_config: None,
+                        }),
+                        ..Default::default()
+                    }
+                }
+            };
+
+            let server = make_server().await;
+
+            // Run the replica until the client signals. When the notification wins the race,
+            // the in-flight `start_sim` future (and the server it owns) is dropped.
+            tokio::select! {
+                res = server.start_sim(8080) => {
+                    res.unwrap()
+                }
+                _ = notify.notified() => (),
+            }
+
+            // Bring the replica back up on the same path.
+            let server = make_server().await;
+            server.start_sim(8080).await.unwrap();
+
+            Ok(())
+        }
+    });
+
+    sim.client("client", async move {
+        let http = Client::new();
+        let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
+        let conn = db.connect()?;
+
+        // insert a few values into the primary
+        conn.execute("create table test (x)", ()).await.unwrap();
+        for _ in 0..50 {
+            conn.execute("insert into test values (42)", ())
+                .await
+                .unwrap();
+        }
+
+        let primary_index = http
+            .get("http://primary:9090/v1/namespaces/default/stats")
+            .await
+            .unwrap()
+            .json_value()
+            .await
+            .unwrap()["replication_index"]
+            .clone()
+            .as_i64();
+
+        // Wait for the replica to catch up with the primary. The stats must be read from the
+        // replica's admin API: reading them from the primary would compare the primary's index
+        // with itself and succeed immediately.
+        loop {
+            let replica_index = http
+                .get("http://replica:9090/v1/namespaces/default/stats")
+                .await
+                .unwrap()
+                .json_value()
+                .await
+                .unwrap()["replication_index"]
+                .clone()
+                .as_i64();
+            if primary_index == replica_index {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
+
+        // Ask the replica to restart.
+        notify.notify_waiters();
+
+        // make sure that the restarted replica is up to date
+        loop {
+            let replica_index = http
+                .get("http://replica:9090/v1/namespaces/default/stats")
+                .await
+                .unwrap()
+                .json_value()
+                .await
+                .unwrap()["replication_index"]
+                .clone()
+                .as_i64();
+            if primary_index == replica_index {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
+
+/// In this test, we start a primary and a replica. We add some entries to the primary, and wait
+/// for the replica to be up to date. Then we stop the primary, remove its wallog, and restart the
+/// primary. This will force the primary to regenerate the log. The replica should catch that, and
+/// self heal. During this process the replica is not shut down.
+#[test]
+fn primary_regenerate_log_no_replica_restart() {
+    let mut sim = Builder::new().build();
+    let tmp = tempdir().unwrap();
+
+    // `notify` is shared between the client and the primary host. The client uses it to request
+    // the primary's shutdown; the primary uses it to signal that its log file has been removed
+    // and it is about to start again.
+    let notify = Arc::new(Notify::new());
+    let notify_clone = notify.clone();
+
+    init_tracing();
+    sim.host("primary", move || {
+        let notify = notify_clone.clone();
+        let path = tmp.path().to_path_buf();
+        async move {
+            // Builds a fresh primary server on the same data directory; called once per start.
+            let make_server = || async {
+                TestServer {
+                    path: path.clone().into(),
+                    user_api_config: UserApiConfig {
+                        ..Default::default()
+                    },
+                    admin_api_config: Some(AdminApiConfig {
+                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
+                        connector: TurmoilConnector,
+                        disable_metrics: true,
+                    }),
+                    rpc_server_config: Some(RpcServerConfig {
+                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
+                        tls_config: None,
+                    }),
+                    ..Default::default()
+                }
+            };
+            let server = make_server().await;
+            let shutdown = server.shutdown.clone();
+
+            let fut = async move { server.start_sim(8080).await };
+
+            tokio::pin!(fut);
+
+            // Keep driving the server. When the client signals, forward the signal to the
+            // server's own shutdown handle and keep polling until the server future completes.
+            loop {
+                tokio::select! {
+                    res = &mut fut => {
+                        res.unwrap();
+                        break
+                    }
+                    _ = notify.notified() => {
+                        shutdown.notify_waiters();
+                    },
+                }
+            }
+            // remove the wallog and start again
+            tokio::fs::remove_file(path.join("dbs/default/wallog"))
+                .await
+                .unwrap();
+            // Tell the waiting client that the log has been removed and the primary is restarting.
+            notify.notify_waiters();
+            let server = make_server().await;
+            server.start_sim(8080).await.unwrap();
+
+            Ok(())
+        }
+    });
+
+    let tmp = tempdir().unwrap();
+    // The replica runs uninterrupted for the whole test.
+    sim.host("replica", move || {
+        let path = tmp.path().to_path_buf();
+        async move {
+            let make_server = || {
+                let path = path.clone();
+                async {
+                    TestServer {
+                        path: path.into(),
+                        user_api_config: UserApiConfig {
+                            ..Default::default()
+                        },
+                        admin_api_config: Some(AdminApiConfig {
+                            acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
+                            connector: TurmoilConnector,
+                            disable_metrics: true,
+                        }),
+                        rpc_client_config: Some(RpcClientConfig {
+                            remote_url: "http://primary:4567".into(),
+                            connector: TurmoilConnector,
+                            tls_config: None,
+                        }),
+                        ..Default::default()
+                    }
+                }
+            };
+
+            let server = make_server().await;
+            server.start_sim(8080).await.unwrap();
+
+            Ok(())
+        }
+    });
+
+    sim.client("client", async move {
+        let http = Client::new();
+        let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
+        let conn = db.connect()?;
+
+        // insert a few values into the primary
+        conn.execute("create table test (x)", ()).await.unwrap();
+        for _ in 0..50 {
+            conn.execute("insert into test values (42)", ())
+                .await
+                .unwrap();
+        }
+
+        let primary_index = http
+            .get("http://primary:9090/v1/namespaces/default/stats")
+            .await
+            .unwrap()
+            .json_value()
+            .await
+            .unwrap()["replication_index"]
+            .clone()
+            .as_i64();
+
+        // Wait for the replica to catch up with the primary. The stats must be read from the
+        // replica's admin API: reading them from the primary would compare the primary's index
+        // with itself and succeed immediately.
+        loop {
+            let replica_index = http
+                .get("http://replica:9090/v1/namespaces/default/stats")
+                .await
+                .unwrap()
+                .json_value()
+                .await
+                .unwrap()["replication_index"]
+                .clone()
+                .as_i64();
+            if primary_index == replica_index {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
+
+        // Ask the primary to shut down, then wait for it to signal that it is restarting.
+        notify.notify_waiters();
+        notify.notified().await;
+
+        // Use a fresh HTTP client for the restarted primary.
+        drop(http);
+        let http = Client::new();
+        // the regenerated log must give the primary a different replication index
+        let new_primary_index = http
+            .get("http://primary:9090/v1/namespaces/default/stats")
+            .await
+            .unwrap()
+            .json_value()
+            .await
+            .unwrap()["replication_index"]
+            .clone()
+            .as_i64();
+        assert_ne!(primary_index, new_primary_index);
+        // make sure that the replica heals and catches up with the regenerated log
+        loop {
+            let replica_index = http
+                .get("http://replica:9090/v1/namespaces/default/stats")
+                .await
+                .unwrap()
+                .json_value()
+                .await
+                .unwrap()["replication_index"]
+                .clone()
+                .as_i64();
+            if new_primary_index == replica_index {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}
+
+/// This test is very similar to `primary_regenerate_log_no_replica_restart`. The only difference
+/// is that the replica is being shut down before the primary regenerates its log. When the
+/// replica is brought back up, it will try to load the namespace from a primary with a new log,
+/// and it should self heal.
+#[test]
+fn primary_regenerate_log_with_replica_restart() {
+    let mut sim = Builder::new().build();
+    let tmp = tempdir().unwrap();
+
+    // `notify` is shared by the client and both hosts. The client's signal asks the primary and
+    // the replica to shut down; the primary's signal tells the replica and the client that the
+    // log file has been removed and the primary is about to start again.
+    let notify = Arc::new(Notify::new());
+    let notify_clone = notify.clone();
+
+    init_tracing();
+    sim.host("primary", move || {
+        let notify = notify_clone.clone();
+        let path = tmp.path().to_path_buf();
+        async move {
+            // Builds a fresh primary server on the same data directory; called once per start.
+            let make_server = || async {
+                TestServer {
+                    path: path.clone().into(),
+                    user_api_config: UserApiConfig {
+                        ..Default::default()
+                    },
+                    admin_api_config: Some(AdminApiConfig {
+                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
+                        connector: TurmoilConnector,
+                        disable_metrics: true,
+                    }),
+                    rpc_server_config: Some(RpcServerConfig {
+                        acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 4567)).await.unwrap(),
+                        tls_config: None,
+                    }),
+                    ..Default::default()
+                }
+            };
+            let server = make_server().await;
+            let shutdown = server.shutdown.clone();
+
+            let fut = async move { server.start_sim(8080).await };
+
+            tokio::pin!(fut);
+
+            // Keep driving the server. When the client signals, forward the signal to the
+            // server's own shutdown handle and keep polling until the server future completes.
+            loop {
+                tokio::select! {
+                    res = &mut fut => {
+                        res.unwrap();
+                        break
+                    }
+                    _ = notify.notified() => {
+                        shutdown.notify_waiters();
+                    },
+                }
+            }
+            // remove the wallog and start again
+            tokio::fs::remove_file(path.join("dbs/default/wallog"))
+                .await
+                .unwrap();
+            // Tell the waiting replica and client that the primary is restarting.
+            notify.notify_waiters();
+            let server = make_server().await;
+            server.start_sim(8080).await.unwrap();
+
+            Ok(())
+        }
+    });
+
+    let tmp = tempdir().unwrap();
+    let notify_clone = notify.clone();
+    sim.host("replica", move || {
+        let path = tmp.path().to_path_buf();
+        let notify = notify_clone.clone();
+        async move {
+            // Builds a fresh replica server on the same data directory; called once per start.
+            let make_server = || {
+                let path = path.clone();
+                async {
+                    TestServer {
+                        path: path.into(),
+                        user_api_config: UserApiConfig {
+                            ..Default::default()
+                        },
+                        admin_api_config: Some(AdminApiConfig {
+                            acceptor: TurmoilAcceptor::bind(([0, 0, 0, 0], 9090)).await.unwrap(),
+                            connector: TurmoilConnector,
+                            disable_metrics: true,
+                        }),
+                        rpc_client_config: Some(RpcClientConfig {
+                            remote_url: "http://primary:4567".into(),
+                            connector: TurmoilConnector,
+                            tls_config: None,
+                        }),
+                        ..Default::default()
+                    }
+                }
+            };
+
+            let server = make_server().await;
+            let shutdown = server.shutdown.clone();
+            let fut = async {
+                server.start_sim(8080).await.unwrap();
+            };
+
+            tokio::pin!(fut);
+            // The notification future is created once and fused, so it is still safe to poll on
+            // later loop iterations after it has completed.
+            let notify_fut = async {
+                notify.notified().await;
+            }
+            .fuse();
+            tokio::pin!(notify_fut);
+            // On the client's signal, trigger the server's shutdown handle and keep polling the
+            // server future until it completes.
+            loop {
+                tokio::select! {
+                    _ = &mut fut => break,
+                    _ = &mut notify_fut => {
+                        shutdown.notify_waiters();
+                    }
+                }
+            }
+
+            // we wait for the primary to signal that it is restarting
+            notify.notified().await;
+
+            // and then restart the replica
+            let server = make_server().await;
+            server.start_sim(8080).await.unwrap();
+
+            Ok(())
+        }
+    });
+
+    sim.client("client", async move {
+        let http = Client::new();
+        let db = Database::open_remote_with_connector("http://primary:8080", "", TurmoilConnector)?;
+        let conn = db.connect()?;
+
+        // insert a few values into the primary
+        conn.execute("create table test (x)", ()).await.unwrap();
+        for _ in 0..50 {
+            conn.execute("insert into test values (42)", ())
+                .await
+                .unwrap();
+        }
+
+        let primary_index = http
+            .get("http://primary:9090/v1/namespaces/default/stats")
+            .await
+            .unwrap()
+            .json_value()
+            .await
+            .unwrap()["replication_index"]
+            .clone()
+            .as_i64();
+
+        // Wait for the replica to catch up with the primary. The stats must be read from the
+        // replica's admin API: reading them from the primary would compare the primary's index
+        // with itself and succeed immediately.
+        loop {
+            let replica_index = http
+                .get("http://replica:9090/v1/namespaces/default/stats")
+                .await
+                .unwrap()
+                .json_value()
+                .await
+                .unwrap()["replication_index"]
+                .clone()
+                .as_i64();
+            if primary_index == replica_index {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
+
+        // Ask both servers to shut down, then wait for the primary to signal its restart.
+        notify.notify_waiters();
+        notify.notified().await;
+
+        // Use a fresh HTTP client for the restarted servers.
+        drop(http);
+        let http = Client::new();
+        // the regenerated log must give the primary a different replication index
+        let new_primary_index = http
+            .get("http://primary:9090/v1/namespaces/default/stats")
+            .await
+            .unwrap()
+            .json_value()
+            .await
+            .unwrap()["replication_index"]
+            .clone()
+            .as_i64();
+        assert_ne!(primary_index, new_primary_index);
+        // make sure that the restarted replica heals and catches up with the regenerated log
+        loop {
+            let replica_index = http
+                .get("http://replica:9090/v1/namespaces/default/stats")
+                .await
+                .unwrap()
+                .json_value()
+                .await
+                .unwrap()["replication_index"]
+                .clone()
+                .as_i64();
+            if new_primary_index == replica_index {
+                break;
+            }
+            tokio::time::sleep(Duration::from_millis(50)).await;
+        }
+
+        Ok(())
+    });
+
+    sim.run().unwrap();
+}