Skip to content

Commit

Permalink
feat(consumer): Support incremental cooperative rebalancing (#53)
Browse files Browse the repository at this point in the history
KafkaConsumer now supports the `cooperative-sticky` partitioning strategy.
The goal of this change is to try and reduce the amount of unnecessary rebalances
that happen during Kubernetes rolling deployments.
  • Loading branch information
lynnagara authored Apr 1, 2022
1 parent e549e80 commit 4854c67
Show file tree
Hide file tree
Showing 2 changed files with 187 additions and 70 deletions.
149 changes: 93 additions & 56 deletions arroyo/backends/kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ def as_kafka_configuration_bool(value: Any) -> bool:

class KafkaConsumer(Consumer[KafkaPayload]):
"""
The behavior of this consumer differs slightly from the Confluent
If a non-cooperative partition assignment strategy is selected,
the behavior of this consumer differs slightly from the Confluent
consumer during rebalancing operations. Whenever a partition is assigned
to this consumer, offsets are *always* automatically reset to the
committed offset for that partition (or if no offsets have been committed
Expand All @@ -117,6 +118,12 @@ class KafkaConsumer(Consumer[KafkaPayload]):
prevent uncommitted messages from being consumed multiple times,
``commit`` should be called in the partition revocation callback.
If the `cooperative-sticky` strategy is used, this won't happen as
only the incremental partitions are passed to the callback during
rebalancing, and any previously assigned partitions will continue
from their previous position without being reset to the last committed
position.
The behavior of ``auto.offset.reset`` also differs slightly from the
Confluent consumer as well: offsets are only reset during initial
assignment or subsequent rebalancing operations. Any other circumstances
Expand Down Expand Up @@ -182,6 +189,10 @@ def __init__(
"invalid value for 'enable.auto.offset.store' configuration"
)

self.__incremental_cooperative = (
configuration.get("partition.assignment.strategy") == "cooperative-sticky"
)

# NOTE: Offsets are explicitly managed as part of the assignment
# callback, so preemptively resetting offsets is not enabled.
self.__consumer = ConfluentConsumer(
Expand Down Expand Up @@ -246,40 +257,65 @@ def assignment_callback(
) -> None:
self.__state = KafkaConsumerState.ASSIGNING

try:
assignment: MutableSequence[ConfluentTopicPartition] = []

for partition in self.__consumer.committed(partitions):
if partition.offset >= 0:
assignment.append(partition)
elif partition.offset == OFFSET_INVALID:
assignment.append(
self.__resolve_partition_starting_offset(partition)
)
else:
raise ValueError("received unexpected offset")

offsets: MutableMapping[Partition, int] = {
Partition(Topic(i.topic), i.partition): i.offset for i in assignment
}
self.__seek(offsets)

# Ensure that all partitions are resumed on assignment to avoid
# carrying over state from a previous assignment.
self.__consumer.resume(
[
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
for partition, offset in offsets.items()
]
)
if self.__incremental_cooperative is True:
try:
incremental_assignment: MutableSequence[
ConfluentTopicPartition
] = []

for partition in partitions:
if partition.offset >= 0:
incremental_assignment.append(partition)
elif partition.offset == OFFSET_INVALID:
incremental_assignment.append(
self.__resolve_partition_starting_offset(partition)
)
else:
raise ValueError("received unexpected offset")

offsets = {
Partition(Topic(i.topic), i.partition): i.offset
for i in incremental_assignment
}

self.__incremental_assign(offsets)

# Ensure that all partitions are resumed on assignment to avoid
# carrying over state from a previous assignment.
self.resume([p for p in offsets])

except Exception:
self.__state = KafkaConsumerState.ERROR
raise

for partition in offsets:
self.__paused.discard(partition)
except Exception:
self.__state = KafkaConsumerState.ERROR
raise
else:
try:
assignment: MutableSequence[ConfluentTopicPartition] = []

for partition in self.__consumer.committed(partitions):
if partition.offset >= 0:
assignment.append(partition)
elif partition.offset == OFFSET_INVALID:
assignment.append(
self.__resolve_partition_starting_offset(partition)
)
else:
raise ValueError("received unexpected offset")

offsets = {
Partition(Topic(i.topic), i.partition): i.offset
for i in assignment
}

self.__assign(offsets)

# Ensure that all partitions are resumed on assignment to avoid
# carrying over state from a previous assignment.
self.resume([p for p in offsets])

except Exception:
self.__state = KafkaConsumerState.ERROR
raise

try:
if on_assign is not None:
Expand Down Expand Up @@ -431,29 +467,24 @@ def __validate_offsets(self, offsets: Mapping[Partition, int]) -> None:
if invalid_offsets:
raise ConsumerError(f"invalid offsets: {invalid_offsets!r}")

def __seek(self, offsets: Mapping[Partition, int]) -> None:
def __assign(self, offsets: Mapping[Partition, int]) -> None:
self.__validate_offsets(offsets)
self.__consumer.assign(
[
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
for partition, offset in offsets.items()
]
)
self.__offsets.update(offsets)

if self.__state is KafkaConsumerState.ASSIGNING:
# Calling ``seek`` on the Confluent consumer from an assignment
# callback will throw an "Erroneous state" error. Instead,
# partition offsets have to be initialized by calling ``assign``.
self.__consumer.assign(
[
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
for partition, offset in offsets.items()
]
)
else:
for partition, offset in offsets.items():
self.__consumer.seek(
ConfluentTopicPartition(
partition.topic.name, partition.index, offset
)
)

def __incremental_assign(self, offsets: Mapping[Partition, int]) -> None:
self.__validate_offsets(offsets)
self.__consumer.incremental_assign(
[
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
for partition, offset in offsets.items()
]
)
self.__offsets.update(offsets)

def seek(self, offsets: Mapping[Partition, int]) -> None:
Expand All @@ -468,7 +499,13 @@ def seek(self, offsets: Mapping[Partition, int]) -> None:
if offsets.keys() - self.__offsets.keys():
raise ConsumerError("cannot seek on unassigned partitions")

self.__seek(offsets)
self.__validate_offsets(offsets)

for partition, offset in offsets.items():
self.__consumer.seek(
ConfluentTopicPartition(partition.topic.name, partition.index, offset)
)
self.__offsets.update(offsets)

def pause(self, partitions: Sequence[Partition]) -> None:
"""
Expand Down
108 changes: 94 additions & 14 deletions tests/backends/test_kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from contextlib import closing
from datetime import datetime
from pickle import PickleBuffer
from typing import Iterator, MutableSequence, Optional
from typing import Any, Iterator, Mapping, MutableSequence, Optional
from unittest import TestCase

import pytest
Expand Down Expand Up @@ -46,6 +46,25 @@ def test_payload_pickle_out_of_band() -> None:
assert pickle.loads(data, buffers=[b.raw() for b in buffers]) == payload


@contextlib.contextmanager
def get_topic(
configuration: Mapping[str, Any], partitions_count: int
) -> Iterator[Topic]:
name = f"test-{uuid.uuid1().hex}"
client = AdminClient(configuration)
[[key, future]] = client.create_topics(
[NewTopic(name, num_partitions=partitions_count, replication_factor=1)]
).items()
assert key == name
assert future.result() is None
try:
yield Topic(name)
finally:
[[key, future]] = client.delete_topics([name]).items()
assert key == name
assert future.result() is None


class KafkaStreamsTestCase(StreamsTestMixin[KafkaPayload], TestCase):

configuration = build_kafka_configuration(
Expand All @@ -54,19 +73,11 @@ class KafkaStreamsTestCase(StreamsTestMixin[KafkaPayload], TestCase):

@contextlib.contextmanager
def get_topic(self, partitions: int = 1) -> Iterator[Topic]:
name = f"test-{uuid.uuid1().hex}"
client = AdminClient(self.configuration)
[[key, future]] = client.create_topics(
[NewTopic(name, num_partitions=partitions, replication_factor=1)]
).items()
assert key == name
assert future.result() is None
try:
yield Topic(name)
finally:
[[key, future]] = client.delete_topics([name]).items()
assert key == name
assert future.result() is None
with get_topic(self.configuration, partitions) as topic:
try:
yield topic
finally:
pass

def get_consumer(
self,
Expand Down Expand Up @@ -133,6 +144,75 @@ def test_auto_offset_reset_error(self) -> None:
consumer.poll(10.0) # XXX: getting the subcription is slow


def test_cooperative_rebalancing() -> None:
configuration = build_kafka_configuration(
{"bootstrap.servers": os.environ.get("DEFAULT_BROKERS", "localhost:9092")}
)

partitions_count = 2

group_id = uuid.uuid4().hex
producer = KafkaProducer(configuration)

consumer_a = KafkaConsumer(
{
**configuration,
"partition.assignment.strategy": "cooperative-sticky",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"enable.auto.offset.store": False,
"group.id": group_id,
"session.timeout.ms": 10000,
},
)
consumer_b = KafkaConsumer(
{
**configuration,
"partition.assignment.strategy": "cooperative-sticky",
"auto.offset.reset": "earliest",
"enable.auto.commit": False,
"enable.auto.offset.store": False,
"group.id": group_id,
"session.timeout.ms": 10000,
},
)

with get_topic(configuration, partitions_count) as topic, closing(
producer
), closing(consumer_a), closing(consumer_b):
for i in range(10):
for j in range(partitions_count):
producer.produce(
Partition(topic, j),
KafkaPayload(None, f"{j}-{i}".encode("utf8"), []),
)

consumer_a.subscribe([topic])

assert consumer_a.poll(10.0) is not None

# Consumer A has 2 partitions assigned, B has none
assert len(consumer_a.tell()) == 2
assert len(consumer_b.tell()) == 0

consumer_b.subscribe([topic])

# At some point, 1 partition will move to consumer B
consumer_a.pause([p for p in consumer_a.tell()])
for i in range(10):
assert consumer_a.poll(0) is None # attempt to force session timeout
if consumer_b.poll(1.0) is not None:
break

assert len(consumer_a.tell()) == 1
assert len(consumer_b.tell()) == 1

# Resume A and assert that both consumer_a and consumer_b are getting messages
consumer_a.resume([p for p in consumer_a.tell()])
assert consumer_a.poll(1.0) is not None
assert consumer_b.poll(1.0) is not None


def test_commit_codec() -> None:
commit = Commit("group", Partition(Topic("topic"), 0), 0, datetime.now())
assert commit_codec.decode(commit_codec.encode(commit)) == commit
Expand Down

0 comments on commit 4854c67

Please sign in to comment.