Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: Metrics processor #75

Merged
merged 3 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 8 additions & 6 deletions openshift_metrics/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import utils

from metrics_processor import MetricsProcessor

def compare_dates(date_str1, date_str2):
"""Returns true is date1 is earlier than date2"""
Expand All @@ -33,20 +34,21 @@ def main():
else:
output_file = f"{datetime.today().strftime('%Y-%m-%d')}.csv"

merged_dictionary = {}
report_start_date = None
report_end_date = None

processor = MetricsProcessor()

for file in files:
with open(file, "r") as jsonfile:
metrics_from_file = json.load(jsonfile)
cpu_request_metrics = metrics_from_file["cpu_metrics"]
memory_request_metrics = metrics_from_file["memory_metrics"]
gpu_request_metrics = metrics_from_file.get("gpu_metrics", None)
utils.merge_metrics("cpu_request", cpu_request_metrics, merged_dictionary)
utils.merge_metrics("memory_request", memory_request_metrics, merged_dictionary)
processor.merge_metrics("cpu_request", cpu_request_metrics)
processor.merge_metrics("memory_request", memory_request_metrics)
if gpu_request_metrics is not None:
utils.merge_metrics("gpu_request", gpu_request_metrics, merged_dictionary)
processor.merge_metrics("gpu_request", gpu_request_metrics)

if report_start_date is None:
report_start_date = metrics_from_file["start_date"]
Expand All @@ -69,8 +71,8 @@ def main():
print("Warning: The report spans multiple months")
report_month += " to " + datetime.strftime(report_end_date, "%Y-%m")

condensed_metrics_dict = utils.condense_metrics(
merged_dictionary, ["cpu_request", "memory_request", "gpu_request", "gpu_type"]
condensed_metrics_dict = processor.condense_metrics(
["cpu_request", "memory_request", "gpu_request", "gpu_type"]
)
utils.write_metrics_by_namespace(
condensed_metrics_dict,
Expand Down
112 changes: 112 additions & 0 deletions openshift_metrics/metrics_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
from typing import Dict, List, Optional

GPU_UNKNOWN_TYPE = "GPU_UNKNOWN_TYPE"


class MetricsProcessor:
    """Provides methods for merging metrics and processing them for billing purposes."""

    def __init__(self, interval_minutes: int = 15, merged_data: Optional[Dict] = None):
        """
        Args:
            interval_minutes: expected sampling frequency of the metrics, in minutes.
            merged_data: optional pre-populated metrics dictionary; a fresh dict is
                created when omitted (avoids the shared-mutable-default pitfall).
        """
        self.interval_minutes = interval_minutes
        self.merged_data = merged_data if merged_data is not None else {}

    def merge_metrics(self, metric_name, metric_list):
        """Merge metrics (cpu, memory, gpu) by namespace and pod.

        Each entry of `metric_list` is a Prometheus-style result:
        {"metric": {...labels...}, "values": [[epoch_time, value], ...]}.
        Values land under
        merged_data[namespace][pod]["metrics"][epoch_time][metric_name],
        alongside per-interval labels (node, and GPU details for gpu_request).

        Args:
            metric_name: key to store each sample under, e.g. "cpu_request".
            metric_list: list of Prometheus query results to fold in.
        """
        for metric in metric_list:
            pod = metric["metric"]["pod"]
            namespace = metric["metric"]["namespace"]
            node = metric["metric"].get("node")

            # Labels copied onto every sampled interval of this series.
            extra_fields = {}
            if node:
                extra_fields["node"] = node

            # GPU series carry extra labels describing the GPU product,
            # the requested resource name, and the node's machine model.
            if metric_name == "gpu_request":
                extra_fields["gpu_type"] = metric["metric"].get(
                    "label_nvidia_com_gpu_product", GPU_UNKNOWN_TYPE
                )
                gpu_resource = metric["metric"].get("resource")
                if gpu_resource:
                    extra_fields["gpu_resource"] = gpu_resource
                node_model = metric["metric"].get("label_nvidia_com_gpu_machine")
                if node_model:
                    extra_fields["node_model"] = node_model

            pod_metrics = self.merged_data.setdefault(namespace, {}).setdefault(
                pod, {"metrics": {}}
            )["metrics"]

            for epoch_time, metric_value in metric["values"]:
                interval_dict = pod_metrics.setdefault(epoch_time, {})
                interval_dict[metric_name] = metric_value
                interval_dict.update(extra_fields)

    def condense_metrics(self, metrics_to_check: List[str]) -> Dict:
        """
        Collapse consecutive intervals whose checked metrics are identical into
        a single entry carrying a "duration" (seconds). A gap larger than the
        sampling interval ends the current run, so time when the pod reported
        nothing is not counted as part of the duration.

        Args:
            metrics_to_check: metric keys that must all be equal for two
                intervals to be condensed together.

        Returns:
            A new dict with the same namespace/pod structure where each pod's
            "metrics" maps run-start epoch -> metric values + "duration".
        """
        interval = self.interval_minutes * 60
        condensed_dict = {}

        for namespace, pods in self.merged_data.items():
            condensed_dict.setdefault(namespace, {})

            for pod, pod_dict in pods.items():
                metrics_dict = pod_dict["metrics"]
                new_metrics_dict = {}
                epoch_times_list = sorted(metrics_dict.keys())

                # A series merged with an empty "values" list leaves a pod with
                # no samples; emit it with empty metrics instead of crashing on
                # epoch_times_list[0].
                if not epoch_times_list:
                    new_pod_dict = pod_dict.copy()
                    new_pod_dict["metrics"] = {}
                    condensed_dict[namespace][pod] = new_pod_dict
                    continue

                start_epoch_time = epoch_times_list[0]
                start_metric_dict = metrics_dict[start_epoch_time].copy()

                for i, epoch_time in enumerate(epoch_times_list):
                    # True while cpu, memory, gpu request, etc. are unchanged
                    # since the start of the current run.
                    same_metrics = all(
                        metrics_dict[start_epoch_time].get(metric, 0)
                        == metrics_dict[epoch_time].get(metric, 0)
                        for metric in metrics_to_check
                    )
                    # If the difference between 2 consecutive timestamps is more
                    # than the expected frequency then the pod was stopped.
                    continuous_metrics = (
                        i == 0 or epoch_time - epoch_times_list[i - 1] <= interval
                    )

                    if not same_metrics or not continuous_metrics:
                        # Close the current run at the previous timestamp.
                        duration = epoch_times_list[i - 1] - start_epoch_time + interval
                        start_metric_dict["duration"] = duration
                        new_metrics_dict[start_epoch_time] = start_metric_dict
                        start_epoch_time = epoch_time
                        start_metric_dict = metrics_dict[start_epoch_time].copy()

                # Close the final, still-open run.
                duration = epoch_times_list[-1] - start_epoch_time + interval
                start_metric_dict["duration"] = duration
                new_metrics_dict[start_epoch_time] = start_metric_dict

                new_pod_dict = pod_dict.copy()
                new_pod_dict["metrics"] = new_metrics_dict
                condensed_dict[namespace][pod] = new_pod_dict

        return condensed_dict
Loading
Loading