Skip to content

Commit

Permalink
Removed Generic payload from DLQ Policy (#49)
Browse files Browse the repository at this point in the history
  • Loading branch information
rahul-kumar-saini authored Mar 22, 2022
1 parent f0e62a2 commit d058608
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class DeadLetterQueue(ProcessingStep[TPayload]):
def __init__(
self,
next_step: ProcessingStep[TPayload],
policy: DeadLetterQueuePolicy[TPayload],
policy: DeadLetterQueuePolicy,
) -> None:
self.__next_step = next_step
self.__policy = policy
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from abc import ABC, abstractmethod
from typing import Generic

from arroyo.types import Message, TPayload

Expand All @@ -12,7 +11,7 @@ def __str__(self) -> str:
return f"Invalid Message: {self.message}"


class DeadLetterQueuePolicy(ABC, Generic[TPayload]):
class DeadLetterQueuePolicy(ABC):
"""
A DLQ Policy defines how to handle an invalid message.
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
DeadLetterQueuePolicy,
InvalidMessage,
)
from arroyo.types import TPayload
from arroyo.utils.metrics import get_metrics


Expand All @@ -19,7 +18,7 @@ class _Bucket(NamedTuple):
hits: int


class CountInvalidMessagePolicy(DeadLetterQueuePolicy[TPayload]):
class CountInvalidMessagePolicy(DeadLetterQueuePolicy):
"""
Ignore invalid messages up to a certain limit per time unit window in seconds.
This window is 1 minute by default. The exception associated with the invalid
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
DeadLetterQueuePolicy,
InvalidMessage,
)
from arroyo.types import TPayload
from arroyo.utils.metrics import get_metrics


class IgnoreInvalidMessagePolicy(DeadLetterQueuePolicy[TPayload]):
class IgnoreInvalidMessagePolicy(DeadLetterQueuePolicy):
def __init__(self) -> None:
self.__metrics = get_metrics()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,8 @@
DeadLetterQueuePolicy,
InvalidMessage,
)
from arroyo.types import TPayload


class RaiseInvalidMessagePolicy(DeadLetterQueuePolicy[TPayload]):
class RaiseInvalidMessagePolicy(DeadLetterQueuePolicy):
def handle_invalid_message(self, e: InvalidMessage) -> None:
raise e

0 comments on commit d058608

Please sign in to comment.