Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed to always get internalId as a key #17

Merged
merged 1 commit into from
Jan 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public SingleStoreDBChangeRecordEmitter(SingleStoreDBPartition partition, Offset
protected void emitCreateRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Struct newKey = getKey(tableSchema, newColumnValues);
Struct newKey = keyFromInternalId();
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

Expand All @@ -58,7 +58,7 @@ protected void emitUpdateRecord(Receiver<SingleStoreDBPartition> receiver, Table
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();

Struct newKey = getKey(tableSchema, newColumnValues);
Struct newKey = keyFromInternalId();

Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);
Expand Down Expand Up @@ -86,8 +86,7 @@ protected void emitUpdateRecord(Receiver<SingleStoreDBPartition> receiver, Table
@Override
protected void emitDeleteRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema) throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();
Struct newKey = getKey(tableSchema, newColumnValues);
Struct newKey = keyFromInternalId();

Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

Expand Down Expand Up @@ -120,15 +119,6 @@ protected Object[] getNewColumnValues() {
return after;
}

private Struct getKey(TableSchema tableSchema, Object[] columnData) {
Struct key = tableSchema.keyFromColumnData(columnData);
if (key == null) {
return keyFromInternalId();
} else {
return key;
}
}

private Struct keyFromInternalId() {
Struct result = new Struct(SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build());
result.put(INTERNAL_ID, internalId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,10 @@ public SingleStoreDBSnapshotChangeRecordEmitter(SingleStoreDBPartition partition
protected void emitReadRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Struct newKey = tableSchema.keyFromColumnData(newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());

receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.READ, newKey != null ? newKey : keyFromInternalId(), envelope, getOffset(), null);
receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.READ, keyFromInternalId(), envelope, getOffset(), null);
}

private Struct keyFromInternalId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,11 @@ public SingleStoreDBTableSchemaBuilder(ValueConverterProvider valueConverterProv

public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table, ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) {
TableSchema schema = super.create(topicNamingStrategy, table, filter, mappers, keysMapper);
if (schema.keySchema() == null) {
return new TableSchema(schema.id(),
SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(),
(row) -> schema.keyFromColumnData(row),
schema.getEnvelopeSchema(),
schema.valueSchema(),
(row) -> schema.valueFromColumnData(row));
} else {
return schema;
}
return new TableSchema(schema.id(),
SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(),
(row) -> schema.keyFromColumnData(row),
schema.getEnvelopeSchema(),
schema.valueSchema(),
(row) -> schema.valueFromColumnData(row));
}
}
8 changes: 4 additions & 4 deletions src/test/java/com/singlestore/debezium/SnapshotIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ public void testSnapshotA() throws Exception {
try {
final SourceRecords recordsA = consumeRecordsByTopic(3);
final List<SourceRecord> table1 = recordsA.recordsForTopic(TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".A")
.stream().sorted(Comparator.comparingInt(v -> (Integer) ((Struct) v.key()).get("pk"))).collect(Collectors.toList());
.stream().sorted(Comparator.comparingInt(v -> (Integer) ((Struct)((Struct) v.value()).get("after")).get("pk"))).collect(Collectors.toList());
assertThat(table1).hasSize(3);

for (int i = 0; i < 3; i++) {
final SourceRecord record1 = table1.get(i);
final List<SchemaAndValueField> expectedKey1 = List.of(
new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).required().build(), i));
final List<SchemaAndValueField> expectedRow1 = Arrays.asList(
new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).required().build(), i),
new SchemaAndValueField("aa", Schema.OPTIONAL_STRING_SCHEMA, "test" + i));
final Struct key1 = (Struct) record1.key();
final Struct value1 = (Struct) record1.value();
assertRecord(key1, expectedKey1);
assertNotNull(key1.get("internalId"));
assertEquals(Schema.Type.STRUCT, key1.schema().type());
assertEquals(Schema.Type.INT64, key1.schema().fields().get(0).schema().type());
assertRecord((Struct) value1.get("after"), expectedRow1);
assertThat(record1.sourceOffset())
.extracting("snapshot").containsExactly(true);
Expand Down
17 changes: 9 additions & 8 deletions src/test/java/com/singlestore/debezium/StreamingIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public void readSeveralOperations() throws SQLException, InterruptedException {
Configuration config = defaultJdbcConfigWithTable("product");
config = config.edit()
.withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "product")
.withDefault("tombstones.on.delete", "false")
.build();

start(SingleStoreDBConnector.class, config);
Expand All @@ -198,15 +199,12 @@ public void readSeveralOperations() throws SQLException, InterruptedException {
conn.execute("UPDATE `product` SET `createdByDate` = '2013-11-23 15:22:33' WHERE `id` = 2");
conn.execute("INSERT INTO `product` (`id`) VALUES (4)");

List<SourceRecord> records = consumeRecordsByTopic(7).allRecordsInOrder();
List<SourceRecord> records = consumeRecordsByTopic(6).allRecordsInOrder();

List<Integer> ids = Arrays.asList(new Integer[]{1, 2, 3,
0, // TODO: PLAT-6906 get PK for DELETE events
0, // TODO: PLAT-6906 get PK for DELETE events
2, 4});
List<String> operations = Arrays.asList(new String[]{"c", "c", "c", "d", null, "u", "c"});
List<Long> ids = new ArrayList<>();
List<String> operations = Arrays.asList(new String[]{"c", "c", "c", "d", "u", "c"});

assertEquals(7, records.size());
assertEquals(6, records.size());
for (int i = 0; i < records.size(); i++) {
SourceRecord record = records.get(i);

Expand All @@ -219,8 +217,11 @@ public void readSeveralOperations() throws SQLException, InterruptedException {
}

Struct key = (Struct) record.key();
assertEquals(ids.get(i), key.get("id"));
ids.add(key.getInt64("internalId"));
}

assertEquals(ids.get(0), ids.get(3));
assertEquals(ids.get(1), ids.get(4));
} finally {
stopConnector();
}
Expand Down
Loading