diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 2420248d..991537e7 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -5,7 +5,7 @@ awaitility-ver = "4.2.0" hadoop-ver = "3.3.6" hive-ver = "2.3.9" http-client-ver = "5.2.1" -iceberg-ver = "1.4.2" +iceberg-ver = "1.5.0" jackson-ver = "2.14.2" junit-ver = "5.10.0" kafka-ver = "3.5.1" @@ -30,6 +30,7 @@ iceberg-gcp = { module = "org.apache.iceberg:iceberg-gcp", version.ref = "iceber iceberg-gcp-bundle = { module = "org.apache.iceberg:iceberg-gcp-bundle", version.ref = "iceberg-ver" } iceberg-guava = { module = "org.apache.iceberg:iceberg-bundled-guava", version.ref = "iceberg-ver" } iceberg-hive-metastore = { module = "org.apache.iceberg:iceberg-hive-metastore", version.ref = "iceberg-ver" } +iceberg-kafka-connect-events = {module = "org.apache.iceberg:iceberg-kafka-connect-events", version.ref = "iceberg-ver"} iceberg-nessie = { module = "org.apache.iceberg:iceberg-nessie", version.ref = "iceberg-ver" } iceberg-orc = { module = "org.apache.iceberg:iceberg-orc", version.ref = "iceberg-ver" } iceberg-parquet = { module = "org.apache.iceberg:iceberg-parquet", version.ref = "iceberg-ver" } @@ -57,7 +58,7 @@ palantir-gradle = "com.palantir.baseline:gradle-baseline-java:4.42.0" [bundles] -iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet"] +iceberg = ["iceberg-api", "iceberg-common", "iceberg-core", "iceberg-data", "iceberg-guava", "iceberg-orc", "iceberg-parquet", "iceberg-kafka-connect-events"] iceberg-ext = ["iceberg-aws", "iceberg-aws-bundle", "iceberg-azure", "iceberg-azure-bundle", "iceberg-gcp","iceberg-gcp-bundle", "iceberg-nessie"] jackson = ["jackson-core", "jackson-databind"] kafka-connect = ["kafka-clients", "kafka-connect-api", "kafka-connect-json", "kafka-connect-transforms"] diff --git a/kafka-connect/build.gradle b/kafka-connect/build.gradle index 4f6337c4..756dbc6e 100644 --- a/kafka-connect/build.gradle +++ b/kafka-connect/build.gradle @@ -1,12 +1,10 @@ dependencies { - implementation project(":iceberg-kafka-connect-events") implementation libs.bundles.iceberg implementation libs.bundles.jackson implementation libs.avro implementation libs.slf4j compileOnly libs.bundles.kafka.connect - testImplementation(testFixtures(project(":iceberg-kafka-connect-events"))) testImplementation libs.hadoop.client testImplementation libs.junit.api @@ -15,6 +13,7 @@ dependencies { testImplementation libs.mockito testImplementation libs.assertj + testImplementation libs.iceberg.kafka.connect.events testImplementation 'ch.qos.logback:logback-classic:1.5.3' } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java index 38367b15..4efbfd6c 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Channel.java @@ -22,11 +22,12 @@ import io.tabular.iceberg.connect.IcebergSinkConfig; import io.tabular.iceberg.connect.data.Offset; -import io.tabular.iceberg.connect.events.Event; import java.time.Duration; import java.util.List; import java.util.Map; import java.util.UUID; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.Event; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Maps; @@ -84,7 +85,7 @@ protected void send(List events, Map sourceOffset .map( event -> { LOG.info("Sending event of type: {}", event.type().name()); - byte[] data = Event.encode(event); + byte[] data = AvroUtil.encode(event); // key by producer ID to keep event order return new ProducerRecord<>(controlTopic, producerId, data); }) @@ -122,7 +123,7 @@ record -> { // so increment the record offset by one controlTopicOffsets.put(record.partition(), record.offset() + 1); - Event event = Event.decode(record.value()); + Event event = AvroUtil.decode(record.value()); if (event.groupId().equals(groupId)) { LOG.debug("Received event of type: {}", event.type().name()); diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java index d027846e..c195c91e 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/CommitState.java @@ -21,14 +21,18 @@ import static java.util.stream.Collectors.groupingBy; import io.tabular.iceberg.connect.IcebergSinkConfig; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.TopicPartitionOffset; +import java.time.OffsetDateTime; +import java.util.Comparator; import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Optional; import java.util.UUID; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.TopicPartitionOffset; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -36,7 +40,7 @@ public class CommitState { private static final Logger LOG = LoggerFactory.getLogger(CommitState.class); private final List commitBuffer = new LinkedList<>(); - private final List readyBuffer = new LinkedList<>(); + private final List readyBuffer = new LinkedList<>(); private long startTime; private UUID currentCommitId; private final IcebergSinkConfig config; @@ -49,17 +53,17 @@ public void addResponse(Envelope envelope) { commitBuffer.add(envelope); if (!isCommitInProgress()) { LOG.warn( - "Received commit response with commit-id={} when no commit in progress, this can happen during recovery", - ((CommitResponsePayload) envelope.event().payload()).commitId()); + "Received data written with commit-id={} when no commit in progress, this can happen during recovery", + ((DataWritten) envelope.event().payload()).commitId()); } } public void addReady(Envelope envelope) { - readyBuffer.add((CommitReadyPayload) envelope.event().payload()); + readyBuffer.add((DataComplete) envelope.event().payload()); if (!isCommitInProgress()) { LOG.warn( - "Received commit ready for commit-id={} when no commit in progress, this can happen during recovery", - ((CommitReadyPayload) envelope.event().payload()).commitId()); + "Received data complete for commit-id={} when no commit in progress, this can happen during recovery", + ((DataComplete) envelope.event().payload()).commitId()); } } @@ -139,26 +143,38 @@ public Map> tableCommitMap() { .collect( groupingBy( envelope -> - ((CommitResponsePayload) envelope.event().payload()) - .tableName() - .toIdentifier())); + ((DataWritten) envelope.event().payload()).tableReference().identifier())); } - public Long vtts(boolean partialCommit) { + public OffsetDateTime vtts(boolean partialCommit) { + + Comparator comparator = + new Comparator() { + + @Override + public int compare(OffsetDateTime o1, OffsetDateTime o2) { + return o1.compareTo(o2); + } + }; + boolean validVtts = !partialCommit && readyBuffer.stream() .flatMap(event -> event.assignments().stream()) .allMatch(offset -> offset.timestamp() != null); - Long result; + OffsetDateTime result; if (validVtts) { - result = + Optional maybeResult = readyBuffer.stream() .flatMap(event -> event.assignments().stream()) - .mapToLong(TopicPartitionOffset::timestamp) - .min() - .getAsLong(); + .map(TopicPartitionOffset::timestamp) + .min(comparator); + if (maybeResult.isPresent()) { + result = maybeResult.get(); + } else { + throw new NoSuchElementException("no vtts found"); + } } else { result = null; } diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java index 439ba4d2..63469bd7 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Coordinator.java @@ -23,15 +23,10 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import io.tabular.iceberg.connect.IcebergSinkConfig; -import io.tabular.iceberg.connect.events.CommitCompletePayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitTablePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventType; -import io.tabular.iceberg.connect.events.TableName; import java.io.IOException; import java.io.UncheckedIOException; import java.time.Duration; +import java.time.OffsetDateTime; import java.util.Collection; import java.util.List; import java.util.Map; @@ -47,6 +42,11 @@ import org.apache.iceberg.Transaction; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.exceptions.NoSuchTableException; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -95,10 +95,7 @@ public void process() { // send out begin commit commitState.startNewCommit(); Event event = - new Event( - config.controlGroupId(), - EventType.COMMIT_REQUEST, - new CommitRequestPayload(commitState.currentCommitId())); + new Event(config.controlGroupId(), new StartCommit(commitState.currentCommitId())); send(event); LOG.info("Started new commit with commit-id={}", commitState.currentCommitId().toString()); } @@ -113,10 +110,10 @@ public void process() { @Override protected boolean receive(Envelope envelope) { switch (envelope.event().type()) { - case COMMIT_RESPONSE: + case DATA_WRITTEN: commitState.addResponse(envelope); return true; - case COMMIT_READY: + case DATA_COMPLETE: commitState.addReady(envelope); if (commitState.isCommitReady(totalPartitionCount)) { commit(false); @@ -140,7 +137,7 @@ private void doCommit(boolean partialCommit) { Map> commitMap = commitState.tableCommitMap(); String offsetsJson = offsetsJson(); - Long vtts = commitState.vtts(partialCommit); + OffsetDateTime vtts = commitState.vtts(partialCommit); Tasks.foreach(commitMap.entrySet()) .executeWith(exec) @@ -155,10 +152,7 @@ private void doCommit(boolean partialCommit) { commitState.clearResponses(); Event event = - new Event( - config.controlGroupId(), - EventType.COMMIT_COMPLETE, - new CommitCompletePayload(commitState.currentCommitId(), vtts)); + new Event(config.controlGroupId(), new CommitComplete(commitState.currentCommitId(), vtts)); send(event); LOG.info( @@ -177,7 +171,10 @@ private String offsetsJson() { } private void commitToTable( - TableIdentifier tableIdentifier, List envelopeList, String offsetsJson, Long vtts) { + TableIdentifier tableIdentifier, + List envelopeList, + String offsetsJson, + OffsetDateTime vtts) { Table table; try { table = catalog.loadTable(tableIdentifier); @@ -233,7 +230,7 @@ private void commitToTable( if (i == lastIdx) { appendOp.set(snapshotOffsetsProp, offsetsJson); if (vtts != null) { - appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts)); + appendOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts.toInstant().toEpochMilli())); } } @@ -247,7 +244,7 @@ private void commitToTable( deltaOp.set(snapshotOffsetsProp, offsetsJson); deltaOp.set(COMMIT_ID_SNAPSHOT_PROP, commitState.currentCommitId().toString()); if (vtts != null) { - deltaOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts)); + deltaOp.set(VTTS_SNAPSHOT_PROP, Long.toString(vtts.toInstant().toEpochMilli())); } dataFiles.forEach(deltaOp::addRows); deleteFiles.forEach(deltaOp::addDeletes); @@ -258,9 +255,11 @@ private void commitToTable( Event event = new Event( config.controlGroupId(), - EventType.COMMIT_TABLE, - new CommitTablePayload( - commitState.currentCommitId(), TableName.of(tableIdentifier), snapshotId, vtts)); + new CommitToTable( + commitState.currentCommitId(), + TableReference.of(catalog.name(), tableIdentifier), + snapshotId, + vtts)); send(event); LOG.info( diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java index 67f46f79..fc81030e 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Deduplicated.java @@ -20,7 +20,7 @@ import static java.util.stream.Collectors.toList; -import io.tabular.iceberg.connect.events.CommitResponsePayload; +import java.time.OffsetDateTime; import java.util.List; import java.util.Objects; import java.util.UUID; @@ -31,6 +31,7 @@ import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.DataWritten; import org.apache.iceberg.util.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -82,7 +83,7 @@ public static List dataFiles( tableIdentifier, envelopes, "data", - CommitResponsePayload::dataFiles, + DataWritten::dataFiles, dataFile -> dataFile.path().toString()); } @@ -97,7 +98,7 @@ public static List deleteFiles( tableIdentifier, envelopes, "delete", - CommitResponsePayload::deleteFiles, + DataWritten::deleteFiles, deleteFile -> deleteFile.path().toString()); } @@ -106,14 +107,13 @@ private static List deduplicatedFiles( TableIdentifier tableIdentifier, List envelopes, String fileType, - Function> extractFilesFromPayload, + Function> extractFilesFromPayload, Function extractPathFromFile) { List> filesAndEnvelopes = envelopes.stream() .flatMap( envelope -> { - CommitResponsePayload payload = - (CommitResponsePayload) envelope.event().payload(); + DataWritten payload = (DataWritten) envelope.event().payload(); List files = extractFilesFromPayload.apply(payload); if (files == null) { return Stream.empty(); @@ -207,7 +207,7 @@ private static class SimpleEnvelope { private final long offset; private final UUID eventId; private final String eventGroupId; - private final Long eventTimestamp; + private final OffsetDateTime eventTimestamp; private final UUID payloadCommitId; SimpleEnvelope(Envelope envelope) { @@ -216,7 +216,7 @@ private static class SimpleEnvelope { eventId = envelope.event().id(); eventGroupId = envelope.event().groupId(); eventTimestamp = envelope.event().timestamp(); - payloadCommitId = ((CommitResponsePayload) envelope.event().payload()).commitId(); + payloadCommitId = ((DataWritten) envelope.event().payload()).commitId(); } @Override @@ -255,7 +255,7 @@ public String toString() { + eventGroupId + '\'' + ", eventTimestamp=" - + eventTimestamp + + eventTimestamp.toInstant().toEpochMilli() + ", payloadCommitId=" + payloadCommitId + '}'; diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Envelope.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Envelope.java index 3458a3b7..27939897 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Envelope.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Envelope.java @@ -18,7 +18,7 @@ */ package io.tabular.iceberg.connect.channel; -import io.tabular.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.Event; public class Envelope { private final Event event; diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java index 512471c7..aaf8c404 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/channel/Worker.java @@ -27,13 +27,6 @@ import io.tabular.iceberg.connect.data.RecordWriter; import io.tabular.iceberg.connect.data.Utilities; import io.tabular.iceberg.connect.data.WriterResult; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventType; -import io.tabular.iceberg.connect.events.TableName; -import io.tabular.iceberg.connect.events.TopicPartitionOffset; import java.time.Duration; import java.util.Collection; import java.util.List; @@ -41,6 +34,13 @@ import java.util.Map.Entry; import java.util.UUID; import java.util.concurrent.ExecutionException; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.connect.events.TopicPartitionOffset; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult; @@ -104,7 +104,7 @@ public void process() { @Override protected boolean receive(Envelope envelope) { Event event = envelope.event(); - if (event.type() != EventType.COMMIT_REQUEST) { + if (event.type() != PayloadType.START_COMMIT) { return false; } @@ -131,7 +131,7 @@ protected boolean receive(Envelope envelope) { }) .collect(toList()); - UUID commitId = ((CommitRequestPayload) event.payload()).commitId(); + UUID commitId = ((StartCommit) event.payload()).commitId(); List events = writeResults.stream() @@ -139,20 +139,15 @@ protected boolean receive(Envelope envelope) { writeResult -> new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( writeResult.partitionStruct(), commitId, - TableName.of(writeResult.tableIdentifier()), + TableReference.of(config.catalogName(), writeResult.tableIdentifier()), writeResult.dataFiles(), writeResult.deleteFiles()))) .collect(toList()); - Event readyEvent = - new Event( - config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload(commitId, assignments)); + Event readyEvent = new Event(config.controlGroupId(), new DataComplete(commitId, assignments)); events.add(readyEvent); send(events, offsets); diff --git a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java index 0ea6261c..c04a7bec 100644 --- a/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java +++ b/kafka-connect/src/main/java/io/tabular/iceberg/connect/data/Offset.java @@ -18,6 +18,9 @@ */ package io.tabular.iceberg.connect.data; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.Objects; public class Offset implements Comparable { @@ -36,8 +39,15 @@ public Long offset() { return offset; } - public Long timestamp() { - return timestamp; + // public Long timestamp() { + // return timestamp; + // } + + public OffsetDateTime timestamp() { + if (timestamp == null) { + return null; + } + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(timestamp), ZoneOffset.UTC); } @Override diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitStateTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitStateTest.java index e06e7f1e..9ee165d9 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitStateTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CommitStateTest.java @@ -23,15 +23,23 @@ import static org.mockito.Mockito.when; import io.tabular.iceberg.connect.IcebergSinkConfig; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.Payload; -import io.tabular.iceberg.connect.events.TopicPartitionOffset; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.UUID; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.Payload; +import org.apache.iceberg.connect.events.TopicPartitionOffset; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.junit.jupiter.api.Test; public class CommitStateTest { + + private OffsetDateTime offsetDateTime(Long ts) { + return OffsetDateTime.ofInstant(Instant.ofEpochMilli(ts), ZoneOffset.UTC); + } + @Test public void testIsCommitReady() { TopicPartitionOffset tp = mock(TopicPartitionOffset.class); @@ -39,15 +47,15 @@ public void testIsCommitReady() { CommitState commitState = new CommitState(mock(IcebergSinkConfig.class)); commitState.startNewCommit(); - CommitReadyPayload payload1 = mock(CommitReadyPayload.class); + DataComplete payload1 = mock(DataComplete.class); when(payload1.commitId()).thenReturn(commitState.currentCommitId()); when(payload1.assignments()).thenReturn(ImmutableList.of(tp, tp)); - CommitReadyPayload payload2 = mock(CommitReadyPayload.class); + DataComplete payload2 = mock(DataComplete.class); when(payload2.commitId()).thenReturn(commitState.currentCommitId()); when(payload2.assignments()).thenReturn(ImmutableList.of(tp)); - CommitReadyPayload payload3 = mock(CommitReadyPayload.class); + DataComplete payload3 = mock(DataComplete.class); when(payload3.commitId()).thenReturn(UUID.randomUUID()); when(payload3.assignments()).thenReturn(ImmutableList.of(tp)); @@ -61,16 +69,16 @@ public void testIsCommitReady() { @Test public void testGetVtts() { - CommitReadyPayload payload1 = mock(CommitReadyPayload.class); + DataComplete payload1 = mock(DataComplete.class); TopicPartitionOffset tp1 = mock(TopicPartitionOffset.class); - when(tp1.timestamp()).thenReturn(3L); + when(tp1.timestamp()).thenReturn(offsetDateTime(3L)); TopicPartitionOffset tp2 = mock(TopicPartitionOffset.class); - when(tp2.timestamp()).thenReturn(2L); + when(tp2.timestamp()).thenReturn(offsetDateTime(2L)); when(payload1.assignments()).thenReturn(ImmutableList.of(tp1, tp2)); - CommitReadyPayload payload2 = mock(CommitReadyPayload.class); + DataComplete payload2 = mock(DataComplete.class); TopicPartitionOffset tp3 = mock(TopicPartitionOffset.class); - when(tp3.timestamp()).thenReturn(1L); + when(tp3.timestamp()).thenReturn(offsetDateTime(1L)); when(payload2.assignments()).thenReturn(ImmutableList.of(tp3)); CommitState commitState = new CommitState(mock(IcebergSinkConfig.class)); @@ -79,11 +87,11 @@ public void testGetVtts() { commitState.addReady(wrapInEnvelope(payload1)); commitState.addReady(wrapInEnvelope(payload2)); - assertThat(commitState.vtts(false)).isEqualTo(1L); + assertThat(commitState.vtts(false)).isEqualTo(offsetDateTime(1L)); assertThat(commitState.vtts(true)).isNull(); // null timestamp for one, so should not set a vtts - CommitReadyPayload payload3 = mock(CommitReadyPayload.class); + DataComplete payload3 = mock(DataComplete.class); TopicPartitionOffset tp4 = mock(TopicPartitionOffset.class); when(tp4.timestamp()).thenReturn(null); when(payload3.assignments()).thenReturn(ImmutableList.of(tp4)); diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java index 353036fd..09233acf 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/CoordinatorTest.java @@ -21,16 +21,10 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.when; -import io.tabular.iceberg.connect.events.CommitCompletePayload; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.CommitTablePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventTestUtil; -import io.tabular.iceberg.connect.events.EventType; -import io.tabular.iceberg.connect.events.TableName; -import io.tabular.iceberg.connect.events.TopicPartitionOffset; +import io.tabular.iceberg.connect.fixtures.EventTestUtil; +import java.time.Instant; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; import java.util.List; import java.util.Map; import java.util.Set; @@ -46,6 +40,16 @@ import org.apache.iceberg.FileFormat; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Snapshot; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.CommitComplete; +import org.apache.iceberg.connect.events.CommitToTable; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; +import org.apache.iceberg.connect.events.TableReference; +import org.apache.iceberg.connect.events.TopicPartitionOffset; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -64,7 +68,7 @@ public class CoordinatorTest extends ChannelTestBase { public void testCommitAppend() { Assertions.assertEquals(0, ImmutableList.copyOf(table.snapshots().iterator()).size()); - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); UUID commitId = coordinatorTest(ImmutableList.of(EventTestUtil.createDataFile()), ImmutableList.of(), ts); table.refresh(); @@ -84,12 +88,13 @@ public void testCommitAppend() { Map summary = snapshot.summary(); Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP)); Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP)); - Assertions.assertEquals(Long.toString(ts), summary.get(VTTS_SNAPSHOT_PROP)); + Assertions.assertEquals( + Long.toString(ts.toInstant().toEpochMilli()), summary.get(VTTS_SNAPSHOT_PROP)); } @Test public void testCommitDelta() { - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); UUID commitId = coordinatorTest( ImmutableList.of(EventTestUtil.createDataFile()), @@ -111,12 +116,13 @@ public void testCommitDelta() { Map summary = snapshot.summary(); Assertions.assertEquals(commitId.toString(), summary.get(COMMIT_ID_SNAPSHOT_PROP)); Assertions.assertEquals("{\"0\":3}", summary.get(OFFSETS_SNAPSHOT_PROP)); - Assertions.assertEquals(Long.toString(ts), summary.get(VTTS_SNAPSHOT_PROP)); + Assertions.assertEquals( + Long.toString(ts.toInstant().toEpochMilli()), summary.get(VTTS_SNAPSHOT_PROP)); } @Test public void testCommitNoFiles() { - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); UUID commitId = coordinatorTest(ImmutableList.of(), ImmutableList.of(), ts); assertThat(producer.history()).hasSize(2); @@ -139,7 +145,10 @@ public void testCommitError() { .withRecordCount(5) .build(); - coordinatorTest(ImmutableList.of(badDataFile), ImmutableList.of(), 0L); + coordinatorTest( + ImmutableList.of(badDataFile), + ImmutableList.of(), + OffsetDateTime.ofInstant(Instant.ofEpochMilli(0L), ZoneOffset.UTC)); // no commit messages sent assertThat(producer.history()).hasSize(1); @@ -150,7 +159,7 @@ public void testCommitError() { @Test public void testShouldDeduplicateDataFilesBeforeAppending() { - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); DataFile dataFile = EventTestUtil.createDataFile(); UUID commitId = @@ -159,11 +168,10 @@ public void testShouldDeduplicateDataFilesBeforeAppending() { Event commitResponse = new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( StructType.of(), currentCommitId, - new TableName(ImmutableList.of("db"), "tbl"), + new TableReference("catalog", ImmutableList.of("db"), "tbl"), ImmutableList.of(dataFile, dataFile), // duplicated data files ImmutableList.of())); @@ -172,8 +180,7 @@ public void testShouldDeduplicateDataFilesBeforeAppending() { commitResponse, // duplicate commit response new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( + new DataComplete( currentCommitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))))); }); @@ -192,7 +199,7 @@ public void testShouldDeduplicateDataFilesBeforeAppending() { @Test public void testShouldDeduplicateDeleteFilesBeforeAppending() { - long ts = System.currentTimeMillis(); + OffsetDateTime ts = OffsetDateTime.ofInstant(Instant.now(), ZoneOffset.UTC); DeleteFile deleteFile = EventTestUtil.createDeleteFile(); UUID commitId = @@ -201,11 +208,10 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() { Event duplicateCommitResponse = new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( StructType.of(), currentCommitId, - new TableName(ImmutableList.of("db"), "tbl"), + new TableReference("catalog", ImmutableList.of("db"), "tbl"), ImmutableList.of(), ImmutableList.of(deleteFile, deleteFile))); // duplicate delete files @@ -214,8 +220,7 @@ public void testShouldDeduplicateDeleteFilesBeforeAppending() { duplicateCommitResponse, // duplicate commit response new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( + new DataComplete( currentCommitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts))))); }); @@ -294,9 +299,9 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { // retrieve commitId from commit request produced by coordinator final byte[] bytes = producer.history().get(0).value(); - final Event commitRequest = Event.decode(bytes); - assert commitRequest.type().equals(EventType.COMMIT_REQUEST); - final UUID commitId = ((CommitRequestPayload) commitRequest.payload()).commitId(); + final Event commitRequest = AvroUtil.decode(bytes); + assert commitRequest.type().equals(PayloadType.START_COMMIT); + final UUID commitId = ((StartCommit) commitRequest.payload()).commitId(); // each worker sends its responses for the commit request Map workerIdToSpecMap = @@ -323,14 +328,13 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { 0, currentControlTopicOffset, "key", - Event.encode( + AvroUtil.encode( new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( spec.partitionType(), commitId, - TableName.of(TABLE_IDENTIFIER), + TableReference.of("catalog", TABLE_IDENTIFIER), ImmutableList.of(dataFile), ImmutableList.of()))))); currentControlTopicOffset += 1; @@ -341,14 +345,18 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { 0, currentControlTopicOffset, "key", - Event.encode( + AvroUtil.encode( new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( + new DataComplete( commitId, ImmutableList.of( - new TopicPartitionOffset(SRC_TOPIC_NAME, 0, 100L, 100L))))))); + new TopicPartitionOffset( + SRC_TOPIC_NAME, + 0, + 100L, + OffsetDateTime.ofInstant( + Instant.ofEpochMilli(100L), ZoneOffset.UTC)))))))); currentControlTopicOffset += 1; } @@ -391,45 +399,44 @@ public void testCommitMultiPartitionSpecAppendDataFiles() { "Only the most recent snapshot should include vtts in it's summary"); } - private void assertCommitTable(int idx, UUID commitId, long ts) { + private void assertCommitTable(int idx, UUID commitId, OffsetDateTime ts) { byte[] bytes = producer.history().get(idx).value(); - Event commitTable = Event.decode(bytes); - assertThat(commitTable.type()).isEqualTo(EventType.COMMIT_TABLE); - CommitTablePayload commitTablePayload = (CommitTablePayload) commitTable.payload(); + Event commitTable = AvroUtil.decode(bytes); + assertThat(commitTable.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE); + CommitToTable commitTablePayload = (CommitToTable) commitTable.payload(); assertThat(commitTablePayload.commitId()).isEqualTo(commitId); - assertThat(commitTablePayload.tableName().toIdentifier().toString()) + assertThat(commitTablePayload.tableReference().identifier().toString()) .isEqualTo(TABLE_IDENTIFIER.toString()); - assertThat(commitTablePayload.vtts()).isEqualTo(ts); + assertThat(commitTablePayload.validThroughTs()).isEqualTo(ts); } - private void assertCommitComplete(int idx, UUID commitId, long ts) { + private void assertCommitComplete(int idx, UUID commitId, OffsetDateTime ts) { byte[] bytes = producer.history().get(idx).value(); - Event commitComplete = Event.decode(bytes); - assertThat(commitComplete.type()).isEqualTo(EventType.COMMIT_COMPLETE); - CommitCompletePayload commitCompletePayload = (CommitCompletePayload) commitComplete.payload(); + Event commitComplete = AvroUtil.decode(bytes); + assertThat(commitComplete.type()).isEqualTo(PayloadType.COMMIT_COMPLETE); + CommitComplete commitCompletePayload = (CommitComplete) commitComplete.payload(); assertThat(commitCompletePayload.commitId()).isEqualTo(commitId); - assertThat(commitCompletePayload.vtts()).isEqualTo(ts); + assertThat(commitCompletePayload.validThroughTs()).isEqualTo(ts); } - private UUID coordinatorTest(List dataFiles, List deleteFiles, long ts) { + private UUID coordinatorTest( + List dataFiles, List deleteFiles, OffsetDateTime ts) { return coordinatorTest( currentCommitId -> { Event commitResponse = new Event( config.controlGroupId(), - EventType.COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( StructType.of(), currentCommitId, - new TableName(ImmutableList.of("db"), "tbl"), + new TableReference("catalog", ImmutableList.of("db"), "tbl"), dataFiles, deleteFiles)); Event commitReady = new Event( config.controlGroupId(), - EventType.COMMIT_READY, - new CommitReadyPayload( + new DataComplete( currentCommitId, ImmutableList.of(new TopicPartitionOffset("topic", 1, 1L, ts)))); @@ -453,14 +460,14 @@ private UUID coordinatorTest(Function> eventsFn) { assertThat(producer.history()).hasSize(1); byte[] bytes = producer.history().get(0).value(); - Event commitRequest = Event.decode(bytes); - assertThat(commitRequest.type()).isEqualTo(EventType.COMMIT_REQUEST); + Event commitRequest = AvroUtil.decode(bytes); + assertThat(commitRequest.type()).isEqualTo(PayloadType.START_COMMIT); - UUID commitId = ((CommitRequestPayload) commitRequest.payload()).commitId(); + UUID commitId = ((StartCommit) commitRequest.payload()).commitId(); int currentOffset = 1; for (Event event : eventsFn.apply(commitId)) { - bytes = Event.encode(event); + bytes = AvroUtil.encode(event); consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, currentOffset, "key", bytes)); currentOffset += 1; } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java index 24685b04..3b066ce8 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/DeduplicatedTest.java @@ -18,12 +18,8 @@ */ package io.tabular.iceberg.connect.channel; -import static io.tabular.iceberg.connect.events.EventType.COMMIT_RESPONSE; import static org.assertj.core.api.Assertions.assertThat; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.TableName; import java.util.List; import java.util.Set; import java.util.UUID; @@ -35,6 +31,9 @@ import org.apache.iceberg.FileMetadata; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.TableReference; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -54,7 +53,7 @@ class DeduplicatedTest { private static final UUID PAYLOAD_COMMIT_ID = UUID.fromString("4142add7-7c92-4bbe-b864-21ce8ac4bf53"); private static final TableIdentifier TABLE_IDENTIFIER = TableIdentifier.of("db", "tbl"); - private static final TableName TABLE_NAME = TableName.of(TABLE_IDENTIFIER); + private static final TableReference TABLE_NAME = TableReference.of("catalog", TABLE_IDENTIFIER); private static final String GROUP_ID = "some-group"; private static final DataFile DATA_FILE_1 = createDataFile("1"); private static final DataFile DATA_FILE_2 = createDataFile("2"); @@ -124,8 +123,7 @@ private void assertWarnOrHigherLogsContainsEntryMatching(String expectedMessages private Event commitResponseEvent(List dataFiles, List deleteFiles) { return new Event( GROUP_ID, - COMMIT_RESPONSE, - new CommitResponsePayload( + new DataWritten( Types.StructType.of(), PAYLOAD_COMMIT_ID, TABLE_NAME, dataFiles, deleteFiles)); } diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java index c7954900..5e21e287 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/channel/WorkerTest.java @@ -27,15 +27,16 @@ import io.tabular.iceberg.connect.data.IcebergWriter; import io.tabular.iceberg.connect.data.IcebergWriterFactory; import io.tabular.iceberg.connect.data.WriterResult; -import io.tabular.iceberg.connect.events.CommitReadyPayload; -import io.tabular.iceberg.connect.events.CommitRequestPayload; -import io.tabular.iceberg.connect.events.CommitResponsePayload; -import io.tabular.iceberg.connect.events.Event; -import io.tabular.iceberg.connect.events.EventTestUtil; -import io.tabular.iceberg.connect.events.EventType; +import io.tabular.iceberg.connect.fixtures.EventTestUtil; import java.util.Map; import java.util.UUID; import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.connect.events.AvroUtil; +import org.apache.iceberg.connect.events.DataComplete; +import org.apache.iceberg.connect.events.DataWritten; +import org.apache.iceberg.connect.events.Event; +import org.apache.iceberg.connect.events.PayloadType; +import org.apache.iceberg.connect.events.StartCommit; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -54,6 +55,7 @@ public class WorkerTest extends ChannelTestBase { @Test public void testStaticRoute() { when(config.tables()).thenReturn(ImmutableList.of(TABLE_NAME)); + when(config.catalogName()).thenReturn("catalog"); Map value = ImmutableMap.of(FIELD_NAME, "val"); workerTest(value); } @@ -62,6 +64,8 @@ public void testStaticRoute() { public void testDynamicRoute() { when(config.dynamicTablesEnabled()).thenReturn(true); when(config.tablesRouteField()).thenReturn(FIELD_NAME); + when(config.catalogName()).thenReturn("catalog"); + Map value = ImmutableMap.of(FIELD_NAME, TABLE_NAME); workerTest(value); } @@ -93,24 +97,22 @@ private void workerTest(Map value) { worker.save(ImmutableList.of(rec)); UUID commitId = UUID.randomUUID(); - Event commitRequest = - new Event( - config.controlGroupId(), EventType.COMMIT_REQUEST, new CommitRequestPayload(commitId)); - byte[] bytes = Event.encode(commitRequest); + Event commitRequest = new Event(config.controlGroupId(), new StartCommit(commitId)); + byte[] bytes = AvroUtil.encode(commitRequest); consumer.addRecord(new ConsumerRecord<>(CTL_TOPIC_NAME, 0, 1, "key", bytes)); worker.process(); assertThat(producer.history()).hasSize(2); - Event event = Event.decode(producer.history().get(0).value()); - assertThat(event.type()).isEqualTo(EventType.COMMIT_RESPONSE); - CommitResponsePayload responsePayload = (CommitResponsePayload) event.payload(); + Event event = AvroUtil.decode(producer.history().get(0).value()); + assertThat(event.type()).isEqualTo(PayloadType.DATA_WRITTEN); + DataWritten responsePayload = (DataWritten) event.payload(); assertThat(responsePayload.commitId()).isEqualTo(commitId); - event = Event.decode(producer.history().get(1).value()); - assertThat(event.type()).isEqualTo(EventType.COMMIT_READY); - CommitReadyPayload readyPayload = (CommitReadyPayload) event.payload(); + event = AvroUtil.decode(producer.history().get(1).value()); + assertThat(event.type()).isEqualTo(PayloadType.DATA_COMPLETE); + DataComplete readyPayload = (DataComplete) event.payload(); assertThat(readyPayload.commitId()).isEqualTo(commitId); assertThat(readyPayload.assignments()).hasSize(1); // offset should be one more than the record offset diff --git a/kafka-connect/src/test/java/io/tabular/iceberg/connect/fixtures/EventTestUtil.java b/kafka-connect/src/test/java/io/tabular/iceberg/connect/fixtures/EventTestUtil.java index 3a7e7143..77a4a129 100644 --- a/kafka-connect/src/test/java/io/tabular/iceberg/connect/fixtures/EventTestUtil.java +++ b/kafka-connect/src/test/java/io/tabular/iceberg/connect/fixtures/EventTestUtil.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package io.tabular.iceberg.connect.events; +package io.tabular.iceberg.connect.fixtures; import java.util.UUID; import org.apache.iceberg.DataFile;