diff --git a/codestyle/intellij-java-google-style.xml b/codestyle/intellij-java-google-style.xml new file mode 100644 index 0000000..f3a6743 --- /dev/null +++ b/codestyle/intellij-java-google-style.xml @@ -0,0 +1,598 @@ + + + + + + diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorConfig.java b/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorConfig.java index bdb977b..c32110b 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorConfig.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorConfig.java @@ -160,6 +160,7 @@ public Map validate() { private static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION.edit() .name("SingleStoreDB") .excluding(SNAPSHOT_LOCK_TIMEOUT_MS, + MSG_KEY_COLUMNS, INCLUDE_SCHEMA_COMMENTS, INCLUDE_SCHEMA_CHANGES, SCHEMA_INCLUDE_LIST, @@ -168,9 +169,18 @@ public Map validate() { SNAPSHOT_FETCH_SIZE, SNAPSHOT_MAX_THREADS, TABLE_IGNORE_BUILTIN, + SNAPSHOT_MODE_TABLES, TABLE_INCLUDE_LIST, TABLE_EXCLUDE_LIST, + INCREMENTAL_SNAPSHOT_CHUNK_SIZE, + SNAPSHOT_SELECT_STATEMENT_OVERRIDES_BY_TABLE, + INCREMENTAL_SNAPSHOT_ALLOW_SCHEMA_CHANGES, INCREMENTAL_SNAPSHOT_WATERMARKING_STRATEGY, + SIGNAL_DATA_COLLECTION, + SIGNAL_POLL_INTERVAL_MS, + SIGNAL_ENABLED_CHANNELS, + SNAPSHOT_FULL_COLUMN_SCAN_FORCE, //single table supported + SNAPSHOT_TABLES_ORDER_BY_ROW_COUNT, //single table supported // TODO PLAT-6820 implement transaction monitoring PROVIDE_TRANSACTION_METADATA) .type( diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorTask.java b/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorTask.java index 204dd66..3a87faf 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorTask.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorTask.java @@ -4,6 +4,7 @@ import java.util.Map; import java.util.stream.Collectors; +import io.debezium.DebeziumException; import org.apache.kafka.connect.source.SourceRecord; import io.debezium.config.Configuration; @@ -86,7 +87,7 @@ public ChangeEventSourceCoordinator dispatcher = new EventDispatcher<>( connectorConfig, topicNamingStrategy, @@ -94,8 +95,16 @@ public ChangeEventSourceCoordinator new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(heartbeatConfig)), + exception -> { + final String sqlErrorId = exception.getMessage(); + throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception); + } + ), schemaNameAdjuster, signalProcessor); diff --git a/src/test/java/com/singlestore/debezium/ColumnMappingsIT.java b/src/test/java/com/singlestore/debezium/ColumnMappingsIT.java new file mode 100644 index 0000000..ec0f8b9 --- /dev/null +++ b/src/test/java/com/singlestore/debezium/ColumnMappingsIT.java @@ -0,0 +1,175 @@ +package com.singlestore.debezium; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import io.debezium.config.Configuration; +import java.sql.SQLException; +import java.util.List; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +public class ColumnMappingsIT extends IntegrationTestBase { + + @Test + public void testHashMask() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("song"))) { + Configuration config = defaultJdbcConfigWithTable("song"); + config = config.edit() + .with("column.mask.hash.SHA-256.with.salt.salt123", TEST_DATABASE + ".song.author") + .with("column.mask.with.10.chars", TEST_DATABASE + ".song.name").build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `song` VALUES ('test1', 'test1')"); + conn.execute("INSERT INTO `song` VALUES ('test1', 'test2')"); + conn.execute("INSERT INTO `song` VALUES ('test1', 'test3')"); + + List records = consumeRecordsByTopic(3).allRecordsInOrder(); + + assertEquals(3, records.size()); + for (int i = 0; i < records.size(); i++) { + SourceRecord record = records.get(i); + Struct value = (Struct) ((Struct) record.value()).get("after"); + assertNotEquals("test" + (i + 1), value.get("author")); + assertEquals("**********", value.get("name")); + } + } finally { + stopConnector(); + } + } + } + + @Test + public void testTruncate() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("song"))) { + Configuration config = defaultJdbcConfigWithTable("song"); + config = config.edit().with("column.truncate.to.10.chars", + TEST_DATABASE + ".song.author," + TEST_DATABASE + ".song.name").build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `song` VALUES ('12345678901234567890', '12345678901234567890')"); + conn.execute("INSERT INTO `song` VALUES ('12345678901234567890', '12345678901234567890')"); + conn.execute("INSERT INTO `song` VALUES ('12345678901234567890', '12345678901234567890')"); + + List records = consumeRecordsByTopic(3).allRecordsInOrder(); + + assertEquals(3, records.size()); + for (SourceRecord record : records) { + Struct value = (Struct) ((Struct) record.value()).get("after"); + assertEquals("1234567890", value.get("author")); + assertEquals("1234567890", value.get("name")); + } + } finally { + stopConnector(); + } + } + } + + @Test + public void testColumnPropagate() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("person"))) { + Configuration config = defaultJdbcConfigWithTable("person"); + config = config.edit().with(SingleStoreDBConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*") + .build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) " + + "VALUES ('Mike', '1999-01-01', 10, 100, 'a')"); + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + SourceRecord record = records.get(0); + Schema afterSchema = record.valueSchema().field("after").schema(); + assertThat(afterSchema.field("name").schema().parameters() + .get("__debezium.source.column.type")).isEqualTo("VARCHAR"); + assertThat(afterSchema.field("name").schema().parameters() + .get("__debezium.source.column.length")).isEqualTo("255"); + assertThat(afterSchema.field("name").schema().parameters() + .get("__debezium.source.column.name")).isEqualTo("name"); + + assertThat(afterSchema.field("birthdate").schema().parameters() + .get("__debezium.source.column.type")).isEqualTo("DATE"); + assertThat(afterSchema.field("birthdate").schema().parameters() + .get("__debezium.source.column.length")).isEqualTo("10"); + assertThat(afterSchema.field("birthdate").schema().parameters() + .get("__debezium.source.column.name")).isEqualTo("birthdate"); + + assertThat(afterSchema.field("age").schema().parameters() + .get("__debezium.source.column.type")).isEqualTo("INT"); + assertThat(afterSchema.field("age").schema().parameters() + .get("__debezium.source.column.length")).isEqualTo("10"); + assertThat(afterSchema.field("age").schema().parameters() + .get("__debezium.source.column.scale")).isEqualTo("0"); + assertThat(afterSchema.field("age").schema().parameters() + .get("__debezium.source.column.name")).isEqualTo("age"); + + assertThat(afterSchema.field("salary").schema().parameters() + .get("__debezium.source.column.type")).isEqualTo("DECIMAL"); + assertThat(afterSchema.field("salary").schema().parameters() + .get("__debezium.source.column.length")).isEqualTo("5"); + assertThat(afterSchema.field("salary").schema().parameters() + .get("__debezium.source.column.scale")).isEqualTo("2"); + assertThat(afterSchema.field("salary").schema().parameters() + .get("__debezium.source.column.name")).isEqualTo("salary"); + } finally { + stopConnector(); + } + } + } + + @Test + public void testDataTypePropagate() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("person"))) { + Configuration config = defaultJdbcConfigWithTable("person"); + config = config.edit().with(SingleStoreDBConnectorConfig.PROPAGATE_DATATYPE_SOURCE_TYPE, + ".+\\.VARCHAR,.+\\.DECIMAL").build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) " + + "VALUES ('Mike', '1999-01-01', 10, 100, 'a')"); + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + SourceRecord record = records.get(0); + Schema afterSchema = record.valueSchema().field("after").schema(); + assertThat(afterSchema.field("name").schema().parameters() + .get("__debezium.source.column.type")).isEqualTo("VARCHAR"); + assertThat(afterSchema.field("name").schema().parameters() + .get("__debezium.source.column.length")).isEqualTo("255"); + assertThat(afterSchema.field("name").schema().parameters() + .get("__debezium.source.column.name")).isEqualTo("name"); + + assertThat(afterSchema.field("birthdate").schema().parameters()).isNull(); + assertThat(afterSchema.field("age").schema().parameters()).isNull(); + + assertThat(afterSchema.field("salary").schema().parameters() + .get("__debezium.source.column.type")).isEqualTo("DECIMAL"); + assertThat(afterSchema.field("salary").schema().parameters() + .get("__debezium.source.column.length")).isEqualTo("5"); + assertThat(afterSchema.field("salary").schema().parameters() + .get("__debezium.source.column.scale")).isEqualTo("2"); + assertThat(afterSchema.field("salary").schema().parameters() + .get("__debezium.source.column.name")).isEqualTo("salary"); + } finally { + stopConnector(); + } + } + } +} \ No newline at end of file diff --git a/src/test/java/com/singlestore/debezium/EventProcessingFailureHandlingIT.java b/src/test/java/com/singlestore/debezium/EventProcessingFailureHandlingIT.java new file mode 100644 index 0000000..b0a7004 --- /dev/null +++ b/src/test/java/com/singlestore/debezium/EventProcessingFailureHandlingIT.java @@ -0,0 +1,50 @@ +package com.singlestore.debezium; + +import static org.junit.Assert.assertEquals; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import java.util.List; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Before; +import org.junit.Test; + +public class EventProcessingFailureHandlingIT extends IntegrationTestBase { + + @Before + public void initTestData() { + String statements = + "DROP DATABASE IF EXISTS " + TEST_DATABASE + ";" + "CREATE DATABASE " + TEST_DATABASE + ";" + + "DROP TABLE IF EXISTS " + TEST_DATABASE + ".tablea;" + "DROP TABLE IF EXISTS " + + TEST_DATABASE + ".tableb;" + "CREATE ROWSTORE TABLE " + TEST_DATABASE + + ".tablea (id int primary key, cola varchar(30));" + "CREATE TABLE " + TEST_DATABASE + + ".tableb (id int primary key, colb BIGINT NOT NULL)"; + execute(statements); + } + + @Test + public void ignore() throws Exception { + final Configuration config = defaultJdbcConfigBuilder().withDefault( + SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "tablea") + .with(SingleStoreDBConnectorConfig.EVENT_PROCESSING_FAILURE_HANDLING_MODE, + CommonConnectorConfig.EventProcessingFailureHandlingMode.IGNORE).build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + try (SingleStoreDBConnection connection = new SingleStoreDBConnection( + new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(config))) { + + connection.execute("INSERT INTO tablea VALUES (1, 'text1')"); + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + connection.execute("DELETE FROM tablea where id = 1"); + connection.execute("ALTER TABLE " + TEST_DATABASE + ".tablea MODIFY COLUMN cola int"); + connection.execute("INSERT INTO tablea VALUES (1, 1)"); + records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + } finally { + stopConnector(); + } + } +} diff --git a/src/test/java/com/singlestore/debezium/MetricsIT.java b/src/test/java/com/singlestore/debezium/MetricsIT.java new file mode 100644 index 0000000..6f0613d --- /dev/null +++ b/src/test/java/com/singlestore/debezium/MetricsIT.java @@ -0,0 +1,98 @@ +package com.singlestore.debezium; + +import static io.debezium.heartbeat.DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY; +import static io.debezium.heartbeat.Heartbeat.HEARTBEAT_INTERVAL; +import static io.debezium.schema.AbstractTopicNamingStrategy.TOPIC_HEARTBEAT_PREFIX; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import java.lang.management.ManagementFactory; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import javax.management.MBeanServer; +import javax.management.ObjectName; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +public class MetricsIT extends IntegrationTestBase { + + @Test + public void testHeartBeat() throws InterruptedException { + Configuration config = defaultJdbcConfigWithTable("product"); + config = config.edit().withDefault(TOPIC_HEARTBEAT_PREFIX, "__heartbeat") + .withDefault(HEARTBEAT_ACTION_QUERY, "SELECT 1").withDefault(HEARTBEAT_INTERVAL, 1000) + .build(); + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + try { + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + assertEquals("__heartbeat.singlestore_topic", records.get(0).topic()); + } finally { + stopConnector(); + } + } + + @Test + public void testCustomMetrics() throws Exception { + //create snapshot + String statements = + "DROP TABLE IF EXISTS " + TEST_DATABASE + ".A;" + "CREATE TABLE " + TEST_DATABASE + + ".A (pk INT, aa VARCHAR(10), PRIMARY KEY(pk));" + "INSERT INTO " + TEST_DATABASE + + ".A VALUES(0, 'test0');" + "INSERT INTO " + TEST_DATABASE + ".A VALUES(4, 'test4');" + + "SNAPSHOT DATABASE " + TEST_DATABASE + ";"; + execute(statements); + final Configuration config = defaultJdbcConfigBuilder().withDefault( + SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata").build(); + + Map customMetricTags = new SingleStoreDBConnectorConfig( + config).getCustomMetricTags(); + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + assertSnapshotWithCustomMetrics(customMetricTags); + assertStreamingWithCustomMetrics(customMetricTags); + } + + private void assertSnapshotWithCustomMetrics(Map customMetricTags) + throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + final ObjectName objectName = getSnapshotMetricsObjectName("singlestoredb", "singlestore_topic", + customMetricTags); + Thread.sleep(Duration.ofSeconds(3).toMillis()); + // Check snapshot metrics + assertThat(mBeanServer.getAttribute(objectName, "TotalTableCount")).isEqualTo(1); + assertThat(mBeanServer.getAttribute(objectName, "CapturedTables")).isEqualTo( + new String[]{"db.A"}); + assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo(2L); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotRunning")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotAborted")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotCompleted")).isEqualTo(true); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotPaused")).isEqualTo(false); + assertThat(mBeanServer.getAttribute(objectName, "SnapshotPausedDurationInSeconds")).isEqualTo( + 0L); + } + + private void assertStreamingWithCustomMetrics(Map customMetricTags) + throws Exception { + final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + final ObjectName objectName = getStreamingMetricsObjectName("singlestoredb", + "singlestore_topic", customMetricTags); + // Insert for streaming events + String statements = + "INSERT INTO " + TEST_DATABASE + ".A VALUES(1, 'test1');" + "INSERT INTO " + TEST_DATABASE + + ".A VALUES(2, 'test2');"; + execute(statements); + Thread.sleep(Duration.ofSeconds(3).toMillis()); + // Check streaming metrics + assertThat(mBeanServer.getAttribute(objectName, "Connected")).isEqualTo(true); + assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfUpdateEventsSeen")).isEqualTo(0L); + assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfDeleteEventsSeen")).isEqualTo(0L); + assertThat(mBeanServer.getAttribute(objectName, "TotalNumberOfEventsSeen")).isEqualTo( + 4L);//todo fix should be 2? + } +} diff --git a/src/test/java/com/singlestore/debezium/NotificationsIT.java b/src/test/java/com/singlestore/debezium/NotificationsIT.java new file mode 100644 index 0000000..25d76ff --- /dev/null +++ b/src/test/java/com/singlestore/debezium/NotificationsIT.java @@ -0,0 +1,132 @@ +package com.singlestore.debezium; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.pipeline.notification.Notification; +import io.debezium.pipeline.notification.channels.SinkNotificationChannel; +import io.debezium.pipeline.notification.channels.jmx.JmxNotificationChannelMXBean; +import java.lang.management.ManagementFactory; +import java.time.Instant; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import javax.management.AttributeNotFoundException; +import javax.management.InstanceNotFoundException; +import javax.management.IntrospectionException; +import javax.management.JMX; +import javax.management.MBeanAttributeInfo; +import javax.management.MBeanException; +import javax.management.MBeanInfo; +import javax.management.MBeanServer; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.ReflectionException; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.assertj.core.api.Assertions; +import org.assertj.core.data.Percentage; +import org.awaitility.Awaitility; +import org.junit.Test; + +public class NotificationsIT extends SnapshotIT { + + @Test + public void notificationCorrectlySentOnItsTopic() { + final Configuration config = defaultJdbcConfigBuilder().withDefault( + SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification") + .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink").build(); + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + List notifications = new ArrayList<>(); + Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { + consumeAvailableRecords(r -> { + if (r.topic().equals("io.debezium.notification")) { + notifications.add(r); + } + }); + return notifications.size() == 2; + }); + assertThat(notifications).hasSize(2); + SourceRecord sourceRecord = notifications.get(0); + Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification"); + Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")) + .isEqualTo("Initial Snapshot"); + Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo("STARTED"); + Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")) + .isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1)); + sourceRecord = notifications.get(notifications.size() - 1); + Assertions.assertThat(sourceRecord.topic()).isEqualTo("io.debezium.notification"); + Assertions.assertThat(((Struct) sourceRecord.value()).getString("aggregate_type")) + .isEqualTo("Initial Snapshot"); + Assertions.assertThat(((Struct) sourceRecord.value()).getString("type")).isEqualTo("COMPLETED"); + Assertions.assertThat(((Struct) sourceRecord.value()).getInt64("timestamp")) + .isCloseTo(Instant.now().toEpochMilli(), Percentage.withPercentage(1)); + } + + @Test + public void notificationNotSentIfNoChannelIsConfigured() { + final Configuration config = defaultJdbcConfigBuilder().withDefault( + SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification").build(); + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + waitForAvailableRecords(1000, TimeUnit.MILLISECONDS); + List notifications = consumedLines.stream() + .filter(r -> r.topic().equals("io.debezium.notification")).collect(Collectors.toList()); + assertThat(notifications).isEmpty(); + } + + @Test + public void notificationCorrectlySentOnJmx() + throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException { + + final Configuration config = defaultJdbcConfigBuilder().withDefault( + SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "jmx").build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(1, TimeUnit.SECONDS) + .pollInterval(1, TimeUnit.SECONDS).until(() -> !readNotificationFromJmx().isEmpty()); + + final List notifications = readNotificationFromJmx(); + assertThat(notifications).hasSize(2); + assertThat(notifications.get(0)).hasFieldOrPropertyWithValue("aggregateType", + "Initial Snapshot").hasFieldOrPropertyWithValue("type", "STARTED") + .hasFieldOrProperty("timestamp"); + assertThat(notifications.get(notifications.size() - 1)).hasFieldOrPropertyWithValue( + "aggregateType", "Initial Snapshot").hasFieldOrPropertyWithValue("type", "COMPLETED") + .hasFieldOrProperty("timestamp"); + List notificationsAfterReset = readNotificationFromJmx(); + assertThat(notificationsAfterReset).hasSize(2); + } + + private List readNotificationFromJmx() + throws MalformedObjectNameException, ReflectionException, InstanceNotFoundException, IntrospectionException { + ObjectName notificationBean = getObjectName(); + MBeanServer server = ManagementFactory.getPlatformMBeanServer(); + MBeanInfo mBeanInfo = server.getMBeanInfo(notificationBean); + List attributesNames = Arrays.stream(mBeanInfo.getAttributes()) + .map(MBeanAttributeInfo::getName).collect(Collectors.toList()); + assertThat(attributesNames).contains("Notifications"); + JmxNotificationChannelMXBean proxy = JMX.newMXBeanProxy(server, notificationBean, + JmxNotificationChannelMXBean.class); + + return proxy.getNotifications(); + } + + private ObjectName getObjectName() throws MalformedObjectNameException { + return new ObjectName( + String.format("debezium.%s:type=management,context=notifications,server=%s", + "singlestoredb", "singlestore_topic")); + } +} diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBConnectorIT.java b/src/test/java/com/singlestore/debezium/SingleStoreDBConnectorIT.java index 4118f5f..cc7567a 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBConnectorIT.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreDBConnectorIT.java @@ -1,12 +1,111 @@ package com.singlestore.debezium; -import java.util.Map; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import com.singlestore.debezium.SingleStoreDBConnectorConfig.SecureConnectionMode; +import com.singlestore.debezium.SingleStoreDBConnectorConfig.SnapshotMode; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.config.EnumeratedValue; +import io.debezium.config.Field; +import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; +import io.debezium.schema.AbstractTopicNamingStrategy; +import io.debezium.schema.DefaultTopicNamingStrategy; +import io.debezium.schema.DefaultUnicodeTopicNamingStrategy; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; import org.apache.kafka.common.config.Config; +import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; public class SingleStoreDBConnectorIT extends IntegrationTestBase { - + + @Test + public void shouldFailToValidateInvalidConfiguration() { + Configuration config = Configuration.create().build(); + SingleStoreDBConnector connector = new SingleStoreDBConnector(); + Config result = connector.validate(config.asMap()); + + assertConfigurationErrors(result, SingleStoreDBConnectorConfig.DATABASE_NAME, 1); + assertConfigurationErrors(result, SingleStoreDBConnectorConfig.TABLE_NAME, 1); + assertConfigurationErrors(result, SingleStoreDBConnectorConfig.HOSTNAME, 1); + assertConfigurationErrors(result, SingleStoreDBConnectorConfig.USER, 1); + assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.PORT); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.TOPIC_NAMING_STRATEGY); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.COLUMN_EXCLUDE_LIST); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.CONNECTION_TIMEOUT_MS); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.MAX_QUEUE_SIZE); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.MAX_BATCH_SIZE); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.POLL_INTERVAL_MS); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SNAPSHOT_MODE); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_MODE); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_KEYSTORE); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_KEYSTORE_PASSWORD); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_TRUSTSTORE); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_TRUSTSTORE_PASSWORD); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.DECIMAL_HANDLING_MODE); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.TIME_PRECISION_MODE); + assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.POPULATE_INTERNAL_ID); + } + + @Test + public void shouldValidateAcceptableConfiguration() { + Configuration config = Configuration.create().build(); + SingleStoreDBConnector connector = new SingleStoreDBConnector(); + Config result = connector.validate(config.asMap()); + + // validate that the required fields have errors + assertConfigurationErrors(result, SingleStoreDBConnectorConfig.DATABASE_NAME, 1); + assertConfigurationErrors(result, SingleStoreDBConnectorConfig.TABLE_NAME, 1); + assertConfigurationErrors(result, SingleStoreDBConnectorConfig.HOSTNAME, 1); + assertConfigurationErrors(result, SingleStoreDBConnectorConfig.USER, 1); + assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1); + + // validate the non required fields + validateConfigField(result, SingleStoreDBConnectorConfig.PORT, 3306); + validateConfigField(result, SingleStoreDBConnectorConfig.PASSWORD, null); + validateConfigField(result, SingleStoreDBConnectorConfig.COLUMN_EXCLUDE_LIST, null); + validateConfigField(result, SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST, null); + validateConfigField(result, SingleStoreDBConnectorConfig.CONNECTION_TIMEOUT_MS, 30000); + validateConfigField(result, SingleStoreDBConnectorConfig.MAX_QUEUE_SIZE, 8192); + validateConfigField(result, SingleStoreDBConnectorConfig.MAX_BATCH_SIZE, 2048); + validateConfigField(result, SingleStoreDBConnectorConfig.POLL_INTERVAL_MS, 500L); + validateConfigField(result, SingleStoreDBConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL); + validateConfigField(result, SingleStoreDBConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLE); + validateConfigField(result, SingleStoreDBConnectorConfig.SSL_KEYSTORE, null); + validateConfigField(result, SingleStoreDBConnectorConfig.SSL_KEYSTORE_PASSWORD, null); + validateConfigField(result, SingleStoreDBConnectorConfig.SSL_TRUSTSTORE, null); + validateConfigField(result, SingleStoreDBConnectorConfig.SSL_TRUSTSTORE_PASSWORD, null); + validateConfigField(result, SingleStoreDBConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.PRECISE); + validateConfigField(result, SingleStoreDBConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE); + validateConfigField(result, SingleStoreDBConnectorConfig.TOPIC_NAMING_STRATEGY, DefaultTopicNamingStrategy.class.getName()); + validateConfigField(result, SingleStoreDBConnectorConfig.POPULATE_INTERNAL_ID, false); + } + + private void validateConfigField(Config config, Field field, T expectedValue) { + assertNoConfigurationErrors(config, field); + Object actualValue = configValue(config, field.name()).value(); + if (actualValue == null) { + actualValue = field.defaultValue(); + } + if (expectedValue == null) { + assertThat(actualValue).isNull(); + } + else { + if (expectedValue instanceof EnumeratedValue) { + assertThat(((EnumeratedValue) expectedValue).getValue()).isEqualTo(actualValue.toString()); + } + else { + assertThat(expectedValue).isEqualTo(actualValue); + } + } + } + @Test public void successfullConfigValidation() { SingleStoreDBConnector connector = new SingleStoreDBConnector(); @@ -44,4 +143,55 @@ public void configMissingTable() { Config validatedConfig = connector.validate(config); assertConfigurationErrors(validatedConfig, SingleStoreDBConnectorConfig.TABLE_NAME, 1); } + + + @Test + public void testTopicOptions() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + Configuration config = defaultJdbcConfigWithTable("product"); + config = config.edit() + .with(SingleStoreDBConnectorConfig.TOPIC_PREFIX, "prefix") + .with(AbstractTopicNamingStrategy.TOPIC_DELIMITER, "_") + .build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); + + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + assertEquals("prefix_db_product", records.get(0).topic()); + } finally { + stopConnector(); + } + } + } + + @Test + public void testTopicNamingStrategy() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + Configuration config = defaultJdbcConfigWithTable("product"); + config = config.edit() + .with(SingleStoreDBConnectorConfig.TOPIC_NAMING_STRATEGY, DefaultUnicodeTopicNamingStrategy.class.getName()) + + .build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); + + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + assertEquals("singlestore_u005ftopic.db.product", records.get(0).topic()); + } finally { + stopConnector(); + } + } + } } diff --git a/src/test/java/com/singlestore/debezium/SnapshotIT.java b/src/test/java/com/singlestore/debezium/SnapshotIT.java index 224aeaa..af04987 100644 --- a/src/test/java/com/singlestore/debezium/SnapshotIT.java +++ b/src/test/java/com/singlestore/debezium/SnapshotIT.java @@ -1,14 +1,14 @@ package com.singlestore.debezium; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.data.SchemaAndValueField; -import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.data.SchemaBuilder; -import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Before; -import org.junit.Test; - +import io.debezium.pipeline.notification.channels.SinkNotificationChannel; import java.sql.SQLException; import java.util.Arrays; import java.util.Comparator; @@ -17,165 +17,224 @@ import java.util.Set; import java.util.function.Consumer; import java.util.stream.Collectors; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.*; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Before; +import org.junit.Test; public class SnapshotIT extends IntegrationTestBase { - @Before - public void initTestData() { - String statements = "DROP DATABASE IF EXISTS " + TEST_DATABASE + ";" + - "CREATE DATABASE " + TEST_DATABASE + ";" + - "DROP TABLE IF EXISTS " + TEST_DATABASE + ".A;" + - "DROP TABLE IF EXISTS " + TEST_DATABASE + ".B;" + - "CREATE TABLE " + TEST_DATABASE + ".A (pk INT, aa VARCHAR(10), PRIMARY KEY(pk));" + - "CREATE TABLE " + TEST_DATABASE + ".B (aa INT, bb VARCHAR(20));" + - "INSERT INTO " + TEST_DATABASE + ".B VALUES(0, 'test0');" + - "INSERT INTO " + TEST_DATABASE + ".A VALUES(0, 'test0');" + - "INSERT INTO " + TEST_DATABASE + ".A VALUES(4, 'test4');" + - "INSERT INTO " + TEST_DATABASE + ".A VALUES(1, 'test1');" + - "INSERT INTO " + TEST_DATABASE + ".A VALUES(2, 'test2');" + - "UPDATE " + TEST_DATABASE + ".B SET bb = 'testUpdated' WHERE aa = 0;" + - "DELETE FROM " + TEST_DATABASE + ".A WHERE pk = 4;" + - "SNAPSHOT DATABASE " + TEST_DATABASE + ";"; - execute(statements); - } - - @Test - public void testSnapshotA() throws Exception { - final Configuration config = defaultJdbcConfigWithTable("A"); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - final SourceRecords recordsA = consumeRecordsByTopic(3); - final List table1 = recordsA.recordsForTopic(TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".A") - .stream().sorted(Comparator.comparingInt(v -> (Integer) ((Struct)((Struct) v.value()).get("after")).get("pk"))).collect(Collectors.toList()); - assertThat(table1).hasSize(3); - - for (int i = 0; i < 3; i++) { - final SourceRecord record1 = table1.get(i); - final List expectedRow1 = Arrays.asList( - new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).required().build(), i), - new SchemaAndValueField("aa", Schema.OPTIONAL_STRING_SCHEMA, "test" + i)); - final Struct key1 = (Struct) record1.key(); - final Struct value1 = (Struct) record1.value(); - assertNotNull(key1.get("internalId")); - assertEquals(Schema.Type.STRUCT, key1.schema().type()); - assertEquals(Schema.Type.INT64, key1.schema().fields().get(0).schema().type()); - assertRecord((Struct) value1.get("after"), expectedRow1); - assertThat(record1.sourceOffset()) - .extracting("snapshot").containsExactly(true); + @Before + public void initTestData() { + String statements = "DROP DATABASE IF EXISTS " + TEST_DATABASE + ";" + + "CREATE DATABASE " + TEST_DATABASE + ";" + + "DROP TABLE IF EXISTS " + TEST_DATABASE + ".A;" + + "DROP TABLE IF EXISTS " + TEST_DATABASE + ".B;" + + "CREATE TABLE " + TEST_DATABASE + ".A (pk INT, aa VARCHAR(10), PRIMARY KEY(pk));" + + "CREATE TABLE " + TEST_DATABASE + ".B (aa INT, bb VARCHAR(20));" + + "INSERT INTO " + TEST_DATABASE + ".B VALUES(0, 'test0');" + + "INSERT INTO " + TEST_DATABASE + ".A VALUES(0, 'test0');" + + "INSERT INTO " + TEST_DATABASE + ".A VALUES(4, 'test4');" + + "INSERT INTO " + TEST_DATABASE + ".A VALUES(1, 'test1');" + + "INSERT INTO " + TEST_DATABASE + ".A VALUES(2, 'test2');" + + "UPDATE " + TEST_DATABASE + ".B SET bb = 'testUpdated' WHERE aa = 0;" + + "DELETE FROM " + TEST_DATABASE + ".A WHERE pk = 4;" + + "SNAPSHOT DATABASE " + TEST_DATABASE + ";"; + execute(statements); + } + + @Test + public void testSnapshotA() throws Exception { + final Configuration config = defaultJdbcConfigBuilder() + .withDefault(SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification") + .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink") + .build(); + ; + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + final SourceRecords recordsA = consumeRecordsByTopic(3); + final List table1 = recordsA.recordsForTopic( + TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".A") + .stream().sorted(Comparator.comparingInt( + v -> (Integer) ((Struct) ((Struct) v.value()).get("after")).get("pk"))) + .collect(Collectors.toList()); + assertThat(table1).hasSize(3); + + for (int i = 0; i < 3; i++) { + final SourceRecord record1 = table1.get(i); + final List expectedRow1 = Arrays.asList( + new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).required().build(), + i), + new SchemaAndValueField("aa", Schema.OPTIONAL_STRING_SCHEMA, "test" + i)); + final Struct key1 = (Struct) record1.key(); + final Struct value1 = (Struct) record1.value(); + assertNotNull(key1.get("internalId")); + assertEquals(Schema.Type.STRUCT, key1.schema().type()); + assertEquals(Schema.Type.INT64, key1.schema().fields().get(0).schema().type()); + assertRecord((Struct) value1.get("after"), expectedRow1); + assertThat(record1.sourceOffset()) + .extracting("snapshot").containsExactly(true); // assertThat(record1.sourceOffset()) // .extracting("snapshot_completed").containsExactly(i == 2); - assertNull(value1.get("before")); - } - } finally { - stopConnector(); - } + assertNull(value1.get("before")); + } + } finally { + stopConnector(); } - - @Test - public void testSnapshotB() throws Exception { - final Configuration config = defaultJdbcConfigWithTable("B"); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - final SourceRecords recordsB = consumeRecordsByTopic(1); - final List table2 = recordsB.recordsForTopic(TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".B"); - assertThat(table2).hasSize(1); - final SourceRecord record1 = table2.get(0); - final List expectedRow1 = Arrays.asList( - new SchemaAndValueField("aa", Schema.OPTIONAL_INT32_SCHEMA, 0), - new SchemaAndValueField("bb", Schema.OPTIONAL_STRING_SCHEMA, "testUpdated")); - final Struct key1 = (Struct) record1.key(); - final Struct value1 = (Struct) record1.value(); - assertRecord((Struct) value1.get("after"), expectedRow1); - assertThat(record1.sourceOffset()) - .extracting("snapshot").containsExactly(true); + } + + @Test + public void testSnapshotB() throws Exception { + final Configuration config = defaultJdbcConfigWithTable("B"); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + final SourceRecords recordsB = consumeRecordsByTopic(1); + final List table2 = recordsB.recordsForTopic( + TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".B"); + assertThat(table2).hasSize(1); + final SourceRecord record1 = table2.get(0); + final List expectedRow1 = Arrays.asList( + new SchemaAndValueField("aa", Schema.OPTIONAL_INT32_SCHEMA, 0), + new SchemaAndValueField("bb", Schema.OPTIONAL_STRING_SCHEMA, "testUpdated")); + final Struct key1 = (Struct) record1.key(); + final Struct value1 = (Struct) record1.value(); + assertRecord((Struct) value1.get("after"), expectedRow1); + assertThat(record1.sourceOffset()) + .extracting("snapshot").containsExactly(true); // assertThat(record1.sourceOffset()) // .extracting("snapshot_completed").containsExactly(false); - assertNull(value1.get("before")); - assertNotNull(key1.get("internalId")); - assertEquals(Schema.Type.STRUCT, key1.schema().type()); - assertEquals(Schema.Type.INT64, key1.schema().fields().get(0).schema().type()); - - } finally { - stopConnector(); - } - } + assertNull(value1.get("before")); + assertNotNull(key1.get("internalId")); + assertEquals(Schema.Type.STRUCT, key1.schema().type()); + assertEquals(Schema.Type.INT64, key1.schema().fields().get(0).schema().type()); - @Test - public void testSnapshotFilter() throws InterruptedException { - final Configuration config = defaultJdbcConfigWithTable("B"); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - final SourceRecords recordsB = consumeRecordsByTopic(1); - final List table2 = recordsB.recordsForTopic(TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".B"); - assertThat(table2).hasSize(1); - } - - private void assertRecord(Struct record, List expected) { - expected.forEach(schemaAndValueField -> { - schemaAndValueField.assertFor(record); - - }); - } - - @Override - protected int consumeRecords(int numberOfRecords, Consumer recordConsumer) throws InterruptedException { - int breakAfterNulls = waitTimeForRecordsAfterNulls(); - return this.consumeRecords(numberOfRecords, breakAfterNulls, recordConsumer, false); + } finally { + stopConnector(); } - - @Test - public void filterColumns() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("A"))) { - - Configuration config = defaultJdbcConfigWithTable("A"); - config = config.edit() - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") - .withDefault(SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST, "db.A.aa") - .build(); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - try { - - List records = consumeRecordsByTopic(3).allRecordsInOrder(); - - List values = Arrays.asList(new String[]{"test0", "test1", "test2"}); - List operations = Arrays.asList(new String[]{"r", "r", "r"}); - - for (int i = 0; i < records.size(); i++) { - SourceRecord record = records.get(i); - - String operation = operations.get(i); - Struct value = (Struct) record.value(); - if (operation == null) { - assertNull(value); - } else { - assertEquals(operation, value.get("op")); - } - - value = value.getStruct("after"); - Set columnNames = value.schema() - .fields() - .stream() - .map(field -> field.name()) - .collect(Collectors.toSet()); - assertEquals(new HashSet<>(Arrays.asList("aa")), columnNames); - assertEquals(values.get(i), value.get("aa")); - } - } finally { - stopConnector(); - } + } + + @Test + public void testSnapshotFilter() throws InterruptedException { + final Configuration config = defaultJdbcConfigWithTable("B"); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + final SourceRecords recordsB = consumeRecordsByTopic(1); + final List table2 = recordsB.recordsForTopic( + TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".B"); + assertThat(table2).hasSize(1); + } + + private void assertRecord(Struct record, List expected) { + expected.forEach(schemaAndValueField -> { + schemaAndValueField.assertFor(record); + + }); + } + + @Override + protected int consumeRecords(int numberOfRecords, Consumer recordConsumer) + throws InterruptedException { + int breakAfterNulls = waitTimeForRecordsAfterNulls(); + return this.consumeRecords(numberOfRecords, breakAfterNulls, recordConsumer, false); + } + + @Test + public void filterColumns() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("A"))) { + + Configuration config = defaultJdbcConfigWithTable("A"); + config = config.edit() + .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .withDefault(SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST, "db.A.aa") + .build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + try { + + List records = consumeRecordsByTopic(3).allRecordsInOrder(); + + List values = Arrays.asList(new String[]{"test0", "test1", "test2"}); + List operations = Arrays.asList(new String[]{"r", "r", "r"}); + + for (int i = 0; i < records.size(); i++) { + SourceRecord record = records.get(i); + + String operation = operations.get(i); + Struct value = (Struct) record.value(); + if (operation == null) { + assertNull(value); + } else { + assertEquals(operation, value.get("op")); + } + + value = value.getStruct("after"); + Set columnNames = value.schema() + .fields() + .stream() + .map(field -> field.name()) + .collect(Collectors.toSet()); + assertEquals(new HashSet<>(Arrays.asList("aa")), columnNames); + assertEquals(values.get(i), value.get("aa")); } + } finally { + stopConnector(); + } } - + } + + @Test + public void testSnapshotDelay() throws Exception { + final Configuration config = defaultJdbcConfigBuilder() + .withDefault(SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .withDefault(SingleStoreDBConnectorConfig.SNAPSHOT_DELAY_MS, 30000) + .build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + SourceRecords recordsA = consumeRecordsByTopic(3); + assertThat(recordsA.allRecordsInOrder()).isEmpty(); + recordsA = consumeRecordsByTopic(3); + final List table1 = recordsA.recordsForTopic( + TEST_TOPIC_PREFIX + "." + TEST_DATABASE + ".A") + .stream().sorted(Comparator.comparingInt( + v -> (Integer) ((Struct) ((Struct) v.value()).get("after")).get("pk"))) + .collect(Collectors.toList()); + assertThat(table1).hasSize(3); + + for (int i = 0; i < 3; i++) { + final SourceRecord record1 = table1.get(i); + final List expectedRow1 = Arrays.asList( + new SchemaAndValueField("pk", SchemaBuilder.int32().defaultValue(0).required().build(), + i), + new SchemaAndValueField("aa", Schema.OPTIONAL_STRING_SCHEMA, "test" + i)); + final Struct key1 = (Struct) record1.key(); + final Struct value1 = (Struct) record1.value(); + assertNotNull(key1.get("internalId")); + assertEquals(Schema.Type.STRUCT, key1.schema().type()); + assertEquals(Schema.Type.INT64, key1.schema().fields().get(0).schema().type()); + assertRecord((Struct) value1.get("after"), expectedRow1); + assertThat(record1.sourceOffset()) + .extracting("snapshot").containsExactly(true); + assertNull(value1.get("before")); + } + } finally { + stopConnector(); + } + } } diff --git a/src/test/java/com/singlestore/debezium/StreamingIT.java b/src/test/java/com/singlestore/debezium/StreamingIT.java index 0808155..e81c281 100644 --- a/src/test/java/com/singlestore/debezium/StreamingIT.java +++ b/src/test/java/com/singlestore/debezium/StreamingIT.java @@ -5,6 +5,7 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; +import io.debezium.config.Configuration; import java.math.BigDecimal; import java.nio.ByteBuffer; import java.sql.SQLException; @@ -14,333 +15,396 @@ import java.util.List; import java.util.Set; import java.util.stream.Collectors; - import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.source.SourceRecord; import org.junit.Test; import org.locationtech.jts.io.ParseException; -import io.debezium.config.Configuration; -import io.debezium.config.Field.Recommender; - public class StreamingIT extends IntegrationTestBase { - @Test - public void canReadAllTypes() throws SQLException, InterruptedException, ParseException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("allTypesTable"))) { - Configuration config = defaultJdbcConfigWithTable("allTypesTable"); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - conn.execute("INSERT INTO `allTypesTable` VALUES (\n" + - "TRUE, " + // boolColumn - "TRUE, " + // booleanColumn - "'abcdefgh', " + // bitColumn - "-128, " + // tinyintColumn - "-8388608, " + // mediumintColumn - "-32768, " + // smallintColumn - "-2147483648, " + // intColumn - "-2147483648, " + // integerColumn - "-9223372036854775808, " + // bigintColumn - "-100.01, " + // floatColumn - "-1000.01, " + // doubleColumn - "-1000.01, " + // realColumn - "'1000-01-01', " + // dateColumn - // Negative time returns incorrect result - // It is converted to 24h - time during reading of the result - "'0:00:00', " + // timeColumn - "'0:00:00.000000', " + // time6Column - "'1000-01-01 00:00:00', " + // datetimeColumn - "'1000-01-01 00:00:00.000000', " + // datetime6Column - "'1970-01-01 00:00:01', " + // timestampColumn - "'1970-01-01 00:00:01.000000', " + // timestamp6Column - "1901, " + // yearColumn - "12345678901234567890123456789012345.123456789012345678901234567891, " + // decimalColumn - "1234567890, " + // decColumn - "1234567890, " + // fixedColumn - "1234567890, " + // numericColumn - "'a', " + // charColumn - "'abc', " + // mediumtextColumn - "'a', " + // binaryColumn - "'abc', " + // varcharColumn - "'abc', " + // varbinaryColumn - "'abc', " + // longtextColumn - "'abc', " + // textColumn - "'abc', " + // tinytextColumn - "'abc', " + // longblobColumn - "'abc', " + // mediumblobColumn - "'abc', " + // blobColumn - "'abc', " + // tinyblobColumn - "'{}', " + // jsonColumn - "'val1', " + // enum_f - "'v1', " + // set_f + + @Test + public void canReadAllTypes() throws SQLException, InterruptedException, ParseException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("allTypesTable"))) { + Configuration config = defaultJdbcConfigWithTable("allTypesTable"); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `allTypesTable` VALUES (\n" + "TRUE, " + // boolColumn + "TRUE, " + // booleanColumn + "'abcdefgh', " + // bitColumn + "-128, " + // tinyintColumn + "-8388608, " + // mediumintColumn + "-32768, " + // smallintColumn + "-2147483648, " + // intColumn + "-2147483648, " + // integerColumn + "-9223372036854775808, " + // bigintColumn + "-100.01, " + // floatColumn + "-1000.01, " + // doubleColumn + "-1000.01, " + // realColumn + "'1000-01-01', " + // dateColumn + // Negative time returns incorrect result + // It is converted to 24h - time during reading of the result + "'0:00:00', " + // timeColumn + "'0:00:00.000000', " + // time6Column + "'1000-01-01 00:00:00', " + // datetimeColumn + "'1000-01-01 00:00:00.000000', " + // datetime6Column + "'1970-01-01 00:00:01', " + // timestampColumn + "'1970-01-01 00:00:01.000000', " + // timestamp6Column + "1901, " + // yearColumn + "12345678901234567890123456789012345.123456789012345678901234567891, " + + // decimalColumn + "1234567890, " + // decColumn + "1234567890, " + // fixedColumn + "1234567890, " + // numericColumn + "'a', " + // charColumn + "'abc', " + // mediumtextColumn + "'a', " + // binaryColumn + "'abc', " + // varcharColumn + "'abc', " + // varbinaryColumn + "'abc', " + // longtextColumn + "'abc', " + // textColumn + "'abc', " + // tinytextColumn + "'abc', " + // longblobColumn + "'abc', " + // mediumblobColumn + "'abc', " + // blobColumn + "'abc', " + // tinyblobColumn + "'{}', " + // jsonColumn + "'val1', " + // enum_f + "'v1', " + // set_f // "'POLYGON((1 1,2 1,2 2, 1 2, 1 1))', " + // geographyColumn TODO: PLAT-6907 test GEOGRAPHY datatype - "'POINT(1.50000003 1.50000000)')" // geographypointColumn - ); - - List records = consumeRecordsByTopic(1).allRecordsInOrder(); - assertEquals(1, records.size()); - - SourceRecord record = records.get(0); - Struct value = (Struct) record.value(); - Struct after = (Struct) value.get("after"); - - // TODO: PLAT-6909 handle BOOL columns as boolean - assertEquals((short)1, after.get("boolColumn")); - assertEquals((short)1, after.get("booleanColumn")); - // TODO: PLAT-6910 BIT type is returned in reversed order - assertArrayEquals("hgfedcba".getBytes(), (byte[])after.get("bitColumn")); - assertEquals((short)-128, after.get("tinyintColumn")); - assertEquals((int)-8388608, after.get("mediumintColumn")); - assertEquals((short)-32768, after.get("smallintColumn")); - assertEquals((int)-2147483648, after.get("intColumn")); - assertEquals((int)-2147483648, after.get("integerColumn")); - assertEquals((long)-9223372036854775808l, after.get("bigintColumn")); - assertEquals((float)-100.01, after.get("floatColumn")); - assertEquals((double)-1000.01, after.get("doubleColumn")); - assertEquals((double)-1000.01, after.get("realColumn")); - assertEquals((int)-354285, after.get("dateColumn")); - assertEquals((int)0, after.get("timeColumn")); - assertEquals((long)0, after.get("time6Column")); - assertEquals((long)-30610224000000l, after.get("datetimeColumn")); - assertEquals((long)-30610224000000000l, after.get("datetime6Column")); - assertEquals((long)1000, after.get("timestampColumn")); - assertEquals((long)1000000, after.get("timestamp6Column")); - assertEquals((int)1901, after.get("yearColumn")); - assertEquals(new BigDecimal("12345678901234567890123456789012345.123456789012345678901234567891"), after.get("decimalColumn")); - assertEquals(new BigDecimal("1234567890"), after.get("decColumn")); - assertEquals(new BigDecimal("1234567890"), after.get("fixedColumn")); - assertEquals(new BigDecimal("1234567890"), after.get("numericColumn")); - assertEquals("abc", after.get("mediumtextColumn")); - assertEquals(ByteBuffer.wrap("a".getBytes()), after.get("binaryColumn")); - assertEquals("abc", after.get("varcharColumn")); - assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("varbinaryColumn")); - assertEquals("abc", after.get("longtextColumn")); - assertEquals("abc", after.get("textColumn")); - assertEquals("abc", after.get("tinytextColumn")); - assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("longblobColumn")); - assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("mediumblobColumn")); - assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("blobColumn")); - assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("tinyblobColumn")); - assertEquals("{}", after.get("jsonColumn")); - assertEquals("val1", after.get("enum_f")); - assertEquals("v1", after.get("set_f")); - String geographyPointValue = "POINT(1.50000003 1.50000000)"; - SingleStoreDBGeometry singleStoreDBgeographyPointValue = SingleStoreDBGeometry.fromEkt(geographyPointValue); - assertArrayEquals((byte[]) ((Struct)after.get("geographypointColumn")).get("wkb"), - singleStoreDBgeographyPointValue.getWkb()); - } finally { - stopConnector(); - } - } + "'POINT(1.50000003 1.50000000)')" // geographypointColumn + ); + + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + + SourceRecord record = records.get(0); + Struct value = (Struct) record.value(); + Struct after = (Struct) value.get("after"); + + // TODO: PLAT-6909 handle BOOL columns as boolean + assertEquals((short) 1, after.get("boolColumn")); + assertEquals((short) 1, after.get("booleanColumn")); + // TODO: PLAT-6910 BIT type is returned in reversed order + assertArrayEquals("hgfedcba".getBytes(), (byte[]) after.get("bitColumn")); + assertEquals((short) -128, after.get("tinyintColumn")); + assertEquals((int) -8388608, after.get("mediumintColumn")); + assertEquals((short) -32768, after.get("smallintColumn")); + assertEquals((int) -2147483648, after.get("intColumn")); + assertEquals((int) -2147483648, after.get("integerColumn")); + assertEquals((long) -9223372036854775808l, after.get("bigintColumn")); + assertEquals((float) -100.01, after.get("floatColumn")); + assertEquals((double) -1000.01, after.get("doubleColumn")); + assertEquals((double) -1000.01, after.get("realColumn")); + assertEquals((int) -354285, after.get("dateColumn")); + assertEquals((int) 0, after.get("timeColumn")); + assertEquals((long) 0, after.get("time6Column")); + assertEquals((long) -30610224000000l, after.get("datetimeColumn")); + assertEquals((long) -30610224000000000l, after.get("datetime6Column")); + assertEquals((long) 1000, after.get("timestampColumn")); + assertEquals((long) 1000000, after.get("timestamp6Column")); + assertEquals((int) 1901, after.get("yearColumn")); + assertEquals( + new BigDecimal("12345678901234567890123456789012345.123456789012345678901234567891"), + after.get("decimalColumn")); + assertEquals(new BigDecimal("1234567890"), after.get("decColumn")); + assertEquals(new BigDecimal("1234567890"), after.get("fixedColumn")); + assertEquals(new BigDecimal("1234567890"), after.get("numericColumn")); + assertEquals("abc", after.get("mediumtextColumn")); + assertEquals(ByteBuffer.wrap("a".getBytes()), after.get("binaryColumn")); + assertEquals("abc", after.get("varcharColumn")); + assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("varbinaryColumn")); + assertEquals("abc", after.get("longtextColumn")); + assertEquals("abc", after.get("textColumn")); + assertEquals("abc", after.get("tinytextColumn")); + assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("longblobColumn")); + assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("mediumblobColumn")); + assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("blobColumn")); + assertEquals(ByteBuffer.wrap("abc".getBytes()), after.get("tinyblobColumn")); + assertEquals("{}", after.get("jsonColumn")); + assertEquals("val1", after.get("enum_f")); + assertEquals("v1", after.get("set_f")); + String geographyPointValue = "POINT(1.50000003 1.50000000)"; + SingleStoreDBGeometry singleStoreDBgeographyPointValue = SingleStoreDBGeometry.fromEkt( + geographyPointValue); + assertArrayEquals((byte[]) ((Struct) after.get("geographypointColumn")).get("wkb"), + singleStoreDBgeographyPointValue.getWkb()); + } finally { + stopConnector(); + } } - - @Test - public void populatesSourceInfo() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("purchased"))) { - Configuration config = defaultJdbcConfigWithTable("purchased"); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - try { - conn.execute("INSERT INTO `purchased` VALUES ('archie', 1, NOW())"); - List records = consumeRecordsByTopic(1).allRecordsInOrder(); - assertEquals(1, records.size()); - SourceRecord record = records.get(0); - - Struct source = (Struct)((Struct)record.value()).get("source"); - assertEquals(source.get("version"), "1.0-SNAPSHOT"); - assertEquals(source.get("connector"), "singlestoredb"); - assertEquals(source.get("name"), "singlestore_topic"); - assertNotNull(source.get("ts_ms")); - assertEquals(source.get("snapshot"), "true"); - assertEquals(source.get("db"), "db"); - assertEquals(source.get("table"), "purchased"); - assertNotNull(source.get("txId")); - assertEquals(source.get("partitionId"), 0); - assertNotNull(source.get("offsets")); - assertEquals(1, ((List)source.get("offsets")).size()); - } finally { - stopConnector(); - } - } + } + + @Test + public void populatesSourceInfo() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("purchased"))) { + Configuration config = defaultJdbcConfigWithTable("purchased"); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + try { + conn.execute("INSERT INTO `purchased` VALUES ('archie', 1, NOW())"); + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + SourceRecord record = records.get(0); + + Struct source = (Struct) ((Struct) record.value()).get("source"); + assertEquals(source.get("version"), "1.0-SNAPSHOT"); + assertEquals(source.get("connector"), "singlestoredb"); + assertEquals(source.get("name"), "singlestore_topic"); + assertNotNull(source.get("ts_ms")); + assertEquals(source.get("snapshot"), "true"); + assertEquals(source.get("db"), "db"); + assertEquals(source.get("table"), "purchased"); + assertNotNull(source.get("txId")); + assertEquals(source.get("partitionId"), 0); + assertNotNull(source.get("offsets")); + assertEquals(1, ((List) source.get("offsets")).size()); + } finally { + stopConnector(); + } } + } - @Test - public void noPrimaryKey() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("song"))) { - Configuration config = defaultJdbcConfigWithTable("song"); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - conn.execute("INSERT INTO `song` VALUES ('Metallica', 'Enter Sandman')"); - conn.execute("INSERT INTO `song` VALUES ('AC/DC', 'Back In Black')"); - conn.execute("DELETE FROM `song` WHERE name = 'Enter Sandman'"); - - List records = consumeRecordsByTopic(4).allRecordsInOrder(); - - List ids = new ArrayList<>(); - List operations = Arrays.asList(new String[]{"c", "c", "d", null}); - - assertEquals(4, records.size()); - for (int i = 0; i < records.size(); i++) { - SourceRecord record = records.get(i); - - String operation = operations.get(i); - Struct value = (Struct) record.value(); - if (operation == null) { - assertNull(value); - } else { - assertEquals(operation, value.get("op")); - } - - Struct key = (Struct) record.key(); - ids.add((Long) key.get("internalId")); - } - - assertEquals(ids.get(0), ids.get(2)); - assertEquals(ids.get(0), ids.get(3)); - } finally { - stopConnector(); - } - } - } + @Test + public void noPrimaryKey() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("song"))) { + Configuration config = defaultJdbcConfigWithTable("song"); - @Test - public void readSeveralOperations() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("product"))) { - Configuration config = defaultJdbcConfigWithTable("product"); - config = config.edit() - .withDefault("tombstones.on.delete", "false") - .build(); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); - conn.execute("INSERT INTO `product` (`id`) VALUES (2)"); - conn.execute("INSERT INTO `product` (`id`) VALUES (3)"); - conn.execute("DELETE FROM `product` WHERE `id` = 1"); - conn.execute("UPDATE `product` SET `createdByDate` = '2013-11-23 15:22:33' WHERE `id` = 2"); - conn.execute("INSERT INTO `product` (`id`) VALUES (4)"); - - List records = consumeRecordsByTopic(6).allRecordsInOrder(); - - List ids = new ArrayList<>(); - List operations = Arrays.asList(new String[]{"c", "c", "c", "d", "u", "c"}); - - assertEquals(6, records.size()); - for (int i = 0; i < records.size(); i++) { - SourceRecord record = records.get(i); - - String operation = operations.get(i); - Struct value = (Struct) record.value(); - if (operation == null) { - assertNull(value); - } else { - assertEquals(operation, value.get("op")); - } - - Struct key = (Struct) record.key(); - ids.add(key.getInt64("internalId")); - } - - assertEquals(ids.get(0), ids.get(3)); - assertEquals(ids.get(1), ids.get(4)); - } finally { - stopConnector(); - } + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `song` VALUES ('Metallica', 'Enter Sandman')"); + conn.execute("INSERT INTO `song` VALUES ('AC/DC', 'Back In Black')"); + conn.execute("DELETE FROM `song` WHERE name = 'Enter Sandman'"); + + List records = consumeRecordsByTopic(4).allRecordsInOrder(); + + List ids = new ArrayList<>(); + List operations = Arrays.asList(new String[]{"c", "c", "d", null}); + + assertEquals(4, records.size()); + for (int i = 0; i < records.size(); i++) { + SourceRecord record = records.get(i); + + String operation = operations.get(i); + Struct value = (Struct) record.value(); + if (operation == null) { + assertNull(value); + } else { + assertEquals(operation, value.get("op")); + } + + Struct key = (Struct) record.key(); + ids.add((Long) key.get("internalId")); } + + assertEquals(ids.get(0), ids.get(2)); + assertEquals(ids.get(0), ids.get(3)); + } finally { + stopConnector(); + } } + } + + @Test + public void readSeveralOperations() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + Configuration config = defaultJdbcConfigWithTable("product"); + config = config.edit().withDefault("tombstones.on.delete", "false").build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); + conn.execute("INSERT INTO `product` (`id`) VALUES (2)"); + conn.execute("INSERT INTO `product` (`id`) VALUES (3)"); + conn.execute("DELETE FROM `product` WHERE `id` = 1"); + conn.execute("UPDATE `product` SET `createdByDate` = '2013-11-23 15:22:33' WHERE `id` = 2"); + conn.execute("INSERT INTO `product` (`id`) VALUES (4)"); + + List records = consumeRecordsByTopic(6).allRecordsInOrder(); + + List ids = new ArrayList<>(); + List operations = Arrays.asList(new String[]{"c", "c", "c", "d", "u", "c"}); + + assertEquals(6, records.size()); + for (int i = 0; i < records.size(); i++) { + SourceRecord record = records.get(i); + + String operation = operations.get(i); + Struct value = (Struct) record.value(); + if (operation == null) { + assertNull(value); + } else { + assertEquals(operation, value.get("op")); + } + + Struct key = (Struct) record.key(); + ids.add(key.getInt64("internalId")); + } - @Test - public void filterColumns() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("person"))) { - Configuration config = defaultJdbcConfigWithTable("person"); - config = config.edit() - .withDefault(SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST, "db.person.name,db.person.age") - .build(); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) " + - "VALUES ('Adalbert', '2001-04-11', 22, 100, 'a')"); - conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) " + - "VALUES ('Alice', '2001-04-11', 23, 100, 'a')"); - conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) " + - "VALUES ('Bob', '2001-04-11', 24, 100, 'a')"); - - List records = consumeRecordsByTopic(3).allRecordsInOrder(); - - List names = Arrays.asList(new String[]{"Adalbert", "Alice", "Bob"}); - List ages = Arrays.asList(new Integer[]{22, 23, 24}); - List operations = Arrays.asList(new String[]{"c", "c", "c"}); - - for (int i = 0; i < records.size(); i++) { - SourceRecord record = records.get(i); - - String operation = operations.get(i); - Struct value = (Struct) record.value(); - if (operation == null) { - assertNull(value); - } else { - assertEquals(operation, value.get("op")); - } - - value = value.getStruct("after"); - Set columnNames = value.schema() - .fields() - .stream() - .map(field -> field.name()) - .collect(Collectors.toSet()); - assertEquals(new HashSet<>(Arrays.asList("name", "age")), columnNames); - assertEquals(names.get(i), value.get("name")); - assertEquals(ages.get(i), value.get("age")); - } - } finally { - stopConnector(); - } + assertEquals(ids.get(0), ids.get(3)); + assertEquals(ids.get(1), ids.get(4)); + } finally { + stopConnector(); + } + } + } + + @Test + public void filterColumns() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("person"))) { + Configuration config = defaultJdbcConfigWithTable("person"); + config = config.edit().withDefault(SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST, + "db.person.name,db.person.age").build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) " + + "VALUES ('Adalbert', '2001-04-11', 22, 100, 'a')"); + conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) " + + "VALUES ('Alice', '2001-04-11', 23, 100, 'a')"); + conn.execute("INSERT INTO `person` (`name`, `birthdate`, `age`, `salary`, `bitStr`) " + + "VALUES ('Bob', '2001-04-11', 24, 100, 'a')"); + + List records = consumeRecordsByTopic(3).allRecordsInOrder(); + + List names = Arrays.asList(new String[]{"Adalbert", "Alice", "Bob"}); + List ages = Arrays.asList(new Integer[]{22, 23, 24}); + List operations = Arrays.asList(new String[]{"c", "c", "c"}); + + for (int i = 0; i < records.size(); i++) { + SourceRecord record = records.get(i); + + String operation = operations.get(i); + Struct value = (Struct) record.value(); + if (operation == null) { + assertNull(value); + } else { + assertEquals(operation, value.get("op")); + } + + value = value.getStruct("after"); + Set columnNames = value.schema().fields().stream().map(field -> field.name()) + .collect(Collectors.toSet()); + assertEquals(new HashSet<>(Arrays.asList("name", "age")), columnNames); + assertEquals(names.get(i), value.get("name")); + assertEquals(ages.get(i), value.get("age")); } + } finally { + stopConnector(); + } } - - @Test - public void internalId() throws SQLException, InterruptedException { - try (SingleStoreDBConnection createTableConn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("product"))) { - createTableConn.execute("CREATE TABLE internalIdTable(a INT)"); - try { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("internalIdTable"))) { - Configuration config = defaultJdbcConfigWithTable("internalIdTable"); - config = config.edit() - .withDefault(SingleStoreDBConnectorConfig.POPULATE_INTERNAL_ID, "true") - .build(); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - conn.execute("INSERT INTO internalIdTable VALUES (1)"); - - List records = consumeRecordsByTopic(1).allRecordsInOrder(); - assertEquals(1, records.size()); - for (SourceRecord record: records) { - Struct value = (Struct) record.value(); - Struct key = (Struct) record.key(); - Long internalId = value.getStruct("after").getInt64("internalId"); - assertEquals(key.getInt64("internalId"), internalId); - } - } finally { - stopConnector(); - } - } - } finally { - createTableConn.execute("DROP TABLE internalIdTable"); + } + + @Test + public void internalId() throws SQLException, InterruptedException { + try (SingleStoreDBConnection createTableConn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + createTableConn.execute("CREATE TABLE internalIdTable(a INT)"); + try { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("internalIdTable"))) { + Configuration config = defaultJdbcConfigWithTable("internalIdTable"); + config = config.edit() + .withDefault(SingleStoreDBConnectorConfig.POPULATE_INTERNAL_ID, "true").build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO internalIdTable VALUES (1)"); + + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + for (SourceRecord record : records) { + Struct value = (Struct) record.value(); + Struct key = (Struct) record.key(); + Long internalId = value.getStruct("after").getInt64("internalId"); + assertEquals(key.getInt64("internalId"), internalId); } + } finally { + stopConnector(); + } } + } finally { + createTableConn.execute("DROP TABLE internalIdTable"); + } + } + } + + @Test + public void testSkippedOperations() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + Configuration config = defaultJdbcConfigWithTable("product"); + config = config.edit().withDefault(SingleStoreDBConnectorConfig.SKIPPED_OPERATIONS, "c") + .withDefault(SingleStoreDBConnectorConfig.TOMBSTONES_ON_DELETE, "false").build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); + conn.execute("INSERT INTO `product` (`id`) VALUES (2)"); + conn.execute("INSERT INTO `product` (`id`) VALUES (3)"); + conn.execute("DELETE FROM `product` WHERE `id` = 1"); + conn.execute("UPDATE `product` SET `createdByDate` = '2013-11-23 15:22:33' WHERE `id` = 2"); + conn.execute("INSERT INTO `product` (`id`) VALUES (4)"); + + List records = consumeRecordsByTopic(2).allRecordsInOrder(); + + List operations = Arrays.asList("d", "u"); + assertEquals(2, records.size()); + for (int i = 0; i < records.size(); i++) { + SourceRecord record = records.get(i); + String operation = operations.get(i); + Struct value = (Struct) record.value(); + assertEquals(operation, value.get("op")); + } + } finally { + stopConnector(); + } + } + } + + @Test + public void testStreamAfterInitialOnlySnapshot() throws SQLException, InterruptedException { + try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + Configuration config = defaultJdbcConfigWithTable("product"); + config = config.edit().withDefault(SingleStoreDBConnectorConfig.SNAPSHOT_MODE, + SingleStoreDBConnectorConfig.SnapshotMode.INITIAL_ONLY).build(); + + start(SingleStoreDBConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); + conn.execute("INSERT INTO `product` (`id`) VALUES (2)"); + conn.execute("INSERT INTO `product` (`id`) VALUES (3)"); + conn.execute("DELETE FROM `product` WHERE `id` = 1"); + conn.execute("UPDATE `product` SET `createdByDate` = '2013-11-23 15:22:33' WHERE `id` = 2"); + conn.execute("INSERT INTO `product` (`id`) VALUES (4)"); + + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(0, records.size()); + + } finally { + stopConnector(); + } } + } }