diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBChangeRecordEmitter.java b/src/main/java/com/singlestore/debezium/SingleStoreDBChangeRecordEmitter.java index cd358d4..790224e 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBChangeRecordEmitter.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDBChangeRecordEmitter.java @@ -40,7 +40,7 @@ public SingleStoreDBChangeRecordEmitter(SingleStoreDBPartition partition, Offset protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); - Struct newKey = getKey(tableSchema, newColumnValues); + Struct newKey = keyFromInternalId(); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); @@ -58,7 +58,7 @@ protected void emitUpdateRecord(Receiver receiver, Table Object[] oldColumnValues = getOldColumnValues(); Object[] newColumnValues = getNewColumnValues(); - Struct newKey = getKey(tableSchema, newColumnValues); + Struct newKey = keyFromInternalId(); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); @@ -86,8 +86,7 @@ protected void emitUpdateRecord(Receiver receiver, Table @Override protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); - Object[] newColumnValues = getNewColumnValues(); - Struct newKey = getKey(tableSchema, newColumnValues); + Struct newKey = keyFromInternalId(); Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues); @@ -120,15 +119,6 @@ protected Object[] getNewColumnValues() { return after; } - private Struct getKey(TableSchema tableSchema, Object[] columnData) { - Struct key = tableSchema.keyFromColumnData(columnData); - if (key == null) { - return keyFromInternalId(); - } else { - return key; - } - } - private Struct keyFromInternalId() { Struct result = new Struct(SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build()); result.put(INTERNAL_ID, internalId); diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeRecordEmitter.java b/src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeRecordEmitter.java index b01a2ef..3ad90e7 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeRecordEmitter.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeRecordEmitter.java @@ -25,11 +25,10 @@ public SingleStoreDBSnapshotChangeRecordEmitter(SingleStoreDBPartition partition protected void emitReadRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); - Struct newKey = tableSchema.keyFromColumnData(newColumnValues); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); Struct envelope = tableSchema.getEnvelopeSchema().read(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant()); - receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.READ, newKey != null ? newKey : keyFromInternalId(), envelope, getOffset(), null); + receiver.changeRecord(getPartition(), tableSchema, Envelope.Operation.READ, keyFromInternalId(), envelope, getOffset(), null); } private Struct keyFromInternalId() { diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBTableSchemaBuilder.java b/src/main/java/com/singlestore/debezium/SingleStoreDBTableSchemaBuilder.java index 9b873b6..7235a09 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBTableSchemaBuilder.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDBTableSchemaBuilder.java @@ -31,15 +31,11 @@ public SingleStoreDBTableSchemaBuilder(ValueConverterProvider valueConverterProv public TableSchema create(TopicNamingStrategy topicNamingStrategy, Table table, ColumnNameFilter filter, ColumnMappers mappers, KeyMapper keysMapper) { TableSchema schema = super.create(topicNamingStrategy, table, filter, mappers, keysMapper); - if (schema.keySchema() == null) { - return new TableSchema(schema.id(), - SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(), - (row) -> schema.keyFromColumnData(row), - schema.getEnvelopeSchema(), - schema.valueSchema(), - (row) -> schema.valueFromColumnData(row)); - } else { - return schema; - } + return new TableSchema(schema.id(), + SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build(), + (row) -> schema.keyFromColumnData(row), + schema.getEnvelopeSchema(), + schema.valueSchema(), + (row) -> schema.valueFromColumnData(row)); } } diff --git a/src/test/java/com/singlestore/debezium/SnapshotIT.java b/src/test/java/com/singlestore/debezium/SnapshotIT.java index 5f4208c..0dad2e6 100644 --- a/src/test/java/com/singlestore/debezium/SnapshotIT.java +++ b/src/test/java/com/singlestore/debezium/SnapshotIT.java @@ -49,19 +49,19 @@ public void testSnapshotA() throws Exception { try { final SourceRecords recordsA = consumeRecordsByTopic(3); final List table1 = recordsA.recordsForTopic(TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".A") - .stream().sorted(Comparator.comparingInt(v -> (Integer) ((Struct) v.key()).get("pk"))).collect(Collectors.toList()); + .stream().sorted(Comparator.comparingInt(v -> (Integer) ((Struct)((Struct) v.value()).get("after")).get("pk"))).collect(Collectors.toList()); assertThat(table1).hasSize(3); for (int i = 0; i < 3; i++) { final SourceRecord record1 = table1.get(i); - final List expectedKey1 = List.of( - new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).required().build(), i)); final List expectedRow1 = Arrays.asList( new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).required().build(), i), new SchemaAndValueField("aa", Schema.OPTIONAL_STRING_SCHEMA, "test" + i)); final Struct key1 = (Struct) record1.key(); final Struct value1 = (Struct) record1.value(); - assertRecord(key1, expectedKey1); + assertNotNull(key1.get("internalId")); + assertEquals(Schema.Type.STRUCT, key1.schema().type()); + assertEquals(Schema.Type.INT64, key1.schema().fields().get(0).schema().type()); assertRecord((Struct) value1.get("after"), expectedRow1); assertThat(record1.sourceOffset()) .extracting("snapshot").containsExactly(true); diff --git a/src/test/java/com/singlestore/debezium/StreamingIT.java b/src/test/java/com/singlestore/debezium/StreamingIT.java index c0c6e8e..cfe78de 100644 --- a/src/test/java/com/singlestore/debezium/StreamingIT.java +++ b/src/test/java/com/singlestore/debezium/StreamingIT.java @@ -185,6 +185,7 @@ public void readSeveralOperations() throws SQLException, InterruptedException { Configuration config = defaultJdbcConfigWithTable("product"); config = config.edit() .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "product") + .withDefault("tombstones.on.delete", "false") .build(); start(SingleStoreDBConnector.class, config); @@ -198,15 +199,12 @@ public void readSeveralOperations() throws SQLException, InterruptedException { conn.execute("UPDATE `product` SET `createdByDate` = '2013-11-23 15:22:33' WHERE `id` = 2"); conn.execute("INSERT INTO `product` (`id`) VALUES (4)"); - List records = consumeRecordsByTopic(7).allRecordsInOrder(); + List records = consumeRecordsByTopic(6).allRecordsInOrder(); - List ids = Arrays.asList(new Integer[]{1, 2, 3, - 0, // TODO: PLAT-6906 get PK for DELETE events - 0, // TODO: PLAT-6906 get PK for DELETE events - 2, 4}); - List operations = Arrays.asList(new String[]{"c", "c", "c", "d", null, "u", "c"}); + List ids = new ArrayList<>(); + List operations = Arrays.asList(new String[]{"c", "c", "c", "d", "u", "c"}); - assertEquals(7, records.size()); + assertEquals(6, records.size()); for (int i = 0; i < records.size(); i++) { SourceRecord record = records.get(i); @@ -219,8 +217,11 @@ public void readSeveralOperations() throws SQLException, InterruptedException { } Struct key = (Struct) record.key(); - assertEquals(ids.get(i), key.get("id")); + ids.add(key.getInt64("internalId")); } + + assertEquals(ids.get(0), ids.get(3)); + assertEquals(ids.get(1), ids.get(4)); } finally { stopConnector(); }