Merge pull request #129 from harsha-pasala/issue_125
Fix Issue #125: Support for multiple partition columns in bronze, bronze quarantine, and silver layers.
ravi-databricks authored Dec 4, 2024
2 parents 4307fdf + 4dac68d commit 82e5978
Showing 4 changed files with 117 additions and 4 deletions.
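
The change applies the same parsing rule in all three layers: a partition-columns value from the onboarding JSON, previously always wrapped in a single-element list, is now split on commas when it names more than one column. A minimal standalone sketch of that rule (the helper name is illustrative, not part of this commit):

    def parse_partition_columns(raw):
        # "id,operation_date" -> ["id", "operation_date"]; "id" -> ["id"]
        if "," in raw:
            return raw.split(",")
        return [raw]

    assert parse_partition_columns("id,operation_date") == ["id", "operation_date"]
    assert parse_partition_columns("id") == ["id"]
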
18 changes: 15 additions & 3 deletions src/onboard_dataflowspec.py
@@ -577,7 +577,11 @@ def __get_bronze_dataflow_spec_dataframe(self, onboarding_df, env):
                 "bronze_partition_columns" in onboarding_row
                 and onboarding_row["bronze_partition_columns"]
             ):
-                partition_columns = [onboarding_row["bronze_partition_columns"]]
+                # Split if this is a list separated by commas
+                if "," in onboarding_row["bronze_partition_columns"]:
+                    partition_columns = onboarding_row["bronze_partition_columns"].split(",")
+                else:
+                    partition_columns = [onboarding_row["bronze_partition_columns"]]
 
             cdc_apply_changes = None
             if (
@@ -650,7 +654,11 @@ def __get_quarantine_details(self, env, onboarding_row):
             "bronze_quarantine_table_partitions" in onboarding_row
             and onboarding_row["bronze_quarantine_table_partitions"]
         ):
-            quarantine_table_partition_columns = onboarding_row["bronze_quarantine_table_partitions"]
+            # Split if this is a list separated by commas
+            if "," in onboarding_row["bronze_quarantine_table_partitions"]:
+                quarantine_table_partition_columns = onboarding_row["bronze_quarantine_table_partitions"].split(",")
+            else:
+                quarantine_table_partition_columns = onboarding_row["bronze_quarantine_table_partitions"]
         if (
             f"bronze_database_quarantine_{env}" in onboarding_row
             and onboarding_row[f"bronze_database_quarantine_{env}"]
@@ -998,7 +1006,11 @@ def __get_silver_dataflow_spec_dataframe(self, onboarding_df, env):
                 "silver_partition_columns" in onboarding_row
                 and onboarding_row["silver_partition_columns"]
             ):
-                silver_parition_columns = [onboarding_row["silver_partition_columns"]]
+                # Split if this is a list separated by commas
+                if "," in onboarding_row["silver_partition_columns"]:
+                    silver_parition_columns = onboarding_row["silver_partition_columns"].split(",")
+                else:
+                    silver_parition_columns = [onboarding_row["silver_partition_columns"]]
 
             silver_cdc_apply_changes = None
             if (
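
A caveat worth noting as an observation, not something this commit changes: split(",") does not trim whitespace, so a value such as "id, operation_date" would produce a column name with a leading space. Onboarding files should therefore list columns without spaces around the commas, as the fixture below does. A hypothetical defensive variant:

    # Hypothetical variant (not in the commit): trim whitespace and drop empty entries.
    raw = "id, operation_date"
    partition_columns = [c.strip() for c in raw.split(",") if c.strip()]
    # -> ["id", "operation_date"]
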
70 changes: 70 additions & 0 deletions tests/resources/onboarding_multiple_partitions.json
@@ -0,0 +1,70 @@
+[
+    {
+        "data_flow_id": "100",
+        "data_flow_group": "A1",
+        "source_system": "MYSQL",
+        "source_format": "cloudFiles",
+        "source_details": {
+            "source_database": "APP",
+            "source_table": "CUSTOMERS",
+            "source_path_dev": "tests/resources/data/customers",
+            "source_schema_path": "tests/resources/schema/customer_schema.ddl",
+            "source_metadata": {
+                "include_autoloader_metadata_column": "True",
+                "autoloader_metadata_col_name": "source_metadata",
+                "select_metadata_cols": {
+                    "input_file_name": "_metadata.file_name",
+                    "input_file_path": "_metadata.file_path"
+                }
+            }
+        },
+        "bronze_database_dev": "bronze",
+        "bronze_database_staging": "bronze",
+        "bronze_database_prd": "bronze",
+        "bronze_table": "customers_cdc",
+        "bronze_reader_options": {
+            "cloudFiles.format": "json",
+            "cloudFiles.inferColumnTypes": "true",
+            "cloudFiles.rescuedDataColumn": "_rescued_data"
+        },
+        "bronze_table_path_dev": "tests/resources/delta/customers",
+        "bronze_partition_columns": "id,operation_date",
+        "bronze_table_properties": {
+            "pipelines.autoOptimize.managed": "false",
+            "pipelines.reset.allowed": "false"
+        },
+        "bronze_data_quality_expectations_json_dev": "tests/resources/dqe/customers/bronze_data_quality_expectations.json",
+        "bronze_database_quarantine_dev": "bronze",
+        "bronze_database_quarantine_staging": "bronze",
+        "bronze_database_quarantine_prd": "bronze",
+        "bronze_quarantine_table": "customers_cdc_quarantine",
+        "bronze_quarantine_table_partitions": "id,operation_date",
+        "bronze_quarantine_table_path_dev": "tests/resources/data/bronze/customers_quarantine",
+        "silver_database_dev": "silver",
+        "silver_database_staging": "silver",
+        "silver_database_prd": "silver",
+        "silver_table": "customers",
+        "silver_partition_columns": "id,operation_date",
+        "silver_cdc_apply_changes": {
+            "keys": [
+                "id"
+            ],
+            "sequence_by": "operation_date",
+            "scd_type": "1",
+            "apply_as_deletes": "operation = 'DELETE'",
+            "except_column_list": [
+                "operation",
+                "operation_date",
+                "_rescued_data"
+            ]
+        },
+        "silver_table_path_dev": "tests/resources/data/silver/customers",
+        "silver_table_properties": {
+            "pipelines.autoOptimize.managed": "false",
+            "pipelines.reset.allowed": "false",
+            "pipelines.autoOptimize.zOrderCols": "id,email"
+        },
+        "silver_transformation_json_dev": "tests/resources/silver_transformations.json",
+        "silver_data_quality_expectations_json_dev": "tests/resources/dqe/customers/silver_data_quality_expectations.json"
+    }
+]
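
Each of the fixture's three partition fields ("bronze_partition_columns", "bronze_quarantine_table_partitions", "silver_partition_columns") is a plain comma-separated string naming two columns. A quick sanity-check sketch, assuming it runs from the repository root:

    import json

    with open("tests/resources/onboarding_multiple_partitions.json") as f:
        row = json.load(f)[0]
    for key in ("bronze_partition_columns",
                "bronze_quarantine_table_partitions",
                "silver_partition_columns"):
        # Each field should split into exactly two column names.
        assert len(row[key].split(",")) == 2, key
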
32 changes: 31 additions & 1 deletion tests/test_onboard_dataflowspec.py
@@ -2,7 +2,7 @@
 import copy
 from tests.utils import DLTFrameworkTestCase
 from src.onboard_dataflowspec import OnboardDataflowspec
-from src.dataflow_spec import BronzeDataflowSpec
+from src.dataflow_spec import BronzeDataflowSpec, SilverDataflowSpec
 from unittest.mock import MagicMock, patch
 from pyspark.sql import DataFrame
 
@@ -169,6 +169,36 @@ def test_onboardDataFlowSpecs_with_merge_uc(self):
             if bronze_row.dataFlowId == "103":
                 self.assertEqual(bronze_row.readerConfigOptions.get("maxOffsetsPerTrigger"), "60000")
 
+    def test_onboardDataflowSpec_with_multiple_partitions(self):
+        """Test for onboardDataflowspec with multiple partitions for bronze layer."""
+        onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_uc_map)
+        del onboarding_params_map["uc_enabled"]
+        onboarding_params_map["onboarding_file_path"] = self.onboarding_multiple_partitions_file
+        onboardDataFlowSpecs = OnboardDataflowspec(self.spark, onboarding_params_map, uc_enabled=True)
+        onboardDataFlowSpecs.onboard_dataflow_specs()
+
+        #Assert Bronze DataflowSpec for multiple partition, and quarantine partition columns.
+        bronze_dataflowSpec_df = self.read_dataflowspec(
+            self.onboarding_bronze_silver_params_uc_map['database'],
+            self.onboarding_bronze_silver_params_uc_map['bronze_dataflowspec_table'])
+        bronze_df_rows = bronze_dataflowSpec_df.collect()
+        for bronze_df_row in bronze_df_rows:
+            bronze_row = BronzeDataflowSpec(**bronze_df_row.asDict())
+            self.assertEqual(len(bronze_row.partitionColumns), 2)
+            quarantine_partitions = [
+                col for col in bronze_row.quarantineTargetDetails.get('partition_columns').strip('[]').split(',')
+            ]
+            self.assertEqual(len(quarantine_partitions), 2)
+
+        #Assert Silver DataflowSpec for multiple partition columns.
+        silver_dataflowSpec_df = self.read_dataflowspec(
+            self.onboarding_bronze_silver_params_map['database'],
+            self.onboarding_bronze_silver_params_map['silver_dataflowspec_table'])
+        silver_df_rows = silver_dataflowSpec_df.collect()
+        for silver_df_row in silver_df_rows:
+            silver_row = SilverDataflowSpec(**silver_df_row.asDict())
+            self.assertEqual(len(silver_row.partitionColumns), 2)
+
     def test_onboardBronzeDataflowSpec_positive(self):
         """Test for onboardDataflowspec."""
         onboarding_params_map = copy.deepcopy(self.onboarding_bronze_silver_params_map)
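
Assuming the repository's tests run under pytest with a local Spark session (as the DLTFrameworkTestCase setup suggests), the new test can be selected by name:

    python -m pytest tests/test_onboard_dataflowspec.py -k test_onboardDataflowSpec_with_multiple_partitions
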
1 change: 1 addition & 0 deletions tests/utils.py
@@ -42,6 +42,7 @@ def setUp(self):
         self.onboarding_bronze_type2_json_file = "tests/resources/onboarding_ac_bronze_type2.json"
         self.onboarding_append_flow_json_file = "tests/resources/onboarding_append_flow.json"
         self.onboarding_silver_fanout_json_file = "tests/resources/onboarding_silverfanout.json"
+        self.onboarding_multiple_partitions_file = "tests/resources/onboarding_multiple_partitions.json"
         self.onboarding_apply_changes_from_snapshot_json_file = (
             "tests/resources/onboarding_applychanges_from_snapshot.json"
         )
