diff --git a/crates/symbolicator-js/src/metrics.rs b/crates/symbolicator-js/src/metrics.rs index 85c011504..032b80226 100644 --- a/crates/symbolicator-js/src/metrics.rs +++ b/crates/symbolicator-js/src/metrics.rs @@ -117,18 +117,6 @@ impl JsMetrics { } pub fn submit_metrics(&self, artifact_bundles: u64) { - metrics::with_client(|client| { - client.with_local_aggregator(|aggregator| { - self.submit_metrics_inner(aggregator, artifact_bundles); - }); - }) - } - - fn submit_metrics_inner( - &self, - aggregator: &mut metrics::LocalAggregator, - artifact_bundles: u64, - ) { metric!(time_raw("js.needed_files") = self.needed_files); metric!(time_raw("js.api_requests") = self.api_requests); metric!(time_raw("js.queried_bundles") = self.queried_bundles); @@ -137,9 +125,10 @@ impl JsMetrics { metric!(time_raw("js.fetched_artifacts") = self.fetched_artifacts); metric!(time_raw("js.scraped_files") = self.scraped_files); - // TODO: we are currently getting these as counters. maybe we want to use `time_raw` to also - // have a per-event avg, etc. + metrics::with_client(|_client, aggregator| self.submit_local_metrics(aggregator)) + } + fn submit_local_metrics(&self, aggregator: &mut metrics::LocalAggregator) { // Sources: aggregator.emit_count( "js.found_via_bundle_debugid", diff --git a/crates/symbolicator-service/src/metrics.rs b/crates/symbolicator-service/src/metrics.rs index 66e92e173..f09e2d1bb 100644 --- a/crates/symbolicator-service/src/metrics.rs +++ b/crates/symbolicator-service/src/metrics.rs @@ -19,8 +19,6 @@ pub mod prelude { pub use cadence::prelude::*; } -type LocalAggregators = Arc>>>; - #[derive(Debug, Clone)] struct Sink(Arc); @@ -33,11 +31,14 @@ impl MetricSink for Sink { } } +type LocalAggregators = Arc>>>; + +/// The globally configured Metrics, including a `cadence` client, and a local aggregator. pub struct MetricsWrapper { /// The raw `cadence` client. statsd_client: StatsdClient, - /// A thread local aggregator for `count` metrics + /// A thread local aggregator. local_aggregator: LocalAggregators, } @@ -47,16 +48,18 @@ impl fmt::Debug for MetricsWrapper { .field("statsd_client", &self.statsd_client) .field( "local_aggregator", - &format_args!("LocalAggregator {{ .. }}"), + &format_args!("LocalAggregators {{ .. }}"), ) .finish() } } +/// Creates [`LocalAggregators`] and starts a thread that will periodically +/// send aggregated metrics upstream to the `sink`. fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators { - let local_aggregator: LocalAggregators = Default::default(); + let local_aggregators: LocalAggregators = Default::default(); - let aggregators = Arc::clone(&local_aggregator); + let aggregators = Arc::clone(&local_aggregators); let prefix = if prefix.is_empty() { String::new() } else { @@ -68,32 +71,38 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L loop { thread::sleep(Duration::from_secs(5)); - let mut total_aggregations = Aggregations::default(); + let mut total_counters = AggregatedCounters::default(); - for local_aggregator in aggregators.iter() { - let local_aggregations = { - let mut local_aggregator = local_aggregator.lock().unwrap(); - std::mem::take(&mut local_aggregator.aggregations) + for local_counters in aggregators.iter() { + let local_counters = { + let mut local_aggregator = local_counters.lock().unwrap(); + std::mem::take(&mut local_aggregator.aggregated_counters) }; - if total_aggregations.is_empty() { - total_aggregations = local_aggregations; + if total_counters.is_empty() { + total_counters = local_counters; } else { - for (key, value) in local_aggregations { - let aggregated_value = total_aggregations.entry(key).or_default(); - *aggregated_value += value + for (key, value) in local_counters { + let ty = key.ty; + let aggregated_value = total_counters.entry(key).or_default(); + if ty == "|c" { + *aggregated_value += value; + } else if ty == "|g" { + // FIXME: when aggregating multiple thread-locals, we don’t really + // know which one is the "latest". But it also does not really matter. + *aggregated_value = value; + } } } } - for (AggregationKey { name, tags }, value) in total_aggregations.drain() { + for (AggregationKey { ty, name, tags }, value) in total_counters.drain() { let _ = write!( &mut formatted_metric, - "{}{name}:{value}|c{}", - prefix, formatted_global_tags + "{prefix}{name}:{value}{ty}{formatted_global_tags}" ); - if !tags.is_empty() { + if let Some(tags) = tags { if formatted_global_tags.is_empty() { formatted_metric.push_str("|#"); } else { @@ -114,17 +123,18 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L .spawn(thread_fn) .unwrap(); - local_aggregator + local_aggregators } impl MetricsWrapper { - pub fn with_local_aggregator(&self, f: impl FnOnce(&mut LocalAggregator)) { + /// Invokes the provided callback with a mutable reference to a thread-local [`LocalAggregator`]. + fn with_local_aggregator(&self, f: impl FnOnce(&Self, &mut LocalAggregator)) { let mut local_aggregator = self .local_aggregator .get_or(Default::default) .lock() .unwrap(); - f(&mut local_aggregator) + f(self, &mut local_aggregator) } } @@ -138,37 +148,78 @@ impl Deref for MetricsWrapper { #[derive(Eq, Ord, PartialEq, PartialOrd, Hash)] struct AggregationKey { - name: Box, - tags: Box, + /// The metric type, pre-formatted as a statsd suffix such as `|c`. + ty: &'static str, + /// The name of the metric. + name: &'static str, + /// The metric tags, pre-formatted as a statsd suffix, excluding the `|#` prefix. + tags: Option>, } -type Aggregations = FxHashMap; +type AggregatedCounters = FxHashMap; +type AggregatedHistograms = FxHashMap>; #[derive(Default)] pub struct LocalAggregator { + /// A mutable scratch-buffer that is reused to format tags into it. buf: String, - aggregations: Aggregations, + /// A map of all the `counter` and `gauge` metrics we have aggregated thus far. + aggregated_counters: AggregatedCounters, + /// A map of all the `timer` and `histogram` metrics we have aggregated thus far. + #[allow(unused)] // TODO + aggregated_histograms: AggregatedHistograms, } impl LocalAggregator { - pub fn emit_count(&mut self, name: &str, value: i64, tags: &[(&str, &str)]) { - self.buf.reserve(256); + /// Formats the `tags` into a `statsd` like format with the help of our scratch buffer. + fn format_tags(&mut self, tags: &[(&str, &str)]) -> Option> { + if tags.is_empty() { + return None; + } + + // to avoid reallocation, just reserve some space. + // the size is rather arbitrary, but should be large enough for reasonable tags. + self.buf.reserve(128); for (key, value) in tags { if !self.buf.is_empty() { self.buf.push(','); } let _ = write!(&mut self.buf, "{key}:{value}"); } + let formatted_tags = self.buf.as_str().into(); + self.buf.clear(); + + Some(formatted_tags) + } + + /// Emit a `count` metric, which is aggregated by summing up all values. + pub fn emit_count(&mut self, name: &'static str, value: i64, tags: &[(&'static str, &str)]) { + let tags = self.format_tags(tags); let key = AggregationKey { - name: name.into(), - tags: self.buf.as_str().into(), + ty: "|c", + name, + tags, }; - self.buf.clear(); - let aggregation = self.aggregations.entry(key).or_default(); + let aggregation = self.aggregated_counters.entry(key).or_default(); *aggregation += value; } + + /// Emit a `gauge` metric, for which only the latest value is retained. + pub fn emit_gauge(&mut self, name: &'static str, value: u64, tags: &[(&'static str, &str)]) { + let tags = self.format_tags(tags); + + let key = AggregationKey { + // TODO: maybe we want to give gauges their own aggregations? + ty: "|g", + name, + tags, + }; + + let aggregation = self.aggregated_counters.entry(key).or_default(); + *aggregation = value as i64; + } } /// Tell the metrics system to report to statsd. @@ -185,6 +236,7 @@ pub fn configure_statsd(prefix: &str, host: A, tags: BTreeMap< let mut builder = StatsdClient::builder(prefix, sink.clone()); + // pre-format the global tags in `statsd` format, including a leading `|#`. let mut formatted_global_tags = String::new(); for (key, value) in tags { if formatted_global_tags.is_empty() { @@ -208,17 +260,17 @@ pub fn configure_statsd(prefix: &str, host: A, tags: BTreeMap< METRICS_CLIENT.set(wrapper).unwrap(); } -/// Invoke a callback with the current [`StatsdClient`]. +/// Invoke a callback with the current [`MetricsWrapper`] and [`LocalAggregator`]. /// -/// If no [`StatsdClient`] is configured the callback is not invoked. +/// If metrics have not been configured, the callback is not invoked. /// For the most part the [`metric!`](crate::metric) macro should be used instead. #[inline(always)] pub fn with_client(f: F) where - F: FnOnce(&MetricsWrapper), + F: FnOnce(&MetricsWrapper, &mut LocalAggregator), { if let Some(client) = METRICS_CLIENT.get() { - f(client) + client.with_local_aggregator(f) } } @@ -227,30 +279,27 @@ where macro_rules! metric { // counters (counter($id:expr) += $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - $crate::metrics::with_client(|client| { - client.with_local_aggregator(|local| { - let tags: &[(&str, &str)] = &[ - $(($k, $v)),* - ]; - local.emit_count($id, $value, tags); - }); + $crate::metrics::with_client(|_client, local| { + let tags: &[(&str, &str)] = &[ + $(($k, $v)),* + ]; + local.emit_count($id, $value, tags); }); }}; // gauges (gauge($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - $crate::metrics::with_client(|client| { - use $crate::metrics::prelude::*; - client - .gauge_with_tags($id, $value) - $(.with_tag($k, $v))* - .send(); + $crate::metrics::with_client(|_client, local| { + let tags: &[(&str, &str)] = &[ + $(($k, $v)),* + ]; + local.emit_gauge($id, $value, tags); }); }}; // timers (timer($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - $crate::metrics::with_client(|client| { + $crate::metrics::with_client(|client, _local| { use $crate::metrics::prelude::*; client .time_with_tags($id, $value) @@ -261,7 +310,7 @@ macro_rules! metric { // we use statsd timers to send things such as filesizes as well. (time_raw($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - $crate::metrics::with_client(|client| { + $crate::metrics::with_client(|client, _local| { use $crate::metrics::prelude::*; client .time_with_tags($id, $value) @@ -272,7 +321,7 @@ macro_rules! metric { // histograms (histogram($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{ - $crate::metrics::with_client(|client| { + $crate::metrics::with_client(|client, _local| { use $crate::metrics::prelude::*; client .histogram_with_tags($id, $value) diff --git a/crates/symbolicator-service/src/utils/futures.rs b/crates/symbolicator-service/src/utils/futures.rs index 391dfe0b2..b944067c2 100644 --- a/crates/symbolicator-service/src/utils/futures.rs +++ b/crates/symbolicator-service/src/utils/futures.rs @@ -3,8 +3,6 @@ use std::time::Instant; use tokio::task::JoinHandle; -use crate::metrics::{self, prelude::*}; - /// Execute a callback on dropping of the container type. /// /// The callback must not panic under any circumstance. Since it is called while dropping an item, @@ -100,12 +98,10 @@ impl<'a> MeasureGuard<'a> { /// By default, the future is waiting to be polled. `start` emits the `futures.wait_time` /// metric. pub fn start(&mut self) { - metrics::with_client(|client| { - client - .time_with_tags("futures.wait_time", self.creation_time.elapsed()) - .with_tag("task_name", self.task_name) - .send(); - }) + metric!( + timer("futures.wait_time") = self.creation_time.elapsed(), + "task_name" => self.task_name, + ); } /// Marks the future as terminated and emits the `futures.done` metric. @@ -120,13 +116,11 @@ impl Drop for MeasureGuard<'_> { MeasureState::Pending => "canceled", MeasureState::Done(status) => status, }; - metrics::with_client(|client| { - client - .time_with_tags("futures.done", self.creation_time.elapsed()) - .with_tag("task_name", self.task_name) - .with_tag("status", status) - .send(); - }) + metric!( + timer("futures.done") = self.creation_time.elapsed(), + "task_name" => self.task_name, + "status" => status, + ); } } diff --git a/crates/symbolicator/src/endpoints/metrics.rs b/crates/symbolicator/src/endpoints/metrics.rs index f1e563a84..a3381c11f 100644 --- a/crates/symbolicator/src/endpoints/metrics.rs +++ b/crates/symbolicator/src/endpoints/metrics.rs @@ -39,7 +39,10 @@ where .as_ref() .map(|r| r.status()) .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); - metric!(counter(&format!("responses.status_code.{status}")) += 1); + metric!( + counter("responses.status_code") += 1, + "status" => status.as_str(), + ); } poll }