From 7ec1a76997ebffd46debd5edd476a19ed89d1306 Mon Sep 17 00:00:00 2001
From: sd-hystax <110374605+sd-hystax@users.noreply.github.com>
Date: Mon, 3 Jun 2024 09:14:45 +0300
Subject: [PATCH] OS-7535. Implement CUR2.0 data export support

## Description

Implement support for AWS CUR 2.0 data exports. Report files are now
discovered under the CUR 2.0 S3 layout (`data/BILLING_PERIOD=YYYY-MM/`) in
addition to the legacy layouts, CUR 2.0 snake_case column names are converted
to the legacy `category/CamelCase` report keys, nested objects are flattened
into individual columns, and timestamps with fractional seconds are accepted.
Legacy CUR reports continue to work.

## Related issue number

OS-7535

## Checklist

* [ ] The pull request title is a good summary of the changes
* [ ] Unit tests for the changes exist
* [ ] New and existing unit tests pass locally
---
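Notes (kept out of the commit message): CUR 2.0 exports name their columns in
snake_case (e.g. `line_item_usage_start_date`), while the rest of the pipeline
keys expenses by the legacy `category/CamelCase` names (e.g.
`lineItem/UsageStartDate`). Instead of keeping a static lookup table, the
importer now derives the legacy key from the CUR 2.0 name. A minimal
standalone sketch of that derivation, using the helper names added to
`diworker/diworker/importers/aws.py` (the class around them is omitted here):

```python
def to_camel_case(snake_str):
    # 'usage_start_date' -> 'UsageStartDate'
    return "".join(x.capitalize() for x in snake_str.split("_"))


def to_lower_case(s):
    # 'LineItem' -> 'lineItem'
    return s[0].lower() + s[1:]


# 'line_item_usage_start_date' splits into the known prefix 'line_item'
# and the subkey 'usage_start_date':
prefix = to_lower_case(to_camel_case('line_item'))  # 'lineItem'
subkey = to_camel_case('usage_start_date')          # 'UsageStartDate'
print(f'{prefix}/{subkey}')  # -> 'lineItem/UsageStartDate'
```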
 .../recommendations/short_living_instances.py |  12 +-
 diworker/diworker/constants.py                | 192 ------------------
 diworker/diworker/importers/aws.py            | 160 +++++++++++----
 risp/risp_worker/worker.py                    |  13 +-
 tools/cloud_adapter/clouds/aws.py             |  79 ++++---
 5 files changed, 194 insertions(+), 262 deletions(-)

diff --git a/bumiworker/bumiworker/modules/recommendations/short_living_instances.py b/bumiworker/bumiworker/modules/recommendations/short_living_instances.py
index 73c3b4c88..c3f98b7d0 100644
--- a/bumiworker/bumiworker/modules/recommendations/short_living_instances.py
+++ b/bumiworker/bumiworker/modules/recommendations/short_living_instances.py
@@ -1,3 +1,4 @@
+import re
 from collections import OrderedDict
 from datetime import datetime, timedelta
 
@@ -59,6 +60,13 @@ def _get_work_hrs(exp, cpu_count=None):
             return float(exp['pricing_quantity']) / cpu_count
         return 0
 
+    @staticmethod
+    def _datetime_from_value(value):
+        dt_format = '%Y-%m-%dT%H:%M:%SZ'
+        if re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z", value):
+            dt_format = '%Y-%m-%dT%H:%M:%S.%fZ'
+        return datetime.strptime(value, dt_format)
+
     def _get(self):
         (days_threshold, excluded_pools,
          skip_cloud_accounts) = self.get_options_values()
@@ -111,8 +119,8 @@ def _get(self):
                 if r_id in inst_to_remove:
                     continue
                 if exp.get('lineItem/UsageStartDate'):
-                    exp_start_date = datetime.strptime(
-                        exp['lineItem/UsageStartDate'], '%Y-%m-%dT%H:%M:%SZ')
+                    exp_start_date = self._datetime_from_value(
+                        exp['lineItem/UsageStartDate'])
                 else:
                     exp_start_date = exp['start_date']
                 if exp_start_date < start_date:
diff --git a/diworker/diworker/constants.py b/diworker/diworker/constants.py
index 9c73232b1..e69de29bb 100644
--- a/diworker/diworker/constants.py
+++ b/diworker/diworker/constants.py
@@ -1,192 +0,0 @@
-# according to https://docs.aws.amazon.com/cur/latest/userguide/data-dictionary.html
-AWS_PARQUET_CSV_MAP = {
-    'identity_line_item_id': 'identity/LineItemId',
-    'identity_time_interval': 'identity/TimeInterval',
-    'bill_invoice_id': 'bill/InvoiceId',
-    'bill_billing_entity': 'bill/BillingEntity',
-    'bill_bill_type': 'bill/BillType',
-    'bill_payer_account_id': 'bill/PayerAccountId',
-    'bill_billing_period_start_date': 'bill/BillingPeriodStartDate',
-    'bill_billing_period_end_date': 'bill/BillingPeriodEndDate',
-    'bill_invoicing_entity': 'bill/InvoicingEntity',
-    'line_item_usage_account_id': 'lineItem/UsageAccountId',
-    'line_item_line_item_type': 'lineItem/LineItemType',
-    'line_item_usage_start_date': 'lineItem/UsageStartDate',
-    'line_item_usage_end_date': 'lineItem/UsageEndDate',
-    'line_item_product_code': 'lineItem/ProductCode',
-    'line_item_usage_type': 'lineItem/UsageType',
-    'line_item_operation': 'lineItem/Operation',
-    'line_item_availability_zone': 'lineItem/AvailabilityZone',
-    'line_item_resource_id': 'lineItem/ResourceId',
-    'line_item_usage_amount': 'lineItem/UsageAmount',
-    'line_item_net_unblended_cost': 'lineItem/NetUnblendedCost',
-    'line_item_normalization_factor': 'lineItem/NormalizationFactor',
-    'line_item_normalized_usage_amount':
-        'lineItem/NormalizedUsageAmount',
-    'line_item_currency_code': 'lineItem/CurrencyCode',
-    'line_item_unblended_rate': 'lineItem/UnblendedRate',
-    'line_item_unblended_cost': 'lineItem/UnblendedCost',
-    'line_item_blended_rate': 'lineItem/BlendedRate',
-    'line_item_blended_cost': 'lineItem/BlendedCost',
-    'line_item_line_item_description': 'lineItem/LineItemDescription',
-    'line_item_tax_type': 'lineItem/TaxType',
-    'line_item_legal_entity': 'lineItem/LegalEntity',
-    'product_product_name': 'product/ProductName',
-    'product_purchase_option': 'product/PurchaseOption',
-    'product_account_assistance': 'product/accountAssistance',
-    'product_architectural_review': 'product/architecturalReview',
-    'product_architecture_support': 'product/architectureSupport',
-    'product_availability': 'product/availability',
-    'product_availability_zone': 'product/availabilityZone',
-    'product_best_practices': 'product/bestPractices',
-    'product_capacitystatus': 'product/capacitystatus',
-    'product_case_severityresponse_times': 'product/caseSeverityresponseTimes',
-    'product_classicnetworkingsupport': 'product/classicnetworkingsupport',
-    'product_clock_speed': 'product/clockSpeed',
-    'product_current_generation': 'product/currentGeneration',
-    'product_customer_service_and_communities': 'product/customerServiceAndCommunities',
-    'product_database_engine': 'product/databaseEngine',
-    'product_dedicated_ebs_throughput': 'product/dedicatedEbsThroughput',
-    'product_durability': 'product/durability',
-    'product_ecu': 'product/ecu',
-    'product_engine_code': 'product/engineCode',
-    'product_enhanced_networking_supported': 'product/enhancedNetworkingSupported',
-    'product_fee_code': 'product/feeCode',
-    'product_fee_description': 'product/feeDescription',
-    'product_from_location': 'product/fromLocation',
-    'product_from_location_type': 'product/fromLocationType',
-    'product_group': 'product/group',
-    'product_group_description': 'product/groupDescription',
-    'product_included_services': 'product/includedServices',
-    'product_insightstype': 'product/insightstype',
-    'product_instance_family': 'product/instanceFamily',
-    'product_instance_type': 'product/instanceType',
-    'product_instance_type_family': 'product/instanceTypeFamily',
-    'product_intel_avx2_available': 'product/intelAvx2Available',
-    'product_intel_avx_available': 'product/intelAvxAvailable',
-    'product_intel_turbo_available': 'product/intelTurboAvailable',
-    'product_launch_support': 'product/launchSupport',
-    'product_license_model': 'product/licenseModel',
-    'product_location': 'product/location',
-    'product_location_type': 'product/locationType',
-    'product_marketoption': 'product/marketoption',
-    'product_max_iops_burst_performance': 'product/maxIopsBurstPerformance',
-    'product_max_iopsvolume': 'product/maxIopsvolume',
-    'product_max_throughputvolume': 'product/maxThroughputvolume',
-    'product_max_volume_size': 'product/maxVolumeSize',
-    'product_memory': 'product/memory',
-    'product_network_performance': 'product/networkPerformance',
-    'product_normalization_size_factor': 'product/normalizationSizeFactor',
-    'product_operating_system': 'product/operatingSystem',
-    'product_operation': 'product/operation',
-    'product_operations_support': 'product/operationsSupport',
-    'product_physical_processor': 'product/physicalProcessor',
-    'product_platousagetype': 'product/platousagetype',
-    'product_platovolumetype': 'product/platovolumetype',
-    'product_pre_installed_sw': 'product/preInstalledSw',
-    'product_proactive_guidance': 'product/proactiveGuidance',
-    'product_processor_architecture': 'product/processorArchitecture',
-    'product_processor_features': 'product/processorFeatures',
-    'product_product_family': 'product/productFamily',
-    'product_programmatic_case_management': 'product/programmaticCaseManagement',
-    'product_provisioned': 'product/provisioned',
-    'product_purchaseterm': 'product/purchaseterm',
-    'product_region': 'product/region',
-    'product_servicecode': 'product/servicecode',
-    'product_servicename': 'product/servicename',
-    'product_sku': 'product/sku',
-    'product_storage': 'product/storage',
-    'product_storage_class': 'product/storageClass',
-    'product_storage_media': 'product/storageMedia',
-    'product_technical_support': 'product/technicalSupport',
-    'product_tenancy': 'product/tenancy',
-    'product_thirdparty_software_support': 'product/thirdpartySoftwareSupport',
-    'product_tiertype': 'product/tiertype',
-    'product_to_location': 'product/toLocation',
-    'product_to_location_type': 'product/toLocationType',
-    'product_training': 'product/training',
-    'product_transfer_type': 'product/transferType',
-    'product_usagetype': 'product/usagetype',
-    'product_vcpu': 'product/vcpu',
-    'product_version': 'product/version',
-    'product_volume_api_name': 'product/volumeApiName',
-    'product_volume_type': 'product/volumeType',
-    'product_vpcnetworkingsupport': 'product/vpcnetworkingsupport',
-    'product_who_can_open_cases': 'product/whoCanOpenCases',
-    'pricing_lease_contract_length': 'pricing/LeaseContractLength',
-    'pricing_offering_class': 'pricing/OfferingClass',
-    'pricing_purchase_option': 'pricing/PurchaseOption',
-    'pricing_rate_code': 'pricing/RateCode',
-    'pricing_rate_id': 'pricing/RateId',
-    'pricing_currency': 'pricing/currency',
-    'pricing_public_on_demand_cost': 'pricing/publicOnDemandCost',
-    'pricing_public_on_demand_rate': 'pricing/publicOnDemandRate',
-    'pricing_term': 'pricing/term',
-    'pricing_unit': 'pricing/unit',
-    'reservation_amortized_upfront_cost_for_usage':
-        'reservation/AmortizedUpfrontCostForUsage',
-    'reservation_amortized_upfront_fee_for_billing_period':
-        'reservation/AmortizedUpfrontFeeForBillingPeriod',
-    'reservation_effective_cost': 'reservation/EffectiveCost',
-    'reservation_end_time': 'reservation/EndTime',
-    'reservation_modification_status': 'reservation/ModificationStatus',
-    'reservation_net_amortized_upfront_cost_for_usage':
-        'reservation/NetAmortizedUpfrontCostForUsage',
-    'reservation_net_amortized_upfront_fee_for_billing_period':
-        'reservation/NetAmortizedUpfrontFeeForBillingPeriod',
-    'reservation_net_recurring_fee_for_usage':
-        'reservation/NetRecurringFeeForUsage',
-    'reservation_net_unused_recurring_fee': 'reservation/NetUnusedRecurringFee',
-    'reservation_net_upfront_value': 'reservation/NetUpfrontValue',
-    'reservation_net_unused_amortized_upfront_fee_for_billing_period':
-        'reservation/NetUnusedAmortizedUpfrontFeeForBillingPeriod',
-    'reservation_net_effective_cost': 'reservation/NetEffectiveCost',
-    'reservation_normalized_units_per_reservation':
-        'reservation/NormalizedUnitsPerReservation',
-    'reservation_number_of_reservations': 'reservation/NumberOfReservations',
-    'reservation_recurring_fee_for_usage': 'reservation/RecurringFeeForUsage',
-    'reservation_reservation_a_r_n': 'reservation/ReservationARN',
-    'reservation_start_time': 'reservation/StartTime',
-    'reservation_subscription_id': 'reservation/SubscriptionId',
-    'reservation_total_reserved_normalized_units':
-        'reservation/TotalReservedNormalizedUnits',
-    'reservation_total_reserved_units':
-        'reservation/TotalReservedUnits',
-    'reservation_units_per_reservation': 'reservation/UnitsPerReservation',
-    'reservation_unused_amortized_upfront_fee_for_billing_period':
-        'reservation/UnusedAmortizedUpfrontFeeForBillingPeriod',
-    'reservation_unused_normalized_unit_quantity':
-        'reservation/UnusedNormalizedUnitQuantity',
-    'reservation_unused_quantity': 'reservation/UnusedQuantity',
-    'reservation_unused_recurring_fee': 'reservation/UnusedRecurringFee',
-    'reservation_upfront_value': 'reservation/UpfrontValue',
-    'savings_plan_total_commitment_to_date':
-        'savingsPlan/TotalCommitmentToDate',
-    'savings_plan_savings_plan_a_r_n': 'savingsPlan/SavingsPlanARN',
-    'savings_plan_savings_plan_rate': 'savingsPlan/SavingsPlanRate',
-    'savings_plan_used_commitment': 'savingsPlan/UsedCommitment',
-    'savings_plan_savings_plan_effective_cost':
-        'savingsPlan/SavingsPlanEffectiveCost',
-    'savings_plan_amortized_upfront_commitment_for_billing_period':
-        'savingsPlan/AmortizedUpfrontCommitmentForBillingPeriod',
-    'savings_plan_recurring_commitment_for_billing_period':
-        'savingsPlan/RecurringCommitmentForBillingPeriod',
-    'savings_plan_start_time': 'savingsPlan/StartTime',
-    'savings_plan_end_time': 'savingsPlan/EndTime',
-    'savings_plan_offering_type': 'savingsPlan/OfferingType',
-    'savings_plan_payment_option': 'savingsPlan/PaymentOption',
-    'savings_plan_purchase_term': 'savingsPlan/PurchaseTerm',
-    'savings_plan_region': 'savingsPlan/Region',
-    'savings_plan_net_savings_plan_effective_cost':
-        'savingsPlan/NetSavingsPlanEffectiveCost',
-    'savings_plan_net_amortized_upfront_commitment_for_billing_period':
-        'savingsPlan/NetAmortizedUpfrontCommitmentForBillingPeriod',
-    'savings_plan_net_recurring_commitment_for_billing_period':
-        'savingsPlan/NetRecurringCommitmentForBillingPeriod',
-    'resource_tags_user_name': 'resourceTags/user:Name',
-    'resource_tags_aws_cloudformation_stack_id':
-        'resourceTags/aws:cloudformation:stack-id',
-    'resource_tags_aws_cloudformation_logical_id':
-        'resourceTags/aws:cloudformation:logical-id',
-    'resource_tags_aws_cloudformation_stack_name':
-        'resourceTags/aws:cloudformation:stack-name',
-    'resource_tags_aws_created_by': 'resourceTags/aws:createdBy',
-}
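The `AWS_PARQUET_CSV_MAP` deleted above is not relocated; it is replaced by
the dynamic `_get_legacy_csv_key()` conversion added to the importer below.
In `AWS_CUR_PREFIX_MAP`, each prefix maps to `(is_lowercase, [case
exceptions])`: `product` and `pricing` subkeys start lowercase by default
except for the listed exceptions, while the other categories start uppercase.
A hypothetical spot-check (the `importer` instance is assumed) against
representative entries of the removed map:

```python
expected = {
    'bill_payer_account_id': 'bill/PayerAccountId',       # uppercase default
    'product_instance_type': 'product/instanceType',      # lowercase default
    'product_product_name': 'product/ProductName',        # case exception
    'pricing_rate_code': 'pricing/RateCode',              # case exception
    'resource_tags_user_name': 'resourceTags/user:Name',  # service tag
}
# importer: an AWSReportImporter, assumed to be constructed elsewhere
for snake, legacy in expected.items():
    assert importer._get_legacy_csv_key(snake) == legacy
```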
diff --git a/diworker/diworker/importers/aws.py b/diworker/diworker/importers/aws.py
index 3ef24aa93..045f33e58 100644
--- a/diworker/diworker/importers/aws.py
+++ b/diworker/diworker/importers/aws.py
@@ -3,14 +3,15 @@
 import gzip
 import logging
 import os
+import re
 import pyarrow
 import shutil
 import uuid
 import zipfile
+import json
 from collections import defaultdict, OrderedDict
 from datetime import datetime, timedelta, timezone
 
-from diworker.diworker.constants import AWS_PARQUET_CSV_MAP
 from diworker.diworker.importers.base import CSVBaseReportImporter
 
 import pyarrow.parquet as pq
@@ -32,7 +33,28 @@
     'Windows with SQL Server Web',
     'Windows with SQL Server Enterprise',
 ]
-tag_prefixes = ['resource_tags_aws_', 'resource_tags_user_']
+SERVICE_TAGS_MAP = {
+    'user_name': 'user:Name',
+    'aws_cloudformation_stack_id': 'aws:cloudformation:stack-id',
+    'aws_cloudformation_logical_id': 'aws:cloudformation:logical-id',
+    'aws_cloudformation_stack_name': 'aws:cloudformation:stack-name',
+    'aws_created_by': 'aws:createdBy',
+}
+SERVICE_TAG_PREFIXES = ['aws', 'user', 'cloudformation']
+# This map is needed for proper extraction of nested objects.
+# Format: {field_name_prefix: (is_lowercase, [case exceptions])}
+AWS_CUR_PREFIX_MAP = {
+    'identity': (False, []),
+    'bill': (False, []),
+    'line_item': (False, []),
+    'product': (True, ['product_name', 'purchase_option', 'size_flex']),
+    'pricing': (True, ['rate_code', 'rate_id', 'purchase_option',
+                       'offering_class', 'lease_contract_length']),
+    'reservation': (False, []),
+    'savings_plan': (False, []),
+    'resource_tags': (False, []),
+    'cost_category': (False, []),
+}
 
 
 class AWSReportImporter(CSVBaseReportImporter):
@@ -91,6 +113,14 @@ def gunzip_report(report_path, dest_dir):
 
         return new_report_path
 
+    @staticmethod
+    def to_camel_case(snake_str):
+        return "".join(x.capitalize() for x in snake_str.split("_"))
+
+    @staticmethod
+    def to_lower_case(snake_str):
+        return snake_str[0].lower() + snake_str[1:]
+
     def unpack_report(self, report_file, date):
         dest_dir = self.get_new_report_path(date)
         os.makedirs(dest_dir, exist_ok=True)
@@ -269,14 +299,85 @@ def _set_resource_id(self, expense):
         if res_id:
             expense['resource_id'] = res_id
 
+    def _to_csv_tag(self, prefix, tag, root=True):
+        if root and tag in SERVICE_TAGS_MAP:
+            return f'{prefix}{SERVICE_TAGS_MAP[tag]}'
+        subprefix = next((
+            s for s in SERVICE_TAG_PREFIXES if tag.startswith(s)
+        ), None)
+        if not subprefix:
+            return f'{prefix}{tag}'
+        subkey = f'{tag[len(subprefix) + 1:]}'
+        return self._to_csv_tag(f'{prefix}{subprefix}:', subkey, False)
+
+    def _get_legacy_csv_key(self, old_key):
+        key = next((
+            s for s in AWS_CUR_PREFIX_MAP.keys()
+            if old_key.startswith(f'{s}_')
+        ), None)
+        if not key:
+            return old_key
+        prefix = self.to_lower_case(self.to_camel_case(key))
+        subkey = old_key[len(key) + 1:]
+        if not subkey:
+            return prefix
+        if key == 'resource_tags':
+            return self._to_csv_tag(f'{prefix}/', subkey)
+        else:
+            to_lower, exceptions = AWS_CUR_PREFIX_MAP[key]
+            new_key = self.to_camel_case(subkey)
+            if subkey in exceptions:
+                to_lower = not to_lower
+            if to_lower:
+                new_key = self.to_lower_case(new_key)
+            return f'{prefix}/{new_key}'
+
+    def _extract_nested_objects(self, obj, parquet=False):
+        updates = defaultdict(dict)
+        removed_keys = set()
+        # extract nested objects
+        for k in AWS_CUR_PREFIX_MAP.keys():
+            values = obj.get(k)
+            if not values:
+                continue
+            if parquet:
+                for n, vals in values.items():
+                    if isinstance(vals, list):
+                        for postfix, value in vals:
+                            snake_key = f'{k}_{postfix}'
+                            csv_key = self._get_legacy_csv_key(snake_key)
+                            updates[csv_key][n] = value
+                        removed_keys.add(k)
+            else:
+                try:
+                    nested_objects = json.loads(values)
+                except Exception:
+                    continue
+                for new_key, new_value in nested_objects.items():
+                    snake_key = f'{k}_{new_key}'
+                    csv_key = self._get_legacy_csv_key(snake_key)
+                    updates[csv_key] = new_value
+                removed_keys.add(k)
+        for k in removed_keys:
+            obj.pop(k)
+        obj.update(updates)
+        return obj
+
+    def _convert_to_legacy_csv_columns(self, columns, dict_format=False):
+        if not dict_format:
+            return [self._get_legacy_csv_key(col) for col in columns]
+        return {col: self._get_legacy_csv_key(col) for col in columns}
+
     def load_csv_report(self, report_path, account_id_ca_id_map,
                         billing_period, skipped_accounts):
         date_start = datetime.utcnow()
         with open(report_path, newline='') as csvfile:
             reader = csv.DictReader(csvfile)
+            reader.fieldnames = self._convert_to_legacy_csv_columns(
+                reader.fieldnames)
             chunk = []
             record_number = 0
             for row in reader:
+                row = self._extract_nested_objects(row)
                 if billing_period is None:
                     billing_period = row['bill/BillingPeriodStartDate']
                     LOG.info('detected billing period: %s', billing_period)
@@ -333,9 +434,12 @@ def load_parquet_report(self, report_path, account_id_ca_id_map,
                             billing_period, skipped_accounts):
         date_start = datetime.utcnow()
         dataframe = pq.read_pandas(report_path).to_pandas()
-        dataframe.rename(columns=AWS_PARQUET_CSV_MAP, inplace=True)
+        new_columns = self._convert_to_legacy_csv_columns(
+            dataframe.columns, dict_format=True)
+        dataframe.rename(columns=new_columns, inplace=True)
         for i in range(0, dataframe.shape[0], CHUNK_SIZE):
-            expense_chunk = dataframe.iloc[i:i + CHUNK_SIZE, :].to_dict()
+            expense_chunk = self._extract_nested_objects(
+                dataframe.iloc[i:i + CHUNK_SIZE, :].to_dict(), parquet=True)
             chunk = [{} for _ in range(0, CHUNK_SIZE)]
             skipped_rows = set()
             for field_name, values_dict in expense_chunk.items():
@@ -359,7 +463,7 @@ def load_parquet_report(self, report_path, account_id_ca_id_map,
                         continue
                     chunk[expense_num]['cloud_account_id'] = cloud_account_id
                     self.detected_cloud_accounts.add(cloud_account_id)
-                elif field_name == 'lineItem/ResourceId' and value != '':
+                elif field_name == 'lineItem/ResourceId' and value:
                     chunk[expense_num]['resource_id'] = value[
                         value.find('/') + 1:]
                 elif field_name == 'lineItem/UsageStartDate':
                     start_date = self._datetime_from_value(value).replace(
@@ -373,7 +477,7 @@ def load_parquet_report(self, report_path, account_id_ca_id_map,
                 elif field_name == 'lineItem/UsageType':
                     if 'BoxUsage' in value:
                         chunk[expense_num]['box_usage'] = True
-                if value != '':
+                if value:
                     chunk[expense_num][field_name] = value
 
             expenses = [x for x in chunk if x and
@@ -402,13 +506,6 @@ def _extract_tag_name(tag_key, prefix_symbol):
             prefix_len = tag_key.find(prefix_symbol) + 1
             return tag_key[prefix_len:]
 
-        def _extract_parquet_tag_name(tag_key):
-            prefix = 'resource_tags_'
-            for prefix_ in tag_prefixes:
-                if tag_key.startswith(prefix_):
-                    prefix = prefix_
-            return tag_key[len(prefix):]
-
         for k, v in expense.items():
             if (not k.startswith('resourceTags') and
                     not k.startswith('resource_tags')):
@@ -415,11 +512,9 @@
                 continue
             if k != 'resourceTags/user:Name':
                 if k.startswith('resourceTags/aws'):
                     name = _extract_tag_name(k, '/')
-                elif k.startswith('resourceTags/'):
-                    name = _extract_tag_name(k, ':')
                 else:
-                    name = _extract_parquet_tag_name(k)
+                    name = _extract_tag_name(k, ':')
                 raw_tags[name] = v
         tags = self.extract_tags(raw_tags)
         return tags
@@ -428,14 +523,15 @@ def _datetime_from_expense(expense, key):
         value = expense[key]
         if isinstance(value, str):
-            return datetime.strptime(expense[key], '%Y-%m-%dT%H:%M:%SZ'
-                                     ).replace(tzinfo=timezone.utc)
+            return AWSReportImporter._datetime_from_value(expense[key])
         return value.replace(tzinfo=timezone.utc)
 
     @staticmethod
     def _datetime_from_value(value):
-        return datetime.strptime(value, '%Y-%m-%dT%H:%M:%SZ'
-                                 ).replace(tzinfo=timezone.utc)
+        dt_format = '%Y-%m-%dT%H:%M:%SZ'
+        if re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z", value):
+            dt_format = '%Y-%m-%dT%H:%M:%S.%fZ'
+        return datetime.strptime(value, dt_format).replace(tzinfo=timezone.utc)
 
     def get_resource_info_from_expenses(self, expenses, resource_type=None):
         name = None
@@ -518,19 +614,15 @@ def get_resource_info_from_expenses(self, expenses, resource_type=None):
                     meta_dict['applied_region'] = applied_region
                 if not start and 'savingsPlan/StartTime' in e:
                     try:
-                        start = int(datetime.strptime(
-                            e.get('savingsPlan/StartTime'),
-                            '%Y-%m-%dT%H:%M:%S.%fZ').replace(
-                            tzinfo=timezone.utc).timestamp())
+                        start = int(self._datetime_from_value(
+                            e['savingsPlan/StartTime']).timestamp())
                         meta_dict['start'] = start
                     except (TypeError, ValueError):
                         pass
                 if not end and 'savingsPlan/EndTime' in e:
                     try:
-                        end = int(datetime.strptime(
-                            e.get('savingsPlan/EndTime'),
-                            '%Y-%m-%dT%H:%M:%S.%fZ').replace(
-                            tzinfo=timezone.utc).timestamp())
+                        end = int(self._datetime_from_value(
+                            e['savingsPlan/EndTime']).timestamp())
                         meta_dict['end'] = end
                     except (TypeError, ValueError):
                         pass
@@ -546,19 +638,15 @@ def get_resource_info_from_expenses(self, expenses, resource_type=None):
                     meta_dict['purchase_term'] = purchase_term
                 if not start and 'reservation/StartTime' in e:
                     try:
-                        start = int(datetime.strptime(
-                            e.get('reservation/StartTime'),
-                            '%Y-%m-%dT%H:%M:%S.%fZ').replace(
-                            tzinfo=timezone.utc).timestamp())
+                        start = int(self._datetime_from_value(
+                            e['reservation/StartTime']).timestamp())
                         meta_dict['start'] = start
                     except (TypeError, ValueError):
                         pass
                 if not end and 'reservation/EndTime' in e:
                     try:
-                        end = int(datetime.strptime(
-                            e.get('reservation/EndTime'),
-                            '%Y-%m-%dT%H:%M:%S.%fZ').replace(
-                            tzinfo=timezone.utc).timestamp())
+                        end = int(self._datetime_from_value(
+                            e['reservation/EndTime']).timestamp())
                         meta_dict['end'] = end
                     except (TypeError, ValueError):
                         pass
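In CUR 2.0 exports a whole column family can arrive as a single nested value:
a JSON string per row in CSV files, and a list of key/value pairs per row in
parquet files. `_extract_nested_objects()` above flattens these back into
individual legacy columns. A small illustration of the CSV path (the exact
nested payload is an assumption made for the example):

```python
import json

row = {
    'lineItem/UsageAmount': '24.0',
    # CUR 2.0 CSV carries the whole 'product' family as one JSON string:
    'product': json.dumps({'instance_type': 't3.large',
                           'region': 'us-east-1'}),
}
# importer._extract_nested_objects(row) pops 'product' and flattens it:
# {'lineItem/UsageAmount': '24.0',
#  'product/instanceType': 't3.large',
#  'product/region': 'us-east-1'}
```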
diff --git a/risp/risp_worker/worker.py b/risp/risp_worker/worker.py
index 238d7c3dd..c3ab131f7 100644
--- a/risp/risp_worker/worker.py
+++ b/risp/risp_worker/worker.py
@@ -3,6 +3,7 @@
 from threading import Thread
 
 import os
+import re
 import time
 import logging
 import urllib3
@@ -66,6 +67,13 @@ def clickhouse_client(self):
                 host=host, password=password, database=CH_DB_NAME, user=user)
         return self._clickhouse_client
 
+    @staticmethod
+    def _datetime_from_value(value):
+        dt_format = '%Y-%m-%dT%H:%M:%SZ'
+        if re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z", value):
+            dt_format = '%Y-%m-%dT%H:%M:%S.%fZ'
+        return datetime.strptime(value, dt_format).replace(tzinfo=timezone.utc)
+
     def get_consumers(self, consumer, channel):
         return [consumer(queues=[TASK_QUEUE], accept=['json'],
                          callbacks=[self.process_task], prefetch_count=10)]
@@ -297,9 +305,8 @@ def ri_expected_cost_per_day(self, cloud_account_id, start_date, end_date):
                     date] = HRS_IN_DAY * norm_factor * cost_per_n_hr
             # the first RIFee expense does not include hours from the start
             # of the month to the RI purchase time
-            period_start = datetime.strptime(
-                expense['lineItem/UsageStartDate'], '%Y-%m-%dT%H:%M:%SZ'
-            ).replace(tzinfo=timezone.utc)
+            period_start = self._datetime_from_value(
+                expense['lineItem/UsageStartDate'])
             if period_start > exp_start_date:
                 not_used_hrs = (period_start - exp_start_date
                                 ).total_seconds() / SECONDS_IN_HOUR
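CUR 2.0 writes timestamps with fractional seconds (`2024-06-01T00:00:00.000Z`)
where legacy CUR used whole seconds, which is why the same
`_datetime_from_value()` helper now appears in bumiworker, diworker, and risp
(each service currently vendors its own copy rather than sharing one). Its
behavior on both formats, shown with the risp/diworker variant that also pins
UTC:

```python
from datetime import datetime, timezone
import re


def _datetime_from_value(value):
    dt_format = '%Y-%m-%dT%H:%M:%SZ'
    if re.match(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z", value):
        dt_format = '%Y-%m-%dT%H:%M:%S.%fZ'
    return datetime.strptime(value, dt_format).replace(tzinfo=timezone.utc)


print(_datetime_from_value('2024-06-01T00:00:00Z'))      # legacy CUR
print(_datetime_from_value('2024-06-01T00:00:00.000Z'))  # CUR 2.0
```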
diff --git a/tools/cloud_adapter/clouds/aws.py b/tools/cloud_adapter/clouds/aws.py
index e713831e2..c3e9c0682 100644
--- a/tools/cloud_adapter/clouds/aws.py
+++ b/tools/cloud_adapter/clouds/aws.py
@@ -47,6 +47,13 @@
 # maximum value for MaxResults (AWS limitation)
 MAX_RESULTS = 1000
 
+REGEX_AWS_REPORT_FORMAT = 'data/BILLING_PERIOD=[0-9]{{4}}-[0-9]{{2}}'
+REGEX_AWS_REPORT_GROUP = 'BILLING_PERIOD=[0-9]{4}-[0-9]{2}'
+REGEX_AWS_REPORT_FORMAT_CSV_LEGACY = '[0-9]{{8}}-[0-9]{{8}}(/[0-9]{{8}}T[0-9]{{6}}Z)?'
+REGEX_AWS_REPORT_GROUP_CSV_LEGACY = '[0-9]{8}-[0-9]{8}'
+REGEX_AWS_REPORT_FORMAT_PARQUET_LEGACY = '{1}/year=[0-9]{{4}}/month=([1-9]|1[0-2])'
+REGEX_AWS_REPORT_GROUP_PARQUET_LEGACY = 'year=[0-9]{4}/month=([1-9]|1[0-2])/'
+
 
 def _retry_on_error(exc):
     if isinstance(exc, ResponseParserError):
@@ -566,30 +573,38 @@ def get_report_files(self):
 
     def find_parquet_reports(self, s3_objects, prefix, report_name):
         reports = {}
+        parquet_regex_parts = [
+            (REGEX_AWS_REPORT_FORMAT,
+             REGEX_AWS_REPORT_GROUP),  # CUR 2.0 parquet reports
+            (REGEX_AWS_REPORT_FORMAT_PARQUET_LEGACY,
+             REGEX_AWS_REPORT_GROUP_PARQUET_LEGACY)  # legacy parquet reports
+        ]
         try:
-            report_regex_fmt = '^{0}/{1}/{1}/year=[0-9]{{4}}/' \
-                               'month=([1-9]|1[0-2])/{1}-[0-9]{{5}}' \
-                               '.snappy.parquet$'
-            report_regex = re.compile(
-                report_regex_fmt.format(re.escape(prefix),
-                                        re.escape(report_name)))
-            for report in [f for f in s3_objects['Contents']
-                           if re.match(report_regex, f['Key'])]:
-                group = re.search(r'year=[0-9]{4}/month=([1-9]|1[0-2])/',
-                                  report['Key']).group(0)
-                common_group = self._group_to_daterange(group)
-
-                if common_group not in reports:
-                    reports[common_group] = []
-                reports[common_group].append(report)
+            for format_part, group_part in parquet_regex_parts:
+                report_regex_fmt = r'^{0}/{1}/%s/{1}-[0-9]{{5}}.snappy.parquet$' \
+                    % format_part
+                report_regex = re.compile(
+                    report_regex_fmt.format(re.escape(prefix),
+                                            re.escape(report_name)))
+                for report in [f for f in s3_objects['Contents']
+                               if re.match(report_regex, f['Key'])]:
+                    group = re.search(group_part, report['Key']).group(0)
+                    common_group = self._group_to_daterange(group)
+                    if common_group not in reports:
+                        reports[common_group] = []
+                    reports[common_group].append(report)
         except KeyError:
             reports = {}
         return reports
 
     @staticmethod
     def _group_to_daterange(group):
-        year = int(group[5:].split('/')[0])
-        month = int(group.split('month=')[1].split('/')[0])
+        if 'BILLING_PERIOD' in group:
+            year = int(group[-7:-3])
+            month = int(group[-2:])
+        else:
+            year = int(group[5:].split('/')[0])
+            month = int(group.split('month=')[1].split('/')[0])
         if month == 12:
             next_year = year + 1
             next_month = 1
@@ -602,18 +617,24 @@ def _group_to_daterange(group):
 
     def find_csv_reports(self, s3_objects, prefix, report_name):
         reports = {}
         try:
-            report_regex_fmt = '^{0}/{1}/[0-9]{{8}}-[0-9]{{8}}(/[0-9]{{8}}' \
-                               'T[0-9]{{6}}Z)?/{1}-[0-9]{{5}}.csv.(gz|zip)$'
-            report_regex = re.compile(
-                report_regex_fmt.format(re.escape(prefix),
-                                        re.escape(report_name)))
-            for report in [f for f in s3_objects['Contents']
-                           if re.match(report_regex, f['Key'])]:
-                group = re.search(r'[0-9]{8}-[0-9]{8}',
-                                  report['Key']).group(0)
-                if group not in reports:
-                    reports[group] = []
-                reports[group].append(report)
+            csv_regex_parts = [
+                (REGEX_AWS_REPORT_FORMAT,
+                 REGEX_AWS_REPORT_GROUP),  # CUR 2.0 csv reports
+                (REGEX_AWS_REPORT_FORMAT_CSV_LEGACY,
+                 REGEX_AWS_REPORT_GROUP_CSV_LEGACY)  # legacy csv reports
+            ]
+            for format_part, group_part in csv_regex_parts:
+                report_regex_fmt = '^{0}/{1}/%s/{1}-[0-9]{{5}}.csv.(gz|zip)$' \
+                    % format_part
+                report_regex = re.compile(
+                    report_regex_fmt.format(re.escape(prefix),
+                                            re.escape(report_name)))
+                for report in [f for f in s3_objects['Contents']
+                               if re.match(report_regex, f['Key'])]:
+                    group = re.search(group_part, report['Key']).group(0)
+                    if group not in reports:
+                        reports[group] = []
+                    reports[group].append(report)
         except KeyError:
             reports = {}
         return reports
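CUR 2.0 data exports land under `<prefix>/<report>/data/BILLING_PERIOD=YYYY-MM/`,
while legacy reports used `<report>/YYYYMMDD-YYYYMMDD/` (CSV) or
`year=.../month=...` partitions (parquet); the regex pairs above cover both
layouts. A quick check of the CUR 2.0 parquet pattern, with hypothetical
bucket values:

```python
import re

prefix, report_name = 'reports', 'my-cur'  # hypothetical values
fmt = 'data/BILLING_PERIOD=[0-9]{{4}}-[0-9]{{2}}'
report_regex = re.compile(
    ('^{0}/{1}/%s/{1}-[0-9]{{5}}.snappy.parquet$' % fmt).format(
        re.escape(prefix), re.escape(report_name)))

key = 'reports/my-cur/data/BILLING_PERIOD=2024-06/my-cur-00001.snappy.parquet'
assert report_regex.match(key)

group = re.search('BILLING_PERIOD=[0-9]{4}-[0-9]{2}', key).group(0)
assert group == 'BILLING_PERIOD=2024-06'
# _group_to_daterange() slices the year and month out of the group:
year, month = int(group[-7:-3]), int(group[-2:])
assert (year, month) == (2024, 6)
```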