From 7779f05a792a66b65261bb29943022b59f19285a Mon Sep 17 00:00:00 2001 From: ad hoc Date: Tue, 27 Feb 2024 11:13:04 +0100 Subject: [PATCH] simplify server kind configuration. --- libsql-server/src/config.rs | 7 +- libsql-server/src/lib.rs | 409 ++++++++++++++---------------------- 2 files changed, 161 insertions(+), 255 deletions(-) diff --git a/libsql-server/src/config.rs b/libsql-server/src/config.rs index edc3cf5d57..5fdf1da958 100644 --- a/libsql-server/src/config.rs +++ b/libsql-server/src/config.rs @@ -20,8 +20,8 @@ pub struct RpcClientConfig { } impl RpcClientConfig { - pub(crate) async fn configure(self) -> anyhow::Result<(Channel, tonic::transport::Uri)> { - let uri = tonic::transport::Uri::from_maybe_shared(self.remote_url)?; + pub(crate) async fn configure(&self) -> anyhow::Result<(Channel, tonic::transport::Uri)> { + let uri = tonic::transport::Uri::from_maybe_shared(self.remote_url.clone())?; let mut builder = Channel::builder(uri.clone()); if let Some(ref tls_config) = self.tls_config { let cert_pem = std::fs::read_to_string(&tls_config.cert)?; @@ -38,7 +38,8 @@ impl RpcClientConfig { builder = builder.tls_config(tls_config)?; } - let channel = builder.connect_with_connector_lazy(self.connector.map_err(Into::into)); + let channel = + builder.connect_with_connector_lazy(self.connector.clone().map_err(Into::into)); Ok((channel, uri)) } diff --git a/libsql-server/src/lib.rs b/libsql-server/src/lib.rs index 5073feb7b1..40a6e6f662 100644 --- a/libsql-server/src/lib.rs +++ b/libsql-server/src/lib.rs @@ -1,12 +1,11 @@ #![allow(clippy::type_complexity, clippy::too_many_arguments)] -use std::future::Future; use std::path::{Path, PathBuf}; -use std::pin::Pin; use std::str::FromStr; use std::sync::{Arc, Weak}; use crate::connection::{Connection, MakeConnection}; +use crate::database::DatabaseKind; use crate::error::Error; use crate::metrics::DIRTY_STARTUP; use crate::migration::maybe_migrate; @@ -28,10 +27,7 @@ use config::{ use http::user::UserApi; use hyper::client::HttpConnector; use hyper_rustls::HttpsConnector; -use namespace::{ - MakeNamespace, NamespaceBottomlessDbId, NamespaceName, NamespaceStore, PrimaryNamespaceConfig, - PrimaryNamespaceMaker, ReplicaNamespaceConfig, ReplicaNamespaceMaker, -}; +use namespace::{NamespaceConfig, NamespaceName}; use net::Connector; use once_cell::sync::Lazy; use rusqlite::ffi::{sqlite3_config, SQLITE_CONFIG_PCACHE2}; @@ -43,6 +39,7 @@ use url::Url; use utils::services::idle_shutdown::IdleShutdownKicker; use self::config::MetaStoreConfig; +use self::namespace::NamespaceStore; use self::net::AddrIncoming; use self::replication::script_backup_manager::{CommandHandler, ScriptBackupManager}; @@ -129,8 +126,8 @@ impl Default for Server { } } -struct Services { - namespaces: NamespaceStore, +struct Services { + namespace_store: NamespaceStore, idle_shutdown_kicker: Option, proxy_service: P, replication_service: S, @@ -144,9 +141,8 @@ struct Services { shutdown: Arc, } -impl Services +impl Services where - M: MakeNamespace, A: crate::net::Accept, P: Proxy, S: ReplicationLog, @@ -157,7 +153,7 @@ where http_acceptor: self.user_api_config.http_acceptor, hrana_ws_acceptor: self.user_api_config.hrana_ws_acceptor, user_auth_strategy: self.user_auth_strategy, - namespaces: self.namespaces.clone(), + namespaces: self.namespace_store.clone(), idle_shutdown_kicker: self.idle_shutdown_kicker.clone(), proxy_service: self.proxy_service, replication_service: self.replication_service, @@ -182,7 +178,7 @@ where join_set.spawn(http::admin::run( acceptor, user_http_service, - self.namespaces, + self.namespace_store, connector, disable_metrics, shutdown, @@ -311,7 +307,7 @@ where &self, join_set: &mut JoinSet>, stats_receiver: mpsc::Receiver<(NamespaceName, Weak)>, - namespaces: NamespaceStore, + namespaces: NamespaceStore, ) -> anyhow::Result<()> { match self.heartbeat_config { Some(ref config) => { @@ -351,6 +347,31 @@ where Ok(()) } + fn make_services( + self, + namespace_store: NamespaceStore, + idle_shutdown_kicker: Option, + proxy_service: P, + replication_service: L, + user_auth_strategy: Auth, + shutdown: Arc, + ) -> Services { + Services { + namespace_store, + idle_shutdown_kicker, + proxy_service, + replication_service, + user_api_config: self.user_api_config, + admin_api_config: self.admin_api_config, + disable_namespaces: self.disable_namespaces, + disable_default_namespace: self.disable_default_namespace, + db_config: self.db_config, + user_auth_strategy, + path: self.path.clone(), + shutdown, + } + } + pub async fn start(mut self) -> anyhow::Result<()> { static INIT: std::sync::Once = std::sync::Once::new(); let mut join_set = JoinSet::new(); @@ -379,167 +400,44 @@ where let idle_shutdown_kicker = self.setup_shutdown(); let extensions = self.db_config.validate_extensions()?; - let namespace_store_shutdown_fut: Pin> + Send>>; let user_auth_strategy = self.user_api_config.auth_strategy.clone(); let service_shutdown = Arc::new(Notify::new()); - match self.rpc_client_config { - Some(rpc_config) => { - let (stats_sender, stats_receiver) = mpsc::channel(8); - let replica = Replica { - rpc_config, - stats_sender, - extensions, - db_config: self.db_config.clone(), - base_path: self.path.clone(), - user_auth_strategy: user_auth_strategy.clone(), - disable_namespaces: self.disable_namespaces, - max_active_namespaces: self.max_active_namespaces, - meta_store_config: self.meta_store_config.clone(), - max_concurrent_connections: self.max_concurrent_connections, - }; - let (namespaces, proxy_service, replication_service) = replica.configure().await?; - self.rpc_client_config = None; - self.spawn_monitoring_tasks(&mut join_set, stats_receiver, namespaces.clone())?; - namespace_store_shutdown_fut = { - let namespaces = namespaces.clone(); - Box::pin(async move { namespaces.shutdown().await }) - }; - - let services = Services { - namespaces, - idle_shutdown_kicker, - proxy_service, - replication_service, - user_api_config: self.user_api_config, - admin_api_config: self.admin_api_config, - disable_namespaces: self.disable_namespaces, - disable_default_namespace: self.disable_default_namespace, - db_config: self.db_config, - user_auth_strategy, - path: self.path.clone(), - shutdown: service_shutdown.clone(), - }; - - services.configure(&mut join_set); - } - None => { - let (stats_sender, stats_receiver) = mpsc::channel(8); - - let primary = Primary { - rpc_config: self.rpc_server_config, - db_config: self.db_config.clone(), - idle_shutdown_kicker: idle_shutdown_kicker.clone(), - stats_sender, - db_is_dirty, - extensions, - base_path: self.path.clone(), - disable_namespaces: self.disable_namespaces, - max_active_namespaces: self.max_active_namespaces, - join_set: &mut join_set, - user_auth_strategy: user_auth_strategy.clone(), - meta_store_config: self.meta_store_config.clone(), - max_concurrent_connections: self.max_concurrent_connections, - }; - - let (namespaces, proxy_service, replication_service) = primary.configure().await?; - self.rpc_server_config = None; - self.spawn_monitoring_tasks(&mut join_set, stats_receiver, namespaces.clone())?; - namespace_store_shutdown_fut = { - let namespaces = namespaces.clone(); - Box::pin(async move { namespaces.shutdown().await }) - }; - - let services = Services { - namespaces, - idle_shutdown_kicker, - proxy_service, - replication_service, - user_api_config: self.user_api_config, - admin_api_config: self.admin_api_config, - disable_namespaces: self.disable_namespaces, - disable_default_namespace: self.disable_default_namespace, - db_config: self.db_config, - user_auth_strategy, - path: self.path.clone(), - shutdown: service_shutdown.clone(), - }; - - services.configure(&mut join_set); - } - } - - tokio::select! { - _ = self.shutdown.notified() => { - join_set.shutdown().await; - service_shutdown.notify_waiters(); - namespace_store_shutdown_fut.await?; - // clean shutdown, remove sentinel file - std::fs::remove_file(sentinel_file_path(&self.path))?; - tracing::info!("sqld was shutdown gracefully. Bye!"); - } - Some(res) = join_set.join_next() => { - res??; - }, - else => (), - } - - Ok(()) - } - - fn setup_shutdown(&self) -> Option { - let shutdown_notify = self.shutdown.clone(); - self.idle_shutdown_timeout.map(|d| { - IdleShutdownKicker::new(d, self.initial_idle_shutdown_timeout, shutdown_notify) - }) - } -} - -struct Primary<'a, A> { - rpc_config: Option>, - db_config: DbConfig, - idle_shutdown_kicker: Option, - stats_sender: StatsSender, - db_is_dirty: bool, - extensions: Arc<[PathBuf]>, - base_path: Arc, - disable_namespaces: bool, - max_active_namespaces: usize, - join_set: &'a mut JoinSet>, - meta_store_config: MetaStoreConfig, - max_concurrent_connections: usize, - user_auth_strategy: Auth, -} + let db_kind = if self.rpc_client_config.is_some() { + DatabaseKind::Replica + } else { + DatabaseKind::Primary + }; -impl Primary<'_, A> -where - A: Accept, -{ - async fn configure( - mut self, - ) -> anyhow::Result<( - NamespaceStore, - ProxyService, - ReplicationLogService, - )> { let scripted_backup = match self.db_config.snapshot_exec { - Some(command) => { + Some(ref command) => { let (scripted_backup, script_backup_task) = - ScriptBackupManager::new(&self.base_path, CommandHandler::new(command)).await?; - self.join_set.spawn(script_backup_task.run()); + ScriptBackupManager::new(&self.path, CommandHandler::new(command.to_string())) + .await?; + join_set.spawn(script_backup_task.run()); Some(scripted_backup) } None => None, }; - let conf = PrimaryNamespaceConfig { - base_path: self.base_path.clone(), + let (channel, uri) = match self.rpc_client_config { + Some(ref config) => { + let (channel, uri) = config.configure().await?; + (Some(channel), Some(uri)) + } + None => (None, None), + }; + + let (stats_sender, stats_receiver) = mpsc::channel(8); + let ns_config = NamespaceConfig { + db_kind, + base_path: self.path.clone(), max_log_size: self.db_config.max_log_size, - db_is_dirty: self.db_is_dirty, + db_is_dirty, max_log_duration: self.db_config.max_log_duration.map(Duration::from_secs_f32), bottomless_replication: self.db_config.bottomless_replication.clone(), - extensions: self.extensions, - stats_sender: self.stats_sender.clone(), + extensions, + stats_sender: stats_sender.clone(), max_response_size: self.db_config.max_response_size, max_total_response_size: self.db_config.max_total_response_size, checkpoint_interval: self.db_config.checkpoint_interval, @@ -547,26 +445,29 @@ where max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)), scripted_backup, max_concurrent_requests: self.db_config.max_concurrent_requests, + channel: channel.clone(), + uri: uri.clone(), }; - let factory = PrimaryNamespaceMaker::new(conf); - let namespaces = NamespaceStore::new( - factory, - false, + let namespace_store: NamespaceStore = NamespaceStore::new( + db_kind.is_replica(), self.db_config.snapshot_at_shutdown, self.max_active_namespaces, - &self.base_path, - self.meta_store_config, + &self.path, + self.meta_store_config.clone(), + ns_config, ) .await?; + self.spawn_monitoring_tasks(&mut join_set, stats_receiver, namespace_store.clone())?; + // eagerly load the default namespace when namespaces are disabled - if self.disable_namespaces { - namespaces + if self.disable_namespaces && db_kind.is_primary() { + namespace_store .create( NamespaceName::default(), namespace::RestoreOption::Latest, - NamespaceBottomlessDbId::NotProvided, + Default::default(), ) .await?; } @@ -580,11 +481,12 @@ where } } - if let Some(config) = self.rpc_config.take() { + // configure rpc server + if let Some(config) = self.rpc_server_config.take() { let proxy_service = - ProxyService::new(namespaces.clone(), None, self.disable_namespaces); + ProxyService::new(namespace_store.clone(), None, self.disable_namespaces); // Garbage collect proxy clients every 30 seconds - self.join_set.spawn({ + join_set.spawn({ let clients = proxy_service.clients(); async move { loop { @@ -593,98 +495,101 @@ where } } }); - self.join_set.spawn(run_rpc_server( + join_set.spawn(run_rpc_server( proxy_service, config.acceptor, config.tls_config, - self.idle_shutdown_kicker.clone(), - namespaces.clone(), + idle_shutdown_kicker.clone(), + namespace_store.clone(), self.disable_namespaces, )); } - let logger_service = ReplicationLogService::new( - namespaces.clone(), - self.idle_shutdown_kicker, - Some(self.user_auth_strategy.clone()), - self.disable_namespaces, - true, - ); + let shutdown = self.shutdown.clone(); + let base_path = self.path.clone(); + // setup user-facing rpc services + match db_kind { + DatabaseKind::Primary => { + let replication_svc = ReplicationLogService::new( + namespace_store.clone(), + idle_shutdown_kicker.clone(), + Some(user_auth_strategy.clone()), + self.disable_namespaces, + true, + ); - let proxy_service = ProxyService::new( - namespaces.clone(), - Some(self.user_auth_strategy), - self.disable_namespaces, - ); - // Garbage collect proxy clients every 30 seconds - self.join_set.spawn({ - let clients = proxy_service.clients(); - async move { - loop { - tokio::time::sleep(Duration::from_secs(30)).await; - rpc::proxy::garbage_collect(&mut *clients.write().await).await; - } - } - }); - Ok((namespaces, proxy_service, logger_service)) - } -} + let proxy_svc = ProxyService::new( + namespace_store.clone(), + Some(user_auth_strategy.clone()), + self.disable_namespaces, + ); -struct Replica { - rpc_config: RpcClientConfig, - stats_sender: StatsSender, - extensions: Arc<[PathBuf]>, - db_config: DbConfig, - base_path: Arc, - disable_namespaces: bool, - max_active_namespaces: usize, - meta_store_config: MetaStoreConfig, - max_concurrent_connections: usize, - user_auth_strategy: Auth, -} + // Garbage collect proxy clients every 30 seconds + join_set.spawn({ + let clients = proxy_svc.clients(); + async move { + loop { + tokio::time::sleep(Duration::from_secs(30)).await; + rpc::proxy::garbage_collect(&mut *clients.write().await).await; + } + } + }); -impl Replica { - async fn configure( - self, - ) -> anyhow::Result<( - NamespaceStore, - impl Proxy, - impl ReplicationLog, - )> { - let (channel, uri) = self.rpc_config.configure().await?; - - let conf = ReplicaNamespaceConfig { - channel: channel.clone(), - uri: uri.clone(), - extensions: self.extensions.clone(), - stats_sender: self.stats_sender.clone(), - base_path: self.base_path.clone(), - max_response_size: self.db_config.max_response_size, - max_total_response_size: self.db_config.max_total_response_size, - encryption_config: self.db_config.encryption_config.clone(), - max_concurrent_connections: Arc::new(Semaphore::new(self.max_concurrent_connections)), - max_concurrent_requests: self.db_config.max_concurrent_requests, + self.make_services( + namespace_store.clone(), + idle_shutdown_kicker, + proxy_svc, + replication_svc, + user_auth_strategy.clone(), + service_shutdown.clone(), + ) + .configure(&mut join_set); + } + DatabaseKind::Replica => { + let replication_svc = + ReplicationLogProxyService::new(channel.clone().unwrap(), uri.clone().unwrap()); + let proxy_svc = ReplicaProxyService::new( + channel.clone().unwrap(), + uri.clone().unwrap(), + namespace_store.clone(), + user_auth_strategy.clone(), + self.disable_namespaces, + ); + + self.make_services( + namespace_store.clone(), + idle_shutdown_kicker, + proxy_svc, + replication_svc, + user_auth_strategy, + service_shutdown.clone(), + ) + .configure(&mut join_set); + } }; - let factory = ReplicaNamespaceMaker::new(conf); - let namespaces = NamespaceStore::new( - factory, - true, - false, - self.max_active_namespaces, - &self.base_path, - self.meta_store_config, - ) - .await?; - let replication_service = ReplicationLogProxyService::new(channel.clone(), uri.clone()); - let proxy_service = ReplicaProxyService::new( - channel, - uri, - namespaces.clone(), - self.user_auth_strategy.clone(), - self.disable_namespaces, - ); + tokio::select! { + _ = shutdown.notified() => { + join_set.shutdown().await; + service_shutdown.notify_waiters(); + namespace_store.shutdown().await?; + // clean shutdown, remove sentinel file + std::fs::remove_file(sentinel_file_path(&base_path))?; + tracing::info!("sqld was shutdown gracefully. Bye!"); + } + Some(res) = join_set.join_next() => { + res??; + }, + else => (), + } + + Ok(()) + } - Ok((namespaces, proxy_service, replication_service)) + fn setup_shutdown(&self) -> Option { + let shutdown_notify = self.shutdown.clone(); + self.idle_shutdown_timeout.map(|d| { + IdleShutdownKicker::new(d, self.initial_idle_shutdown_timeout, shutdown_notify) + }) } }