Skip to content

Commit

Permalink
renamed SingleStoreDB -> SingleStore
Browse files Browse the repository at this point in the history
  • Loading branch information
Adalbert Makarovych committed Jan 31, 2024
1 parent 0e842db commit 1be920c
Show file tree
Hide file tree
Showing 46 changed files with 719 additions and 719 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/maven.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
uses: softprops/action-gh-release@v1
if: startsWith(github.ref, 'refs/tags/')
with:
files: target/singlestoredb-debezium-connector-*-plugin.tar.gz
files: target/singlestore-debezium-connector-*-plugin.tar.gz

# Optional: Uploads the full dependency graph to GitHub to improve the quality of Dependabot alerts this repository can receive
- name: Update dependency graph
Expand Down
36 changes: 18 additions & 18 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
# SingleStoreDB connector for Debezium
# SingleStore connector for Debezium
<!--TODO add latest connector version-->
[![License](http://img.shields.io/:license-Apache%202-brightgreen.svg)](http://www.apache.org/licenses/LICENSE-2.0.txt)
[![Github Actions status image](https://github.com/singlestore-labs/singlestoredb-debezium-connector/actions/workflows/maven.yml/badge.svg)](https://github.com/singlestore-labs/singlestoredb-debezium-connector/actions)
[![Github Actions status image](https://github.com/singlestore-labs/singlestore-debezium-connector/actions/workflows/maven.yml/badge.svg)](https://github.com/singlestore-labs/singlestore-debezium-connector/actions)

SingleStoreDB connector for Debezium ("the connector") captures and records row-level changes that occur in the database.
SingleStore connector for Debezium ("the connector") captures and records row-level changes that occur in the database.
You can configure the connector to read from a single table and to ignore, mask, or truncate values in specific columns.

## TOC
- [Getting started](#getting-started)
- [How the SingleStoreDB Debezium connector works](#how-the-singlestoredb-debezium-connector-works)
- [How the SingleStore Debezium connector works](#how-the-singlestore-debezium-connector-works)
- [Data change events](#data-change-events)
- [Data type mappings](#data-type-mappings)
- [Connector properties](#connector-properties)
Expand Down Expand Up @@ -40,7 +40,7 @@ curl -i -X POST \
"name": "<Unique name for the connector>",
"config":
{
"connector.class": "com.singlestore.debezium.SingleStoreDBConnector",
"connector.class": "com.singlestore.debezium.SingleStoreConnector",
"tasks.max": "1",
"database.hostname": "<SingleStoreDB Hostname>",
"database.port": "<SingleStoreDB Port>",
Expand All @@ -54,10 +54,10 @@ curl -i -X POST \
}'
```

## How the SingleStoreDB Debezium connector works
## How the SingleStore Debezium connector works
To optimally configure and run the connector, it is essential to understand how the connector performs snapshots, streams change events, determines Kafka topic names, and uses metadata.

SingleStoreDB Debezium connector uses Change Data Capture (CDC) to capture change events. <!--TODO add link to CDC docs-->
SingleStore Debezium connector uses Change Data Capture (CDC) to capture change events. <!--TODO add link to CDC docs-->

The connector does not support handling schema changes.
You cannot run `ALTER` and `DROP` queries while the `OBSERVE` query is running, specifically during snapshotting and filtering.
Expand All @@ -73,13 +73,13 @@ When the connector starts for the first time, it performs an initial consistent
### Streaming
After the initial snapshot is complete, the connector continues streaming from the offset that it receives in Steps 4 and 5 above. If the streaming stops again for any reason, upon restart, the connector tries to continue streaming changes from where it previously left off.

The SingleStoreDB connector forwards change events in records to the Kafka Connect framework. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.
The SingleStore connector forwards change events in records to the Kafka Connect framework. The Kafka Connect process asynchronously writes the change event records in the same order in which they were generated to the appropriate Kafka topic.
Kafka Connect periodically records the most recent offset in another Kafka topic. The offset indicates source-specific position information that Debezium includes with each event. The connector records database-level offsets for each database partition in each change event.

When Kafka Connect gracefully shuts down, it stops the connectors, flushes all event records to Kafka, and records the last offset received from each connector. When Kafka Connect restarts, it reads the last recorded offset for each connector, and starts each connector at its last recorded offset. When the connector restarts, it sends a request to the SingleStoreDB to send the events that occurred after the offset position.

### Topic names
The SingleStoreDB Debezium connector writes change events for all `INSERT`, `UPDATE`, and `DELETE` operations that occur in a table to a single Apache Kafka topic that is specific to that table. The connector uses the following naming convention for the change event topics:
The SingleStore Debezium connector writes change events for all `INSERT`, `UPDATE`, and `DELETE` operations that occur in a table to a single Apache Kafka topic that is specific to that table. The connector uses the following naming convention for the change event topics:

```
topicPrefix.databaseName.tableName
Expand All @@ -95,7 +95,7 @@ The following list provides definitions for the components of the default name:
For example, if the topic prefix is `fulfillment`, database name is `inventory`, and the table where the operation occurred is `orders`, the connector writes events to the `fulfillment.inventory.orders` Kafka topic.

## Data change events
Every data change event that the SingleStoreDB Debezium connector generates has a **key** and a **value**. The structures of the key and value depend on the table from which the change events originate. For information on how Debezium constructs topic names, refer to [Topic names](#topic-names).
Every data change event that the SingleStore Debezium connector generates has a **key** and a **value**. The structures of the key and value depend on the table from which the change events originate. For information on how Debezium constructs topic names, refer to [Topic names](#topic-names).

Debezium and Kafka Connect are designed around continuous streams of event messages. However, the structure of these events may change over time, which can be difficult for topic consumers to handle. To facilitate the processing of mutable event structures, each event in Kafka Connect is self-contained. Every message key and value has two parts: a schema and payload. The schema describes the structure of the payload, while the payload contains the actual data.

Expand Down Expand Up @@ -161,7 +161,7 @@ The following example demonstrates the value of a change event that the connecto
},
"source":{
"version":"0.1.0",
"connector":"singlestoredb",
"connector":"singlestore",
"name":"singlestore",
"ts_ms":1706197043473,
"snapshot":"true",
Expand Down Expand Up @@ -192,7 +192,7 @@ The following example shows an update change event that the connector captures f
},
"source":{
"version":"0.1.0",
"connector":"singlestoredb",
"connector":"singlestore",
"name":"singlestore",
"ts_ms":1706197446500,
"snapshot":"true",
Expand Down Expand Up @@ -227,8 +227,8 @@ The following example shows a delete event for the table that is shown in the pr
"before":null,
"after":null,
"source":{
"version":"${project.version}",
"connector":"singlestoredb",
"version":"0.1.0",
"connector":"singlestore",
"name":"singlestore",
"ts_ms":1706197665407,
"snapshot":"true",
Expand Down Expand Up @@ -345,14 +345,14 @@ Debezium connectors handle binary data types based on the value of the [`binary.
<!-- TODO add GEOGRAPHY type when it will be fixed -->

## Connector properties
The SingleStoreDB Debezium connector supports the following configuration properties which can be used to achieve the right connector behavior for your application.
The SingleStore Debezium connector supports the following configuration properties which can be used to achieve the right connector behavior for your application.

### Kafka connect properties
| Property | Default | Description
| - | - | -
| name | | Unique name for the connector. Any attempts to register again with the same name will fail. This property is required by all Kafka Connect connectors.
| connector.class | | The name of the Java Class for the connector. For the SingleStore connector specify `com.singlestore.debezium.SingleStoreDBConnector`.
| tasks.max | 1 | The maximum number of tasks that should be created for this connector. The SingleStoreDB connector always uses a single task and therefore does not use this value, so the default is always acceptable.
| connector.class | | The name of the Java Class for the connector. For the SingleStore connector specify `com.singlestore.debezium.SingleStoreConnector`.
| tasks.max | 1 | The maximum number of tasks that should be created for this connector. The SingleStore connector always uses a single task and therefore does not use this value, so the default is always acceptable.

### Connection properties
| Property | Default | Description
Expand Down Expand Up @@ -413,6 +413,6 @@ The following advanced configuration properties have defaults that work in most
| topic.naming.strategy | no default | The name of the TopicNamingStrategy Class that should be used to determine the topic name for data change, schema change, transaction, heartbeat event, etc.
| custom.metric.tags | no default | The custom metric tags will accept key-value pairs to customize the MBean object name which should be appended at the end of the regular name. Each key represents a tag for the MBean object name, and the corresponding value represents the value of that key. For example: k1=v1,k2=v2.
| errors.max.retries | -1 (no limit) | The maximum number of retries on connection errors before failing.
| sourceinfo.struct.maker | SingleStoreDBSourceInfoStructMaker | The name of the SourceInfoStructMaker Class that returns the SourceInfo schema and struct.
| sourceinfo.struct.maker | SingleStoreSourceInfoStructMaker | The name of the SourceInfoStructMaker Class that returns the SourceInfo schema and struct.
| notification.sink.topic.name | no default | The name of the topic for the notifications. This property is required if the 'sink' is in the list of enabled channels.
| post.processors | no default | Optional list of post processors. The processors are defined using the `<post.processor.prefix>.type` option and configured using `<post.processor.prefix.<option>`.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<modelVersion>4.0.0</modelVersion>

<groupId>com.singlestore</groupId>
<artifactId>singlestoredb-debezium-connector</artifactId>
<artifactId>singlestore-debezium-connector</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/singlestore/debezium/Module.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ public static String version() {
* @return symbolic name of the connector plugin
*/
public static String name() {
return "singlestoredb";
return "singlestore";
}

/**
* @return context name used in log MDC and JMX metrics
*/
public static String contextName() {
return "SingleStoreDB";
return "SingleStore";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.singlestore.debezium;

import io.debezium.jdbc.MainConnectionProvidingConnectionFactory;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.notification.NotificationService;
import io.debezium.pipeline.source.spi.ChangeEventSourceFactory;
import io.debezium.pipeline.source.spi.SnapshotChangeEventSource;
import io.debezium.pipeline.source.spi.SnapshotProgressListener;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.util.Clock;

public class SingleStoreChangeEventSourceFactory implements ChangeEventSourceFactory<SingleStorePartition, SingleStoreOffsetContext> {

SingleStoreConnectorConfig connectorConfig;
MainConnectionProvidingConnectionFactory<SingleStoreConnection> connectionFactory;
SingleStoreDatabaseSchema schema;
EventDispatcher<SingleStorePartition, TableId> dispatcher;
ErrorHandler errorHandler;
Clock clock;

public SingleStoreChangeEventSourceFactory(SingleStoreConnectorConfig connectorConfig,
MainConnectionProvidingConnectionFactory<SingleStoreConnection> connectionFactory,
SingleStoreDatabaseSchema schema,
EventDispatcher<SingleStorePartition, TableId> dispatcher,
ErrorHandler errorHandler,
Clock clock) {
this.connectorConfig = connectorConfig;
this.connectionFactory = connectionFactory;
this.schema = schema;
this.dispatcher = dispatcher;
this.errorHandler = errorHandler;
this.clock = clock;
}

@Override
public SnapshotChangeEventSource<SingleStorePartition, SingleStoreOffsetContext> getSnapshotChangeEventSource(
SnapshotProgressListener<SingleStorePartition> snapshotProgressListener,
NotificationService<SingleStorePartition, SingleStoreOffsetContext> notificationService) {
return new SingleStoreSnapshotChangeEventSource(connectorConfig, connectionFactory, schema, dispatcher, clock, snapshotProgressListener, notificationService);
}

@Override
public StreamingChangeEventSource<SingleStorePartition, SingleStoreOffsetContext> getStreamingChangeEventSource() {
return new SingleStoreStreamingChangeEventSource(connectorConfig, connectionFactory.mainConnection(), dispatcher, errorHandler, schema, clock);
}

// TODO incremental snapshot
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import io.debezium.relational.TableSchema;
import io.debezium.util.Clock;

public class SingleStoreDBChangeRecordEmitter extends RelationalChangeRecordEmitter<SingleStoreDBPartition> {
public class SingleStoreChangeRecordEmitter extends RelationalChangeRecordEmitter<SingleStorePartition> {

private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreDBSnapshotChangeRecordEmitter.class);
private static final Logger LOGGER = LoggerFactory.getLogger(SingleStoreSnapshotChangeRecordEmitter.class);
private final Envelope.Operation operation;
private final OffsetContext offset;
private final Object[] before;
Expand All @@ -26,8 +26,8 @@ public class SingleStoreDBChangeRecordEmitter extends RelationalChangeRecordEmit

private static final String INTERNAL_ID = "internalId";

public SingleStoreDBChangeRecordEmitter(SingleStoreDBPartition partition, OffsetContext offset, Clock clock, Operation operation, Object[] before,
Object[] after, long internalId, SingleStoreDBConnectorConfig connectorConfig) {
public SingleStoreChangeRecordEmitter(SingleStorePartition partition, OffsetContext offset, Clock clock, Operation operation, Object[] before,
Object[] after, long internalId, SingleStoreConnectorConfig connectorConfig) {
super(partition, offset, clock, connectorConfig);
this.offset = offset;
this.operation = operation;
Expand All @@ -37,7 +37,7 @@ public SingleStoreDBChangeRecordEmitter(SingleStoreDBPartition partition, Offset
}

@Override
protected void emitCreateRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema)
protected void emitCreateRecord(Receiver<SingleStorePartition> receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] newColumnValues = getNewColumnValues();
Struct newKey = keyFromInternalId();
Expand All @@ -53,7 +53,7 @@ protected void emitCreateRecord(Receiver<SingleStoreDBPartition> receiver, Table
}

@Override
protected void emitUpdateRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema)
protected void emitUpdateRecord(Receiver<SingleStorePartition> receiver, TableSchema tableSchema)
throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Object[] newColumnValues = getNewColumnValues();
Expand Down Expand Up @@ -84,7 +84,7 @@ protected void emitUpdateRecord(Receiver<SingleStoreDBPartition> receiver, Table
}

@Override
protected void emitDeleteRecord(Receiver<SingleStoreDBPartition> receiver, TableSchema tableSchema) throws InterruptedException {
protected void emitDeleteRecord(Receiver<SingleStorePartition> receiver, TableSchema tableSchema) throws InterruptedException {
Object[] oldColumnValues = getOldColumnValues();
Struct newKey = keyFromInternalId();

Expand Down
Loading

0 comments on commit 1be920c

Please sign in to comment.