From 75d209fb626da1e21cfb34c46c6973d28e99d2f6 Mon Sep 17 00:00:00 2001 From: Arpad Borsos Date: Thu, 14 Mar 2024 14:43:46 +0100 Subject: [PATCH] Hook up Sentry Metrics to Metrics Aggregator (#1408) --- crates/symbolicator-service/Cargo.toml | 2 +- crates/symbolicator-service/src/metrics.rs | 57 ++++++++++++++++++++-- 2 files changed, 53 insertions(+), 6 deletions(-) diff --git a/crates/symbolicator-service/Cargo.toml b/crates/symbolicator-service/Cargo.toml index 156680503..cdc93caa8 100644 --- a/crates/symbolicator-service/Cargo.toml +++ b/crates/symbolicator-service/Cargo.toml @@ -31,7 +31,7 @@ once_cell = "1.17.1" rand = "0.8.5" reqwest = { version = "0.11.0", features = ["gzip", "brotli", "deflate", "json", "stream", "trust-dns"] } rustc-hash = "1.1.0" -sentry = { version = "0.32.1", features = ["tracing"] } +sentry = { version = "0.32.1", features = ["tracing", "UNSTABLE_metrics"] } serde = { version = "1.0.137", features = ["derive", "rc"] } serde_json = "1.0.81" serde_yaml = "0.9.14" diff --git a/crates/symbolicator-service/src/metrics.rs b/crates/symbolicator-service/src/metrics.rs index e9d3ec937..ff15a6d9b 100644 --- a/crates/symbolicator-service/src/metrics.rs +++ b/crates/symbolicator-service/src/metrics.rs @@ -10,6 +10,7 @@ use std::time::Duration; use cadence::{BufferedUdpMetricSink, MetricSink, QueuingMetricSink, StatsdClient}; use crossbeam_utils::CachePadded; use rustc_hash::FxHashMap; +use sentry::metrics::MetricBuilder; use thread_local::ThreadLocal; static METRICS_CLIENT: OnceLock = OnceLock::new(); @@ -98,10 +99,17 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L // send all the aggregated "counter like" metrics for (AggregationKey { ty, name, tags }, value) in total_counters { - let _ = write!( - &mut formatted_metric, - "{prefix}{name}:{value}{ty}{formatted_global_tags}" - ); + formatted_metric.push_str(&prefix); + formatted_metric.push_str(name); + + let sentry_metric = if ty == "|c" { + sentry::metrics::Metric::incr(formatted_metric.clone(), value as f64) + } else { + sentry::metrics::Metric::gauge(formatted_metric.clone(), value as f64) + }; + submit_sentry_metric(sentry_metric, &formatted_global_tags, tags.as_deref()); + + let _ = write!(&mut formatted_metric, ":{value}{ty}{formatted_global_tags}"); if let Some(tags) = tags { if formatted_global_tags.is_empty() { @@ -121,8 +129,21 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L // we do this in a batched manner, as we do not actually *aggregate* them, // but still send each value individually. for (AggregationKey { ty, name, tags }, value) in total_distributions { - suffix.push_str(&formatted_global_tags); + formatted_metric.push_str(&prefix); + formatted_metric.push_str(name); + for value in &value { + let sentry_metric = if ty == "|ms" { + let secs = value / 1_000.; + let duration = Duration::from_secs_f64(secs); + sentry::metrics::Metric::timing(formatted_metric.clone(), duration) + } else { + sentry::metrics::Metric::distribution(formatted_metric.clone(), *value) + }; + submit_sentry_metric(sentry_metric, &formatted_global_tags, tags.as_deref()); + } + formatted_metric.clear(); + suffix.push_str(&formatted_global_tags); if let Some(tags) = tags { if formatted_global_tags.is_empty() { suffix.push_str("|#"); @@ -160,6 +181,32 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L local_aggregators } +/// Submits the given `metric` to Sentry after adding all the necessary tags. +fn submit_sentry_metric( + mut metric: MetricBuilder, + formatted_global_tags: &str, + formatted_tags: Option<&str>, +) { + // All the tags have been pre-formatted into a StatsD-like string to avoid excessive allocations + // in the thread-local aggregator. However that means we have to parse those from strings again + // and copy that into a fresh `String` because the Sentry API expects them that way. + fn add_tags(mut metric: MetricBuilder, formatted_tags: &str) -> MetricBuilder { + for (tag_key, tag_value) in formatted_tags.split(',').filter_map(|t| t.split_once(':')) { + metric = metric.with_tag(tag_key.to_owned(), tag_value.to_owned()); + } + metric + } + + if let Some(formatted_global_tags) = formatted_global_tags.strip_prefix("|#") { + metric = add_tags(metric, formatted_global_tags); + } + if let Some(formatted_tags) = formatted_tags { + metric = add_tags(metric, formatted_tags); + } + + metric.send() +} + fn aggregate_all(aggregators: &LocalAggregators) -> (AggregatedCounters, AggregatedDistributions) { let mut total_counters = AggregatedCounters::default(); let mut total_distributions = AggregatedDistributions::default();