
Commit

Merge pull request #110 from databrickslabs/revert-108-issue_94
Revert "Issue 94"
ravi-databricks authored Oct 24, 2024
2 parents 024ef5a + d0e389d commit 4307fdf
Showing 6 changed files with 72 additions and 180 deletions.
94 changes: 29 additions & 65 deletions src/cli.py
@@ -108,18 +108,14 @@ def __post_init__(self):
 class DeployCommand:
     """Class representing the deploy command."""
     layer: str
+    onboard_group: str
+    dlt_meta_schema: str
+    dataflowspec_table: str
     pipeline_name: str
     dlt_target_schema: str
-    onboard_bronze_group: None
-    onboard_silver_group: None
-    dlt_meta_bronze_schema: None
-    dlt_meta_silver_schema: None
-    dataflowspec_bronze_table: str = None
-    dataflowspec_silver_table: str = None
     num_workers: int = None
     uc_catalog_name: str = None
-    dataflowspec_bronze_path: str = None
-    dataflowspec_silver_path: str = None
+    dataflowspec_path: str = None
     uc_enabled: bool = False
     serverless: bool = False
     dbfs_path: str = None
@@ -131,11 +127,11 @@ def __post_init__(self):
raise ValueError("num_workers is required")
if not self.layer:
raise ValueError("layer is required")
if not self.onboard_bronze_group or not self.onboard_silver_group:
if not self.onboard_group:
raise ValueError("onboard_group is required")
if not self.dataflowspec_bronze_table or not self.dataflowspec_silver_table:
if not self.dataflowspec_table:
raise ValueError("dataflowspec_table is required")
if not self.uc_enabled and (not self.dataflowspec_bronze_path or not self.dataflowspec_silver_path):
if not self.uc_enabled and not self.dataflowspec_path:
raise ValueError("dataflowspec_path is required")
if not self.pipeline_name:
raise ValueError("pipeline_name is required")
@@ -337,30 +333,14 @@ def _create_dlt_meta_pipeline(self, cmd: DeployCommand):
         self._ws.workspace.upload(runner_notebook_path, runner_notebook_py, overwrite=True)
         configuration = {
             "layer": cmd.layer,
+            f"{cmd.layer}.group": cmd.onboard_group,
         }
         created = None
         configuration["version"] = self.version
         if cmd.uc_catalog_name:
-            if cmd.layer == "bronze_silver":
-                configuration["bronze.group"] = cmd.onboard_bronze_group
-                configuration["silver.group"] = cmd.onboard_silver_group
-                configuration["bronze.dataflowspecTable"] = (
-                    f"{cmd.uc_catalog_name}.{cmd.dlt_meta_bronze_schema}.{cmd.dataflowspec_bronze_table}"
-                )
-                configuration["silver.dataflowspecTable"] = (
-                    f"{cmd.uc_catalog_name}.{cmd.dlt_meta_silver_schema}.{cmd.dataflowspec_silver_table}"
-                )
-            if cmd.layer == "bronze":
-                configuration["bronze.group"] = cmd.onboard_bronze_group
-                configuration[f"{cmd.layer}.dataflowspecTable"] = (
-                    f"{cmd.uc_catalog_name}.{cmd.dlt_meta_bronze_schema}.{cmd.dataflowspec_bronze_table}"
-                )
-                configuration["bronze.group"] = cmd.onboard_bronze_group
-            if cmd.layer == "silver":
-                configuration["silver.group"] = cmd.onboard_silver_group
-                configuration[f"{cmd.layer}.dataflowspecTable"] = (
-                    f"{cmd.uc_catalog_name}.{cmd.dlt_meta_silver_schema}.{cmd.dataflowspec_silver_table}"
-                )
+            configuration[f"{cmd.layer}.dataflowspecTable"] = (
+                f"{cmd.uc_catalog_name}.{cmd.dlt_meta_schema}.{cmd.dataflowspec_table}"
+            )
             created = self._ws.pipelines.create(catalog=cmd.uc_catalog_name,
                                                 name=cmd.pipeline_name,
                                                 configuration=configuration,
@@ -371,33 +351,17 @@
                                                     )
                                                 )
                                             ],
-                                            # target=cmd.dlt_target_schema,
-                                            schema=cmd.dlt_target_schema,
+                                            target=cmd.dlt_target_schema,
                                             clusters=[pipelines.PipelineCluster(label="default",
                                                                                 num_workers=cmd.num_workers)]
                                             if not cmd.serverless else None,
                                             serverless=cmd.serverless if cmd.uc_enabled else None,
                                             channel="PREVIEW" if cmd.serverless else None
                                             )
         else:
-            if cmd.layer == "bronze_silver":
-                configuration["bronze.group"] = cmd.onboard_bronze_group
-                configuration["silver.group"] = cmd.onboard_silver_group
-                configuration["bronze.dataflowspecTable"] = (
-                    f"{cmd.dlt_meta_bronze_schema}.{cmd.dataflowspec_bronze_table}"
-                )
-                configuration["silver.dataflowspecTable"] = (
-                    f"{cmd.dlt_meta_silver_schema}.{cmd.dataflowspec_silver_table}"
-                )
-            if cmd.layer == "bronze":
-                configuration[f"{cmd.layer}.dataflowspecTable"] = (
-                    f"{cmd.dlt_meta_bronze_schema}.{cmd.dataflowspec_bronze_table}"
-                )
-            if cmd.layer == "silver":
-                configuration["silver.group"] = cmd.onboard_silver_group
-                configuration[f"{cmd.layer}.dataflowspecTable"] = (
-                    f"{cmd.dlt_meta_silver_schema}.{cmd.dataflowspec_silver_table}"
-                )
+            configuration[f"{cmd.layer}.dataflowspecTable"] = (
+                f"{cmd.dlt_meta_schema}.{cmd.dataflowspec_table}"
+            )
             created = self._ws.pipelines.create(
                 name=cmd.pipeline_name,
                 configuration=configuration,
@@ -514,23 +478,23 @@ def _load_deploy_config(self) -> DeployCommand:
         else:
             deploy_cmd_dict["serverless"] = False
         deploy_cmd_dict["layer"] = self._wsi._choice(
-            "Provide dlt meta layer", ['bronze', 'silver', 'bronze_silver'])
-        if deploy_cmd_dict["layer"] == "bronze" or deploy_cmd_dict["layer"] == "bronze_silver":
-            deploy_cmd_dict["onboard_bronze_group"] = self._wsi._question(
-                "Provide dlt meta onboard bronze group")
-            deploy_cmd_dict["dlt_meta_bronze_schema"] = self._wsi._question(
-                "Provide dlt_meta bronze dataflowspec schema name")
-            deploy_cmd_dict["dataflowspec_bronze_table"] = self._wsi._question(
+            "Provide dlt meta layer", ['bronze', 'silver'])
+        if deploy_cmd_dict["layer"] == "bronze":
+            deploy_cmd_dict["onboard_group"] = self._wsi._question(
+                "Provide dlt meta onboard group")
+            deploy_cmd_dict["dlt_meta_schema"] = self._wsi._question(
+                "Provide dlt_meta dataflowspec schema name")
+            deploy_cmd_dict["dataflowspec_table"] = self._wsi._question(
                 "Provide bronze dataflowspec table name", default='bronze_dataflowspec')
             if not deploy_cmd_dict["uc_enabled"]:
-                deploy_cmd_dict["dataflowspec_bronze_path"] = self._wsi._question(
+                deploy_cmd_dict["dataflowspec_path"] = self._wsi._question(
                     "Provide bronze dataflowspec path", default=f'{self._install_folder()}/bronze_dataflow_specs')
-        if deploy_cmd_dict["layer"] == "silver" or deploy_cmd_dict["layer"] == "bronze_silver":
-            deploy_cmd_dict["onboard_silver_group"] = self._wsi._question(
-                "Provide dlt meta silver onboard group")
-            deploy_cmd_dict["dlt_meta_silver_schema"] = self._wsi._question(
-                "Provide dlt_meta silver dataflowspec schema name")
-            deploy_cmd_dict["dataflowspec_silver_table"] = self._wsi._question(
+        if deploy_cmd_dict["layer"] == "silver":
+            deploy_cmd_dict["onboard_group"] = self._wsi._question(
+                "Provide dlt meta onboard group")
+            deploy_cmd_dict["dlt_meta_schema"] = self._wsi._question(
+                "Provide dlt_meta dataflowspec schema name")
+            deploy_cmd_dict["dataflowspec_table"] = self._wsi._question(
                 "Provide silver dataflowspec table name", default='silver_dataflowspec')
             if not deploy_cmd_dict["uc_enabled"]:
                 deploy_cmd_dict["dataflowspec_path"] = self._wsi._question(
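For reference, a minimal sketch of constructing the restored single-layer `DeployCommand` (field values are hypothetical and mirror the updated unit test in `tests/test_cli.py` below):

```python
from src.cli import DeployCommand

# Hypothetical values for illustration. After this revert a deploy targets a
# single layer ("bronze" or "silver") with one group, schema, and table,
# instead of the per-layer bronze/silver fields introduced by Issue 94.
deploy_cmd = DeployCommand(
    layer="bronze",
    onboard_group="A1",
    dlt_meta_schema="dlt_meta",
    dataflowspec_table="bronze_dataflowspec",
    pipeline_name="dlt_meta_bronze_pipeline",
    dlt_target_schema="dlt_target_schema",
    num_workers=1,
    uc_catalog_name="uc_catalog",
    dataflowspec_path="tests/resources/dataflowspec",
    uc_enabled=True,
    serverless=False,
    dbfs_path="/dbfs",
)
```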
93 changes: 32 additions & 61 deletions src/dataflow_pipeline.py
@@ -115,6 +115,7 @@ def __initialize_dataflow_pipeline(
             else:
                 self.schema_json = None
         else:
+            self.schema_json = None
             self.next_snapshot_and_version = None
             self.appy_changes_from_snapshot = None
         if isinstance(dataflow_spec, SilverDataflowSpec):
@@ -441,9 +442,29 @@ def cdc_apply_changes(self):
         if cdc_apply_changes is None:
             raise Exception("cdcApplychanges is None! ")

-        struct_schema = None
-        if self.schema_json:
-            struct_schema = self.modify_schema_for_cdc_changes(cdc_apply_changes)
+        struct_schema = (
+            StructType.fromJson(self.schema_json)
+            if isinstance(self.dataflowSpec, BronzeDataflowSpec)
+            else self.silver_schema
+        )
+
+        sequenced_by_data_type = None
+
+        if cdc_apply_changes.except_column_list:
+            modified_schema = StructType([])
+            if struct_schema:
+                for field in struct_schema.fields:
+                    if field.name not in cdc_apply_changes.except_column_list:
+                        modified_schema.add(field)
+                    if field.name == cdc_apply_changes.sequence_by:
+                        sequenced_by_data_type = field.dataType
+                struct_schema = modified_schema
+            else:
+                raise Exception(f"Schema is None for {self.dataflowSpec} for cdc_apply_changes! ")
+
+        if struct_schema and cdc_apply_changes.scd_type == "2":
+            struct_schema.add(StructField("__START_AT", sequenced_by_data_type))
+            struct_schema.add(StructField("__END_AT", sequenced_by_data_type))

         target_path = None if self.uc_enabled else self.dataflowSpec.targetDetails["path"]

@@ -458,7 +479,7 @@
             apply_as_truncates = expr(cdc_apply_changes.apply_as_truncates)

         dlt.apply_changes(
-            target=f"{self.dataflowSpec.targetDetails['database']}.{self.dataflowSpec.targetDetails['table']}",
+            target=f"{self.dataflowSpec.targetDetails['table']}",
             source=self.view_name,
             keys=cdc_apply_changes.keys,
             sequence_by=cdc_apply_changes.sequence_by,
@@ -477,40 +498,10 @@
             ignore_null_updates_except_column_list=cdc_apply_changes.ignore_null_updates_except_column_list
         )

-    def modify_schema_for_cdc_changes(self, cdc_apply_changes):
-        if isinstance(self.dataflowSpec, BronzeDataflowSpec) and self.schema_json is None:
-            return None
-        if isinstance(self.dataflowSpec, SilverDataflowSpec) and self.silver_schema is None:
-            return None
-        struct_schema = (
-            StructType.fromJson(self.schema_json)
-            if isinstance(self.dataflowSpec, BronzeDataflowSpec)
-            else self.silver_schema
-        )
-
-        sequenced_by_data_type = None
-
-        if cdc_apply_changes.except_column_list:
-            modified_schema = StructType([])
-            if struct_schema:
-                for field in struct_schema.fields:
-                    if field.name not in cdc_apply_changes.except_column_list:
-                        modified_schema.add(field)
-                    if field.name == cdc_apply_changes.sequence_by:
-                        sequenced_by_data_type = field.dataType
-                struct_schema = modified_schema
-            else:
-                raise Exception(f"Schema is None for {self.dataflowSpec} for cdc_apply_changes! ")
-
-        if struct_schema and cdc_apply_changes.scd_type == "2":
-            struct_schema.add(StructField("__START_AT", sequenced_by_data_type))
-            struct_schema.add(StructField("__END_AT", sequenced_by_data_type))
-        return struct_schema
-
     def create_streaming_table(self, struct_schema, target_path=None):
         expect_all_dict, expect_all_or_drop_dict, expect_all_or_fail_dict = self.get_dq_expectations()
         dlt.create_streaming_table(
-            name=f"{self.dataflowSpec.targetDetails['database']}.{self.dataflowSpec.targetDetails['table']}",
+            name=f"{self.dataflowSpec.targetDetails['table']}",
             table_properties=self.dataflowSpec.tableProperties,
             partition_cols=DataflowSpecUtils.get_partition_cols(self.dataflowSpec.partitionColumns),
             path=target_path,
@@ -556,41 +547,19 @@ def run_dlt(self):
         self.write()

     @staticmethod
-    def invoke_dlt_pipeline(spark,
-                            layer,
-                            bronze_custom_transform_func=None,
-                            silver_custom_transform_func=None,
-                            next_snapshot_and_version: Callable = None
-                            ):
+    def invoke_dlt_pipeline(spark, layer, custom_transform_func=None, next_snapshot_and_version: Callable = None):
         """Invoke dlt pipeline will launch dlt with given dataflowspec.
         Args:
             spark (_type_): _description_
             layer (_type_): _description_
         """
         dataflowspec_list = None
         if "bronze" == layer.lower():
             dataflowspec_list = DataflowSpecUtils.get_bronze_dataflow_spec(spark)
-            DataflowPipeline._launch_dlt_flow(spark, "bronze", dataflowspec_list, bronze_custom_transform_func)
         elif "silver" == layer.lower():
             dataflowspec_list = DataflowSpecUtils.get_silver_dataflow_spec(spark)
-            DataflowPipeline._launch_dlt_flow(spark, "silver", dataflowspec_list, silver_custom_transform_func)
-        elif "bronze_silver" == layer.lower():
-            bronze_dataflowspec_list = DataflowSpecUtils.get_bronze_dataflow_spec(spark)
-            DataflowPipeline._launch_dlt_flow(
-                spark, "bronze", bronze_dataflowspec_list, bronze_custom_transform_func
-            )
-            silver_dataflowspec_list = DataflowSpecUtils.get_silver_dataflow_spec(spark)
-            DataflowPipeline._launch_dlt_flow(
-                spark, "silver", silver_dataflowspec_list, silver_custom_transform_func
-            )
-
-    @staticmethod
-    def _launch_dlt_flow(spark,
-                         layer,
-                         dataflowspec_list,
-                         custom_transform_func=None,
-                         next_snapshot_and_version: Callable = None
-                         ):
+        logger.info(f"Length of Dataflow Spec {len(dataflowspec_list)}")
         for dataflowSpec in dataflowspec_list:
             logger.info("Printing Dataflow Spec")
             logger.info(dataflowSpec)
@@ -599,7 +568,8 @@ def _launch_dlt_flow(spark,
                     and dataflowSpec.quarantineTargetDetails != {}:
                 quarantine_input_view_name = (
                     f"{dataflowSpec.quarantineTargetDetails['table']}"
-                    f"_{layer}_quarantine_inputView"
+                    f"_{layer}_quarantine_inputView",
+                    custom_transform_func
                 )
             else:
                 logger.info("quarantine_input_view_name set to None")
@@ -611,4 +581,5 @@ def _launch_dlt_flow(spark,
+                custom_transform_func,
                 next_snapshot_and_version
             )

             dlt_data_flow.run_dlt()
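As a usage sketch, the restored `invoke_dlt_pipeline` entry point takes a single `custom_transform_func` rather than the separate bronze and silver callables removed above, and the combined `bronze_silver` layer option is gone. The transform below is hypothetical:

```python
from pyspark.sql.functions import current_timestamp

from src.dataflow_pipeline import DataflowPipeline

def add_audit_column(df):
    # Hypothetical transform applied to each dataflow's input view.
    return df.withColumn("processed_ts", current_timestamp())

# `spark` is the session provided by the DLT runtime; `layer` must be
# "bronze" or "silver" and match the pipeline's spark conf settings.
DataflowPipeline.invoke_dlt_pipeline(
    spark, layer="bronze", custom_transform_func=add_audit_column
)
```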
8 changes: 4 additions & 4 deletions src/dataflow_spec.py
@@ -256,22 +256,22 @@ def check_spark_dataflowpipeline_conf_params(spark, layer_arg):
f"""parameter {layer_arg} is missing in spark.conf.
Please set spark.conf.set({layer_arg},'silver') """
)
dataflow_spec_table = spark.conf.get(f"{layer_arg}.dataflowspecTable", None)
dataflow_spec_table = spark.conf.get(f"{layer}.dataflowspecTable", None)
if dataflow_spec_table is None:
raise Exception(
f"""parameter {layer_arg}.dataflowspecTable is missing in sparkConf
Please set spark.conf.set('{layer_arg}.dataflowspecTable'='database.dataflowSpecTableName')"""
)

group = spark.conf.get(f"{layer_arg}.group", None)
group = spark.conf.get(f"{layer}.group", None)
dataflow_ids = spark.conf.get(f"{layer}.dataflowIds", None)

if group is None and dataflow_ids is None:
raise Exception(
f"""please provide {layer_arg}.group or {layer}.dataflowIds in spark.conf
f"""please provide {layer}.group or {layer}.dataflowIds in spark.conf
Please set spark.conf.set('{layer}.group'='groupName')
OR
spark.conf.set('{layer_arg}.dataflowIds'='comma seperated dataflowIds')
spark.conf.set('{layer}.dataflowIds'='comma seperated dataflowIds')
"""
)

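A minimal sketch of the spark conf these checks expect after the revert: `check_spark_dataflowpipeline_conf_params(spark, "layer")` first resolves the layer name from the `layer` conf key, then validates the per-layer keys against that resolved `{layer}` value rather than against `{layer_arg}`. The values below are hypothetical:

```python
spark.conf.set("layer", "bronze")
spark.conf.set("bronze.dataflowspecTable", "dlt_meta.bronze_dataflowspec")
# Provide either a group ...
spark.conf.set("bronze.group", "A1")
# ... or an explicit list of dataflow ids:
# spark.conf.set("bronze.dataflowIds", "1,2")
```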
2 changes: 1 addition & 1 deletion tests/generate_delta_tables.py
@@ -21,4 +21,4 @@

 transactions_parquet_df = spark.read.options(**options).json("tests/resources/data/transactions")
 transactions_parquet_df.withColumn("_rescued_data", lit("Test")).write.format("delta").mode("overwrite").save(
-    "tests/resources/delta/transactions")
+    "tests/resources/delta/transactions")
11 changes: 4 additions & 7 deletions tests/test_cli.py
@@ -49,17 +49,14 @@ class CliTests(unittest.TestCase):

     deploy_cmd = DeployCommand(
         layer="bronze",
+        onboard_group="A1",
+        dlt_meta_schema="dlt_meta",
         pipeline_name="unittest_dlt_pipeline",
+        dataflowspec_table="dataflowspec_table",
         dlt_target_schema="dlt_target_schema",
-        onboard_bronze_group="A1",
-        onboard_silver_group="A1",
-        dlt_meta_bronze_schema="dlt_bronze_schema",
-        dlt_meta_silver_schema="dlt_silver_schema",
-        dataflowspec_bronze_table="bronze_dataflowspec_table",
-        dataflowspec_silver_table="silver_dataflowspec_table",
         num_workers=1,
         uc_catalog_name="uc_catalog",
-        dataflowspec_bronze_path="tests/resources/dataflowspec",
+        dataflowspec_path="tests/resources/dataflowspec",
         uc_enabled=True,
         serverless=False,
         dbfs_path="/dbfs",