Skip to content

Commit

Permalink
Merge pull request #33 from singlestore-labs/fixTests1
Browse files Browse the repository at this point in the history
Fixed flacky tests
  • Loading branch information
AdalbertMemSQL authored Mar 21, 2024
2 parents 61b4498 + e87d9b0 commit 9bed757
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private SingleStoreOffsetContext doCreateDataEventsForTable(
ResultSet rs = rsWrapper.getResultSet();
List<Integer> columnPostitions =
ObserveResultSetUtils
.columnPositions(rs, schema.schemaFor(table.id()).valueSchema().fields(),
.columnPositions(rs, table.columns(),
connectorConfig.populateInternalId());
long rows = 0;
Threads.Timer logTimer = getTableScanLogTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import io.debezium.config.CommonConnectorConfig;
import io.debezium.connector.AbstractSourceInfoStructMaker;

import java.util.ArrayList;

public class SingleStoreSourceInfoStructMaker extends AbstractSourceInfoStructMaker<SourceInfo> {

private Schema schema;
Expand Down Expand Up @@ -39,7 +41,7 @@ public Struct struct(SourceInfo sourceInfo) {
result.put(SourceInfo.TABLE_NAME_KEY, sourceInfo.table());
result.put(SourceInfo.TXID_KEY, sourceInfo.txId());
result.put(SourceInfo.PARTITIONID_KEY, sourceInfo.partitionId());
result.put(SourceInfo.OFFSETS_KEY, sourceInfo.offsets());
result.put(SourceInfo.OFFSETS_KEY, new ArrayList(sourceInfo.offsets()));

return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void run() {
t.start();

List<Integer> columnPositions =
ObserveResultSetUtils.columnPositions(rs, schema.schemaFor(table).valueSchema().fields(), connectorConfig.populateInternalId());
ObserveResultSetUtils.columnPositions(rs, schema.tableFor(table).columns(), connectorConfig.populateInternalId());
try {
while (rs.next() && context.isRunning()) {
LOGGER.trace("Streaming record, type: {}, internalId: {}, partitionId: {}, offset: {} values: {}",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.singlestore.debezium;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.kafka.common.protocol.types.Field.Bool;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.singlestore.debezium.util;

import io.debezium.relational.Column;
import io.debezium.relational.Table;

import java.sql.ResultSet;
Expand All @@ -17,16 +18,17 @@ public final class ObserveResultSetUtils {
private static final String BEGIN_SNAPSHOT = "BeginSnapshot";
private static final String COMMIT_SNAPSHOT = "CommitSnapshot";

public static List<Integer> columnPositions(ResultSet resultSet, List<Field> fields, Boolean populateInternalId) throws SQLException {
public static List<Integer> columnPositions(ResultSet resultSet, List<Column> columns, Boolean populateInternalId) throws SQLException {
List<Integer> positions = new ArrayList<>();
for (int i = 0; i < fields.size(); i++) {
String columnName = fields.get(i).name();
if (populateInternalId && columnName.equals(SingleStoreTableSchemaBuilder.INTERNAL_ID)) {
columnName = METADATA_COLUMNS[6];
}

for (int i = 0; i < columns.size(); i++) {
String columnName = columns.get(i).name();
positions.add(resultSet.findColumn(columnName));
}

if (populateInternalId) {
positions.add(resultSet.findColumn(METADATA_COLUMNS[6]));
}

return positions;
}

Expand Down
101 changes: 64 additions & 37 deletions src/test/java/com/singlestore/debezium/SingleStoreConnectionIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@
import java.sql.*;
import java.time.Instant;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -27,11 +29,13 @@ public class SingleStoreConnectionIT extends IntegrationTestBase {

@Test
public void testConnection() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
try (SingleStoreConnection conn = new SingleStoreConnection(
defaultJdbcConnectionConfig())) {
conn.connect();
assertTrue(conn.isConnected());
assertTrue(conn.isValid());
assertEquals("jdbc:singlestore://" + TEST_SERVER + ":" + TEST_PORT + "/?connectTimeout=30000", conn.connectionString());
assertEquals("jdbc:singlestore://" + TEST_SERVER + ":" + TEST_PORT
+ "/?connectTimeout=30000", conn.connectionString());
conn.close();
assertFalse(conn.isConnected());
} catch (SQLException e) {
Expand All @@ -41,7 +45,8 @@ public void testConnection() {

@Test
public void testPrepareQuery() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
try (SingleStoreConnection conn = new SingleStoreConnection(
defaultJdbcConnectionConfig())) {
conn.execute("use " + TEST_DATABASE);
conn.prepareQuery("insert into person values(?, ?, ?, ?, ?)", ps -> {
ps.setString(1, "product4");
Expand All @@ -64,7 +69,8 @@ public void testPrepareQuery() {

@Test
public void testGetCurrentTimeStamp() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
try (SingleStoreConnection conn = new SingleStoreConnection(
defaultJdbcConnectionConfig())) {
Optional<Instant> timeStamp = conn.getCurrentTimestamp();
assertTrue(timeStamp.isPresent());
} catch (SQLException e) {
Expand All @@ -74,20 +80,29 @@ public void testGetCurrentTimeStamp() {

@Test
public void testMetadata() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Set<TableId> tableIds = conn.readAllTableNames(new String[]{"TABLE", "VIEW"}).stream().filter(t -> t.catalog().equals(TEST_DATABASE)).collect(Collectors.toSet());
Set<String> tableNames = tableIds.stream().map(TableId::table).collect(Collectors.toSet());
assertTrue("readAllTableNames doesn't contain correct table names", tableNames.containsAll(Arrays.asList("person", "product", "purchased")));
try (SingleStoreConnection conn = new SingleStoreConnection(
defaultJdbcConnectionConfig())) {
Set<TableId> tableIds = conn.readAllTableNames(new String[]{"TABLE", "VIEW"}).stream()
.filter(t -> t.catalog().equals(TEST_DATABASE)).collect(Collectors.toSet());
Set<String> tableNames = tableIds.stream().map(TableId::table).collect(Collectors
.toSet());
assertTrue("readAllTableNames doesn't contain correct table names", tableNames
.containsAll(Arrays.asList("person", "product", "purchased")));
Set<String> catalogNames = conn.readAllCatalogNames();
assertTrue("readAllCatalogNames returns a wrong catalog name", catalogNames.contains(TEST_DATABASE));
tableNames = conn.readTableNames(TEST_DATABASE, "", "person", new String[]{"TABLE", "VIEW"})
assertTrue("readAllCatalogNames returns a wrong catalog name", catalogNames.contains(
TEST_DATABASE));
tableNames = conn.readTableNames(TEST_DATABASE, "", "person", new String[]{"TABLE",
"VIEW"})
.stream().map(TableId::table).collect(Collectors.toSet());
assertTrue("readTableNames returns a wrong table name", tableNames.contains("person"));
TableId person = tableIds.stream().filter(t -> t.table().equals("person")).findFirst().orElseThrow();
TableId person = tableIds.stream().filter(t -> t.table().equals("person")).findFirst()
.orElseThrow();
List<String> pkList = conn.readPrimaryKeyNames(conn.connection().getMetaData(), person);
assertTrue(pkList.contains("name"));
TableId allTypes = tableIds.stream().filter(t -> t.table().equals("allTypesTable")).findFirst().orElseThrow();
List<String> uniqueList = conn.readTableUniqueIndices(conn.connection().getMetaData(), allTypes);
TableId allTypes = tableIds.stream().filter(t -> t.table().equals("allTypesTable"))
.findFirst().orElseThrow();
List<String> uniqueList = conn.readTableUniqueIndices(conn.connection().getMetaData(),
allTypes);
assertTrue(uniqueList.contains("intColumn"));
} catch (SQLException e) {
Assert.fail(e.getMessage());
Expand All @@ -96,14 +111,16 @@ public void testMetadata() {

@Test
public void testReadSchemaMetadata() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
try (SingleStoreConnection conn = new SingleStoreConnection(
defaultJdbcConnectionConfig())) {
Tables tables = new Tables();
conn.readSchema(tables, TEST_DATABASE, null, null, null, true);
Table person = tables.forTable(TEST_DATABASE, null, "person");
assertThat(person).isNotNull();
assertThat(person.filterColumns(col -> col.isAutoIncremented())).isEmpty();
assertThat(person.primaryKeyColumnNames()).containsOnly("name");
assertThat(person.retrieveColumnNames()).containsExactly("name", "birthdate", "age", "salary", "bitStr");
assertThat(person.retrieveColumnNames()).containsExactly("name", "birthdate", "age",
"salary", "bitStr");
assertThat(person.columnWithName("name").name()).isEqualTo("name");
assertThat(person.columnWithName("name").typeName()).isEqualTo("VARCHAR");
assertThat(person.columnWithName("name").jdbcType()).isEqualTo(Types.VARCHAR);
Expand Down Expand Up @@ -156,7 +173,8 @@ public void testReadSchemaMetadata() {
List<Column> autoIncColumns = product.filterColumns(Column::isAutoIncremented);
assertThat(autoIncColumns.get(0).name()).isEqualTo("id");
assertThat(product.primaryKeyColumnNames()).containsOnly("id");
assertThat(product.retrieveColumnNames()).containsExactly("id", "createdByDate", "modifiedDate");
assertThat(product.retrieveColumnNames()).containsExactly("id", "createdByDate",
"modifiedDate");
assertThat(product.columnWithName("id").name()).isEqualTo("id");
assertThat(product.columnWithName("id").typeName()).isEqualTo("BIGINT");
assertThat(product.columnWithName("id").jdbcType()).isEqualTo(Types.BIGINT);
Expand All @@ -169,15 +187,17 @@ public void testReadSchemaMetadata() {
assertThat(product.columnWithName("id").isOptional()).isFalse();
assertThat(product.columnWithName("createdByDate").name()).isEqualTo("createdByDate");
assertThat(product.columnWithName("createdByDate").typeName()).isEqualTo("DATETIME");
assertThat(product.columnWithName("createdByDate").jdbcType()).isEqualTo(Types.TIMESTAMP);
assertThat(product.columnWithName("createdByDate").jdbcType()).isEqualTo(
Types.TIMESTAMP);
assertThat(product.columnWithName("createdByDate").length()).isEqualTo(19);
assertFalse(product.columnWithName("createdByDate").scale().isPresent());
assertThat(product.columnWithName("createdByDate").position()).isEqualTo(2);
assertThat(product.columnWithName("createdByDate").isAutoIncremented()).isFalse();
assertThat(product.columnWithName("createdByDate").isOptional()).isFalse();
assertThat(product.columnWithName("modifiedDate").name()).isEqualTo("modifiedDate");
assertThat(product.columnWithName("modifiedDate").typeName()).isEqualTo("DATETIME");
assertThat(product.columnWithName("modifiedDate").jdbcType()).isEqualTo(Types.TIMESTAMP);
assertThat(product.columnWithName("modifiedDate").jdbcType()).isEqualTo(
Types.TIMESTAMP);
assertThat(product.columnWithName("modifiedDate").length()).isEqualTo(19);
assertFalse(product.columnWithName("modifiedDate").scale().isPresent());
assertThat(product.columnWithName("modifiedDate").position()).isEqualTo(3);
Expand All @@ -189,7 +209,8 @@ public void testReadSchemaMetadata() {
assertThat(purchased).isNotNull();
assertThat(person.filterColumns(col -> col.isAutoIncremented())).isEmpty();
assertThat(purchased.primaryKeyColumnNames()).containsOnly("productId", "purchaser");
assertThat(purchased.retrieveColumnNames()).containsExactly("purchaser", "productId", "purchaseDate");
assertThat(purchased.retrieveColumnNames()).containsExactly("purchaser", "productId",
"purchaseDate");
assertThat(purchased.columnWithName("purchaser").name()).isEqualTo("purchaser");
assertThat(purchased.columnWithName("purchaser").typeName()).isEqualTo("VARCHAR");
assertThat(purchased.columnWithName("purchaser").jdbcType()).isEqualTo(Types.VARCHAR);
Expand All @@ -210,7 +231,8 @@ public void testReadSchemaMetadata() {
assertThat(purchased.columnWithName("productId").isOptional()).isFalse();
assertThat(purchased.columnWithName("purchaseDate").name()).isEqualTo("purchaseDate");
assertThat(purchased.columnWithName("purchaseDate").typeName()).isEqualTo("DATETIME");
assertThat(purchased.columnWithName("purchaseDate").jdbcType()).isEqualTo(Types.TIMESTAMP);
assertThat(purchased.columnWithName("purchaseDate").jdbcType()).isEqualTo(
Types.TIMESTAMP);
assertThat(purchased.columnWithName("purchaseDate").length()).isEqualTo(19);
assertFalse(purchased.columnWithName("purchaseDate").scale().isPresent());
assertThat(purchased.columnWithName("purchaseDate").position()).isEqualTo(3);
Expand All @@ -223,7 +245,8 @@ public void testReadSchemaMetadata() {

@Test
public void testObserve() {
try (SingleStoreConnection conn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
try (SingleStoreConnection conn = new SingleStoreConnection(
defaultJdbcConnectionConfig())) {
String tempTableName = "person_temporary1";
conn.execute("USE " + TEST_DATABASE,
"CREATE TABLE IF NOT EXISTS " + tempTableName + " ("
Expand All @@ -234,28 +257,32 @@ public void testObserve() {
+ " bitStr BIT(18)"
+ ")");
//3 partitions BeginSnapshot/CommitSnapshot
String[] expectedTypesOrder = {
"BeginSnapshot", "CommitSnapshot",
"BeginSnapshot", "CommitSnapshot",
"BeginSnapshot", "CommitSnapshot",
"BeginTransaction", "Insert", "CommitTransaction",
"BeginTransaction", "Insert", "CommitTransaction",
"BeginTransaction", "Delete", "CommitTransaction"};
List<String> expectedTypesOrder = Arrays.asList("BeginSnapshot", "CommitSnapshot",
"BeginSnapshot", "CommitSnapshot",
"BeginSnapshot", "CommitSnapshot",
"BeginTransaction", "Insert", "CommitTransaction",
"BeginTransaction", "Insert", "CommitTransaction",
"BeginTransaction", "Delete", "CommitTransaction");
List<String> actualTypes = new CopyOnWriteArrayList<>();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
Thread observer = new Thread(() -> {
try (SingleStoreConnection observerConn = new SingleStoreConnection(defaultJdbcConnectionConfig())) {
Set<TableId> tableIds = observerConn.readAllTableNames(new String[]{"TABLE", "VIEW"}).stream().filter(t -> t.catalog().equals(TEST_DATABASE)).collect(Collectors.toSet());
Set<TableId> person = tableIds.stream().filter(t -> t.table().equals(tempTableName)).collect(Collectors.toSet());
try (SingleStoreConnection observerConn = new SingleStoreConnection(
defaultJdbcConnectionConfig())) {
Set<TableId> tableIds = observerConn.readAllTableNames(new String[]{"TABLE",
"VIEW"}).stream().filter(t -> t.catalog().equals(TEST_DATABASE))
.collect(Collectors.toSet());
Set<TableId> person = tableIds.stream().filter(t -> t.table().equals(
tempTableName)).collect(Collectors.toSet());
observerConn.observe(person, rs -> {
int counter = 0;
latch1.countDown();
while (counter < expectedTypesOrder.length && rs.next()) {
while (counter < expectedTypesOrder.size() && rs.next()) {
actualTypes.add(ObserveResultSetUtils.snapshotType(rs));
counter++;
}
((com.singlestore.jdbc.Connection)rs.getStatement().getConnection()).cancelCurrentQuery();
((com.singlestore.jdbc.Connection) rs.getStatement().getConnection())
.cancelCurrentQuery();
latch2.countDown();
});
} catch (Exception e) {
Expand All @@ -272,10 +299,10 @@ public void testObserve() {
"delete from " + tempTableName + " where name = 'product1'");
latch2.await();
observer.interrupt();
for (int i = 0; i < expectedTypesOrder.length; i++) {
assertEquals(expectedTypesOrder[i], actualTypes.get(i));
}
conn.execute("DROP TABLE " + tempTableName);

Collections.sort(expectedTypesOrder);
Collections.sort(actualTypes);
assertEquals(expectedTypesOrder, actualTypes);
} catch (SQLException | InterruptedException e) {
Assert.fail(e.getMessage());
}
Expand Down
Loading

0 comments on commit 9bed757

Please sign in to comment.