diff --git a/providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/KafkaExtensionProvider.java b/providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/KafkaExtensionProvider.java index 38ee2940..2411f104 100644 --- a/providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/KafkaExtensionProvider.java +++ b/providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/KafkaExtensionProvider.java @@ -13,6 +13,7 @@ import io.streamthoughts.jikkou.core.models.change.ResourceChange; import io.streamthoughts.jikkou.core.resource.ResourceRegistry; import io.streamthoughts.jikkou.kafka.action.KafkaConsumerGroupsResetOffsets; +import io.streamthoughts.jikkou.kafka.action.TruncateKafkaTopicRecords; import io.streamthoughts.jikkou.kafka.collections.V1KafkaBrokerList; import io.streamthoughts.jikkou.kafka.collections.V1KafkaClientQuotaList; import io.streamthoughts.jikkou.kafka.collections.V1KafkaPrincipalAuthorizationList; @@ -171,6 +172,7 @@ public void registerExtensions(@NotNull ExtensionRegistry registry) { // actions registry.register(KafkaConsumerGroupsResetOffsets.class, KafkaConsumerGroupsResetOffsets::new); + registry.register(TruncateKafkaTopicRecords.class, TruncateKafkaTopicRecords::new); } /** diff --git a/providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/action/TruncateKafkaTopicRecords.java b/providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/action/TruncateKafkaTopicRecords.java new file mode 100644 index 00000000..7c87ac2a --- /dev/null +++ b/providers/jikkou-provider-kafka/src/main/java/io/streamthoughts/jikkou/kafka/action/TruncateKafkaTopicRecords.java @@ -0,0 +1,222 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * Copyright (c) The original authors + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ +package io.streamthoughts.jikkou.kafka.action; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonPropertyDescription; +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import io.streamthoughts.jikkou.common.utils.Pair; +import io.streamthoughts.jikkou.core.action.*; +import io.streamthoughts.jikkou.core.annotation.*; +import io.streamthoughts.jikkou.core.config.ConfigProperty; +import io.streamthoughts.jikkou.core.config.Configuration; +import io.streamthoughts.jikkou.core.extension.ContextualExtension; +import io.streamthoughts.jikkou.core.extension.ExtensionContext; +import io.streamthoughts.jikkou.core.models.BaseHasMetadata; +import io.streamthoughts.jikkou.core.models.ObjectMeta; +import io.streamthoughts.jikkou.kafka.KafkaExtensionProvider; +import io.streamthoughts.jikkou.kafka.internals.admin.AdminClientFactory; +import io.streamthoughts.jikkou.kafka.reconciler.service.KafkaAdminService; +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DeletedRecords; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.beans.ConstructorProperties; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import static io.streamthoughts.jikkou.kafka.reconciler.service.KafkaOffsetSpec.ToTimestamp.fromISODateTime; + +@Named("TruncateKafkaTopicRecords") +@Title("Truncate topic-partitions to a specific datetime.") +@Description(""" + """) +public class TruncateKafkaTopicRecords extends ContextualExtension implements Action { + + private static final Logger LOG = LoggerFactory.getLogger(TruncateKafkaTopicRecords.class); + + interface Config { + ConfigProperty> TOPIC = ConfigProperty.ofList("topic") + .description("The topic whose partitions must be truncated.") + .required(true); + + ConfigProperty TO_DATETIME = ConfigProperty + .ofString("to-datetime") + .description("Truncate topic partitions to offsets for datetime. Format: 'YYYY-MM-DDTHH:mm:SS.sss'") + .required(true); + + ConfigProperty DRY_RUN = ConfigProperty + .ofBoolean("dry-run") + .description("Only show results without executing changes on Kafka topics.") + .defaultValue(false); + } + + private AdminClientFactory adminClientFactory; + + /** + * Creates a new {@link TruncateKafkaTopicRecords} instance. + * Extension requires an empty constructor. + */ + public TruncateKafkaTopicRecords() { + } + + /** + * {@inheritDoc} + */ + @Override + public void init(@NotNull ExtensionContext context) { + super.init(context); + this.adminClientFactory = context.provider().newAdminClientFactory(); + } + + /** + * {@inheritDoc} + */ + @Override + public @NotNull ExecutionResultSet execute(@NotNull Configuration configuration) { + try (AdminClient client = adminClientFactory.createAdminClient()) { + KafkaAdminService service = new KafkaAdminService(client); + + final List topics = Config.TOPIC.get(configuration); + final String dateTime = Config.TO_DATETIME.get(configuration); + final Long timestamp = fromISODateTime(dateTime).timestamp(); + + + List> results = Flux.fromIterable(topics) + .flatMap(topic -> Mono.fromFuture(service.listOffsets(List.of(topic), OffsetSpec.forTimestamp(timestamp))) + .flatMap(offsetsByTopicPartition -> { + + Map recordsToDelete = offsetsByTopicPartition.entrySet() + .stream() + .collect(Collectors.toMap( + Map.Entry::getKey, + it -> RecordsToDelete.beforeOffset(it.getValue().offset()) + ) + ); + + LOG.info("Deleting record for topic '{}' from partition: {}", topic, recordsToDelete); + Map> lowWatermarks = client.deleteRecords(recordsToDelete).lowWatermarks(); + return Flux + .fromStream(lowWatermarks.entrySet().stream().map(Pair::of)) + .flatMap(pair -> + Mono.fromFuture(pair._2().toCompletionStage().toCompletableFuture()) + .map(deleted -> new TopicPartitionLowWatermark(pair._1().partition(), deleted.lowWatermark())) + ) + .collectSortedList(Comparator.comparingInt(TopicPartitionLowWatermark::partition)) + .map(topicPartitionLowWatermarks -> + ExecutionResult.newBuilder() + .status(ExecutionStatus.SUCCEEDED) + .data(new V1TruncatedKafkaTopicRecords(new TruncatedKafkaTopicRecordsResult(topic, topicPartitionLowWatermarks))) + .build() + ).onErrorResume(ex -> + Mono.just(ExecutionResult.newBuilder() + .status(ExecutionStatus.FAILED) + .errors(List.of(new ExecutionError(ex.getLocalizedMessage()))) + .data(new V1TruncatedKafkaTopicRecords(new TruncatedKafkaTopicRecordsResult(topic, null))) + .build() + ) + ); + }) + ) + .collectList() + .block(); + + return ExecutionResultSet.newBuilder() + .results(results) + .build(); + } + } + + /** + * {@inheritDoc} + */ + @Override + public List> configProperties() { + return List.of( + Config.TOPIC, + Config.TO_DATETIME, + Config.DRY_RUN + ); + } + + @ApiVersion("kafka.jikkou.io/v1") + @Kind("TruncatedKafkaTopicRecords") + @JsonPropertyOrder({ + "apiVersion", + "kind", + "metadata", + "result" + }) + public static class V1TruncatedKafkaTopicRecords extends BaseHasMetadata { + + private final TruncatedKafkaTopicRecordsResult result; + + public V1TruncatedKafkaTopicRecords(TruncatedKafkaTopicRecordsResult result) { + this(null, null, null, result); + } + + @ConstructorProperties({ + "apiVersion", + "kind", + "metadata", + "result" + }) + public V1TruncatedKafkaTopicRecords(@Nullable String apiVersion, + @Nullable String kind, + @Nullable ObjectMeta metadata, + TruncatedKafkaTopicRecordsResult result) { + super(apiVersion, kind, metadata); + this.result = result; + } + + public TruncatedKafkaTopicRecordsResult getResult() { + return result; + } + } + + @JsonPropertyOrder({ + "topic", + "partitions" + }) + @Reflectable + public record TruncatedKafkaTopicRecordsResult( + @JsonProperty("topic") + @JsonPropertyDescription("The topic name.") + String topic, + + @JsonProperty("partitions") + @JsonPropertyDescription("The topic partitions for which records was deleted.") + List partitions + ) { + } + + @JsonPropertyOrder({ + "partition", + "lowWatermark" + }) + @Reflectable + public record TopicPartitionLowWatermark( + @JsonProperty("partition") + @JsonPropertyDescription("The topic partition.") + int partition, + + @JsonProperty("lowWatermark") + @JsonPropertyDescription("The low watermark for the topic partition.") + long lowWatermark) { + } +}