Merge pull request #14 from singlestore-labs/deps
Updated dependencies
AdalbertMemSQL authored Jan 9, 2024
2 parents 6abe5e8 + 2ce2f1f commit 27fd152
Showing 12 changed files with 135 additions and 79 deletions.
99 changes: 53 additions & 46 deletions pom.xml
@@ -11,9 +11,21 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<singlestore.jdbc.driver>1.2.0</singlestore.jdbc.driver>

<version.debezium>2.5.0.Final</version.debezium>
<version.kafka>3.6.1</version.kafka>
<version.singlestore.jdbc.driver>1.2.0</version.singlestore.jdbc.driver>
<version.jts.io>1.19.0</version.jts.io>

<version.kafka-connect-avro-converter>7.5.1</version.kafka-connect-avro-converter>
<version.connect-json>3.6.1</version.connect-json>
<version.logback-classic>1.4.14</version.logback-classic>
<version.testcontainers>1.19.3</version.testcontainers>
<version.awaitility>4.2.0</version.awaitility>
<version.mockito>3.0.0</version.mockito>

<version.junit>4.13.2</version.junit>
<version.assertj-core>3.25.1</version.assertj-core>

<!--
Specify the properties that will be used for setting up the integration tests' Docker container.
Note that the `dockerhost.ip` property is computed from the IP address of DOCKER_HOST, which will
@@ -42,97 +54,92 @@
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>2.3.2.Final</version>
<version>${version.debezium}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.5.1</version>
<version>${version.kafka}</version>
</dependency>
<dependency>
<groupId>com.singlestore</groupId>
<artifactId>singlestore-jdbc-client</artifactId>
<version>${singlestore.jdbc.driver}</version>
</dependency>
<dependency>
<groupId>io.github.matthieun</groupId>
<artifactId>wkb-wkt-converter</artifactId>
<version>1.0.1</version>
<version>${version.singlestore.jdbc.driver}</version>
</dependency>
<dependency>
<groupId>javax.xml.bind</groupId>
<artifactId>jaxb-api</artifactId>
<version>2.3.1</version>
<groupId>org.locationtech.jts.io</groupId>
<artifactId>jts-io-common</artifactId>
<version>${version.jts.io}</version>
</dependency>

<!-- Testing -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>${version.debezium}</version>
<type>test-jar</type>
<scope>test</scope>
<version>1.2.10</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-core</artifactId>
<version>2.3.2.Final</version>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>1.19.0</version>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>${version.debezium}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${version.mockito}</version>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>${version.kafka-connect-avro-converter}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>${version.connect-json}</version>
<scope>test</scope>
<version>4.13.1</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
<version>3.11.1</version>
<version>${version.logback-classic}</version>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.1</version>
<groupId>org.testcontainers</groupId>
<artifactId>testcontainers</artifactId>
<version>${version.testcontainers}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-avro-converter</artifactId>
<version>7.0.1</version>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${version.awaitility}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-json</artifactId>
<version>3.4.0</version>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>${version.mockito}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>2.3.2.Final</version>
<type>test-jar</type>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
<version>${version.junit}</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>2.3.2.Final</version>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
<version>${version.assertj-core}</version>
</dependency>
</dependencies>

src/main/java/com/singlestore/debezium/SingleStoreDBChangeEventSourceFactory.java
@@ -3,6 +3,7 @@
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
@@ -35,8 +36,9 @@ public SingleStoreDBChangeEventSourceFactory(SingleStoreDBConnectorConfig connec

@Override
public SnapshotChangeEventSource<SingleStoreDBPartition, SingleStoreDBOffsetContext> getSnapshotChangeEventSource(
SnapshotProgressListener<SingleStoreDBPartition> snapshotProgressListener) {
return new SingleStoreDBSnapshotChangeEventSource(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener);
SnapshotProgressListener<SingleStoreDBPartition> snapshotProgressListener,
NotificationService<SingleStoreDBPartition, SingleStoreDBOffsetContext> notificationService) {
return new SingleStoreDBSnapshotChangeEventSource(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService);
}

19 changes: 19 additions & 0 deletions src/main/java/com/singlestore/debezium/SingleStoreDBConnector.java
@@ -5,17 +5,21 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigValue;
import org.apache.kafka.connect.connector.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.annotation.Immutable;
import io.debezium.config.Configuration;
import io.debezium.connector.common.RelationalBaseSourceConnector;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
import io.debezium.spi.schema.DataCollectionId;

/**
* A Kafka Connect source connector that creates tasks that read the SingleStoreDB change log and generate the corresponding
@@ -95,4 +99,19 @@ protected void validateConnection(Map<String, ConfigValue> configValues, Configu
protected Map<String, ConfigValue> validateAllFields(Configuration config) {
return config.validate(SingleStoreDBConnectorConfig.ALL_FIELDS);
}

@Override
public List<TableId> getMatchingCollections(Configuration config) {
SingleStoreDBConnectorConfig connectorConfig = new SingleStoreDBConnectorConfig(config);
final SingleStoreDBConnection.SingleStoreDBConnectionConfiguration connectionConfig =
new SingleStoreDBConnection.SingleStoreDBConnectionConfiguration(config);
try (SingleStoreDBConnection connection = new SingleStoreDBConnection(connectionConfig)) {
return connection.readTableNames(connectorConfig.databaseName(), null, null, new String[]{ "TABLE" }).stream()
.filter(tableId -> connectorConfig.getTableFilters().dataCollectionFilter().isIncluded(tableId))
.collect(Collectors.toList());
}
catch (SQLException e) {
throw new DebeziumException(e);
}
}
}
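
The new getMatchingCollections override is what lets Debezium resolve, for a given configuration, exactly which tables the connector would capture. A minimal sketch of exercising it directly (the connection property names below are illustrative assumptions, not taken from this diff):

import java.util.List;

import io.debezium.config.Configuration;
import io.debezium.relational.TableId;

public class MatchingCollectionsExample {
    public static void main(String[] args) {
        // Hypothetical connection settings; adjust for a real SingleStoreDB cluster.
        Configuration config = Configuration.create()
                .with("database.hostname", "localhost")
                .with("database.port", "3306")
                .with("database.user", "root")
                .with("database.password", "")
                .with("database.dbname", "testdb")
                .with("table.include.list", "testdb\\.orders")
                .build();

        // Opens a connection, lists TABLE-type collections in the configured
        // database, and keeps only those accepted by the table filters.
        List<TableId> tables = new SingleStoreDBConnector().getMatchingCollections(config);
        tables.forEach(tableId -> System.out.println(tableId.identifier()));
    }
}

Because the method uses try-with-resources, each call opens and closes its own connection, so it is safe to invoke repeatedly during validation.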
19 changes: 12 additions & 7 deletions src/main/java/com/singlestore/debezium/SingleStoreDBGeometry.java
@@ -1,14 +1,17 @@
package com.singlestore.debezium;

import io.debezium.util.HexConverter;
import io.github.matthieun.conversion.WktWkbConverter;

import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKBWriter;
import org.locationtech.jts.io.WKTReader;
import org.locationtech.jts.geom.Geometry;

public class SingleStoreDBGeometry {

/**
* WKT to WKB converter
*/
private static final WktWkbConverter WKT_WKB_CONVERTER = new WktWkbConverter();
private static final WKBWriter wkbWriter = new WKBWriter();
private static final WKTReader wktReader = new WKTReader();

/**
* Static Hex EKWB for a GEOMETRYCOLLECTION EMPTY.
*/
@@ -42,8 +45,10 @@ public static SingleStoreDBGeometry fromHexEwkb(String hexEwkb) {
* Create a SingleStoreDBGeometry using the supplied WKT.
The srid is null, as it is not specified by SingleStore.
*/
public static SingleStoreDBGeometry fromEkt(String wkt) {
return new SingleStoreDBGeometry(WKT_WKB_CONVERTER.wktToWkb(wkt), null);
public static SingleStoreDBGeometry fromEkt(String wkt) throws ParseException {
final Geometry geometry = wktReader.read(wkt);
final byte[] wkb = wkbWriter.write(geometry);
return new SingleStoreDBGeometry(wkb, null);
}

/**
Expand Down
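The io.github.matthieun wkb-wkt-converter dependency is gone; the jts-io-common artifact added in the pom above supplies the replacement. A minimal standalone sketch of the same JTS round trip that fromEkt now performs:

import org.locationtech.jts.geom.Geometry;
import org.locationtech.jts.io.ParseException;
import org.locationtech.jts.io.WKBWriter;
import org.locationtech.jts.io.WKTReader;

public class WktToWkbExample {
    public static void main(String[] args) throws ParseException {
        // Parse WKT into a JTS Geometry, then serialize it as WKB bytes.
        Geometry geometry = new WKTReader().read("POINT (1 2)");
        byte[] wkb = new WKBWriter().write(geometry);
        System.out.printf("WKB length: %d bytes%n", wkb.length);
    }
}

Unlike the old converter, WKTReader.read throws a checked ParseException on malformed input, which is why fromEkt now declares it.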
src/main/java/com/singlestore/debezium/SingleStoreDBSnapshotChangeEventSource.java
@@ -6,13 +6,16 @@
import io.debezium.jdbc.JdbcConnection;
import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.SnapshottingTask;
import io.debezium.pipeline.source.spi.ChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.spi.ChangeRecordEmitter;
import io.debezium.pipeline.spi.SnapshotResult;
import io.debezium.relational.RelationalSnapshotChangeEventSource;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.Strings;
@@ -58,8 +61,9 @@ public class SingleStoreDBSnapshotChangeEventSource extends RelationalSnapshotCh
public SingleStoreDBSnapshotChangeEventSource(SingleStoreDBConnectorConfig connectorConfig,
MainConnectionProvidingConnectionFactory<SingleStoreDBConnection> jdbcConnectionFactory,
SingleStoreDBDatabaseSchema schema, EventDispatcher<SingleStoreDBPartition, TableId> dispatcher, Clock clock,
SnapshotProgressListener<SingleStoreDBPartition> snapshotProgressListener) {
super(connectorConfig, jdbcConnectionFactory, schema, dispatcher, clock, snapshotProgressListener);
SnapshotProgressListener<SingleStoreDBPartition> snapshotProgressListener,
NotificationService<SingleStoreDBPartition, SingleStoreDBOffsetContext> notificationService) {
super(connectorConfig, jdbcConnectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService);
this.connectorConfig = connectorConfig;
this.jdbcConnection = jdbcConnectionFactory.mainConnection();
this.schema = schema;
@@ -77,6 +81,8 @@ public SnapshotResult<SingleStoreDBOffsetContext> doExecute(ChangeEventSourceCon
Exception exceptionWhileSnapshot = null;
Queue<JdbcConnection> connectionPool = null;
try {
Set<Pattern> dataCollectionsToBeSnapshotted = getDataCollectionPattern(snapshottingTask.getDataCollections());

LOGGER.info("Snapshot step 1 - Preparing");

if (previousOffset != null && previousOffset.isSnapshotRunning()) {
@@ -90,14 +96,14 @@ public SnapshotResult<SingleStoreDBOffsetContext> doExecute(ChangeEventSourceCon

// Note that there's a minor race condition here: a new table matching the filters could be created between
// this call and the determination of the initial snapshot position below; this seems acceptable, though
determineCapturedTables(ctx);
determineCapturedTables(ctx, dataCollectionsToBeSnapshotted);
snapshotProgressListener.monitoredDataCollectionsDetermined(snapshotContext.partition, ctx.capturedTables);

LOGGER.info("Snapshot step 3 - Determining snapshot offset");
determineSnapshotOffset(ctx, previousOffset);

LOGGER.info("Snapshot step 4 - Reading structure of captured tables");
readTableStructure(context, ctx, previousOffset);
readTableStructure(context, ctx, previousOffset, snapshottingTask);

if (snapshottingTask.snapshotData()) {
LOGGER.info("Snapshot step 4.a - Creating connection pool");
@@ -356,9 +362,9 @@ private Threads.Timer getTableScanLogTimer() {
return Threads.timer(clock, LOG_INTERVAL);
}

private void determineCapturedTables(RelationalSnapshotContext<SingleStoreDBPartition, SingleStoreDBOffsetContext> ctx) throws Exception {
private void determineCapturedTables(RelationalSnapshotContext<SingleStoreDBPartition, SingleStoreDBOffsetContext> ctx, Set<Pattern> dataCollectionsToBeSnapshotted) throws Exception {
Set<TableId> allTableIds = getAllTableIds(ctx);
Set<TableId> snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds).collect(Collectors.toSet());
Set<TableId> snapshottedTableIds = determineDataCollectionsToBeSnapshotted(allTableIds, dataCollectionsToBeSnapshotted).collect(Collectors.toSet());

Set<TableId> capturedTables = new HashSet<>();
Set<TableId> capturedSchemaTables = new HashSet<>();
@@ -485,10 +491,15 @@ private int readNumberOfPartitions(String database) {
@Override
protected void readTableStructure(ChangeEventSourceContext sourceContext,
RelationalSnapshotContext<SingleStoreDBPartition, SingleStoreDBOffsetContext> snapshotContext,
SingleStoreDBOffsetContext offsetContext) throws Exception {
SingleStoreDBOffsetContext offsetContext,
SnapshottingTask snapshottingTask) throws Exception {
Set<String> catalogs = snapshotContext.capturedTables.stream()
.map(TableId::catalog)
.collect(Collectors.toSet());

Tables.TableFilter tableFilter = snapshottingTask.isBlocking() ? Tables.TableFilter.fromPredicate(snapshotContext.capturedTables::contains)
: connectorConfig.getTableFilters().dataCollectionFilter();

for (String catalog : catalogs) {
if (!sourceContext.isRunning()) {
throw new InterruptedException("Interrupted while reading structure of schema " + catalog);
Expand All @@ -498,7 +509,7 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,
snapshotContext.tables,
catalog,
null,
connectorConfig.getTableFilters().dataCollectionFilter(),
tableFilter,
null,
false);
}
@@ -545,8 +556,12 @@ protected Optional<String> getSnapshotSelect(
}

@Override
protected SnapshottingTask getSnapshottingTask(SingleStoreDBPartition partition,
public SnapshottingTask getSnapshottingTask(SingleStoreDBPartition partition,
SingleStoreDBOffsetContext previousOffset) {
List<String> dataCollectionsToBeSnapshotted = connectorConfig.getDataCollectionsToBeSnapshotted();
Map<String, String> snapshotSelectOverridesByTable = connectorConfig.getSnapshotSelectOverridesByTable().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey().identifier(), Map.Entry::getValue));

boolean snapshotSchema = true;
boolean snapshotData = true;
// found a previous offset and the earlier snapshot has completed
@@ -563,13 +578,12 @@ protected SnapshottingTask getSnapshottingTask(SingleStoreDBPartition partition,
}
snapshotData = this.connectorConfig.getSnapshotMode().includeData();
}
return new SnapshottingTask(snapshotSchema, snapshotData);
return new SnapshottingTask(snapshotSchema, snapshotData, dataCollectionsToBeSnapshotted, snapshotSelectOverridesByTable, false);
}

@Override
protected RelationalSnapshotContext<SingleStoreDBPartition, SingleStoreDBOffsetContext> prepare(
SingleStoreDBPartition partition) throws Exception {
return new RelationalSnapshotContext<>(partition, connectorConfig.databaseName());
}

}
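
Two changes in this file go together. readTableStructure now narrows its table filter to the already-captured tables whenever the snapshot is blocking, so an on-demand snapshot does not re-read the structure of every table matching the connector filters. And getSnapshottingTask switches to the five-argument SnapshottingTask constructor from Debezium 2.5, which carries the snapshot scope along with the task. A sketch with made-up values (the table name and override query are illustrative only):

import java.util.List;
import java.util.Map;

import io.debezium.pipeline.source.SnapshottingTask;

public class SnapshottingTaskExample {
    public static void main(String[] args) {
        // Arguments: snapshot schema?, snapshot data?, collections to snapshot,
        // per-table SELECT overrides, and the blocking flag that
        // readTableStructure checks via isBlocking().
        SnapshottingTask task = new SnapshottingTask(
                true,
                true,
                List.of("db.orders"),
                Map.of("db.orders", "SELECT * FROM orders WHERE id > 0"),
                false);
        System.out.println(task.snapshotData());
    }
}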
src/main/java/com/singlestore/debezium/SingleStoreDBTaskContext.java
@@ -4,6 +4,6 @@

public class SingleStoreDBTaskContext extends CdcSourceTaskContext {
public SingleStoreDBTaskContext(SingleStoreDBConnectorConfig config, SingleStoreDBDatabaseSchema schema) {
super(config.getContextName(), config.getLogicalName(), schema::tableIds);
super(config.getContextName(), config.getLogicalName(), config.getCustomMetricTags(), schema::tableIds);
}
}
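
The added config.getCustomMetricTags() argument forwards user-defined tags to the base CdcSourceTaskContext so they can be attached to the connector's JMX metrics. Assuming Debezium's custom.metric.tags property (a naming assumption, not confirmed by this diff), the tags would be supplied like this:

import io.debezium.config.Configuration;

public class MetricTagsExample {
    public static void main(String[] args) {
        // Hypothetical: each key=value pair becomes a tag on the connector metrics.
        Configuration config = Configuration.create()
                .with("custom.metric.tags", "env=staging,team=cdc")
                .build();
        System.out.println(config.getString("custom.metric.tags"));
    }
}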
