Skip to content

Commit

Permalink
PLAT-6952 Test all connector options
Browse files Browse the repository at this point in the history
  • Loading branch information
oyeliseiev-ua committed Jan 26, 2024
1 parent 2903421 commit c474f3d
Show file tree
Hide file tree
Showing 10 changed files with 1,816 additions and 471 deletions.
598 changes: 598 additions & 0 deletions codestyle/intellij-java-google-style.xml

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ public Map<String, ConfigValue> validate() {
private static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit()
.name("SingleStoreDB")
.excluding(SNAPSHOT_LOCK_TIMEOUT_MS,
MSG_KEY_COLUMNS,
INCLUDE_SCHEMA_COMMENTS,
INCLUDE_SCHEMA_CHANGES,
SCHEMA_INCLUDE_LIST,
Expand All @@ -168,9 +169,18 @@ public Map<String, ConfigValue> validate() {
SNAPSHOT_FETCH_SIZE,
SNAPSHOT_MAX_THREADS,
TABLE_IGNORE_BUILTIN,
SNAPSHOT_MODE_TABLES,
TABLE_INCLUDE_LIST,
TABLE_EXCLUDE_LIST,
INCREMENTAL_SNAPSHOT_CHUNK_SIZE,
SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE,
INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES,
INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY,
SIGNAL_DATA_COLLECTION,
SIGNAL_POLL_INTERVAL_MS,
SIGNAL_ENABLED_CHANNELS,
SNAPSHOT_FULL_COLUMN_SCAN_FORCE, //single table supported
SNAPSHOT_TABLES_ORDER_BY_ROW_COUNT, //single table supported
// TODO PLAT-6820 implement transaction monitoring
PROVIDE_TRANSACTION_METADATA)
.type(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import io.debezium.DebeziumException;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.config.Configuration;
Expand Down Expand Up @@ -86,16 +87,24 @@ public ChangeEventSourceCoordinator<SingleStoreDBPartition, SingleStoreDBOffsetC
DocumentReader.defaultReader(),
previousOffsets);


final Configuration heartbeatConfig = config;
final EventDispatcher<SingleStoreDBPartition, TableId> dispatcher = new EventDispatcher<>(
connectorConfig,
topicNamingStrategy,
schema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
DataChangeEvent::new,
metadataProvider,
// TODO: add heartbeat
metadataProvider,
connectorConfig.createHeartbeat(
topicNamingStrategy,
schemaNameAdjuster,
() -> new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(heartbeatConfig)),
exception -> {
final String sqlErrorId = exception.getMessage();
throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception);
}
),
schemaNameAdjuster,
signalProcessor);

Expand Down
175 changes: 175 additions & 0 deletions src/test/java/com/singlestore/debezium/ColumnMappingsIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package com.singlestore.debezium;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;

import io.debezium.config.Configuration;
import java.sql.SQLException;
import java.util.List;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;

public class ColumnMappingsIT extends IntegrationTestBase {

@Test
public void testHashMask() throws SQLException, InterruptedException {
try (SingleStoreDBConnection conn = new SingleStoreDBConnection(
defaultJdbcConnectionConfigWithTable("song"))) {
Configuration config = defaultJdbcConfigWithTable("song");
config = config.edit()
.with("column.mask.hash.SHA-256.with.salt.salt123", TEST_DATABASE + ".song.author")
.with("column.mask.with.10.chars", TEST_DATABASE + ".song.name").build();

start(SingleStoreDBConnector.class, config);
assertConnectorIsRunning();

try {
conn.execute("INSERT INTO `song` VALUES ('test1', 'test1')");
conn.execute("INSERT INTO `song` VALUES ('test1', 'test2')");
conn.execute("INSERT INTO `song` VALUES ('test1', 'test3')");

List<SourceRecord> records = consumeRecordsByTopic(3).allRecordsInOrder();

assertEquals(3, records.size());
for (int i = 0; i < records.size(); i++) {
SourceRecord record = records.get(i);
Struct value = (Struct) ((Struct) record.value()).get("after");
assertNotEquals("test" + (i + 1), value.get("author"));
assertEquals("**********", value.get("name"));
}
} finally {
stopConnector();
}
}
}

@Test
public void testTruncate() throws SQLException, InterruptedException {
try (SingleStoreDBConnection conn = new SingleStoreDBConnection(
defaultJdbcConnectionConfigWithTable("song"))) {
Configuration config = defaultJdbcConfigWithTable("song");
config = config.edit().with("column.truncate.to.10.chars",
TEST_DATABASE + ".song.author," + TEST_DATABASE + ".song.name").build();

start(SingleStoreDBConnector.class, config);
assertConnectorIsRunning();

try {
conn.execute("INSERT INTO `song` VALUES ('12345678901234567890', '12345678901234567890')");
conn.execute("INSERT INTO `song` VALUES ('12345678901234567890', '12345678901234567890')");
conn.execute("INSERT INTO `song` VALUES ('12345678901234567890', '12345678901234567890')");

List<SourceRecord> records = consumeRecordsByTopic(3).allRecordsInOrder();

assertEquals(3, records.size());
for (SourceRecord record : records) {
Struct value = (Struct) ((Struct) record.value()).get("after");
assertEquals("1234567890", value.get("author"));
assertEquals("1234567890", value.get("name"));
}
} finally {
stopConnector();
}
}
}

@Test
public void testColumnPropagate() throws SQLException, InterruptedException {
try (SingleStoreDBConnection conn = new SingleStoreDBConnection(
defaultJdbcConnectionConfigWithTable("person"))) {
Configuration config = defaultJdbcConfigWithTable("person");
config = config.edit().with(SingleStoreDBConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*")
.build();

start(SingleStoreDBConnector.class, config);
assertConnectorIsRunning();

try {
conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) "
+ "VALUES ('Mike', '1999-01-01', 10, 100, 'a')");
List<SourceRecord> records = consumeRecordsByTopic(1).allRecordsInOrder();
assertEquals(1, records.size());
SourceRecord record = records.get(0);
Schema afterSchema = record.valueSchema().field("after").schema();
assertThat(afterSchema.field("name").schema().parameters()
.get("__debezium.source.column.type")).isEqualTo("VARCHAR");
assertThat(afterSchema.field("name").schema().parameters()
.get("__debezium.source.column.length")).isEqualTo("255");
assertThat(afterSchema.field("name").schema().parameters()
.get("__debezium.source.column.name")).isEqualTo("name");

assertThat(afterSchema.field("birthdate").schema().parameters()
.get("__debezium.source.column.type")).isEqualTo("DATE");
assertThat(afterSchema.field("birthdate").schema().parameters()
.get("__debezium.source.column.length")).isEqualTo("10");
assertThat(afterSchema.field("birthdate").schema().parameters()
.get("__debezium.source.column.name")).isEqualTo("birthdate");

assertThat(afterSchema.field("age").schema().parameters()
.get("__debezium.source.column.type")).isEqualTo("INT");
assertThat(afterSchema.field("age").schema().parameters()
.get("__debezium.source.column.length")).isEqualTo("10");
assertThat(afterSchema.field("age").schema().parameters()
.get("__debezium.source.column.scale")).isEqualTo("0");
assertThat(afterSchema.field("age").schema().parameters()
.get("__debezium.source.column.name")).isEqualTo("age");

assertThat(afterSchema.field("salary").schema().parameters()
.get("__debezium.source.column.type")).isEqualTo("DECIMAL");
assertThat(afterSchema.field("salary").schema().parameters()
.get("__debezium.source.column.length")).isEqualTo("5");
assertThat(afterSchema.field("salary").schema().parameters()
.get("__debezium.source.column.scale")).isEqualTo("2");
assertThat(afterSchema.field("salary").schema().parameters()
.get("__debezium.source.column.name")).isEqualTo("salary");
} finally {
stopConnector();
}
}
}

@Test
public void testDataTypePropagate() throws SQLException, InterruptedException {
try (SingleStoreDBConnection conn = new SingleStoreDBConnection(
defaultJdbcConnectionConfigWithTable("person"))) {
Configuration config = defaultJdbcConfigWithTable("person");
config = config.edit().with(SingleStoreDBConnectorConfig.PROPAGATE_DATATYPE_SOURCE_TYPE,
".+\\.VARCHAR,.+\\.DECIMAL").build();

start(SingleStoreDBConnector.class, config);
assertConnectorIsRunning();

try {
conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) "
+ "VALUES ('Mike', '1999-01-01', 10, 100, 'a')");
List<SourceRecord> records = consumeRecordsByTopic(1).allRecordsInOrder();
assertEquals(1, records.size());
SourceRecord record = records.get(0);
Schema afterSchema = record.valueSchema().field("after").schema();
assertThat(afterSchema.field("name").schema().parameters()
.get("__debezium.source.column.type")).isEqualTo("VARCHAR");
assertThat(afterSchema.field("name").schema().parameters()
.get("__debezium.source.column.length")).isEqualTo("255");
assertThat(afterSchema.field("name").schema().parameters()
.get("__debezium.source.column.name")).isEqualTo("name");

assertThat(afterSchema.field("birthdate").schema().parameters()).isNull();
assertThat(afterSchema.field("age").schema().parameters()).isNull();

assertThat(afterSchema.field("salary").schema().parameters()
.get("__debezium.source.column.type")).isEqualTo("DECIMAL");
assertThat(afterSchema.field("salary").schema().parameters()
.get("__debezium.source.column.length")).isEqualTo("5");
assertThat(afterSchema.field("salary").schema().parameters()
.get("__debezium.source.column.scale")).isEqualTo("2");
assertThat(afterSchema.field("salary").schema().parameters()
.get("__debezium.source.column.name")).isEqualTo("salary");
} finally {
stopConnector();
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.singlestore.debezium;

import static org.junit.Assert.assertEquals;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import java.util.List;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Before;
import org.junit.Test;

public class EventProcessingFailureHandlingIT extends IntegrationTestBase {

@Before
public void initTestData() {
String statements =
"DROP DATABASE IF EXISTS " + TEST_DATABASE + ";" + "CREATE DATABASE " + TEST_DATABASE + ";"
+ "DROP TABLE IF EXISTS " + TEST_DATABASE + ".tablea;" + "DROP TABLE IF EXISTS "
+ TEST_DATABASE + ".tableb;" + "CREATE ROWSTORE TABLE " + TEST_DATABASE
+ ".tablea (id int primary key, cola varchar(30));" + "CREATE TABLE " + TEST_DATABASE
+ ".tableb (id int primary key, colb BIGINT NOT NULL)";
execute(statements);
}

@Test
public void ignore() throws Exception {
final Configuration config = defaultJdbcConfigBuilder().withDefault(
SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE)
.withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "tablea")
.with(SingleStoreDBConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE,
CommonConnectorConfig.EventProcessingFailureHandlingMode.IGNORE).build();

start(SingleStoreDBConnector.class, config);
assertConnectorIsRunning();
try (SingleStoreDBConnection connection = new SingleStoreDBConnection(
new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(config))) {

connection.execute("INSERT INTO tablea VALUES (1, 'text1')");
List<SourceRecord> records = consumeRecordsByTopic(1).allRecordsInOrder();
assertEquals(1, records.size());
connection.execute("DELETE FROM tablea where id = 1");
connection.execute("ALTER TABLE " + TEST_DATABASE + ".tablea MODIFY COLUMN cola int");
connection.execute("INSERT INTO tablea VALUES (1, 1)");
records = consumeRecordsByTopic(1).allRecordsInOrder();
assertEquals(1, records.size());
} finally {
stopConnector();
}
}
}
98 changes: 98 additions & 0 deletions src/test/java/com/singlestore/debezium/MetricsIT.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.singlestore.debezium;

import static io.debezium.heartbeat.DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY;
import static io.debezium.heartbeat.Heartbeat.HEARTBEAT_INTERVAL;
import static io.debezium.schema.AbstractTopicNamingStrategy.TOPIC_HEARTBEAT_PREFIX;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import java.lang.management.ManagementFactory;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.Test;

public class MetricsIT extends IntegrationTestBase {

@Test
public void testHeartBeat() throws InterruptedException {
Configuration config = defaultJdbcConfigWithTable("product");
config = config.edit().withDefault(TOPIC_HEARTBEAT_PREFIX, "__heartbeat")
.withDefault(HEARTBEAT_ACTION_QUERY, "SELECT 1").withDefault(HEARTBEAT_INTERVAL, 1000)
.build();
start(SingleStoreDBConnector.class, config);
assertConnectorIsRunning();
try {
List<SourceRecord> records = consumeRecordsByTopic(1).allRecordsInOrder();
assertEquals(1, records.size());
assertEquals("__heartbeat.singlestore_topic", records.get(0).topic());
} finally {
stopConnector();
}
}

@Test
public void testCustomMetrics() throws Exception {
//create snapshot
String statements =
"DROP TABLE IF EXISTS " + TEST_DATABASE + ".A;" + "CREATE TABLE " + TEST_DATABASE
+ ".A (pk INT, aa VARCHAR(10), PRIMARY KEY(pk));" + "INSERT INTO " + TEST_DATABASE
+ ".A VALUES(0, 'test0');" + "INSERT INTO " + TEST_DATABASE + ".A VALUES(4, 'test4');"
+ "SNAPSHOT DATABASE " + TEST_DATABASE + ";";
execute(statements);
final Configuration config = defaultJdbcConfigBuilder().withDefault(
SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE)
.withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A")
.with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata").build();

Map<String, String> customMetricTags = new SingleStoreDBConnectorConfig(
config).getCustomMetricTags();
start(SingleStoreDBConnector.class, config);
assertConnectorIsRunning();
assertSnapshotWithCustomMetrics(customMetricTags);
assertStreamingWithCustomMetrics(customMetricTags);
}

private void assertSnapshotWithCustomMetrics(Map<String, String> customMetricTags)
throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
final ObjectName objectName = getSnapshotMetricsObjectName("singlestoredb", "singlestore_topic",
customMetricTags);
Thread.sleep(Duration.ofSeconds(3).toMillis());
// Check snapshot metrics
assertThat(mBeanServer.getAttribute(objectName, "TotalTableCount")).isEqualTo(1);
assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo(
new String[]{"db.A"});
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(2L);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotRunning")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotAborted")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotCompleted")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPaused")).isEqualTo(false);
assertThat(mBeanServer.getAttribute(objectName, "SnapshotPausedDurationInSeconds")).isEqualTo(
0L);
}

private void assertStreamingWithCustomMetrics(Map<String, String> customMetricTags)
throws Exception {
final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
final ObjectName objectName = getStreamingMetricsObjectName("singlestoredb",
"singlestore_topic", customMetricTags);
// Insert for streaming events
String statements =
"INSERT INTO " + TEST_DATABASE + ".A VALUES(1, 'test1');" + "INSERT INTO " + TEST_DATABASE
+ ".A VALUES(2, 'test2');";
execute(statements);
Thread.sleep(Duration.ofSeconds(3).toMillis());
// Check streaming metrics
assertThat(mBeanServer.getAttribute(objectName, "Connected")).isEqualTo(true);
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfUpdateEventsSeen")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfDeleteEventsSeen")).isEqualTo(0L);
assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(
4L);//todo fix should be 2?
}
}
Loading

0 comments on commit c474f3d

Please sign in to comment.