Skip to content

Commit

Permalink
Support null keys via magic field
Browse files Browse the repository at this point in the history
  • Loading branch information
ryannedolan committed Jul 13, 2024
1 parent 92f1864 commit 78c03b0
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 4 deletions.
2 changes: 1 addition & 1 deletion deploy/samples/subscriptions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ kind: Subscription
metadata:
name: names
spec:
sql: SELECT NAME, NAME AS KEY FROM DATAGEN.PERSON
sql: SELECT NAME, NULL AS KEY FROM DATAGEN.PERSON
database: RAWKAFKA


Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ public SqlNode visit(SqlCall call) {
*
* N.B. the following magic:
* - field 'PRIMARY_KEY' is treated as a PRIMARY KEY
* - field 'NULL_KEY' is treated as a COMPUTED COLUMN with value NULL
*/
class ConnectorImplementor implements ScriptImplementor {
private final String database;
Expand Down Expand Up @@ -288,7 +289,11 @@ public void implement(SqlWriter w) {
}
}

/** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER` */
/** Implements row type specs, e.g. `NAME VARCHAR(20), AGE INTEGER`.
*
* N.B. the following magic:
* - field 'NULL_KEY' is treated as a COMPUTED COLUMN with value NULL
*/
class RowTypeSpecImplementor implements ScriptImplementor {
private final RelDataType dataType;

Expand All @@ -308,8 +313,12 @@ public void implement(SqlWriter w) {
.collect(Collectors.toList());
for (int i = 0; i < fieldNames.size(); i++) {
w.sep(",");
fieldNames.get(i).unparse(w, 0, 0);
fieldTypes.get(i).unparse(w, 0, 0);
if (fieldNames.get(i).toString().equals("NULL_KEY")) {
w.literal("KEY AS NULL");
} else {
fieldNames.get(i).unparse(w, 0, 0);
fieldTypes.get(i).unparse(w, 0, 0);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,26 @@ public void implementsFlinkCreateTableDDL() {
assertTrue(out, out.contains("'topic'='topic1'"));
assertFalse(out, out.contains("Row"));
}

@Test
public void magicPrimaryKey() {
SqlWriter w = new SqlPrettyWriter();
RelDataType rowType = DataType.struct().with("F1", DataType.VARCHAR)
.with("PRIMARY_KEY", DataType.VARCHAR).rel();
HopTable table = new HopTable("DATABASE", "TABLE1", rowType, ConfigProvider.empty().config("x"));
table.implement(w);
String out = w.toString();
assertTrue(out, out.contains("PRIMARY KEY (PRIMARY_KEY)"));
}

@Test
public void magicNullKey() {
SqlWriter w = new SqlPrettyWriter();
RelDataType rowType = DataType.struct().with("F1", DataType.VARCHAR)
.with("NULL_KEY", DataType.VARCHAR).rel();
HopTable table = new HopTable("DATABASE", "TABLE1", rowType, ConfigProvider.empty().config("x"));
table.implement(w);
String out = w.toString();
assertTrue(out, out.contains("KEY AS NULL"));
}
}

0 comments on commit 78c03b0

Please sign in to comment.