Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(nested-event-sources): Generic unwrapping of event source data #4069

Open
wants to merge 43 commits into
base: v2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
541e9cc
testing poc
seshubaws Jan 24, 2024
630bac9
Merge branch 'aws-powertools:develop' into nested_event_sources
seshubaws Jan 24, 2024
58ffa89
Merge branch 'aws-powertools:develop' into nested_event_sources
seshubaws Feb 1, 2024
0e76edb
Added sample nested test events, parent class for unwrapping, and cus…
seshubaws Mar 26, 2024
c46c544
Extended EventWrapper class in some events
seshubaws Mar 26, 2024
a0f8bc7
Added more test events, tried to utilize schemas but wasn't working
seshubaws Apr 3, 2024
d406f43
Merging upstream develop into local branch
seshubaws Apr 4, 2024
20d55c3
Merge branch 'develop' into nested_event_sources
seshubaws Apr 4, 2024
11ee8d6
Removed code for schemas
seshubaws Apr 4, 2024
5fa798d
Merge branch 'develop' into nested_event_sources
seshubaws Apr 4, 2024
26188b2
Merge branch 'develop' into nested_event_sources
seshubaws Apr 9, 2024
980041f
Added decode for first event and error handling
seshubaws Apr 9, 2024
5376073
Merge branch 'develop' into nested_event_sources
seshubaws Apr 9, 2024
7b09126
Merge branch 'develop' into nested_event_sources
seshubaws Apr 12, 2024
432d8a6
Fixed EB event unwrapping
seshubaws Apr 12, 2024
31a7a09
Merge branch 'develop' into nested_event_sources
seshubaws Apr 12, 2024
02935e0
Fixed triple nested to not use iterator
seshubaws Apr 12, 2024
a6c7a34
Merge branch 'develop' into nested_event_sources
seshubaws Apr 17, 2024
2fb67c3
Merge branch 'develop' into nested_event_sources
seshubaws Apr 19, 2024
ab594f1
Adding unit tests
seshubaws Apr 20, 2024
fb0be61
Added unit tests
seshubaws Apr 20, 2024
ee16fc6
fix linting
seshubaws Apr 20, 2024
58dd2ea
Merge branch 'develop' into nested_event_sources
seshubaws Apr 22, 2024
6a0676c
Fixing unit tests
seshubaws Apr 23, 2024
c6b6a10
Merge branch 'develop' into nested_event_sources
seshubaws Apr 26, 2024
7fb170e
Updating tests
seshubaws Apr 26, 2024
1066438
Fix some linting
seshubaws Apr 26, 2024
659f989
Merge branch 'develop' into nested_event_sources
seshubaws Apr 29, 2024
ca50ac0
Merge branch 'develop' into nested_event_sources
seshubaws May 2, 2024
162be0b
Merge branch 'develop' into nested_event_sources
seshubaws May 2, 2024
d67f1f1
Merge branch 'develop' into nested_event_sources
seshubaws May 3, 2024
96cc71e
Merge branch 'develop' into nested_event_sources
seshubaws May 6, 2024
98998b2
Merge branch 'develop' into nested_event_sources
seshubaws May 8, 2024
4236ca2
Merge branch 'develop' into nested_event_sources
seshubaws May 8, 2024
2e7839d
Merge branch 'develop' into nested_event_sources
seshubaws May 9, 2024
3f5292d
Merge branch 'develop' into nested_event_sources
seshubaws May 9, 2024
ef35f02
Merge branch 'develop' into nested_event_sources
seshubaws May 13, 2024
64bbe9f
Merge branch 'develop' into nested_event_sources
seshubaws May 13, 2024
47ba533
Merge branch 'develop' into nested_event_sources
seshubaws May 17, 2024
0fa8f4f
Merge branch 'develop' into nested_event_sources
seshubaws May 21, 2024
2ef3aec
Merge branch 'develop' into nested_event_sources
seshubaws May 30, 2024
123e31f
Merge branch 'develop' into nested_event_sources
seshubaws Jun 25, 2024
6c5095b
Merge branch 'develop' into nested_event_sources
seshubaws Jul 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
import zlib
from typing import Dict, List, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, EventWrapper


class CloudWatchLogsLogEvent(DictWrapper):
class CloudWatchLogsLogEvent(EventWrapper):
@property
def get_id(self) -> str:
"""The ID property is a unique identifier for every log event."""
Expand Down Expand Up @@ -72,7 +72,7 @@ def log_events(self) -> List[CloudWatchLogsLogEvent]:
return [CloudWatchLogsLogEvent(i) for i in self["logEvents"]]


class CloudWatchLogsEvent(DictWrapper):
class CloudWatchLogsEvent(EventWrapper):
"""CloudWatch Logs log stream event

You can use a Lambda function to monitor and analyze logs from an Amazon CloudWatch Logs log stream.
Expand Down
60 changes: 55 additions & 5 deletions aws_lambda_powertools/utilities/data_classes/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import json
from collections.abc import Mapping
from functools import cached_property
from typing import Any, Callable, Dict, Iterator, List, Optional, overload
from typing import Any, Callable, Dict, Iterator, List, Optional, Type, TypeVar, overload

from aws_lambda_powertools.shared.headers_serializer import BaseHeadersSerializer
from aws_lambda_powertools.utilities.data_classes.shared_functions import (
Expand Down Expand Up @@ -95,6 +95,52 @@ def raw_event(self) -> Dict[str, Any]:
return self._data


class EventWrapper(DictWrapper):
NestedEvent = TypeVar("NestedEvent", bound=DictWrapper)

def __init__(self, data: Dict[str, Any], json_deserializer: Optional[Callable] = None):
"""
Parameters
----------
data : Dict[str, Any]
Lambda Event Source Event payload
json_deserializer : Callable, optional
function to deserialize `str`, `bytes`, `bytearray`
containing a JSON document to a Python `obj`,
by default json.loads
"""
super().__init__(data, json_deserializer)

def nested_event_contents(self):
records = self.get("Records")
if records is None:
raise KeyError("No 'Records' key found in the event data.")
for record in records:
if not isinstance(record, dict):
raise TypeError(f"Expected 'Records' to be a dictionary, but got {type(record)}.")
body = record.get("body")
if body is not None:
yield body
else:
raise KeyError("No 'body' key found in the 'Records' dict.")

def decode_nested_events(self, nested_event_class: Type[NestedEvent], nested_event_content_deserializer=None):
if nested_event_content_deserializer is None:
nested_event_content_deserializer = self._json_deserializer

for content in self.nested_event_contents():
deserialized_data = nested_event_content_deserializer(content)
yield nested_event_class(deserialized_data)

def decode_nested_event(self, nested_event_class: Type[NestedEvent], nested_event_content_deserializer=None):
if nested_event_content_deserializer is None:
nested_event_content_deserializer = self._json_deserializer

for content in self.nested_event_contents():
deserialized_data = nested_event_content_deserializer(content)
return nested_event_class(deserialized_data)


class BaseProxyEvent(DictWrapper):
@property
def headers(self) -> Dict[str, str]:
Expand Down Expand Up @@ -173,10 +219,12 @@ def http_method(self) -> str:
return self["httpMethod"]

@overload
def get_query_string_value(self, name: str, default_value: str) -> str: ...
def get_query_string_value(self, name: str, default_value: str) -> str:
...

@overload
def get_query_string_value(self, name: str, default_value: Optional[str] = None) -> Optional[str]: ...
def get_query_string_value(self, name: str, default_value: Optional[str] = None) -> Optional[str]:
...

def get_query_string_value(self, name: str, default_value: Optional[str] = None) -> Optional[str]:
"""Get query string value by name
Expand Down Expand Up @@ -229,15 +277,17 @@ def get_header_value(
name: str,
default_value: str,
case_sensitive: bool = False,
) -> str: ...
) -> str:
...

@overload
def get_header_value(
self,
name: str,
default_value: Optional[str] = None,
case_sensitive: bool = False,
) -> Optional[str]: ...
) -> Optional[str]:
...

def get_header_value(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import json
from typing import Any, Dict, List, Optional

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.common import EventWrapper


class EventBridgeEvent(DictWrapper):
class EventBridgeEvent(EventWrapper):
"""Amazon EventBridge Event

Documentation:
Expand Down Expand Up @@ -67,3 +68,6 @@ def detail(self) -> Dict[str, Any]:
def replay_name(self) -> Optional[str]:
"""Identifies whether the event is being replayed and what is the name of the replay."""
return self["replay-name"]

def nested_event_contents(self):
yield json.dumps(self.get("detail"))
6 changes: 4 additions & 2 deletions aws_lambda_powertools/utilities/data_classes/kafka_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,17 @@ def get_header_value(
name: str,
default_value: str,
case_sensitive: bool = True,
) -> str: ...
) -> str:
...

@overload
def get_header_value(
self,
name: str,
default_value: Optional[str] = None,
case_sensitive: bool = True,
) -> Optional[str]: ...
) -> Optional[str]:
...

def get_header_value(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

from typing_extensions import Literal

from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, EventWrapper


@dataclass(repr=False, order=False, frozen=True)
Expand Down Expand Up @@ -208,7 +208,7 @@ def subsequence_number(self) -> int:
return self._metadata["subsequenceNumber"]


class KinesisFirehoseRecord(DictWrapper):
class KinesisFirehoseRecord(EventWrapper):
@property
def approximate_arrival_timestamp(self) -> int:
"""The approximate time that the record was inserted into the delivery stream"""
Expand Down Expand Up @@ -271,7 +271,7 @@ def build_data_transformation_response(
)


class KinesisFirehoseEvent(DictWrapper):
class KinesisFirehoseEvent(EventWrapper):
"""Kinesis Data Firehose event

Documentation:
Expand Down Expand Up @@ -303,3 +303,9 @@ def region(self) -> str:
def records(self) -> Iterator[KinesisFirehoseRecord]:
for record in self["records"]:
yield KinesisFirehoseRecord(data=record, json_deserializer=self._json_deserializer)

def nested_event_contents(self):
for record in self.get("records"):
body = record.get("data")
body = base64.b64decode(body).decode("utf-8")
yield body
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from aws_lambda_powertools.utilities.data_classes.cloud_watch_logs_event import (
CloudWatchLogsDecodedData,
)
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
from aws_lambda_powertools.utilities.data_classes.common import DictWrapper, EventWrapper


class KinesisStreamRecordPayload(DictWrapper):
Expand Down Expand Up @@ -95,7 +95,7 @@ def kinesis(self) -> KinesisStreamRecordPayload:
return KinesisStreamRecordPayload(self._data)


class KinesisStreamEvent(DictWrapper):
class KinesisStreamEvent(EventWrapper):
"""Kinesis stream event

Documentation:
Expand Down
Loading
Loading