Skip to content

Commit

Permalink
a bit of docs, a bit of aggregating gauges
Browse files Browse the repository at this point in the history
  • Loading branch information
Swatinem committed Feb 20, 2024
1 parent 396f183 commit 964cd09
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 83 deletions.
17 changes: 3 additions & 14 deletions crates/symbolicator-js/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,18 +117,6 @@ impl JsMetrics {
}

pub fn submit_metrics(&self, artifact_bundles: u64) {
metrics::with_client(|client| {
client.with_local_aggregator(|aggregator| {
self.submit_metrics_inner(aggregator, artifact_bundles);
});
})
}

fn submit_metrics_inner(
&self,
aggregator: &mut metrics::LocalAggregator,
artifact_bundles: u64,
) {
metric!(time_raw("js.needed_files") = self.needed_files);
metric!(time_raw("js.api_requests") = self.api_requests);
metric!(time_raw("js.queried_bundles") = self.queried_bundles);
Expand All @@ -137,9 +125,10 @@ impl JsMetrics {
metric!(time_raw("js.fetched_artifacts") = self.fetched_artifacts);
metric!(time_raw("js.scraped_files") = self.scraped_files);

// TODO: we are currently getting these as counters. maybe we want to use `time_raw` to also
// have a per-event avg, etc.
metrics::with_client(|_client, aggregator| self.submit_local_metrics(aggregator))
}

fn submit_local_metrics(&self, aggregator: &mut metrics::LocalAggregator) {
// Sources:
aggregator.emit_count(
"js.found_via_bundle_debugid",
Expand Down
155 changes: 102 additions & 53 deletions crates/symbolicator-service/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ pub mod prelude {
pub use cadence::prelude::*;
}

type LocalAggregators = Arc<ThreadLocal<CachePadded<Mutex<LocalAggregator>>>>;

#[derive(Debug, Clone)]
struct Sink(Arc<QueuingMetricSink>);

Expand All @@ -33,11 +31,14 @@ impl MetricSink for Sink {
}
}

type LocalAggregators = Arc<ThreadLocal<CachePadded<Mutex<LocalAggregator>>>>;

/// The globally configured Metrics, including a `cadence` client, and a local aggregator.
pub struct MetricsWrapper {
/// The raw `cadence` client.
statsd_client: StatsdClient,

/// A thread local aggregator for `count` metrics
/// A thread local aggregator.
local_aggregator: LocalAggregators,
}

Expand All @@ -47,16 +48,18 @@ impl fmt::Debug for MetricsWrapper {
.field("statsd_client", &self.statsd_client)
.field(
"local_aggregator",
&format_args!("LocalAggregator {{ .. }}"),
&format_args!("LocalAggregators {{ .. }}"),
)
.finish()
}
}

/// Creates [`LocalAggregators`] and starts a thread that will periodically
/// send aggregated metrics upstream to the `sink`.
fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> LocalAggregators {
let local_aggregator: LocalAggregators = Default::default();
let local_aggregators: LocalAggregators = Default::default();

let aggregators = Arc::clone(&local_aggregator);
let aggregators = Arc::clone(&local_aggregators);
let prefix = if prefix.is_empty() {
String::new()
} else {
Expand All @@ -68,32 +71,38 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
loop {
thread::sleep(Duration::from_secs(5));

let mut total_aggregations = Aggregations::default();
let mut total_counters = AggregatedCounters::default();

for local_aggregator in aggregators.iter() {
let local_aggregations = {
let mut local_aggregator = local_aggregator.lock().unwrap();
std::mem::take(&mut local_aggregator.aggregations)
for local_counters in aggregators.iter() {
let local_counters = {
let mut local_aggregator = local_counters.lock().unwrap();
std::mem::take(&mut local_aggregator.aggregated_counters)
};

if total_aggregations.is_empty() {
total_aggregations = local_aggregations;
if total_counters.is_empty() {
total_counters = local_counters;
} else {
for (key, value) in local_aggregations {
let aggregated_value = total_aggregations.entry(key).or_default();
*aggregated_value += value
for (key, value) in local_counters {
let ty = key.ty;
let aggregated_value = total_counters.entry(key).or_default();
if ty == "|c" {
*aggregated_value += value;
} else if ty == "|g" {
// FIXME: when aggregating multiple thread-locals, we don’t really
// know which one is the "latest". But it also does not really matter.
*aggregated_value = value;
}
}
}
}

for (AggregationKey { name, tags }, value) in total_aggregations.drain() {
for (AggregationKey { ty, name, tags }, value) in total_counters.drain() {
let _ = write!(
&mut formatted_metric,
"{}{name}:{value}|c{}",
prefix, formatted_global_tags
"{prefix}{name}:{value}{ty}{formatted_global_tags}"
);

if !tags.is_empty() {
if let Some(tags) = tags {
if formatted_global_tags.is_empty() {
formatted_metric.push_str("|#");
} else {
Expand All @@ -114,17 +123,18 @@ fn make_aggregator(prefix: &str, formatted_global_tags: String, sink: Sink) -> L
.spawn(thread_fn)
.unwrap();

local_aggregator
local_aggregators
}

impl MetricsWrapper {
pub fn with_local_aggregator(&self, f: impl FnOnce(&mut LocalAggregator)) {
/// Invokes the provided callback with a mutable reference to a thread-local [`LocalAggregator`].
fn with_local_aggregator(&self, f: impl FnOnce(&Self, &mut LocalAggregator)) {
let mut local_aggregator = self
.local_aggregator
.get_or(Default::default)
.lock()
.unwrap();
f(&mut local_aggregator)
f(self, &mut local_aggregator)
}
}

Expand All @@ -138,37 +148,78 @@ impl Deref for MetricsWrapper {

#[derive(Eq, Ord, PartialEq, PartialOrd, Hash)]
struct AggregationKey {
name: Box<str>,
tags: Box<str>,
/// The metric type, pre-formatted as a statsd suffix such as `|c`.
ty: &'static str,
/// The name of the metric.
name: &'static str,
/// The metric tags, pre-formatted as a statsd suffix, excluding the `|#` prefix.
tags: Option<Box<str>>,
}

type Aggregations = FxHashMap<AggregationKey, i64>;
type AggregatedCounters = FxHashMap<AggregationKey, i64>;
type AggregatedHistograms = FxHashMap<AggregationKey, Vec<f64>>;

#[derive(Default)]
pub struct LocalAggregator {
/// A mutable scratch-buffer that is reused to format tags into it.
buf: String,
aggregations: Aggregations,
/// A map of all the `counter` and `gauge` metrics we have aggregated thus far.
aggregated_counters: AggregatedCounters,
/// A map of all the `timer` and `histogram` metrics we have aggregated thus far.
#[allow(unused)] // TODO
aggregated_histograms: AggregatedHistograms,
}

impl LocalAggregator {
pub fn emit_count(&mut self, name: &str, value: i64, tags: &[(&str, &str)]) {
self.buf.reserve(256);
/// Formats the `tags` into a `statsd` like format with the help of our scratch buffer.
fn format_tags(&mut self, tags: &[(&str, &str)]) -> Option<Box<str>> {
if tags.is_empty() {
return None;
}

// to avoid reallocation, just reserve some space.
// the size is rather arbitrary, but should be large enough for reasonable tags.
self.buf.reserve(128);
for (key, value) in tags {
if !self.buf.is_empty() {
self.buf.push(',');
}
let _ = write!(&mut self.buf, "{key}:{value}");
}
let formatted_tags = self.buf.as_str().into();
self.buf.clear();

Some(formatted_tags)
}

/// Emit a `count` metric, which is aggregated by summing up all values.
pub fn emit_count(&mut self, name: &'static str, value: i64, tags: &[(&'static str, &str)]) {
let tags = self.format_tags(tags);

let key = AggregationKey {
name: name.into(),
tags: self.buf.as_str().into(),
ty: "|c",
name,
tags,
};
self.buf.clear();

let aggregation = self.aggregations.entry(key).or_default();
let aggregation = self.aggregated_counters.entry(key).or_default();
*aggregation += value;
}

/// Emit a `gauge` metric, for which only the latest value is retained.
pub fn emit_gauge(&mut self, name: &'static str, value: u64, tags: &[(&'static str, &str)]) {
let tags = self.format_tags(tags);

let key = AggregationKey {
// TODO: maybe we want to give gauges their own aggregations?
ty: "|g",
name,
tags,
};

let aggregation = self.aggregated_counters.entry(key).or_default();
*aggregation = value as i64;
}
}

/// Tell the metrics system to report to statsd.
Expand All @@ -185,6 +236,7 @@ pub fn configure_statsd<A: ToSocketAddrs>(prefix: &str, host: A, tags: BTreeMap<

let mut builder = StatsdClient::builder(prefix, sink.clone());

// pre-format the global tags in `statsd` format, including a leading `|#`.
let mut formatted_global_tags = String::new();
for (key, value) in tags {
if formatted_global_tags.is_empty() {
Expand All @@ -208,17 +260,17 @@ pub fn configure_statsd<A: ToSocketAddrs>(prefix: &str, host: A, tags: BTreeMap<
METRICS_CLIENT.set(wrapper).unwrap();
}

/// Invoke a callback with the current [`StatsdClient`].
/// Invoke a callback with the current [`MetricsWrapper`] and [`LocalAggregator`].
///
/// If no [`StatsdClient`] is configured the callback is not invoked.
/// If metrics have not been configured, the callback is not invoked.
/// For the most part the [`metric!`](crate::metric) macro should be used instead.
#[inline(always)]
pub fn with_client<F>(f: F)
where
F: FnOnce(&MetricsWrapper),
F: FnOnce(&MetricsWrapper, &mut LocalAggregator),
{
if let Some(client) = METRICS_CLIENT.get() {
f(client)
client.with_local_aggregator(f)
}
}

Expand All @@ -227,30 +279,27 @@ where
macro_rules! metric {
// counters
(counter($id:expr) += $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|client| {
client.with_local_aggregator(|local| {
let tags: &[(&str, &str)] = &[
$(($k, $v)),*
];
local.emit_count($id, $value, tags);
});
$crate::metrics::with_client(|_client, local| {
let tags: &[(&str, &str)] = &[
$(($k, $v)),*
];
local.emit_count($id, $value, tags);
});
}};

// gauges
(gauge($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|client| {
use $crate::metrics::prelude::*;
client
.gauge_with_tags($id, $value)
$(.with_tag($k, $v))*
.send();
$crate::metrics::with_client(|_client, local| {
let tags: &[(&str, &str)] = &[
$(($k, $v)),*
];
local.emit_gauge($id, $value, tags);
});
}};

// timers
(timer($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|client| {
$crate::metrics::with_client(|client, _local| {
use $crate::metrics::prelude::*;
client
.time_with_tags($id, $value)
Expand All @@ -261,7 +310,7 @@ macro_rules! metric {

// we use statsd timers to send things such as filesizes as well.
(time_raw($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|client| {
$crate::metrics::with_client(|client, _local| {
use $crate::metrics::prelude::*;
client
.time_with_tags($id, $value)
Expand All @@ -272,7 +321,7 @@ macro_rules! metric {

// histograms
(histogram($id:expr) = $value:expr $(, $k:expr => $v:expr)* $(,)?) => {{
$crate::metrics::with_client(|client| {
$crate::metrics::with_client(|client, _local| {
use $crate::metrics::prelude::*;
client
.histogram_with_tags($id, $value)
Expand Down
24 changes: 9 additions & 15 deletions crates/symbolicator-service/src/utils/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ use std::time::Instant;

use tokio::task::JoinHandle;

use crate::metrics::{self, prelude::*};

/// Execute a callback on dropping of the container type.
///
/// The callback must not panic under any circumstance. Since it is called while dropping an item,
Expand Down Expand Up @@ -100,12 +98,10 @@ impl<'a> MeasureGuard<'a> {
/// By default, the future is waiting to be polled. `start` emits the `futures.wait_time`
/// metric.
pub fn start(&mut self) {
metrics::with_client(|client| {
client
.time_with_tags("futures.wait_time", self.creation_time.elapsed())
.with_tag("task_name", self.task_name)
.send();
})
metric!(
timer("futures.wait_time") = self.creation_time.elapsed(),
"task_name" => self.task_name,
);
}

/// Marks the future as terminated and emits the `futures.done` metric.
Expand All @@ -120,13 +116,11 @@ impl Drop for MeasureGuard<'_> {
MeasureState::Pending => "canceled",
MeasureState::Done(status) => status,
};
metrics::with_client(|client| {
client
.time_with_tags("futures.done", self.creation_time.elapsed())
.with_tag("task_name", self.task_name)
.with_tag("status", status)
.send();
})
metric!(
timer("futures.done") = self.creation_time.elapsed(),
"task_name" => self.task_name,
"status" => status,
);
}
}

Expand Down
5 changes: 4 additions & 1 deletion crates/symbolicator/src/endpoints/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ where
.as_ref()
.map(|r| r.status())
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
metric!(counter(&format!("responses.status_code.{status}")) += 1);
metric!(
counter("responses.status_code") += 1,
"status" => status.as_str(),
);
}
poll
}
Expand Down

0 comments on commit 964cd09

Please sign in to comment.