From dce41b7e3792f86624ccd479c8018355d750bda3 Mon Sep 17 00:00:00 2001
From: Arpad Borsos <arpad.borsos@sentry.io>
Date: Wed, 21 Feb 2024 10:18:31 +0100
Subject: [PATCH] Aggregate timers and histograms

---
 .../symbolicator-service/src/download/mod.rs  |   8 +-
 crates/symbolicator-service/src/metrics.rs    | 175 +++++++++++++++---
 2 files changed, 149 insertions(+), 34 deletions(-)

diff --git a/crates/symbolicator-service/src/download/mod.rs b/crates/symbolicator-service/src/download/mod.rs
index 7d1059f80..306da99af 100644
--- a/crates/symbolicator-service/src/download/mod.rs
+++ b/crates/symbolicator-service/src/download/mod.rs
@@ -615,9 +615,9 @@ impl Drop for MeasureSourceDownloadGuard<'_> {
         };
 
         let duration = self.creation_time.elapsed();
-        let metric_name = format!("{}.duration", self.task_name);
         metric!(
-            timer(&metric_name) = duration,
+            timer("download_duration") = duration,
+            "task_name" => self.task_name,
             "status" => status,
             "source" => self.source_name,
         );
@@ -631,9 +631,9 @@ impl Drop for MeasureSourceDownloadGuard<'_> {
                 .checked_div(duration.as_millis())
                 .and_then(|t| t.try_into().ok())
                 .unwrap_or(bytes_transferred);
-            let throughput_name = format!("{}.throughput", self.task_name);
             metric!(
-                histogram(&throughput_name) = throughput,
+                histogram("download_throughput") = throughput,
+                "task_name" => self.task_name,
                 "status" => status,
                 "source" => self.source_name,
             );
diff --git a/crates/symbolicator-service/src/metrics.rs b/crates/symbolicator-service/src/metrics.rs
index f09e2d1bb..ec5e5f3c3 100644
--- a/crates/symbolicator-service/src/metrics.rs
+++ b/crates/symbolicator-service/src/metrics.rs
@@ -54,6 +54,10 @@ impl fmt::Debug for MetricsWrapper {
     }
 }
 
+/// We are not (yet) aggregating distributions, but keeping every value.
+/// To not overwhelm downstream services, we send them in batches instead of all at once.
+const DISTRIBUTION_BATCH_SIZE: usize = 20;
+
 /// Creates [`LocalAggregators`] and starts a thread that will periodically
 /// send aggregated metrics upstream to the `sink`.
 fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators {
@@ -68,17 +72,23 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
 
     let thread_fn = move || {
         let mut formatted_metric = String::with_capacity(256);
+        let mut suffix = String::with_capacity(128);
         loop {
             thread::sleep(Duration::from_secs(5));
 
             let mut total_counters = AggregatedCounters::default();
-
-            for local_counters in aggregators.iter() {
-                let local_counters = {
-                    let mut local_aggregator = local_counters.lock().unwrap();
-                    std::mem::take(&mut local_aggregator.aggregated_counters)
+            let mut total_distributions = AggregatedDistributions::default();
+
+            for local_aggregator in aggregators.iter() {
+                let (local_counters, local_distributions) = {
+                    let mut local_aggregator = local_aggregator.lock().unwrap();
+                    (
+                        std::mem::take(&mut local_aggregator.aggregated_counters),
+                        std::mem::take(&mut local_aggregator.aggregated_distributions),
+                    )
                 };
 
+                // aggregate all the "counter like" metrics
                 if total_counters.is_empty() {
                     total_counters = local_counters;
                 } else {
@@ -88,14 +98,26 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
                         if ty == "|c" {
                             *aggregated_value += value;
                         } else if ty == "|g" {
-                            // FIXME: when aggregating multiple thread-locals, we don’t really
-                            // know which one is the "latest". But it also does not really matter.
+                            // FIXME: when aggregating multiple thread-locals,
+                            // we don’t really know which one is the "latest".
+                            // But it also does not really matter that much?
                             *aggregated_value = value;
                         }
                     }
                 }
+
+                // aggregate all the "distribution like" metrics
+                if total_distributions.is_empty() {
+                    total_distributions = local_distributions;
+                } else {
+                    for (key, value) in local_distributions {
+                        let aggregated_value = total_distributions.entry(key).or_default();
+                        aggregated_value.extend(value);
+                    }
+                }
             }
 
+            // send all the aggregated "counter like" metrics
             for (AggregationKey { ty, name, tags }, value) in total_counters.drain() {
                 let _ = write!(
                     &mut formatted_metric,
@@ -115,6 +137,39 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
 
                 formatted_metric.clear();
             }
+
+            // send all the aggregated "distribution like" metrics
+            // we do this in a batched manner, as we do not actually *aggregate* them,
+            // but still send each value individually.
+            for (AggregationKey { ty, name, tags }, value) in total_distributions.drain() {
+                suffix.push_str(&formatted_global_tags);
+
+                if let Some(tags) = tags {
+                    if formatted_global_tags.is_empty() {
+                        suffix.push_str("|#");
+                    } else {
+                        suffix.push(',');
+                    }
+                    suffix.push_str(&tags);
+                }
+
+                for batch in value.chunks(DISTRIBUTION_BATCH_SIZE) {
+                    formatted_metric.push_str(&prefix);
+                    formatted_metric.push_str(name);
+
+                    for value in batch {
+                        let _ = write!(&mut formatted_metric, ":{value}");
+                    }
+
+                    formatted_metric.push_str(ty);
+                    formatted_metric.push_str(&suffix);
+
+                    let _ = sink.emit(&formatted_metric);
+                    formatted_metric.clear();
+                }
+
+                suffix.clear();
+            }
         }
     };
 
@@ -146,6 +201,7 @@ impl Deref for MetricsWrapper {
     }
 }
 
+/// The key by which we group/aggregate metrics.
 #[derive(Eq, Ord, PartialEq, PartialOrd, Hash)]
 struct AggregationKey {
     /// The metric type, pre-formatted as a statsd suffix such as `|c`.
@@ -157,8 +213,37 @@ struct AggregationKey {
 }
 
 type AggregatedCounters = FxHashMap<AggregationKey, i64>;
-type AggregatedHistograms = FxHashMap<AggregationKey, Vec<f64>>;
+type AggregatedDistributions = FxHashMap<AggregationKey, Vec<f64>>;
+
+pub trait IntoDistributionValue {
+    fn into_value(self) -> f64;
+}
+
+impl IntoDistributionValue for Duration {
+    fn into_value(self) -> f64 {
+        self.as_secs_f64() / 1_000.
+    }
+}
 
+impl IntoDistributionValue for usize {
+    fn into_value(self) -> f64 {
+        self as f64
+    }
+}
+
+impl IntoDistributionValue for u64 {
+    fn into_value(self) -> f64 {
+        self as f64
+    }
+}
+
+impl IntoDistributionValue for i32 {
+    fn into_value(self) -> f64 {
+        self as f64
+    }
+}
+
+/// The `thread_local` aggregator which pre-aggregates metrics per-thread.
 #[derive(Default)]
 pub struct LocalAggregator {
     /// A mutable scratch-buffer that is reused to format tags into it.
@@ -166,8 +251,7 @@ pub struct LocalAggregator {
     /// A map of all the `counter` and `gauge` metrics we have aggregated thus far.
     aggregated_counters: AggregatedCounters,
     /// A map of all the `timer` and `histogram` metrics we have aggregated thus far.
-    #[allow(unused)] // TODO
-    aggregated_histograms: AggregatedHistograms,
+    aggregated_distributions: AggregatedDistributions,
 }
 
 impl LocalAggregator {
@@ -220,6 +304,37 @@ impl LocalAggregator {
         let aggregation = self.aggregated_counters.entry(key).or_default();
         *aggregation = value as i64;
     }
+
+    /// Emit a `timer` metric, for which every value is accumulated
+    pub fn emit_timer(&mut self, name: &'static str, value: f64, tags: &[(&'static str, &str)]) {
+        let tags = self.format_tags(tags);
+        self.emit_distribution_inner("|ms", name, value, tags)
+    }
+
+    /// Emit a `histogram` metric, for which every value is accumulated
+    pub fn emit_histogram(
+        &mut self,
+        name: &'static str,
+        value: f64,
+        tags: &[(&'static str, &str)],
+    ) {
+        let tags = self.format_tags(tags);
+        self.emit_distribution_inner("|h", name, value, tags)
+    }
+
+    /// Emit a distribution metric, which is aggregated by appending to a list of values.
+    fn emit_distribution_inner(
+        &mut self,
+        ty: &'static str,
+        name: &'static str,
+        value: f64,
+        tags: Option<Box<str>>,
+    ) {
+        let key = AggregationKey { ty, name, tags };
+
+        let aggregation = self.aggregated_distributions.entry(key).or_default();
+        aggregation.push(value);
+    }
 }
 
 /// Tell the metrics system to report to statsd.
@@ -280,7 +395,7 @@ macro_rules! metric {
     // counters
     (counter($id:expr) += $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
         $crate::metrics::with_client(|_client, local| {
-            let tags: &[(&str, &str)] = &[
+            let tags: &[(&'static str, &str)] = &[
                 $(($k, $v)),*
             ];
             local.emit_count($id, $value, tags);
@@ -290,7 +405,7 @@ macro_rules! metric {
     // gauges
     (gauge($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
         $crate::metrics::with_client(|_client, local| {
-            let tags: &[(&str, &str)] = &[
+            let tags: &[(&'static str, &str)] = &[
                 $(($k, $v)),*
             ];
             local.emit_gauge($id, $value, tags);
@@ -299,34 +414,34 @@ macro_rules! metric {
 
     // timers
     (timer($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
-        $crate::metrics::with_client(|client, _local| {
-            use $crate::metrics::prelude::*;
-            client
-                .time_with_tags($id, $value)
-                $(.with_tag($k, $v))*
-                .send();
+        $crate::metrics::with_client(|_client, local| {
+            let tags: &[(&'static str, &str)] = &[
+                $(($k, $v)),*
+            ];
+            use $crate::metrics::IntoDistributionValue;
+            local.emit_timer($id, ($value).into_value(), tags);
         });
     }};
 
     // we use statsd timers to send things such as filesizes as well.
     (time_raw($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
-        $crate::metrics::with_client(|client, _local| {
-            use $crate::metrics::prelude::*;
-            client
-                .time_with_tags($id, $value)
-                $(.with_tag($k, $v))*
-                .send();
+        $crate::metrics::with_client(|_client, local| {
+            let tags: &[(&'static str, &str)] = &[
+                $(($k, $v)),*
+            ];
+            use $crate::metrics::IntoDistributionValue;
+            local.emit_timer($id, ($value).into_value(), tags);
         });
     }};
 
     // histograms
     (histogram($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
-        $crate::metrics::with_client(|client, _local| {
-            use $crate::metrics::prelude::*;
-            client
-                .histogram_with_tags($id, $value)
-                $(.with_tag($k, $v))*
-                .send();
+        $crate::metrics::with_client(|_client, local| {
+            let tags: &[(&'static str, &str)] = &[
+                $(($k, $v)),*
+            ];
+            use $crate::metrics::IntoDistributionValue;
+            local.emit_histogram($id, ($value).into_value(), tags);
         });
     }};
 }