Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Keep track of HTTPServer instances #40

Merged
merged 3 commits into from
Aug 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions src/main/java/io/strimzi/kafka/metrics/HttpServers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;

import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Class to keep track of all the HTTP servers started by all the Kafka components in a JVM.
*/
public class HttpServers {

private final static Logger LOG = LoggerFactory.getLogger(HttpServers.class);
private static final Map<Listener, ServerCounter> SERVERS = new HashMap<>();

/**
* Get or create a new HTTP server if there isn't an existing instance for the specified listener.
* @param listener The host and port
* @param registry The Prometheus registry to expose
* @return A ServerCounter instance
* @throws IOException if the HTTP server does not exist and cannot be started
*/
public synchronized static ServerCounter getOrCreate(Listener listener, PrometheusRegistry registry) throws IOException {
Copy link

@k-wall k-wall Aug 21, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The API is a bit weak. It has you believe that you could attach different registries, but it actually ignore it.
I see that both Yammer and Kafka paths pass use the same registry (the default one) so given our use of the class, there's no issue.

I don't really have an alternative suggestion here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah the registry used to be hard coded to the default registry as it's what all components have to use. But because there isn't a mechanism to clear a registry, we allowed passing it in for tests.

ServerCounter serverCounter = SERVERS.get(listener);
if (serverCounter == null) {
serverCounter = new ServerCounter(listener, registry);
SERVERS.put(listener, serverCounter);
}
serverCounter.count.incrementAndGet();
return serverCounter;
}

/**
* Release an HTTP server instance. If no other components hold this instance, it is closed.
* @param serverCounter The HTTP server instance to release
*/
public synchronized static void release(ServerCounter serverCounter) {
if (serverCounter.close()) {
SERVERS.remove(serverCounter.listener);
}
}

/**
* Class used to keep track of the HTTP server started on a listener.
*/
public static class ServerCounter {

private final AtomicInteger count;
private final HTTPServer server;
private final Listener listener;

private ServerCounter(Listener listener, PrometheusRegistry registry) throws IOException {
this.count = new AtomicInteger();
this.server = HTTPServer.builder()
.hostname(listener.host)
.port(listener.port)
.registry(registry)
.buildAndStart();
LOG.debug("Started HTTP server on http://{}:{}", listener.host, server.getPort());
this.listener = listener;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd add a log statement for starting the server.

}

/**
* The port this HTTP server instance is listening on. If the listener port is 0, this returns the actual port
* that is used.
* @return The port number
*/
public int port() {
return server.getPort();
}

private synchronized boolean close() {
int remaining = count.decrementAndGet();
if (remaining == 0) {
server.close();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and log on shutdown

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. I checked that the port is returned (and valid) after close() is called.

LOG.debug("Stopped HTTP server on http://{}:{}", listener.host, server.getPort());
return true;
}
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
package io.strimzi.kafka.metrics;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.instrumentation.jvm.JvmMetrics;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import io.prometheus.metrics.model.snapshots.PrometheusNaming;
Expand Down Expand Up @@ -36,7 +35,7 @@ public class KafkaPrometheusMetricsReporter implements MetricsReporter {
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private PrometheusMetricsReporterConfig config;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the configure method
private Optional<HTTPServer> httpServer;
private Optional<HttpServers.ServerCounter> httpServer;
@SuppressFBWarnings({"UWF_FIELD_NOT_INITIALIZED_IN_CONSTRUCTOR"}) // This field is initialized in the contextChange method
private String prefix;

Expand All @@ -57,7 +56,7 @@ public void configure(Map<String, ?> map) {
config = new PrometheusMetricsReporterConfig(map, registry);
collector = new KafkaMetricsCollector();
// Add JVM metrics
JvmMetrics.builder().register(registry);
JvmMetrics.builder().register();
Copy link
Contributor Author

@mimaison mimaison Aug 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method without an argument is idempotent and registers the JVM metrics onto the default metrics registry, which is what we use at runtime. This allow multiple components to run this code without causing exceptions as the metrics are only added once: http://prometheus.github.io/client_java/api/io/prometheus/metrics/instrumentation/jvm/JvmMetrics.Builder.html#register()

It would be nice to pass a registry and still have the idempotent behavior if it's the default registry. I may open a PR on https://github.com/prometheus/client_java to do that. In the meantime, we have to use to method without arguments.

httpServer = config.startHttpServer();
LOG.debug("KafkaPrometheusMetricsReporter configured with {}", config);
}
Expand Down Expand Up @@ -88,6 +87,7 @@ public void metricRemoval(KafkaMetric metric) {
@Override
public void close() {
registry.unregister(collector);
httpServer.ifPresent(HttpServers::release);
}

@Override
Expand All @@ -111,6 +111,6 @@ public void contextChange(MetricsContext metricsContext) {

// for testing
Optional<Integer> getPort() {
return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().getPort() : null);
return Optional.ofNullable(httpServer.isPresent() ? httpServer.get().port() : null);
}
}
74 changes: 74 additions & 0 deletions src/main/java/io/strimzi/kafka/metrics/Listener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static io.strimzi.kafka.metrics.PrometheusMetricsReporterConfig.LISTENER_CONFIG;

/**
* Class parsing and handling the listener specified via {@link PrometheusMetricsReporterConfig#LISTENER_CONFIG} for
* the HTTP server used to expose the metrics.
*/
public class Listener {

private static final Pattern PATTERN = Pattern.compile("http://\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)");

final String host;
final int port;

/* test */ Listener(String host, int port) {
this.host = host;
this.port = port;
}

static Listener parseListener(String listener) {
Matcher matcher = PATTERN.matcher(listener);
if (matcher.matches()) {
String host = matcher.group(1);
int port = Integer.parseInt(matcher.group(2));
return new Listener(host, port);
} else {
throw new ConfigException(LISTENER_CONFIG, listener, "Listener must be of format http://[host]:[port]");
}
}

@Override
public String toString() {
return "http://" + host + ":" + port;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Listener listener = (Listener) o;
return port == listener.port && Objects.equals(host, listener.host);
}

@Override
public int hashCode() {
return Objects.hash(host, port);
}

/**
* Validator to check the user provided listener configuration
*/
static class ListenerValidator implements ConfigDef.Validator {

@Override
public void ensureValid(String name, Object value) {
Matcher matcher = PATTERN.matcher(String.valueOf(value));
if (!matcher.matches()) {
throw new ConfigException(name, value, "Listener must be of format http://[host]:[port]");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package io.strimzi.kafka.metrics;

import io.prometheus.metrics.exporter.httpserver.HTTPServer;
import io.prometheus.metrics.model.registry.PrometheusRegistry;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
Expand All @@ -13,12 +12,9 @@
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.BindException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -65,7 +61,7 @@ public class PrometheusMetricsReporterConfig extends AbstractConfig {
private static final String ALLOWLIST_CONFIG_DOC = "A comma separated list of regex patterns to specify the metrics to collect.";

private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(LISTENER_CONFIG, ConfigDef.Type.STRING, LISTENER_CONFIG_DEFAULT, new ListenerValidator(), ConfigDef.Importance.HIGH, LISTENER_CONFIG_DOC)
.define(LISTENER_CONFIG, ConfigDef.Type.STRING, LISTENER_CONFIG_DEFAULT, new Listener.ListenerValidator(), ConfigDef.Importance.HIGH, LISTENER_CONFIG_DOC)
.define(ALLOWLIST_CONFIG, ConfigDef.Type.LIST, ALLOWLIST_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, ALLOWLIST_CONFIG_DOC)
.define(LISTENER_ENABLE_CONFIG, ConfigDef.Type.BOOLEAN, LISTENER_ENABLE_CONFIG_DEFAULT, ConfigDef.Importance.HIGH, LISTENER_ENABLE_CONFIG_DOC);

Expand Down Expand Up @@ -140,80 +136,20 @@ public String toString() {
/**
* Start the HTTP server for exposing metrics.
*
* @return An optional HTTPServer instance if started successfully, otherwise empty.
* @return An optional ServerCounter instance if {@link #LISTENER_ENABLE_CONFIG} is enabled, otherwise empty.
*/
public synchronized Optional<HTTPServer> startHttpServer() {
public synchronized Optional<HttpServers.ServerCounter> startHttpServer() {
if (!listenerEnabled) {
LOG.info("HTTP server listener not enabled");
return Optional.empty();
}
try {
HTTPServer httpServer = HTTPServer.builder()
.hostname(listener.host)
.port(listener.port)
.registry(registry)
.buildAndStart();
LOG.info("HTTP server started on listener http://{}:{}", listener.host, httpServer.getPort());
return Optional.of(httpServer);
} catch (BindException be) {
LOG.info("HTTP server already started");
return Optional.empty();
HttpServers.ServerCounter server = HttpServers.getOrCreate(listener, registry);
LOG.info("HTTP server listening on http://{}:{}", listener.host, server.port());
return Optional.of(server);
} catch (IOException ioe) {
LOG.error("Failed starting HTTP server", ioe);
throw new RuntimeException(ioe);
}
}

static class Listener {

private static final Pattern PATTERN = Pattern.compile("http://\\[?([0-9a-zA-Z\\-%._:]*)]?:([0-9]+)");

final String host;
final int port;

Listener(String host, int port) {
this.host = host;
this.port = port;
}

static Listener parseListener(String listener) {
Matcher matcher = PATTERN.matcher(listener);
if (matcher.matches()) {
String host = matcher.group(1);
int port = Integer.parseInt(matcher.group(2));
return new Listener(host, port);
} else {
throw new ConfigException(LISTENER_CONFIG, listener, "Listener must be of format http://[host]:[port]");
}
}

@Override
public String toString() {
return "http://" + host + ":" + port;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Listener listener = (Listener) o;
return port == listener.port && Objects.equals(host, listener.host);
}

@Override
public int hashCode() {
return Objects.hash(host, port);
}
}

static class ListenerValidator implements ConfigDef.Validator {

@Override
public void ensureValid(String name, Object value) {
Matcher matcher = Listener.PATTERN.matcher(String.valueOf(value));
if (!matcher.matches()) {
throw new ConfigException(name, value, "Listener must be of format http://[host]:[port]");
}
}
}
}
62 changes: 62 additions & 0 deletions src/test/java/io/strimzi/kafka/metrics/HttpServersTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.kafka.metrics;

import io.prometheus.metrics.model.registry.PrometheusRegistry;
import org.junit.jupiter.api.Test;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;

import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;

public class HttpServersTest {

private final PrometheusRegistry registry = new PrometheusRegistry();

@Test
public void testLifecycle() throws IOException {
Listener listener1 = Listener.parseListener("http://localhost:0");
HttpServers.ServerCounter server1 = HttpServers.getOrCreate(listener1, registry);
assertTrue(listenerStarted(listener1.host, server1.port()));

Listener listener2 = Listener.parseListener("http://localhost:0");
HttpServers.ServerCounter server2 = HttpServers.getOrCreate(listener2, registry);
assertTrue(listenerStarted(listener2.host, server2.port()));
assertSame(server1, server2);

Listener listener3 = Listener.parseListener("http://127.0.0.1:0");
HttpServers.ServerCounter server3 = HttpServers.getOrCreate(listener3, registry);
assertTrue(listenerStarted(listener3.host, server3.port()));

HttpServers.release(server1);
assertTrue(listenerStarted(listener1.host, server1.port()));
assertTrue(listenerStarted(listener2.host, server2.port()));
assertTrue(listenerStarted(listener3.host, server3.port()));

HttpServers.release(server2);
assertFalse(listenerStarted(listener1.host, server1.port()));
assertFalse(listenerStarted(listener2.host, server2.port()));
assertTrue(listenerStarted(listener3.host, server3.port()));

HttpServers.release(server3);
assertFalse(listenerStarted(listener3.host, server3.port()));
}

private boolean listenerStarted(String host, int port) {
try {
URL url = new URL("http://" + host + ":" + port + "/metrics");
HttpURLConnection con = (HttpURLConnection) url.openConnection();
con.setRequestMethod("HEAD");
con.connect();
return con.getResponseCode() == HttpURLConnection.HTTP_OK;
} catch (IOException ioe) {
return false;
}
}
}
Loading