Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move label filtering into KafkaPrometheusReporter and out of the KafkaPrometheusCollector #38

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 21 additions & 59 deletions src/main/java/io/strimzi/kafka/metrics/KafkaMetricsCollector.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,20 @@
*/
package io.strimzi.kafka.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.model.registry.MultiCollector;
import io.prometheus.metrics.model.snapshots.GaugeSnapshot;
import io.prometheus.metrics.model.snapshots.InfoSnapshot;
import io.prometheus.metrics.model.snapshots.Labels;
import io.prometheus.metrics.model.snapshots.MetricSnapshot;
import io.prometheus.metrics.model.snapshots.MetricSnapshots;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand All @@ -31,78 +26,59 @@
public class KafkaMetricsCollector implements MultiCollector {

private static final Logger LOG = LoggerFactory.getLogger(KafkaMetricsCollector.class);

private final Map<MetricName, KafkaMetric> metrics;
private final PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the setPrefix method
private String prefix;
private final Map<MetricName, MetricWrapper> metrics;

/**
* Constructs a new KafkaMetricsCollector with provided configuration.
*
* @param config The configuration for the PrometheusMetricsReporter.
*/
public KafkaMetricsCollector(PrometheusMetricsReporterConfig config) {
this.config = config;
public KafkaMetricsCollector() {
this.metrics = new ConcurrentHashMap<>();
}

/**
* Sets the prefix to be used for metric names. This is always called before addMetric/removeMetric
*
* @param prefix The prefix to set.
*/
public void setPrefix(String prefix) {
this.prefix = PrometheusNaming.prometheusName(prefix);
}

/**
* Adds a Kafka metric to be collected.
* This method is used to add a Kafka metric to the collection for reporting.
* The metric is wrapped in a MetricWrapper object which contains additional information
* such as the prometheus name of the metric.
*
* @param metric The Kafka metric to add.
* @param name The name of the metric in the Kafka system. This is used as the key in the metrics map.
* @param metric The Kafka metric to add. This is wrapped in a MetricWrapper object.
*/
public void addMetric(KafkaMetric metric) {
metrics.put(metric.metricName(), metric);
public void addMetric(MetricName name, MetricWrapper metric) {
metrics.put(name, metric);
}

/**
* Removes a Kafka metric from collection.
*
* @param metric The Kafka metric to remove.
* @param name The Kafka metric to remove.
*/
public void removeMetric(KafkaMetric metric) {
metrics.remove(metric.metricName());
public void removeMetric(MetricName name) {
metrics.remove(name);
}

/**
* Called when the Prometheus server scrapes metrics.
* @return metrics that match the configured allowlist
* @return MetricSnapshots object that contains snapshots of metrics
*/
@Override
public MetricSnapshots collect() {
Map<String, GaugeSnapshot.Builder> gaugeBuilders = new HashMap<>();
Map<String, InfoSnapshot.Builder> infoBuilders = new HashMap<>();

for (Map.Entry<MetricName, KafkaMetric> entry : metrics.entrySet()) {
MetricName metricName = entry.getKey();
KafkaMetric kafkaMetric = entry.getValue();

String prometheusMetricName = MetricWrapper.prometheusName(prefix, metricName);
if (!config.isAllowed(prometheusMetricName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusMetricName);
continue;
}
Labels labels = labelsFromTags(metricName.tags(), metricName.name());
for (Map.Entry<MetricName, MetricWrapper> entry : metrics.entrySet()) {
MetricWrapper metricWrapper = entry.getValue();
String prometheusMetricName = metricWrapper.prometheusName();
Object metric = metricWrapper.value();
Labels labels = metricWrapper.labels();
LOG.debug("Collecting metric {} with the following labels: {}", prometheusMetricName, labels);

Object valueObj = kafkaMetric.metricValue();
if (valueObj instanceof Number) {
double value = ((Number) valueObj).doubleValue();
if (metric instanceof Number) {
double value = ((Number) metric).doubleValue();
GaugeSnapshot.Builder builder = gaugeBuilders.computeIfAbsent(prometheusMetricName, k -> GaugeSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.gaugeDataPoint(labels, value));
} else {
InfoSnapshot.Builder builder = infoBuilders.computeIfAbsent(prometheusMetricName, k -> InfoSnapshot.builder().name(prometheusMetricName));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, valueObj, metricName.name()));
builder.dataPoint(DataPointSnapshotBuilder.infoDataPoint(labels, metric, metricWrapper.attribute()));
}
}
List<MetricSnapshot> snapshots = new ArrayList<>();
Expand All @@ -114,18 +90,4 @@ public MetricSnapshots collect() {
}
return new MetricSnapshots(snapshots);
}

static Labels labelsFromTags(Map<String, String> tags, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
for (Map.Entry<String, String> label : tags.entrySet()) {
String newLabelName = PrometheusNaming.sanitizeLabelName(label.getKey());
if (labelNames.add(newLabelName)) {
builder.label(newLabelName, label.getValue());
} else {
LOG.warn("Ignoring duplicate label key: {} with value: {} from metric: {} ", newLabelName, label.getValue(), metricName);
}
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.instrumentation.jvm.JvmMetrics;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsContext;
Expand All @@ -29,12 +30,15 @@
public class KafkaPrometheusMetricsReporter implements MetricsReporter {

private static final Logger LOG = LoggerFactory.getLogger(KafkaPrometheusMetricsReporter.class);

private final PrometheusRegistry registry;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private KafkaMetricsCollector kafkaMetricsCollector;
private KafkaMetricsCollector collector;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private Optional<HTTPServer> httpServer;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method
private String prefix;

/**
* Constructor
Expand All @@ -50,8 +54,8 @@ public KafkaPrometheusMetricsReporter() {

@Override
public void configure(Map<String, ?> map) {
PrometheusMetricsReporterConfig config = new PrometheusMetricsReporterConfig(map, registry);
kafkaMetricsCollector = new KafkaMetricsCollector(config);
config = new PrometheusMetricsReporterConfig(map, registry);
collector = new KafkaMetricsCollector();
// Add JVM metrics
JvmMetrics.builder().register(registry);
httpServer = config.startHttpServer();
Expand All @@ -60,25 +64,30 @@ public void configure(Map<String, ?> map) {

@Override
public void init(List<KafkaMetric> metrics) {
registry.register(kafkaMetricsCollector);
registry.register(collector);
for (KafkaMetric metric : metrics) {
metricChange(metric);
}
}

@Override
public void metricChange(KafkaMetric metric) {
kafkaMetricsCollector.addMetric(metric);
String prometheusName = MetricWrapper.prometheusName(prefix, metric.metricName());
if (!config.isAllowed(prometheusName)) {
LOG.trace("Ignoring metric {} as it does not match the allowlist", prometheusName);
} else {
MetricWrapper metricWrapper = new MetricWrapper(prometheusName, metric, metric.metricName().name());
OwenCorrigan76 marked this conversation as resolved.
Show resolved Hide resolved
collector.addMetric(metric.metricName(), metricWrapper);
}
}

@Override
public void metricRemoval(KafkaMetric metric) {
kafkaMetricsCollector.removeMetric(metric);
collector.removeMetric(metric.metricName());
}

@Override
public void close() {
registry.unregister(kafkaMetricsCollector);
registry.unregister(collector);
}

@Override
Expand All @@ -97,7 +106,7 @@ public Set<String> reconfigurableConfigs() {
@Override
public void contextChange(MetricsContext metricsContext) {
String prefix = metricsContext.contextLabels().get(MetricsContext.NAMESPACE);
kafkaMetricsCollector.setPrefix(prefix);
this.prefix = PrometheusNaming.prometheusName(prefix);
}

// for testing
Expand Down
7 changes: 2 additions & 5 deletions src/main/java/io/strimzi/kafka/metrics/MetricWrapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ public class MetricWrapper {
private final Object value;
private final String attribute;

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
/**
* Constructor from Kafka Metrics
* @param prometheusName The name of the metric in the prometheus format
Expand Down Expand Up @@ -89,7 +88,7 @@ public String attribute() {
return attribute;
}

private static Labels labelsFromScope(String scope, String metricName) {
static Labels labelsFromScope(String scope, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
if (scope != null) {
Expand All @@ -108,8 +107,7 @@ private static Labels labelsFromScope(String scope, String metricName) {
return builder.build();
}

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
private static Labels labelsFromTags(Map<String, String> tags, String metricName) {
static Labels labelsFromTags(Map<String, String> tags, String metricName) {
Labels.Builder builder = Labels.builder();
Set<String> labelNames = new HashSet<>();
for (Map.Entry<String, String> label : tags.entrySet()) {
Expand Down Expand Up @@ -137,7 +135,6 @@ public static String prometheusName(MetricName metricName) {
metricName.getName()).toLowerCase(Locale.ROOT));
}

// Will be used when implementing https://github.com/strimzi/metrics-reporter/issues/9
/**
* Compute the Prometheus name from a Kafka MetricName
* @param prefix The prefix to add to the metric name
Expand Down
Loading