diff --git a/.github/workflows/maven.yml b/.github/workflows/maven.yml index b1b3911..fb4d5c1 100644 --- a/.github/workflows/maven.yml +++ b/.github/workflows/maven.yml @@ -43,7 +43,7 @@ jobs: uses: softprops/action-gh-release@v1 if: startsWith(github.ref, 'refs/tags/') with: - files: target/singlestoredb-debezium-connector-*-plugin.tar.gz + files: target/singlestore-debezium-connector-*-plugin.tar.gz # Optional: Uploads the full dependency graph to GitHub to improve the quality of Dependabot alerts this repository can receive - name: Update dependency graph diff --git a/README.md b/README.md index 97200db..5356dd1 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,14 @@ -# SingleStoreDB connector for Debezium +# SingleStore connector for Debezium [![License](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt) -[![Github Actions status image](https://github.com/singlestore-labs/singlestoredb-debezium-connector/actions/workflows/maven.yml/badge.svg)](https://github.com/singlestore-labs/singlestoredb-debezium-connector/actions) +[![Github Actions status image](https://github.com/singlestore-labs/singlestore-debezium-connector/actions/workflows/maven.yml/badge.svg)](https://github.com/singlestore-labs/singlestore-debezium-connector/actions) -SingleStoreDB connector for Debezium ("the connector") captures and records row-level changes that occur in the database. +SingleStore connector for Debezium ("the connector") captures and records row-level changes that occur in the database. You can configure the connector to read from a single table and to ignore, mask, or truncate values in specific columns. ## TOC - [Getting started](#getting-started) - - [How the SingleStoreDB Debezium connector works](#how-the-singlestoredb-debezium-connector-works) + - [How the SingleStore Debezium connector works](#how-the-singlestore-debezium-connector-works) - [Data change events](#data-change-events) - [Data type mappings](#data-type-mappings) - [Connector properties](#connector-properties) @@ -40,7 +40,7 @@ curl -i -X POST \ "name": "", "config": { - "connector.class": "com.singlestore.debezium.SingleStoreDBConnector", + "connector.class": "com.singlestore.debezium.SingleStoreConnector", "tasks.max": "1", "database.hostname": "", "database.port": "", @@ -54,10 +54,10 @@ curl -i -X POST \ }' ``` -## How the SingleStoreDB Debezium connector works +## How the SingleStore Debezium connector works To optimally configure and run the connector, it is essential to understand how the connector performs snapshots, streams change events, determines Kafka topic names, and uses metadata. -SingleStoreDB Debezium connector uses Change Data Capture (CDC) to capture change events. +SingleStore Debezium connector uses Change Data Capture (CDC) to capture change events. The connector does not support handling schema changes. You cannot run `ALTER` and `DROP` queries while the `OBSERVE` query is running, specifically during snapshotting and filtering. @@ -73,13 +73,13 @@ When the connector starts for the first time, it performs an initial consistent ### Streaming After the initial snapshot is complete, the connector continues streaming from the offset that it receives in Steps 4 and 5 above. If the streaming stops again for any reason, upon restart, the connector tries to continue streaming changes from where it previously left off. -The SingleStoreDB connector forwards change events in records to the Kafka Connect framework. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic. +The SingleStore connector forwards change events in records to the Kafka Connect framework. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic. Kafka Connect periodically records the most recent offset in another Kafka topic. The offset indicates source-specific position information that Debezium includes with each event. The connector records database-level offsets for each database partition in each change event. When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset. When the connector restarts, it sends a request to the SingleStoreDB to send the events that occurred after the offset position. ### Topic names -The SingleStoreDB Debezium connector writes change events for all `INSERT`, `UPDATE`, and `DELETE` operations that occur in a table to a single Apache Kafka topic that is specific to that table. The connector uses the following naming convention for the change event topics: +The SingleStore Debezium connector writes change events for all `INSERT`, `UPDATE`, and `DELETE` operations that occur in a table to a single Apache Kafka topic that is specific to that table. The connector uses the following naming convention for the change event topics: ``` topicPrefix.databaseName.tableName @@ -95,7 +95,7 @@ The following list provides definitions for the components of the default name: For example, if the topic prefix is `fulfillment`, database name is `inventory`, and the table where the operation occurred is `orders`, the connector writes events to the `fulfillment.inventory.orders` Kafka topic. ## Data change events -Every data change event that the SingleStoreDB Debezium connector generates has a **key** and a **value**. The structures of the key and value depend on the table from which the change events originate. For information on how Debezium constructs topic names, refer to [Topic names](#topic-names). +Every data change event that the SingleStore Debezium connector generates has a **key** and a **value**. The structures of the key and value depend on the table from which the change events originate. For information on how Debezium constructs topic names, refer to [Topic names](#topic-names). Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for topic consumers to handle. To facilitate the processing of mutable event structures, each event in Kafka Connect is self-contained. Every message key and value has two parts: a schema and payload. The schema describes the structure of the payload, while the payload contains the actual data. @@ -161,7 +161,7 @@ The following example demonstrates the value of a change event that the connecto }, "source":{ "version":"0.1.0", - "connector":"singlestoredb", + "connector":"singlestore", "name":"singlestore", "ts_ms":1706197043473, "snapshot":"true", @@ -192,7 +192,7 @@ The following example shows an update change event that the connector captures f }, "source":{ "version":"0.1.0", - "connector":"singlestoredb", + "connector":"singlestore", "name":"singlestore", "ts_ms":1706197446500, "snapshot":"true", @@ -227,8 +227,8 @@ The following example shows a delete event for the table that is shown in the pr "before":null, "after":null, "source":{ - "version":"${project.version}", - "connector":"singlestoredb", + "version":"0.1.0", + "connector":"singlestore", "name":"singlestore", "ts_ms":1706197665407, "snapshot":"true", @@ -345,14 +345,14 @@ Debezium connectors handle binary data types based on the value of the [`binary. ## Connector properties -The SingleStoreDB Debezium connector supports the following configuration properties which can be used to achieve the right connector behavior for your application. +The SingleStore Debezium connector supports the following configuration properties which can be used to achieve the right connector behavior for your application. ### Kafka connect properties | Property | Default | Description | - | - | - | name | | Unique name for the connector. Any attempts to register again with the same name will fail. This property is required by all Kafka Connect connectors. -| connector.class | | The name of the Java Class for the connector. For the SingleStore connector specify `com.singlestore.debezium.SingleStoreDBConnector`. -| tasks.max | 1 | The maximum number of tasks that should be created for this connector. The SingleStoreDB connector always uses a single task and therefore does not use this value, so the default is always acceptable. +| connector.class | | The name of the Java Class for the connector. For the SingleStore connector specify `com.singlestore.debezium.SingleStoreConnector`. +| tasks.max | 1 | The maximum number of tasks that should be created for this connector. The SingleStore connector always uses a single task and therefore does not use this value, so the default is always acceptable. ### Connection properties | Property | Default | Description @@ -413,6 +413,6 @@ The following advanced configuration properties have defaults that work in most | topic.naming.strategy | no default | The name of the TopicNamingStrategy Class that should be used to determine the topic name for data change, schema change, transaction, heartbeat event, etc. | custom.metric.tags | no default | The custom metric tags will accept key-value pairs to customize the MBean object name which should be appended at the end of the regular name. Each key represents a tag for the MBean object name, and the corresponding value represents the value of that key. For example: k1=v1,k2=v2. | errors.max.retries | -1 (no limit) | The maximum number of retries on connection errors before failing. -| sourceinfo.struct.maker | SingleStoreDBSourceInfoStructMaker | The name of the SourceInfoStructMaker Class that returns the SourceInfo schema and struct. +| sourceinfo.struct.maker | SingleStoreSourceInfoStructMaker | The name of the SourceInfoStructMaker Class that returns the SourceInfo schema and struct. | notification.sink.topic.name | no default | The name of the topic for the notifications. This property is required if the 'sink' is in the list of enabled channels. | post.processors | no default | Optional list of post processors. The processors are defined using the `.type` option and configured using ``. diff --git a/pom.xml b/pom.xml index 2a4a005..e737850 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.singlestore - singlestoredb-debezium-connector + singlestore-debezium-connector 1.0-SNAPSHOT diff --git a/src/main/java/com/singlestore/debezium/Module.java b/src/main/java/com/singlestore/debezium/Module.java index 606de2a..3bc4fb1 100644 --- a/src/main/java/com/singlestore/debezium/Module.java +++ b/src/main/java/com/singlestore/debezium/Module.java @@ -18,14 +18,14 @@ public static String version() { * @return symbolic name of the connector plugin */ public static String name() { - return "singlestoredb"; + return "singlestore"; } /** * @return context name used in log MDC and JMX metrics */ public static String contextName() { - return "SingleStoreDB"; + return "SingleStore"; } } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreChangeEventSourceFactory.java b/src/main/java/com/singlestore/debezium/SingleStoreChangeEventSourceFactory.java new file mode 100644 index 0000000..0f97a96 --- /dev/null +++ b/src/main/java/com/singlestore/debezium/SingleStoreChangeEventSourceFactory.java @@ -0,0 +1,50 @@ +package com.singlestore.debezium; + +import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; +import io.debezium.pipeline.ErrorHandler; +import io.debezium.pipeline.EventDispatcher; +import io.debezium.pipeline.notification.NotificationService; +import io.debezium.pipeline.source.spi.ChangeEventSourceFactory; +import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; +import io.debezium.pipeline.source.spi.SnapshotProgressListener; +import io.debezium.pipeline.source.spi.StreamingChangeEventSource; +import io.debezium.relational.TableId; +import io.debezium.util.Clock; + +public class SingleStoreChangeEventSourceFactory implements ChangeEventSourceFactory { + + SingleStoreConnectorConfig connectorConfig; + MainConnectionProvidingConnectionFactory connectionFactory; + SingleStoreDatabaseSchema schema; + EventDispatcher dispatcher; + ErrorHandler errorHandler; + Clock clock; + + public SingleStoreChangeEventSourceFactory(SingleStoreConnectorConfig connectorConfig, + MainConnectionProvidingConnectionFactory connectionFactory, + SingleStoreDatabaseSchema schema, + EventDispatcher dispatcher, + ErrorHandler errorHandler, + Clock clock) { + this.connectorConfig = connectorConfig; + this.connectionFactory = connectionFactory; + this.schema = schema; + this.dispatcher = dispatcher; + this.errorHandler = errorHandler; + this.clock = clock; + } + + @Override + public SnapshotChangeEventSource getSnapshotChangeEventSource( + SnapshotProgressListener snapshotProgressListener, + NotificationService notificationService) { + return new SingleStoreSnapshotChangeEventSource(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService); + } + + @Override + public StreamingChangeEventSource getStreamingChangeEventSource() { + return new SingleStoreStreamingChangeEventSource(connectorConfig, connectionFactory.mainConnection(), dispatcher, errorHandler, schema, clock); + } + + // TODO incremental snapshot +} diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBChangeRecordEmitter.java b/src/main/java/com/singlestore/debezium/SingleStoreChangeRecordEmitter.java similarity index 87% rename from src/main/java/com/singlestore/debezium/SingleStoreDBChangeRecordEmitter.java rename to src/main/java/com/singlestore/debezium/SingleStoreChangeRecordEmitter.java index 790224e..f3af3ca 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBChangeRecordEmitter.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreChangeRecordEmitter.java @@ -15,9 +15,9 @@ import io.debezium.relational.TableSchema; import io.debezium.util.Clock; -public class SingleStoreDBChangeRecordEmitter extends RelationalChangeRecordEmitter { +public class SingleStoreChangeRecordEmitter extends RelationalChangeRecordEmitter { - private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreDBSnapshotChangeRecordEmitter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreSnapshotChangeRecordEmitter.class); private final Envelope.Operation operation; private final OffsetContext offset; private final Object[] before; @@ -26,8 +26,8 @@ public class SingleStoreDBChangeRecordEmitter extends RelationalChangeRecordEmit private static final String INTERNAL_ID = "internalId"; - public SingleStoreDBChangeRecordEmitter(SingleStoreDBPartition partition, OffsetContext offset, Clock clock, Operation operation, Object[] before, - Object[] after, long internalId, SingleStoreDBConnectorConfig connectorConfig) { + public SingleStoreChangeRecordEmitter(SingleStorePartition partition, OffsetContext offset, Clock clock, Operation operation, Object[] before, + Object[] after, long internalId, SingleStoreConnectorConfig connectorConfig) { super(partition, offset, clock, connectorConfig); this.offset = offset; this.operation = operation; @@ -37,7 +37,7 @@ public SingleStoreDBChangeRecordEmitter(SingleStoreDBPartition partition, Offset } @Override - protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema) + protected void emitCreateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); Struct newKey = keyFromInternalId(); @@ -53,7 +53,7 @@ protected void emitCreateRecord(Receiver receiver, Table } @Override - protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema) + protected void emitUpdateRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); Object[] newColumnValues = getNewColumnValues(); @@ -84,7 +84,7 @@ protected void emitUpdateRecord(Receiver receiver, Table } @Override - protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { + protected void emitDeleteRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] oldColumnValues = getOldColumnValues(); Struct newKey = keyFromInternalId(); diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBConnection.java b/src/main/java/com/singlestore/debezium/SingleStoreConnection.java similarity index 81% rename from src/main/java/com/singlestore/debezium/SingleStoreDBConnection.java rename to src/main/java/com/singlestore/debezium/SingleStoreConnection.java index 858e393..924f945 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBConnection.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnection.java @@ -23,19 +23,19 @@ import java.util.stream.Collectors; /** - * {@link JdbcConnection} extension to be used with SingleStoreDB + * {@link JdbcConnection} extension to be used with SingleStore */ -public class SingleStoreDBConnection extends JdbcConnection { +public class SingleStoreConnection extends JdbcConnection { private static final String QUOTED_CHARACTER = "`"; protected static final String URL_PATTERN = "jdbc:singlestore://${hostname}:${port}/?connectTimeout=${connectTimeout}"; protected static final String URL_PATTERN_DATABASE = "jdbc:singlestore://${hostname}:${port}/${dbname}?connectTimeout=${connectTimeout}"; - private final SingleStoreDBConnectionConfiguration connectionConfig; + private final SingleStoreConnectionConfiguration connectionConfig; - public SingleStoreDBConnection(SingleStoreDBConnectionConfiguration connectionConfig) { + public SingleStoreConnection(SingleStoreConnectionConfiguration connectionConfig) { super(connectionConfig.jdbcConfig, connectionConfig.factory, - SingleStoreDBConnection::validateServerVersion, QUOTED_CHARACTER, QUOTED_CHARACTER); + SingleStoreConnection::validateServerVersion, QUOTED_CHARACTER, QUOTED_CHARACTER); this.connectionConfig = connectionConfig; } @@ -118,7 +118,7 @@ public JdbcConnection observe(Set fieldFilter, Set tableFilte return query(query.toString(), resultSetConsumer); } - public SingleStoreDBConnectionConfiguration connectionConfig() { + public SingleStoreConnectionConfiguration connectionConfig() { return connectionConfig; } @@ -156,19 +156,19 @@ public enum OBSERVE_OUTPUT_FORMAT { SQL, JSON; } - public static class SingleStoreDBConnectionConfiguration { + public static class SingleStoreConnectionConfiguration { private final JdbcConfiguration jdbcConfig; private final ConnectionFactory factory; private final Configuration config; - public SingleStoreDBConnectionConfiguration(Configuration config) { + public SingleStoreConnectionConfiguration(Configuration config) { this.config = config; final boolean useSSL = sslModeEnabled(); final Configuration dbConfig = config .edit() - .withDefault(SingleStoreDBConnectorConfig.PORT, - SingleStoreDBConnectorConfig.PORT.defaultValue()) + .withDefault(SingleStoreConnectorConfig.PORT, + SingleStoreConnectorConfig.PORT.defaultValue()) .build() .subset(DATABASE_CONFIG_PREFIX, true) .merge(config.subset(DRIVER_CONFIG_PREFIX, true)); @@ -200,8 +200,8 @@ public SingleStoreDBConnectionConfiguration(Configuration config) { driverParameters().forEach(jdbcConfigBuilder::with); this.jdbcConfig = JdbcConfiguration.adapt(jdbcConfigBuilder.build()); factory = JdbcConnection.patternBasedFactory( - databaseName() != null ? SingleStoreDBConnection.URL_PATTERN_DATABASE - : SingleStoreDBConnection.URL_PATTERN, + databaseName() != null ? SingleStoreConnection.URL_PATTERN_DATABASE + : SingleStoreConnection.URL_PATTERN, com.singlestore.jdbc.Driver.class.getName(), getClass().getClassLoader()); } @@ -219,63 +219,63 @@ public ConnectionFactory factory() { } public String username() { - return config.getString(SingleStoreDBConnectorConfig.USER); + return config.getString(SingleStoreConnectorConfig.USER); } public String password() { - return config.getString(SingleStoreDBConnectorConfig.PASSWORD); + return config.getString(SingleStoreConnectorConfig.PASSWORD); } public String hostname() { - return config.getString(SingleStoreDBConnectorConfig.HOSTNAME); + return config.getString(SingleStoreConnectorConfig.HOSTNAME); } public int port() { - return config.getInteger(SingleStoreDBConnectorConfig.PORT); + return config.getInteger(SingleStoreConnectorConfig.PORT); } public String databaseName() { - return config.getString(SingleStoreDBConnectorConfig.DATABASE_NAME); + return config.getString(SingleStoreConnectorConfig.DATABASE_NAME); } - public SingleStoreDBConnectorConfig.SecureConnectionMode sslMode() { - String mode = config.getString(SingleStoreDBConnectorConfig.SSL_MODE); - return SingleStoreDBConnectorConfig.SecureConnectionMode.parse(mode); + public SingleStoreConnectorConfig.SecureConnectionMode sslMode() { + String mode = config.getString(SingleStoreConnectorConfig.SSL_MODE); + return SingleStoreConnectorConfig.SecureConnectionMode.parse(mode); } public boolean sslModeEnabled() { - return sslMode() != SingleStoreDBConnectorConfig.SecureConnectionMode.DISABLE; + return sslMode() != SingleStoreConnectorConfig.SecureConnectionMode.DISABLE; } public String sslKeyStore() { - return config.getString(SingleStoreDBConnectorConfig.SSL_KEYSTORE); + return config.getString(SingleStoreConnectorConfig.SSL_KEYSTORE); } public char[] sslKeyStorePassword() { - String password = config.getString(SingleStoreDBConnectorConfig.SSL_KEYSTORE_PASSWORD); + String password = config.getString(SingleStoreConnectorConfig.SSL_KEYSTORE_PASSWORD); return Strings.isNullOrBlank(password) ? null : password.toCharArray(); } public String sslTrustStore() { - return config.getString(SingleStoreDBConnectorConfig.SSL_TRUSTSTORE); + return config.getString(SingleStoreConnectorConfig.SSL_TRUSTSTORE); } public char[] sslTrustStorePassword() { - String password = config.getString(SingleStoreDBConnectorConfig.SSL_TRUSTSTORE_PASSWORD); + String password = config.getString(SingleStoreConnectorConfig.SSL_TRUSTSTORE_PASSWORD); return Strings.isNullOrBlank(password) ? null : password.toCharArray(); } public String sslServerCertificate() { - return config.getString(SingleStoreDBConnectorConfig.SSL_SERVER_CERT); + return config.getString(SingleStoreConnectorConfig.SSL_SERVER_CERT); } public Duration getConnectionTimeout() { - return Duration.ofMillis(config.getLong(SingleStoreDBConnectorConfig.CONNECTION_TIMEOUT_MS)); + return Duration.ofMillis(config.getLong(SingleStoreConnectorConfig.CONNECTION_TIMEOUT_MS)); } public Map driverParameters() { final String driverParametersString = config - .getString(SingleStoreDBConnectorConfig.DRIVER_PARAMETERS); + .getString(SingleStoreConnectorConfig.DRIVER_PARAMETERS); return driverParametersString == null ? Collections.emptyMap() : Arrays.stream(driverParametersString.split(";")) .map(s -> s.split("=")).collect(Collectors.toMap(s -> s[0].trim(), s -> s[1].trim())); diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBConnector.java b/src/main/java/com/singlestore/debezium/SingleStoreConnector.java similarity index 74% rename from src/main/java/com/singlestore/debezium/SingleStoreDBConnector.java rename to src/main/java/com/singlestore/debezium/SingleStoreConnector.java index 4b7b20c..becb348 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBConnector.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnector.java @@ -21,21 +21,21 @@ import io.debezium.relational.TableId; /** - * A Kafka Connect source connector that creates tasks that read the SingleStoreDB change log and generate the corresponding + * A Kafka Connect source connector that creates tasks that read the SingleStore change log and generate the corresponding * data change events. *

Configuration

*

- * This connector is configured with the set of properties described in {@link SingleStoreDBConnectorConfig}. + * This connector is configured with the set of properties described in {@link SingleStoreConnectorConfig}. */ -public class SingleStoreDBConnector extends RelationalBaseSourceConnector { +public class SingleStoreConnector extends RelationalBaseSourceConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreDBConnector.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreConnector.class); @Immutable private Map properties; - public SingleStoreDBConnector() { + public SingleStoreConnector() { } @Override @@ -45,7 +45,7 @@ public String version() { @Override public Class taskClass() { - return SingleStoreDBConnectorTask.class; + return SingleStoreConnectorTask.class; } @Override @@ -69,16 +69,16 @@ public void stop() { @Override public ConfigDef config() { - return SingleStoreDBConnectorConfig.configDef(); + return SingleStoreConnectorConfig.configDef(); } @Override protected void validateConnection(Map configValues, Configuration config) { ConfigValue hostnameValue = configValues.get(RelationalDatabaseConnectorConfig.HOSTNAME.name()); // Try to connect to the database ... - final SingleStoreDBConnection.SingleStoreDBConnectionConfiguration connectionConfig = - new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(config); - try (SingleStoreDBConnection connection = new SingleStoreDBConnection(connectionConfig)) { + final SingleStoreConnection.SingleStoreConnectionConfiguration connectionConfig = + new SingleStoreConnection.SingleStoreConnectionConfiguration(config); + try (SingleStoreConnection connection = new SingleStoreConnection(connectionConfig)) { try { connection.connect(); connection.execute("SELECT 1"); @@ -96,15 +96,15 @@ protected void validateConnection(Map configValues, Configu @Override protected Map validateAllFields(Configuration config) { - return config.validate(SingleStoreDBConnectorConfig.ALL_FIELDS); + return config.validate(SingleStoreConnectorConfig.ALL_FIELDS); } @Override public List getMatchingCollections(Configuration config) { - SingleStoreDBConnectorConfig connectorConfig = new SingleStoreDBConnectorConfig(config); - final SingleStoreDBConnection.SingleStoreDBConnectionConfiguration connectionConfig = - new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(config); - try (SingleStoreDBConnection connection = new SingleStoreDBConnection(connectionConfig)) { + SingleStoreConnectorConfig connectorConfig = new SingleStoreConnectorConfig(config); + final SingleStoreConnection.SingleStoreConnectionConfiguration connectionConfig = + new SingleStoreConnection.SingleStoreConnectionConfiguration(config); + try (SingleStoreConnection connection = new SingleStoreConnection(connectionConfig)) { return connection.readTableNames(connectorConfig.databaseName(), null, null, new String[]{ "TABLE" }).stream() .filter(tableId -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId)) .collect(Collectors.toList()); diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorConfig.java b/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java similarity index 96% rename from src/main/java/com/singlestore/debezium/SingleStoreDBConnectorConfig.java rename to src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java index a75ccaf..230ebdb 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorConfig.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnectorConfig.java @@ -24,12 +24,12 @@ /** * The configuration properties. */ -public class SingleStoreDBConnectorConfig extends RelationalDatabaseConnectorConfig { +public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfig { protected static final int DEFAULT_SNAPSHOT_FETCH_SIZE = 10_240; public static final Field SOURCE_INFO_STRUCT_MAKER = CommonConnectorConfig.SOURCE_INFO_STRUCT_MAKER - .withDefault(SingleStoreDBSourceInfoStructMaker.class.getName()); + .withDefault(SingleStoreSourceInfoStructMaker.class.getName()); public Map validate() { return getConfig().validate(ALL_FIELDS); @@ -156,7 +156,7 @@ public Map validate() { private static final ConfigDefinition CONFIG_DEFINITION = RelationalDatabaseConnectorConfig.CONFIG_DEFINITION .edit() - .name("SingleStoreDB") + .name("SingleStore") .excluding(SNAPSHOT_LOCK_TIMEOUT_MS, MSG_KEY_COLUMNS, INCLUDE_SCHEMA_COMMENTS, @@ -215,7 +215,7 @@ public Map validate() { private final RelationalTableFilters tableFilters; private final Boolean populateInternalId; - public SingleStoreDBConnectorConfig(Configuration config) { + public SingleStoreConnectorConfig(Configuration config) { super(config, new SystemTablesPredicate(), t -> t.catalog() + "." + t.table(), @@ -223,13 +223,13 @@ public SingleStoreDBConnectorConfig(Configuration config) { ColumnFilterMode.CATALOG, false); this.config = config; - this.tableFilters = new SingleStoreDBTableFilters(config, new SystemTablesPredicate(), + this.tableFilters = new SingleStoreTableFilters(config, new SystemTablesPredicate(), getTableIdMapper(), false); this.snapshotMode = SnapshotMode .parse(config.getString(SNAPSHOT_MODE), SNAPSHOT_MODE.defaultValueAsString()); this.connectionTimeout = Duration - .ofMillis(config.getLong(SingleStoreDBConnectorConfig.CONNECTION_TIMEOUT_MS)); - this.populateInternalId = config.getBoolean(SingleStoreDBConnectorConfig.POPULATE_INTERNAL_ID); + .ofMillis(config.getLong(SingleStoreConnectorConfig.CONNECTION_TIMEOUT_MS)); + this.populateInternalId = config.getBoolean(SingleStoreConnectorConfig.POPULATE_INTERNAL_ID); } private static class SystemTablesPredicate implements TableFilter { @@ -259,7 +259,7 @@ public String getConnectorName() { @Override protected SourceInfoStructMaker getSourceInfoStructMaker(Version version) { - return getSourceInfoStructMaker(SingleStoreDBConnectorConfig.SOURCE_INFO_STRUCT_MAKER, + return getSourceInfoStructMaker(SingleStoreConnectorConfig.SOURCE_INFO_STRUCT_MAKER, Module.name(), Module.version(), this); } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorTask.java b/src/main/java/com/singlestore/debezium/SingleStoreConnectorTask.java similarity index 59% rename from src/main/java/com/singlestore/debezium/SingleStoreDBConnectorTask.java rename to src/main/java/com/singlestore/debezium/SingleStoreConnectorTask.java index 83d2612..dd3e348 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBConnectorTask.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreConnectorTask.java @@ -28,14 +28,14 @@ import io.debezium.util.Clock; /** - * The main task executing streaming from SingleStoreDB. + * The main task executing streaming from SingleStore. * Responsible for lifecycle management of the streaming code. */ -public class SingleStoreDBConnectorTask extends BaseSourceTask { +public class SingleStoreConnectorTask extends BaseSourceTask { - private static final String CONTEXT_NAME = "singlestoredb-connector-task"; + private static final String CONTEXT_NAME = "singlestore-connector-task"; private volatile ChangeEventQueue queue; - private volatile SingleStoreDBDatabaseSchema schema; + private volatile SingleStoreDatabaseSchema schema; @Override public String version() { @@ -44,30 +44,30 @@ public String version() { @Override protected Iterable getAllConfigurationFields() { - return SingleStoreDBConnectorConfig.ALL_FIELDS; + return SingleStoreConnectorConfig.ALL_FIELDS; } @Override - public ChangeEventSourceCoordinator start(Configuration config) { + public ChangeEventSourceCoordinator start(Configuration config) { final Clock clock = Clock.system(); - final SingleStoreDBConnectorConfig connectorConfig = new SingleStoreDBConnectorConfig(config); + final SingleStoreConnectorConfig connectorConfig = new SingleStoreConnectorConfig(config); final SchemaNameAdjuster schemaNameAdjuster = connectorConfig.schemaNameAdjuster(); - final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(SingleStoreDBConnectorConfig.TOPIC_NAMING_STRATEGY); - final SingleStoreDBValueConverters valueConverter = new SingleStoreDBValueConverters(connectorConfig.getDecimalMode(), connectorConfig.getTemporalPrecisionMode(), + final TopicNamingStrategy topicNamingStrategy = connectorConfig.getTopicNamingStrategy(SingleStoreConnectorConfig.TOPIC_NAMING_STRATEGY); + final SingleStoreValueConverters valueConverter = new SingleStoreValueConverters(connectorConfig.getDecimalMode(), connectorConfig.getTemporalPrecisionMode(), connectorConfig.binaryHandlingMode()); - final SingleStoreDBDefaultValueConverter defaultValueConverter = new SingleStoreDBDefaultValueConverter(valueConverter); + final SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(valueConverter); - MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( - () -> new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(config))); + MainConnectionProvidingConnectionFactory connectionFactory = new DefaultMainConnectionProvidingConnectionFactory<>( + () -> new SingleStoreConnection(new SingleStoreConnection.SingleStoreConnectionConfiguration(config))); - this.schema = new SingleStoreDBDatabaseSchema(connectorConfig, valueConverter, + this.schema = new SingleStoreDatabaseSchema(connectorConfig, valueConverter, defaultValueConverter, topicNamingStrategy, false); - SingleStoreDBTaskContext taskContext = new SingleStoreDBTaskContext(connectorConfig, schema); - SingleStoreDBEventMetadataProvider metadataProvider = new SingleStoreDBEventMetadataProvider(); - SingleStoreDBErrorHandler errorHandler = new SingleStoreDBErrorHandler(connectorConfig, queue); + SingleStoreTaskContext taskContext = new SingleStoreTaskContext(connectorConfig, schema); + SingleStoreEventMetadataProvider metadataProvider = new SingleStoreEventMetadataProvider(); + SingleStoreErrorHandler errorHandler = new SingleStoreErrorHandler(connectorConfig, queue); this.queue = new ChangeEventQueue.Builder() .pollInterval(connectorConfig.getPollInterval()) @@ -77,18 +77,18 @@ public ChangeEventSourceCoordinator taskContext.configureLoggingContext(CONTEXT_NAME)) .build(); - Offsets previousOffsets = getPreviousOffsets( - new SingleStoreDBPartition.Provider(connectorConfig, config), - new SingleStoreDBOffsetContext.Loader(connectorConfig)); + Offsets previousOffsets = getPreviousOffsets( + new SingleStorePartition.Provider(connectorConfig, config), + new SingleStoreOffsetContext.Loader(connectorConfig)); - SignalProcessor signalProcessor = new SignalProcessor<>( - SingleStoreDBConnector.class, connectorConfig, Map.of(), + SignalProcessor signalProcessor = new SignalProcessor<>( + SingleStoreConnector.class, connectorConfig, Map.of(), getAvailableSignalChannels(), DocumentReader.defaultReader(), previousOffsets); final Configuration heartbeatConfig = config; - final EventDispatcher dispatcher = new EventDispatcher<>( + final EventDispatcher dispatcher = new EventDispatcher<>( connectorConfig, topicNamingStrategy, schema, @@ -99,7 +99,7 @@ public ChangeEventSourceCoordinator new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(heartbeatConfig)), + () -> new SingleStoreConnection(new SingleStoreConnection.SingleStoreConnectionConfiguration(heartbeatConfig)), exception -> { final String sqlErrorId = exception.getMessage(); throw new DebeziumException("Could not execute heartbeat action query (Error: " + sqlErrorId + ")", exception); @@ -108,15 +108,15 @@ public ChangeEventSourceCoordinator notificationService = new NotificationService<>(getNotificationChannels(), + NotificationService notificationService = new NotificationService<>(getNotificationChannels(), connectorConfig, SchemaFactory.get(), dispatcher::enqueueNotification); - ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( + ChangeEventSourceCoordinator coordinator = new ChangeEventSourceCoordinator<>( previousOffsets, errorHandler, - SingleStoreDBConnector.class, + SingleStoreConnector.class, connectorConfig, - new SingleStoreDBChangeEventSourceFactory(connectorConfig, connectionFactory, schema, dispatcher, errorHandler, clock), + new SingleStoreChangeEventSourceFactory(connectorConfig, connectionFactory, schema, dispatcher, errorHandler, clock), // TODO create custom metrics PLAT-6970 new DefaultChangeEventSourceMetricsFactory<>(), dispatcher, diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBChangeEventSourceFactory.java b/src/main/java/com/singlestore/debezium/SingleStoreDBChangeEventSourceFactory.java deleted file mode 100644 index 3a22c4f..0000000 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBChangeEventSourceFactory.java +++ /dev/null @@ -1,50 +0,0 @@ -package com.singlestore.debezium; - -import io.debezium.jdbc.MainConnectionProvidingConnectionFactory; -import io.debezium.pipeline.ErrorHandler; -import io.debezium.pipeline.EventDispatcher; -import io.debezium.pipeline.notification.NotificationService; -import io.debezium.pipeline.source.spi.ChangeEventSourceFactory; -import io.debezium.pipeline.source.spi.SnapshotChangeEventSource; -import io.debezium.pipeline.source.spi.SnapshotProgressListener; -import io.debezium.pipeline.source.spi.StreamingChangeEventSource; -import io.debezium.relational.TableId; -import io.debezium.util.Clock; - -public class SingleStoreDBChangeEventSourceFactory implements ChangeEventSourceFactory { - - SingleStoreDBConnectorConfig connectorConfig; - MainConnectionProvidingConnectionFactory connectionFactory; - SingleStoreDBDatabaseSchema schema; - EventDispatcher dispatcher; - ErrorHandler errorHandler; - Clock clock; - - public SingleStoreDBChangeEventSourceFactory(SingleStoreDBConnectorConfig connectorConfig, - MainConnectionProvidingConnectionFactory connectionFactory, - SingleStoreDBDatabaseSchema schema, - EventDispatcher dispatcher, - ErrorHandler errorHandler, - Clock clock) { - this.connectorConfig = connectorConfig; - this.connectionFactory = connectionFactory; - this.schema = schema; - this.dispatcher = dispatcher; - this.errorHandler = errorHandler; - this.clock = clock; - } - - @Override - public SnapshotChangeEventSource getSnapshotChangeEventSource( - SnapshotProgressListener snapshotProgressListener, - NotificationService notificationService) { - return new SingleStoreDBSnapshotChangeEventSource(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService); - } - - @Override - public StreamingChangeEventSource getStreamingChangeEventSource() { - return new SingleStoreDBStreamingChangeEventSource(connectorConfig, connectionFactory.mainConnection(), dispatcher, errorHandler, schema, clock); - } - - // TODO incremental snapshot -} diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBDatabaseSchema.java b/src/main/java/com/singlestore/debezium/SingleStoreDatabaseSchema.java similarity index 70% rename from src/main/java/com/singlestore/debezium/SingleStoreDBDatabaseSchema.java rename to src/main/java/com/singlestore/debezium/SingleStoreDatabaseSchema.java index b528a26..0582c71 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBDatabaseSchema.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDatabaseSchema.java @@ -10,23 +10,23 @@ import java.sql.SQLException; /** - * Component that records the schema information for the {@link SingleStoreDBConnector}. The schema information contains + * Component that records the schema information for the {@link SingleStoreConnector}. The schema information contains * the {@link io.debezium.relational.Tables table definitions} and the Kafka Connect {@link #schemaFor(TableId) Schema}s for each table, where the - * {@link org.apache.kafka.connect.data.Schema} excludes any columns that have been {@link SingleStoreDBConnectorConfig#COLUMN_EXCLUDE_LIST specified} in the + * {@link org.apache.kafka.connect.data.Schema} excludes any columns that have been {@link SingleStoreConnectorConfig#COLUMN_EXCLUDE_LIST specified} in the * configuration. */ -public class SingleStoreDBDatabaseSchema extends RelationalDatabaseSchema { +public class SingleStoreDatabaseSchema extends RelationalDatabaseSchema { - public SingleStoreDBDatabaseSchema(SingleStoreDBConnectorConfig config, SingleStoreDBValueConverters valueConverter, - SingleStoreDBDefaultValueConverter defaultValueConverter, TopicNamingStrategy topicNamingStrategy, + public SingleStoreDatabaseSchema(SingleStoreConnectorConfig config, SingleStoreValueConverters valueConverter, + SingleStoreDefaultValueConverter defaultValueConverter, TopicNamingStrategy topicNamingStrategy, boolean tableIdCaseInsensitive) { super(config, topicNamingStrategy, config.getTableFilters().dataCollectionFilter(), config.getColumnFilter(), getTableSchemaBuilder(config, valueConverter, defaultValueConverter), tableIdCaseInsensitive, config.getKeyMapper()); } - private static TableSchemaBuilder getTableSchemaBuilder(SingleStoreDBConnectorConfig config, SingleStoreDBValueConverters valueConverter, - SingleStoreDBDefaultValueConverter defaultValueConverter) { - return new SingleStoreDBTableSchemaBuilder(valueConverter, defaultValueConverter, config.schemaNameAdjuster(), + private static TableSchemaBuilder getTableSchemaBuilder(SingleStoreConnectorConfig config, SingleStoreValueConverters valueConverter, + SingleStoreDefaultValueConverter defaultValueConverter) { + return new SingleStoreTableSchemaBuilder(valueConverter, defaultValueConverter, config.schemaNameAdjuster(), config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(), config.getFieldNamer(), false, config.populateInternalId()); } @@ -38,7 +38,7 @@ private static TableSchemaBuilder getTableSchemaBuilder(SingleStoreDBConnectorCo * @return this object so methods can be chained together; never null * @throws SQLException if there is a problem obtaining the schema from the database server */ - protected SingleStoreDBDatabaseSchema refresh(SingleStoreDBConnection connection) throws SQLException { + protected SingleStoreDatabaseSchema refresh(SingleStoreConnection connection) throws SQLException { connection.readSchema(tables(), null, null, getTableFilter(), null, true); refreshSchemas(); return this; diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBDefaultValueConverter.java b/src/main/java/com/singlestore/debezium/SingleStoreDefaultValueConverter.java similarity index 96% rename from src/main/java/com/singlestore/debezium/SingleStoreDBDefaultValueConverter.java rename to src/main/java/com/singlestore/debezium/SingleStoreDefaultValueConverter.java index 9d8be60..478ef59 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBDefaultValueConverter.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreDefaultValueConverter.java @@ -23,9 +23,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; -public class SingleStoreDBDefaultValueConverter implements DefaultValueConverter { +public class SingleStoreDefaultValueConverter implements DefaultValueConverter { - private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreDBDefaultValueConverter.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreDefaultValueConverter.class); private static final String EPOCH_TIMESTAMP = "1970-01-01 00:00:00"; private static final String EPOCH_DATE = "1970-01-01"; private static final Pattern TIME_FIELD_PATTERN = Pattern.compile("(\\-?[0-9]*):([0-9]*)(:([0-9]*))?(\\.([0-9]*))?"); @@ -58,9 +58,9 @@ public class SingleStoreDBDefaultValueConverter implements DefaultValueConverter .optionalEnd() .toFormatter(); - private final SingleStoreDBValueConverters converters; + private final SingleStoreValueConverters converters; - public SingleStoreDBDefaultValueConverter(SingleStoreDBValueConverters converters) { + public SingleStoreDefaultValueConverter(SingleStoreValueConverters converters) { this.converters = converters; } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBErrorHandler.java b/src/main/java/com/singlestore/debezium/SingleStoreErrorHandler.java similarity index 67% rename from src/main/java/com/singlestore/debezium/SingleStoreDBErrorHandler.java rename to src/main/java/com/singlestore/debezium/SingleStoreErrorHandler.java index ff2c438..1e9639f 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBErrorHandler.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreErrorHandler.java @@ -9,10 +9,10 @@ import io.debezium.pipeline.ErrorHandler; import io.debezium.util.Collect; -public class SingleStoreDBErrorHandler extends ErrorHandler { +public class SingleStoreErrorHandler extends ErrorHandler { - public SingleStoreDBErrorHandler(SingleStoreDBConnectorConfig connectorConfig, ChangeEventQueue queue) { - super(SingleStoreDBConnector.class, connectorConfig, queue, null); + public SingleStoreErrorHandler(SingleStoreConnectorConfig connectorConfig, ChangeEventQueue queue) { + super(SingleStoreConnector.class, connectorConfig, queue, null); } @Override diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBEventMetadataProvider.java b/src/main/java/com/singlestore/debezium/SingleStoreEventMetadataProvider.java similarity index 95% rename from src/main/java/com/singlestore/debezium/SingleStoreDBEventMetadataProvider.java rename to src/main/java/com/singlestore/debezium/SingleStoreEventMetadataProvider.java index 110f3b1..63b1875 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBEventMetadataProvider.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreEventMetadataProvider.java @@ -13,7 +13,7 @@ import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Collect; -public class SingleStoreDBEventMetadataProvider implements EventMetadataProvider { +public class SingleStoreEventMetadataProvider implements EventMetadataProvider { @Override public Instant getEventTimestamp(DataCollectionId source, OffsetContext offset, Object key, Struct value) { diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBGeometry.java b/src/main/java/com/singlestore/debezium/SingleStoreGeometry.java similarity index 68% rename from src/main/java/com/singlestore/debezium/SingleStoreDBGeometry.java rename to src/main/java/com/singlestore/debezium/SingleStoreGeometry.java index da464af..32d2c3f 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBGeometry.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreGeometry.java @@ -7,7 +7,7 @@ import org.locationtech.jts.io.WKTReader; import org.locationtech.jts.geom.Geometry; -public class SingleStoreDBGeometry { +public class SingleStoreGeometry { private static final WKBWriter wkbWriter = new WKBWriter(); private static final WKTReader wktReader = new WKTReader(); @@ -34,46 +34,46 @@ public class SingleStoreDBGeometry { private final Integer srid; /** - * Create a SingleStoreDBGeometry using the supplied Hex EWKB string. + * Create a SingleStoreGeometry using the supplied Hex EWKB string. */ - public static SingleStoreDBGeometry fromHexEwkb(String hexEwkb) { + public static SingleStoreGeometry fromHexEwkb(String hexEwkb) { byte[] ewkb = HexConverter.convertFromHex(hexEwkb); return fromWkb(ewkb); } /** - * Create a SingleStoreDBGeometry using the supplied WKT. + * Create a SingleStoreGeometry using the supplied WKT. * srid is null as not specified by Single Store. */ - public static SingleStoreDBGeometry fromEkt(String wkt) throws ParseException { + public static SingleStoreGeometry fromEkt(String wkt) throws ParseException { final Geometry geometry = wktReader.read(wkt); final byte[] wkb = wkbWriter.write(geometry); - return new SingleStoreDBGeometry(wkb, null); + return new SingleStoreGeometry(wkb, null); } /** - * Create a SingleStoreDBGeometry using the supplied WKB. + * Create a SingleStoreGeometry using the supplied WKB. */ - public static SingleStoreDBGeometry fromWkb(byte[] wkb) { - return new SingleStoreDBGeometry(wkb, null); + public static SingleStoreGeometry fromWkb(byte[] wkb) { + return new SingleStoreGeometry(wkb, null); } /** - * Create a GEOMETRYCOLLECTION EMPTY SingleStoreDBGeometry + * Create a GEOMETRYCOLLECTION EMPTY SingleStoreGeometry * - * @return a {@link SingleStoreDBGeometry} which represents a PostgisGeometry API + * @return a {@link SingleStoreGeometry} which represents a PostgisGeometry API */ - public static SingleStoreDBGeometry createEmpty() { - return SingleStoreDBGeometry.fromHexEwkb(HEXEWKB_EMPTY_GEOMETRYCOLLECTION); + public static SingleStoreGeometry createEmpty() { + return SingleStoreGeometry.fromHexEwkb(HEXEWKB_EMPTY_GEOMETRYCOLLECTION); } /** - * Create a SingleStoreDBGeometry using the supplied EWKB and SRID. + * Create a SingleStoreGeometry using the supplied EWKB and SRID. * * @param ewkb the Extended Well-Known binary representation of the coordinate in the standard format * @param srid the coordinate system identifier (SRID); null if unset/unknown */ - private SingleStoreDBGeometry(byte[] ewkb, Integer srid) { + private SingleStoreGeometry(byte[] ewkb, Integer srid) { this.wkb = ewkb; this.srid = srid; } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBOffsetContext.java b/src/main/java/com/singlestore/debezium/SingleStoreOffsetContext.java similarity index 87% rename from src/main/java/com/singlestore/debezium/SingleStoreDBOffsetContext.java rename to src/main/java/com/singlestore/debezium/SingleStoreOffsetContext.java index 2ba6fea..e67e731 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBOffsetContext.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreOffsetContext.java @@ -14,7 +14,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -public class SingleStoreDBOffsetContext extends CommonOffsetContext { +public class SingleStoreOffsetContext extends CommonOffsetContext { private static final String SNAPSHOT_COMPLETED_KEY = "snapshot_completed"; @@ -24,7 +24,7 @@ public class SingleStoreDBOffsetContext extends CommonOffsetContext private boolean snapshotCompleted; private final Schema sourceInfoSchema; - public SingleStoreDBOffsetContext(SingleStoreDBConnectorConfig connectorConfig, Integer partitionId, + public SingleStoreOffsetContext(SingleStoreConnectorConfig connectorConfig, Integer partitionId, String txId, List offsets, boolean snapshot, boolean snapshotCompleted) { super(new SourceInfo(connectorConfig, offsets.size())); @@ -39,21 +39,21 @@ public SingleStoreDBOffsetContext(SingleStoreDBConnectorConfig connectorConfig, } } - public static SingleStoreDBOffsetContext initial(SingleStoreDBConnectorConfig connectorConfig, Supplier partitionNumberSupplier) { + public static SingleStoreOffsetContext initial(SingleStoreConnectorConfig connectorConfig, Supplier partitionNumberSupplier) { int numPartitions = partitionNumberSupplier.get(); if (numPartitions < 1) { throw new IllegalArgumentException("Wrong number of partitions: " + numPartitions); } ArrayList initOffsetList = Stream.generate(() -> (String) null) .limit(numPartitions).collect(Collectors.toCollection(ArrayList::new)); - return new SingleStoreDBOffsetContext(connectorConfig, null, null, initOffsetList, true, false); + return new SingleStoreOffsetContext(connectorConfig, null, null, initOffsetList, true, false); } - public static class Loader implements OffsetContext.Loader { + public static class Loader implements OffsetContext.Loader { - private final SingleStoreDBConnectorConfig connectorConfig; + private final SingleStoreConnectorConfig connectorConfig; - public Loader(SingleStoreDBConnectorConfig connectorConfig) { + public Loader(SingleStoreConnectorConfig connectorConfig) { this.connectorConfig = connectorConfig; } @@ -82,14 +82,14 @@ private Integer parsePartitionId(Object partitionId) { } @Override - public SingleStoreDBOffsetContext load(Map offset) { + public SingleStoreOffsetContext load(Map offset) { String txId = (String) offset.get(SourceInfo.TXID_KEY); Integer partitionId = parsePartitionId(offset.get(SourceInfo.PARTITIONID_KEY)); List offsets = parseOffsets((String) offset.get(SourceInfo.OFFSETS_KEY)); Boolean snapshot = (Boolean) ((Map) offset).getOrDefault(SourceInfo.SNAPSHOT_KEY, Boolean.FALSE); Boolean snapshotCompleted = (Boolean) ((Map) offset).getOrDefault(SNAPSHOT_COMPLETED_KEY, Boolean.FALSE); - return new SingleStoreDBOffsetContext(connectorConfig, partitionId, txId, offsets, snapshot, snapshotCompleted); + return new SingleStoreOffsetContext(connectorConfig, partitionId, txId, offsets, snapshot, snapshotCompleted); } } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBPartition.java b/src/main/java/com/singlestore/debezium/SingleStorePartition.java similarity index 73% rename from src/main/java/com/singlestore/debezium/SingleStoreDBPartition.java rename to src/main/java/com/singlestore/debezium/SingleStorePartition.java index 9b94e81..5a29f73 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBPartition.java +++ b/src/main/java/com/singlestore/debezium/SingleStorePartition.java @@ -10,12 +10,12 @@ import io.debezium.relational.AbstractPartition; import io.debezium.util.Collect; -public class SingleStoreDBPartition extends AbstractPartition { +public class SingleStorePartition extends AbstractPartition { private static final String SERVER_PARTITION_KEY = "server"; private final String serverName; - public SingleStoreDBPartition(String serverName, String databaseName) { + public SingleStorePartition(String serverName, String databaseName) { super(databaseName); this.serverName = serverName; } @@ -33,7 +33,7 @@ public boolean equals(Object obj) { if (obj == null || getClass() != obj.getClass()) { return false; } - final SingleStoreDBPartition other = (SingleStoreDBPartition) obj; + final SingleStorePartition other = (SingleStorePartition) obj; return Objects.equals(serverName, other.serverName); } @@ -47,18 +47,18 @@ public String toString() { return "SingleStorePartition [sourcePartition=" + getSourcePartition() + "]"; } - public static class Provider implements Partition.Provider { - private final SingleStoreDBConnectorConfig connectorConfig; + public static class Provider implements Partition.Provider { + private final SingleStoreConnectorConfig connectorConfig; private final Configuration taskConfig; - public Provider(SingleStoreDBConnectorConfig connectorConfig, Configuration taskConfig) { + public Provider(SingleStoreConnectorConfig connectorConfig, Configuration taskConfig) { this.connectorConfig = connectorConfig; this.taskConfig = taskConfig; } @Override - public Set getPartitions() { - return Collections.singleton(new SingleStoreDBPartition( + public Set getPartitions() { + return Collections.singleton(new SingleStorePartition( connectorConfig.getLogicalName(), connectorConfig.databaseName())); } } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeEventSource.java b/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeEventSource.java similarity index 81% rename from src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeEventSource.java rename to src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeEventSource.java index 634bf6a..0d108d7 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeEventSource.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeEventSource.java @@ -46,23 +46,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class SingleStoreDBSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { +public class SingleStoreSnapshotChangeEventSource extends RelationalSnapshotChangeEventSource { - private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreDBSnapshotChangeEventSource.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreSnapshotChangeEventSource.class); private final Set OFFSET_SET = new HashSet<>(); private volatile boolean offsetIsWrong; - private final SingleStoreDBConnectorConfig connectorConfig; - private final SingleStoreDBConnection jdbcConnection; - private final SingleStoreDBDatabaseSchema schema; - private final SnapshotProgressListener snapshotProgressListener; + private final SingleStoreConnectorConfig connectorConfig; + private final SingleStoreConnection jdbcConnection; + private final SingleStoreDatabaseSchema schema; + private final SnapshotProgressListener snapshotProgressListener; private final MainConnectionProvidingConnectionFactory jdbcConnectionFactory; - public SingleStoreDBSnapshotChangeEventSource(SingleStoreDBConnectorConfig connectorConfig, - MainConnectionProvidingConnectionFactory jdbcConnectionFactory, - SingleStoreDBDatabaseSchema schema, EventDispatcher dispatcher, Clock clock, - SnapshotProgressListener snapshotProgressListener, - NotificationService notificationService) { + public SingleStoreSnapshotChangeEventSource(SingleStoreConnectorConfig connectorConfig, + MainConnectionProvidingConnectionFactory jdbcConnectionFactory, + SingleStoreDatabaseSchema schema, EventDispatcher dispatcher, Clock clock, + SnapshotProgressListener snapshotProgressListener, + NotificationService notificationService) { super(connectorConfig, jdbcConnectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService); this.connectorConfig = connectorConfig; this.jdbcConnection = jdbcConnectionFactory.mainConnection(); @@ -72,10 +72,10 @@ public SingleStoreDBSnapshotChangeEventSource(SingleStoreDBConnectorConfig conne } @Override - public SnapshotResult doExecute(ChangeEventSourceContext context, SingleStoreDBOffsetContext previousOffset, - SnapshotContext snapshotContext, SnapshottingTask snapshottingTask) + public SnapshotResult doExecute(ChangeEventSourceContext context, SingleStoreOffsetContext previousOffset, + SnapshotContext snapshotContext, SnapshottingTask snapshottingTask) throws Exception { - final RelationalSnapshotContext ctx = (RelationalSnapshotContext) snapshotContext; + final RelationalSnapshotContext ctx = (RelationalSnapshotContext) snapshotContext; Connection connection = null; Exception exceptionWhileSnapshot = null; @@ -138,17 +138,17 @@ public SnapshotResult doExecute(ChangeEventSourceCon } private void createDataEvents(ChangeEventSource.ChangeEventSourceContext sourceContext, - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, + RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, Queue connectionPool) throws Exception { tryStartingSnapshot(snapshotContext); - EventDispatcher.SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); + EventDispatcher.SnapshotReceiver snapshotReceiver = dispatcher.getSnapshotChangeEventReceiver(); int snapshotMaxThreads = connectionPool.size(); LOGGER.info("Creating snapshot with {} worker thread(s)", snapshotMaxThreads); ExecutorService executorService = Executors.newFixedThreadPool(snapshotMaxThreads); - CompletionService completionService = new ExecutorCompletionService<>(executorService); - Queue offsets = new ConcurrentLinkedQueue<>(); + CompletionService completionService = new ExecutorCompletionService<>(executorService); + Queue offsets = new ConcurrentLinkedQueue<>(); for (int i = 0; i < snapshotMaxThreads; i++) { offsets.add(copyOffset(snapshotContext)); } @@ -164,7 +164,7 @@ private void createDataEvents(ChangeEventSource.ChangeEventSourceContext sourceC } int tableCount = rowCountTables.size(); - List> dataEventTasks = new ArrayList<>(tableCount); + List> dataEventTasks = new ArrayList<>(tableCount); CyclicBarrier barrier = new CyclicBarrier(tableCount); int tableOrder = 1; for (TableId tableId : rowCountTables.keySet()) { @@ -172,14 +172,14 @@ private void createDataEvents(ChangeEventSource.ChangeEventSourceContext sourceC boolean lastTable = tableOrder == tableCount && snapshotMaxThreads == 1; String selectStatement = queryTables.get(tableId); OptionalLong rowCount = rowCountTables.get(tableId); - Callable callable = createDataEventsForTableCallable(sourceContext, snapshotContext, snapshotReceiver, + Callable callable = createDataEventsForTableCallable(sourceContext, snapshotContext, snapshotReceiver, snapshotContext.tables.forTable(tableId), firstTable, lastTable, tableOrder++, tableCount, selectStatement, rowCount, offsets, connectionPool, barrier); dataEventTasks.add(callable); } - List commitSnapshotOffsetList = new ArrayList<>(tableCount); + List commitSnapshotOffsetList = new ArrayList<>(tableCount); try { - for (Callable callable : dataEventTasks) { + for (Callable callable : dataEventTasks) { completionService.submit(callable); } for (int i = 0; i < dataEventTasks.size(); i++) { @@ -206,22 +206,22 @@ private void createDataEvents(ChangeEventSource.ChangeEventSourceContext sourceC } }); - for (SingleStoreDBOffsetContext offset : offsets) { + for (SingleStoreOffsetContext offset : offsets) { offset.preSnapshotCompletion(); } snapshotReceiver.completeSnapshot(); - for (SingleStoreDBOffsetContext offset : offsets) { + for (SingleStoreOffsetContext offset : offsets) { offset.postSnapshotCompletion(); } } - private Callable createDataEventsForTableCallable(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, - EventDispatcher.SnapshotReceiver snapshotReceiver, Table table, boolean firstTable, boolean lastTable, int tableOrder, + private Callable createDataEventsForTableCallable(ChangeEventSource.ChangeEventSourceContext sourceContext, RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, + EventDispatcher.SnapshotReceiver snapshotReceiver, Table table, boolean firstTable, boolean lastTable, int tableOrder, int tableCount, String selectStatement, OptionalLong rowCount, - Queue offsets, Queue connectionPool, CyclicBarrier barrier) { + Queue offsets, Queue connectionPool, CyclicBarrier barrier) { return () -> { JdbcConnection connection = connectionPool.poll(); - SingleStoreDBOffsetContext offset = offsets.poll(); + SingleStoreOffsetContext offset = offsets.poll(); try { return doCreateDataEventsForTable(sourceContext, snapshotContext, offset, snapshotReceiver, table, firstTable, lastTable, tableOrder, tableCount, selectStatement, rowCount, connection, barrier); @@ -232,19 +232,19 @@ private Callable createDataEventsForTableCallable(Ch }; } - private SingleStoreDBOffsetContext doCreateDataEventsForTable( + private SingleStoreOffsetContext doCreateDataEventsForTable( ChangeEventSource.ChangeEventSourceContext sourceContext, - RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, - SingleStoreDBOffsetContext offset, - EventDispatcher.SnapshotReceiver snapshotReceiver, Table table, + RelationalSnapshotChangeEventSource.RelationalSnapshotContext snapshotContext, + SingleStoreOffsetContext offset, + EventDispatcher.SnapshotReceiver snapshotReceiver, Table table, boolean firstTable, boolean lastTable, int tableOrder, int tableCount, String selectStatement, OptionalLong rowCount, JdbcConnection jdbcConnection, CyclicBarrier barrier) throws InterruptedException { - SingleStoreDBPartition partition = snapshotContext.partition; + SingleStorePartition partition = snapshotContext.partition; if (!sourceContext.isRunning()) { throw new InterruptedException("Interrupted while snapshotting table " + table.id()); } - SingleStoreDBOffsetContext commitOffset = copyOffset(snapshotContext); + SingleStoreOffsetContext commitOffset = copyOffset(snapshotContext); long exportStart = clock.currentTimeInMillis(); LOGGER.info("Exporting data from table '{}' ({} of {} tables)", table.id(), tableOrder, tableCount); Instant sourceTableSnapshotTimestamp = getSnapshotSourceTimestamp(jdbcConnection, offset, table.id()); @@ -328,7 +328,7 @@ private synchronized void validateBeginOffset(String offset) { } } - private void setSnapshotMarker(SingleStoreDBOffsetContext offset, boolean firstTable, boolean lastTable, boolean firstRecordInTable, + private void setSnapshotMarker(SingleStoreOffsetContext offset, boolean firstTable, boolean lastTable, boolean firstRecordInTable, boolean lastRecordInTable) { if (lastRecordInTable && lastTable) { offset.markSnapshotRecord(SnapshotRecord.LAST); @@ -343,7 +343,7 @@ private void setSnapshotMarker(SingleStoreDBOffsetContext offset, boolean firstT } } - private void updateSnapshotOffset(SingleStoreDBOffsetContext offset, ResultSet rs) throws SQLException { + private void updateSnapshotOffset(SingleStoreOffsetContext offset, ResultSet rs) throws SQLException { final String offsetValue = ObserveResultSetUtils.offset(rs); final String txId = ObserveResultSetUtils.txId(rs); final Integer partitionId = ObserveResultSetUtils.partitionId(rs); @@ -353,17 +353,17 @@ private void updateSnapshotOffset(SingleStoreDBOffsetContext offset, ResultSet r /** * Returns a {@link ChangeRecordEmitter} producing the change records for the given table row. */ - protected ChangeRecordEmitter getChangeRecordEmitter(SingleStoreDBPartition partition, SingleStoreDBOffsetContext offset, TableId tableId, + protected ChangeRecordEmitter getChangeRecordEmitter(SingleStorePartition partition, SingleStoreOffsetContext offset, TableId tableId, Object[] row, long internalId, Instant timestamp) { offset.event(tableId, timestamp); - return new SingleStoreDBSnapshotChangeRecordEmitter(partition, offset, row, internalId, getClock(), connectorConfig); + return new SingleStoreSnapshotChangeRecordEmitter(partition, offset, row, internalId, getClock(), connectorConfig); } private Threads.Timer getTableScanLogTimer() { return Threads.timer(clock, LOG_INTERVAL); } - private void determineCapturedTables(RelationalSnapshotContext ctx, Set dataCollectionsToBeSnapshotted) throws Exception { + private void determineCapturedTables(RelationalSnapshotContext ctx, Set dataCollectionsToBeSnapshotted) throws Exception { Set allTableIds = getAllTableIds(ctx); Set snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds, dataCollectionsToBeSnapshotted).collect(Collectors.toSet()); @@ -422,7 +422,7 @@ private Stream toTableIds(Set tableIds, Pattern pattern) { .sorted(); } - private Queue createConnectionPool(final RelationalSnapshotContext ctx) throws SQLException { + private Queue createConnectionPool(final RelationalSnapshotContext ctx) throws SQLException { Queue connectionPool = new ConcurrentLinkedQueue<>(); connectionPool.add(jdbcConnection); @@ -454,26 +454,26 @@ private void rollbackTransaction(Connection connection) { @Override protected Set getAllTableIds( - RelationalSnapshotContext ctx) + RelationalSnapshotContext ctx) throws Exception { return jdbcConnection.readTableNames(ctx.catalogName, null, null, new String[]{"TABLE"}); } @Override protected void lockTablesForSchemaSnapshot(ChangeEventSourceContext sourceContext, - RelationalSnapshotContext snapshotContext) { + RelationalSnapshotContext snapshotContext) { } @Override protected void determineSnapshotOffset( - RelationalSnapshotContext ctx, - SingleStoreDBOffsetContext previousOffset) { + RelationalSnapshotContext ctx, + SingleStoreOffsetContext previousOffset) { if (previousOffset != null) { ctx.offset = previousOffset; tryStartingSnapshot(ctx); return; } - ctx.offset = SingleStoreDBOffsetContext.initial(connectorConfig, () -> readNumberOfPartitions(ctx.catalogName)); + ctx.offset = SingleStoreOffsetContext.initial(connectorConfig, () -> readNumberOfPartitions(ctx.catalogName)); } private int readNumberOfPartitions(String database) { @@ -491,8 +491,8 @@ private int readNumberOfPartitions(String database) { @Override protected void readTableStructure(ChangeEventSourceContext sourceContext, - RelationalSnapshotContext snapshotContext, - SingleStoreDBOffsetContext offsetContext, + RelationalSnapshotContext snapshotContext, + SingleStoreOffsetContext offsetContext, SnapshottingTask snapshottingTask) throws Exception { Set catalogs = snapshotContext.capturedTables.stream() .map(TableId::catalog) @@ -519,20 +519,20 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext, @Override protected void releaseSchemaSnapshotLocks( - RelationalSnapshotContext snapshotContext) + RelationalSnapshotContext snapshotContext) throws Exception { } @Override protected SchemaChangeEvent getCreateTableEvent( - RelationalSnapshotContext snapshotContext, Table table) { + RelationalSnapshotContext snapshotContext, Table table) { return SchemaChangeEvent.ofSnapshotCreate(snapshotContext.partition, snapshotContext.offset, snapshotContext.catalogName, table); } @Override - protected SingleStoreDBOffsetContext copyOffset( - RelationalSnapshotContext snapshotContext) { - return new SingleStoreDBOffsetContext.Loader(connectorConfig).load(snapshotContext.offset.getOffset()); + protected SingleStoreOffsetContext copyOffset( + RelationalSnapshotContext snapshotContext) { + return new SingleStoreOffsetContext.Loader(connectorConfig).load(snapshotContext.offset.getOffset()); } /** @@ -542,14 +542,14 @@ protected SingleStoreDBOffsetContext copyOffset( * @param tableId the table to generate a query for * @return a valid query string or empty if table will not be snapshotted */ - private String determineSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) { + private String determineSnapshotSelect(RelationalSnapshotContext snapshotContext, TableId tableId) { List columns = getPreparedColumnNames(snapshotContext.partition, schema.tableFor(tableId)); return getSnapshotSelect(snapshotContext, tableId, columns).orElseThrow(() -> new IllegalArgumentException("Snapshot select query was not provided.")); } @Override protected Optional getSnapshotSelect( - RelationalSnapshotContext snapshotContext, + RelationalSnapshotContext snapshotContext, TableId tableId, List columns) { String snapshotSelectColumns = columns.stream() .collect(Collectors.joining(", "));//todo use in observe query @@ -557,8 +557,8 @@ protected Optional getSnapshotSelect( } @Override - public SnapshottingTask getSnapshottingTask(SingleStoreDBPartition partition, - SingleStoreDBOffsetContext previousOffset) { + public SnapshottingTask getSnapshottingTask(SingleStorePartition partition, + SingleStoreOffsetContext previousOffset) { List dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted(); Map snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream() .collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue)); @@ -583,8 +583,8 @@ public SnapshottingTask getSnapshottingTask(SingleStoreDBPartition partition, } @Override - protected RelationalSnapshotContext prepare( - SingleStoreDBPartition partition) throws Exception { + protected RelationalSnapshotContext prepare( + SingleStorePartition partition) throws Exception { return new RelationalSnapshotContext<>(partition, connectorConfig.databaseName()); } } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeRecordEmitter.java b/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeRecordEmitter.java similarity index 76% rename from src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeRecordEmitter.java rename to src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeRecordEmitter.java index 3ad90e7..33b39b2 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeRecordEmitter.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreSnapshotChangeRecordEmitter.java @@ -10,19 +10,19 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; -public class SingleStoreDBSnapshotChangeRecordEmitter extends SnapshotChangeRecordEmitter { +public class SingleStoreSnapshotChangeRecordEmitter extends SnapshotChangeRecordEmitter { private static final String INTERNAL_ID = "internalId"; private final long internalId; - public SingleStoreDBSnapshotChangeRecordEmitter(SingleStoreDBPartition partition, OffsetContext offset, Object[] row, long internalId, Clock clock, RelationalDatabaseConnectorConfig connectorConfig) { + public SingleStoreSnapshotChangeRecordEmitter(SingleStorePartition partition, OffsetContext offset, Object[] row, long internalId, Clock clock, RelationalDatabaseConnectorConfig connectorConfig) { super(partition, offset, row, clock, connectorConfig); this.internalId = internalId; } @Override - protected void emitReadRecord(Receiver receiver, TableSchema tableSchema) + protected void emitReadRecord(Receiver receiver, TableSchema tableSchema) throws InterruptedException { Object[] newColumnValues = getNewColumnValues(); Struct newValue = tableSchema.valueFromColumnData(newColumnValues); diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBSourceInfoStructMaker.java b/src/main/java/com/singlestore/debezium/SingleStoreSourceInfoStructMaker.java similarity index 94% rename from src/main/java/com/singlestore/debezium/SingleStoreDBSourceInfoStructMaker.java rename to src/main/java/com/singlestore/debezium/SingleStoreSourceInfoStructMaker.java index 5bc6eb1..879c34b 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBSourceInfoStructMaker.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreSourceInfoStructMaker.java @@ -7,7 +7,7 @@ import io.debezium.config.CommonConnectorConfig; import io.debezium.connector.AbstractSourceInfoStructMaker; -public class SingleStoreDBSourceInfoStructMaker extends AbstractSourceInfoStructMaker { +public class SingleStoreSourceInfoStructMaker extends AbstractSourceInfoStructMaker { private Schema schema; diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBStreamingChangeEventSource.java b/src/main/java/com/singlestore/debezium/SingleStoreStreamingChangeEventSource.java similarity index 88% rename from src/main/java/com/singlestore/debezium/SingleStoreDBStreamingChangeEventSource.java rename to src/main/java/com/singlestore/debezium/SingleStoreStreamingChangeEventSource.java index d8ae175..9ec864f 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBStreamingChangeEventSource.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreStreamingChangeEventSource.java @@ -23,22 +23,22 @@ import io.debezium.relational.TableId; import io.debezium.util.Clock; -public class SingleStoreDBStreamingChangeEventSource implements StreamingChangeEventSource{ +public class SingleStoreStreamingChangeEventSource implements StreamingChangeEventSource{ - private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreDBStreamingChangeEventSource.class); + private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreStreamingChangeEventSource.class); - SingleStoreDBConnectorConfig connectorConfig; - SingleStoreDBConnection connection; - EventDispatcher dispatcher; + SingleStoreConnectorConfig connectorConfig; + SingleStoreConnection connection; + EventDispatcher dispatcher; ErrorHandler errorHandler; - SingleStoreDBDatabaseSchema schema; + SingleStoreDatabaseSchema schema; Clock clock; - public SingleStoreDBStreamingChangeEventSource(SingleStoreDBConnectorConfig connectorConfig, - SingleStoreDBConnection connection, - EventDispatcher dispatcher, + public SingleStoreStreamingChangeEventSource(SingleStoreConnectorConfig connectorConfig, + SingleStoreConnection connection, + EventDispatcher dispatcher, ErrorHandler errorHandler, - SingleStoreDBDatabaseSchema schema, + SingleStoreDatabaseSchema schema, Clock clock) { this.connectorConfig = connectorConfig; this.connection = connection; @@ -49,8 +49,8 @@ public SingleStoreDBStreamingChangeEventSource(SingleStoreDBConnectorConfig conn } @Override - public void execute(ChangeEventSourceContext context, SingleStoreDBPartition partition, - SingleStoreDBOffsetContext offsetContext) throws InterruptedException { + public void execute(ChangeEventSourceContext context, SingleStorePartition partition, + SingleStoreOffsetContext offsetContext) throws InterruptedException { if (!connectorConfig.getSnapshotMode().shouldStream()) { LOGGER.info("Streaming is disabled for snapshot mode {}", connectorConfig.getSnapshotMode()); return; @@ -126,7 +126,7 @@ public void run() { try { dispatcher.dispatchDataChangeEvent(partition, table, - new SingleStoreDBChangeRecordEmitter( + new SingleStoreChangeRecordEmitter( partition, offsetContext, clock, diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBTableFilters.java b/src/main/java/com/singlestore/debezium/SingleStoreTableFilters.java similarity index 78% rename from src/main/java/com/singlestore/debezium/SingleStoreDBTableFilters.java rename to src/main/java/com/singlestore/debezium/SingleStoreTableFilters.java index c29bab0..ed372cf 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBTableFilters.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreTableFilters.java @@ -8,18 +8,18 @@ import io.debezium.relational.Selectors.TableIdToStringMapper; import io.debezium.relational.Tables.TableFilter; -public class SingleStoreDBTableFilters extends RelationalTableFilters { +public class SingleStoreTableFilters extends RelationalTableFilters { private final TableFilter tableFilter; private final Predicate databaseFilter; private final String tableName; private final String databaseName; - public SingleStoreDBTableFilters(Configuration config, TableFilter systemTablesFilter, + public SingleStoreTableFilters(Configuration config, TableFilter systemTablesFilter, TableIdToStringMapper tableIdMapper, boolean useCatalogBeforeSchema) { super(config, systemTablesFilter, tableIdMapper, useCatalogBeforeSchema); - databaseName = config.getString(SingleStoreDBConnectorConfig.DATABASE_NAME); - tableName = config.getString(SingleStoreDBConnectorConfig.TABLE_NAME); + databaseName = config.getString(SingleStoreConnectorConfig.DATABASE_NAME); + tableName = config.getString(SingleStoreConnectorConfig.TABLE_NAME); tableFilter = TableFilter.fromPredicate( table -> diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBTableSchemaBuilder.java b/src/main/java/com/singlestore/debezium/SingleStoreTableSchemaBuilder.java similarity index 95% rename from src/main/java/com/singlestore/debezium/SingleStoreDBTableSchemaBuilder.java rename to src/main/java/com/singlestore/debezium/SingleStoreTableSchemaBuilder.java index 6e934ca..5c47dec 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBTableSchemaBuilder.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreTableSchemaBuilder.java @@ -23,12 +23,12 @@ import io.debezium.spi.topic.TopicNamingStrategy; import io.debezium.schema.FieldNameSelector.FieldNamer; -public class SingleStoreDBTableSchemaBuilder extends TableSchemaBuilder { +public class SingleStoreTableSchemaBuilder extends TableSchemaBuilder { public static final String INTERNAL_ID = "internalId"; private final Boolean populateInternalId; - public SingleStoreDBTableSchemaBuilder(ValueConverterProvider valueConverterProvider, + public SingleStoreTableSchemaBuilder(ValueConverterProvider valueConverterProvider, DefaultValueConverter defaultValueConverter, SchemaNameAdjuster schemaNameAdjuster, CustomConverterRegistry customConverterRegistry, Schema sourceInfoSchema, FieldNamer fieldNamer, boolean multiPartitionMode, Boolean populateInternalId) { diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBTaskContext.java b/src/main/java/com/singlestore/debezium/SingleStoreTaskContext.java similarity index 54% rename from src/main/java/com/singlestore/debezium/SingleStoreDBTaskContext.java rename to src/main/java/com/singlestore/debezium/SingleStoreTaskContext.java index b6bf2ad..0ae642b 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBTaskContext.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreTaskContext.java @@ -2,8 +2,8 @@ import io.debezium.connector.common.CdcSourceTaskContext; -public class SingleStoreDBTaskContext extends CdcSourceTaskContext { - public SingleStoreDBTaskContext(SingleStoreDBConnectorConfig config, SingleStoreDBDatabaseSchema schema) { +public class SingleStoreTaskContext extends CdcSourceTaskContext { + public SingleStoreTaskContext(SingleStoreConnectorConfig config, SingleStoreDatabaseSchema schema) { super(config.getContextName(), config.getLogicalName(), config.getCustomMetricTags(), schema::tableIds); } } diff --git a/src/main/java/com/singlestore/debezium/SingleStoreDBValueConverters.java b/src/main/java/com/singlestore/debezium/SingleStoreValueConverters.java similarity index 94% rename from src/main/java/com/singlestore/debezium/SingleStoreDBValueConverters.java rename to src/main/java/com/singlestore/debezium/SingleStoreValueConverters.java index 695ad55..2979d6e 100644 --- a/src/main/java/com/singlestore/debezium/SingleStoreDBValueConverters.java +++ b/src/main/java/com/singlestore/debezium/SingleStoreValueConverters.java @@ -22,7 +22,7 @@ import java.time.ZoneOffset; import java.time.temporal.ChronoField; -public class SingleStoreDBValueConverters extends JdbcValueConverters { +public class SingleStoreValueConverters extends JdbcValueConverters { /** * Create a new instance of JdbcValueConverters. @@ -33,7 +33,7 @@ public class SingleStoreDBValueConverters extends JdbcValueConverters { * @param temporalPrecisionMode temporal precision mode based on {@link io.debezium.jdbc.TemporalPrecisionMode} * @param binaryMode how binary columns should be represented */ - public SingleStoreDBValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, + public SingleStoreValueConverters(DecimalMode decimalMode, TemporalPrecisionMode temporalPrecisionMode, CommonConnectorConfig.BinaryHandlingMode binaryMode) { super(decimalMode, temporalPrecisionMode, ZoneOffset.UTC, null, null, binaryMode); } @@ -162,7 +162,7 @@ protected Object convertBlob(Column column, Field fieldDefn, Object data) { } /** - * Converts java.sql.Timestamp returned from SingleStoreDB for types: TIMESTAMP, DATETIME, TIMESTAMP(6) and DATETIME(6). + * Converts java.sql.Timestamp returned from SingleStore for types: TIMESTAMP, DATETIME, TIMESTAMP(6) and DATETIME(6). * * @param column the column definition describing the {@code data} value; never null * @param fieldDefn the field definition; never null @@ -182,7 +182,7 @@ protected Object convertTimeStamp(Column column, Field fieldDefn, Object data) { } /** - * Converts returned value from SingleStoreDB for types: TIME and TIME(6) + * Converts returned value from SingleStore for types: TIME and TIME(6) *

* * @param column the column definition describing the {@code data} value; never null @@ -240,12 +240,12 @@ protected Object convertYearToInt(Column column, Field fieldDefn, Object data) { * @throws IllegalArgumentException if the value could not be converted but the column does not allow nulls */ protected Object convertGeometry(Column column, Field fieldDefn, Object data) throws IllegalArgumentException { - SingleStoreDBGeometry empty = SingleStoreDBGeometry.createEmpty(); + SingleStoreGeometry empty = SingleStoreGeometry.createEmpty(); return convertValue(column, fieldDefn, data, io.debezium.data.geometry.Geometry.createValue(fieldDefn.schema(), empty.getWkb(), empty.getSrid()), (r) -> { if (data instanceof String) { - SingleStoreDBGeometry geometry; + SingleStoreGeometry geometry; try { - geometry = SingleStoreDBGeometry.fromEkt((String) data); + geometry = SingleStoreGeometry.fromEkt((String) data); } catch (ParseException e) { throw new IllegalArgumentException(e); } diff --git a/src/main/java/com/singlestore/debezium/SourceInfo.java b/src/main/java/com/singlestore/debezium/SourceInfo.java index cc09a71..39ffe89 100644 --- a/src/main/java/com/singlestore/debezium/SourceInfo.java +++ b/src/main/java/com/singlestore/debezium/SourceInfo.java @@ -13,7 +13,7 @@ * Information about the source of information, which includes the partitions and offsets within those partitions. * *

- * The {@link SingleStoreDBPartition#getSourcePartition() source partition} information describes the cluster whose events are being consumed. + * The {@link SingleStorePartition#getSourcePartition() source partition} information describes the cluster whose events are being consumed. * Typically, the clutser is identified by the host address and the port number. Here's a JSON-like * representation of an example cluster: * @@ -23,7 +23,7 @@ * } * * - * The {@link SingleStoreDBOffsetContext#getOffset() source offset} information describes a structure containing the position in the server's offset for any + * The {@link SingleStoreOffsetContext#getOffset() source offset} information describes a structure containing the position in the server's offset for any * particular event for particular partition and transaction id. When performing snapshots, it may also contain a snapshot field which indicates that a particular record * is created while a snapshot it taking place. * Here's a JSON-like representation of an example: @@ -47,7 +47,7 @@ *

* * The {@link #struct() source} struct appears in each message envelope and contains information about the event. It is - * a mixture the fields from the {@link SingleStoreDBPartition#getSourcePartition() partition} and {@link SingleStoreDBPartition#getSourcePartition() offset}. + * a mixture the fields from the {@link SingleStorePartition#getSourcePartition() partition} and {@link SingleStorePartition#getSourcePartition() offset}. * Like with the offset, the "{@code snapshot}" field only appears for events produced when the connector is in the * middle of a snapshot. Here's a JSON-like representation of the source for an event that corresponds to the above partition and * offset: @@ -76,7 +76,7 @@ public class SourceInfo extends BaseSourceInfo { private Instant timestamp; - public SourceInfo(SingleStoreDBConnectorConfig connectorConfig, Integer numPartitions) { + public SourceInfo(SingleStoreConnectorConfig connectorConfig, Integer numPartitions) { super(connectorConfig); offsets = Collections.nCopies(numPartitions, null); @@ -85,7 +85,7 @@ public SourceInfo(SingleStoreDBConnectorConfig connectorConfig, Integer numParti /** * Updates the source with information about a particular received or read event. * - * @param partitionId index of the SingleStoreDB partition + * @param partitionId index of the SingleStore partition * @param txId the ID of the transaction that generated the transaction * @param offsets hex strings that represent offset for each database partition * @return this instance @@ -101,7 +101,7 @@ protected SourceInfo update(Integer partitionId, String txId, List offse /** * Updates the source with information about a particular received or read event. * - * @param partitionId index of the SingleStoreDB partition + * @param partitionId index of the SingleStore partition * @param txId the ID of the transaction that generated the transaction * @param offset hex strings that represent offset for given database partition * @return this instance diff --git a/src/main/java/com/singlestore/debezium/util/ObserveResultSetUtils.java b/src/main/java/com/singlestore/debezium/util/ObserveResultSetUtils.java index fc7ce7d..e1137a4 100644 --- a/src/main/java/com/singlestore/debezium/util/ObserveResultSetUtils.java +++ b/src/main/java/com/singlestore/debezium/util/ObserveResultSetUtils.java @@ -9,7 +9,7 @@ import org.apache.kafka.connect.data.Field; -import com.singlestore.debezium.SingleStoreDBTableSchemaBuilder; +import com.singlestore.debezium.SingleStoreTableSchemaBuilder; public final class ObserveResultSetUtils { @@ -21,7 +21,7 @@ public static List columnPositions(ResultSet resultSet, List fie List positions = new ArrayList<>(); for (int i = 0; i < fields.size(); i++) { String columnName = fields.get(i).name(); - if (populateInternalId && columnName.equals(SingleStoreDBTableSchemaBuilder.INTERNAL_ID)) { + if (populateInternalId && columnName.equals(SingleStoreTableSchemaBuilder.INTERNAL_ID)) { columnName = METADATA_COLUMNS[6]; } diff --git a/src/test/java/com/singlestore/debezium/ColumnMappingsIT.java b/src/test/java/com/singlestore/debezium/ColumnMappingsIT.java index e587c45..224b1ce 100644 --- a/src/test/java/com/singlestore/debezium/ColumnMappingsIT.java +++ b/src/test/java/com/singlestore/debezium/ColumnMappingsIT.java @@ -16,14 +16,14 @@ public class ColumnMappingsIT extends IntegrationTestBase { @Test public void testHashMask() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("song"))) { Configuration config = defaultJdbcConfigWithTable("song"); config = config.edit() .with("column.mask.hash.SHA-256.with.salt.salt123", TEST_DATABASE + ".song.author") .with("column.mask.with.10.chars", TEST_DATABASE + ".song.name").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -48,13 +48,13 @@ public void testHashMask() throws SQLException, InterruptedException { @Test public void testTruncate() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("song"))) { Configuration config = defaultJdbcConfigWithTable("song"); config = config.edit().with("column.truncate.to.10.chars", TEST_DATABASE + ".song.author," + TEST_DATABASE + ".song.name").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -78,13 +78,13 @@ public void testTruncate() throws SQLException, InterruptedException { @Test public void testColumnPropagate() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("person"))) { Configuration config = defaultJdbcConfigWithTable("person"); - config = config.edit().with(SingleStoreDBConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*") + config = config.edit().with(SingleStoreConnectorConfig.PROPAGATE_COLUMN_SOURCE_TYPE, ".*") .build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -133,13 +133,13 @@ public void testColumnPropagate() throws SQLException, InterruptedException { @Test public void testDataTypePropagate() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("person"))) { Configuration config = defaultJdbcConfigWithTable("person"); - config = config.edit().with(SingleStoreDBConnectorConfig.PROPAGATE_DATATYPE_SOURCE_TYPE, + config = config.edit().with(SingleStoreConnectorConfig.PROPAGATE_DATATYPE_SOURCE_TYPE, ".+\\.VARCHAR,.+\\.DECIMAL").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { diff --git a/src/test/java/com/singlestore/debezium/IntegrationTestBase.java b/src/test/java/com/singlestore/debezium/IntegrationTestBase.java index 9599fb1..887efdb 100644 --- a/src/test/java/com/singlestore/debezium/IntegrationTestBase.java +++ b/src/test/java/com/singlestore/debezium/IntegrationTestBase.java @@ -27,7 +27,7 @@ abstract class IntegrationTestBase extends AbstractConnectorTest { @BeforeClass public static void init() throws Exception { - try (SingleStoreDBConnection conn = create()) { + try (SingleStoreConnection conn = create()) { conn.connect(); } catch (SQLException e) { // Failed to connect @@ -56,26 +56,26 @@ public static void deinit() throws Exception { /** * Obtain a default DB connection. * - * @return the SingleStoreDBConnection instance; never null + * @return the SingleStoreConnection instance; never null */ - public static SingleStoreDBConnection create() { - return new SingleStoreDBConnection(defaultJdbcConnectionConfig()); + public static SingleStoreConnection create() { + return new SingleStoreConnection(defaultJdbcConnectionConfig()); } protected void waitForSnapshotToBeCompleted() throws InterruptedException { - waitForSnapshotToBeCompleted("singlestoredb", "singlestore_topic"); + waitForSnapshotToBeCompleted("singlestore", "singlestore_topic"); } protected void waitForSnapshotWithCustomMetricsToBeCompleted(Map props) throws InterruptedException { - waitForSnapshotWithCustomMetricsToBeCompleted("singlestoredb", "singlestore_topic", props); + waitForSnapshotWithCustomMetricsToBeCompleted("singlestore", "singlestore_topic", props); } protected void waitForStreamingToStart() throws InterruptedException { - waitForStreamingRunning("singlestoredb", "singlestore_topic"); + waitForStreamingRunning("singlestore", "singlestore_topic"); } protected void waitForStreamingWithCustomMetricsToStart(Map props) throws InterruptedException { - waitForStreamingWithCustomMetricsToStart("singlestoredb", "singlestore_topic", props); + waitForStreamingWithCustomMetricsToStart("singlestore", "singlestore_topic", props); } /** @@ -91,7 +91,7 @@ public static void execute(String statement, String... furtherStatements) { } } - try (SingleStoreDBConnection connection = create()) { + try (SingleStoreConnection connection = create()) { // TODO: JDBC 1.1.9 doesn't support non-auto commit mode. // When we will use newer JDBC driver then this can be rewritten to // don't commit changes if at least one query failed. @@ -112,7 +112,7 @@ public static void execute(String statement, String... furtherStatements) { * @throws SQLException if anything fails. */ public static void dropAllTables() throws SQLException { - try (SingleStoreDBConnection connection = create()) { + try (SingleStoreConnection connection = create()) { connection.readAllTableNames(new String[]{"TABLE"}).forEach(table -> { if (table.catalog().equals(TEST_DATABASE)) { execute(String.format("DROP TABLE `%s`.`%s`", table.catalog(), table.table())); @@ -121,18 +121,18 @@ public static void dropAllTables() throws SQLException { } } - public static SingleStoreDBConnection.SingleStoreDBConnectionConfiguration defaultJdbcConnectionConfig() { - return new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(defaultJdbcConfig()); + public static SingleStoreConnection.SingleStoreConnectionConfiguration defaultJdbcConnectionConfig() { + return new SingleStoreConnection.SingleStoreConnectionConfiguration(defaultJdbcConfig()); } - public static SingleStoreDBConnection.SingleStoreDBConnectionConfiguration defaultJdbcConnectionConfigWithTable(String table) { - return new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(defaultJdbcConfigWithTable(table)); + public static SingleStoreConnection.SingleStoreConnectionConfiguration defaultJdbcConnectionConfigWithTable(String table) { + return new SingleStoreConnection.SingleStoreConnectionConfiguration(defaultJdbcConfigWithTable(table)); } public static JdbcConfiguration defaultJdbcConfigWithTable(String table) { return defaultJdbcConfigBuilder() - .withDefault(SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, table) + .withDefault(SingleStoreConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreConnectorConfig.TABLE_NAME, table) .build(); } @@ -142,12 +142,12 @@ public static JdbcConfiguration defaultJdbcConfig() { public static JdbcConfiguration.Builder defaultJdbcConfigBuilder() { return JdbcConfiguration.copy(Configuration.fromSystemProperties("database.")) - .with(SingleStoreDBConnectorConfig.TOPIC_PREFIX, TEST_TOPIC_PREFIX) - .withDefault(SingleStoreDBConnectorConfig.HOSTNAME, TEST_SERVER) - .withDefault(SingleStoreDBConnectorConfig.PORT, TEST_PORT) - .withDefault(SingleStoreDBConnectorConfig.USER, TEST_USER) - .withDefault(SingleStoreDBConnectorConfig.PASSWORD, TEST_PASSWORD) - .withDefault(SingleStoreDBConnectorConfig.DRIVER_PARAMETERS, "allowMultiQueries=true"); + .with(SingleStoreConnectorConfig.TOPIC_PREFIX, TEST_TOPIC_PREFIX) + .withDefault(SingleStoreConnectorConfig.HOSTNAME, TEST_SERVER) + .withDefault(SingleStoreConnectorConfig.PORT, TEST_PORT) + .withDefault(SingleStoreConnectorConfig.USER, TEST_USER) + .withDefault(SingleStoreConnectorConfig.PASSWORD, TEST_PASSWORD) + .withDefault(SingleStoreConnectorConfig.DRIVER_PARAMETERS, "allowMultiQueries=true"); } protected static void executeDDL(String ddlFile) throws Exception { @@ -156,7 +156,7 @@ protected static void executeDDL(String ddlFile) throws Exception { String statements = java.nio.file.Files.readAllLines(Paths.get(ddlTestFile.toURI())) .stream() .collect(Collectors.joining(System.lineSeparator())); - try (SingleStoreDBConnection connection = create()) { + try (SingleStoreConnection connection = create()) { connection.execute(statements); } } diff --git a/src/test/java/com/singlestore/debezium/MetricsIT.java b/src/test/java/com/singlestore/debezium/MetricsIT.java index e62dc65..1dc6359 100644 --- a/src/test/java/com/singlestore/debezium/MetricsIT.java +++ b/src/test/java/com/singlestore/debezium/MetricsIT.java @@ -25,7 +25,7 @@ public void testHeartBeat() throws InterruptedException { config = config.edit().withDefault(TOPIC_HEARTBEAT_PREFIX, "__heartbeat") .withDefault(HEARTBEAT_ACTION_QUERY, "SELECT 1").withDefault(HEARTBEAT_INTERVAL, 1000) .build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { List records = consumeRecordsByTopic(1).allRecordsInOrder(); @@ -49,13 +49,13 @@ public void testCustomMetrics() throws Exception { "SNAPSHOT DATABASE " + TEST_DATABASE + ";"; execute(statements); final Configuration config = defaultJdbcConfigBuilder().withDefault( - SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + SingleStoreConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreConnectorConfig.TABLE_NAME, "A") .with(CommonConnectorConfig.CUSTOM_METRIC_TAGS, "env=test,bu=bigdata").build(); - Map customMetricTags = new SingleStoreDBConnectorConfig( + Map customMetricTags = new SingleStoreConnectorConfig( config).getCustomMetricTags(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); assertSnapshotWithCustomMetrics(customMetricTags); assertStreamingWithCustomMetrics(customMetricTags); @@ -64,7 +64,7 @@ public void testCustomMetrics() throws Exception { private void assertSnapshotWithCustomMetrics(Map customMetricTags) throws Exception { final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - final ObjectName objectName = getSnapshotMetricsObjectName("singlestoredb", "singlestore_topic", + final ObjectName objectName = getSnapshotMetricsObjectName("singlestore", "singlestore_topic", customMetricTags); waitForSnapshotWithCustomMetricsToBeCompleted(customMetricTags); assertThat(mBeanServer.getAttribute(objectName, "TotalTableCount")).isEqualTo(1); @@ -82,7 +82,7 @@ private void assertSnapshotWithCustomMetrics(Map customMetricTag private void assertStreamingWithCustomMetrics(Map customMetricTags) throws Exception { final MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); - final ObjectName objectName = getStreamingMetricsObjectName("singlestoredb", + final ObjectName objectName = getStreamingMetricsObjectName("singlestore", "singlestore_topic", customMetricTags); // Insert for streaming events waitForStreamingWithCustomMetricsToStart(customMetricTags); diff --git a/src/test/java/com/singlestore/debezium/NotificationsIT.java b/src/test/java/com/singlestore/debezium/NotificationsIT.java index 25d76ff..61b830c 100644 --- a/src/test/java/com/singlestore/debezium/NotificationsIT.java +++ b/src/test/java/com/singlestore/debezium/NotificationsIT.java @@ -37,11 +37,11 @@ public class NotificationsIT extends SnapshotIT { @Test public void notificationCorrectlySentOnItsTopic() { final Configuration config = defaultJdbcConfigBuilder().withDefault( - SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + SingleStoreConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreConnectorConfig.TABLE_NAME, "A") .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification") .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); List notifications = new ArrayList<>(); Awaitility.await().atMost(60, TimeUnit.SECONDS).until(() -> { @@ -72,10 +72,10 @@ public void notificationCorrectlySentOnItsTopic() { @Test public void notificationNotSentIfNoChannelIsConfigured() { final Configuration config = defaultJdbcConfigBuilder().withDefault( - SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + SingleStoreConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreConnectorConfig.TABLE_NAME, "A") .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); waitForAvailableRecords(1000, TimeUnit.MILLISECONDS); List notifications = consumedLines.stream() @@ -88,11 +88,11 @@ public void notificationCorrectlySentOnJmx() throws ReflectionException, MalformedObjectNameException, InstanceNotFoundException, IntrospectionException, AttributeNotFoundException, MBeanException { final Configuration config = defaultJdbcConfigBuilder().withDefault( - SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + SingleStoreConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreConnectorConfig.TABLE_NAME, "A") .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "jmx").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); Awaitility.await().atMost(60, TimeUnit.SECONDS).pollDelay(1, TimeUnit.SECONDS) @@ -127,6 +127,6 @@ private List readNotificationFromJmx() private ObjectName getObjectName() throws MalformedObjectNameException { return new ObjectName( String.format("debezium.%s:type=management,context=notifications,server=%s", - "singlestoredb", "singlestore_topic")); + "singlestore", "singlestore_topic")); } } diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBConnectionIT.java b/src/test/java/com/singlestore/debezium/SingleStoreConnectionIT.java similarity index 95% rename from src/test/java/com/singlestore/debezium/SingleStoreDBConnectionIT.java rename to src/test/java/com/singlestore/debezium/SingleStoreConnectionIT.java index 43c5a87..4d3cdce 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBConnectionIT.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreConnectionIT.java @@ -22,11 +22,11 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.*; -public class SingleStoreDBConnectionIT extends IntegrationTestBase { +public class SingleStoreConnectionIT extends IntegrationTestBase { @Test public void testConnection() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { conn.connect(); assertTrue(conn.isConnected()); assertTrue(conn.isValid()); @@ -40,7 +40,7 @@ public void testConnection() { @Test public void testPrepareQuery() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { conn.execute("use " + TEST_DATABASE); conn.prepareQuery("insert into person values(?, ?, ?, ?, ?)", ps -> { ps.setString(1, "product4"); @@ -63,7 +63,7 @@ public void testPrepareQuery() { @Test public void testGetCurrentTimeStamp() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Optional timeStamp = conn.getCurrentTimestamp(); assertTrue(timeStamp.isPresent()); } catch (SQLException e) { @@ -73,7 +73,7 @@ public void testGetCurrentTimeStamp() { @Test public void testMetadata() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Set tableIds = conn.readAllTableNames(new String[]{"TABLE", "VIEW"}).stream().filter(t -> t.catalog().equals(TEST_DATABASE)).collect(Collectors.toSet()); Set tableNames = tableIds.stream().map(TableId::table).collect(Collectors.toSet()); assertEquals("readAllTableNames returns a wrong number of tables", 5, tableIds.size()); @@ -96,7 +96,7 @@ public void testMetadata() { @Test public void testReadSchemaMetadata() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); assertThat(tables.size()).isEqualTo(5); @@ -224,7 +224,7 @@ public void testReadSchemaMetadata() { @Test public void testObserve() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { String tempTableName = "person_temporary"; conn.execute("USE " + TEST_DATABASE, "DROP TABLE IF EXISTS " + tempTableName, @@ -243,7 +243,7 @@ public void testObserve() { CountDownLatch latch1 = new CountDownLatch(1); CountDownLatch latch2 = new CountDownLatch(1); Thread observer = new Thread(() -> { - try (SingleStoreDBConnection observerConn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection observerConn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Set tableIds = observerConn.readAllTableNames(new String[]{"TABLE", "VIEW"}).stream().filter(t -> t.catalog().equals(TEST_DATABASE)).collect(Collectors.toSet()); Set person = tableIds.stream().filter(t -> t.table().equals(tempTableName)).collect(Collectors.toSet()); observerConn.observe(person, rs -> { diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBConnectionTest.java b/src/test/java/com/singlestore/debezium/SingleStoreConnectionTest.java similarity index 64% rename from src/test/java/com/singlestore/debezium/SingleStoreDBConnectionTest.java rename to src/test/java/com/singlestore/debezium/SingleStoreConnectionTest.java index 66ff4db..4bd7a7e 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBConnectionTest.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreConnectionTest.java @@ -18,19 +18,19 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; -public class SingleStoreDBConnectionTest { +public class SingleStoreConnectionTest { @Test public void testJdbcParameters() { - SingleStoreDBConnection connection = createConnectionWithParams(Map.of(SingleStoreDBConnectorConfig.DRIVER_PARAMETERS, "param1=value1;param2=value2;param3=value3")); + SingleStoreConnection connection = createConnectionWithParams(Map.of(SingleStoreConnectorConfig.DRIVER_PARAMETERS, "param1=value1;param2=value2;param3=value3")); assertEquals(Map.of("param1", "value1", "param2", "value2", "param3", "value3"), connection.connectionConfig().driverParameters()); } @Test public void testSslDisabledParams() { - SingleStoreDBConnection connection = createConnectionWithParams( - Map.of(SingleStoreDBConnectorConfig.SSL_TRUSTSTORE, "trustStorePath", SingleStoreDBConnectorConfig.SSL_TRUSTSTORE_PASSWORD, "pass", - SingleStoreDBConnectorConfig.SSL_KEYSTORE, "keyStorePath", SingleStoreDBConnectorConfig.SSL_KEYSTORE_PASSWORD, "pass")); + SingleStoreConnection connection = createConnectionWithParams( + Map.of(SingleStoreConnectorConfig.SSL_TRUSTSTORE, "trustStorePath", SingleStoreConnectorConfig.SSL_TRUSTSTORE_PASSWORD, "pass", + SingleStoreConnectorConfig.SSL_KEYSTORE, "keyStorePath", SingleStoreConnectorConfig.SSL_KEYSTORE_PASSWORD, "pass")); assertNull(connection.connectionConfig().config().getString("trustStorePassword")); assertNull(connection.connectionConfig().config().getString("keyStorePassword")); assertNull(connection.connectionConfig().config().getString("keyStore")); @@ -39,9 +39,9 @@ public void testSslDisabledParams() { @Test public void testSslVerifyParams() { - SingleStoreDBConnection connection = createConnectionWithParams( - Map.of(SingleStoreDBConnectorConfig.SSL_MODE, "verify_ca", SingleStoreDBConnectorConfig.SSL_TRUSTSTORE, "trustStorePath", SingleStoreDBConnectorConfig.SSL_TRUSTSTORE_PASSWORD, "pass", - SingleStoreDBConnectorConfig.SSL_KEYSTORE, "keyStorePath", SingleStoreDBConnectorConfig.SSL_KEYSTORE_PASSWORD, "pass")); + SingleStoreConnection connection = createConnectionWithParams( + Map.of(SingleStoreConnectorConfig.SSL_MODE, "verify_ca", SingleStoreConnectorConfig.SSL_TRUSTSTORE, "trustStorePath", SingleStoreConnectorConfig.SSL_TRUSTSTORE_PASSWORD, "pass", + SingleStoreConnectorConfig.SSL_KEYSTORE, "keyStorePath", SingleStoreConnectorConfig.SSL_KEYSTORE_PASSWORD, "pass")); assertEquals("pass", connection.connectionConfig().config().getString("trustStorePassword")); assertEquals("pass", connection.connectionConfig().config().getString("keyStorePassword")); assertEquals("file:keyStorePath", connection.connectionConfig().config().getString("keyStore")); @@ -50,13 +50,13 @@ public void testSslVerifyParams() { @Test public void testQueryFetchSizeParam() { - SingleStoreDBConnection connection = createConnectionWithParams(Collections.emptyMap()); + SingleStoreConnection connection = createConnectionWithParams(Collections.emptyMap()); assertEquals("1", connection.connectionConfig().config().getString("defaultFetchSize")); } @Test public void testObserveNoParams() throws SQLException { - SingleStoreDBConnection connection = spy(createConnectionWithParams(Collections.emptyMap())); + SingleStoreConnection connection = spy(createConnectionWithParams(Collections.emptyMap())); doReturn(connection).when(connection).query(anyString(), any()); connection.observe(Collections.emptySet(), rs -> { }); @@ -65,7 +65,7 @@ public void testObserveNoParams() throws SQLException { @Test public void testObserveWithTableAndColumnFilter() throws SQLException { - SingleStoreDBConnection connection = spy(createConnectionWithParams(Collections.emptyMap())); + SingleStoreConnection connection = spy(createConnectionWithParams(Collections.emptyMap())); doReturn(connection).when(connection).query(anyString(), any()); connection.observe( Set.of(ColumnId.parse("debezium.table1.field1"), ColumnId.parse("debezium.table2.field1")), @@ -76,12 +76,12 @@ public void testObserveWithTableAndColumnFilter() throws SQLException { @Test public void testObserveWithFilter() throws SQLException { - SingleStoreDBConnection connection = spy(createConnectionWithParams(Collections.emptyMap())); + SingleStoreConnection connection = spy(createConnectionWithParams(Collections.emptyMap())); doReturn(connection).when(connection).query(anyString(), any()); connection.observe( Set.of(ColumnId.parse("debezium.table1.field1"), ColumnId.parse("debezium.table2.field1")), Set.of(TableId.parse("debezium.table1"), TableId.parse("debezium.table2")), - Optional.of(SingleStoreDBConnection.OBSERVE_OUTPUT_FORMAT.JSON), + Optional.of(SingleStoreConnection.OBSERVE_OUTPUT_FORMAT.JSON), Optional.empty(), Optional.of("(1, 2, NULL, 4)"), Optional.of("`table1`.`filed1`=1"), @@ -90,13 +90,13 @@ public void testObserveWithFilter() throws SQLException { verify(connection).query(matches("OBSERVE `debezium`.`table[12]`.`field1`,`debezium`.`table[21]`.`field1` FROM `debezium`.`table[12]`,`debezium`.`table[12]` AS JSON BEGIN AT \\(1, 2, NULL, 4\\) WHERE `table1`.`filed1`=1"), any()); } - private SingleStoreDBConnection createConnectionWithParams(Map fieldMap) { + private SingleStoreConnection createConnectionWithParams(Map fieldMap) { JdbcConfiguration.Builder builder = JdbcConfiguration.create() - .withDefault(SingleStoreDBConnectorConfig.HOSTNAME, "localhost") - .withDefault(SingleStoreDBConnectorConfig.PORT, 3306) - .withDefault(SingleStoreDBConnectorConfig.USER, "root") - .withDefault(SingleStoreDBConnectorConfig.PASSWORD, ""); + .withDefault(SingleStoreConnectorConfig.HOSTNAME, "localhost") + .withDefault(SingleStoreConnectorConfig.PORT, 3306) + .withDefault(SingleStoreConnectorConfig.USER, "root") + .withDefault(SingleStoreConnectorConfig.PASSWORD, ""); fieldMap.forEach(builder::with); - return new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(builder.build())); + return new SingleStoreConnection(new SingleStoreConnection.SingleStoreConnectionConfiguration(builder.build())); } } diff --git a/src/test/java/com/singlestore/debezium/SingleStoreConnectorIT.java b/src/test/java/com/singlestore/debezium/SingleStoreConnectorIT.java new file mode 100644 index 0000000..b405314 --- /dev/null +++ b/src/test/java/com/singlestore/debezium/SingleStoreConnectorIT.java @@ -0,0 +1,197 @@ +package com.singlestore.debezium; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; + +import com.singlestore.debezium.SingleStoreConnectorConfig.SecureConnectionMode; +import com.singlestore.debezium.SingleStoreConnectorConfig.SnapshotMode; +import io.debezium.config.CommonConnectorConfig; +import io.debezium.config.Configuration; +import io.debezium.config.EnumeratedValue; +import io.debezium.config.Field; +import io.debezium.jdbc.TemporalPrecisionMode; +import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; +import io.debezium.schema.AbstractTopicNamingStrategy; +import io.debezium.schema.DefaultTopicNamingStrategy; +import io.debezium.schema.DefaultUnicodeTopicNamingStrategy; +import java.sql.SQLException; +import java.util.List; +import java.util.Map; +import org.apache.kafka.common.config.Config; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +public class SingleStoreConnectorIT extends IntegrationTestBase { + + @Test + public void shouldFailToValidateInvalidConfiguration() { + Configuration config = Configuration.create().build(); + SingleStoreConnector connector = new SingleStoreConnector(); + Config result = connector.validate(config.asMap()); + + assertConfigurationErrors(result, SingleStoreConnectorConfig.DATABASE_NAME, 1); + assertConfigurationErrors(result, SingleStoreConnectorConfig.TABLE_NAME, 1); + assertConfigurationErrors(result, SingleStoreConnectorConfig.HOSTNAME, 1); + assertConfigurationErrors(result, SingleStoreConnectorConfig.USER, 1); + assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.PORT); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.TOPIC_NAMING_STRATEGY); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.COLUMN_EXCLUDE_LIST); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.COLUMN_INCLUDE_LIST); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.CONNECTION_TIMEOUT_MS); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.MAX_QUEUE_SIZE); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.MAX_BATCH_SIZE); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.POLL_INTERVAL_MS); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.SNAPSHOT_MODE); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.SSL_MODE); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.SSL_KEYSTORE); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.SSL_KEYSTORE_PASSWORD); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.SSL_TRUSTSTORE); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.SSL_TRUSTSTORE_PASSWORD); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.DECIMAL_HANDLING_MODE); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.TIME_PRECISION_MODE); + assertNoConfigurationErrors(result, SingleStoreConnectorConfig.POPULATE_INTERNAL_ID); + } + + @Test + public void shouldValidateAcceptableConfiguration() { + Configuration config = Configuration.create().build(); + SingleStoreConnector connector = new SingleStoreConnector(); + Config result = connector.validate(config.asMap()); + + // validate that the required fields have errors + assertConfigurationErrors(result, SingleStoreConnectorConfig.DATABASE_NAME, 1); + assertConfigurationErrors(result, SingleStoreConnectorConfig.TABLE_NAME, 1); + assertConfigurationErrors(result, SingleStoreConnectorConfig.HOSTNAME, 1); + assertConfigurationErrors(result, SingleStoreConnectorConfig.USER, 1); + assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1); + + // validate the non required fields + validateConfigField(result, SingleStoreConnectorConfig.PORT, 3306); + validateConfigField(result, SingleStoreConnectorConfig.PASSWORD, null); + validateConfigField(result, SingleStoreConnectorConfig.COLUMN_EXCLUDE_LIST, null); + validateConfigField(result, SingleStoreConnectorConfig.COLUMN_INCLUDE_LIST, null); + validateConfigField(result, SingleStoreConnectorConfig.CONNECTION_TIMEOUT_MS, 30000); + validateConfigField(result, SingleStoreConnectorConfig.MAX_QUEUE_SIZE, 8192); + validateConfigField(result, SingleStoreConnectorConfig.MAX_BATCH_SIZE, 2048); + validateConfigField(result, SingleStoreConnectorConfig.POLL_INTERVAL_MS, 500L); + validateConfigField(result, SingleStoreConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL); + validateConfigField(result, SingleStoreConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLE); + validateConfigField(result, SingleStoreConnectorConfig.SSL_KEYSTORE, null); + validateConfigField(result, SingleStoreConnectorConfig.SSL_KEYSTORE_PASSWORD, null); + validateConfigField(result, SingleStoreConnectorConfig.SSL_TRUSTSTORE, null); + validateConfigField(result, SingleStoreConnectorConfig.SSL_TRUSTSTORE_PASSWORD, null); + validateConfigField(result, SingleStoreConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.PRECISE); + validateConfigField(result, SingleStoreConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE); + validateConfigField(result, SingleStoreConnectorConfig.TOPIC_NAMING_STRATEGY, DefaultTopicNamingStrategy.class.getName()); + validateConfigField(result, SingleStoreConnectorConfig.POPULATE_INTERNAL_ID, false); + } + + private void validateConfigField(Config config, Field field, T expectedValue) { + assertNoConfigurationErrors(config, field); + Object actualValue = configValue(config, field.name()).value(); + if (actualValue == null) { + actualValue = field.defaultValue(); + } + if (expectedValue == null) { + assertThat(actualValue).isNull(); + } + else { + if (expectedValue instanceof EnumeratedValue) { + assertThat(((EnumeratedValue) expectedValue).getValue()).isEqualTo(actualValue.toString()); + } + else { + assertThat(expectedValue).isEqualTo(actualValue); + } + } + } + + @Test + public void successfullConfigValidation() { + SingleStoreConnector connector = new SingleStoreConnector(); + Config validatedConfig = connector.validate(defaultJdbcConfigWithTable("person").asMap()); + + assertNoConfigurationErrors(validatedConfig, SingleStoreConnectorConfig.ALL_FIELDS.asArray()); + } + + @Test + public void configWrongCredentials() { + SingleStoreConnector connector = new SingleStoreConnector(); + Map config = defaultJdbcConfigWithTable("person").asMap(); + config.put("database.hostname", "wrongHost"); + + Config validatedConfig = connector.validate(config); + assertConfigurationErrors(validatedConfig, SingleStoreConnectorConfig.HOSTNAME, 1); + } + + @Test + public void configMissingDB() { + SingleStoreConnector connector = new SingleStoreConnector(); + Map config = defaultJdbcConfigWithTable("person").asMap(); + config.put("database.dbname", null); + + Config validatedConfig = connector.validate(config); + assertConfigurationErrors(validatedConfig, SingleStoreConnectorConfig.DATABASE_NAME, 1); + } + + @Test + public void configMissingTable() { + SingleStoreConnector connector = new SingleStoreConnector(); + Map config = defaultJdbcConfigWithTable("person").asMap(); + config.put("database.table", null); + + Config validatedConfig = connector.validate(config); + assertConfigurationErrors(validatedConfig, SingleStoreConnectorConfig.TABLE_NAME, 1); + } + + + @Test + public void testTopicOptions() throws SQLException, InterruptedException { + try (SingleStoreConnection conn = new SingleStoreConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + Configuration config = defaultJdbcConfigWithTable("product"); + config = config.edit() + .with(SingleStoreConnectorConfig.TOPIC_PREFIX, "prefix") + .with(AbstractTopicNamingStrategy.TOPIC_DELIMITER, "_") + .build(); + + start(SingleStoreConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); + + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + assertEquals("prefix_db_product", records.get(0).topic()); + } finally { + stopConnector(); + } + } + } + + @Test + public void testTopicNamingStrategy() throws SQLException, InterruptedException { + try (SingleStoreConnection conn = new SingleStoreConnection( + defaultJdbcConnectionConfigWithTable("product"))) { + Configuration config = defaultJdbcConfigWithTable("product"); + config = config.edit() + .with(SingleStoreConnectorConfig.TOPIC_NAMING_STRATEGY, DefaultUnicodeTopicNamingStrategy.class.getName()) + + .build(); + + start(SingleStoreConnector.class, config); + assertConnectorIsRunning(); + + try { + conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); + + List records = consumeRecordsByTopic(1).allRecordsInOrder(); + assertEquals(1, records.size()); + assertEquals("singlestore_u005ftopic.db.product", records.get(0).topic()); + } finally { + stopConnector(); + } + } + } +} diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBConnectorIT.java b/src/test/java/com/singlestore/debezium/SingleStoreDBConnectorIT.java deleted file mode 100644 index cc7567a..0000000 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBConnectorIT.java +++ /dev/null @@ -1,197 +0,0 @@ -package com.singlestore.debezium; - -import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.Assert.assertEquals; - -import com.singlestore.debezium.SingleStoreDBConnectorConfig.SecureConnectionMode; -import com.singlestore.debezium.SingleStoreDBConnectorConfig.SnapshotMode; -import io.debezium.config.CommonConnectorConfig; -import io.debezium.config.Configuration; -import io.debezium.config.EnumeratedValue; -import io.debezium.config.Field; -import io.debezium.jdbc.TemporalPrecisionMode; -import io.debezium.relational.RelationalDatabaseConnectorConfig.DecimalHandlingMode; -import io.debezium.schema.AbstractTopicNamingStrategy; -import io.debezium.schema.DefaultTopicNamingStrategy; -import io.debezium.schema.DefaultUnicodeTopicNamingStrategy; -import java.sql.SQLException; -import java.util.List; -import java.util.Map; -import org.apache.kafka.common.config.Config; -import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Test; - -public class SingleStoreDBConnectorIT extends IntegrationTestBase { - - @Test - public void shouldFailToValidateInvalidConfiguration() { - Configuration config = Configuration.create().build(); - SingleStoreDBConnector connector = new SingleStoreDBConnector(); - Config result = connector.validate(config.asMap()); - - assertConfigurationErrors(result, SingleStoreDBConnectorConfig.DATABASE_NAME, 1); - assertConfigurationErrors(result, SingleStoreDBConnectorConfig.TABLE_NAME, 1); - assertConfigurationErrors(result, SingleStoreDBConnectorConfig.HOSTNAME, 1); - assertConfigurationErrors(result, SingleStoreDBConnectorConfig.USER, 1); - assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.PORT); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.TOPIC_NAMING_STRATEGY); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.COLUMN_EXCLUDE_LIST); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.CONNECTION_TIMEOUT_MS); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.MAX_QUEUE_SIZE); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.MAX_BATCH_SIZE); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.POLL_INTERVAL_MS); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SNAPSHOT_MODE); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_MODE); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_KEYSTORE); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_KEYSTORE_PASSWORD); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_TRUSTSTORE); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.SSL_TRUSTSTORE_PASSWORD); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.DECIMAL_HANDLING_MODE); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.TIME_PRECISION_MODE); - assertNoConfigurationErrors(result, SingleStoreDBConnectorConfig.POPULATE_INTERNAL_ID); - } - - @Test - public void shouldValidateAcceptableConfiguration() { - Configuration config = Configuration.create().build(); - SingleStoreDBConnector connector = new SingleStoreDBConnector(); - Config result = connector.validate(config.asMap()); - - // validate that the required fields have errors - assertConfigurationErrors(result, SingleStoreDBConnectorConfig.DATABASE_NAME, 1); - assertConfigurationErrors(result, SingleStoreDBConnectorConfig.TABLE_NAME, 1); - assertConfigurationErrors(result, SingleStoreDBConnectorConfig.HOSTNAME, 1); - assertConfigurationErrors(result, SingleStoreDBConnectorConfig.USER, 1); - assertConfigurationErrors(result, CommonConnectorConfig.TOPIC_PREFIX, 1); - - // validate the non required fields - validateConfigField(result, SingleStoreDBConnectorConfig.PORT, 3306); - validateConfigField(result, SingleStoreDBConnectorConfig.PASSWORD, null); - validateConfigField(result, SingleStoreDBConnectorConfig.COLUMN_EXCLUDE_LIST, null); - validateConfigField(result, SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST, null); - validateConfigField(result, SingleStoreDBConnectorConfig.CONNECTION_TIMEOUT_MS, 30000); - validateConfigField(result, SingleStoreDBConnectorConfig.MAX_QUEUE_SIZE, 8192); - validateConfigField(result, SingleStoreDBConnectorConfig.MAX_BATCH_SIZE, 2048); - validateConfigField(result, SingleStoreDBConnectorConfig.POLL_INTERVAL_MS, 500L); - validateConfigField(result, SingleStoreDBConnectorConfig.SNAPSHOT_MODE, SnapshotMode.INITIAL); - validateConfigField(result, SingleStoreDBConnectorConfig.SSL_MODE, SecureConnectionMode.DISABLE); - validateConfigField(result, SingleStoreDBConnectorConfig.SSL_KEYSTORE, null); - validateConfigField(result, SingleStoreDBConnectorConfig.SSL_KEYSTORE_PASSWORD, null); - validateConfigField(result, SingleStoreDBConnectorConfig.SSL_TRUSTSTORE, null); - validateConfigField(result, SingleStoreDBConnectorConfig.SSL_TRUSTSTORE_PASSWORD, null); - validateConfigField(result, SingleStoreDBConnectorConfig.DECIMAL_HANDLING_MODE, DecimalHandlingMode.PRECISE); - validateConfigField(result, SingleStoreDBConnectorConfig.TIME_PRECISION_MODE, TemporalPrecisionMode.ADAPTIVE); - validateConfigField(result, SingleStoreDBConnectorConfig.TOPIC_NAMING_STRATEGY, DefaultTopicNamingStrategy.class.getName()); - validateConfigField(result, SingleStoreDBConnectorConfig.POPULATE_INTERNAL_ID, false); - } - - private void validateConfigField(Config config, Field field, T expectedValue) { - assertNoConfigurationErrors(config, field); - Object actualValue = configValue(config, field.name()).value(); - if (actualValue == null) { - actualValue = field.defaultValue(); - } - if (expectedValue == null) { - assertThat(actualValue).isNull(); - } - else { - if (expectedValue instanceof EnumeratedValue) { - assertThat(((EnumeratedValue) expectedValue).getValue()).isEqualTo(actualValue.toString()); - } - else { - assertThat(expectedValue).isEqualTo(actualValue); - } - } - } - - @Test - public void successfullConfigValidation() { - SingleStoreDBConnector connector = new SingleStoreDBConnector(); - Config validatedConfig = connector.validate(defaultJdbcConfigWithTable("person").asMap()); - - assertNoConfigurationErrors(validatedConfig, SingleStoreDBConnectorConfig.ALL_FIELDS.asArray()); - } - - @Test - public void configWrongCredentials() { - SingleStoreDBConnector connector = new SingleStoreDBConnector(); - Map config = defaultJdbcConfigWithTable("person").asMap(); - config.put("database.hostname", "wrongHost"); - - Config validatedConfig = connector.validate(config); - assertConfigurationErrors(validatedConfig, SingleStoreDBConnectorConfig.HOSTNAME, 1); - } - - @Test - public void configMissingDB() { - SingleStoreDBConnector connector = new SingleStoreDBConnector(); - Map config = defaultJdbcConfigWithTable("person").asMap(); - config.put("database.dbname", null); - - Config validatedConfig = connector.validate(config); - assertConfigurationErrors(validatedConfig, SingleStoreDBConnectorConfig.DATABASE_NAME, 1); - } - - @Test - public void configMissingTable() { - SingleStoreDBConnector connector = new SingleStoreDBConnector(); - Map config = defaultJdbcConfigWithTable("person").asMap(); - config.put("database.table", null); - - Config validatedConfig = connector.validate(config); - assertConfigurationErrors(validatedConfig, SingleStoreDBConnectorConfig.TABLE_NAME, 1); - } - - - @Test - public void testTopicOptions() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( - defaultJdbcConnectionConfigWithTable("product"))) { - Configuration config = defaultJdbcConfigWithTable("product"); - config = config.edit() - .with(SingleStoreDBConnectorConfig.TOPIC_PREFIX, "prefix") - .with(AbstractTopicNamingStrategy.TOPIC_DELIMITER, "_") - .build(); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); - - List records = consumeRecordsByTopic(1).allRecordsInOrder(); - assertEquals(1, records.size()); - assertEquals("prefix_db_product", records.get(0).topic()); - } finally { - stopConnector(); - } - } - } - - @Test - public void testTopicNamingStrategy() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( - defaultJdbcConnectionConfigWithTable("product"))) { - Configuration config = defaultJdbcConfigWithTable("product"); - config = config.edit() - .with(SingleStoreDBConnectorConfig.TOPIC_NAMING_STRATEGY, DefaultUnicodeTopicNamingStrategy.class.getName()) - - .build(); - - start(SingleStoreDBConnector.class, config); - assertConnectorIsRunning(); - - try { - conn.execute("INSERT INTO `product` (`id`) VALUES (1)"); - - List records = consumeRecordsByTopic(1).allRecordsInOrder(); - assertEquals(1, records.size()); - assertEquals("singlestore_u005ftopic.db.product", records.get(0).topic()); - } finally { - stopConnector(); - } - } - } -} diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBPartitionTest.java b/src/test/java/com/singlestore/debezium/SingleStoreDBPartitionTest.java deleted file mode 100644 index a2aded8..0000000 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBPartitionTest.java +++ /dev/null @@ -1,16 +0,0 @@ -package com.singlestore.debezium; - -import io.debezium.connector.common.AbstractPartitionTest; - - -public class SingleStoreDBPartitionTest extends AbstractPartitionTest { - @Override - protected SingleStoreDBPartition createPartition1() { - return new SingleStoreDBPartition("server1", "database1"); - } - - @Override - protected SingleStoreDBPartition createPartition2() { - return new SingleStoreDBPartition("server2", "database2"); - } -} diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBDatabaseSchemaIT.java b/src/test/java/com/singlestore/debezium/SingleStoreDatabaseSchemaIT.java similarity index 81% rename from src/test/java/com/singlestore/debezium/SingleStoreDBDatabaseSchemaIT.java rename to src/test/java/com/singlestore/debezium/SingleStoreDatabaseSchemaIT.java index 71af346..c0ac07f 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBDatabaseSchemaIT.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreDatabaseSchemaIT.java @@ -31,40 +31,40 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; -public class SingleStoreDBDatabaseSchemaIT extends IntegrationTestBase { +public class SingleStoreDatabaseSchemaIT extends IntegrationTestBase { - private static final SingleStoreDBValueConverters CONVERTERS = new SingleStoreDBValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, + private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES); - private SingleStoreDBDatabaseSchema schema; + private SingleStoreDatabaseSchema schema; @Test public void testKeySchema() { - schema = getSchema(new SingleStoreDBConnectorConfig(defaultJdbcConfigWithTable("allTypesTable"))); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("allTypesTable"))) { + schema = getSchema(new SingleStoreConnectorConfig(defaultJdbcConfigWithTable("allTypesTable"))); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfigWithTable("allTypesTable"))) { schema.refresh(conn); assertKeySchema("db.allTypesTable", "intColumn", SchemaBuilder.int32().defaultValue(2147483647).optional().build());//as unique index } catch (SQLException e) { Assert.fail(e.getMessage()); } - schema = getSchema(new SingleStoreDBConnectorConfig(defaultJdbcConfigWithTable("person"))); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("person"))) { + schema = getSchema(new SingleStoreConnectorConfig(defaultJdbcConfigWithTable("person"))); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfigWithTable("person"))) { schema.refresh(conn); assertKeySchema("db.person", "name", SchemaBuilder.string().required().build()); } catch (SQLException e) { Assert.fail(e.getMessage()); } - schema = getSchema(new SingleStoreDBConnectorConfig(defaultJdbcConfigWithTable("product"))); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("product"))) { + schema = getSchema(new SingleStoreConnectorConfig(defaultJdbcConfigWithTable("product"))); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfigWithTable("product"))) { schema.refresh(conn); assertKeySchema("db.product", "id", SchemaBuilder.int32().required().build()); } catch (SQLException e) { Assert.fail(e.getMessage()); } - schema = getSchema(new SingleStoreDBConnectorConfig(defaultJdbcConfigWithTable("purchased"))); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("purchased"))) { + schema = getSchema(new SingleStoreConnectorConfig(defaultJdbcConfigWithTable("purchased"))); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfigWithTable("purchased"))) { schema.refresh(conn); assertKeySchema("db.purchased", "productId,purchaser", SchemaBuilder.int32().required().build(), SchemaBuilder.string().required().build()); @@ -75,8 +75,8 @@ public void testKeySchema() { @Test public void testTableSchema() { - schema = getSchema(new SingleStoreDBConnectorConfig(defaultJdbcConfigWithTable("person"))); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("person"))) { + schema = getSchema(new SingleStoreConnectorConfig(defaultJdbcConfigWithTable("person"))); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfigWithTable("person"))) { schema.refresh(conn); assertTablesIncluded("db.person"); assertTableSchema("db.person", "name, birthdate, age, salary, bitStr", @@ -90,8 +90,8 @@ public void testTableSchema() { Assert.fail(e.getMessage()); } - schema = getSchema(new SingleStoreDBConnectorConfig(defaultJdbcConfigWithTable("product"))); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("product"))) { + schema = getSchema(new SingleStoreConnectorConfig(defaultJdbcConfigWithTable("product"))); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfigWithTable("product"))) { schema.refresh(conn); assertTablesIncluded("db.product"); assertTableSchema("db.product", "id, createdByDate, modifiedDate", @@ -105,8 +105,8 @@ public void testTableSchema() { Assert.fail(e.getMessage()); } - schema = getSchema(new SingleStoreDBConnectorConfig(defaultJdbcConfigWithTable("allTypesTable"))); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfigWithTable("allTypesTable"))) { + schema = getSchema(new SingleStoreConnectorConfig(defaultJdbcConfigWithTable("allTypesTable"))); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfigWithTable("allTypesTable"))) { schema.refresh(conn); assertTablesIncluded("db.allTypesTable"); assertTableSchema("db.allTypesTable", "boolColumn, booleanColumn, bitColumn, tinyintColumn, mediumintColumn, " + @@ -178,11 +178,11 @@ public void testUpdateTableSchema() { "CREATE TABLE d3.B (pk INT, aa VARCHAR(10), PRIMARY KEY(pk));"; execute(statements); Configuration configuration = defaultJdbcConfigBuilder() - .with(SingleStoreDBConnectorConfig.DATABASE_NAME, "d3") - .with(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .with(SingleStoreConnectorConfig.DATABASE_NAME, "d3") + .with(SingleStoreConnectorConfig.TABLE_NAME, "A") .build(); - schema = getSchema(new SingleStoreDBConnectorConfig(configuration)); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(configuration))) { + schema = getSchema(new SingleStoreConnectorConfig(configuration)); + try (SingleStoreConnection conn = new SingleStoreConnection(new SingleStoreConnection.SingleStoreConnectionConfiguration(configuration))) { schema.refresh(conn); assertTablesIncluded("d3.A"); assertTablesExcluded("d3.B"); @@ -199,8 +199,8 @@ public void testUpdateTableSchema() { "ALTER TABLE d3.A DROP COLUMN aa;" + "ALTER TABLE d3.A ADD COLUMN ac CHAR(1) default 'a';"; execute(updateStatements); - schema = getSchema(new SingleStoreDBConnectorConfig(configuration)); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(configuration))) { + schema = getSchema(new SingleStoreConnectorConfig(configuration)); + try (SingleStoreConnection conn = new SingleStoreConnection(new SingleStoreConnection.SingleStoreConnectionConfiguration(configuration))) { schema.refresh(conn); assertTablesIncluded("d3.A"); assertTablesExcluded("d3.B"); @@ -222,24 +222,24 @@ public void testApplyFilters() { "CREATE TABLE d1.A (pk INT, aa VARCHAR(10), PRIMARY KEY(pk));"; execute(statements); Configuration configuration = defaultJdbcConfigBuilder() - .with(SingleStoreDBConnectorConfig.DATABASE_NAME, "d1") - .with(SingleStoreDBConnectorConfig.TABLE_NAME, "A") - .with(SingleStoreDBConnectorConfig.COLUMN_EXCLUDE_LIST, ".*aa") + .with(SingleStoreConnectorConfig.DATABASE_NAME, "d1") + .with(SingleStoreConnectorConfig.TABLE_NAME, "A") + .with(SingleStoreConnectorConfig.COLUMN_EXCLUDE_LIST, ".*aa") .build(); - schema = getSchema(new SingleStoreDBConnectorConfig(configuration)); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(configuration))) { + schema = getSchema(new SingleStoreConnectorConfig(configuration)); + try (SingleStoreConnection conn = new SingleStoreConnection(new SingleStoreConnection.SingleStoreConnectionConfiguration(configuration))) { schema.refresh(conn); assertColumnsExcluded("d1.A.aa"); } catch (SQLException e) { Assert.fail(e.getMessage()); } configuration = defaultJdbcConfigBuilder() - .with(SingleStoreDBConnectorConfig.DATABASE_NAME, "d1") - .with(SingleStoreDBConnectorConfig.TABLE_NAME, "A") - .with(SingleStoreDBConnectorConfig.COLUMN_EXCLUDE_LIST, ".*p.*") + .with(SingleStoreConnectorConfig.DATABASE_NAME, "d1") + .with(SingleStoreConnectorConfig.TABLE_NAME, "A") + .with(SingleStoreConnectorConfig.COLUMN_EXCLUDE_LIST, ".*p.*") .build(); - schema = getSchema(new SingleStoreDBConnectorConfig(configuration)); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(configuration))) { + schema = getSchema(new SingleStoreConnectorConfig(configuration)); + try (SingleStoreConnection conn = new SingleStoreConnection(new SingleStoreConnection.SingleStoreConnectionConfiguration(configuration))) { schema.refresh(conn); assertColumnsExcluded("d1.A.pk"); } catch (SQLException e) { @@ -247,12 +247,12 @@ public void testApplyFilters() { } } - public static SingleStoreDBDatabaseSchema getSchema(SingleStoreDBConnectorConfig config) { - return new SingleStoreDBDatabaseSchema( + public static SingleStoreDatabaseSchema getSchema(SingleStoreConnectorConfig config) { + return new SingleStoreDatabaseSchema( config, CONVERTERS, - new SingleStoreDBDefaultValueConverter(CONVERTERS), - config.getTopicNamingStrategy(SingleStoreDBConnectorConfig.TOPIC_NAMING_STRATEGY), + new SingleStoreDefaultValueConverter(CONVERTERS), + config.getTopicNamingStrategy(SingleStoreConnectorConfig.TOPIC_NAMING_STRATEGY), false); } diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBDefaultValueConverterIT.java b/src/test/java/com/singlestore/debezium/SingleStoreDefaultValueConverterIT.java similarity index 81% rename from src/test/java/com/singlestore/debezium/SingleStoreDBDefaultValueConverterIT.java rename to src/test/java/com/singlestore/debezium/SingleStoreDefaultValueConverterIT.java index 1bd20df..3fbf8f3 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBDefaultValueConverterIT.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreDefaultValueConverterIT.java @@ -23,19 +23,19 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.*; -public class SingleStoreDBDefaultValueConverterIT extends IntegrationTestBase { +public class SingleStoreDefaultValueConverterIT extends IntegrationTestBase { - private static final SingleStoreDBValueConverters CONVERTERS = new SingleStoreDBValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, + private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.ADAPTIVE, CommonConnectorConfig.BinaryHandlingMode.BYTES); @Test public void testNumberValues() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); assertThat(table).isNotNull(); - SingleStoreDBDefaultValueConverter defaultValueConverter = new SingleStoreDBDefaultValueConverter(CONVERTERS); + SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(CONVERTERS); testColumn(defaultValueConverter, table, "tinyintColumn", (short) 124); testColumn(defaultValueConverter, table, "smallintColumn", (short) 32767); testColumn(defaultValueConverter, table, "mediumintColumn", 8388607); @@ -52,12 +52,12 @@ public void testNumberValues() { @Test public void testDefaultTimeAndDateValues() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); assertThat(table).isNotNull(); - SingleStoreDBDefaultValueConverter defaultValueConverter = new SingleStoreDBDefaultValueConverter(CONVERTERS); + SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(CONVERTERS); testColumn(defaultValueConverter, table, "dateColumn", (int) LocalDate.of(2000, 10, 10).atStartOfDay(ZoneId.of("UTC")).toEpochSecond() / 60 / 60 / 24);//epoch days testColumn(defaultValueConverter, table, "timeColumn", (int) Date.from(LocalDate.EPOCH.atTime(22, 59, 59).atZone(ZoneId.of("UTC")).toInstant()).getTime()); testColumn(defaultValueConverter, table, "time6Column", Date.from(LocalDate.EPOCH.atTime(22, 59, 59, 111111).atZone(ZoneId.of("UTC")).toInstant()).getTime() * 1_000 + 111111); @@ -77,22 +77,22 @@ public void testDefaultTimeAndDateValues() { @Test @Ignore //todo enable after PLAT-6817 is resolved public void testGeometryValues() throws ParseException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); assertThat(table).isNotNull(); - SingleStoreDBDefaultValueConverter defaultValueConverter = new SingleStoreDBDefaultValueConverter(CONVERTERS); + SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(CONVERTERS); Column geographyColumn = table.columnWithName("geographyColumn"); Optional geographyDefaultValue = defaultValueConverter.parseDefaultValue(geographyColumn, geographyColumn.defaultValueExpression().orElse(null)); assertTrue(geographyDefaultValue.isPresent()); - SingleStoreDBGeometry geographyValue = SingleStoreDBGeometry.fromEkt("POLYGON((1 1,2 1,2 2, 1 2, 1 1))"); + SingleStoreGeometry geographyValue = SingleStoreGeometry.fromEkt("POLYGON((1 1,2 1,2 2, 1 2, 1 1))"); Struct geographyColumnDefaultValue = (Struct) geographyDefaultValue.get(); assertArrayEquals(geographyValue.getWkb(), (byte[]) geographyColumnDefaultValue.get("wkb")); Column geographypointColumn = table.columnWithName("geographypointColumn"); Optional geographypointDefaultValue = defaultValueConverter.parseDefaultValue(geographypointColumn, geographypointColumn.defaultValueExpression().orElse(null)); assertTrue(geographypointDefaultValue.isPresent()); - SingleStoreDBGeometry geographyPointValue = SingleStoreDBGeometry.fromEkt("POINT(1.50000003 1.50000000)"); + SingleStoreGeometry geographyPointValue = SingleStoreGeometry.fromEkt("POINT(1.50000003 1.50000000)"); Struct geographypointColumnDefaultValue = (Struct) geographypointDefaultValue.get(); assertArrayEquals(geographyPointValue.getWkb(), (byte[]) geographypointColumnDefaultValue.get("wkb")); } catch (SQLException e) { @@ -102,12 +102,12 @@ public void testGeometryValues() throws ParseException { @Test public void testStringValues() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); assertThat(table).isNotNull(); - SingleStoreDBDefaultValueConverter defaultValueConverter = new SingleStoreDBDefaultValueConverter(CONVERTERS); + SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(CONVERTERS); testColumn(defaultValueConverter, table, "jsonColumn", "{}"); testColumn(defaultValueConverter, table, "enum_f", "val1"); testColumn(defaultValueConverter, table, "set_f", "v1"); @@ -124,12 +124,12 @@ public void testStringValues() { @Test public void testBlobValues() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); assertThat(table).isNotNull(); - SingleStoreDBDefaultValueConverter defaultValueConverter = new SingleStoreDBDefaultValueConverter(CONVERTERS); + SingleStoreDefaultValueConverter defaultValueConverter = new SingleStoreDefaultValueConverter(CONVERTERS); testColumn(defaultValueConverter, table, "blobColumn", ByteBuffer.wrap("abc".getBytes())); testColumn(defaultValueConverter, table, "longblobColumn", ByteBuffer.wrap("abc".getBytes())); testColumn(defaultValueConverter, table, "mediumblobColumn", ByteBuffer.wrap("abc".getBytes())); @@ -139,7 +139,7 @@ public void testBlobValues() { } } - private static void testColumn(SingleStoreDBDefaultValueConverter defaultValueConverter, Table table, String name, Object expectedValue) { + private static void testColumn(SingleStoreDefaultValueConverter defaultValueConverter, Table table, String name, Object expectedValue) { Column column = table.columnWithName(name); Optional defaultValue = defaultValueConverter.parseDefaultValue(column, column.defaultValueExpression().orElse(null)); assertTrue(defaultValue.isPresent()); diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBEventMetadataProviderTest.java b/src/test/java/com/singlestore/debezium/SingleStoreEventMetadataProviderTest.java similarity index 78% rename from src/test/java/com/singlestore/debezium/SingleStoreDBEventMetadataProviderTest.java rename to src/test/java/com/singlestore/debezium/SingleStoreEventMetadataProviderTest.java index edd2828..8cbacf9 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBEventMetadataProviderTest.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreEventMetadataProviderTest.java @@ -17,22 +17,22 @@ import io.debezium.relational.TableId; import io.debezium.util.Collect; -public class SingleStoreDBEventMetadataProviderTest { +public class SingleStoreEventMetadataProviderTest { TableId table; - SingleStoreDBOffsetContext offsetContext; + SingleStoreOffsetContext offsetContext; Struct value; - SingleStoreDBEventMetadataProvider provider = new SingleStoreDBEventMetadataProvider(); + SingleStoreEventMetadataProvider provider = new SingleStoreEventMetadataProvider(); @Before public void init() { - SingleStoreDBConnectorConfig conf = new SingleStoreDBConnectorConfig( + SingleStoreConnectorConfig conf = new SingleStoreConnectorConfig( Configuration.create() .with(CommonConnectorConfig.TOPIC_PREFIX, "server") - .with(SingleStoreDBConnectorConfig.DATABASE_NAME, "database") + .with(SingleStoreConnectorConfig.DATABASE_NAME, "database") .build()); - SingleStoreDBOffsetContext offsetContext = new SingleStoreDBOffsetContext(conf, null, null, Arrays.asList(null, null, null, null), false, false); + SingleStoreOffsetContext offsetContext = new SingleStoreOffsetContext(conf, null, null, Arrays.asList(null, null, null, null), false, false); table = TableId.parse("db.t", true); offsetContext.event(table, Instant.parse("2018-11-30T18:35:24.00Z")); diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBOffsetContextTest.java b/src/test/java/com/singlestore/debezium/SingleStoreOffsetContextTest.java similarity index 72% rename from src/test/java/com/singlestore/debezium/SingleStoreDBOffsetContextTest.java rename to src/test/java/com/singlestore/debezium/SingleStoreOffsetContextTest.java index effaab2..466827b 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBOffsetContextTest.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreOffsetContextTest.java @@ -10,22 +10,22 @@ import org.junit.Test; -import com.singlestore.debezium.SingleStoreDBOffsetContext.Loader; +import com.singlestore.debezium.SingleStoreOffsetContext.Loader; import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; import io.debezium.relational.TableId; -public class SingleStoreDBOffsetContextTest { +public class SingleStoreOffsetContextTest { @Test public void saveAndLoad() { - SingleStoreDBConnectorConfig conf = new SingleStoreDBConnectorConfig( + SingleStoreConnectorConfig conf = new SingleStoreConnectorConfig( Configuration.create() .with(CommonConnectorConfig.TOPIC_PREFIX, "server") - .with(SingleStoreDBConnectorConfig.DATABASE_NAME, "database") + .with(SingleStoreConnectorConfig.DATABASE_NAME, "database") .build()); - SingleStoreDBOffsetContext offsetContext = new SingleStoreDBOffsetContext(conf, null, null, Arrays.asList(null, null, null, null), false, false); + SingleStoreOffsetContext offsetContext = new SingleStoreOffsetContext(conf, null, null, Arrays.asList(null, null, null, null), false, false); offsetContext.event(TableId.parse("db.t", true), Instant.parse("2018-11-30T18:35:24.00Z")); offsetContext.update(0, "0", "1"); @@ -35,8 +35,8 @@ public void saveAndLoad() { Map offset = (Map)offsetContext.getOffset(); - Loader loader = new SingleStoreDBOffsetContext.Loader(conf); - SingleStoreDBOffsetContext loadedOffsetContext = loader.load(offset); + Loader loader = new SingleStoreOffsetContext.Loader(conf); + SingleStoreOffsetContext loadedOffsetContext = loader.load(offset); assertEquals(loadedOffsetContext.partitionId(), (Integer)1); assertEquals(loadedOffsetContext.txId(), "3"); diff --git a/src/test/java/com/singlestore/debezium/SingleStorePartitionTest.java b/src/test/java/com/singlestore/debezium/SingleStorePartitionTest.java new file mode 100644 index 0000000..47fd53f --- /dev/null +++ b/src/test/java/com/singlestore/debezium/SingleStorePartitionTest.java @@ -0,0 +1,16 @@ +package com.singlestore.debezium; + +import io.debezium.connector.common.AbstractPartitionTest; + + +public class SingleStorePartitionTest extends AbstractPartitionTest { + @Override + protected SingleStorePartition createPartition1() { + return new SingleStorePartition("server1", "database1"); + } + + @Override + protected SingleStorePartition createPartition2() { + return new SingleStorePartition("server2", "database2"); + } +} diff --git a/src/test/java/com/singlestore/debezium/SingleStoreDBValueConvertersIT.java b/src/test/java/com/singlestore/debezium/SingleStoreValueConvertersIT.java similarity index 85% rename from src/test/java/com/singlestore/debezium/SingleStoreDBValueConvertersIT.java rename to src/test/java/com/singlestore/debezium/SingleStoreValueConvertersIT.java index 7647dff..9c9d378 100644 --- a/src/test/java/com/singlestore/debezium/SingleStoreDBValueConvertersIT.java +++ b/src/test/java/com/singlestore/debezium/SingleStoreValueConvertersIT.java @@ -26,13 +26,13 @@ import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; -public class SingleStoreDBValueConvertersIT extends IntegrationTestBase { +public class SingleStoreValueConvertersIT extends IntegrationTestBase { - private static final SingleStoreDBValueConverters CONVERTERS = new SingleStoreDBValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES); + private static final SingleStoreValueConverters CONVERTERS = new SingleStoreValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES); @Test public void testNumberValues() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); @@ -66,8 +66,8 @@ public void testDecimalModeValues() { } private void testDecimalModeValues(JdbcValueConverters.DecimalMode mode) { - SingleStoreDBValueConverters converters = new SingleStoreDBValueConverters(mode, TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + SingleStoreValueConverters converters = new SingleStoreValueConverters(mode, TemporalPrecisionMode.CONNECT, CommonConnectorConfig.BinaryHandlingMode.BYTES); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); @@ -91,18 +91,18 @@ private void testDecimalModeValues(JdbcValueConverters.DecimalMode mode) { public void testGeometry() throws ParseException { String geographyValue = "POLYGON ((1 1, 2 1, 2 2, 1 2, 1 1))"; String geographyPointValue = "POINT(1.50000003 1.50000000)"; - SingleStoreDBGeometry singleStoreDBgeographyValue = SingleStoreDBGeometry.fromEkt(geographyValue); - SingleStoreDBGeometry singleStoreDBgeographyPointValue = SingleStoreDBGeometry.fromEkt(geographyPointValue); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + SingleStoreGeometry singleStoregeographyValue = SingleStoreGeometry.fromEkt(geographyValue); + SingleStoreGeometry singleStoregeographyPointValue = SingleStoreGeometry.fromEkt(geographyPointValue); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); assertThat(table).isNotNull(); // TODO: PLAT-6907 // Struct convertedPolygon = (Struct) convertColumnValue(CONVERTERS, table, "geographyColumn", geographyValue); - // assertArrayEquals((byte[]) convertedPolygon.get("wkb"), singleStoreDBgeographyValue.getWkb()); + // assertArrayEquals((byte[]) convertedPolygon.get("wkb"), singleStoregeographyValue.getWkb()); Struct convertedPoint = (Struct) convertColumnValue(CONVERTERS, table, "geographypointColumn", geographyPointValue); - assertArrayEquals((byte[]) convertedPoint.get("wkb"), singleStoreDBgeographyPointValue.getWkb()); + assertArrayEquals((byte[]) convertedPoint.get("wkb"), singleStoregeographyPointValue.getWkb()); } catch (SQLException e) { Assert.fail(e.getMessage()); } @@ -116,8 +116,8 @@ public void testTimeAndDateValues() { } private void testTimeAndDateValues(TemporalPrecisionMode mode) { - SingleStoreDBValueConverters converters = new SingleStoreDBValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, mode, CommonConnectorConfig.BinaryHandlingMode.BYTES); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + SingleStoreValueConverters converters = new SingleStoreValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, mode, CommonConnectorConfig.BinaryHandlingMode.BYTES); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); @@ -154,7 +154,7 @@ private void testTimeAndDateValues(TemporalPrecisionMode mode) { @Test public void testStringValues() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); @@ -173,7 +173,7 @@ public void testStringValues() { @Test public void testBlobValues() { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); @@ -198,8 +198,8 @@ public void testBinaryMode() { } private void testBinaryMode(CommonConnectorConfig.BinaryHandlingMode mode) { - SingleStoreDBValueConverters converters = new SingleStoreDBValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, mode); - try (SingleStoreDBConnection conn = new SingleStoreDBConnection(defaultJdbcConnectionConfig())) { + SingleStoreValueConverters converters = new SingleStoreValueConverters(JdbcValueConverters.DecimalMode.DOUBLE, TemporalPrecisionMode.CONNECT, mode); + try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) { Tables tables = new Tables(); conn.readSchema(tables, TEST_DATABASE, null, null, null, true); Table table = tables.forTable(TEST_DATABASE, null, "allTypesTable"); @@ -225,11 +225,11 @@ private void testBinaryMode(CommonConnectorConfig.BinaryHandlingMode mode) { } } - private static void testColumn(SingleStoreDBValueConverters converters, Table table, String name, Object valueToConvert, Object expectedConvertedValue) { + private static void testColumn(SingleStoreValueConverters converters, Table table, String name, Object valueToConvert, Object expectedConvertedValue) { assertEquals(expectedConvertedValue, convertColumnValue(converters, table, name, valueToConvert)); } - private static Object convertColumnValue(SingleStoreDBValueConverters converters, Table table, String name, Object valueToConvert) { + private static Object convertColumnValue(SingleStoreValueConverters converters, Table table, String name, Object valueToConvert) { Column column = table.columnWithName(name); Field field = new Field(column.name(), -1, converters.schemaBuilder(column).build()); return converters.converter(column, field).convert(valueToConvert); diff --git a/src/test/java/com/singlestore/debezium/SnapshotIT.java b/src/test/java/com/singlestore/debezium/SnapshotIT.java index af04987..42a5153 100644 --- a/src/test/java/com/singlestore/debezium/SnapshotIT.java +++ b/src/test/java/com/singlestore/debezium/SnapshotIT.java @@ -48,14 +48,14 @@ public void initTestData() { @Test public void testSnapshotA() throws Exception { final Configuration config = defaultJdbcConfigBuilder() - .withDefault(SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") + .withDefault(SingleStoreConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreConnectorConfig.TABLE_NAME, "A") .with(SinkNotificationChannel.NOTIFICATION_TOPIC, "io.debezium.notification") .with(CommonConnectorConfig.NOTIFICATION_ENABLED_CHANNELS, "sink") .build(); ; - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -94,7 +94,7 @@ public void testSnapshotA() throws Exception { public void testSnapshotB() throws Exception { final Configuration config = defaultJdbcConfigWithTable("B"); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -127,7 +127,7 @@ public void testSnapshotB() throws Exception { public void testSnapshotFilter() throws InterruptedException { final Configuration config = defaultJdbcConfigWithTable("B"); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); final SourceRecords recordsB = consumeRecordsByTopic(1); final List table2 = recordsB.recordsForTopic( @@ -151,16 +151,16 @@ protected int consumeRecords(int numberOfRecords, Consumer recordC @Test public void filterColumns() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("A"))) { Configuration config = defaultJdbcConfigWithTable("A"); config = config.edit() - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") - .withDefault(SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST, "db.A.aa") + .withDefault(SingleStoreConnectorConfig.TABLE_NAME, "A") + .withDefault(SingleStoreConnectorConfig.COLUMN_INCLUDE_LIST, "db.A.aa") .build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -198,12 +198,12 @@ public void filterColumns() throws SQLException, InterruptedException { @Test public void testSnapshotDelay() throws Exception { final Configuration config = defaultJdbcConfigBuilder() - .withDefault(SingleStoreDBConnectorConfig.DATABASE_NAME, TEST_DATABASE) - .withDefault(SingleStoreDBConnectorConfig.TABLE_NAME, "A") - .withDefault(SingleStoreDBConnectorConfig.SNAPSHOT_DELAY_MS, 30000) + .withDefault(SingleStoreConnectorConfig.DATABASE_NAME, TEST_DATABASE) + .withDefault(SingleStoreConnectorConfig.TABLE_NAME, "A") + .withDefault(SingleStoreConnectorConfig.SNAPSHOT_DELAY_MS, 30000) .build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { diff --git a/src/test/java/com/singlestore/debezium/SourceInfoTest.java b/src/test/java/com/singlestore/debezium/SourceInfoTest.java index 833fec9..5bd3bdd 100644 --- a/src/test/java/com/singlestore/debezium/SourceInfoTest.java +++ b/src/test/java/com/singlestore/debezium/SourceInfoTest.java @@ -24,10 +24,10 @@ public class SourceInfoTest { @Before public void beforeEach() { - source = new SourceInfo(new SingleStoreDBConnectorConfig( + source = new SourceInfo(new SingleStoreConnectorConfig( Configuration.create() .with(CommonConnectorConfig.TOPIC_PREFIX, "server") - .with(SingleStoreDBConnectorConfig.DATABASE_NAME, "database") + .with(SingleStoreConnectorConfig.DATABASE_NAME, "database") .build()), 4); source.update(10, "123", Arrays.asList("1", "2", null, "3")); source.update(TableId.parse("db.t", true), Instant.parse("2018-11-30T18:35:24.00Z")); diff --git a/src/test/java/com/singlestore/debezium/StreamingIT.java b/src/test/java/com/singlestore/debezium/StreamingIT.java index d46e401..70f2666 100644 --- a/src/test/java/com/singlestore/debezium/StreamingIT.java +++ b/src/test/java/com/singlestore/debezium/StreamingIT.java @@ -24,11 +24,11 @@ public class StreamingIT extends IntegrationTestBase { @Test public void canReadAllTypes() throws SQLException, InterruptedException, ParseException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("allTypesTable"))) { Configuration config = defaultJdbcConfigWithTable("allTypesTable"); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -129,10 +129,10 @@ public void canReadAllTypes() throws SQLException, InterruptedException, ParseEx assertEquals("val1", after.get("enum_f")); assertEquals("v1", after.get("set_f")); String geographyPointValue = "POINT(1.50000003 1.50000000)"; - SingleStoreDBGeometry singleStoreDBgeographyPointValue = SingleStoreDBGeometry.fromEkt( + SingleStoreGeometry singleStoregeographyPointValue = SingleStoreGeometry.fromEkt( geographyPointValue); assertArrayEquals((byte[]) ((Struct) after.get("geographypointColumn")).get("wkb"), - singleStoreDBgeographyPointValue.getWkb()); + singleStoregeographyPointValue.getWkb()); } finally { stopConnector(); } @@ -141,11 +141,11 @@ public void canReadAllTypes() throws SQLException, InterruptedException, ParseEx @Test public void populatesSourceInfo() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("purchased"))) { Configuration config = defaultJdbcConfigWithTable("purchased"); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { conn.execute("INSERT INTO `purchased` VALUES ('archie', 1, NOW())"); @@ -155,7 +155,7 @@ public void populatesSourceInfo() throws SQLException, InterruptedException { Struct source = (Struct) ((Struct) record.value()).get("source"); assertEquals(source.get("version"), "1.0-SNAPSHOT"); - assertEquals(source.get("connector"), "singlestoredb"); + assertEquals(source.get("connector"), "singlestore"); assertEquals(source.get("name"), "singlestore_topic"); assertNotNull(source.get("ts_ms")); assertEquals(source.get("snapshot"), "true"); @@ -173,11 +173,11 @@ public void populatesSourceInfo() throws SQLException, InterruptedException { @Test public void noPrimaryKey() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("song"))) { Configuration config = defaultJdbcConfigWithTable("song"); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -216,12 +216,12 @@ public void noPrimaryKey() throws SQLException, InterruptedException { @Test public void readSeveralOperations() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("product"))) { Configuration config = defaultJdbcConfigWithTable("product"); config = config.edit().withDefault("tombstones.on.delete", "false").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -263,13 +263,13 @@ public void readSeveralOperations() throws SQLException, InterruptedException { @Test public void filterColumns() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("person"))) { Configuration config = defaultJdbcConfigWithTable("person"); - config = config.edit().withDefault(SingleStoreDBConnectorConfig.COLUMN_INCLUDE_LIST, + config = config.edit().withDefault(SingleStoreConnectorConfig.COLUMN_INCLUDE_LIST, "db.person.name,db.person.age").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -312,17 +312,17 @@ public void filterColumns() throws SQLException, InterruptedException { @Test public void internalId() throws SQLException, InterruptedException { - try (SingleStoreDBConnection createTableConn = new SingleStoreDBConnection( + try (SingleStoreConnection createTableConn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("product"))) { createTableConn.execute("CREATE TABLE internalIdTable(a INT)"); try { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("internalIdTable"))) { Configuration config = defaultJdbcConfigWithTable("internalIdTable"); config = config.edit() - .withDefault(SingleStoreDBConnectorConfig.POPULATE_INTERNAL_ID, "true").build(); + .withDefault(SingleStoreConnectorConfig.POPULATE_INTERNAL_ID, "true").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -348,13 +348,13 @@ public void internalId() throws SQLException, InterruptedException { @Test public void testSkippedOperations() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("product"))) { Configuration config = defaultJdbcConfigWithTable("product"); - config = config.edit().withDefault(SingleStoreDBConnectorConfig.SKIPPED_OPERATIONS, "c") - .withDefault(SingleStoreDBConnectorConfig.TOMBSTONES_ON_DELETE, "false").build(); + config = config.edit().withDefault(SingleStoreConnectorConfig.SKIPPED_OPERATIONS, "c") + .withDefault(SingleStoreConnectorConfig.TOMBSTONES_ON_DELETE, "false").build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try { @@ -383,13 +383,13 @@ public void testSkippedOperations() throws SQLException, InterruptedException { @Test public void testStreamAfterInitialOnlySnapshot() throws SQLException, InterruptedException { - try (SingleStoreDBConnection conn = new SingleStoreDBConnection( + try (SingleStoreConnection conn = new SingleStoreConnection( defaultJdbcConnectionConfigWithTable("product"))) { Configuration config = defaultJdbcConfigWithTable("product"); - config = config.edit().withDefault(SingleStoreDBConnectorConfig.SNAPSHOT_MODE, - SingleStoreDBConnectorConfig.SnapshotMode.INITIAL_ONLY).build(); + config = config.edit().withDefault(SingleStoreConnectorConfig.SNAPSHOT_MODE, + SingleStoreConnectorConfig.SnapshotMode.INITIAL_ONLY).build(); - start(SingleStoreDBConnector.class, config); + start(SingleStoreConnector.class, config); assertConnectorIsRunning(); try {