Commit

Merge pull request #85 from microsoft/v3
V 1.1 - Updates
bluewatersql authored Dec 10, 2024
2 parents fffde3c + 24742cb commit 58f97ad
Showing 16 changed files with 475 additions and 618 deletions.
230 changes: 85 additions & 145 deletions Packages/FabricSync/FabricSync/BQ/Loader.py

Large diffs are not rendered by default.

300 changes: 223 additions & 77 deletions Packages/FabricSync/FabricSync/BQ/Metadata.py

Large diffs are not rendered by default.

18 changes: 0 additions & 18 deletions Packages/FabricSync/FabricSync/BQ/Model/Schedule.py
@@ -159,30 +159,12 @@ def IsTimePartitionedStrategy(self) -> bool:

@property
def IsPartitionedSyncLoad(self) -> bool:
"""
Determines if the sync load is partitioned.
This method checks if the sync load is partitioned based on the following conditions:
- The `IsPartitioned` attribute is True.
- If the `IsTimePartitionedStrategy` attribute is True and `PartitionId` is set, or if the `IsRangePartitioned` attribute is True.
Returns:
bool: True if the sync load is partitioned, False otherwise.
"""

if self.IsPartitioned:
if (self.IsTimePartitionedStrategy and self.PartitionId) or self.IsRangePartitioned:
return True
return False
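For reference, a minimal standalone sketch of the check this property performs, with illustrative plain values standing in for the SyncSchedule attributes:

def is_partitioned_sync_load(is_partitioned, is_time_strategy, partition_id, is_range_partitioned):
    # Mirrors the property above: a partitioned load qualifies only when a
    # time-partition strategy has a concrete partition id, or the table is
    # range partitioned.
    if is_partitioned:
        if (is_time_strategy and partition_id) or is_range_partitioned:
            return True
    return False

print(is_partitioned_sync_load(True, True, "20241210", False))   # True
print(is_partitioned_sync_load(True, True, None, False))         # False - no partition id
print(is_partitioned_sync_load(True, False, None, True))         # True - range partitioned
print(is_partitioned_sync_load(False, True, "20241210", False))  # False - not partitioned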

def assign_enum_val(self, enum_class, value):
"""
Assigns a value to an enum class.
Args:
enum_class (Enum): The enumeration class to which the value should be assigned.
value (Any): The value to be assigned to the enum class.
Returns:
Enum or None: The corresponding enum member if the value is valid, otherwise None.
"""

try:
return enum_class(value)
except ValueError:
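For context, the docstring removed above describes a safe enum lookup. A self-contained sketch of that pattern, using a hypothetical LoadStrategy enum; the return of None on failure is implied by the docstring, since the diff cuts off the except branch:

from enum import Enum

class LoadStrategy(Enum):  # hypothetical enum, for illustration only
    FULL = "FULL"
    PARTITION = "PARTITION"

def assign_enum_val(enum_class, value):
    # Return the matching member, or None when the value is not valid for the enum.
    try:
        return enum_class(value)
    except ValueError:
        return None

print(assign_enum_val(LoadStrategy, "FULL"))     # LoadStrategy.FULL
print(assign_enum_val(LoadStrategy, "UNKNOWN"))  # None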
2 changes: 1 addition & 1 deletion Packages/FabricSync/FabricSync/BQ/Schedule.py
@@ -5,7 +5,7 @@
from ..Core import *
from ..Metastore import *

class Scheduler(ConfigBase):
class BQScheduler(ConfigBase):
"""
Class responsible for calculating the to-be run schedule based on the sync config and
the most recent BigQuery table metadata. Schedule is persisted to the Sync Schedule
77 changes: 16 additions & 61 deletions Packages/FabricSync/FabricSync/BQ/Sync.py
@@ -6,62 +6,26 @@
from ..Enum import *
from .SyncUtils import *
from .Loader import BQScheduleLoader
from .Metadata import ConfigMetadataLoader
from .Schedule import Scheduler
from .Metadata import BQMetadataLoader
from .Schedule import BQScheduler

class BQSync(SyncBase):
"""
BQSync class for synchronizing metadata, building schedules, and running schedules with BigQuery.
Methods:
__init__(context: SparkSession, config_path: str):
sync_metadata():
build_schedule(sync_metadata: bool = True, schedule_type: str = str(ScheduleType.AUTO)) -> str:
run_schedule(group_schedule_id: str, optimize_metadata: bool = True):
"""

class BQSync(SyncBase):
def __init__(self, context:SparkSession, config_path:str):
"""
Initializes the Sync class with the given Spark session context and configuration path.
Args:
context (SparkSession): The Spark session context.
config_path (str): The path to the configuration file.
Attributes:
MetadataLoader (ConfigMetadataLoader): Loads metadata configuration.
Scheduler (Scheduler): Manages scheduling tasks.
Loader (BQScheduleLoader): Loads BigQuery schedules.
DataRetention (BQDataRetention): Manages data retention policies.
"""

super().__init__(context, config_path, clean_session=True)

self.MetadataLoader = ConfigMetadataLoader(context, self.UserConfig, self.GCPCredential)
self.Scheduler = Scheduler(context, self.UserConfig, self.GCPCredential)
self.MetadataLoader = BQMetadataLoader(context, self.UserConfig, self.GCPCredential)
self.Scheduler = BQScheduler(context, self.UserConfig, self.GCPCredential)
self.Loader = BQScheduleLoader(context, self.UserConfig, self.GCPCredential)
self.DataRetention = BQDataRetention(context, self.UserConfig, self.GCPCredential)

def sync_metadata(self):
"""
Synchronizes metadata using the MetadataLoader instance.
This method performs the following actions:
1. Calls the `sync_metadata` method of the `MetadataLoader` instance to synchronize metadata.
2. If the `Autodetect` attribute of `UserConfig` is set to True, it calls the `auto_detect_config` method of the `MetadataLoader` instance to automatically detect the configuration.
"""
metadata_loaded = self.MetadataLoader.sync_metadata()

self.MetadataLoader.sync_metadata()

if self.UserConfig.Autodetect:
if self.UserConfig.Autodetect and metadata_loaded:
self.MetadataLoader.auto_detect_config()

def build_schedule(self, sync_metadata:bool = True, schedule_type:str = str(ScheduleType.AUTO)) -> str:
"""
Builds a schedule for synchronization.
Args:
sync_metadata (bool): If True, synchronizes metadata. If False, creates proxy views. Default is True.
schedule_type (str): The type of schedule to build. Default is 'AUTO'.
Returns:
str: The built schedule.
"""

if sync_metadata:
self.sync_metadata()
else:
@@ -70,31 +34,22 @@ def build_schedule(self, sync_metadata:bool = True, schedule_type:str = str(Sche
return self.Scheduler.build_schedule(schedule_type=ScheduleType[schedule_type])

def run_schedule(self, group_schedule_id:str, optimize_metadata:bool=True):
"""
Executes the schedule for the given group schedule ID.
This method performs the following steps:
1. Creates proxy views using the MetadataLoader.
2. Ensures schemas are set up if schema support is enabled in the user configuration.
3. Runs the schedule using the Loader.
4. Commits the table configuration to the Metastore.
5. Executes data retention policies if enabled in the user configuration.
6. Optimizes metadata tables if the optimize_metadata flag is set to True.
Args:
group_schedule_id (str): The ID of the group schedule to run.
optimize_metadata (bool, optional): Flag to indicate whether to optimize metadata tables. Defaults to True.
"""

self.MetadataLoader.create_proxy_views()

if self.UserConfig.Fabric.EnableSchemas:
self.Metastore.ensure_schemas(self.UserConfig.Fabric.WorkspaceId, self.UserConfig.ID)

self.Loader.run_schedule(group_schedule_id)
initial_loads = self.Loader.run_schedule(group_schedule_id)

self.Metastore.commit_table_configuration(group_schedule_id)
if initial_loads:
self.Metastore.commit_table_configuration(group_schedule_id)

if self.UserConfig.EnableDataExpiration:
self.DataRetention.execute()
with SyncTimer() as t:
self.DataRetention.execute()
print(f"Data Expiration completed in {str(t)}...")

if optimize_metadata:
self.Metastore.optimize_metadata_tbls()
with SyncTimer() as t:
self.Metastore.optimize_metadata_tbls()
print(f"Metastore Metadata Optimization completed in {str(t)}...")
118 changes: 1 addition & 117 deletions Packages/FabricSync/FabricSync/BQ/SyncUtils.py
@@ -19,37 +19,13 @@ def __init__(self, context:SparkSession, user_config, gcp_credential:str):
"""
super().__init__(context, user_config, gcp_credential)

def execute(self):
"""
Executes the data expiration and retention policy enforcement and updates.
This method performs the following actions:
1. Enforces the data expiration and retention policy by calling the
`_enforce_retention_policy` method.
2. Updates the data expiration and retention policy by synchronizing
the retention configuration with the Metastore using the user's configuration ID.
Prints messages to indicate the progress of each action.
"""

def execute(self):
print ("Enforcing Data Expiration/Retention Policy...")
self._enforce_retention_policy()
print ("Updating Data Expiration/Retention Policy...")
self.Metastore.sync_retention_config(self.UserConfig.ID)

def _enforce_retention_policy(self):
"""
Enforces the retention policy for BigQuery tables based on the configuration.
This method retrieves the retention policy configuration from the Metastore
and applies it to the relevant tables. If a partition ID is specified, it
will drop the partition; otherwise, it will drop the entire table.
The table name is constructed based on whether the lakehouse schema is used.
The method prints out the action being taken (expiring partition or table)
for logging purposes.
Parameters:
None
Returns:
None
"""

df = self.Metastore.get_bq_retention_policy(self.UserConfig.ID)

for d in df.collect():
@@ -118,17 +94,6 @@ def flatten_df(explode_arrays:bool, df:DataFrame) -> DataFrame:

@staticmethod
def resolve_fabric_partition_predicate(partition_type:str, partition_column:str, partition_grain:str, partition_id:str):
"""
Resolves the partition predicate for a given fabric partition.
Args:
partition_type (str): The type of the partition (e.g., 'TIME').
partition_column (str): The column name of the partition.
partition_grain (str): The grain of the partition (e.g., 'DAY', 'MONTH').
partition_id (str): The identifier of the partition.
Returns:
str: The resolved partition predicate.
"""

if PartitionType[partition_type] == PartitionType.TIME:
partition_dt=SyncUtil.get_derived_date_from_part_id(partition_grain, partition_id)
proxy_cols = SyncUtil.get_fabric_partition_proxy_cols(partition_grain)
@@ -141,13 +106,6 @@ def resolve_fabric_partition_predicate(partition_type:str, partition_column:str,

@staticmethod
def get_fabric_partition_proxy_cols(partition_grain:str) -> list[str]:
"""
Returns a list of proxy columns based on the specified partition grain.
Args:
partition_grain (str): The partition grain, which should be one of the values in CalendarInterval.
Returns:
list[str]: A list of proxy column names corresponding to the partition grain.
"""
proxy_cols = list(CalendarInterval)

match CalendarInterval[partition_grain]:
@@ -165,13 +123,6 @@ def get_fabric_partition_proxy_cols(partition_grain:str) -> list[str]:

@staticmethod
def get_bq_partition_id_format(partition_grain:str) -> str:
"""
Returns the BigQuery partition ID format string based on the given partition grain.
Args:
partition_grain (str): The partition grain, which can be 'DAY', 'MONTH', 'YEAR', or 'HOUR'.
Returns:
str: The corresponding date format string for the partition grain.
"""
pattern = None

match CalendarInterval[partition_grain]:
@@ -188,30 +139,11 @@

@staticmethod
def get_derived_date_from_part_id(partition_grain:str, partition_id:str) -> datetime:
"""
Converts a BigQuery partition ID to a datetime object based on the specified partition grain.
Args:
partition_grain (str): The grain of the partition (e.g., 'DAY', 'MONTH', 'YEAR').
partition_id (str): The partition ID to be converted.
Returns:
datetime: The derived datetime object from the partition ID.
"""

dt_format = SyncUtil.get_bq_partition_id_format(partition_grain)
return datetime.strptime(partition_id, dt_format)
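A worked example of the partition-id round trip, assuming the standard BigQuery partition-id formats (e.g. %Y%m%d for DAY partitions) are what get_bq_partition_id_format returns:

from datetime import datetime

# Daily BigQuery partition ids look like YYYYMMDD, e.g. "20241210".
dt_format = "%Y%m%d"  # assumed result of get_bq_partition_id_format("DAY")
print(datetime.strptime("20241210", dt_format))  # 2024-12-10 00:00:00
# SyncUtil.get_derived_date_from_part_id("DAY", "20241210") performs the same two steps.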

@staticmethod
def create_fabric_partition_proxy_cols(df:DataFrame, partition:str, proxy_cols:list[str]) -> DataFrame:
"""
Adds proxy columns to the DataFrame based on the specified partition and proxy columns.
Parameters:
df (DataFrame): The input DataFrame.
partition (str): The partition column name.
proxy_cols (list[str]): A list of proxy column types to be added.
Valid values are 'HOUR', 'DAY', 'MONTH', 'YEAR'.
Returns:
DataFrame: The DataFrame with the added proxy columns.
"""
for c in proxy_cols:
match CalendarInterval[c]:
case CalendarInterval.HOUR:
@@ -231,31 +163,10 @@

@staticmethod
def get_fabric_partition_cols(partition:str, proxy_cols:list[str]):
"""
Generates a list of partitioned column names for a given partition and list of proxy columns.
Args:
partition (str): The partition name to be prefixed.
proxy_cols (list[str]): A list of proxy column names.
Returns:
list[str]: A list of partitioned column names in the format "__{partition}_{column}".
"""
return [f"__{partition}_{c}" for c in proxy_cols]

@staticmethod
def get_fabric_partition_predicate(partition_dt:datetime, partition:str, proxy_cols:list[str]) -> str:
"""
Generates a partition predicate string for a given datetime, partition, and list of proxy columns.
Args:
partition_dt (datetime): The datetime object representing the partition date and time.
partition (str): The partition name.
proxy_cols (list[str]): A list of proxy column names.
Returns:
str: A string representing the partition predicate, formatted as "__{partition}_{proxy_col} = '{part_id}'"
for each proxy column, joined by " AND ".
"""

partition_predicate = []

for c in proxy_cols:
@@ -275,30 +186,12 @@ def get_fabric_partition_predicate(partition_dt:datetime, partition:str, proxy_c

@staticmethod
def get_bq_range_map(tbl_ranges:str) -> DataFrame:
"""
Parses a string of comma-separated integers representing a range and step size,
and returns a DataFrame containing partition ranges.
Args:
tbl_ranges (str): A string of comma-separated integers in the format "start,end,step".
Returns:
DataFrame: A DataFrame containing partition ranges with columns for the range string,
start value, and end value.
"""
bq_range = [int(r.strip()) for r in tbl_ranges.split(",")]
partition_range = [(f"{r}-{r + bq_range[2]}", r, r + bq_range[2]) for r in range(bq_range[0], bq_range[1], bq_range[2])]
return partition_range
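A worked example of the range parsing above (same expressions, standalone); note the helper returns a plain Python list, which create_fabric_range_partition below wraps in a DataFrame:

tbl_ranges = "0,100,25"  # start, end, step
bq_range = [int(r.strip()) for r in tbl_ranges.split(",")]
partition_range = [(f"{r}-{r + bq_range[2]}", r, r + bq_range[2])
                   for r in range(bq_range[0], bq_range[1], bq_range[2])]
print(partition_range)
# [('0-25', 0, 25), ('25-50', 25, 50), ('50-75', 50, 75), ('75-100', 75, 100)]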

@staticmethod
def create_fabric_range_partition(context:SparkSession, df_bq:DataFrame, schedule:SyncSchedule) -> DataFrame:
"""
Creates a range partition for a given DataFrame based on the provided schedule.
Args:
context (SparkSession): The Spark session to use for creating DataFrames.
df_bq (DataFrame): The input DataFrame from BigQuery.
schedule (SyncSchedule): The synchronization schedule containing partitioning details.
Returns:
DataFrame: A DataFrame with the range partition applied, including a new column for the partition range name.
"""
partition_range = SyncUtil.get_bq_range_map(schedule.PartitionRange)

df = context.createDataFrame(partition_range, ["range_name", "range_low", "range_high"]) \
@@ -313,15 +206,6 @@ def create_fabric_range_partition(context:SparkSession, df_bq:DataFrame, schedul

@staticmethod
def get_partition_range_predicate(schedule:SyncSchedule) -> str:
"""
Generates a BigQuery partition range predicate string based on the provided schedule.
Args:
schedule (SyncSchedule): The synchronization schedule containing partition details.
Returns:
str: A string representing the partition range predicate for BigQuery.
Raises:
Exception: If the partition ID from the schedule does not match any range in the partition range map.
"""
partition_range = SyncUtil.get_bq_range_map(schedule.PartitionRange)
r = [x for x in partition_range if str(x[1]) == schedule.PartitionId]

