Skip to content

Commit

Permalink
Merge pull request #587 from tursodatabase/lucio/client-versions
Browse files Browse the repository at this point in the history
Add client version reporting metrics
  • Loading branch information
MarinPostma authored Nov 8, 2023
2 parents 5df434f + 5b1e1c2 commit 69b0bbc
Show file tree
Hide file tree
Showing 15 changed files with 295 additions and 67 deletions.
13 changes: 6 additions & 7 deletions .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ jobs:
with:
tool: protoc@${{ env.PROTOC_VERSION }}

- uses: taiki-e/install-action@v2
with:
tool: nextest

- uses: actions/checkout@v3

- name: Set up cargo cache
Expand All @@ -129,19 +133,14 @@ jobs:
restore-keys: ${{ runner.os }}-cargo-

- name: Run tests
uses: actions-rs/cargo@v1
with:
command: test
args: --verbose
run: cargo nextest run
env:
LIBSQL_BOTTOMLESS_AWS_ACCESS_KEY_ID: minioadmin
LIBSQL_BOTTOMLESS_AWS_SECRET_ACCESS_KEY: minioadmin
LIBSQL_BOTTOMLESS_AWS_DEFAULT_REGION: eu-central-2
LIBSQL_BOTTOMLESS_BUCKET: bottomless
LIBSQL_BOTTOMLESS_ENDPOINT: http://localhost:9000

# TODO(lucio): Enable this, for some reason github doesn't like this
# but won't tell me why...
# test-rust-wasm:
# runs-on: ubuntu-latest
# name: Run Rust Wasm Tests
Expand Down Expand Up @@ -176,4 +175,4 @@ jobs:
# restore-keys: ${{ runner.os }}-cargo-

# - name: Run check
# command: cargo check --verbose -p libsql --target wasm32-unknown-unknown --no-default-features --features cloudflare
# run: cargo check --verbose -p libsql --target wasm32-unknown-unknown --no-default-features --features cloudflare
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libsql-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ tower-http = { version = "0.3.5", features = ["compression-full", "cors", "trace
tracing = "0.1.37"
tracing-panic = "0.1"
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
http-body = "0.4"
url = { version = "2.3", features = ["serde"] }
uuid = { version = "1.3", features = ["v4", "serde"] }

Expand Down
43 changes: 32 additions & 11 deletions libsql-server/src/h2c.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
//! └────────────►│call axum router │◄───────────┘
//! └─────────────────┘
use std::marker::PhantomData;
use std::pin::Pin;

use axum::{body::BoxBody, http::HeaderValue};
use bytes::Bytes;
use hyper::header;
use hyper::Body;
use hyper::{Request, Response};
Expand All @@ -52,25 +54,31 @@ type BoxError = Box<dyn std::error::Error + Send + Sync>;
/// A `MakeService` adapter for [`H2c`] that injects connection
/// info into the request extensions.
#[derive(Debug, Clone)]
pub struct H2cMaker<S> {
pub struct H2cMaker<S, B> {
s: S,
_pd: PhantomData<fn(B)>,
}

impl<S> H2cMaker<S> {
impl<S, B> H2cMaker<S, B> {
pub fn new(s: S) -> Self {
Self { s }
Self {
s,
_pd: PhantomData,
}
}
}

impl<S, C> Service<&C> for H2cMaker<S>
impl<S, C, B> Service<&C> for H2cMaker<S, B>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
S: Service<Request<Body>, Response = Response<B>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Sync + Send + 'static,
S::Response: Send + 'static,
C: crate::net::Conn,
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError> + Sync + Send + 'static,
{
type Response = H2c<S>;
type Response = H2c<S, B>;

type Error = hyper::Error;

Expand All @@ -87,25 +95,34 @@ where
fn call(&mut self, conn: &C) -> Self::Future {
let connect_info = conn.connect_info();
let s = self.s.clone();
Box::pin(async move { Ok(H2c { s, connect_info }) })
Box::pin(async move {
Ok(H2c {
s,
connect_info,
_pd: PhantomData,
})
})
}
}

/// A service that can perform `h2c` upgrades and will
/// delegate calls to the inner service once a protocol
/// has been selected.
#[derive(Debug, Clone)]
pub struct H2c<S> {
pub struct H2c<S, B> {
s: S,
connect_info: TcpConnectInfo,
_pd: PhantomData<fn(B)>,
}

impl<S> Service<Request<Body>> for H2c<S>
impl<S, B> Service<Request<Body>> for H2c<S, B>
where
S: Service<Request<Body>, Response = Response<BoxBody>> + Clone + Send + 'static,
S: Service<Request<Body>, Response = Response<B>> + Clone + Send + 'static,
S::Future: Send + 'static,
S::Error: Into<BoxError> + Sync + Send + 'static,
S::Response: Send + 'static,
B: http_body::Body<Data = Bytes> + Send + 'static,
B::Error: Into<BoxError> + Sync + Send + 'static,
{
type Response = hyper::Response<BoxBody>;
type Error = BoxError;
Expand All @@ -130,7 +147,11 @@ where
// the request to the inner service, which in our case is the
// axum router.
if req.headers().get(header::UPGRADE) != Some(&HeaderValue::from_static("h2c")) {
return svc.call(req).await.map_err(Into::into);
return svc
.call(req)
.await
.map(|r| r.map(axum::body::boxed))
.map_err(Into::into);
}

tracing::debug!("Got a h2c upgrade request");
Expand Down
44 changes: 28 additions & 16 deletions libsql-server/src/http/user/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use crate::database::Database;
use crate::error::Error;
use crate::hrana;
use crate::http::user::types::HttpQuery;
use crate::metrics::LEGACY_HTTP_CALL;
use crate::metrics::{CLIENT_VERSION, LEGACY_HTTP_CALL};
use crate::namespace::{MakeNamespace, NamespaceStore};
use crate::net::Accept;
use crate::query::{self, Query};
Expand Down Expand Up @@ -307,8 +307,20 @@ where
path: self.path,
};

fn trace_request<B>(req: &Request<B>, _span: &Span) {
tracing::debug!("got request: {} {}", req.method(), req.uri());
fn trace_request<B>(req: &Request<B>, span: &Span) {
let _s = span.enter();

tracing::debug!(
"got request: {} {} {:?}",
req.method(),
req.uri(),
req.headers()
);
if let Some(v) = req.headers().get("x-libsql-client-version") {
if let Ok(s) = v.to_str() {
metrics::increment_counter!(CLIENT_VERSION, "version" => s.to_string());
}
}
}

macro_rules! handle_hrana {
Expand Down Expand Up @@ -392,7 +404,19 @@ where
)
.with_state(state);

let layered_app = app
// Merge the grpc based axum router into our regular http router
let replication = ReplicationLogServer::new(self.replication_service);
let write_proxy = ProxyServer::new(self.proxy_service);

let grpc_router = Server::builder()
.accept_http1(true)
.add_service(tonic_web::enable(replication))
.add_service(tonic_web::enable(write_proxy))
.into_router();

let router = app.merge(grpc_router);

let router = router
.layer(option_layer(self.idle_shutdown_kicker.clone()))
.layer(
tower_http::trace::TraceLayer::new_for_http()
Expand All @@ -411,18 +435,6 @@ where
.allow_origin(cors::Any),
);

// Merge the grpc based axum router into our regular http router
let replication = ReplicationLogServer::new(self.replication_service);
let write_proxy = ProxyServer::new(self.proxy_service);

let grpc_router = Server::builder()
.accept_http1(true)
.add_service(tonic_web::enable(replication))
.add_service(tonic_web::enable(write_proxy))
.into_router();

let router = layered_app.merge(grpc_router);

let router = router.fallback(handle_fallback);
let h2c = crate::h2c::H2cMaker::new(router);

Expand Down
2 changes: 2 additions & 0 deletions libsql-server/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use metrics::{
};
use once_cell::sync::Lazy;

pub static CLIENT_VERSION: &str = "libsql_client_version";

pub static WRITE_QUERY_COUNT: Lazy<Counter> = Lazy::new(|| {
const NAME: &str = "libsql_server_writes_count";
describe_counter!(NAME, "number of write statements");
Expand Down
50 changes: 48 additions & 2 deletions libsql-server/src/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,12 @@ use rustls::server::AllowAnyAuthenticatedClient;
use rustls::RootCertStore;
use tonic::Status;
use tower::util::option_layer;
use tower::ServiceBuilder;
use tower_http::trace::DefaultOnResponse;
use tracing::Span;

use crate::config::TlsConfig;
use crate::metrics::CLIENT_VERSION;
use crate::namespace::{NamespaceName, NamespaceStore, PrimaryNamespaceMaker};
use crate::rpc::proxy::rpc::proxy_server::ProxyServer;
use crate::rpc::proxy::ProxyService;
Expand Down Expand Up @@ -80,7 +84,19 @@ pub async fn run_rpc_server<A: crate::net::Accept>(
.add_service(ReplicationLogServer::new(logger_service))
.into_router();

let h2c = crate::h2c::H2cMaker::new(router);
let svc = ServiceBuilder::new()
.layer(
tower_http::trace::TraceLayer::new_for_grpc()
.on_request(trace_request)
.on_response(
DefaultOnResponse::new()
.level(tracing::Level::DEBUG)
.latency_unit(tower_http::LatencyUnit::Micros),
),
)
.service(router);

let h2c = crate::h2c::H2cMaker::new(svc);
hyper::server::Server::builder(acceptor)
.serve(h2c)
.await
Expand All @@ -95,7 +111,20 @@ pub async fn run_rpc_server<A: crate::net::Accept>(
.add_service(replication)
.into_router();

let h2c = crate::h2c::H2cMaker::new(router);
let svc = ServiceBuilder::new()
.layer(
tower_http::trace::TraceLayer::new_for_grpc()
.on_request(trace_request)
.on_response(
DefaultOnResponse::new()
.level(tracing::Level::DEBUG)
.latency_unit(tower_http::LatencyUnit::Micros),
),
)
.service(router);

let h2c = crate::h2c::H2cMaker::new(svc);

hyper::server::Server::builder(acceptor)
.serve(h2c)
.await
Expand All @@ -122,3 +151,20 @@ fn extract_namespace<T>(
Err(Status::invalid_argument("Missing x-namespace-bin metadata"))
}
}

fn trace_request<B>(req: &hyper::Request<B>, span: &Span) {
let _s = span.enter();

tracing::debug!(
"rpc request: {} {} {:?}",
req.method(),
req.uri(),
req.headers()
);

if let Some(v) = req.headers().get("x-libsql-client-version") {
if let Ok(s) = v.to_str() {
metrics::increment_counter!(CLIENT_VERSION, "version" => s.to_string());
}
}
}
6 changes: 6 additions & 0 deletions libsql-server/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ impl MetricsSnapshot {
None
}

pub fn snapshot(
&self,
) -> &HashMap<CompositeKey, (Option<Unit>, Option<SharedString>, DebugValue)> {
&self.snapshot
}

#[track_caller]
pub fn assert_gauge(&self, metric_name: &str, value: f64) -> &Self {
let val = self.get_gauge(metric_name).expect("metric does not exist");
Expand Down
14 changes: 12 additions & 2 deletions libsql-server/tests/standalone/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ fn basic_query() {
}

#[test]
#[ignore]
fn basic_metrics() {
let mut sim = turmoil::Builder::new().build();

Expand All @@ -81,7 +80,18 @@ fn basic_metrics() {

tokio::time::sleep(Duration::from_secs(1)).await;

snapshot_metrics().assert_counter("libsql_server_libsql_execute_program", 3);
let snapshot = snapshot_metrics();
snapshot.assert_counter("libsql_server_libsql_execute_program", 3);

for (key, (_, _, val)) in snapshot.snapshot() {
if key.kind() == metrics_util::MetricKind::Counter
&& key.key().name() == "libsql_client_version"
{
assert_eq!(val, &metrics_util::debugging::DebugValue::Counter(3));
let label = key.key().labels().next().unwrap();
assert!(label.value().starts_with("libsql-hrana-"));
}
}

Ok(())
});
Expand Down
2 changes: 1 addition & 1 deletion libsql/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ uuid = { version = "1.4.0", features = ["v4", "serde"], optional = true }
tokio-stream = { version = "0.1.14", optional = true }
tonic = { version = "0.10.2", features = ["tls", "tls-roots", "tls-webpki-roots"], optional = true}
tonic-web = { version = "0.10.2" , optional = true }
tower-http = { version = "0.4.4", features = ["trace", "util"], optional = true }
tower-http = { version = "0.4.4", features = ["trace", "set-header", "util"], optional = true }
http = { version = "0.2", optional = true }

sqlite3-parser = { path = "../vendored/sqlite3-parser", version = "0.11", optional = true }
Expand Down
Loading

0 comments on commit 69b0bbc

Please sign in to comment.