diff --git a/README.md b/README.md
index b663f75..2345473 100644
--- a/README.md
+++ b/README.md
@@ -66,28 +66,29 @@ For more information on module version pinning, see [Selecting a Revision](https
 
 | Name | Description | Type | Default | Required |
 |------|-------------|:----:|:-----:|:-----:|
-| batch_size | Maximum number of records passed for a single Lambda invocation | string | - | yes |
-| enable_kinesis_mapping | Determines if the event source mapping will be enabled | string | `true` | no |
-| failed_log_s3_bucket | S3 bucket name for saving failed logs (ES API errors etc.) | string | - | yes |
-| failed_log_s3_prefix | Path prefix for failed logs | string | - | yes |
-| handler | Lambda Function handler (entrypoint) | string | `main.handler` | no |
-| lambda_package_url | Lambda package URL (see Usage in README) | string | - | yes |
-| log_id_field | Key name for unique log ID | string | `log_id` | no |
-| log_retention_in_days | Lambda Function log retention in days | string | `30` | no |
-| log_timestamp_field | Key name for log timestamp | string | `time` | no |
-| log_type_field | Key name for log type | string | `log_type` | no |
-| log_type_field_whitelist | Log type whitelist (if empty, all types will be processed) | list(string) | `[]` | no |
-| log_type_unknown_prefix | Log type prefix for logs without log type field | string | `unknown` | no |
-| memory | Lambda Function memory in megabytes | string | `256` | no |
-| name | Resource name | string | - | yes |
-| runtime | Lambda Function runtime | string | `python3.7` | no |
-| source_stream_name | Source Kinesis Data Stream name | string | - | yes |
-| starting_position | Kinesis ShardIterator type (see: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html ) | string | `TRIM_HORIZON` | no |
+| batch\_size | Maximum number of records passed for a single Lambda invocation | string | n/a | yes |
+| enable\_kinesis\_mapping | Determines if the event source mapping will be enabled | string | `"true"` | no |
+| failed\_log\_s3\_bucket | S3 bucket name for saving failed logs (ES API errors etc.) | string | n/a | yes |
+| failed\_log\_s3\_prefix | Path prefix for failed logs | string | n/a | yes |
+| handler | Lambda Function handler (entrypoint) | string | `"main.handler"` | no |
+| kinesis\_max\_retries | Times to retry PutRecords on errors (wait time between retries is 500ms) | number | `"3"` | no |
+| lambda\_package\_url | Lambda package URL (see Usage in README) | string | n/a | yes |
+| log\_id\_field | Key name for unique log ID | string | `"log_id"` | no |
+| log\_retention\_in\_days | Lambda Function log retention in days | string | `"30"` | no |
+| log\_timestamp\_field | Key name for log timestamp | string | `"time"` | no |
+| log\_type\_field | Key name for log type | string | `"log_type"` | no |
+| log\_type\_field\_whitelist | Log type whitelist (if empty, all types will be processed) | list(string) | `[]` | no |
+| log\_type\_unknown\_prefix | Log type prefix for logs without log type field | string | `"unknown"` | no |
+| memory | Lambda Function memory in megabytes | string | `"256"` | no |
+| name | Resource name | string | n/a | yes |
+| runtime | Lambda Function runtime | string | `"python3.7"` | no |
+| source\_stream\_name | Source Kinesis Data Stream name | string | n/a | yes |
+| starting\_position | Kinesis ShardIterator type (see: https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html ) | string | `"TRIM_HORIZON"` | no |
 | tags | Tags for Lambda Function | map(string) | `{}` | no |
-| target_stream_name | Target Kinesis Data Stream name | string | - | yes |
-| timeout | Lambda Function timeout in seconds | string | `60` | no |
-| timezone | tz database timezone name (e.g. Asia/Tokyo) | string | `UTC` | no |
-| tracing_mode | X-Ray tracing mode (see: https://docs.aws.amazon.com/lambda/latest/dg/API_TracingConfig.html ) | string | `PassThrough` | no |
+| target\_stream\_name | Target Kinesis Data Stream name | string | n/a | yes |
+| timeout | Lambda Function timeout in seconds | string | `"60"` | no |
+| timezone | tz database timezone name (e.g. Asia/Tokyo) | string | `"UTC"` | no |
+| tracing\_mode | X-Ray tracing mode (see: https://docs.aws.amazon.com/lambda/latest/dg/API_TracingConfig.html ) | string | `"PassThrough"` | no |
diff --git a/lambda/main.py b/lambda/main.py
index 210cdc8..d2a6dc1 100644
--- a/lambda/main.py
+++ b/lambda/main.py
@@ -10,6 +10,7 @@ import time
 import uuid
 from json import JSONDecodeError
+from typing import List, Any, Tuple, Dict
 
 import boto3
 import dateutil.parser
@@ -35,9 +36,6 @@ s3 = boto3.client('s3')
 kinesis_client = boto3.client('kinesis')
 
-# consts
-RANDOM_ALPHANUMERICAL = string.ascii_lowercase + string.ascii_uppercase + string.digits
-
 # configure with env vars
 FAILED_LOG_S3_PREFIX: str = os.environ['FAILED_LOG_S3_PREFIX']
 FAILED_LOG_S3_BUCKET: str = os.environ['FAILED_LOG_S3_BUCKET']
@@ -45,35 +43,45 @@ LOG_ID_FIELD: str = os.environ['LOG_ID_FIELD']
 LOG_TYPE_FIELD: str = os.environ['LOG_TYPE_FIELD']
 LOG_TIMESTAMP_FIELD: str = os.environ['LOG_TIMESTAMP_FIELD']
-LOG_TYPE_FIELD_WHITELIST: list = str(os.environ['LOG_TYPE_WHITELIST']).split(',')
 LOG_TYPE_UNKNOWN_PREFIX: str = os.environ['LOG_TYPE_UNKNOWN_PREFIX']
 
+LOG_TYPE_FIELD_WHITELIST_TMP: list = str(os.environ['LOG_TYPE_WHITELIST']).split(',')
+if LOG_TYPE_FIELD_WHITELIST_TMP == ['']:
+    # str.split(',') on an empty string returns [''], not []: treat an empty whitelist as "process all types"
+    LOG_TYPE_FIELD_WHITELIST = None
+else:
+    LOG_TYPE_FIELD_WHITELIST = set(LOG_TYPE_FIELD_WHITELIST_TMP)
+
 TARGET_STREAM_NAME: str = os.environ['TARGET_STREAM_NAME']
+KINESIS_MAX_RETRIES: int = int(os.environ['KINESIS_MAX_RETRIES'])
+
+
+class KinesisException(Exception):
+    pass
 
 
-def append_to_dict(dictionary: dict, log_type: str, log_data: object, log_timestamp=None, log_id=None):
+
+def append_to_log_dict(dictionary: dict, log_type: str, log_data: object, log_timestamp=None, log_id=None):
     if log_type not in dictionary:
         # we've got first record for this type, initialize value for type
 
         # first record timestamp to use in file path
-        if log_timestamp:
+        if log_timestamp is None:
+            logger.info(f"No timestamp for first record")
+            logger.info(f"Falling back to current time for type \"{log_type}\"")
+            log_timestamp = datetime.datetime.now()
+        else:
             try:
                 log_timestamp = dateutil.parser.parse(log_timestamp)
             except TypeError:
                 logger.error(f"Bad timestamp: {log_timestamp}")
                 logger.info(f"Falling back to current time for type \"{log_type}\"")
                 log_timestamp = datetime.datetime.now()
-        else:
-            logger.info(f"No timestamp for first record")
-            logger.info(f"Falling back to current time for type \"{log_type}\"")
-            log_timestamp = datetime.datetime.now()
 
         # first record log_id field to use as filename suffix to prevent duplicate files
-        if log_id:
-            logger.info(f"Using first log record ID as filename suffix: {log_id}")
-        else:
+        if log_id is None:
             log_id = str(uuid.uuid4())
             logger.info(f"First log record ID is not available, using random ID as filename suffix instead: {log_id}")
+        else:
+            logger.info(f"Using first log record ID as filename suffix: {log_id}")
 
         dictionary[log_type] = {
             'records': list(),
@@ -84,235 +92,305 @@ def append_to_dict(dictionary: dict, log_type: str, log_data: object, log_timest
     dictionary[log_type]['records'].append(log_data)
 
 
-def normalize_kinesis_payload(p: dict):
+def normalize_kinesis_payload(p: dict) -> List[dict]:
     # Normalize messages from CloudWatch (subscription filters) and pass through anything else
     # https://docs.aws.amazon.com/ja_jp/AmazonCloudWatch/latest/logs/SubscriptionFilters.html
 
-    logger.debug(f"normalizer input: {p}")
-
-    payloads = []
+    logger.debug(f"Normalizer input: {p}")
 
     if len(p) < 1:
-        logger.error(f"Got weird record: \"{p}\", 
skipping") - return payloads + logger.error(f"Got weird record, skipping: {p}") + return [] # check if data is JSON and parse try: payload = json.loads(p) if type(payload) is not dict: logger.error(f"Top-level JSON data is not an object, giving up: {payload}") - return payloads + return [] except JSONDecodeError: logger.error(f"Non-JSON data found: {p}, giving up") - return payloads + return [] - if 'messageType' in payload: - logger.debug(f"Got payload looking like CloudWatch Logs via subscription filters: {payload}") + if 'messageType' not in payload: + return [payload] - if payload['messageType'] == "DATA_MESSAGE": - if 'logEvents' in payload: - for event in payload['logEvents']: - # check if data is JSON and parse - try: - logger.debug(f"message: {event['message']}") - payload_parsed = json.loads(event['message']) - logger.debug(f"parsed payload: {payload_parsed}") + # messageType is present in payload, must be coming from CloudWatch + logger.debug(f"Got payload looking like CloudWatch Logs via subscription filters: {payload}") - if type(payload_parsed) is not dict: - logger.error(f"Top-level JSON data in CWL payload is not an object, giving up: " - f"{payload_parsed}") - continue + return extract_json_data_from_cwl_message(payload) - except JSONDecodeError as e: - logger.debug(e) - logger.debug(f"Non-JSON data found inside CWL message: {event}, giving up") - continue - payloads.append(payload_parsed) +def extract_json_data_from_cwl_message(payload: dict) -> List[dict]: + # see: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html + if payload['messageType'] == "CONTROL_MESSAGE": + logger.info(f"Got CONTROL_MESSAGE from CloudWatch: {payload}, skipping") + return [] - else: - logger.error(f"Got DATA_MESSAGE from CloudWatch but logEvents are not present, " - f"skipping payload: {payload}") + elif payload['messageType'] == "DATA_MESSAGE": + payloads = [] - elif payload['messageType'] == "CONTROL_MESSAGE": - logger.info(f"Got CONTROL_MESSAGE from CloudWatch: {payload}, skipping") - return payloads + if 'logEvents' not in payload: + logger.error(f"Got DATA_MESSAGE from CloudWatch Logs but logEvents are not present, " + f"skipping payload: {payload}") + return [] + + events = payload['logEvents'] + + for event in events: + # check if data is JSON and parse + try: + logger.debug(f"message: {event['message']}") + payload_parsed = json.loads(event['message']) + logger.debug(f"parsed payload: {payload_parsed}") + + if type(payload_parsed) is not dict: + logger.error(f"Top-level JSON data in CloudWatch Logs payload is not an object, skipping: " + f"{payload_parsed}") + continue + + except JSONDecodeError as e: + logger.debug(e) + logger.debug(f"Non-JSON data found inside CloudWatch Logs message: {event}, skipping") + continue + + payloads.append(payload_parsed) + + return payloads - else: - logger.error(f"Got unknown messageType, shutting down") - raise ValueError(f"Unknown messageType: {payload}") else: - payloads.append(payload) + logger.error(f"Got unknown messageType: {payload['messageType']} , skipping") + return [] + + +def dict_get_default(dictionary: dict, key: str, default: any, verbose: bool = False) -> Any: + """ + Get key from dictionary if key is in dictionary, default value otherwise + + :param dictionary: dictionary to retrieve key from + :param key: key name in dictionary + :param default: value to return if key is not in dictionary + :param verbose: output detailed warning message when returning default value + :return: value for key if key is in 
dictionary, default value otherwise + """ + if key not in dictionary: + if verbose: + logger.warning(f"Cannot retrieve field \"{key}\" from data: {dictionary}, " + f"falling back to default value: {default}") + return default, True - return payloads + else: + return dictionary[key], False -def decode_validate(raw_records: list): - xray_recorder.begin_subsegment('decode and validate') +def parse_json_logs(raw_kinesis_records: list) -> List[dict]: + """ + Deaggregates, decodes, decompresses Kinesis Records and parses them as JSON + events by log_type. - log_dict = dict() + :param raw_kinesis_records: Raw Kinesis records (usually event['Records'] in Lambda handler function) + :return: + """ + parent_segment = xray_recorder.begin_subsegment('parse_json_logs') + + all_payloads = list() processed_records = 0 - for record in iter_deaggregate_records(raw_records): - logger.debug(f"raw Kinesis record: {record}") + for record in iter_deaggregate_records(raw_kinesis_records): + processed_records += 1 + + logger.debug(f"Raw Kinesis record: {record}") + # Kinesis data is base64 encoded - decoded_data = base64.b64decode(record['kinesis']['data']) + raw_data = base64.b64decode(record['kinesis']['data']) - # check if base64 contents is gzip - # gzip magic number 0x1f 0x8b - if decoded_data[0] == 0x1f and decoded_data[1] == 0x8b: - decoded_data = gzip.decompress(decoded_data) + # decompress data if raw data is gzip (log data from CloudWatch Logs subscription filters comes gzipped) + # gzip magic number: 0x1f 0x8b + if raw_data[0] == 0x1f and raw_data[1] == 0x8b: + raw_data = gzip.decompress(raw_data) - decoded_data = decoded_data.decode() - normalized_payloads = normalize_kinesis_payload(decoded_data) - logger.debug(f"Normalized payloads: {normalized_payloads}") + data = raw_data.decode() + payloads = normalize_kinesis_payload(data) + logger.debug(f"Normalized payloads: {payloads}") - for normalized_payload in normalized_payloads: - logger.debug(f"Parsing normalized payload: {normalized_payload}") + for payload in payloads: + all_payloads.append(payload) - processed_records += 1 + logger.info(f"Processed {processed_records} records from Kinesis") + parent_segment.end_subsegment() - # check if log type field is available - try: - log_type = normalized_payload[LOG_TYPE_FIELD] + return all_payloads - except KeyError: - logger.error(f"Cannot retrieve necessary field \"{LOG_TYPE_FIELD}\" " - f"from payload: {normalized_payload}") - log_type = f"{LOG_TYPE_UNKNOWN_PREFIX}/unknown_type" - logger.error(f"Marking as {log_type}") - # check if timestamp is present - try: - timestamp = normalized_payload[LOG_TIMESTAMP_FIELD] +def parse_payloads_to_log_dict(payloads, + log_id_key, + log_timestamp_key, + log_type_key, + log_type_whitelist) -> Tuple[Dict[str, dict], Dict[str, dict]]: - except KeyError: - logger.error(f"Cannot retrieve recommended field \"{LOG_TIMESTAMP_FIELD}\" " - f"from payload: {normalized_payload}") - timestamp = None + log_dict = dict() + failed_dict = dict() - try: - log_id = normalized_payload[LOG_ID_FIELD] - except KeyError: - logger.error(f"Cannot retrieve recommended field \"{LOG_ID_FIELD}\" " - f"from payload: {normalized_payload}") - log_id = None + for payload in payloads: + target_dict = log_dict - # valid data - append_to_dict(log_dict, log_type, normalized_payload, log_timestamp=timestamp, log_id=log_id) + logger.debug(f"Parsing normalized payload: {payload}") - logger.info(f"Processed {processed_records} records from Kinesis") - xray_recorder.end_subsegment() - return log_dict + 
log_type_unknown = f"{LOG_TYPE_UNKNOWN_PREFIX}/unknown_type" + + log_type, log_type_missing = dict_get_default( + payload, + key=log_type_key, + default=log_type_unknown, + verbose=True, + ) + + if log_type_missing: + target_dict = failed_dict + else: + if (log_type_whitelist is not None) and (log_type not in log_type_whitelist): + continue + timestamp, _ = dict_get_default( + payload, + key=log_timestamp_key, + default=None, + ) -def apply_whitelist(log_dict: dict, whitelist: list): - retval = dict() - if len(whitelist) == 0: - for key in log_dict.keys(): - if not key.startswith(LOG_TYPE_UNKNOWN_PREFIX): - retval[key] = log_dict[key] - return retval + log_id, _ = dict_get_default( + payload, + key=log_id_key, + default=None, + ) - for entry in whitelist: - if entry in log_dict: - retval[entry] = log_dict[entry] - return retval + # valid data + append_to_log_dict(target_dict, log_type, payload, log_timestamp=timestamp, log_id=log_id) + return log_dict, failed_dict -def split_list(l, size): - for i in range(0, len(l), size): - yield l[i:i+size] +def split_list(lst: list, size: int) -> List[list]: + for i in range(0, len(lst), size): + yield lst[i:i + size] -def kinesis_put(log_records: list): - xray_recorder.begin_subsegment(f"kinesis put records") + +def kinesis_put_batch_json(client, records: list, max_retries: int) -> None or List[dict]: + """ + Put multiple records to Kinesis Data Streams using PutRecords API. + + + :param client: Kinesis API client (e.g. boto3.client('kinesis') ) + :param records: list of records to send. Records will be dumped with json.dumps + :param max_retries: Maximum retries for resending failed records + :return: Records failed to put in Kinesis Data Stream after all retries + """ + parent_segment = xray_recorder.begin_subsegment(f"kinesis_put_batch_json") retry_list = [] - failed_list = [] - - # Each PutRecords request can support up to 500 records - # see: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records - - for batch_index, batch in enumerate(split_list(log_records, 500)): - records = [] - for record in batch: - data_blob = json.dumps(record).encode('utf-8') - partition_key: str = ''.join(random.choices(RANDOM_ALPHANUMERICAL, k=20)) # max 256 chars - records.append({ - 'Data': data_blob, - 'PartitionKey': partition_key, - }) - - logger.debug(records) - - retry_count = 0 - while len(records) > 0: - subsegment = xray_recorder.begin_subsegment(f"put records batch {batch_index} retry {retry_count}") - response = kinesis_client.put_records( - Records=records, + + # Each PutRecords API request can support up to 500 records: + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records + + for batch_index, batch in enumerate(split_list(records, 500)): + records_to_send = create_kinesis_records_json(batch) + retries_left = max_retries + + while len(records_to_send) > 0: + subsegment = xray_recorder.begin_subsegment(f"kinesis_put_batch_json try") + kinesis_response = client.put_records( + Records=records_to_send, StreamName=TARGET_STREAM_NAME, ) - subsegment.put_annotation("records", len(records)) - subsegment.put_annotation("failed", response['FailedRecordCount']) - xray_recorder.end_subsegment() + subsegment.put_annotation("batch_index", batch_index) + subsegment.put_annotation("records", len(records_to_send)) + subsegment.put_annotation("records_failed", kinesis_response['FailedRecordCount']) + subsegment.end_subsegment() - if 
response['FailedRecordCount'] == 0: - xray_recorder.end_subsegment() + if kinesis_response['FailedRecordCount'] == 0: break else: - retry_count += 1 - subsegment.put_annotation("failed_records", response['FailedRecordCount']) - for index, record in enumerate(response['Records']): + index: int + record: dict + for index, record in enumerate(kinesis_response['Records']): if 'ErrorCode' in record: - if record['ErrorCode'] == 'ProvisionedThroughputExceededException': - retry_list.append(records[index]) - elif record['ErrorCode'] == 'InternalFailure': - failed_list.append(records[index]) + # original records list and response record list have same order, guaranteed: + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/kinesis.html#Kinesis.Client.put_records + logger.error(f"A record failed with error: {record['ErrorCode']} {record['ErrorMessage']}") + retry_list.append(records_to_send[index]) - records = retry_list + records_to_send = retry_list retry_list = [] - if len(retry_list) > 0: - logger.info(f"Waiting 1 second for capacity") - time.sleep(1) - xray_recorder.end_subsegment() + if retries_left == 0: + error_msg = f"No retries left, giving up on records: {records_to_send}" + logger.error(error_msg) + return records_to_send - xray_recorder.end_subsegment() - return failed_list + retries_left -= 1 + + logger.info(f"Waiting 500 ms before retrying") + time.sleep(0.5) + parent_segment.end_subsegment() + + return None + + +def create_kinesis_records_json(batch: List[dict]) -> List[dict]: + random_alphanumerical = string.ascii_lowercase + string.ascii_uppercase + string.digits + + records = [] + record: str + for record in batch: + data_blob = json.dumps(record).encode('utf-8') + partition_key: str = ''.join(random.choices(random_alphanumerical, k=20)) # max 256 chars + records.append({ + 'Data': data_blob, + 'PartitionKey': partition_key, + }) + logger.debug(f"Formed Kinesis Records batch for PutRecords API: {records}") + return records + + +def save_json_logs_to_s3(client, log_dict: dict, reason: str = "not specified"): + logger.info(f"Saving logs to S3. 
Reason: {reason}") + + xray_recorder.begin_subsegment(f"s3 upload") -def save_failed(log_dict: dict): for log_type in log_dict: - if log_type.startswith(LOG_TYPE_UNKNOWN_PREFIX): - xray_recorder.begin_subsegment(f"bad data upload: {log_type}") + xray_recorder.begin_subsegment(f"s3 upload: {log_type}") - data = log_dict[log_type]['records'] - logger.error(f"Got {len(data)} failed Kinesis records ({log_type})") + timestamp = log_dict[log_type]['first_timestamp'] + key = FAILED_LOG_S3_PREFIX + '/' + timestamp.strftime("%Y-%m/%d/%Y-%m-%d-%H:%M:%S-") - timestamp = log_dict[log_type]['first_timestamp'] - key = FAILED_LOG_S3_PREFIX + '/' + timestamp.strftime("%Y-%m/%d/%Y-%m-%d-%H:%M:%S-") - key += log_dict[log_type]['first_id'] + ".gz" + key += log_dict[log_type]['first_id'] + ".gz" - logger.info(f"Saving failed records to S3: s3://{FAILED_LOG_S3_BUCKET}/{key}") - data = '\n'.join(str(f) for f in data) - put_to_s3_gzip(key, FAILED_LOG_S3_BUCKET, data) + data = log_dict[log_type]['records'] + data = '\n'.join(str(f) for f in data) - xray_recorder.end_subsegment() + logger.info(f"Saving logs to S3: s3://{FAILED_LOG_S3_BUCKET}/{key}") + put_to_s3(client, FAILED_LOG_S3_BUCKET, key, data, gzip_compress=True) + xray_recorder.end_subsegment() -def put_to_s3_gzip(key: str, bucket: str, data: str): - # gzip and put data to s3 in-memory - xray_recorder.begin_subsegment('gzip compress') - data_gz = gzip.compress(data.encode(), compresslevel=9) xray_recorder.end_subsegment() + +def put_to_s3(client, bucket: str, key: str, data: str, gzip_compress: bool = False): + if gzip_compress: + # gzip and put data to s3 in-memory + xray_recorder.begin_subsegment('gzip compress') + data_p = gzip.compress(data.encode(), compresslevel=9) + xray_recorder.end_subsegment() + else: + data_p = data + xray_recorder.begin_subsegment('s3 upload') try: - with io.BytesIO(data_gz) as data_gz_fileobj: - s3_results = s3.upload_fileobj(data_gz_fileobj, bucket, key) + with io.BytesIO(data_p) as fileobj: + s3_results = client.upload_fileobj(fileobj, bucket, key) logger.info(f"S3 upload errors: {s3_results}") @@ -326,28 +404,26 @@ def put_to_s3_gzip(key: str, bucket: str, data: str): def handler(event, context): - # check if stream exists: - response = kinesis_client.describe_stream( - StreamName=TARGET_STREAM_NAME, - ) - - if not response: - raise kinesis_client.exceptions.ResourceNotFoundException() - raw_records = event['Records'] - logger.debug(raw_records) - log_dict: dict = decode_validate(raw_records) - save_failed(log_dict) + log_dict: dict + failed_dict: dict + + payloads = parse_json_logs(raw_records) + + log_dict, failed_dict = parse_payloads_to_log_dict( + payloads, + LOG_TYPE_FIELD, + LOG_TIMESTAMP_FIELD, + LOG_ID_FIELD, + LOG_TYPE_FIELD_WHITELIST, + ) - log_dict_filtered: dict = apply_whitelist(log_dict, LOG_TYPE_FIELD_WHITELIST) - logger.debug(log_dict_filtered) + for key in log_dict: + logger.info(f"Processing log type {key}: {len(log_dict[key]['records'])} records") + kinesis_put_batch_json(kinesis_client, log_dict[key]['records'], KINESIS_MAX_RETRIES) - for key in log_dict_filtered: - logger.info(f"Processing log type {key}, {len(log_dict_filtered[key]['records'])} records") - failed_records = kinesis_put(log_dict_filtered[key]['records']) - if len(failed_records) > 0: - logger.error(f"Got failed records from Kinesis: {failed_records}") + save_json_logs_to_s3(s3, failed_dict, reason="Failed logs") logger.info("Finished") diff --git a/main.tf b/main.tf index 22aa33d..f7741e1 100644 --- a/main.tf +++ b/main.tf @@ -55,6 
       LOG_TIMESTAMP_FIELD  = var.log_timestamp_field
       LOG_TYPE_WHITELIST   = join(",", var.log_type_field_whitelist)
       TARGET_STREAM_NAME   = data.aws_kinesis_stream.target.name
+      KINESIS_MAX_RETRIES  = var.kinesis_max_retries
       FAILED_LOG_S3_BUCKET = var.failed_log_s3_bucket
       FAILED_LOG_S3_PREFIX = var.failed_log_s3_prefix
     }
diff --git a/variables.tf b/variables.tf
index 7aa3563..4c13b63 100644
--- a/variables.tf
+++ b/variables.tf
@@ -87,8 +87,14 @@ variable "target_stream_name" {
   description = "Target Kinesis Data Stream name"
 }
 
+variable "kinesis_max_retries" {
+  description = "Times to retry PutRecords on errors (wait time between retries is 500ms)"
+  default     = 3
+  type        = number
+}
+
 variable "tracing_mode" {
-  description = "X-Ray tracing mode (see: https://docs.aws.amazon.com/lambda/latest/dg/API_TracingConfig.html )"
+  description = "X-Ray tracing mode \n(see: https://docs.aws.amazon.com/lambda/latest/dg/API_TracingConfig.html )"
   default     = "PassThrough"
 }
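
The retry behaviour introduced by `kinesis_max_retries` / `KINESIS_MAX_RETRIES` can be sanity-checked without an AWS account. The sketch below is a minimal, self-contained mirror of the PutRecords retry loop in `kinesis_put_batch_json`, not the module's own code; the stub client, the sample record, and the helper names are illustrative assumptions only.

```python
import json
import time
from typing import List, Optional


class StubKinesisClient:
    """Illustrative stand-in for boto3.client('kinesis'): every record fails once, then succeeds."""

    def __init__(self):
        self.calls = 0

    def put_records(self, Records, StreamName):
        self.calls += 1
        if self.calls == 1:
            return {
                "FailedRecordCount": len(Records),
                "Records": [{"ErrorCode": "ProvisionedThroughputExceededException",
                             "ErrorMessage": "Rate exceeded"} for _ in Records],
            }
        return {"FailedRecordCount": 0,
                "Records": [{"SequenceNumber": "1", "ShardId": "shardId-000000000000"} for _ in Records]}


def put_with_retries(client, records: List[dict], stream: str, max_retries: int = 3) -> Optional[List[dict]]:
    """Simplified mirror of the retry loop: resend only failed entries, waiting 500 ms between tries."""
    to_send = [{"Data": json.dumps(r).encode("utf-8"), "PartitionKey": "example"} for r in records]
    retries_left = max_retries
    while to_send:
        response = client.put_records(Records=to_send, StreamName=stream)
        if response["FailedRecordCount"] == 0:
            return None
        # PutRecords responses are positionally aligned with the request records
        to_send = [to_send[i] for i, r in enumerate(response["Records"]) if "ErrorCode" in r]
        if retries_left == 0:
            return to_send  # give up; the caller decides what to do with the leftovers (e.g. save to S3)
        retries_left -= 1
        time.sleep(0.5)
    return None


if __name__ == "__main__":
    leftovers = put_with_retries(StubKinesisClient(),
                                 [{"log_type": "app", "time": "2019-01-01T00:00:00+00:00", "log_id": "1"}],
                                 stream="example-stream")
    print("records left unsent:", leftovers)
```

With the stub above, the second attempt succeeds and the function returns `None`; with `max_retries=0` it would return the still-failing records, matching the contract documented on `kinesis_put_batch_json` (records that could not be put after all retries are handed back to the caller).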