Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Txid & transactional consistency fields #3

Open
wants to merge 33 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
1b59059
txid in Debezium cdc metadata
syook-r7 Jul 2, 2024
ff2702d
WIP
dtobon-r7 Jul 18, 2024
d2db2ba
WIP
dtobon-r7 Jul 19, 2024
5bd5711
WIP
dtobon-r7 Jul 23, 2024
52f6fd9
Add transaction consistency
dtobon-r7 Jul 24, 2024
8d47daf
Add transaction consistency
dtobon-r7 Jul 25, 2024
bea5c74
Add transaction consistency
dtobon-r7 Jul 25, 2024
061994f
Fix event check
dtobon-r7 Jul 25, 2024
a6aae81
Move transaction data to payload
dtobon-r7 Jul 31, 2024
51566f4
Rename fields
dtobon-r7 Jul 31, 2024
5611f6a
Clean up and testing
dtobon-r7 Jul 31, 2024
a17f8dd
Clean up and testing
dtobon-r7 Jul 31, 2024
e366c4e
fix underscores
dtobon-r7 Aug 7, 2024
3f34fed
creating 1st layer partition-by field - source_ts_us
syook-r7 Aug 12, 2024
2f3c49b
Merge pull request #2 from rapid7/txid_validity
kgreenman-r7 Aug 12, 2024
717d347
Merge pull request #4 from rapid7/transform-txid
syook-r7 Aug 12, 2024
e4d1c6f
fix gradle format check err
syook-r7 Aug 12, 2024
f1b2fb6
Fix checkstyle issues
Aug 12, 2024
b6f38a2
Merge pull request #5 from rapid7/format_fix
syook-r7 Aug 12, 2024
37604f0
fix nullpointer
dtobon-r7 Aug 14, 2024
1da9fc2
Add Debugs statements
dtobon-r7 Aug 14, 2024
8411e50
Add Debugs statements
dtobon-r7 Aug 14, 2024
355a5a8
Add Debugs statements
dtobon-r7 Aug 14, 2024
c35cfbc
change txid parsing
dtobon-r7 Aug 14, 2024
f3531c9
change txid parsing as json
dtobon-r7 Aug 14, 2024
20d6118
change txid parsing as json
dtobon-r7 Aug 14, 2024
45e95e7
change txid parsing as json
dtobon-r7 Aug 14, 2024
7a1ed7e
Parse txid with regex
dtobon-r7 Aug 14, 2024
ef75261
Parse txid with regex
dtobon-r7 Aug 14, 2024
865b308
Cleanup and logging updates
dtobon-r7 Aug 15, 2024
4082fd4
Handle TX ID rollover
dtobon-r7 Oct 22, 2024
3417141
Handle TX ID rollover
dtobon-r7 Oct 22, 2024
137a5dd
Merge pull request #6 from rapid7/txid_validity_rollover
syook-r7 Oct 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions kafka-connect-events/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ dependencies {
implementation libs.iceberg.common
implementation libs.iceberg.guava
implementation libs.avro
implementation group: 'org.apache.iceberg', name: 'iceberg-kafka-connect-events', version: '1.5.1'

testImplementation libs.junit.api
testRuntimeOnly libs.junit.engine
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class CommitReadyPayload implements Payload {

private UUID commitId;
private List<TopicPartitionOffset> assignments;
private List<TopicPartitionTxId> txIds;
private final Schema avroSchema;

private static final Schema AVRO_SCHEMA =
Expand All @@ -44,16 +45,24 @@ public class CommitReadyPayload implements Payload {
.array()
.items(TopicPartitionOffset.AVRO_SCHEMA)
.noDefault()
.name("txIds")
.prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
.type()
.nullable()
.array()
.items(TopicPartitionTxId.AVRO_SCHEMA)
.noDefault()
.endRecord();

// Used by Avro reflection to instantiate this class when reading events
public CommitReadyPayload(Schema avroSchema) {
this.avroSchema = avroSchema;
}

public CommitReadyPayload(UUID commitId, List<TopicPartitionOffset> assignments) {
public CommitReadyPayload(UUID commitId, List<TopicPartitionOffset> assignments, List<TopicPartitionTxId> txIds) {
this.commitId = commitId;
this.assignments = assignments;
this.txIds = txIds;
this.avroSchema = AVRO_SCHEMA;
}

Expand All @@ -65,6 +74,10 @@ public List<TopicPartitionOffset> assignments() {
return assignments;
}

public List<TopicPartitionTxId> txIds() {
return txIds;
}

@Override
public Schema getSchema() {
return avroSchema;
Expand All @@ -80,6 +93,9 @@ public void put(int i, Object v) {
case 1:
this.assignments = (List<TopicPartitionOffset>) v;
return;
case 2:
this.txIds = (List<TopicPartitionTxId>) v;
return;
default:
// ignore the object, it must be from a newer version of the format
}
Expand All @@ -92,6 +108,8 @@ public Object get(int i) {
return commitId;
case 1:
return assignments;
case 2:
return txIds;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + i);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.events;

import org.apache.avro.Schema;
import org.apache.avro.generic.IndexedRecord;
import org.apache.iceberg.avro.AvroSchemaUtil;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.types.Types;

import java.util.List;

/**
 * Avro {@link IndexedRecord} that carries a topic name, a partition number, and a transaction ID.
 *
 * <p>Field access in {@link #put(int, Object)} and {@link #get(int)} is keyed on the field ID
 * attached to each Avro schema field (see {@link #positionToId(int, Schema)}), not on the raw
 * field position.
 */
public class TopicPartitionTransaction implements IndexedRecord {

  // Field IDs used by the Iceberg struct below and matched against in put/get.
  static final int TOPIC = 10_800;
  static final int PARTITION = 10_801;
  static final int TX_ID = 10_802;

  /** Iceberg struct layout: required topic and partition, optional transaction ID. */
  public static final Types.StructType ICEBERG_SCHEMA =
      Types.StructType.of(
          Types.NestedField.required(TOPIC, "topic", Types.StringType.get()),
          Types.NestedField.required(PARTITION, "partition", Types.IntegerType.get()),
          Types.NestedField.optional(TX_ID, "txId", Types.LongType.get()));

  // Avro form of ICEBERG_SCHEMA, named after this class; must be initialized after it.
  private static final Schema AVRO_SCHEMA =
      AvroSchemaUtil.convert(ICEBERG_SCHEMA, TopicPartitionTransaction.class.getName());

  private String topic;
  private Integer partition;
  private Long txId;
  private final Schema avroSchema;

  /**
   * Creates an empty record bound to the given schema; the fields are filled in later through
   * {@link #put(int, Object)}.
   *
   * @param avroSchema schema this record reports from {@link #getSchema()}
   */
  public TopicPartitionTransaction(Schema avroSchema) {
    this.avroSchema = avroSchema;
  }

  /**
   * Creates a populated record that uses this class's own Avro schema.
   *
   * @param topic topic name
   * @param partition partition number
   * @param txId transaction ID; the schema declares this field optional
   */
  public TopicPartitionTransaction(String topic, Integer partition, Long txId) {
    this.avroSchema = AVRO_SCHEMA;
    this.topic = topic;
    this.partition = partition;
    this.txId = txId;
  }

  /** Returns the topic name. */
  public String topic() {
    return topic;
  }

  /** Returns the partition number. */
  public Integer partition() {
    return partition;
  }

  /** Returns the transaction ID, which may be {@code null}. */
  public Long txId() {
    return txId;
  }

  @Override
  public Schema getSchema() {
    return avroSchema;
  }

  /**
   * Stores {@code v} into the field located at schema position {@code i}.
   *
   * @param i position of the field within this record's schema
   * @param v value to store; a topic value is converted with {@code toString()}
   * @throws IllegalArgumentException if the position is out of range or its field ID is not one
   *     of the IDs this class knows
   */
  @Override
  public void put(int i, Object v) {
    final int fieldId = positionToId(i, avroSchema);
    if (fieldId == TOPIC) {
      // Stringify whatever string representation the caller supplied.
      this.topic = (v == null) ? null : v.toString();
    } else if (fieldId == PARTITION) {
      this.partition = (Integer) v;
    } else if (fieldId == TX_ID) {
      this.txId = (Long) v;
    } else {
      throw new IllegalArgumentException("Unknown field index: " + i);
    }
  }

  /**
   * Reads the field located at schema position {@code i}.
   *
   * @param i position of the field within this record's schema
   * @return the current value of that field
   * @throws IllegalArgumentException if the position is out of range or its field ID is not one
   *     of the IDs this class knows
   */
  @Override
  public Object get(int i) {
    final int fieldId = positionToId(i, avroSchema);
    if (fieldId == TOPIC) {
      return topic;
    }
    if (fieldId == PARTITION) {
      return partition;
    }
    if (fieldId == TX_ID) {
      return txId;
    }
    throw new IllegalArgumentException("Unknown field index: " + i);
  }

  /**
   * Translates a schema position into the field ID recorded on that schema field.
   *
   * @param position zero-based position within {@code avroSchema}'s field list
   * @param avroSchema schema whose fields are inspected
   * @return the field ID property of the field, or {@code -1} when the field has none
   * @throws IllegalArgumentException if {@code position} is outside the field list
   */
  static int positionToId(int position, Schema avroSchema) {
    List<Schema.Field> schemaFields = avroSchema.getFields();
    boolean inRange = position >= 0 && position < schemaFields.size();
    Preconditions.checkArgument(inRange, "Invalid field position: " + position);

    Object fieldId = schemaFields.get(position).getObjectProp(AvroSchemaUtil.FIELD_ID_PROP);
    if (fieldId == null) {
      return -1;
    }
    return (int) fieldId;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.tabular.iceberg.connect.events;

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.util.Utf8;

/**
 * Event element that pairs a topic and partition with a transaction ID.
 *
 * <p>Fields are addressed by fixed ordinal: 0 = topic, 1 = partition, 2 = transaction ID.
 */
public class TopicPartitionTxId implements Element {

  /** Avro record schema: string {@code topic}, int {@code partition}, nullable long {@code tx_id}. */
  public static final Schema AVRO_SCHEMA =
      SchemaBuilder.builder()
          .record(TopicPartitionTxId.class.getName())
          .fields()
          .name("topic")
          .prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
          .type()
          .stringType()
          .noDefault()
          .name("partition")
          .prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
          .type()
          .intType()
          .noDefault()
          .name("tx_id")
          .prop(FIELD_ID_PROP, DUMMY_FIELD_ID)
          .type()
          .nullable()
          .longType()
          .noDefault()
          .endRecord();

  private String topic;
  private Integer partition;
  private Long txId;
  private final Schema avroSchema;

  // Used by Avro reflection to instantiate this class when reading events
  public TopicPartitionTxId(Schema avroSchema) {
    this.avroSchema = avroSchema;
  }

  /**
   * Creates a populated element that uses {@link #AVRO_SCHEMA}.
   *
   * @param topic topic name
   * @param partition partition number
   * @param txId transaction ID; the schema declares this field nullable
   */
  public TopicPartitionTxId(String topic, int partition, Long txId) {
    this.avroSchema = AVRO_SCHEMA;
    this.topic = topic;
    this.partition = partition;
    this.txId = txId;
  }

  /** Returns the topic name. */
  public String topic() {
    return topic;
  }

  /** Returns the partition number. */
  public Integer partition() {
    return partition;
  }

  /** Returns the transaction ID, which may be {@code null}. */
  public Long txId() {
    return txId;
  }

  @Override
  public Schema getSchema() {
    return avroSchema;
  }

  /**
   * Stores {@code v} into the field with ordinal {@code i}.
   *
   * @param i field ordinal (0 = topic, 1 = partition, 2 = transaction ID)
   * @param v value to store; a {@link Utf8} topic is converted to {@code String}
   */
  @Override
  public void put(int i, Object v) {
    if (i == 0) {
      if (v instanceof Utf8) {
        this.topic = v.toString();
      } else {
        this.topic = (String) v;
      }
    } else if (i == 1) {
      this.partition = (Integer) v;
    } else if (i == 2) {
      this.txId = (Long) v;
    }
    // any other ordinal: ignore the object, it must be from a newer version of the format
  }

  /**
   * Reads the field with ordinal {@code i}.
   *
   * @param i field ordinal (0 = topic, 1 = partition, 2 = transaction ID)
   * @return the current value of that field
   * @throws UnsupportedOperationException if {@code i} is not a known ordinal
   */
  @Override
  public Object get(int i) {
    if (i == 0) {
      return topic;
    }
    if (i == 1) {
      return partition;
    }
    if (i == 2) {
      return txId;
    }
    throw new UnsupportedOperationException("Unknown field ordinal: " + i);
  }
}
Loading