From 5984987fd4042daa8b4e0d900144bb69bb0b4e6a Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Thu, 22 Feb 2024 20:36:07 +0100 Subject: [PATCH 1/2] counter for session open/closed --- src/extensions/server/mod.rs | 19 +++++++++-- src/extensions/server/prometheus.rs | 52 +++++++++++++++++++++++++++++ 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index 3de4b90..49d1eec 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -4,7 +4,8 @@ use hyper::server::conn::AddrStream; use hyper::service::Service; use hyper::service::{make_service_fn, service_fn}; use jsonrpsee::server::{ - middleware::rpc::RpcServiceBuilder, stop_channel, RandomStringIdProvider, RpcModule, ServerBuilder, ServerHandle, + middleware::rpc::RpcServiceBuilder, stop_channel, ws, RandomStringIdProvider, RpcModule, ServerBuilder, + ServerHandle, }; use jsonrpsee::Methods; use prometheus_endpoint::Registry; @@ -24,7 +25,7 @@ use crate::extensions::rate_limit::{MethodWeights, RateLimitBuilder, XFF}; mod prometheus; mod proxy_get_request; mod ready_get_request; -use crate::extensions::server::prometheus::{MetricPair, PrometheusService}; +use crate::extensions::server::prometheus::{MetricPair, PrometheusService, WsMetrics}; use proxy_get_request::{ProxyGetRequestLayer, ProxyGetRequestMethod}; use ready_get_request::ReadyProxyLayer; @@ -114,6 +115,7 @@ impl SubwayServerBuilder { let handle = stop_handle.clone(); let rpc_module = rpc_module_builder().await?; let metrics: Arc>> = Default::default(); + let ws_metrics = Arc::new(WsMetrics::new(prometheus_registry.as_ref())); // make_service handle each connection let make_service = make_service_fn(move |socket: &AddrStream| { @@ -142,6 +144,7 @@ impl SubwayServerBuilder { let rpc_method_weights = rpc_method_weights.clone(); let prometheus_registry = prometheus_registry.clone(); let metrics = metrics.clone(); + let ws_metrics = ws_metrics.clone(); async move { // service_fn handle each request @@ -150,6 +153,7 @@ impl SubwayServerBuilder { let methods: Methods = rpc_module.clone().into(); let stop_handle = stop_handle.clone(); let http_middleware = http_middleware.clone(); + let ws_metrics = ws_metrics.clone(); if let Some(true) = rate_limit_builder.as_ref().map(|r| r.use_xff()) { socket_ip = req.xxf_ip().unwrap_or(socket_ip); @@ -180,6 +184,17 @@ impl SubwayServerBuilder { .to_service_builder(); let mut service = service_builder.build(methods, stop_handle); + + let is_websocket = ws::is_upgrade_request(&req); + + if is_websocket { + let on_ws_close = service.on_session_closed(); + ws_metrics.ws_open(); + tokio::spawn(async move { + on_ws_close.await; + ws_metrics.ws_closed(); + }); + } service.call(req) })) } diff --git a/src/extensions/server/prometheus.rs b/src/extensions/server/prometheus.rs index cd5515b..8d4a85f 100644 --- a/src/extensions/server/prometheus.rs +++ b/src/extensions/server/prometheus.rs @@ -8,6 +8,58 @@ use std::sync::{Arc, Mutex}; pub type MetricPair = (Counter, Histogram); +pub enum WsMetrics { + Prometheus(InnerMetrics), + Noop, +} + +impl WsMetrics { + pub fn new(registry: Option<&Registry>) -> Self { + match registry { + None => Self::Noop, + Some(r) => Self::Prometheus(InnerMetrics::new(r)), + } + } + + pub fn ws_open(&self) { + if let Self::Prometheus(inner) = self { + inner.ws_open(); + } + } + + pub fn ws_closed(&self) { + if let Self::Prometheus(inner) = self { + inner.ws_closed(); + } + } +} + +pub struct InnerMetrics { + open_session_count: Counter, + closed_session_count: Counter, +} + +impl InnerMetrics { + fn new(registry: &Registry) -> Self { + let open_counter = Counter::new("open_ws_counter", "No help").unwrap(); + let closed_counter = Counter::new("closed_ws_counter", "No help").unwrap(); + + let open_session_count = register(open_counter, registry).unwrap(); + let closed_session_count = register(closed_counter, registry).unwrap(); + Self { + open_session_count, + closed_session_count, + } + } + fn ws_open(&self) { + self.open_session_count.inc(); + } + + fn ws_closed(&self) { + self.closed_session_count.inc(); + } +} + #[derive(Clone)] pub struct PrometheusService { inner: S, From 530df337c467747e35a84fc864248afef52c584c Mon Sep 17 00:00:00 2001 From: Jan Koscisz Date: Thu, 22 Feb 2024 20:45:17 +0100 Subject: [PATCH 2/2] rm arc --- src/extensions/server/mod.rs | 2 +- src/extensions/server/prometheus.rs | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/extensions/server/mod.rs b/src/extensions/server/mod.rs index 49d1eec..be56408 100644 --- a/src/extensions/server/mod.rs +++ b/src/extensions/server/mod.rs @@ -115,7 +115,7 @@ impl SubwayServerBuilder { let handle = stop_handle.clone(); let rpc_module = rpc_module_builder().await?; let metrics: Arc>> = Default::default(); - let ws_metrics = Arc::new(WsMetrics::new(prometheus_registry.as_ref())); + let ws_metrics = WsMetrics::new(prometheus_registry.as_ref()); // make_service handle each connection let make_service = make_service_fn(move |socket: &AddrStream| { diff --git a/src/extensions/server/prometheus.rs b/src/extensions/server/prometheus.rs index 8d4a85f..68b1990 100644 --- a/src/extensions/server/prometheus.rs +++ b/src/extensions/server/prometheus.rs @@ -8,6 +8,7 @@ use std::sync::{Arc, Mutex}; pub type MetricPair = (Counter, Histogram); +#[derive(Clone)] pub enum WsMetrics { Prometheus(InnerMetrics), Noop, @@ -34,6 +35,7 @@ impl WsMetrics { } } +#[derive(Clone)] pub struct InnerMetrics { open_session_count: Counter, closed_session_count: Counter,