Skip to content

Commit

Permalink
Added configuration validation (#65)
Browse files Browse the repository at this point in the history
Co-authored-by: Adalbert Makarovych <[email protected]>
  • Loading branch information
AdalbertMemSQL and Adalbert Makarovych authored Jan 6, 2025
1 parent 2062c79 commit f1e3da7
Show file tree
Hide file tree
Showing 2 changed files with 176 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.debezium.config.EnumeratedValue;
import io.debezium.config.Field;
import io.debezium.connector.SourceInfoStructMaker;
import io.debezium.jdbc.JdbcValueConverters.DecimalMode;
import io.debezium.relational.ColumnFilterMode;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.RelationalTableFilters;
Expand Down Expand Up @@ -59,7 +58,27 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi
"Additional JDBC parameters to use with connection string to SingleStore server. Format: 'param1=value1; param2 = value2; ...'. The supported parameters are\n"
+
"available in the `SingleStore Connection String Parameters\n" +
"<https://docs.singlestore.com/cloud/developer-resources/connect-with-application-development-tools/connect-with-java-jdbc/the-singlestore-jdbc-driver/#connection-string-parameters>`_.");
"<https://docs.singlestore.com/cloud/developer-resources/connect-with-application-development-tools/connect-with-java-jdbc/the-singlestore-jdbc-driver/#connection-string-parameters>`_.")
.withValidation((config, field, problems) -> {
String value = config.getString(field);
if (value == null || value.isEmpty()) {
return 0;
}

String[] parameters = value.split(";");
int invalidParameters = 0;
for (String parameter : parameters) {
String[] parts = parameter.split("=");
if (parts.length != 2) {
problems.accept(field, value,
String.format("Invalid parameter: '%s'", parameter));
invalidParameters++;
}
}

return invalidParameters;
});

public static final Field SSL_MODE = Field.create("database.ssl.mode")
.withDisplayName("SSL mode")
.withEnum(SecureConnectionMode.class, SecureConnectionMode.DISABLE)
Expand Down Expand Up @@ -138,7 +157,25 @@ public class SingleStoreConnectorConfig extends RelationalDatabaseConnectorConfi
.withDescription(
"When specified and 'snapshot.mode' is 'no_data' - connector will start streaming changes from these offsets. "
+ "Should be provided as a comma separated list of hex offsets per each partitions. "
+ "Example: 0000000000000077000000000000000E000000000000E06E,0x0000000000000077000000000000000E000000000000E087,0000000000000077000000000000000E000000000000E088");
+ "Example: 0000000000000077000000000000000E000000000000E06E,0x0000000000000077000000000000000E000000000000E087,0000000000000077000000000000000E000000000000E088")
.withValidation((config, field, problems) -> {
String value = config.getString(field);
if (value == null || value.isEmpty()) {
return 0;
}

String[] offsets = value.split(",");
int invalidOffsets = 0;
for (String offset : offsets) {
if (!(offset.matches("[0-9a-fA-F]+") || offset.equalsIgnoreCase("null"))) {
problems.accept(field, value, String.format("Invalid offset: '%s'", offset));
invalidOffsets++;
}
}

return invalidOffsets;
});

public static final Field TOPIC_NAMING_STRATEGY = CommonConnectorConfig.TOPIC_NAMING_STRATEGY
.withDefault(DefaultTopicNamingStrategy.class.getName());

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package com.singlestore.debezium;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import io.debezium.config.Configuration;
import java.util.List;
import org.junit.Test;

public class SingleStoreConnectorConfigTest {

@Test
public void testDriverParametersValidationSuccessful() {
SingleStoreConnectorConfig config = new SingleStoreConnectorConfig(
Configuration.create()
.with("driver.parameters", "asd=123;qwe=234")
.build());

assertTrue(config.validate(List.of(SingleStoreConnectorConfig.DRIVER_PARAMETERS),
(field, value, problemMessage) -> {
assert false;
}));

config = new SingleStoreConnectorConfig(
Configuration.create()
.with("driver.parameters", "")
.build());

assertTrue(config.validate(List.of(SingleStoreConnectorConfig.DRIVER_PARAMETERS),
(field, value, problemMessage) -> {
assert false;
}));
}

@Test
public void testDriverParametersValidationFailure() {
SingleStoreConnectorConfig config = new SingleStoreConnectorConfig(
Configuration.create()
.with("driver.parameters", "asd=123;qwe=234=234")
.build());

assertFalse(config.validate(List.of(SingleStoreConnectorConfig.DRIVER_PARAMETERS),
(field, value, problemMessage) -> {
assertEquals("Invalid parameter: 'qwe=234=234'", problemMessage);
}));

config = new SingleStoreConnectorConfig(
Configuration.create()
.with("driver.parameters", "asd=123;qwe")
.build());

assertFalse(config.validate(List.of(SingleStoreConnectorConfig.DRIVER_PARAMETERS),
(field, value, problemMessage) -> {
assertEquals("Invalid parameter: 'qwe'", problemMessage);
}));

config = new SingleStoreConnectorConfig(
Configuration.create()
.with("driver.parameters", "asd=123;qwe=234=234;qwe")
.build());

int[] parameterIndex = {0};
assertFalse(config.validate(List.of(SingleStoreConnectorConfig.DRIVER_PARAMETERS),
(field, value, problemMessage) -> {
if (parameterIndex[0] == 0) {
assertEquals("Invalid parameter: 'qwe=234=234'", problemMessage);
parameterIndex[0]++;
} else {
assertEquals("Invalid parameter: 'qwe'", problemMessage);
}
}));
}

@Test
public void testOffsetsValidationSuccessful() {
SingleStoreConnectorConfig config = new SingleStoreConnectorConfig(
Configuration.create()
.with("offsets", "0123456789abcdefABCDEF,NULL,NULL,0123")
.build());

assertTrue(config.validate(List.of(SingleStoreConnectorConfig.OFFSETS),
(field, value, problemMessage) -> {
assert false;
}));

config = new SingleStoreConnectorConfig(
Configuration.create()
.with("offsets", "")
.build());

assertTrue(config.validate(List.of(SingleStoreConnectorConfig.OFFSETS),
(field, value, problemMessage) -> {
assert false;
}));
}

@Test
public void testOffsetsValidationFailure() {
SingleStoreConnectorConfig config = new SingleStoreConnectorConfig(
Configuration.create()
.with("offsets", "0123456789abcdefABCDEFGE,NULL,NULL,0123")
.build());

assertFalse(config.validate(List.of(SingleStoreConnectorConfig.OFFSETS),
(field, value, problemMessage) -> {
assertEquals("Invalid offset: '0123456789abcdefABCDEFGE'", problemMessage);
}));

config = new SingleStoreConnectorConfig(
Configuration.create()
.with("offsets", "0123456789abcdefABCDEF,NULL1,NULL,0123")
.build());

assertFalse(config.validate(List.of(SingleStoreConnectorConfig.OFFSETS),
(field, value, problemMessage) -> {
assertEquals("Invalid offset: 'NULL1'", problemMessage);
}));

config = new SingleStoreConnectorConfig(
Configuration.create()
.with("offsets", "0123456789abcdefABCDEFGE,NULL1,NULL,0123")
.build());

int[] offsetIndex = {0};
assertFalse(config.validate(List.of(SingleStoreConnectorConfig.OFFSETS),
(field, value, problemMessage) -> {
if (offsetIndex[0] == 0) {
assertEquals("Invalid offset: '0123456789abcdefABCDEFGE'", problemMessage);
offsetIndex[0]++;
} else {
assertEquals("Invalid offset: 'NULL1'", problemMessage);
}
}));
}
}

0 comments on commit f1e3da7

Please sign in to comment.