Deduplicate before committing and log warning
fqaiser94 committed Mar 14, 2024
1 parent db151be commit 18002f2
Showing 5 changed files with 402 additions and 11 deletions.
2 changes: 2 additions & 0 deletions kafka-connect/build.gradle
@@ -15,6 +15,8 @@ dependencies {
 
   testImplementation libs.mockito
   testImplementation libs.assertj
+
+  testImplementation 'ch.qos.logback:logback-classic:1.5.3'
 }
 
 configurations {
Coordinator.java

@@ -37,7 +37,11 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ExecutorService;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Stream;
 import org.apache.iceberg.AppendFiles;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.DeleteFile;
@@ -48,6 +52,8 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.NoSuchTableException;
 import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
 import org.apache.iceberg.util.Tasks;
 import org.apache.iceberg.util.ThreadPools;
 import org.apache.kafka.clients.admin.MemberDescription;
@@ -194,22 +200,49 @@ private void commitToTable(
               Long minOffset = committedOffsets.get(envelope.partition());
               return minOffset == null || envelope.offset() >= minOffset;
             })
-        .map(envelope -> (CommitResponsePayload) envelope.event().payload())
+        .map(envelope ->
+            (CommitResponsePayload) envelope.event().payload())
         .collect(toList());
 
     List<DataFile> dataFiles =
-        payloads.stream()
-            .filter(payload -> payload.dataFiles() != null)
-            .flatMap(payload -> payload.dataFiles().stream())
-            .filter(dataFile -> dataFile.recordCount() > 0)
-            .collect(toList());
+        deduplicateBatch(
+            payloads.stream()
+                .filter(payload -> payload.dataFiles() != null)
+                .flatMap(
+                    payload ->
+                        deduplicatePayload(
+                                payload.dataFiles(),
+                                Coordinator::dataFilePath,
+                                "data",
+                                payload.commitId(),
+                                payload.tableName().toIdentifier())
+                            .stream())
+                .filter(dataFile -> dataFile.recordCount() > 0)
+                .collect(toList()),
+            Coordinator::dataFilePath,
+            "data",
+            commitState.currentCommitId(),
+            tableIdentifier);
 
     List<DeleteFile> deleteFiles =
-        payloads.stream()
-            .filter(payload -> payload.deleteFiles() != null)
-            .flatMap(payload -> payload.deleteFiles().stream())
-            .filter(deleteFile -> deleteFile.recordCount() > 0)
-            .collect(toList());
+        deduplicateBatch(
+            payloads.stream()
+                .filter(payload -> payload.deleteFiles() != null)
+                .flatMap(
+                    payload ->
+                        deduplicatePayload(
+                                payload.deleteFiles(),
+                                Coordinator::deleteFilePath,
+                                "delete",
+                                payload.commitId(),
+                                payload.tableName().toIdentifier())
+                            .stream())
+                .filter(deleteFile -> deleteFile.recordCount() > 0)
+                .collect(toList()),
+            Coordinator::deleteFilePath,
+            "delete",
+            commitState.currentCommitId(),
+            tableIdentifier);
 
     if (dataFiles.isEmpty() && deleteFiles.isEmpty()) {
       LOG.info("Nothing to commit to table {}, skipping", tableIdentifier);
@@ -255,6 +288,77 @@
     }
   }
 
+  private static String dataFilePath(DataFile dataFile) {
+    return dataFile.path().toString();
+  }
+
+  private static String deleteFilePath(DeleteFile deleteFile) {
+    return deleteFile.path().toString();
+  }
+
+  private <T> List<T> deduplicateBatch(
+      List<T> files,
+      Function<T, String> getPathFn,
+      String fileType,
+      UUID commitId,
+      TableIdentifier tableIdentifier) {
+    return deduplicate(
+        files,
+        getPathFn,
+        (numDuplicatedFiles, path) ->
+            String.format(
+                "Detected %d %s files with the same path=%s across payloads during commitId=%s for table=%s",
+                numDuplicatedFiles,
+                fileType,
+                path,
+                commitId.toString(),
+                tableIdentifier.toString()));
+  }
+
+  private <T> List<T> deduplicatePayload(
+      List<T> files,
+      Function<T, String> getPathFn,
+      String fileType,
+      UUID commitId,
+      TableIdentifier tableIdentifier) {
+    return deduplicate(
+        files,
+        getPathFn,
+        (numDuplicatedFiles, path) ->
+            String.format(
+                "Detected %d %s files with the same path=%s in payload with commitId=%s for table=%s",
+                numDuplicatedFiles,
+                fileType,
+                path,
+                commitId.toString(),
+                tableIdentifier.toString()));
+  }
+
+  private <T> List<T> deduplicate(
+      List<T> files,
+      Function<T, String> getPathFn,
+      BiFunction<Integer, String, String> logMessageFn) {
+    Map<String, List<T>> pathToFilesMapping = Maps.newHashMap();
+    files.forEach(
+        f ->
+            pathToFilesMapping
+                .computeIfAbsent(getPathFn.apply(f), ignored -> Lists.newArrayList())
+                .add(f));
+
+    return pathToFilesMapping.entrySet().stream()
+        .flatMap(
+            entry -> {
+              String maybeDuplicatedPath = entry.getKey();
+              List<T> maybeDuplicatedFiles = entry.getValue();
+              int numDuplicatedFiles = maybeDuplicatedFiles.size();
+              if (numDuplicatedFiles > 1) {
+                LOG.warn(logMessageFn.apply(numDuplicatedFiles, maybeDuplicatedPath));
+              }
+              return Stream.of(maybeDuplicatedFiles.get(0));
+            })
+        .collect(toList());
+  }
+
   private Snapshot latestSnapshot(Table table, String branch) {
     if (branch == null) {
       return table.currentSnapshot();
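
Taken together, the new helpers deduplicate at two levels: deduplicatePayload collapses repeated paths within a single CommitResponsePayload, and deduplicateBatch collapses repeats across all payloads for the table, in both cases keeping the first file seen per path and logging a warning for the rest. A minimal standalone sketch of that keep-first-and-warn behavior (the class name, LinkedHashMap, and System.err standing in for the SLF4J logger are illustrative assumptions, not part of the commit):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class DeduplicateSketch {

  // Keep the first file seen for each path; warn when a path occurs more than once.
  static <T> List<T> deduplicate(
      List<T> files,
      Function<T, String> getPathFn,
      BiFunction<Integer, String, String> logMessageFn) {
    // LinkedHashMap keeps insertion order for a predictable demo; the commit uses Maps.newHashMap().
    Map<String, List<T>> pathToFiles = new LinkedHashMap<>();
    files.forEach(
        f -> pathToFiles.computeIfAbsent(getPathFn.apply(f), ignored -> new ArrayList<>()).add(f));

    return pathToFiles.entrySet().stream()
        .flatMap(
            entry -> {
              int count = entry.getValue().size();
              if (count > 1) {
                // the Coordinator logs this through SLF4J's LOG.warn instead
                System.err.println(logMessageFn.apply(count, entry.getKey()));
              }
              return Stream.of(entry.getValue().get(0));
            })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<String> paths =
        Arrays.asList("s3://bucket/a.parquet", "s3://bucket/a.parquet", "s3://bucket/b.parquet");
    List<String> deduped =
        deduplicate(
            paths,
            Function.identity(),
            (n, path) -> String.format("Detected %d data files with the same path=%s", n, path));
    System.out.println(deduped);
    // stderr: Detected 2 data files with the same path=s3://bucket/a.parquet
    // stdout: [s3://bucket/a.parquet, s3://bucket/b.parquet]
  }
}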
ChannelTestBase.java

@@ -49,6 +49,7 @@
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
+import org.slf4j.LoggerFactory;
 
 public class ChannelTestBase {
   protected static final String SRC_TOPIC_NAME = "src-topic";
@@ -61,6 +62,7 @@ public class ChannelTestBase {
   protected MockProducer<String, byte[]> producer;
   protected MockConsumer<String, byte[]> consumer;
   protected Admin admin;
+  protected MemoryAppender memoryAppender;
 
   private InMemoryCatalog initInMemoryCatalog() {
     InMemoryCatalog inMemoryCatalog = new InMemoryCatalog();
@@ -116,11 +118,17 @@ public void before() {
     when(clientFactory.createProducer(any())).thenReturn(producer);
     when(clientFactory.createConsumer(any())).thenReturn(consumer);
     when(clientFactory.createAdmin()).thenReturn(admin);
+
+    memoryAppender = new MemoryAppender();
+    ((ch.qos.logback.classic.Logger) LoggerFactory.getLogger(Coordinator.class))
+        .addAppender(memoryAppender);
+    memoryAppender.start();
   }
 
   @AfterEach
   public void after() throws IOException {
     catalog.close();
+    memoryAppender.stop();
   }
 
   protected void initConsumer() {
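
The test setup references a MemoryAppender class that is presumably added in one of the changed files not expanded in this view. A minimal sketch of a shape that would satisfy this usage, assuming it builds on logback's ListAppender (the getMessages helper is a hypothetical convenience, not confirmed by the commit):

import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import java.util.List;
import java.util.stream.Collectors;

// Collects log events in memory so tests can assert on what was logged.
public class MemoryAppender extends ListAppender<ILoggingEvent> {

  // Formatted messages of everything logged since start().
  public List<String> getMessages() {
    return this.list.stream().map(ILoggingEvent::getFormattedMessage).collect(Collectors.toList());
  }
}

A test could then assert on the deduplication warning with something like assertThat(memoryAppender.getMessages()).anyMatch(m -> m.startsWith("Detected")) — again a hypothetical assertion, shown only to illustrate the intent of wiring the appender to the Coordinator logger.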
