From 390a5d3f60597ab7a2ccbd8bd3701f7149509f2c Mon Sep 17 00:00:00 2001
From: Lyn Nagara
Date: Thu, 30 Mar 2023 10:24:20 -0700
Subject: [PATCH] feat: DLQ perf test (#205)

This is the performance test for the DLQ design in
https://github.com/getsentry/arroyo/pull/202. We want to run this in
production to observe the overhead of storing a copy of every raw
message in the stream processor.
---
 arroyo/dlq.py                  | 59 +++++++++++++++++++++++++++++++---
 arroyo/processing/processor.py | 15 +++++++++
 tests/test_dlq.py              | 53 ++++++++++++++++++++++++++++++
 3 files changed, 122 insertions(+), 5 deletions(-)
 create mode 100644 tests/test_dlq.py

diff --git a/arroyo/dlq.py b/arroyo/dlq.py
index 49ae83af..0394122e 100644
--- a/arroyo/dlq.py
+++ b/arroyo/dlq.py
@@ -1,11 +1,13 @@
 from __future__ import annotations
 
 from abc import ABC, abstractmethod
+from collections import defaultdict, deque
 from concurrent.futures import Future
 from dataclasses import dataclass
-from typing import Any, Generic, Mapping, Optional
+from typing import Any, Deque, Generic, Mapping, MutableMapping, Optional
 
-from arroyo.backends.kafka import KafkaPayload, KafkaProducer
+from arroyo.backends.abstract import Producer
+from arroyo.backends.kafka import KafkaPayload
 from arroyo.types import BrokerValue, Partition, Topic, TStrategyPayload
 
 
@@ -18,8 +20,8 @@ class InvalidMessage(Exception):
     """
 
     def __init__(self, partition: Partition, offset: int) -> None:
-        self.__partition = partition
-        self.__offset = offset
+        self.partition = partition
+        self.offset = offset
 
 
 @dataclass(frozen=True)
@@ -118,7 +120,7 @@ class KafkaDlqProducer(DlqProducer[KafkaPayload]):
     KafkaDLQProducer forwards invalid messages to a Kafka topic
     """
 
-    def __init__(self, producer: KafkaProducer, topic: Topic) -> None:
+    def __init__(self, producer: Producer[KafkaPayload], topic: Topic) -> None:
         self.__producer = producer
         self.__topic = topic
 
@@ -143,3 +145,50 @@ class DlqPolicy(Generic[TStrategyPayload]):
 
     producer: DlqProducer[TStrategyPayload]
     limit: DlqLimit
+
+
+class BufferedMessages(Generic[TStrategyPayload]):
+    """
+    Manages a buffer of messages that are pending commit. This is used to retrieve raw messages
+    in case they need to be placed in the DLQ.
+    """
+
+    def __init__(self, dlq_policy: Optional[DlqPolicy[TStrategyPayload]]) -> None:
+        self.__dlq_policy = dlq_policy
+        self.__buffered_messages: MutableMapping[
+            Partition, Deque[BrokerValue[TStrategyPayload]]
+        ] = defaultdict(deque)
+
+    def append(self, message: BrokerValue[TStrategyPayload]) -> None:
+        """
+        Append a message to the DLQ buffer.
+        """
+        if self.__dlq_policy is not None:
+            self.__buffered_messages[message.partition].append(message)
+
+    def pop(
+        self, partition: Partition, offset: int
+    ) -> Optional[BrokerValue[TStrategyPayload]]:
+        """
+        Return the message at the given offset, or None if it is not found in the buffer.
+        Messages up to the offset for the given partition are removed.
+        """
+        if self.__dlq_policy is not None:
+            buffered = self.__buffered_messages[partition]
+
+            while buffered:
+                if buffered[0].offset == offset:
+                    return buffered.popleft()
+                if buffered[0].offset > offset:
+                    break
+                self.__buffered_messages[partition].popleft()
+
+            return None
+
+        return None
+
+    def reset(self) -> None:
+        """
+        Reset the buffer.
+        """
+        self.__buffered_messages = defaultdict(deque)
diff --git a/arroyo/processing/processor.py b/arroyo/processing/processor.py
index f2f2a247..cbab1903 100644
--- a/arroyo/processing/processor.py
+++ b/arroyo/processing/processor.py
@@ -8,6 +8,7 @@
 
 from arroyo.backends.abstract import Consumer
 from arroyo.commit import CommitPolicy
+from arroyo.dlq import BufferedMessages, DlqPolicy
 from arroyo.errors import RecoverableError
 from arroyo.processing.strategies.abstract import (
     MessageRejected,
@@ -69,6 +70,7 @@ def __init__(
         topic: Topic,
         processor_factory: ProcessingStrategyFactory[TStrategyPayload],
         commit_policy: CommitPolicy,
+        dlq_policy: Optional[DlqPolicy[TStrategyPayload]] = None,
         join_timeout: Optional[float] = None,
     ) -> None:
         self.__consumer = consumer
@@ -89,6 +91,13 @@ def __init__(
         self.__join_timeout = join_timeout
         self.__shutdown_requested = False
 
+        # Buffers messages for the DLQ. Messages are added when they are submitted for processing
+        # and removed once the commit callback is fired, as they are guaranteed to be valid at that point.
+        self.__dlq_policy = dlq_policy
+        self.__buffered_messages: BufferedMessages[TStrategyPayload] = BufferedMessages(
+            dlq_policy
+        )
+
         def _close_strategy() -> None:
             if self.__processing_strategy is None:
                 raise InvalidStateError(
@@ -120,6 +129,7 @@ def _create_strategy(partitions: Mapping[Partition, int]) -> None:
 
         def on_partitions_assigned(partitions: Mapping[Partition, int]) -> None:
             logger.info("New partitions assigned: %r", partitions)
+            self.__buffered_messages.reset()
            if partitions:
                 if self.__processing_strategy is not None:
                     logger.exception(
@@ -157,6 +167,9 @@ def __commit(self, offsets: Mapping[Partition, int], force: bool = False) -> Non
 
         If force is passed, commit immediately and do not throttle. This should be
         used during consumer shutdown where we do not want to wait before committing.
""" + for (partition, offset) in offsets.items(): + self.__buffered_messages.pop(partition, offset - 1) + self.__consumer.stage_offsets(offsets) now = time.time() @@ -228,6 +241,8 @@ def _run_once(self) -> None: message = ( Message(self.__message) if self.__message is not None else None ) + if not message_carried_over: + self.__buffered_messages.append(self.__message) self.__processing_strategy.submit(message) self.__metrics_buffer.increment( ConsumerTiming.CONSUMER_PROCESSING_TIME, diff --git a/tests/test_dlq.py b/tests/test_dlq.py new file mode 100644 index 00000000..44274285 --- /dev/null +++ b/tests/test_dlq.py @@ -0,0 +1,53 @@ +from datetime import datetime +from typing import Generator +from unittest.mock import ANY + +from arroyo.backends.kafka import KafkaPayload +from arroyo.backends.local.backend import LocalBroker +from arroyo.backends.local.storages.memory import MemoryMessageStorage +from arroyo.dlq import BufferedMessages, DlqLimit, DlqPolicy, KafkaDlqProducer +from arroyo.types import BrokerValue, Partition, Topic + +topic = Topic("test") +dlq_topic = Topic("test-dlq") +partition = Partition(topic, 0) + + +def generate_values() -> Generator[BrokerValue[KafkaPayload], None, None]: + i = 0 + while True: + yield BrokerValue( + KafkaPayload(None, str(i).encode("utf-8"), []), + Partition(topic, 0), + i, + datetime.now(), + ) + i += 1 + + +def test_buffered_messages() -> None: + broker: LocalBroker[KafkaPayload] = LocalBroker(MemoryMessageStorage()) + + dlq_policy = DlqPolicy( + KafkaDlqProducer(broker.get_producer(), dlq_topic), DlqLimit() + ) + + buffer: BufferedMessages[KafkaPayload] = BufferedMessages(dlq_policy) + + generator = generate_values() + + for _ in range(10): + buffer.append(next(generator)) + + assert buffer.pop(partition, 1) == BrokerValue( + KafkaPayload(None, b"1", []), partition, 1, ANY + ) + + +def test_no_buffered_messages() -> None: + buffer: BufferedMessages[KafkaPayload] = BufferedMessages(None) + + generator = generate_values() + for _ in range(10): + buffer.append(next(generator)) + assert buffer.pop(partition, 9) is None