Skip to content

Commit

Permalink
fix-event-decoder-vtt (#253)
Browse files Browse the repository at this point in the history
* fix-event-decoder-vtt

- two null checks were missed when converting old records that can have null values for valid through timestamps
  • Loading branch information
tabmatfournier authored May 22, 2024
1 parent c16ca54 commit e7c07bf
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,12 @@ private Payload convertPayload(io.tabular.iceberg.connect.events.Payload payload
pay.commitId(),
TableReference.of(catalogName, pay.tableName().toIdentifier()),
pay.snapshotId(),
OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC));
pay.vtts() == null ? null : OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC));
} else if (payload instanceof CommitCompletePayload) {
CommitCompletePayload pay = (CommitCompletePayload) payload;
return new CommitComplete(
pay.commitId(),
OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC));
pay.vtts() == null ? null : OffsetDateTime.ofInstant(Instant.ofEpochMilli(pay.vtts()), ZoneOffset.UTC));
} else {
throw new IllegalStateException(
String.format("Unknown event payload: %s", payload.getSchema()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,30 @@ public void testCommitTableBecomesCommitToTable() {
assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl"));
}

@Test
public void testCommitTableBecomesCommitToTableNullVtts() {
io.tabular.iceberg.connect.events.Event event =
new io.tabular.iceberg.connect.events.Event(
"cg-connector",
EventType.COMMIT_TABLE,
new CommitTablePayload(
commitId, new TableName(Collections.singletonList("db"), "tbl"), 1L, null));

byte[] data = io.tabular.iceberg.connect.events.Event.encode(event);

Event result = eventDecoder.decode(data);
assertThat(event.groupId()).isEqualTo("cg-connector");
assertThat(result.type()).isEqualTo(PayloadType.COMMIT_TO_TABLE);
assertThat(result.payload()).isInstanceOf(CommitToTable.class);
CommitToTable payload = (CommitToTable) result.payload();

assertThat(payload.commitId()).isEqualTo(commitId);
assertThat(payload.snapshotId()).isEqualTo(1L);
assertThat(payload.validThroughTs()).isNull();
assertThat(payload.tableReference().catalog()).isEqualTo(catalogName);
assertThat(payload.tableReference().identifier()).isEqualTo(TableIdentifier.of("db", "tbl"));
}

@Test
public void testCommitCompleteBecomesCommitCompleteSerialization() {
io.tabular.iceberg.connect.events.Event event =
Expand All @@ -255,4 +279,20 @@ public void testCommitCompleteBecomesCommitCompleteSerialization() {
assertThat(payload.validThroughTs())
.isEqualTo(OffsetDateTime.ofInstant(Instant.ofEpochMilli(2L), ZoneOffset.UTC));
}

@Test
public void testCommitCompleteBecomesCommitCompleteSerializationNullVtts() {
io.tabular.iceberg.connect.events.Event event =
new io.tabular.iceberg.connect.events.Event(
"cg-connector", EventType.COMMIT_COMPLETE, new CommitCompletePayload(commitId, null));

byte[] data = io.tabular.iceberg.connect.events.Event.encode(event);

Event result = eventDecoder.decode(data);
assertThat(result.type()).isEqualTo(PayloadType.COMMIT_COMPLETE);
assertThat(result.payload()).isInstanceOf(CommitComplete.class);
CommitComplete payload = (CommitComplete) result.payload();
assertThat(payload.commitId()).isEqualTo(commitId);
assertThat(payload.validThroughTs()).isNull();
}
}

0 comments on commit e7c07bf

Please sign in to comment.