Skip to content

Commit

Permalink
Aggregate timers and histograms
Browse files Browse the repository at this point in the history
  • Loading branch information
Swatinem committed Feb 21, 2024
1 parent 964cd09 commit dce41b7
Show file tree
Hide file tree
Showing 2 changed files with 149 additions and 34 deletions.
8 changes: 4 additions & 4 deletions crates/symbolicator-service/src/download/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,9 +615,9 @@ impl Drop for MeasureSourceDownloadGuard<'_> {
};

let duration = self.creation_time.elapsed();
let metric_name = format!("{}.duration", self.task_name);
metric!(
timer(&metric_name) = duration,
timer("download_duration") = duration,
"task_name" => self.task_name,
"status" => status,
"source" => self.source_name,
);
Expand All @@ -631,9 +631,9 @@ impl Drop for MeasureSourceDownloadGuard<'_> {
.checked_div(duration.as_millis())
.and_then(|t| t.try_into().ok())
.unwrap_or(bytes_transferred);
let throughput_name = format!("{}.throughput", self.task_name);
metric!(
histogram(&throughput_name) = throughput,
histogram("download_throughput") = throughput,
"task_name" => self.task_name,
"status" => status,
"source" => self.source_name,
);
Expand Down
175 changes: 145 additions & 30 deletions crates/symbolicator-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,10 @@ impl fmt::Debug for MetricsWrapper {
}
}

/// We are not (yet) aggregating distributions, but keeping every value.
/// To not overwhelm downstream services, we send them in batches instead of all at once.
const DISTRIBUTION_BATCH_SIZE: usize = 20;

/// Creates [`LocalAggregators`] and starts a thread that will periodically
/// send aggregated metrics upstream to the `sink`.
fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators {
Expand All @@ -68,17 +72,23 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L

let thread_fn = move || {
let mut formatted_metric = String::with_capacity(256);
let mut suffix = String::with_capacity(128);
loop {
thread::sleep(Duration::from_secs(5));

let mut total_counters = AggregatedCounters::default();

for local_counters in aggregators.iter() {
let local_counters = {
let mut local_aggregator = local_counters.lock().unwrap();
std::mem::take(&mut local_aggregator.aggregated_counters)
let mut total_distributions = AggregatedDistributions::default();

for local_aggregator in aggregators.iter() {
let (local_counters, local_distributions) = {
let mut local_aggregator = local_aggregator.lock().unwrap();
(
std::mem::take(&mut local_aggregator.aggregated_counters),
std::mem::take(&mut local_aggregator.aggregated_distributions),
)
};

// aggregate all the "counter like" metrics
if total_counters.is_empty() {
total_counters = local_counters;
} else {
Expand All @@ -88,14 +98,26 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
if ty == "|c" {
*aggregated_value += value;
} else if ty == "|g" {
// FIXME: when aggregating multiple thread-locals, we don’t really
// know which one is the "latest". But it also does not really matter.
// FIXME: when aggregating multiple thread-locals,
// we don’t really know which one is the "latest".
// But it also does not really matter that much?
*aggregated_value = value;
}
}
}

// aggregate all the "distribution like" metrics
if total_distributions.is_empty() {
total_distributions = local_distributions;
} else {
for (key, value) in local_distributions {
let aggregated_value = total_distributions.entry(key).or_default();
aggregated_value.extend(value);
}
}
}

// send all the aggregated "counter like" metrics
for (AggregationKey { ty, name, tags }, value) in total_counters.drain() {
let _ = write!(
&mut formatted_metric,
Expand All @@ -115,6 +137,39 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L

formatted_metric.clear();
}

// send all the aggregated "distribution like" metrics
// we do this in a batched manner, as we do not actually *aggregate* them,
// but still send each value individually.
for (AggregationKey { ty, name, tags }, value) in total_distributions.drain() {
suffix.push_str(&formatted_global_tags);

if let Some(tags) = tags {
if formatted_global_tags.is_empty() {
suffix.push_str("|#");
} else {
suffix.push(',');
}
suffix.push_str(&tags);
}

for batch in value.chunks(DISTRIBUTION_BATCH_SIZE) {
formatted_metric.push_str(&prefix);
formatted_metric.push_str(name);

for value in batch {
let _ = write!(&mut formatted_metric, ":{value}");
}

formatted_metric.push_str(ty);
formatted_metric.push_str(&suffix);

let _ = sink.emit(&formatted_metric);
formatted_metric.clear();
}

suffix.clear();
}
}
};

Expand Down Expand Up @@ -146,6 +201,7 @@ impl Deref for MetricsWrapper {
}
}

/// The key by which we group/aggregate metrics.
#[derive(Eq, Ord, PartialEq, PartialOrd, Hash)]
struct AggregationKey {
/// The metric type, pre-formatted as a statsd suffix such as `|c`.
Expand All @@ -157,17 +213,45 @@ struct AggregationKey {
}

type AggregatedCounters = FxHashMap<AggregationKey, i64>;
type AggregatedHistograms = FxHashMap<AggregationKey, Vec<f64>>;
type AggregatedDistributions = FxHashMap<AggregationKey, Vec<f64>>;

pub trait IntoDistributionValue {
fn into_value(self) -> f64;
}

impl IntoDistributionValue for Duration {
fn into_value(self) -> f64 {
self.as_secs_f64() / 1_000.
}
}

impl IntoDistributionValue for usize {
fn into_value(self) -> f64 {
self as f64
}
}

impl IntoDistributionValue for u64 {
fn into_value(self) -> f64 {
self as f64
}
}

impl IntoDistributionValue for i32 {
fn into_value(self) -> f64 {
self as f64
}
}

/// The `thread_local` aggregator which pre-aggregates metrics per-thread.
#[derive(Default)]
pub struct LocalAggregator {
/// A mutable scratch-buffer that is reused to format tags into it.
buf: String,
/// A map of all the `counter` and `gauge` metrics we have aggregated thus far.
aggregated_counters: AggregatedCounters,
/// A map of all the `timer` and `histogram` metrics we have aggregated thus far.
#[allow(unused)] // TODO
aggregated_histograms: AggregatedHistograms,
aggregated_distributions: AggregatedDistributions,
}

impl LocalAggregator {
Expand Down Expand Up @@ -220,6 +304,37 @@ impl LocalAggregator {
let aggregation = self.aggregated_counters.entry(key).or_default();
*aggregation = value as i64;
}

/// Emit a `timer` metric, for which every value is accumulated
pub fn emit_timer(&mut self, name: &'static str, value: f64, tags: &[(&'static str, &str)]) {
let tags = self.format_tags(tags);
self.emit_distribution_inner("|ms", name, value, tags)
}

/// Emit a `histogram` metric, for which every value is accumulated
pub fn emit_histogram(
&mut self,
name: &'static str,
value: f64,
tags: &[(&'static str, &str)],
) {
let tags = self.format_tags(tags);
self.emit_distribution_inner("|h", name, value, tags)
}

/// Emit a distribution metric, which is aggregated by appending to a list of values.
fn emit_distribution_inner(
&mut self,
ty: &'static str,
name: &'static str,
value: f64,
tags: Option<Box<str>>,
) {
let key = AggregationKey { ty, name, tags };

let aggregation = self.aggregated_distributions.entry(key).or_default();
aggregation.push(value);
}
}

/// Tell the metrics system to report to statsd.
Expand Down Expand Up @@ -280,7 +395,7 @@ macro_rules! metric {
// counters
(counter($id:expr) += $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|_client, local| {
let tags: &[(&str, &str)] = &[
let tags: &[(&'static str, &str)] = &[
$(($k, $v)),*
];
local.emit_count($id, $value, tags);
Expand All @@ -290,7 +405,7 @@ macro_rules! metric {
// gauges
(gauge($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|_client, local| {
let tags: &[(&str, &str)] = &[
let tags: &[(&'static str, &str)] = &[
$(($k, $v)),*
];
local.emit_gauge($id, $value, tags);
Expand All @@ -299,34 +414,34 @@ macro_rules! metric {

// timers
(timer($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|client, _local| {
use $crate::metrics::prelude::*;
client
.time_with_tags($id, $value)
$(.with_tag($k, $v))*
.send();
$crate::metrics::with_client(|_client, local| {
let tags: &[(&'static str, &str)] = &[
$(($k, $v)),*
];
use $crate::metrics::IntoDistributionValue;
local.emit_timer($id, ($value).into_value(), tags);
});
}};

// we use statsd timers to send things such as filesizes as well.
(time_raw($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|client, _local| {
use $crate::metrics::prelude::*;
client
.time_with_tags($id, $value)
$(.with_tag($k, $v))*
.send();
$crate::metrics::with_client(|_client, local| {
let tags: &[(&'static str, &str)] = &[
$(($k, $v)),*
];
use $crate::metrics::IntoDistributionValue;
local.emit_timer($id, ($value).into_value(), tags);
});
}};

// histograms
(histogram($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|client, _local| {
use $crate::metrics::prelude::*;
client
.histogram_with_tags($id, $value)
$(.with_tag($k, $v))*
.send();
$crate::metrics::with_client(|_client, local| {
let tags: &[(&'static str, &str)] = &[
$(($k, $v)),*
];
use $crate::metrics::IntoDistributionValue;
local.emit_histogram($id, ($value).into_value(), tags);
});
}};
}

0 comments on commit dce41b7

Please sign in to comment.