diff --git a/arroyo/processing/strategies/dead_letter_queue/dead_letter_queue.py b/arroyo/processing/strategies/dead_letter_queue/dead_letter_queue.py index 49c2a18f..ec023a1a 100644 --- a/arroyo/processing/strategies/dead_letter_queue/dead_letter_queue.py +++ b/arroyo/processing/strategies/dead_letter_queue/dead_letter_queue.py @@ -23,7 +23,7 @@ class DeadLetterQueue(ProcessingStep[TPayload]): def __init__( self, next_step: ProcessingStep[TPayload], - policy: DeadLetterQueuePolicy[TPayload], + policy: DeadLetterQueuePolicy, ) -> None: self.__next_step = next_step self.__policy = policy diff --git a/arroyo/processing/strategies/dead_letter_queue/policies/abstract.py b/arroyo/processing/strategies/dead_letter_queue/policies/abstract.py index 2209c29b..8d616a5c 100644 --- a/arroyo/processing/strategies/dead_letter_queue/policies/abstract.py +++ b/arroyo/processing/strategies/dead_letter_queue/policies/abstract.py @@ -1,5 +1,4 @@ from abc import ABC, abstractmethod -from typing import Generic from arroyo.types import Message, TPayload @@ -12,7 +11,7 @@ def __str__(self) -> str: return f"Invalid Message: {self.message}" -class DeadLetterQueuePolicy(ABC, Generic[TPayload]): +class DeadLetterQueuePolicy(ABC): """ A DLQ Policy defines how to handle an invalid message. """ diff --git a/arroyo/processing/strategies/dead_letter_queue/policies/count.py b/arroyo/processing/strategies/dead_letter_queue/policies/count.py index 6872879d..7f8203b7 100644 --- a/arroyo/processing/strategies/dead_letter_queue/policies/count.py +++ b/arroyo/processing/strategies/dead_letter_queue/policies/count.py @@ -6,7 +6,6 @@ DeadLetterQueuePolicy, InvalidMessage, ) -from arroyo.types import TPayload from arroyo.utils.metrics import get_metrics @@ -19,7 +18,7 @@ class _Bucket(NamedTuple): hits: int -class CountInvalidMessagePolicy(DeadLetterQueuePolicy[TPayload]): +class CountInvalidMessagePolicy(DeadLetterQueuePolicy): """ Ignore invalid messages up to a certain limit per time unit window in seconds. This window is 1 minute by default. The exception associated with the invalid diff --git a/arroyo/processing/strategies/dead_letter_queue/policies/ignore.py b/arroyo/processing/strategies/dead_letter_queue/policies/ignore.py index 4ec23da2..8bddc42e 100644 --- a/arroyo/processing/strategies/dead_letter_queue/policies/ignore.py +++ b/arroyo/processing/strategies/dead_letter_queue/policies/ignore.py @@ -2,11 +2,10 @@ DeadLetterQueuePolicy, InvalidMessage, ) -from arroyo.types import TPayload from arroyo.utils.metrics import get_metrics -class IgnoreInvalidMessagePolicy(DeadLetterQueuePolicy[TPayload]): +class IgnoreInvalidMessagePolicy(DeadLetterQueuePolicy): def __init__(self) -> None: self.__metrics = get_metrics() diff --git a/arroyo/processing/strategies/dead_letter_queue/policies/raise_e.py b/arroyo/processing/strategies/dead_letter_queue/policies/raise_e.py index 90467104..ae749bef 100644 --- a/arroyo/processing/strategies/dead_letter_queue/policies/raise_e.py +++ b/arroyo/processing/strategies/dead_letter_queue/policies/raise_e.py @@ -2,9 +2,8 @@ DeadLetterQueuePolicy, InvalidMessage, ) -from arroyo.types import TPayload -class RaiseInvalidMessagePolicy(DeadLetterQueuePolicy[TPayload]): +class RaiseInvalidMessagePolicy(DeadLetterQueuePolicy): def handle_invalid_message(self, e: InvalidMessage) -> None: raise e