From fd8f4425c9a90a8962124f7fa9dfd2b81413244e Mon Sep 17 00:00:00 2001 From: Youngjin Jo Date: Tue, 30 Apr 2024 14:04:26 +0900 Subject: [PATCH] feat: update cost data by a day, not an hour Signed-off-by: Youngjin Jo --- src/plugin/connector/mimir_connector.py | 32 ++++++++++++- src/plugin/manager/cost_manager.py | 62 ++++++++++++++----------- 2 files changed, 65 insertions(+), 29 deletions(-) diff --git a/src/plugin/connector/mimir_connector.py b/src/plugin/connector/mimir_connector.py index d4806ff..1ceb54c 100644 --- a/src/plugin/connector/mimir_connector.py +++ b/src/plugin/connector/mimir_connector.py @@ -71,7 +71,7 @@ def get_promql_response( "query": secret_data["promql"], "start": start_unix_timestamp, "end": end_unix_timestamp, - "step": "1h", + "step": "1d", }, ) @@ -97,6 +97,36 @@ def _get_unix_timestamp(start: str) -> (str, str): return str(start.timestamp()), str(end.timestamp()) + def get_kubecost_cluster_info( + self, + service_account_id: str, + secret_data: dict, + ) -> dict: + self.mimir_headers = { + "Content-Type": "application/json", + "X-Scope-OrgID": service_account_id, + } + cluster_info_query = f"{secret_data['mimir_endpoint']}/api/v1/query" + try: + response = requests.get( + cluster_info_query, + headers=self.mimir_headers, + params={ + "query": secret_data["cluster_info_query"], + }, + ) + + response.raise_for_status() + + result = response.json() + return result + except requests.HTTPError as http_err: + _LOGGER.error( + f"[get_kubecost_cluster_info] HTTP error occurred: {http_err}" + ) + except Exception as err: + _LOGGER.error(f"[get_kubecost_cluster_info] error occurred: {err}") + @staticmethod def get_cost_data(promql_response: List[dict]) -> Generator[List[dict], None, None]: page_count = int(len(promql_response) / _PAGE_SIZE) + 1 diff --git a/src/plugin/manager/cost_manager.py b/src/plugin/manager/cost_manager.py index a14c1c2..cc0d982 100644 --- a/src/plugin/manager/cost_manager.py +++ b/src/plugin/manager/cost_manager.py @@ -76,11 +76,15 @@ def get_data( promql_query_range, start, service_account_id, secret_data ) + cluster_info = self.mimir_connector.get_kubecost_cluster_info( + service_account_id, secret_data + ) + if promql_response: response_stream = self.mimir_connector.get_cost_data(promql_response) yield from self._process_response_stream( - response_stream, service_account_id + cluster_info, response_stream, service_account_id ) else: _LOGGER.error( @@ -121,37 +125,45 @@ def _has_agent( yield {"results": []} def _process_response_stream( - self, response_stream: Generator, service_account_id: str + self, cluster_info: dict, response_stream: Generator, service_account_id: str ) -> Generator[dict, None, None]: for results in response_stream: - yield self._make_cost_data(results, service_account_id) + yield self._make_cost_data(cluster_info, results, service_account_id) yield {"results": []} - def _make_cost_data(self, results: List[dict], x_scope_orgid: str) -> dict: + def _make_cost_data( + self, cluster_info: dict, results: List[dict], x_scope_orgid: str + ) -> dict: + cluster_metric = ( + cluster_info.get("data", {}).get("result", [])[0].get("metric", {}) + ) costs_data = [] for result in results: for i in range(len(result["values"])): + data = {} result["cost"] = float(result["values"][i][1]) result["billed_date"] = pd.to_datetime( result["values"][i][0], unit="s" ).strftime("%Y-%m-%d") additional_info = self._make_additional_info(result, x_scope_orgid) - region_code = self._get_region_code(result) - try: - data = { - "cost": 
result.get("cost"),
-                        "usage_quantity": result.get("usage_quantity", 0),
-                        "usage_type": result["metric"]["type"],
-                        "usage_unit": result.get("usage_unit"),
-                        "provider": "kubernetes",
-                        "region_code": region_code,
-                        "product": result.get("product"),
-                        "billed_date": result["billed_date"],
-                        "additional_info": additional_info,
-                        "tags": result.get("tags", {}),
-                    }
+                try:
+                    data.update(
+                        {
+                            "cost": result.get("cost"),
+                            "billed_date": result["billed_date"],
+                            "product": result.get("product"),
+                            "provider": cluster_metric.get("provider", "kubernetes"),
+                            "region_code": self._get_region_code(
+                                cluster_metric.get("region", "Unknown")
+                            ),
+                            "usage_quantity": result.get("usage_quantity", 0),
+                            "usage_type": result["metric"]["type"],
+                            "usage_unit": result.get("usage_unit"),
+                            "additional_info": additional_info,
+                            "tags": result.get("tags", {}),
+                        }
+                    )
                 except Exception as e:
                     _LOGGER.error(
                         f"[_make_cost_data] make data error: {e}", exc_info=True
@@ -185,23 +197,17 @@ def _check_required_fields(result: dict):
     def _make_additional_info(result: dict, service_account_id: str) -> dict:
         additional_info = {
             "Cluster": result["metric"].get("cluster", ""),
-            "Node": result["metric"].get("node", "Unmounted PVs"),
+            "Node": result["metric"].get("node", "PVs"),
             "Namespace": result["metric"].get("namespace", ""),
+            "PV": result["metric"].get("persistentvolume", ""),
             "Pod": result["metric"].get("pod", ""),
-            "Container": result["metric"].get("container", "Unmounted"),
             "X-Scope-OrgID": service_account_id,
         }
         return additional_info
 
     @staticmethod
-    def _get_region_code(result: dict) -> str:
-        node = result["metric"].get("node", "")
-        if node:
-            region = node.split(".")[1]
-        else:
-            region = "Unknown (Unmounted PVs)"
-
-        region_name = AWS_REGION_MAP.get(region, "Unknown (Unmounted PVs)")
+    def _get_region_code(region: str) -> str:
+        region_name = AWS_REGION_MAP.get(region, "Unknown")
         return region_name
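
With step "1d", each series returned by Mimir's range-query API carries one [timestamp, value] sample per day instead of per hour, so every sample maps to a distinct billed_date. The sketch below shows the shape of the request the connector sends; the endpoint path, parameter names, and the X-Scope-OrgID tenant header follow the standard Prometheus/Mimir HTTP API, while the endpoint URL, PromQL string, and timestamps are placeholders (the plugin builds these from secret_data).

    # Minimal sketch of the daily-resolution range query (illustrative values only;
    # the plugin takes endpoint, query, start, and end from secret_data).
    import requests

    mimir_endpoint = "http://mimir.example.com/prometheus"  # placeholder
    headers = {
        "Content-Type": "application/json",
        "X-Scope-OrgID": "<service_account_id>",  # Mimir tenant header, as in the connector
    }

    response = requests.get(
        f"{mimir_endpoint}/api/v1/query_range",
        headers=headers,
        params={
            "query": "sum(node_total_hourly_cost) by (node)",  # placeholder PromQL
            "start": "1714435200",  # unix seconds
            "end": "1717027200",
            "step": "1d",  # one sample per day -> one cost row per billed_date
        },
    )
    response.raise_for_status()
    series = response.json()["data"]["result"]  # [{"metric": {...}, "values": [[ts, value], ...]}, ...]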
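Downstream, _make_cost_data turns each [timestamp, value] pair into one cost row and derives billed_date from the sample timestamp. A minimal sketch of that conversion with fabricated sample data, assuming the same pandas handling as the plugin:

    import pandas as pd

    # One Prometheus-style series with daily samples (fabricated values).
    series = {
        "metric": {"type": "node", "cluster": "demo-cluster", "node": "node-a"},
        "values": [[1714435200, "1.92"], [1714521600, "2.04"]],  # [unix seconds, cost]
    }

    rows = []
    for ts, value in series["values"]:
        rows.append(
            {
                "cost": float(value),
                # Daily samples are 24h apart, so each one lands on its own calendar date.
                "billed_date": pd.to_datetime(ts, unit="s").strftime("%Y-%m-%d"),
                "usage_type": series["metric"]["type"],
            }
        )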
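get_kubecost_cluster_info returns None when the instant query fails, and data.result can be an empty list when cluster_info_query matches nothing, so indexing result[0] unconditionally in _make_cost_data can raise. Below is a minimal sketch of a defensive variant of the cluster_metric lookup; the helper name and the sample response are illustrative, and the fallbacks mirror the defaults already used in the patch ("kubernetes" provider, "Unknown" region).

    from typing import Optional

    # Illustrative stand-in for the plugin's AWS_REGION_MAP.
    AWS_REGION_MAP = {"ap-northeast-2": "Asia Pacific (Seoul)"}

    def extract_cluster_metric(cluster_info: Optional[dict]) -> dict:
        """Return the first metric label set, or {} if the query failed or matched nothing."""
        if not cluster_info:
            return {}
        result = cluster_info.get("data", {}).get("result", [])
        return result[0].get("metric", {}) if result else {}

    # Example /api/v1/query response and the fallbacks used when labels are missing.
    cluster_info = {
        "data": {"result": [{"metric": {"provider": "AWS", "region": "ap-northeast-2"}}]}
    }
    cluster_metric = extract_cluster_metric(cluster_info)
    provider = cluster_metric.get("provider", "kubernetes")
    region_name = AWS_REGION_MAP.get(cluster_metric.get("region", "Unknown"), "Unknown")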