Skip to content

Commit

Permalink
porting
Browse files Browse the repository at this point in the history
  • Loading branch information
tabmatfournier committed Apr 11, 2024
1 parent f63ca07 commit 42dd045
Show file tree
Hide file tree
Showing 14 changed files with 211 additions and 175 deletions.
5 changes: 3 additions & 2 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ awaitility-ver = "4.2.0"
hadoop-ver = "3.3.6"
hive-ver = "2.3.9"
http-client-ver = "5.2.1"
iceberg-ver = "1.4.2"
iceberg-ver = "1.5.0"
jackson-ver = "2.14.2"
junit-ver = "5.10.0"
kafka-ver = "3.5.1"
Expand All @@ -30,6 +30,7 @@ iceberg-gcp = { module = "org.apache.iceberg:iceberg-gcp", version.ref = "iceber
iceberg-gcp-bundle = { module = "org.apache.iceberg:iceberg-gcp-bundle", version.ref = "iceberg-ver" }
iceberg-guava = { module = "org.apache.iceberg:iceberg-bundled-guava", version.ref = "iceberg-ver" }
iceberg-hive-metastore = { module = "org.apache.iceberg:iceberg-hive-metastore", version.ref = "iceberg-ver" }
iceberg-kafka-connect-events = {module = "org.apache.iceberg:iceberg-kafka-connect-events", version.ref = "iceberg-ver"}
iceberg-nessie = { module = "org.apache.iceberg:iceberg-nessie", version.ref = "iceberg-ver" }
iceberg-orc = { module = "org.apache.iceberg:iceberg-orc", version.ref = "iceberg-ver" }
iceberg-parquet = { module = "org.apache.iceberg:iceberg-parquet", version.ref = "iceberg-ver" }
Expand Down Expand Up @@ -57,7 +58,7 @@ palantir-gradle = "com.palantir.baseline:gradle-baseline-java:4.42.0"


[bundles]
iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet"]
iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet", "iceberg-kafka-connect-events"]
iceberg-ext = ["iceberg-aws", "iceberg-aws-bundle", "iceberg-azure", "iceberg-azure-bundle", "iceberg-gcp","iceberg-gcp-bundle", "iceberg-nessie"]
jackson = ["jackson-core", "jackson-databind"]
kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"]
Expand Down
3 changes: 1 addition & 2 deletions kafka-connect/build.gradle
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
dependencies {
implementation project(":iceberg-kafka-connect-events")
implementation libs.bundles.iceberg
implementation libs.bundles.jackson
implementation libs.avro
implementation libs.slf4j
compileOnly libs.bundles.kafka.connect

testImplementation(testFixtures(project(":iceberg-kafka-connect-events")))
testImplementation libs.hadoop.client

testImplementation libs.junit.api
Expand All @@ -15,6 +13,7 @@ dependencies {

testImplementation libs.mockito
testImplementation libs.assertj
testImplementation libs.iceberg.kafka.connect.events

testImplementation 'ch.qos.logback:logback-classic:1.5.3'
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,12 @@

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.data.Offset;
import io.tabular.iceberg.connect.events.Event;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.iceberg.connect.events.AvroUtil;
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
Expand Down Expand Up @@ -84,7 +85,7 @@ protected void send(List<Event> events, Map<TopicPartition, Offset> sourceOffset
.map(
event -> {
LOG.info("Sending event of type: {}", event.type().name());
byte[] data = Event.encode(event);
byte[] data = AvroUtil.encode(event);
// key by producer ID to keep event order
return new ProducerRecord<>(controlTopic, producerId, data);
})
Expand Down Expand Up @@ -122,7 +123,7 @@ record -> {
// so increment the record offset by one
controlTopicOffsets.put(record.partition(), record.offset() + 1);

Event event = Event.decode(record.value());
Event event = AvroUtil.decode(record.value());

if (event.groupId().equals(groupId)) {
LOG.debug("Received event of type: {}", event.type().name());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,26 @@
import static java.util.stream.Collectors.groupingBy;

import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.events.CommitReadyPayload;
import io.tabular.iceberg.connect.events.CommitResponsePayload;
import io.tabular.iceberg.connect.events.TopicPartitionOffset;
import java.time.OffsetDateTime;
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.UUID;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.events.DataComplete;
import org.apache.iceberg.connect.events.DataWritten;
import org.apache.iceberg.connect.events.TopicPartitionOffset;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CommitState {
private static final Logger LOG = LoggerFactory.getLogger(CommitState.class);

private final List<Envelope> commitBuffer = new LinkedList<>();
private final List<CommitReadyPayload> readyBuffer = new LinkedList<>();
private final List<DataComplete> readyBuffer = new LinkedList<>();
private long startTime;
private UUID currentCommitId;
private final IcebergSinkConfig config;
Expand All @@ -49,17 +53,17 @@ public void addResponse(Envelope envelope) {
commitBuffer.add(envelope);
if (!isCommitInProgress()) {
LOG.warn(
"Received commit response with commit-id={} when no commit in progress, this can happen during recovery",
((CommitResponsePayload) envelope.event().payload()).commitId());
"Received data written with commit-id={} when no commit in progress, this can happen during recovery",
((DataWritten) envelope.event().payload()).commitId());
}
}

public void addReady(Envelope envelope) {
readyBuffer.add((CommitReadyPayload) envelope.event().payload());
readyBuffer.add((DataComplete) envelope.event().payload());
if (!isCommitInProgress()) {
LOG.warn(
"Received commit ready for commit-id={} when no commit in progress, this can happen during recovery",
((CommitReadyPayload) envelope.event().payload()).commitId());
"Received data complete for commit-id={} when no commit in progress, this can happen during recovery",
((DataComplete) envelope.event().payload()).commitId());
}
}

Expand Down Expand Up @@ -139,26 +143,38 @@ public Map<TableIdentifier, List<Envelope>> tableCommitMap() {
.collect(
groupingBy(
envelope ->
((CommitResponsePayload) envelope.event().payload())
.tableName()
.toIdentifier()));
((DataWritten) envelope.event().payload()).tableReference().identifier()));
}

public Long vtts(boolean partialCommit) {
public OffsetDateTime vtts(boolean partialCommit) {

Comparator<OffsetDateTime> comparator =
new Comparator<OffsetDateTime>() {

@Override
public int compare(OffsetDateTime o1, OffsetDateTime o2) {
return o1.compareTo(o2);
}
};

boolean validVtts =
!partialCommit
&& readyBuffer.stream()
.flatMap(event -> event.assignments().stream())
.allMatch(offset -> offset.timestamp() != null);

Long result;
OffsetDateTime result;
if (validVtts) {
result =
Optional<OffsetDateTime> maybeResult =
readyBuffer.stream()
.flatMap(event -> event.assignments().stream())
.mapToLong(TopicPartitionOffset::timestamp)
.min()
.getAsLong();
.map(TopicPartitionOffset::timestamp)
.min(comparator);
if (maybeResult.isPresent()) {
result = maybeResult.get();
} else {
throw new NoSuchElementException("no vtts found");
}
} else {
result = null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,10 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.tabular.iceberg.connect.IcebergSinkConfig;
import io.tabular.iceberg.connect.events.CommitCompletePayload;
import io.tabular.iceberg.connect.events.CommitRequestPayload;
import io.tabular.iceberg.connect.events.CommitTablePayload;
import io.tabular.iceberg.connect.events.Event;
import io.tabular.iceberg.connect.events.EventType;
import io.tabular.iceberg.connect.events.TableName;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -47,6 +42,11 @@
import org.apache.iceberg.Transaction;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.events.CommitComplete;
import org.apache.iceberg.connect.events.CommitToTable;
import org.apache.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.StartCommit;
import org.apache.iceberg.connect.events.TableReference;
import org.apache.iceberg.exceptions.NoSuchTableException;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
Expand Down Expand Up @@ -95,10 +95,7 @@ public void process() {
// send out begin commit
commitState.startNewCommit();
Event event =
new Event(
config.controlGroupId(),
EventType.COMMIT_REQUEST,
new CommitRequestPayload(commitState.currentCommitId()));
new Event(config.controlGroupId(), new StartCommit(commitState.currentCommitId()));
send(event);
LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString());
}
Expand All @@ -113,10 +110,10 @@ public void process() {
@Override
protected boolean receive(Envelope envelope) {
switch (envelope.event().type()) {
case COMMIT_RESPONSE:
case DATA_WRITTEN:
commitState.addResponse(envelope);
return true;
case COMMIT_READY:
case DATA_COMPLETE:
commitState.addReady(envelope);
if (commitState.isCommitReady(totalPartitionCount)) {
commit(false);
Expand All @@ -140,7 +137,7 @@ private void doCommit(boolean partialCommit) {
Map<TableIdentifier, List<Envelope>> commitMap = commitState.tableCommitMap();

String offsetsJson = offsetsJson();
Long vtts = commitState.vtts(partialCommit);
OffsetDateTime vtts = commitState.vtts(partialCommit);

Tasks.foreach(commitMap.entrySet())
.executeWith(exec)
Expand All @@ -155,10 +152,7 @@ private void doCommit(boolean partialCommit) {
commitState.clearResponses();

Event event =
new Event(
config.controlGroupId(),
EventType.COMMIT_COMPLETE,
new CommitCompletePayload(commitState.currentCommitId(), vtts));
new Event(config.controlGroupId(), new CommitComplete(commitState.currentCommitId(), vtts));
send(event);

LOG.info(
Expand All @@ -177,7 +171,10 @@ private String offsetsJson() {
}

private void commitToTable(
TableIdentifier tableIdentifier, List<Envelope> envelopeList, String offsetsJson, Long vtts) {
TableIdentifier tableIdentifier,
List<Envelope> envelopeList,
String offsetsJson,
OffsetDateTime vtts) {
Table table;
try {
table = catalog.loadTable(tableIdentifier);
Expand Down Expand Up @@ -233,7 +230,7 @@ private void commitToTable(
if (i == lastIdx) {
appendOp.set(snapshotOffsetsProp, offsetsJson);
if (vtts != null) {
appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts));
appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts.toInstant().toEpochMilli()));
}
}

Expand All @@ -247,7 +244,7 @@ private void commitToTable(
deltaOp.set(snapshotOffsetsProp, offsetsJson);
deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString());
if (vtts != null) {
deltaOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts));
deltaOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts.toInstant().toEpochMilli()));
}
dataFiles.forEach(deltaOp::addRows);
deleteFiles.forEach(deltaOp::addDeletes);
Expand All @@ -258,9 +255,11 @@ private void commitToTable(
Event event =
new Event(
config.controlGroupId(),
EventType.COMMIT_TABLE,
new CommitTablePayload(
commitState.currentCommitId(), TableName.of(tableIdentifier), snapshotId, vtts));
new CommitToTable(
commitState.currentCommitId(),
TableReference.of(catalog.name(), tableIdentifier),
snapshotId,
vtts));
send(event);

LOG.info(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import static java.util.stream.Collectors.toList;

import io.tabular.iceberg.connect.events.CommitResponsePayload;
import java.time.OffsetDateTime;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
Expand All @@ -31,6 +31,7 @@
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.connect.events.DataWritten;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -82,7 +83,7 @@ public static List<DataFile> dataFiles(
tableIdentifier,
envelopes,
"data",
CommitResponsePayload::dataFiles,
DataWritten::dataFiles,
dataFile -> dataFile.path().toString());
}

Expand All @@ -97,7 +98,7 @@ public static List<DeleteFile> deleteFiles(
tableIdentifier,
envelopes,
"delete",
CommitResponsePayload::deleteFiles,
DataWritten::deleteFiles,
deleteFile -> deleteFile.path().toString());
}

Expand All @@ -106,14 +107,13 @@ private static <F> List<F> deduplicatedFiles(
TableIdentifier tableIdentifier,
List<Envelope> envelopes,
String fileType,
Function<CommitResponsePayload, List<F>> extractFilesFromPayload,
Function<DataWritten, List<F>> extractFilesFromPayload,
Function<F, String> extractPathFromFile) {
List<Pair<F, SimpleEnvelope>> filesAndEnvelopes =
envelopes.stream()
.flatMap(
envelope -> {
CommitResponsePayload payload =
(CommitResponsePayload) envelope.event().payload();
DataWritten payload = (DataWritten) envelope.event().payload();
List<F> files = extractFilesFromPayload.apply(payload);
if (files == null) {
return Stream.empty();
Expand Down Expand Up @@ -207,7 +207,7 @@ private static class SimpleEnvelope {
private final long offset;
private final UUID eventId;
private final String eventGroupId;
private final Long eventTimestamp;
private final OffsetDateTime eventTimestamp;
private final UUID payloadCommitId;

SimpleEnvelope(Envelope envelope) {
Expand All @@ -216,7 +216,7 @@ private static class SimpleEnvelope {
eventId = envelope.event().id();
eventGroupId = envelope.event().groupId();
eventTimestamp = envelope.event().timestamp();
payloadCommitId = ((CommitResponsePayload) envelope.event().payload()).commitId();
payloadCommitId = ((DataWritten) envelope.event().payload()).commitId();
}

@Override
Expand Down Expand Up @@ -255,7 +255,7 @@ public String toString() {
+ eventGroupId
+ '\''
+ ", eventTimestamp="
+ eventTimestamp
+ eventTimestamp.toInstant().toEpochMilli()
+ ", payloadCommitId="
+ payloadCommitId
+ '}';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package io.tabular.iceberg.connect.channel;

import io.tabular.iceberg.connect.events.Event;
import org.apache.iceberg.connect.events.Event;

public class Envelope {
private final Event event;
Expand Down
Loading

0 comments on commit 42dd045

Please sign in to comment.