Support consumer-based zombie fencing in the single-cluster case
fqaiser94 committed Mar 29, 2024
1 parent d27cb44 commit 79e367a
Showing 10 changed files with 184 additions and 69 deletions.
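
In short: in the single-cluster case, zombie fencing now rides on the sink task's own Connect consumer group. Offsets are committed through the transactional producer together with the group's live metadata (group id plus generation), so a stale task can no longer commit. The following sketch shows only the underlying Kafka pattern; the topic names, group id, and config values are illustrative, not taken from this commit.

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;

public class ConsumerBasedFencingSketch {

  public static void main(String[] args) {
    Properties consumerProps = new Properties();
    consumerProps.put("bootstrap.servers", "localhost:9092");
    consumerProps.put("group.id", "connect-my-connector"); // illustrative group id
    consumerProps.put("enable.auto.commit", "false"); // offsets go through the transaction
    consumerProps.put(
        "key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    consumerProps.put(
        "value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    Properties producerProps = new Properties();
    producerProps.put("bootstrap.servers", "localhost:9092");
    // A stable transactional.id per partition worker: initTransactions() bumps the
    // producer epoch, fencing any older producer instance that used the same id.
    producerProps.put("transactional.id", "iceberg-sink-source-topic-0");
    producerProps.put(
        "key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    producerProps.put(
        "value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps)) {
      consumer.subscribe(List.of("source-topic"));
      consumer.poll(Duration.ofSeconds(1)); // join the group; record handling elided

      producer.initTransactions();
      producer.beginTransaction();
      producer.send(new ProducerRecord<>("control-topic", "commit-event".getBytes()));
      // Committing offsets with the live consumer's group metadata ties the commit to
      // the current group generation; a zombie task holding stale metadata is rejected.
      producer.sendOffsetsToTransaction(
          Map.of(new TopicPartition("source-topic", 0), new OffsetAndMetadata(42L)),
          consumer.groupMetadata());
      producer.commitTransaction();
    }
  }
}

The control-cluster path keeps a fixed control group id (no generation to validate), which is why the single-cluster path has to dig the consumer out of the Connect runtime, as the MultiPartitionWorker changes below show.
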
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
@@ -37,6 +37,7 @@ jackson-core = { module = "com.fasterxml.jackson.core:jackson-core", version.ref = "jackson-ver" }
jackson-databind = { module = "com.fasterxml.jackson.core:jackson-databind", version.ref = "jackson-ver" }
kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka-ver" }
kafka-connect-api = { module = "org.apache.kafka:connect-api", version.ref = "kafka-ver" }
kafka-connect-runtime = { module = "org.apache.kafka:connect-runtime", version.ref = "kafka-ver" }
kafka-connect-json = { module = "org.apache.kafka:connect-json", version.ref = "kafka-ver" }
kafka-connect-transforms = { module = "org.apache.kafka:connect-transforms", version.ref = "kafka-ver" }
slf4j = { module = "org.slf4j:slf4j-api", version.ref = "slf4j-ver" }
@@ -60,7 +61,7 @@ palantir-gradle = "com.palantir.baseline:gradle-baseline-java:4.42.0"
iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet"]
iceberg-ext = ["iceberg-aws", "iceberg-aws-bundle", "iceberg-azure", "iceberg-azure-bundle", "iceberg-gcp","iceberg-gcp-bundle", "iceberg-nessie"]
jackson = ["jackson-core", "jackson-databind"]
kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"]
kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-runtime", "kafka-connect-json", "kafka-connect-transforms"]


[plugins]

@@ -78,20 +78,17 @@ public void open(Collection<TopicPartition> partitions) {
// destroy any state if KC re-uses object
clearObjectState();

// TODO: are catalogs thread-safe objects? Both coordinator and worker could use catalog at the
// same time.
// TODO: are Catalogs thread-safe? Coordinator and worker could use Catalog at the same time.
catalog = Utilities.loadCatalog(config);
AdminFactory adminFactory = new AdminFactory();
ConsumerFactory consumerFactory = new ConsumerFactory();
TransactionalProducerFactory producerFactory = new TransactionalProducerFactory();

if (isLeader()) {
LOG.info("Task elected leader");

if (config.controlClusterMode()) {
createControlClusterTopic(adminFactory);
}

startCoordinator(adminFactory, consumerFactory, producerFactory);
}

@@ -103,44 +100,43 @@ public void open(Collection<TopicPartition> partitions) {
}

private void createControlClusterTopic(Factory<Admin> adminFactory) {
try (Admin sourceAdmin = adminFactory.create(config.sourceClusterKafkaProps())) {
try (Admin controlAdmin = adminFactory.create(config.controlClusterKafkaProps())) {
// TODO a few options here:
// - Create a corresponding topic for each topic we're reading from in the control cluster
//   - this handles the topics.regex case well
// - Create a user-specified control-cluster-source-topic which has at least as many
//   partitions as topics we're reading from here (this minimizes the number of topics to create)
//   - does this even work for multi-topic connectors? No.
//   - This means I now have to plumb this special topic name all the way through to our
//     PartitionWorker (not too bad, but damn it's ugly)
// - Create a user-specified control-cluster-source-topic and then have a different
//   consumer-group-id that we commit to for each topic
//   - now the loadOffsets command also has to change to understand this ... it has to know
//     all the consumer-group-ids that existed before
//   - if you want to reset offsets, welcome to a nightmare
//   - hacky hacky hacky hacky hacky

ConsumerGroupDescription groupDesc =
KafkaUtils.consumerGroupDescription(config.connectGroupId(), sourceAdmin);
if (groupDesc.state() == ConsumerGroupState.STABLE) {
List<NewTopic> newTopics =
groupDesc.members().stream()
.flatMap(member -> member.assignment().topicPartitions().stream())
.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.toSet()))
.entrySet()
.stream()
.map(e -> new NewTopic(e.getKey(), e.getValue().size(), (short) 1))
.collect(Collectors.toList());

// TODO: what happens if topic already exists? :)
try {
controlAdmin.createTopics(newTopics).all().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
} else {
throw new NotRunningException("Group not stable");
try (Admin sourceAdmin = adminFactory.create(config.sourceClusterKafkaProps());
Admin controlAdmin = adminFactory.create(config.controlClusterKafkaProps())) {
// TODO a few options here:
// - Create a corresponding topic for each topic we're reading from in the control cluster
//   - this handles the topics.regex case well
// - Create a user-specified control-cluster-source-topic which has at least as many
//   partitions as topics we're reading from here (this minimizes the number of topics to create)
//   - does this even work for multi-topic connectors? No.
//   - This means I now have to plumb this special topic name all the way through to our
//     PartitionWorker (not too bad, but damn it's ugly)
// - Create a user-specified control-cluster-source-topic and then have a different
//   consumer-group-id that we commit to for each topic
//   - now the loadOffsets command also has to change to understand this ... it has to know
//     all the consumer-group-ids that existed before
//   - if you want to reset offsets, welcome to a nightmare
//   - hacky hacky hacky hacky hacky

ConsumerGroupDescription groupDesc =
KafkaUtils.consumerGroupDescription(config.connectGroupId(), sourceAdmin);
if (groupDesc.state() == ConsumerGroupState.STABLE) {
List<NewTopic> newTopics =
groupDesc.members().stream()
.flatMap(member -> member.assignment().topicPartitions().stream())
.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.toSet()))
.entrySet()
.stream()
.map(e -> new NewTopic(e.getKey(), e.getValue().size(), (short) 1))
.collect(Collectors.toList());

// TODO: what happens if topic already exists? :)
try {
controlAdmin.createTopics(newTopics).all().get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
} else {
throw new NotRunningException("Group not stable");
}
}
}

@@ -72,11 +72,9 @@ public static Catalog loadCatalog(IcebergSinkConfig config) {

// use reflection here to avoid requiring Hadoop as a dependency
private static Object loadHadoopConfig(IcebergSinkConfig config) {
Class<?> configClass =
DynClasses.builder().impl("org.apache.hadoop.hdfs.HdfsConfiguration").orNull().build();
Class<?> configClass = dynamicallyLoad("org.apache.hadoop.hdfs.HdfsConfiguration");
if (configClass == null) {
configClass =
DynClasses.builder().impl("org.apache.hadoop.conf.Configuration").orNull().build();
configClass = dynamicallyLoad("org.apache.hadoop.conf.Configuration");
}

if (configClass == null) {
@@ -250,5 +248,21 @@ public static TaskWriter<Record> createTableWriter(
return writer;
}

/**
 * Dynamically loads Hive/Hadoop config classes to avoid packaging them with the distribution.
 * Gradle strips Hadoop from the classpath, which causes a NoClassDefFoundError when the
 * distribution without Hive is used, so that error is intercepted to preserve the underlying
 * DynClasses.builder().impl(...).orNull() behavior.
 */
private static Class<?> dynamicallyLoad(String className) {
Class<?> configClass;
try {
configClass = DynClasses.builder().impl(className).orNull().build();
} catch (NoClassDefFoundError e) {
configClass = null;
}
return configClass;
}

private Utilities() {}
}

@@ -23,7 +23,6 @@
import org.apache.kafka.clients.admin.Admin;

// TODO: maybe move to a top-level package to fix visibility issues
// Maybe have a generic Factory interface that takes an IcebergSinkConfig and returns a T?
public class AdminFactory implements Factory<Admin> {

@Override

@@ -20,15 +20,20 @@

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.internal.kafka.Factory;
import java.lang.reflect.Field;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.runtime.WorkerSinkTaskContext;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTaskContext;
import org.slf4j.Logger;
@@ -44,11 +49,54 @@ class MultiPartitionWorker implements Worker {
Collection<TopicPartition> topicPartitions,
PartitionWorkerFactory partitionWorkerFactory,
Factory<Admin> adminFactory) {
this(
context,
getConsumerGroupMetadata(context, config),
config,
topicPartitions,
partitionWorkerFactory,
adminFactory);
}

private static ConsumerGroupMetadata getConsumerGroupMetadata(
SinkTaskContext context, IcebergSinkConfig config) {
final ConsumerGroupMetadata consumerGroupMetadata;
if (config.controlClusterMode()) {
// TODO: we could also just use the connect consumer-group-id here
consumerGroupMetadata = new ConsumerGroupMetadata(config.controlGroupId());
} else {
// TODO: this is a breaking change, avoid until 1.0 release
consumerGroupMetadata = extractConsumer(context).groupMetadata();
}

return consumerGroupMetadata;
}

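// WorkerSinkTaskContext keeps its KafkaConsumer in a private field and does not
// expose it through the public SinkTaskContext API, so reflection is used to read it;
// the consumer's groupMetadata() (group id plus generation) is what
// sendOffsetsToTransaction needs for generation-aware fencing.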
private static Consumer<byte[], byte[]> extractConsumer(SinkTaskContext context) {
try {
WorkerSinkTaskContext workerContext = (WorkerSinkTaskContext) context;
Field field = workerContext.getClass().getDeclaredField("consumer");
field.setAccessible(true);
return (Consumer<byte[], byte[]>) field.get(workerContext);
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}

@VisibleForTesting
MultiPartitionWorker(
SinkTaskContext context,
ConsumerGroupMetadata consumerGroupMetadata,
IcebergSinkConfig config,
Collection<TopicPartition> topicPartitions,
PartitionWorkerFactory partitionWorkerFactory,
Factory<Admin> adminFactory) {
this.workers = Maps.newHashMap();

topicPartitions.forEach(
topicPartition -> {
final Worker worker = partitionWorkerFactory.createWorker(topicPartition);
final Worker worker =
partitionWorkerFactory.createWorker(consumerGroupMetadata, topicPartition);
LOG.info("Created worker to handle topic-partition={}", topicPartition.toString());
workers.put(topicPartition, worker);
});
@@ -63,9 +111,9 @@ class MultiPartitionWorker implements Worker {
safeOffsets =
admin
.listConsumerGroupOffsets(
// TODO: think about what groupId to use here, should be same as whatever we were
// committing to before to avoid a breaking change
config.controlGroupId(),
// TODO: This is a breaking change, just for demonstration purposes
// should really use controlGroupId to be backwards compatible (for now)
consumerGroupMetadata.groupId(),
new ListConsumerGroupOffsetsOptions().requireStable(true))
.partitionsToOffsetAndMetadata().get().entrySet().stream()
.filter(entry -> entry.getValue() != null)
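
For orientation, the offset-recovery pattern this hunk relies on is sketched below as a hypothetical standalone helper, not code from the commit: listing the group's committed offsets with requireStable(true) makes the broker withhold offsets belonging to an open, possibly zombie, transaction, after which the Connect consumer is rewound through SinkTaskContext.offset.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.sink.SinkTaskContext;

final class SafeOffsetReset {

  private SafeOffsetReset() {}

  // Rewinds the task's consumer to the last offsets committed by completed
  // transactions for the given group id.
  static void rewindToSafeOffsets(Admin admin, SinkTaskContext context, String groupId)
      throws ExecutionException, InterruptedException {
    Map<TopicPartition, OffsetAndMetadata> committed =
        admin
            .listConsumerGroupOffsets(
                groupId, new ListConsumerGroupOffsetsOptions().requireStable(true))
            .partitionsToOffsetAndMetadata()
            .get();

    Map<TopicPartition, Long> safeOffsets = new HashMap<>();
    committed.forEach(
        (tp, offsetAndMetadata) -> {
          // partitions with no committed offset come back as null and are skipped,
          // matching the null filter in the hunk above
          if (offsetAndMetadata != null) {
            safeOffsets.put(tp, offsetAndMetadata.offset());
          }
        });

    context.offset(safeOffsets); // Connect seeks the consumer to these offsets
  }
}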

@@ -57,6 +57,7 @@ class PartitionWorker implements Worker {
private static final Logger LOG = LoggerFactory.getLogger(Worker.class);
private final IcebergSinkConfig config;
private final TopicPartition topicPartition;
private final ConsumerGroupMetadata consumerGroupMetadata;
private final IcebergWriterFactory writerFactory;
private final Map<String, RecordWriter> writers;
private Offset sourceOffset;
@@ -71,10 +72,12 @@ PartitionWorker(
PartitionWorker(
IcebergSinkConfig config,
TopicPartition topicPartition,
ConsumerGroupMetadata consumerGroupMetadata,
IcebergWriterFactory writerFactory,
Factory<Producer<String, byte[]>> transactionalProducerFactory) {
this.config = config;
this.topicPartition = topicPartition;
this.consumerGroupMetadata = consumerGroupMetadata;
this.writerFactory = writerFactory;
this.writers = Maps.newHashMap();
this.sourceOffset = Offset.NULL_OFFSET;
@@ -223,9 +226,7 @@ private void sendEventsAndCommitCurrentOffset(List<Event> events) {
config.controlTopic(),
events,
offsetsToCommit,
// TODO: think about what groupId to use here, should be same as whatever we were committing
// to before.
new ConsumerGroupMetadata(config.controlGroupId()));
consumerGroupMetadata);
}

private void clearWriters() {

@@ -21,6 +21,7 @@
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.internal.data.IcebergWriterFactory;
import io.tabular.iceberg.connect.internal.kafka.TransactionalProducerFactory;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.common.TopicPartition;

class PartitionWorkerFactory {
@@ -38,8 +39,12 @@ class PartitionWorkerFactory {
this.transactionalProducerFactory = transactionalProducerFactory;
}

Worker createWorker(TopicPartition topicPartition) {
Worker createWorker(ConsumerGroupMetadata consumerGroupMetadata, TopicPartition topicPartition) {
return new PartitionWorker(
config, topicPartition, icebergWriterFactory, transactionalProducerFactory);
config,
topicPartition,
consumerGroupMetadata,
icebergWriterFactory,
transactionalProducerFactory);
}
}

@@ -40,6 +40,7 @@
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.internals.CoordinatorKey;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
@@ -52,6 +53,8 @@
class MultiPartitionWorkerTest {
private static final String SOURCE_TOPIC = "source-topic-name";
private static final String CONNECTOR_NAME = "connector-name";
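// Connect sink tasks consume under the group id "connect-<connector name>";
// the expected group metadata below mirrors that convention.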
private static final ConsumerGroupMetadata CONSUMER_GROUP_METADATA =
new ConsumerGroupMetadata(String.format("connect-%s", CONNECTOR_NAME));
private static final String TABLE_1_NAME = "db.tbl1";
private static final String CONTROL_TOPIC = "control-topic-name";

@@ -102,7 +105,8 @@ public void testWorkersManagerResetsOffsetsSafely() {
ImmutableList.of(new TopicPartition(SOURCE_TOPIC, 0), new TopicPartition(SOURCE_TOPIC, 1));

final PartitionWorkerFactory partitionWorkerFactory = mock(PartitionWorkerFactory.class);
when(partitionWorkerFactory.createWorker(any())).thenReturn(new NoOpWorker());
when(partitionWorkerFactory.createWorker(eq(CONSUMER_GROUP_METADATA), any()))
.thenReturn(new NoOpWorker());

final Admin admin = mock(Admin.class);
final Map<TopicPartition, Long> safeOffsets =
@@ -127,12 +131,21 @@ public void testWorkersManagerResetsOffsetsSafely() {
final Factory<Admin> adminFactory = (kafkaProps) -> admin;

new MultiPartitionWorker(
context, config, topicPartitions, partitionWorkerFactory, adminFactory);
context,
CONSUMER_GROUP_METADATA,
config,
topicPartitions,
partitionWorkerFactory,
adminFactory);

InOrder inOrderVerifier = inOrder(partitionWorkerFactory, admin, context);
// should create a worker for each topicPartition
inOrderVerifier.verify(partitionWorkerFactory).createWorker(topicPartitions.get(0));
inOrderVerifier.verify(partitionWorkerFactory).createWorker(topicPartitions.get(1));
inOrderVerifier
.verify(partitionWorkerFactory)
.createWorker(CONSUMER_GROUP_METADATA, topicPartitions.get(0));
inOrderVerifier
.verify(partitionWorkerFactory)
.createWorker(CONSUMER_GROUP_METADATA, topicPartitions.get(1));
// only after that, should it retrieve offsets from kafka
inOrderVerifier
.verify(admin)