Skip to content

Commit

Permalink
feat: DLQ perf test (#205)
Browse files Browse the repository at this point in the history
This is the perf test for the DLQ design in #202.

We want to run this in prod to observe the overhead of storing a copy of every raw message in the stream processor.
  • Loading branch information
lynnagara authored Mar 30, 2023
1 parent 8c0da68 commit 390a5d3
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 5 deletions.
59 changes: 54 additions & 5 deletions arroyo/dlq.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from collections import defaultdict, deque
from concurrent.futures import Future
from dataclasses import dataclass
from typing import Any, Generic, Mapping, Optional
from typing import Any, Deque, Generic, Mapping, MutableMapping, Optional

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.backends.abstract import Producer
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Partition, Topic, TStrategyPayload


Expand All @@ -18,8 +20,8 @@ class InvalidMessage(Exception):
"""

def __init__(self, partition: Partition, offset: int) -> None:
self.__partition = partition
self.__offset = offset
self.partition = partition
self.offset = offset


@dataclass(frozen=True)
Expand Down Expand Up @@ -118,7 +120,7 @@ class KafkaDlqProducer(DlqProducer[KafkaPayload]):
KafkaDLQProducer forwards invalid messages to a Kafka topic
"""

def __init__(self, producer: KafkaProducer, topic: Topic) -> None:
def __init__(self, producer: Producer[KafkaPayload], topic: Topic) -> None:
self.__producer = producer
self.__topic = topic

Expand All @@ -143,3 +145,50 @@ class DlqPolicy(Generic[TStrategyPayload]):

producer: DlqProducer[TStrategyPayload]
limit: DlqLimit


class BufferedMessages(Generic[TStrategyPayload]):
    """
    Manages a buffer of messages that are pending commit. This is used to retrieve raw
    messages in case they need to be placed in the DLQ.

    Messages are kept in one queue per partition, in the order they were appended.
    When no DLQ policy is configured the buffer is a no-op: ``append`` stores nothing
    and ``pop`` always returns None.
    """

    def __init__(self, dlq_policy: Optional[DlqPolicy[TStrategyPayload]]) -> None:
        """
        :param dlq_policy: The DLQ policy in use, or None to disable buffering.
        """
        self.__dlq_policy = dlq_policy
        self.__buffered_messages: MutableMapping[
            Partition, Deque[BrokerValue[TStrategyPayload]]
        ] = defaultdict(deque)

    def append(self, message: BrokerValue[TStrategyPayload]) -> None:
        """
        Append a message to the DLQ buffer for the message's partition.
        Does nothing if no DLQ policy is configured.
        """
        if self.__dlq_policy is not None:
            self.__buffered_messages[message.partition].append(message)

    def pop(
        self, partition: Partition, offset: int
    ) -> Optional[BrokerValue[TStrategyPayload]]:
        """
        Return the message at the given offset or None if it is not found in the buffer.
        Messages up to the offset for the given partition are removed.

        The scan stops at the first buffered message whose offset is greater than the
        requested one, so this relies on messages having been appended in increasing
        offset order within a partition.
        """
        if self.__dlq_policy is None:
            return None

        # Use .get() rather than indexing: indexing the defaultdict would insert an
        # empty deque for a partition that was never buffered.
        buffered = self.__buffered_messages.get(partition)
        if not buffered:
            return None

        while buffered:
            head_offset = buffered[0].offset
            if head_offset > offset:
                # Everything left is newer than the requested offset; keep it buffered.
                break

            message = buffered.popleft()
            if head_offset == offset:
                return message
            # Otherwise the message was older than the requested offset: discard it.

        return None

    def reset(self) -> None:
        """
        Reset the buffer, discarding all buffered messages for every partition.
        """
        self.__buffered_messages = defaultdict(deque)
15 changes: 15 additions & 0 deletions arroyo/processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from arroyo.backends.abstract import Consumer
from arroyo.commit import CommitPolicy
from arroyo.dlq import BufferedMessages, DlqPolicy
from arroyo.errors import RecoverableError
from arroyo.processing.strategies.abstract import (
MessageRejected,
Expand Down Expand Up @@ -69,6 +70,7 @@ def __init__(
topic: Topic,
processor_factory: ProcessingStrategyFactory[TStrategyPayload],
commit_policy: CommitPolicy,
dlq_policy: Optional[DlqPolicy[TStrategyPayload]] = None,
join_timeout: Optional[float] = None,
) -> None:
self.__consumer = consumer
Expand All @@ -89,6 +91,13 @@ def __init__(
self.__join_timeout = join_timeout
self.__shutdown_requested = False

# Buffers messages for the DLQ. A message is added when it is submitted for processing and
# removed once the commit callback fires, since it is guaranteed to be valid at that point.
self.__dlq_policy = dlq_policy
self.__buffered_messages: BufferedMessages[TStrategyPayload] = BufferedMessages(
dlq_policy
)

def _close_strategy() -> None:
if self.__processing_strategy is None:
raise InvalidStateError(
Expand Down Expand Up @@ -120,6 +129,7 @@ def _create_strategy(partitions: Mapping[Partition, int]) -> None:

def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
logger.info("New partitions assigned: %r", partitions)
self.__buffered_messages.reset()
if partitions:
if self.__processing_strategy is not None:
logger.exception(
Expand Down Expand Up @@ -157,6 +167,9 @@ def __commit(self, offsets: Mapping[Partition, int], force: bool = False) -> Non
If force is passed, commit immediately and do not throttle. This should
be used during consumer shutdown where we do not want to wait before committing.
"""
for (partition, offset) in offsets.items():
self.__buffered_messages.pop(partition, offset - 1)

self.__consumer.stage_offsets(offsets)
now = time.time()

Expand Down Expand Up @@ -228,6 +241,8 @@ def _run_once(self) -> None:
message = (
Message(self.__message) if self.__message is not None else None
)
if not message_carried_over:
self.__buffered_messages.append(self.__message)
self.__processing_strategy.submit(message)
self.__metrics_buffer.increment(
ConsumerTiming.CONSUMER_PROCESSING_TIME,
Expand Down
53 changes: 53 additions & 0 deletions tests/test_dlq.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
from datetime import datetime
from typing import Generator
from unittest.mock import ANY

from arroyo.backends.kafka import KafkaPayload
from arroyo.backends.local.backend import LocalBroker
from arroyo.backends.local.storages.memory import MemoryMessageStorage
from arroyo.dlq import BufferedMessages, DlqLimit, DlqPolicy, KafkaDlqProducer
from arroyo.types import BrokerValue, Partition, Topic

topic = Topic("test")
dlq_topic = Topic("test-dlq")
partition = Partition(topic, 0)


def generate_values() -> Generator[BrokerValue[KafkaPayload], None, None]:
    """
    Yield an endless stream of broker values on partition 0 of the test topic.

    Offsets start at 0 and increase by one per value; each payload's value is
    the offset rendered as UTF-8 encoded decimal text.
    """
    offset = 0
    while True:
        payload = KafkaPayload(None, str(offset).encode("utf-8"), [])
        yield BrokerValue(payload, Partition(topic, 0), offset, datetime.now())
        offset += 1


def test_buffered_messages() -> None:
    """
    With a DLQ policy configured, a buffered message can be popped by its
    partition and offset.
    """
    broker: LocalBroker[KafkaPayload] = LocalBroker(MemoryMessageStorage())
    dlq_producer = KafkaDlqProducer(broker.get_producer(), dlq_topic)
    policy = DlqPolicy(dlq_producer, DlqLimit())

    buffered: BufferedMessages[KafkaPayload] = BufferedMessages(policy)

    # Fill the buffer with offsets 0 through 9.
    values = generate_values()
    for _ in range(10):
        buffered.append(next(values))

    # The timestamp is generated at creation time, so match it loosely.
    expected = BrokerValue(KafkaPayload(None, b"1", []), partition, 1, ANY)
    assert buffered.pop(partition, 1) == expected


def test_no_buffered_messages() -> None:
    """
    Without a DLQ policy nothing is buffered, so popping an appended offset
    returns None.
    """
    buffered: BufferedMessages[KafkaPayload] = BufferedMessages(None)

    values = generate_values()
    for _ in range(10):
        buffered.append(next(values))

    popped = buffered.pop(partition, 9)
    assert popped is None

0 comments on commit 390a5d3

Please sign in to comment.