diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 4eb19bcf82..a6d5fe2969 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -113,6 +113,10 @@ jobs: with: tool: protoc@${{ env.PROTOC_VERSION }} + - uses: taiki-e/install-action@v2 + with: + tool: nextest + - uses: actions/checkout@v3 - name: Set up cargo cache @@ -129,10 +133,7 @@ jobs: restore-keys: ${{ runner.os }}-cargo- - name: Run tests - uses: actions-rs/cargo@v1 - with: - command: test - args: --verbose + run: cargo nextest run env: LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID: minioadmin LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY: minioadmin @@ -140,8 +141,6 @@ jobs: LIBSQL_BOTTOMLESS_BUCKET: bottomless LIBSQL_BOTTOMLESS_ENDPOINT: http://localhost:9000 - # TODO(lucio): Enable this, for some reason github doesn't like this - # but won't tell me why... # test-rust-wasm: # runs-on: ubuntu-latest # name: Run Rust Wasm Tests @@ -176,4 +175,4 @@ jobs: # restore-keys: ${{ runner.os }}-cargo- # - name: Run check - # command: cargo check --verbose -p libsql --target wasm32-unknown-unknown --no-default-features --features cloudflare + # run: cargo check --verbose -p libsql --target wasm32-unknown-unknown --no-default-features --features cloudflare diff --git a/Cargo.lock b/Cargo.lock index 5781991fcf..a377eabd6e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3817,6 +3817,7 @@ dependencies = [ "futures", "futures-core", "hmac", + "http-body", "hyper", "hyper-rustls 0.24.1", "hyper-tungstenite", diff --git a/libsql-server/Cargo.toml b/libsql-server/Cargo.toml index f0e2171916..0c3b203fa4 100644 --- a/libsql-server/Cargo.toml +++ b/libsql-server/Cargo.toml @@ -70,6 +70,7 @@ tower-http = { version = "0.3.5", features = ["compression-full", "cors", "trace tracing = "0.1.37" tracing-panic = "0.1" tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } +http-body = "0.4" url = { version = "2.3", features = ["serde"] } uuid = { version = "1.3", features = ["v4", "serde"] } diff --git a/libsql-server/src/h2c.rs b/libsql-server/src/h2c.rs index f48dc37256..93d4999543 100644 --- a/libsql-server/src/h2c.rs +++ b/libsql-server/src/h2c.rs @@ -38,9 +38,11 @@ //! └────────────►│call axum router │◄───────────┘ //! └─────────────────┘ +use std::marker::PhantomData; use std::pin::Pin; use axum::{body::BoxBody, http::HeaderValue}; +use bytes::Bytes; use hyper::header; use hyper::Body; use hyper::{Request, Response}; @@ -52,25 +54,31 @@ type BoxError = Box; /// A `MakeService` adapter for [`H2c`] that injects connection /// info into the request extensions. #[derive(Debug, Clone)] -pub struct H2cMaker { +pub struct H2cMaker { s: S, + _pd: PhantomData, } -impl H2cMaker { +impl H2cMaker { pub fn new(s: S) -> Self { - Self { s } + Self { + s, + _pd: PhantomData, + } } } -impl Service<&C> for H2cMaker +impl Service<&C> for H2cMaker where - S: Service, Response = Response> + Clone + Send + 'static, + S: Service, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Sync + Send + 'static, S::Response: Send + 'static, C: crate::net::Conn, + B: http_body::Body + Send + 'static, + B::Error: Into + Sync + Send + 'static, { - type Response = H2c; + type Response = H2c; type Error = hyper::Error; @@ -87,7 +95,13 @@ where fn call(&mut self, conn: &C) -> Self::Future { let connect_info = conn.connect_info(); let s = self.s.clone(); - Box::pin(async move { Ok(H2c { s, connect_info }) }) + Box::pin(async move { + Ok(H2c { + s, + connect_info, + _pd: PhantomData, + }) + }) } } @@ -95,17 +109,20 @@ where /// delegate calls to the inner service once a protocol /// has been selected. #[derive(Debug, Clone)] -pub struct H2c { +pub struct H2c { s: S, connect_info: TcpConnectInfo, + _pd: PhantomData, } -impl Service> for H2c +impl Service> for H2c where - S: Service, Response = Response> + Clone + Send + 'static, + S: Service, Response = Response> + Clone + Send + 'static, S::Future: Send + 'static, S::Error: Into + Sync + Send + 'static, S::Response: Send + 'static, + B: http_body::Body + Send + 'static, + B::Error: Into + Sync + Send + 'static, { type Response = hyper::Response; type Error = BoxError; @@ -130,7 +147,11 @@ where // the request to the inner service, which in our case is the // axum router. if req.headers().get(header::UPGRADE) != Some(&HeaderValue::from_static("h2c")) { - return svc.call(req).await.map_err(Into::into); + return svc + .call(req) + .await + .map(|r| r.map(axum::body::boxed)) + .map_err(Into::into); } tracing::debug!("Got a h2c upgrade request"); diff --git a/libsql-server/src/http/user/mod.rs b/libsql-server/src/http/user/mod.rs index 9ce9cc0e92..c6f1390340 100644 --- a/libsql-server/src/http/user/mod.rs +++ b/libsql-server/src/http/user/mod.rs @@ -34,7 +34,7 @@ use crate::database::Database; use crate::error::Error; use crate::hrana; use crate::http::user::types::HttpQuery; -use crate::metrics::LEGACY_HTTP_CALL; +use crate::metrics::{CLIENT_VERSION, LEGACY_HTTP_CALL}; use crate::namespace::{MakeNamespace, NamespaceStore}; use crate::net::Accept; use crate::query::{self, Query}; @@ -307,8 +307,20 @@ where path: self.path, }; - fn trace_request(req: &Request, _span: &Span) { - tracing::debug!("got request: {} {}", req.method(), req.uri()); + fn trace_request(req: &Request, span: &Span) { + let _s = span.enter(); + + tracing::debug!( + "got request: {} {} {:?}", + req.method(), + req.uri(), + req.headers() + ); + if let Some(v) = req.headers().get("x-libsql-client-version") { + if let Ok(s) = v.to_str() { + metrics::increment_counter!(CLIENT_VERSION, "version" => s.to_string()); + } + } } macro_rules! handle_hrana { @@ -392,7 +404,19 @@ where ) .with_state(state); - let layered_app = app + // Merge the grpc based axum router into our regular http router + let replication = ReplicationLogServer::new(self.replication_service); + let write_proxy = ProxyServer::new(self.proxy_service); + + let grpc_router = Server::builder() + .accept_http1(true) + .add_service(tonic_web::enable(replication)) + .add_service(tonic_web::enable(write_proxy)) + .into_router(); + + let router = app.merge(grpc_router); + + let router = router .layer(option_layer(self.idle_shutdown_kicker.clone())) .layer( tower_http::trace::TraceLayer::new_for_http() @@ -411,18 +435,6 @@ where .allow_origin(cors::Any), ); - // Merge the grpc based axum router into our regular http router - let replication = ReplicationLogServer::new(self.replication_service); - let write_proxy = ProxyServer::new(self.proxy_service); - - let grpc_router = Server::builder() - .accept_http1(true) - .add_service(tonic_web::enable(replication)) - .add_service(tonic_web::enable(write_proxy)) - .into_router(); - - let router = layered_app.merge(grpc_router); - let router = router.fallback(handle_fallback); let h2c = crate::h2c::H2cMaker::new(router); diff --git a/libsql-server/src/metrics.rs b/libsql-server/src/metrics.rs index efabf3573e..4faf81901d 100644 --- a/libsql-server/src/metrics.rs +++ b/libsql-server/src/metrics.rs @@ -5,6 +5,8 @@ use metrics::{ }; use once_cell::sync::Lazy; +pub static CLIENT_VERSION: &str = "libsql_client_version"; + pub static WRITE_QUERY_COUNT: Lazy = Lazy::new(|| { const NAME: &str = "libsql_server_writes_count"; describe_counter!(NAME, "number of write statements"); diff --git a/libsql-server/src/rpc/mod.rs b/libsql-server/src/rpc/mod.rs index e81dc1aa10..e927b54985 100644 --- a/libsql-server/src/rpc/mod.rs +++ b/libsql-server/src/rpc/mod.rs @@ -7,8 +7,12 @@ use rustls::server::AllowAnyAuthenticatedClient; use rustls::RootCertStore; use tonic::Status; use tower::util::option_layer; +use tower::ServiceBuilder; +use tower_http::trace::DefaultOnResponse; +use tracing::Span; use crate::config::TlsConfig; +use crate::metrics::CLIENT_VERSION; use crate::namespace::{NamespaceName, NamespaceStore, PrimaryNamespaceMaker}; use crate::rpc::proxy::rpc::proxy_server::ProxyServer; use crate::rpc::proxy::ProxyService; @@ -80,7 +84,19 @@ pub async fn run_rpc_server( .add_service(ReplicationLogServer::new(logger_service)) .into_router(); - let h2c = crate::h2c::H2cMaker::new(router); + let svc = ServiceBuilder::new() + .layer( + tower_http::trace::TraceLayer::new_for_grpc() + .on_request(trace_request) + .on_response( + DefaultOnResponse::new() + .level(tracing::Level::DEBUG) + .latency_unit(tower_http::LatencyUnit::Micros), + ), + ) + .service(router); + + let h2c = crate::h2c::H2cMaker::new(svc); hyper::server::Server::builder(acceptor) .serve(h2c) .await @@ -95,7 +111,20 @@ pub async fn run_rpc_server( .add_service(replication) .into_router(); - let h2c = crate::h2c::H2cMaker::new(router); + let svc = ServiceBuilder::new() + .layer( + tower_http::trace::TraceLayer::new_for_grpc() + .on_request(trace_request) + .on_response( + DefaultOnResponse::new() + .level(tracing::Level::DEBUG) + .latency_unit(tower_http::LatencyUnit::Micros), + ), + ) + .service(router); + + let h2c = crate::h2c::H2cMaker::new(svc); + hyper::server::Server::builder(acceptor) .serve(h2c) .await @@ -122,3 +151,20 @@ fn extract_namespace( Err(Status::invalid_argument("Missing x-namespace-bin metadata")) } } + +fn trace_request(req: &hyper::Request, span: &Span) { + let _s = span.enter(); + + tracing::debug!( + "rpc request: {} {} {:?}", + req.method(), + req.uri(), + req.headers() + ); + + if let Some(v) = req.headers().get("x-libsql-client-version") { + if let Ok(s) = v.to_str() { + metrics::increment_counter!(CLIENT_VERSION, "version" => s.to_string()); + } + } +} diff --git a/libsql-server/tests/common/mod.rs b/libsql-server/tests/common/mod.rs index 3b54b8b1f3..d276ffae32 100644 --- a/libsql-server/tests/common/mod.rs +++ b/libsql-server/tests/common/mod.rs @@ -64,6 +64,12 @@ impl MetricsSnapshot { None } + pub fn snapshot( + &self, + ) -> &HashMap, Option, DebugValue)> { + &self.snapshot + } + #[track_caller] pub fn assert_gauge(&self, metric_name: &str, value: f64) -> &Self { let val = self.get_gauge(metric_name).expect("metric does not exist"); diff --git a/libsql-server/tests/standalone/mod.rs b/libsql-server/tests/standalone/mod.rs index 9cebb9158b..0942e49c34 100644 --- a/libsql-server/tests/standalone/mod.rs +++ b/libsql-server/tests/standalone/mod.rs @@ -59,7 +59,6 @@ fn basic_query() { } #[test] -#[ignore] fn basic_metrics() { let mut sim = turmoil::Builder::new().build(); @@ -81,7 +80,18 @@ fn basic_metrics() { tokio::time::sleep(Duration::from_secs(1)).await; - snapshot_metrics().assert_counter("libsql_server_libsql_execute_program", 3); + let snapshot = snapshot_metrics(); + snapshot.assert_counter("libsql_server_libsql_execute_program", 3); + + for (key, (_, _, val)) in snapshot.snapshot() { + if key.kind() == metrics_util::MetricKind::Counter + && key.key().name() == "libsql_client_version" + { + assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(3)); + let label = key.key().labels().next().unwrap(); + assert!(label.value().starts_with("libsql-hrana-")); + } + } Ok(()) }); diff --git a/libsql/Cargo.toml b/libsql/Cargo.toml index 591e577560..4100bc4ed0 100644 --- a/libsql/Cargo.toml +++ b/libsql/Cargo.toml @@ -32,7 +32,7 @@ uuid = { version = "1.4.0", features = ["v4", "serde"], optional = true } tokio-stream = { version = "0.1.14", optional = true } tonic = { version = "0.10.2", features = ["tls", "tls-roots", "tls-webpki-roots"], optional = true} tonic-web = { version = "0.10.2" , optional = true } -tower-http = { version = "0.4.4", features = ["trace", "util"], optional = true } +tower-http = { version = "0.4.4", features = ["trace", "set-header", "util"], optional = true } http = { version = "0.2", optional = true } sqlite3-parser = { path = "../vendored/sqlite3-parser", version = "0.11", optional = true } diff --git a/libsql/src/database.rs b/libsql/src/database.rs index fd4d33015c..da8087befc 100644 --- a/libsql/src/database.rs +++ b/libsql/src/database.rs @@ -36,6 +36,7 @@ enum DbType { url: String, auth_token: String, connector: crate::util::ConnectorService, + version: Option, }, } @@ -111,6 +112,20 @@ cfg_replication! { Self::open_with_remote_sync_connector(db_path, url, token, http).await } + #[doc(hidden)] + pub async fn open_with_remote_sync_internal( + db_path: impl Into, + url: impl Into, + token: impl Into, + version: Option + ) -> Result { + let mut http = hyper::client::HttpConnector::new(); + http.enforce_http(false); + http.set_nodelay(true); + + Self::open_with_remote_sync_connector_internal(db_path, url, token, http, version).await + } + #[doc(hidden)] pub async fn open_with_remote_sync_connector( db_path: impl Into, @@ -118,6 +133,23 @@ cfg_replication! { token: impl Into, connector: C, ) -> Result + where + C: tower::Service + Send + Clone + Sync + 'static, + C::Response: crate::util::Socket, + C::Future: Send + 'static, + C::Error: Into>, + { + Self::open_with_remote_sync_connector_internal(db_path, url, token, connector, None).await + } + + #[doc(hidden)] + async fn open_with_remote_sync_connector_internal( + db_path: impl Into, + url: impl Into, + token: impl Into, + connector: C, + version: Option + ) -> Result where C: tower::Service + Send + Clone + Sync + 'static, C::Response: crate::util::Socket, @@ -132,11 +164,12 @@ cfg_replication! { let svc = crate::util::ConnectorService::new(svc); - let db = crate::local::Database::open_http_sync( + let db = crate::local::Database::open_http_sync_internal( svc, db_path.into(), url.into(), - token.into() + token.into(), + version ).await?; Ok(Database { @@ -177,13 +210,27 @@ cfg_replication! { } } +impl Database {} + cfg_remote! { impl Database { pub fn open_remote(url: impl Into, auth_token: impl Into) -> Result { let mut connector = hyper::client::HttpConnector::new(); connector.enforce_http(false); - Self::open_remote_with_connector(url, auth_token, connector) + Self::open_remote_with_connector_internal(url, auth_token, connector, None) + } + + #[doc(hidden)] + pub fn open_remote_internal( + url: impl Into, + auth_token: impl Into, + version: impl Into, + ) -> Result { + let mut connector = hyper::client::HttpConnector::new(); + connector.enforce_http(false); + + Self::open_remote_with_connector_internal(url, auth_token, connector, Some(version.into())) } // For now, only expose this for sqld testing purposes @@ -193,6 +240,21 @@ cfg_remote! { auth_token: impl Into, connector: C, ) -> Result + where + C: tower::Service + Send + Clone + Sync + 'static, + C::Response: crate::util::Socket, + C::Future: Send + 'static, + C::Error: Into>, + { + Self::open_remote_with_connector_internal(url, auth_token, connector, None) + } + + fn open_remote_with_connector_internal( + url: impl Into, + auth_token: impl Into, + connector: C, + version: Option + ) -> Result where C: tower::Service + Send + Clone + Sync + 'static, C::Response: crate::util::Socket, @@ -209,6 +271,7 @@ cfg_remote! { url: url.into(), auth_token: auth_token.into(), connector: crate::util::ConnectorService::new(svc), + version, }, }) } @@ -262,12 +325,14 @@ impl Database { url, auth_token, connector, + version, } => { let conn = std::sync::Arc::new( crate::hrana::connection::HttpConnection::new_with_connector( url, auth_token, connector.clone(), + version.as_ref().map(|s| s.as_str()), ), ); diff --git a/libsql/src/hrana/connection.rs b/libsql/src/hrana/connection.rs index 518a501509..1539c60115 100644 --- a/libsql/src/hrana/connection.rs +++ b/libsql/src/hrana/connection.rs @@ -20,10 +20,14 @@ struct Cookie { #[derive(Debug)] pub struct HttpConnection(Arc>); -impl Clone for HttpConnection { - fn clone(&self) -> Self { - HttpConnection(self.0.clone()) - } +#[derive(Debug)] +struct InnerClient { + inner: T, + cookies: RwLock>, + url_for_queries: String, + auth: String, + affected_row_count: AtomicU64, + last_insert_rowid: AtomicI64, } impl HttpConnection @@ -222,12 +226,8 @@ where } } -#[derive(Debug)] -struct InnerClient { - inner: T, - cookies: RwLock>, - url_for_queries: String, - auth: String, - affected_row_count: AtomicU64, - last_insert_rowid: AtomicI64, +impl Clone for HttpConnection { + fn clone(&self) -> Self { + HttpConnection(self.0.clone()) + } } diff --git a/libsql/src/hrana/hyper.rs b/libsql/src/hrana/hyper.rs index 3d60aaf00c..fb924bdda4 100644 --- a/libsql/src/hrana/hyper.rs +++ b/libsql/src/hrana/hyper.rs @@ -6,16 +6,21 @@ use crate::util::ConnectorService; use crate::{Rows, Statement}; use futures::future::BoxFuture; use http::header::AUTHORIZATION; -use http::StatusCode; +use http::{HeaderValue, StatusCode}; use hyper_rustls::{HttpsConnector, HttpsConnectorBuilder}; #[derive(Clone, Debug)] pub struct HttpSender { inner: hyper::Client, hyper::Body>, + version: HeaderValue, } impl HttpSender { - pub fn new(connector: ConnectorService) -> Self { + pub fn new(connector: ConnectorService, version: Option<&str>) -> Self { + let ver = version.unwrap_or_else(|| env!("CARGO_PKG_VERSION")); + + let version = HeaderValue::try_from(format!("libsql-hrana-{ver}")).unwrap(); + let https = HttpsConnectorBuilder::new() .with_native_roots() .https_or_http() @@ -23,12 +28,13 @@ impl HttpSender { .wrap_connector(connector); let inner = hyper::Client::builder().build(https); - Self { inner } + Self { inner, version } } async fn send(&self, url: String, auth: String, body: String) -> Result { let req = hyper::Request::post(url) .header(AUTHORIZATION, auth) + .header("x-libsql-client-version", self.version.clone()) .body(hyper::Body::from(body)) .unwrap(); @@ -72,8 +78,9 @@ impl HttpConnection { url: impl Into, token: impl Into, connector: ConnectorService, + version: Option<&str>, ) -> Self { - let inner = HttpSender::new(connector); + let inner = HttpSender::new(connector, version); Self::new(url.into(), token.into(), inner) } } diff --git a/libsql/src/local/database.rs b/libsql/src/local/database.rs index edcaf92d3f..41c77a8dba 100644 --- a/libsql/src/local/database.rs +++ b/libsql/src/local/database.rs @@ -65,6 +65,46 @@ impl Database { connector, endpoint.as_str().try_into().unwrap(), auth_token, + None, + ) + .unwrap(); + let path = PathBuf::from(db_path); + let client = RemoteClient::new(remote.clone(), &path).await.unwrap(); + let replicator = Mutex::new( + Replicator::new(Either::Left(client), path, 1000) + .await + .unwrap(), + ); + + db.replication_ctx = Some(ReplicationContext { + replicator, + client: Some(remote), + }); + + Ok(db) + } + + #[cfg(feature = "replication")] + #[doc(hidden)] + pub async fn open_http_sync_internal( + connector: crate::util::ConnectorService, + db_path: String, + endpoint: String, + auth_token: String, + version: Option, + ) -> Result { + use std::path::PathBuf; + + use crate::util::coerce_url_scheme; + + let mut db = Database::open(&db_path, OpenFlags::default())?; + + let endpoint = coerce_url_scheme(&endpoint); + let remote = crate::replication::client::Client::new( + connector, + endpoint.as_str().try_into().unwrap(), + auth_token, + version.as_ref().map(String::as_str), ) .unwrap(); let path = PathBuf::from(db_path); diff --git a/libsql/src/replication/client.rs b/libsql/src/replication/client.rs index e6e410b224..afc41e051b 100644 --- a/libsql/src/replication/client.rs +++ b/libsql/src/replication/client.rs @@ -1,10 +1,12 @@ -use std::task::{Context, Poll}; use std::pin::Pin; +use std::task::{Context, Poll}; use anyhow::Context as _; use http::Uri; use hyper_rustls::HttpsConnectorBuilder; -use libsql_replication::rpc::proxy::{proxy_client::ProxyClient, DescribeRequest, DescribeResult, ExecuteResults, ProgramReq}; +use libsql_replication::rpc::proxy::{ + proxy_client::ProxyClient, DescribeRequest, DescribeResult, ExecuteResults, ProgramReq, +}; use libsql_replication::rpc::replication::replication_log_client::ReplicationLogClient; use tonic::{ body::BoxBody, @@ -24,7 +26,6 @@ use crate::util::ConnectorService; use crate::util::box_clone_service::BoxCloneService; - type ResponseBody = trace::ResponseBody< GrpcWebCall, classify::GrpcEosErrorsAsFailures, @@ -45,7 +46,14 @@ impl Client { connector: ConnectorService, origin: Uri, auth_token: impl AsRef, + version: Option<&str>, ) -> anyhow::Result { + let ver = version.unwrap_or_else(|| env!("CARGO_PKG_VERSION")); + + let version: AsciiMetadataValue = format!("libsql-embedded-replica-{ver}") + .try_into() + .context("Invalid client version")?; + let auth_token: AsciiMetadataValue = format!("Bearer {}", auth_token.as_ref()) .try_into() .context("Invalid auth token must be ascii")?; @@ -55,15 +63,18 @@ impl Client { let channel = GrpcChannel::new(connector); - let interceptor = GrpcInterceptor(auth_token, namespace); + let interceptor = GrpcInterceptor { + auth_token, + namespace, + version, + }; let replication = ReplicationLogClient::with_origin( InterceptedService::new(channel.clone(), interceptor.clone()), origin.clone(), ); - let proxy = - ProxyClient::with_origin(InterceptedService::new(channel, interceptor), origin); + let proxy = ProxyClient::with_origin(InterceptedService::new(channel, interceptor), origin); // Remove default tonic `8mb` message limits since fly may buffer // messages causing the msg len to be longer. @@ -149,13 +160,20 @@ impl Service> for GrpcChannel { #[derive(Clone)] /// Contains token and namespace headers to append to every request. -pub struct GrpcInterceptor(AsciiMetadataValue, BinaryMetadataValue); +pub struct GrpcInterceptor { + auth_token: AsciiMetadataValue, + namespace: BinaryMetadataValue, + version: AsciiMetadataValue, +} impl Interceptor for GrpcInterceptor { fn call(&mut self, mut req: tonic::Request<()>) -> Result, tonic::Status> { - req.metadata_mut().insert("x-authorization", self.0.clone()); req.metadata_mut() - .insert_bin("x-namespace-bin", self.1.clone()); + .insert("x-authorization", self.auth_token.clone()); + req.metadata_mut() + .insert_bin("x-namespace-bin", self.namespace.clone()); + req.metadata_mut() + .insert("x-libsql-client-version", self.version.clone()); Ok(req) } }