From c3fa16af202828e6074f8dfc05f4ef195fbc0e57 Mon Sep 17 00:00:00 2001 From: Ryanne Dolan Date: Tue, 9 Apr 2024 13:09:02 -0500 Subject: [PATCH] Use implicit row constructors. Optionally skip nested rows. Nullability bug fixes. (#64) --- .../hoptimator/catalog/AvroConverter.java | 46 ++++++++++---- .../linkedin/hoptimator/catalog/DataType.java | 25 +++++++- .../hoptimator/catalog/ScriptImplementor.java | 63 +++++++++++++++---- .../hoptimator/catalog/TableResolver.java | 22 +++++++ .../hoptimator/catalog/AvroConverterTest.java | 42 +++++++++++++ .../hoptimator/catalog/DataTypeTest.java | 23 +++++++ .../catalog/ScriptImplementorTest.java | 10 +-- .../catalog/kafka/RawKafkaSchemaFactory.java | 4 +- .../subscription/SubscriptionReconciler.java | 1 + .../hoptimator/planner/PipelineRel.java | 3 + 10 files changed, 204 insertions(+), 35 deletions(-) create mode 100644 hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java create mode 100644 hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java index ea31f4af..3b2694e6 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/AvroConverter.java @@ -27,7 +27,8 @@ public static Schema avro(String namespace, String name, RelDataType dataType) { .filter(x -> !x.getName().startsWith("__")) // don't write out hidden fields .map(x -> new Schema.Field(sanitize(x.getName()), avro(namespace, x.getName(), x.getType()), describe(x), null)) .collect(Collectors.toList()); - return Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields); + return createAvroSchemaWithNullability(Schema.createRecord(sanitize(name), dataType.toString(), namespace, false, fields), + dataType.isNullable()); } else { switch (dataType.getSqlTypeName()) { case INTEGER: @@ -42,6 +43,15 @@ public static Schema avro(String namespace, String name, RelDataType dataType) { return createAvroTypeWithNullability(Schema.Type.DOUBLE, dataType.isNullable()); case CHAR: return createAvroTypeWithNullability(Schema.Type.STRING, dataType.isNullable()); + case BOOLEAN: + return createAvroTypeWithNullability(Schema.Type.BOOLEAN, dataType.isNullable()); + case ARRAY: + return createAvroSchemaWithNullability(Schema.createArray(avro(null, null, dataType.getComponentType())), + dataType.isNullable()); + // TODO support map types + // Appears to require a Calcite version bump + // case MAP: + // return createAvroSchemaWithNullability(Schema.createMap(avroPrimitive(dataType.getValueType())), dataType.isNullable()); case UNKNOWN: case NULL: return Schema.createUnion(Schema.create(Schema.Type.NULL)); @@ -56,14 +66,18 @@ public static Schema avro(String namespace, String name, RelProtoDataType relPro return avro(namespace, name, relProtoDataType.apply(factory)); } - private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) { + private static Schema createAvroSchemaWithNullability(Schema schema, boolean nullable) { if (nullable) { - return Schema.createUnion(Schema.create(Schema.Type.NULL), Schema.create(rawType)); + return Schema.createUnion(Schema.create(Schema.Type.NULL), schema); } else { - return Schema.create(rawType); + return schema; } } + private static Schema createAvroTypeWithNullability(Schema.Type rawType, boolean nullable) { + return createAvroSchemaWithNullability(Schema.create(rawType), nullable); + } + public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { RelDataType unknown = typeFactory.createUnknownType(); switch (schema.getType()) { @@ -74,17 +88,25 @@ public static RelDataType rel(Schema schema, RelDataTypeFactory typeFactory) { .filter(x -> x.getValue().getSqlTypeName() != unknown.getSqlTypeName()) .collect(Collectors.toList())); case INT: - // schema.isNullable() should be false for basic types iiuc - return createRelTypeWithNullability(typeFactory, SqlTypeName.INTEGER, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.INTEGER); case LONG: - return createRelTypeWithNullability(typeFactory, SqlTypeName.BIGINT, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.BIGINT); case ENUM: + case FIXED: case STRING: - return createRelTypeWithNullability(typeFactory, SqlTypeName.VARCHAR, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.VARCHAR); case FLOAT: - return createRelTypeWithNullability(typeFactory, SqlTypeName.FLOAT, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.FLOAT); case DOUBLE: - return createRelTypeWithNullability(typeFactory, SqlTypeName.DOUBLE, schema.isNullable()); + return createRelType(typeFactory, SqlTypeName.DOUBLE); + case BOOLEAN: + return createRelType(typeFactory, SqlTypeName.BOOLEAN); + case ARRAY: + return typeFactory.createArrayType(rel(schema.getElementType(), typeFactory), -1); +// TODO support map types +// Appears to require a Calcite version bump +// case MAP: +// return typeFactory.createMapType(typeFactory.createSqlType(SqlTypeName.VARCHAR), rel(schema.getValueType(), typeFactory)); case UNION: if (schema.isNullable() && schema.getTypes().size() == 2) { Schema innerType = schema.getTypes().stream().filter(x -> x.getType() != Schema.Type.NULL).findFirst().get(); @@ -102,9 +124,9 @@ public static RelDataType rel(Schema schema) { return rel(schema, DataType.DEFAULT_TYPE_FACTORY); } - private static RelDataType createRelTypeWithNullability(RelDataTypeFactory typeFactory, SqlTypeName typeName, boolean nullable) { + private static RelDataType createRelType(RelDataTypeFactory typeFactory, SqlTypeName typeName) { RelDataType rawType = typeFactory.createSqlType(typeName); - return typeFactory.createTypeWithNullability(rawType, nullable); + return typeFactory.createTypeWithNullability(rawType, false); } public static RelProtoDataType proto(Schema schema) { diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java index bfad74dc..2da14063 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/DataType.java @@ -14,7 +14,7 @@ /** Common data types. Not authoratitive or exhaustive. */ public enum DataType { - VARCHAR_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)), + VARCHAR(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), true)), VARCHAR_NOT_NULL(x -> x.createTypeWithNullability(x.createSqlType(SqlTypeName.VARCHAR), false)); public static final RelDataTypeFactory DEFAULT_TYPE_FACTORY = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); @@ -56,16 +56,24 @@ public static Struct struct(RelDataType relDataType) { /** Convenience builder for non-scalar types */ public interface Struct extends RelProtoDataType { - default Struct with(String name, DataType dataType) { + default Struct with(String name, RelDataType dataType) { return x -> { RelDataType existing = apply(x); RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(x); builder.addAll(existing.getFieldList()); - builder.add(name, dataType.rel(x)); + builder.add(name, dataType); return builder.build(); }; } + default Struct with(String name, DataType dataType) { + return with(name, dataType.rel()); + } + + default Struct with(String name, Struct struct) { + return with(name, struct.rel()); + } + default RelDataType rel() { return apply(DEFAULT_TYPE_FACTORY); } @@ -85,6 +93,17 @@ default Struct drop(String name) { }; } + default Struct dropNestedRows() { + return x -> { + RelDataType dataType = apply(x); + RelDataTypeFactory.Builder builder = new RelDataTypeFactory.Builder(x); + builder.addAll(dataType.getFieldList().stream() + .filter(y -> y.getType().getSqlTypeName() != SqlTypeName.ROW) + .collect(Collectors.toList())); + return builder.build(); + }; + } + default Struct get(String name) { return x -> { RelDataTypeField field = apply(x).getField(name, true, false); diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java index 64977959..e9368efa 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/ScriptImplementor.java @@ -1,23 +1,33 @@ package com.linkedin.hoptimator.catalog; +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelDataTypeField; +import org.apache.calcite.rel.type.RelDataTypeSystem; +import org.apache.calcite.rel.rel2sql.RelToSqlConverter; +import org.apache.calcite.rel.rel2sql.SqlImplementor; import org.apache.calcite.sql.SqlWriter; -//import org.apache.calcite.sql.SqlWriterConfig; +// needed in next Calcite version +// import org.apache.calcite.sql.SqlWriterConfig; import org.apache.calcite.sql.SqlDataTypeSpec; -import org.apache.calcite.sql.SqlRowTypeNameSpec; import org.apache.calcite.sql.SqlBasicTypeNameSpec; +import org.apache.calcite.sql.SqlCollectionTypeNameSpec; +import org.apache.calcite.sql.SqlRowTypeNameSpec; +import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlKind; +import org.apache.calcite.sql.SqlNode; +import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlRowTypeNameSpec; +import org.apache.calcite.sql.SqlSelect; import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.sql.fun.SqlRowOperator; import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeFactoryImpl; import org.apache.calcite.sql.pretty.SqlPrettyWriter; -import org.apache.calcite.rel.RelNode; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeFactory; -import org.apache.calcite.rel.type.RelDataTypeField; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.rel.rel2sql.RelToSqlConverter; -import org.apache.calcite.rel.rel2sql.SqlImplementor; +import org.apache.calcite.sql.type.SqlTypeFactoryImpl; +import org.apache.calcite.sql.util.SqlShuttle; import java.util.Map; import java.util.List; @@ -94,6 +104,7 @@ default String sql() { /** Render the script as DDL/SQL in the given dialect */ default String sql(SqlDialect dialect) { SqlWriter w = new SqlPrettyWriter(dialect); +// TODO: fix in next Calcite version // above is deprecated; replace with: // SqlWriter w = new SqlPrettyWriter(SqlWriterConfig.of().withDialect(dialect)); implement(w); @@ -129,9 +140,31 @@ public QueryImplementor(RelNode relNode) { public void implement(SqlWriter w) { RelToSqlConverter converter = new RelToSqlConverter(w.getDialect()); SqlImplementor.Result result = converter.visitRoot(relNode); - w.literal(result.asSelect().toSqlString(w.getDialect()).getSql()); + SqlSelect select = result.asSelect(); + if (select.getSelectList() != null) { + select.setSelectList((SqlNodeList) select.getSelectList().accept(REMOVE_ROW_CONSTRUCTOR)); + } + w.literal(select.toSqlString(w.getDialect()).getSql()); } - } + + // A `ROW(...)` operator which will unparse as just `(...)`. + private final SqlRowOperator IMPLIED_ROW_OPERATOR = new SqlRowOperator(""); // empty string name + + // a shuttle that replaces `Row(...)` with just `(...)` + private final SqlShuttle REMOVE_ROW_CONSTRUCTOR = new SqlShuttle() { + @Override + public SqlNode visit(SqlCall call) { + List operands = call.getOperandList().stream().map(x -> x.accept(this)).collect(Collectors.toList()); + if ((call.getKind() == SqlKind.ROW || call.getKind() == SqlKind.COLUMN_LIST + || call.getOperator() instanceof SqlRowOperator) + && operands.size() > 1) { + return IMPLIED_ROW_OPERATOR.createCall(call.getParserPosition(), operands); + } else { + return call.getOperator().createCall(call.getParserPosition(), operands); + } + } + }; + } /** * Implements a CREATE TABLE...WITH... DDL statement. @@ -291,6 +324,10 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) { .map(x -> toSpec(x)) .collect(Collectors.toList()); return maybeNullable(dataType, new SqlDataTypeSpec(new SqlRowTypeNameSpec(SqlParserPos.ZERO, fieldNames, fieldTypes), SqlParserPos.ZERO)); + } if (dataType.getComponentType() != null) { + return maybeNullable(dataType, new SqlDataTypeSpec(new SqlCollectionTypeNameSpec(new SqlBasicTypeNameSpec( + dataType.getComponentType().getSqlTypeName(), SqlParserPos.ZERO), dataType.getSqlTypeName(), SqlParserPos.ZERO), + SqlParserPos.ZERO)); } else { return maybeNullable(dataType, new SqlDataTypeSpec(new SqlBasicTypeNameSpec(dataType.getSqlTypeName(), SqlParserPos.ZERO), SqlParserPos.ZERO)); } @@ -298,7 +335,7 @@ private static SqlDataTypeSpec toSpec(RelDataType dataType) { private static SqlDataTypeSpec maybeNullable(RelDataType dataType, SqlDataTypeSpec spec) { if (!dataType.isNullable()) { - return spec.withNullable(true); + return spec.withNullable(false); } else { // we don't want "VARCHAR NULL", only "VARCHAR NOT NULL" return spec; diff --git a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableResolver.java b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableResolver.java index 4109de7f..3c12f5d5 100644 --- a/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableResolver.java +++ b/hoptimator-catalog/src/main/java/com/linkedin/hoptimator/catalog/TableResolver.java @@ -2,13 +2,19 @@ import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.rel.type.RelProtoDataType; import java.util.concurrent.ExecutionException; +import java.util.function.Function; /** Resolves a table name into a concrete row type. Usually involves a network call. */ public interface TableResolver { RelDataType resolve(String table) throws InterruptedException, ExecutionException; + static TableResolver from(Function f) { + return x -> f.apply(x); + } + /** Appends an extra column to the resolved type */ default TableResolver with(String name, RelDataType dataType) { return x -> { @@ -19,4 +25,20 @@ default TableResolver with(String name, RelDataType dataType) { return builder.build(); }; } + + default TableResolver with(String name, DataType dataType) { + return with(name, dataType.rel()); + } + + default TableResolver with(String name, DataType.Struct struct) { + return with(name, struct.rel()); + } + + default TableResolver mapStruct(Function f) { + return x -> f.apply(DataType.struct(resolve(x))).rel(); + } + + default TableResolver map(Function f) { + return x -> f.apply(resolve(x)); + } } diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java new file mode 100644 index 00000000..005f4b60 --- /dev/null +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/AvroConverterTest.java @@ -0,0 +1,42 @@ +package com.linkedin.hoptimator.catalog; + +import org.apache.calcite.plan.RelOptUtil; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.util.Litmus; +import org.apache.avro.Schema; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class AvroConverterTest { + + @Test + public void convertsNestedSchemas() { + String schemaString = "{\"type\":\"record\",\"name\":\"E\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"h\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"H\",\"namespace\":\"ns\",\"fields\":[{\"name\":\"A\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"A\",\"fields\":[]}]}]}]}]}"; + + Schema avroSchema1 = (new Schema.Parser()).parse(schemaString); + RelDataType rel1 = AvroConverter.rel(avroSchema1); + assertEquals(rel1.toString(), rel1.getFieldCount(), avroSchema1.getFields().size()); + assertTrue(rel1.toString(), rel1.getField("h", false, false) != null); + RelDataType rel2 = rel1.getField("h", false, false).getType(); + assertTrue(rel2.toString(), rel2.isNullable()); + Schema avroSchema2 = avroSchema1.getField("h").schema().getTypes().get(1); + assertEquals(rel2.toString(), rel2.getFieldCount(), avroSchema2.getFields().size()); + assertTrue(rel2.toString(), rel2.getField("A", false, false) != null); + RelDataType rel3 = rel2.getField("A", false, false).getType(); + assertTrue(rel3.toString(), rel3.isNullable()); + Schema avroSchema3 = avroSchema2.getField("A").schema().getTypes().get(1); + assertEquals(rel3.toString(), rel3.getFieldCount(), avroSchema3.getFields().size()); + Schema avroSchema4 = AvroConverter.avro("NS", "R", rel1); + assertTrue("!avroSchema4.isNullable()", !avroSchema4.isNullable()); + assertEquals(avroSchema4.toString(), avroSchema4.getFields().size(), rel1.getFieldCount()); + Schema avroSchema5 = AvroConverter.avro("NS", "R", rel2); + assertTrue("avroSchema5.isNullable()", avroSchema5.isNullable()); + assertEquals(avroSchema5.toString(), avroSchema5.getTypes().get(1).getFields().size(), rel2.getFieldCount()); + Schema avroSchema6 = AvroConverter.avro("NS", "R", rel3); + assertEquals(avroSchema6.toString(), avroSchema6.getTypes().get(1).getFields().size(), rel3.getFieldCount()); + RelDataType rel4 = AvroConverter.rel(avroSchema4); + assertTrue("types match", RelOptUtil.eq("rel4", rel4, "rel1", rel1, Litmus.THROW)); + } +} diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java new file mode 100644 index 00000000..e5788ba6 --- /dev/null +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/DataTypeTest.java @@ -0,0 +1,23 @@ +package com.linkedin.hoptimator.catalog; + +import org.apache.calcite.rel.type.RelDataType; + +import static org.junit.Assert.assertTrue; +import org.junit.Test; + +public class DataTypeTest { + + @Test + public void skipsNestedRows() { + DataType.Struct struct = DataType.struct().with("one", DataType.VARCHAR) + .with("two", DataType.struct().with("three", DataType.VARCHAR)); + RelDataType row1 = struct.rel(); + assertTrue(row1.toString(), row1.getFieldCount() == 2); + assertTrue(row1.toString(), row1.getField("one", false, false) != null); + assertTrue(row1.toString(), row1.getField("two", false, false) != null); + RelDataType row2 = struct.dropNestedRows().rel(); + assertTrue(row2.toString(), row2.getFieldCount() == 1); + assertTrue(row2.toString(), row2.getField("one", false, false) != null); + assertTrue(row2.toString(), row2.getField("two", false, false) == null); + } +} diff --git a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java index 40a2614d..763bc1ea 100644 --- a/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java +++ b/hoptimator-catalog/src/test/java/com/linkedin/hoptimator/catalog/ScriptImplementorTest.java @@ -28,10 +28,10 @@ public void implementsFlinkCreateTableDDL() { // Output isn't necessarily deterministic, but should be something like: // CREATE TABLE IF NOT EXISTS "DATABASE"."TABLE1" ("idValue1" VARCHAR) WITH // ('connector'='kafka', 'properties.bootstrap.servers'='localhost:9092', 'topic'='topic1') - assertTrue(out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH ")); - assertTrue(out.contains("'connector'='kafka'")); - assertTrue(out.contains("'properties.bootstrap.servers'='localhost:9092'")); - assertTrue(out.contains("'topic'='topic1'")); - assertFalse(out.contains("Row")); + assertTrue(out, out.contains("CREATE TABLE IF NOT EXISTS \"DATABASE\".\"TABLE1\" (\"idValue1\" VARCHAR) WITH ")); + assertTrue(out, out.contains("'connector'='kafka'")); + assertTrue(out, out.contains("'properties.bootstrap.servers'='localhost:9092'")); + assertTrue(out, out.contains("'topic'='topic1'")); + assertFalse(out, out.contains("Row")); } } diff --git a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java index 7b02cbad..b151d0e4 100644 --- a/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java +++ b/hoptimator-kafka-adapter/src/main/java/com/linkedin/hoptimator/catalog/kafka/RawKafkaSchemaFactory.java @@ -27,8 +27,8 @@ public Schema create(SchemaPlus parentSchema, String name, Map o String principal = (String) operand.getOrDefault("principal", "User:ANONYMOUS"); Map clientConfig = (Map) operand.get("clientConfig"); DataType.Struct rowType = DataType.struct() - .with("PAYLOAD", DataType.VARCHAR_NULL) - .with("KEY", DataType.VARCHAR_NULL); + .with("PAYLOAD", DataType.VARCHAR) + .with("KEY", DataType.VARCHAR); ConfigProvider connectorConfigProvider = ConfigProvider.from(clientConfig) .withPrefix("properties.") .with("connector", "upsert-kafka") diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java index cf52c63a..9adbe227 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java @@ -148,6 +148,7 @@ public Result reconcile(Request request) { // Mark the Subscription as failed. status.setFailed(true); status.setMessage("Error: " + e.getMessage()); + result = new Result(true, operator.failureRetryDuration()); } } else if (status.getReady() == null && status.getResources() != null) { // Phase 2 diff --git a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java index 1ce2cb65..6cfcb846 100644 --- a/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java +++ b/hoptimator-planner/src/main/java/com/linkedin/hoptimator/planner/PipelineRel.java @@ -1,12 +1,14 @@ package com.linkedin.hoptimator.planner; import org.apache.calcite.plan.Convention; +import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelProtoDataType; import org.apache.calcite.rel.type.RelDataTypeImpl; import org.apache.calcite.sql.SqlDialect; import org.apache.calcite.sql.dialect.AnsiSqlDialect; +import org.apache.calcite.util.Litmus; import com.linkedin.hoptimator.catalog.Resource; import com.linkedin.hoptimator.catalog.ResourceProvider; @@ -77,6 +79,7 @@ public ScriptImplementor query() { /** Script ending in INSERT INTO ... */ public ScriptImplementor insertInto(HopTable sink) { + RelOptUtil.eq(sink.name(), sink.rowType(), "subscription", rowType(), Litmus.THROW); return script.database(sink.database()).with(sink) .insert(sink.database(), sink.name(), relNode); }