Skip to content

Commit

Permalink
OS-7222. Run cost calculation performance
Browse files Browse the repository at this point in the history
## Description

Run cost calculation performance

## Related issue number

OS-7222

## Special notes

<!-- Please provide additional information if required. -->

## Checklist

* [ ] The pull request title is a good summary of the changes
* [ ] Unit tests for the changes exist
* [ ] New and existing unit tests pass locally
  • Loading branch information
sd-hystax authored Jan 17, 2024
1 parent f3fdc30 commit efbd274
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 36 deletions.
98 changes: 65 additions & 33 deletions rest_api/rest_api_server/controllers/profiling/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from datetime import datetime, timedelta

from rest_api.rest_api_server.controllers.base import (
BaseProfilingTokenController, MongoMixin)
BaseProfilingTokenController, MongoMixin, ClickHouseMixin)
from rest_api.rest_api_server.models.enums import RunStates
from rest_api.rest_api_server.models.models import CloudAccount
from rest_api.rest_api_server.utils import handle_http_exc
Expand Down Expand Up @@ -92,33 +92,18 @@ def format(cls, obj):
obj['executors'] = []


class RunCostsMixin(MongoMixin):
class RunCostsMixin(MongoMixin, ClickHouseMixin):
@staticmethod
def __is_flavor_cost(expense) -> bool:
if (expense.get('lineItem/UsageType') and
'BoxUsage' in expense['lineItem/UsageType']):
return True
elif (expense.get('meter_details', {}).get(
'meter_category') == 'Virtual Machines'):
return True
elif (expense.get('BillingItem') == 'Cloud server configuration' and
'key:acs:ecs:payType value:spot' not in expense.get('Tags', [])):
return True
return False

def __calculate_instance_work(self, expenses) -> tuple[int, float]:
total_cost = 0
def __calculate_instance_work(expenses) -> float:
working_hours = 0
for e in expenses:
total_cost += e['cost']
if self.__is_flavor_cost(e):
if e.get('lineItem/UsageAmount'):
working_hours += float(e['lineItem/UsageAmount'])
elif e.get('usage_quantity'):
working_hours += float(e['usage_quantity'])
elif e.get('Usage'):
working_hours += float(e['Usage'])
return working_hours or DAY_IN_HOURS, total_cost
if e.get('lineItem/UsageAmount'):
working_hours += float(e['lineItem/UsageAmount'])
elif e.get('usage_quantity'):
working_hours += float(e['usage_quantity'])
elif e.get('Usage'):
working_hours += float(e['Usage'])
return working_hours or DAY_IN_HOURS

def _get_run_costs(self, cloud_account_ids: list, runs: list) -> dict:
if not runs:
Expand Down Expand Up @@ -151,18 +136,14 @@ def _get_run_costs(self, cloud_account_ids: list, runs: list) -> dict:
'cloud_account_id': {'$in': cloud_account_ids},
'start_date': {'$gte': min_dt, '$lt': max_dt},
'end_date': {'$lte': max_dt},
'resource_id': {'$in': list(executors)}
'resource_id': {'$in': list(executors)},
'box_usage': True
}},
{'$project': {
'cost': 1,
'resource_id': 1,
'lineItem/UsageAmount': 1, # aws
'usage_quantity': 1, # azure
'Usage': 1, # ali
'lineItem/UsageType': 1,
'meter_details': 1,
'BillingItem': 1,
'Tags': 1,
}}
]
exp_map = defaultdict(list)
Expand All @@ -173,19 +154,70 @@ def _get_run_costs(self, cloud_account_ids: list, runs: list) -> dict:
for resource_id, expenses in exp_map.items():
executor_work_map[resource_id] = self.__calculate_instance_work(
expenses)
executor_costs = self._get_executor_costs(
cloud_account_ids, list(executors), min_dt, max_dt)
for run in runs:
run_id = run['id']
cost = 0
for executor, duration in run_executor_duration_map.get(
run_id, {}).items():
working_hours, w_cost = executor_work_map.get(
executor, (DAY_IN_HOURS, 0))
w_cost = executor_costs.get(executor, 0)
working_hours = executor_work_map.get(
executor, DAY_IN_HOURS)
# cost for what period in hours was collected
w_time = working_hours * HOUR_IN_SEC
cost += w_cost * duration / w_time
result[run_id] = cost
return result

def _get_resource_ids_map(self, cloud_account_ids, cloud_resource_ids):
resources = self.resources_collection.find({
'cloud_account_id': {'$in': cloud_account_ids},
'cloud_resource_id': {'$in': cloud_resource_ids}
}, ['_id', 'cloud_resource_id'])
return {r['_id']: r['cloud_resource_id'] for r in resources}

def _get_executor_costs(self, cloud_account_ids, cloud_resource_ids,
start_date, end_date):
resource_ids_map = self._get_resource_ids_map(cloud_account_ids,
cloud_resource_ids)
query = """
SELECT
resource_id, SUM(cost * sign)
FROM expenses
WHERE cloud_account_id IN cloud_account_ids
AND resource_id IN resource_ids
AND date >= %(start_date)s
AND date <= %(end_date)s
GROUP BY resource_id
HAVING SUM(sign) > 0
"""
expenses = self.execute_clickhouse(
query=query,
params={
'start_date': start_date,
'end_date': end_date,
},
external_tables=[
{
'name': 'resource_ids',
'structure': [('_id', 'String')],
'data': [
{'_id': r_id} for r_id in list(resource_ids_map.keys())
]
},
{
'name': 'cloud_account_ids',
'structure': [('_id', 'String')],
'data': [{'_id': r_id} for r_id in cloud_account_ids]
}
],
)
result = {}
for r_id, cost in expenses:
result[resource_ids_map[r_id]] = cost
return result


class BaseProfilingController(BaseProfilingTokenController):
def _get_cloud_accounts(self, organization_id) -> dict[str, dict]:
Expand Down
27 changes: 24 additions & 3 deletions rest_api/rest_api_server/tests/unittests/test_profiling_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,13 +308,23 @@ def test_application_run_cost(self):
'end_date': datetime(2022, 5, 15, 16),
'lineItem/UsageAmount': '1',
'cost': 1,
'lineItem/UsageType': 'BoxUsage',
'box_usage': True,
'cloud_account_id': cloud_acc['id'],
'resource_id': res_2['cloud_resource_id']
},
]
self.raw_expenses.insert_many(raw_data)

resource_ids_map = {
r['cloud_resource_id']: r['id'] for r in result['resources']
}
for raw in raw_data:
self.expenses.append({
'resource_id': resource_ids_map[raw['resource_id']],
'cost': raw['cost'],
'date': int(raw['start_date'].timestamp()),
'cloud_account_id': raw['cloud_account_id'],
'sign': 1
})
code, app = self.client.application_create(
self.org['id'], {
'name': 'My test project',
Expand Down Expand Up @@ -595,12 +605,23 @@ def test_not_completed_run_cost(self):
'end_date': datetime(2022, 5, 15, 16),
'identity/TimeInterval': '2017-11-01T00:00:00Z/2017-11-01T01:00:00Z',
'cost': 200,
'lineItem/UsageType': 'BoxUsage',
'box_usage': True,
'cloud_account_id': cloud_acc['id'],
'resource_id': res_2['cloud_resource_id']
},
]
self.raw_expenses.insert_many(raw_data)
resource_ids_map = {
r['cloud_resource_id']: r['id'] for r in result['resources']
}
for raw in raw_data:
self.expenses.append({
'resource_id': resource_ids_map[raw['resource_id']],
'cost': raw['cost'],
'date': int(raw['start_date'].timestamp()),
'cloud_account_id': raw['cloud_account_id'],
'sign': 1
})

code, app = self.client.application_create(
self.org['id'], {
Expand Down

0 comments on commit efbd274

Please sign in to comment.