From f2b235a6fe06ac7dee6964d370c49aae5dbc6dfa Mon Sep 17 00:00:00 2001 From: Yevhenii Vaskivskyi Date: Tue, 7 Nov 2023 20:46:02 +0100 Subject: [PATCH] Improve HA converters (#351) --- asusrouter/modules/homeassistant.py | 43 +++++++++++++------------- asusrouter/tools/converters.py | 47 ++++++++++++++++++++--------- 2 files changed, 53 insertions(+), 37 deletions(-) diff --git a/asusrouter/modules/homeassistant.py b/asusrouter/modules/homeassistant.py index 1c3f26f..a60d8e0 100644 --- a/asusrouter/modules/homeassistant.py +++ b/asusrouter/modules/homeassistant.py @@ -15,7 +15,7 @@ from asusrouter.modules.data import AsusData from asusrouter.modules.state import AsusState from asusrouter.modules.vpnc import AsusVPNC -from asusrouter.tools.converters import as_dict, flatten_dict, list_from_dict +from asusrouter.tools.converters import flatten_dict, list_from_dict _LOGGER = logging.getLogger(__name__) @@ -60,30 +60,30 @@ def convert_to_ha_sensors(data: dict[str, Any], datatype: AsusData) -> list[str] return sensors -def convert_to_ha_data(data: dict[str, Any], datatype: AsusData) -> dict[str, Any]: +def convert_to_ha_data(data: dict[str, Any]) -> dict[str, Any]: """Convert available data to the HA-compatible dictionary.""" - output: dict[str, Any] = {} - - # First go through all the data (including nested) - # and convert it to the HA-compatible format, e.g. - # `state` values using `convert_to_ha_state_bool` - for key, value in data.items(): - # Check if the value is a dict - if isinstance(value, dict): - # If the value is a dict, go recursive - output[key] = convert_to_ha_data(value, datatype) - continue - # If the value is not a dict, convert it to the HA-compatible format - if key == "state": - output[key] = convert_to_ha_state_bool(value) - else: - output[key] = value + def convert_recursive(data: dict[str, Any]) -> dict[str, Any]: + """Convert data to the HA-compatible dictionary recursively.""" + return { + key: convert_recursive(value) + if isinstance(value, dict) + else convert_to_ha_state_bool(value) + if key.endswith("state") + else value + for key, value in data.items() + } # Flatten the dictionary - output = as_dict(flatten_dict(output)) + # Skip all the `list`, `clients` etc keys - this data should be preserved + output = flatten_dict(data, exclude=["list", "clients", "rules"]) - return output + # Convert values to HA-compatible format + if output is not None: + output = convert_recursive(output) + return output + + return {} def convert_to_ha_sensors_by_map( @@ -130,8 +130,7 @@ def convert_to_ha_sensors_group(data: dict[str, Any]) -> list[str]: def convert_to_ha_sensors_list(data: dict[str, Any]) -> list[str]: """Convert all the available data to the list of sensors.""" - flat = as_dict(flatten_dict(data)) - return list_from_dict(flat) + return list_from_dict(convert_to_ha_data(data)) def convert_to_ha_state_bool(data: AsusState | Optional[bool]) -> Optional[bool]: diff --git a/asusrouter/tools/converters.py b/asusrouter/tools/converters.py index 3dc2616..0b7545a 100644 --- a/asusrouter/tools/converters.py +++ b/asusrouter/tools/converters.py @@ -10,17 +10,11 @@ from __future__ import annotations from datetime import datetime, timedelta, timezone -from typing import Any, Callable, Optional +from typing import Any, Callable, Iterable, Optional, Tuple from dateutil.parser import parse as dtparse -def as_dict(pyobj): - """Return generator object as dictionary.""" - - return dict(pyobj) - - def clean_string(content: Optional[str]) -> Optional[str]: """Get a clean string or return None if it is empty.""" @@ -36,15 +30,38 @@ def clean_string(content: Optional[str]) -> Optional[str]: return content -def flatten_dict(obj: Any, keystring: str = "", delimiter: str = "_"): - """Flatten dictionary.""" +def flatten_dict( + d: Optional[dict[Any, Any]], + parent_key: str = "", + sep: str = "_", + exclude: Optional[str | Iterable[str]] = None, +) -> Optional[dict[str, Any]]: + """Flatten a nested dictionary.""" + + if d is None: + return None - if isinstance(obj, dict): - keystring = keystring + delimiter if keystring else keystring - for key in obj: - yield from flatten_dict(obj[key], keystring + str(key)) - else: - yield keystring, obj + if not isinstance(d, dict): + return {} + + items = [] + exclude = tuple(exclude or []) + for k, v in d.items(): + new_key = f"{parent_key}{sep}{k}" if parent_key else k + # We have a dict - check it + if isinstance(v, dict): + # This key should be skipped + if isinstance(new_key, str) and new_key.endswith(exclude): + items.append((new_key, v)) + continue + # Go recursive + flattened = flatten_dict(v, new_key, sep, exclude) + if flattened is not None: + items.extend(flattened.items()) + continue + # Not a dict - add it + items.append((new_key, v)) + return dict(items) def list_from_dict(raw: dict[str, Any]) -> list[str]: