Merge pull request #12 from singlestore-labs/PLAT-6797
Implemented CDC streaming
AdalbertMemSQL authored Dec 1, 2023
2 parents 58ac7af + 0510711 commit 6abe5e8
Showing 21 changed files with 650 additions and 64 deletions.
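Note (not part of the diff): this PR wires streaming into the connector pipeline, adding a streaming change event source driven by SingleStore's OBSERVE query, a change record emitter for streamed rows, and a `database.dbname` connector option. A minimal configuration sketch follows; the connector class name and property keys follow Debezium conventions and are assumptions, not taken from this diff.

```java
// Sketch only: building a configuration for this connector after this PR.
// Property names follow Debezium conventions; "connector.class" and
// "database.dbname" keys are assumed, not confirmed by this diff.
import io.debezium.config.Configuration;

public class ConnectorConfigSketch {
  public static Configuration build() {
    return Configuration.create()
        .with("connector.class", "com.singlestore.debezium.SingleStoreDBConnector") // assumed name
        .with("database.hostname", "localhost")
        .with("database.port", "3306")
        .with("database.user", "root")
        .with("database.password", "secret")
        .with("database.dbname", "inventory") // DATABASE_NAME, newly validated in this PR
        .with("snapshot.mode", "initial")     // schema_only is disabled in this PR (PLAT-6912)
        .build();
  }
}
```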
pom.xml (2 changes: 1 addition & 1 deletion)
@@ -11,7 +11,7 @@
<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
<singlestore.jdbc.driver>1.1.9</singlestore.jdbc.driver>
<singlestore.jdbc.driver>1.2.0</singlestore.jdbc.driver>
<version.mockito>3.0.0</version.mockito>

<!--
SingleStoreDBChangeEventSourceFactory.java

@@ -1,6 +1,7 @@
package com.singlestore.debezium;

import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
@@ -15,17 +16,20 @@ public class SingleStoreDBChangeEventSourceFactory implements ChangeEventSourceFactory {
MainConnectionProvidingConnectionFactory<SingleStoreDBConnection> connectionFactory;
SingleStoreDBDatabaseSchema schema;
EventDispatcher<SingleStoreDBPartition, TableId> dispatcher;
ErrorHandler errorHandler;
Clock clock;

public SingleStoreDBChangeEventSourceFactory(SingleStoreDBConnectorConfig connectorConfig,
MainConnectionProvidingConnectionFactory<SingleStoreDBConnection> connectionFactory,
SingleStoreDBDatabaseSchema schema,
EventDispatcher<SingleStoreDBPartition, TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock) {
this.connectorConfig = connectorConfig;
this.connectionFactory = connectionFactory;
this.schema = schema;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.clock = clock;
}

@@ -37,7 +41,7 @@ public SnapshotChangeEventSource<SingleStoreDBPartition, SingleStoreDBOffsetContext>

@Override
public StreamingChangeEventSource<SingleStoreDBPartition, SingleStoreDBOffsetContext> getStreamingChangeEventSource() {
return new SingleStoreDBStreamingChangeEventSource();
return new SingleStoreDBStreamingChangeEventSource(connectorConfig, connectionFactory.mainConnection(), dispatcher, errorHandler, schema, clock);
}

// TODO incremental snapshot
SingleStoreDBChangeRecordEmitter.java (new file)

@@ -0,0 +1,137 @@
package com.singlestore.debezium;

import java.util.Objects;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.data.Envelope;
import io.debezium.data.Envelope.Operation;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.RelationalChangeRecordEmitter;
import io.debezium.relational.TableSchema;
import io.debezium.util.Clock;

public class SingleStoreDBChangeRecordEmitter extends RelationalChangeRecordEmitter<SingleStoreDBPartition> {

private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreDBChangeRecordEmitter.class);
private final Envelope.Operation operation;
private final OffsetContext offset;
private final Object[] before;
private final Object[] after;
private final long internalId;

private static final String INTERNAL_ID = "internalId";

public SingleStoreDBChangeRecordEmitter(SingleStoreDBPartition partition, OffsetContext offset, Clock clock, Operation operation, Object[] before,
Object[] after, long internalId, SingleStoreDBConnectorConfig connectorConfig) {
super(partition, offset, clock, connectorConfig);
this.offset = offset;
this.operation = operation;
this.before = before;
this.after = after;
this.internalId = internalId;
}

@Override
protected void emitCreateRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
// Nothing to emit when the event carries no column values; skip the record.
LOGGER.warn("no new values found for table '{}' from create message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
return;
}
Struct newKey = getKey(tableSchema, newColumnValues);
Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct envelope = tableSchema.getEnvelopeSchema().create(newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(getPartition(), tableSchema, Operation.CREATE, newKey, envelope, getOffset(), null);
}

@Override
protected void emitUpdateRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();

Struct newKey = getKey(tableSchema, newColumnValues);

Struct newValue = tableSchema.valueFromColumnData(newColumnValues);
Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

if (skipEmptyMessages() && (newColumnValues == null || newColumnValues.length == 0)) {
LOGGER.debug("no new values found for table '{}' from update message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
return;
}

/*
* If skip.messages.without.change is configured, skip publishing the message
* when there is no change in the monitored columns (the old and new values are equal).
*/
if (skipMessagesWithoutChange() && Objects.nonNull(newValue) && newValue.equals(oldValue)) {
LOGGER.debug("No new values found for table '{}' in included columns from update message at '{}'; skipping record", tableSchema,
getOffset().getSourceInfo());
return;
}

Struct envelope = tableSchema.getEnvelopeSchema().update(oldValue, newValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(getPartition(), tableSchema, Operation.UPDATE, newKey, envelope, getOffset(), null);
}

@Override
protected void emitDeleteRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema) throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();
Struct newKey = getKey(tableSchema, newColumnValues);

Struct oldValue = tableSchema.valueFromColumnData(oldColumnValues);

if (skipEmptyMessages() && (oldColumnValues == null || oldColumnValues.length == 0)) {
LOGGER.warn("no old values found for table '{}' from delete message at '{}'; skipping record", tableSchema, getOffset().getSourceInfo());
return;
}

Struct envelope = tableSchema.getEnvelopeSchema().delete(oldValue, getOffset().getSourceInfo(), getClock().currentTimeAsInstant());
receiver.changeRecord(getPartition(), tableSchema, Operation.DELETE, newKey, envelope, getOffset(), null);
}

@Override
public OffsetContext getOffset() {
return offset;
}

@Override
public Operation getOperation() {
return operation;
}

@Override
protected Object[] getOldColumnValues() {
return before;
}

@Override
protected Object[] getNewColumnValues() {
return after;
}

private Struct getKey(TableSchema tableSchema, Object[] columnData) {
Struct key = tableSchema.keyFromColumnData(columnData);
if (key == null) {
return keyFromInternalId();
} else {
return key;
}
}

private Struct keyFromInternalId() {
Struct result = new Struct(SchemaBuilder.struct().field(INTERNAL_ID, Schema.INT64_SCHEMA).build());
result.put(INTERNAL_ID, internalId);
return result;
}
}
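A note on the key fallback in getKey() above: for tables without a primary key, tableSchema.keyFromColumnData() returns null and the record is keyed by SingleStore's internalId instead. A runnable sketch of the resulting Kafka Connect key (the internalId value is illustrative):

```java
// Sketch only: shape of the fallback key built by keyFromInternalId() above.
// The internalId value is illustrative.
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class InternalIdKeySketch {
  public static void main(String[] args) {
    Schema keySchema = SchemaBuilder.struct()
        .field("internalId", Schema.INT64_SCHEMA)
        .build();
    Struct key = new Struct(keySchema).put("internalId", 42L);
    System.out.println(key); // Struct{internalId=42}
  }
}
```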
SingleStoreDBConnection.java

@@ -26,6 +26,7 @@ public class SingleStoreDBConnection extends JdbcConnection {

private static final String QUOTED_CHARACTER = "`";
protected static final String URL_PATTERN = "jdbc:singlestore://${hostname}:${port}/?connectTimeout=${connectTimeout}";
protected static final String URL_PATTERN_DATABASE = "jdbc:singlestore://${hostname}:${port}/${dbname}?connectTimeout=${connectTimeout}";

private final SingleStoreDBConnectionConfiguration connectionConfig;

@@ -98,7 +99,7 @@ public JdbcConnection observe(Set<ColumnId> fieldFilter, Set<TableId> tableFilter,
}
format.ifPresent(f -> query.append(" AS ").append(f.name()));
outputConfig.ifPresent(c -> query.append(" INTO ").append(c));
offSetConfig.ifPresent(o -> query.append(" BEGINNING AT ").append(o));
offSetConfig.ifPresent(o -> query.append(" BEGIN AT ").append(o));
recordFilter.ifPresent(f -> query.append(" WHERE ").append(f));
return query(query.toString(), resultSetConsumer);
}
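The observe() builder above assembles SingleStore's OBSERVE statement from optional clauses; this commit also corrects the resume clause keyword from BEGINNING AT to BEGIN AT. A sketch of the assembly, where the table name, output format, and offset literal are made up for illustration:

```java
// Sketch only: clause order produced by observe() above. The table name,
// format, and offset literal are illustrative, not taken from the diff.
public class ObserveQuerySketch {
  public static void main(String[] args) {
    StringBuilder query = new StringBuilder("OBSERVE * FROM `inventory`.`orders`");
    query.append(" AS ").append("JSON");                             // format.ifPresent(...)
    query.append(" BEGIN AT ").append("(0x0000000000000077, NULL)"); // offSetConfig.ifPresent(...)
    query.append(" WHERE ").append("InternalId > 0");                // recordFilter.ifPresent(...)
    System.out.println(query);
    // OBSERVE * FROM `inventory`.`orders` AS JSON BEGIN AT (0x0000000000000077, NULL) WHERE InternalId > 0
  }
}
```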
@@ -108,7 +109,7 @@ public SingleStoreDBConnectionConfiguration connectionConfig() {
}

public String connectionString() {
return connectionString(URL_PATTERN);
return database() != null ? connectionString(URL_PATTERN_DATABASE) : connectionString(URL_PATTERN);
}
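connectionString() now picks between the two URL templates above depending on whether a database name is configured. A small sketch of the substitution; the real code goes through JdbcConnection's pattern-based factory, and the host, port, and database name here are made up:

```java
// Sketch only: how the two URL templates above expand. Real code uses
// JdbcConnection.patternBasedFactory(); host, port, and dbname are made up.
public class UrlPatternSketch {
  public static void main(String[] args) {
    String withoutDb = "jdbc:singlestore://${hostname}:${port}/?connectTimeout=${connectTimeout}";
    String withDb = "jdbc:singlestore://${hostname}:${port}/${dbname}?connectTimeout=${connectTimeout}";
    String dbname = "inventory"; // null when database.dbname is not set
    String url = (dbname != null ? withDb : withoutDb)
        .replace("${hostname}", "svchost")
        .replace("${port}", "3306")
        .replace("${dbname}", String.valueOf(dbname))
        .replace("${connectTimeout}", "30000");
    System.out.println(url); // jdbc:singlestore://svchost:3306/inventory?connectTimeout=30000
  }
}
```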

@Override
@@ -160,6 +161,7 @@ public SingleStoreDBConnectionConfiguration(Configuration config) {
.with("connectTimeout", Long.toString(getConnectionTimeout().toMillis()))
.with("sslMode", sslMode().getValue())
.with("defaultFetchSize", 1)
.with("tinyInt1IsBit", "false")
.without("parameters");
if (useSSL) {
if (!Strings.isNullOrBlank(sslTrustStore())) {
@@ -180,7 +182,10 @@ public SingleStoreDBConnectionConfiguration(Configuration config) {
}
driverParameters().forEach(jdbcConfigBuilder::with);
this.jdbcConfig = JdbcConfiguration.adapt(jdbcConfigBuilder.build());
factory = JdbcConnection.patternBasedFactory(SingleStoreDBConnection.URL_PATTERN, com.singlestore.jdbc.Driver.class.getName(), getClass().getClassLoader());
factory = JdbcConnection.patternBasedFactory(
databaseName() != null ? SingleStoreDBConnection.URL_PATTERN_DATABASE : SingleStoreDBConnection.URL_PATTERN,
com.singlestore.jdbc.Driver.class.getName(),
getClass().getClassLoader());
}

public JdbcConfiguration config() {
@@ -211,6 +216,10 @@ public int port() {
return config.getInteger(SingleStoreDBConnectorConfig.PORT);
}

public String databaseName() {
return config.getString(SingleStoreDBConnectorConfig.DATABASE_NAME);
}

public SingleStoreDBConnectorConfig.SecureConnectionMode sslMode() {
String mode = config.getString(SingleStoreDBConnectorConfig.SSL_MODE);
return SingleStoreDBConnectorConfig.SecureConnectionMode.parse(mode);
SingleStoreDBConnectorConfig.java

@@ -154,6 +154,7 @@ public Map<String, ConfigValue> validate() {
PORT,
USER,
PASSWORD,
DATABASE_NAME,
SSL_MODE,
SSL_KEYSTORE,
SSL_KEYSTORE_PASSWORD,
@@ -165,8 +166,6 @@ public Map<String, ConfigValue> validate() {
DRIVER_PARAMETERS,
SNAPSHOT_MODE)
.events(
DATABASE_INCLUDE_LIST,
DATABASE_EXCLUDE_LIST,
INCONSISTENT_SCHEMA_HANDLING_MODE,
SOURCE_INFO_STRUCT_MAKER)
.create();
@@ -273,13 +272,14 @@ public enum SnapshotMode implements EnumeratedValue {
/**
* Perform a snapshot and then stop before attempting to stream events.
*/
INITIAL_ONLY("initial_only", true, true, false),
INITIAL_ONLY("initial_only", true, true, false);
/**
* Perform a snapshot of only the database schemas (without data) and then begin streaming events.
* This should be used with care, but it is very useful when the change event consumers need only the changes
* from the point in time the snapshot is made (and don't care about any state or changes prior to this point).
*/
SCHEMA_ONLY("schema_only", true, false, true);
// TODO: PLAT-6912
// SCHEMA_ONLY("schema_only", true, false, true);

private final String value;
private final boolean includeSchema;
SingleStoreDBConnectorTask.java

@@ -4,9 +4,6 @@
import java.util.Map;
import java.util.stream.Collectors;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.jdbc.JdbcValueConverters;
import io.debezium.jdbc.TemporalPrecisionMode;
import org.apache.kafka.connect.source.SourceRecord;

import io.debezium.config.Configuration;
@@ -68,18 +65,17 @@ public ChangeEventSourceCoordinator<SingleStoreDBPartition, SingleStoreDBOffsetContext>
false);

SingleStoreDBTaskContext taskContext = new SingleStoreDBTaskContext(connectorConfig, schema);

queue = new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();

SingleStoreDBEventMetadataProvider metadataProvider = new SingleStoreDBEventMetadataProvider();
SingleStoreDBErrorHandler errorHandler = new SingleStoreDBErrorHandler(connectorConfig, queue);

this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
.pollInterval(connectorConfig.getPollInterval())
.maxBatchSize(connectorConfig.getMaxBatchSize())
.maxQueueSize(connectorConfig.getMaxQueueSize())
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
.loggingContextSupplier(() -> taskContext.configureLoggingContext(CONTEXT_NAME))
.build();

Offsets<SingleStoreDBPartition, SingleStoreDBOffsetContext> previousOffsets = getPreviousOffsets(
new SingleStoreDBPartition.Provider(connectorConfig, config),
new SingleStoreDBOffsetContext.Loader(connectorConfig));
@@ -111,7 +107,7 @@ public ChangeEventSourceCoordinator<SingleStoreDBPartition, SingleStoreDBOffsetContext>
errorHandler,
SingleStoreDBConnector.class,
connectorConfig,
new SingleStoreDBChangeEventSourceFactory(connectorConfig, connectionFactory, schema, dispatcher, clock),
new SingleStoreDBChangeEventSourceFactory(connectorConfig, connectionFactory, schema, dispatcher, errorHandler, clock),
// TODO create custom metrics
new DefaultChangeEventSourceMetricsFactory<>(),
dispatcher,
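The coordinator assembled above is normally driven by Kafka Connect. For local testing, a consumer might run the connector through Debezium's embedded engine instead; a sketch, assuming the standard io.debezium.engine API and the property keys from the configuration sketch near the top of this page:

```java
// Sketch only: running the connector via Debezium's embedded engine for
// local testing. Assumes standard engine APIs; props would carry the same
// keys as the configuration sketch near the top of this page.
import java.util.Properties;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class EmbeddedEngineSketch {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    props.setProperty("name", "singlestore-cdc");
    props.setProperty("connector.class", "com.singlestore.debezium.SingleStoreDBConnector"); // assumed
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.MemoryOffsetBackingStore");
    // ... database.* properties as in the configuration sketch above ...

    try (DebeziumEngine<ChangeEvent<String, String>> engine =
        DebeziumEngine.create(Json.class)
            .using(props)
            .notifying(record -> System.out.println(record.value()))
            .build()) {
      engine.run(); // blocks; performs the snapshot, then streams OBSERVE changes
    }
  }
}
```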
SingleStoreDBDatabaseSchema.java

@@ -4,6 +4,7 @@
import io.debezium.relational.RelationalDatabaseSchema;
import io.debezium.relational.TableId;
import io.debezium.relational.TableSchemaBuilder;
import io.debezium.relational.Key.KeyMapper;
import io.debezium.spi.topic.TopicNamingStrategy;

import java.sql.SQLException;
@@ -18,14 +19,14 @@ public class SingleStoreDBDatabaseSchema extends RelationalDatabaseSchema {

public SingleStoreDBDatabaseSchema(SingleStoreDBConnectorConfig config, SingleStoreDBValueConverters valueConverter,
SingleStoreDBDefaultValueConverter defaultValueConverter, TopicNamingStrategy<TableId> topicNamingStrategy,
boolean tableIdCaseInsensitive) {
boolean tableIdCaseInsensitive) {
super(config, topicNamingStrategy, config.getTableFilters().dataCollectionFilter(), config.getColumnFilter(),
getTableSchemaBuilder(config, valueConverter, defaultValueConverter), tableIdCaseInsensitive, config.getKeyMapper());
}

private static TableSchemaBuilder getTableSchemaBuilder(SingleStoreDBConnectorConfig config, SingleStoreDBValueConverters valueConverter,
SingleStoreDBDefaultValueConverter defaultValueConverter) {
return new TableSchemaBuilder(valueConverter, defaultValueConverter, config.schemaNameAdjuster(),
return new SingleStoreDBTableSchemaBuilder(valueConverter, defaultValueConverter, config.schemaNameAdjuster(),
config.customConverterRegistry(), config.getSourceInfoStructMaker().schema(),
config.getFieldNamer(), false);
}
SingleStoreDBOffsetContext.java

@@ -24,9 +24,9 @@ public class SingleStoreDBOffsetContext extends CommonOffsetContext<SourceInfo>
private boolean snapshotCompleted;
private final Schema sourceInfoSchema;

public SingleStoreDBOffsetContext(SingleStoreDBConnectorConfig connectorConfig, Integer partitionId,
String txId, List<String> offsets, boolean snapshot, boolean snapshotCompleted) {
super(new SourceInfo(connectorConfig));
public SingleStoreDBOffsetContext(SingleStoreDBConnectorConfig connectorConfig, Integer partitionId,
String txId, List<String> offsets, boolean snapshot, boolean snapshotCompleted) {
super(new SourceInfo(connectorConfig, offsets.size()));

sourceInfo.update(partitionId, txId, offsets);
sourceInfoSchema = sourceInfo.schema();
SingleStoreDBPartition.java

@@ -60,7 +60,7 @@ public Provider(SingleStoreDBConnectorConfig connectorConfig, Configuration taskConfig) {
@Override
public Set<SingleStoreDBPartition> getPartitions() {
return Collections.singleton(new SingleStoreDBPartition(
connectorConfig.getLogicalName(), taskConfig.getString(DATABASE_NAME.name())));
connectorConfig.getLogicalName(), connectorConfig.databaseName()));
}
}
}
SingleStoreDBSnapshotChangeEventSource.java

@@ -507,7 +507,8 @@ protected void readTableStructure(ChangeEventSourceContext sourceContext,

@Override
protected void releaseSchemaSnapshotLocks(
RelationalSnapshotContext<SingleStoreDBPartition, SingleStoreDBOffsetContext> snapshotContext) {
RelationalSnapshotContext<SingleStoreDBPartition, SingleStoreDBOffsetContext> snapshotContext)
throws Exception {
}

@Override
@@ -566,7 +567,7 @@ protected SnapshottingTask getSnapshottingTask(SingleStoreDBPartition partition,
}

@Override
protected SnapshotContext<SingleStoreDBPartition, SingleStoreDBOffsetContext> prepare(
protected RelationalSnapshotContext<SingleStoreDBPartition, SingleStoreDBOffsetContext> prepare(
SingleStoreDBPartition partition) throws Exception {
return new RelationalSnapshotContext<>(partition, connectorConfig.databaseName());
}
(remaining changed files not shown)
