Apply backpressure per partition instead of the entire assignment #540
Comments
I also need this feature!
@gabrielgiussi It is possible to tune for that scenario, though. I think a small fetch size per partition and a small buffer in Parallel Consumer should work when processing is slow(-ish), as in your example above. The few parameters to tune in that case would be: Note that max.poll.records doesn't help here, as it only controls the handover of records from the Kafka Consumer, not the actual fetch from the brokers. The consumer will still fetch 1000 from P0 and 10 from P1, and will only hand records from P1 to PC after all 1000 from P0 have been drained from the Kafka Consumer to Parallel Consumer.
+1, we really need this feature in our project too. It works with multiple topics, so with slow processing one topic's messages affect the other topic (messages are stuck until the next fetch). Per-partition backpressure should solve this.
@yura-arab4uk - thanks for your feedback. Generally, as per my earlier comment, there are consumer configuration options to make sure that the maximum fetch from a single partition is smaller than the overall fetch size. That ensures that data from all subscribed partitions is polled if it is available. Similar logic applies to key-based ordering and to partitions on multiple topics as well.

The buffer size, the overall size of a single fetch, and the size of a single partition fetch should be considered together and set as a combination with respect to processing time and concurrency. The goal is to avoid fetching too many messages from a single partition in a single poll and to keep the buffer drain time from growing too high.

Of course it is not ideal: if processing time fluctuates and messages from one topic or partition (with different processing logic or a different downstream system than the others) suddenly start processing slower than usual, it can still cause some bottlenecks.

Refer to this section in the Readme for specific configuration options to tune - 10.3. Ordered by Partition

When a single Parallel Consumer instance processes messages from multiple topics and the processing time differs significantly between topics, it may be worth running separate Parallel Consumer instances per topic or per group of topics with similar processing time, i.e. one Parallel Consumer instance for 'fast' messages and a separate one for 'slow' messages.
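As a rough illustration of the fetch-size relationship described above, here is a minimal sketch that builds consumer properties where the per-partition fetch limit is kept below the overall fetch limit. `max.partition.fetch.bytes` and `fetch.max.bytes` are standard Kafka consumer settings; the class name, the helper method, and the numeric values are assumptions made for this example and are not taken from the Readme. The Parallel Consumer buffer setting is not shown.

```java
import java.util.Properties;

// Hypothetical helper, for illustration only.
final class FetchTuningSketch {

    // Builds consumer properties where a single partition cannot fill
    // the whole fetch response on its own.
    static Properties consumerProps(int partitionFetchBytes, int totalFetchBytes) {
        if (partitionFetchBytes >= totalFetchBytes) {
            throw new IllegalArgumentException(
                "per-partition fetch size should be smaller than the overall fetch size");
        }
        Properties props = new Properties();
        // Upper bound on data returned per partition in one fetch.
        props.setProperty("max.partition.fetch.bytes", Integer.toString(partitionFetchBytes));
        // Upper bound on data returned for the whole fetch request.
        props.setProperty("fetch.max.bytes", Integer.toString(totalFetchBytes));
        return props;
    }
}
```

For example, `FetchTuningSketch.consumerProps(64 * 1024, 1024 * 1024)` caps each partition at 64 KiB out of a 1 MiB fetch. Suitable values depend on message size, processing time and concurrency, so these numbers are placeholders.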
@rkolesnev thank you for the response and the suggestion. Can we pause a specific partition when it hits its message buffer size and resume it when the number of in-memory messages drops?
@yura-arab4uk - actually I had a look, and KafkaConsumer does support selective pausing of partitions in the subscription. So instead of tracking the overall buffer size we could track a per-partition buffer size. It should be technically possible to keep per-partition counters and then apply pause / resume logic per partition without impacting the rebalancing logic. The maximum per-partition fetch size will still need to be tuned, but mostly to avoid overshooting the per-partition buffers too much and to guard against OOMs, rather than to prevent processing bottlenecks caused by data imbalance in the buffers.
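To make the per-partition counter idea more concrete, here is a minimal sketch of the bookkeeping, not the Parallel Consumer implementation. The class, its method names, and the two thresholds are hypothetical. Partitions are identified by plain strings to keep the example free of dependencies; in a real consumer the returned sets would be mapped to `TopicPartition` objects and passed to `KafkaConsumer.pause(...)` and `KafkaConsumer.resume(...)`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical per-partition backpressure tracker, for illustration only.
final class PartitionBackpressureSketch {
    private final int pauseAt;   // pause once buffered records reach this count
    private final int resumeAt;  // resume once buffered records drop to this count or below
    private final Map<String, Integer> buffered = new HashMap<>();
    private final Set<String> paused = new HashSet<>();

    PartitionBackpressureSketch(int pauseAt, int resumeAt) {
        if (resumeAt >= pauseAt) {
            throw new IllegalArgumentException("resumeAt must be below pauseAt");
        }
        this.pauseAt = pauseAt;
        this.resumeAt = resumeAt;
    }

    // Called when a poll hands over records for a partition.
    void onRecordsBuffered(String partition, int count) {
        buffered.merge(partition, count, Integer::sum);
    }

    // Called when processing of one record from a partition finishes.
    void onRecordCompleted(String partition) {
        buffered.computeIfPresent(partition, (k, v) -> Math.max(0, v - 1));
    }

    // Partitions that reached the pause threshold and are not paused yet.
    Set<String> partitionsToPause() {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, Integer> e : buffered.entrySet()) {
            if (e.getValue() >= pauseAt && paused.add(e.getKey())) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // Paused partitions whose buffer has drained to the resume threshold.
    Set<String> partitionsToResume() {
        Set<String> result = new HashSet<>();
        for (String partition : new HashSet<>(paused)) {
            if (buffered.getOrDefault(partition, 0) <= resumeAt) {
                paused.remove(partition);
                result.add(partition);
            }
        }
        return result;
    }
}
```

Using two thresholds instead of one is a design choice of this sketch: it avoids flipping a partition between paused and resumed on every completed record. A real implementation would also need to drop the counters of partitions that are revoked during a rebalance.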
Currently, when the BrokerPollSystem considers that too much work is already pending, it pauses all assigned partitions.
I think we could improve parallelization by applying backpressure per partition instead, which would be especially helpful for the case described in KIP-41.
I'm considering the following hypothetical scenario
In this scenario we won't achieve good parallelization, since partition ordering forces us to process all the records from partition 0 serially. This leaves one thread idle most of the time until the last poll (corresponding to a single fetch of 1000 and 10 records from p0 and p1 respectively) returns records from both partitions.
Of course, increasing max.poll.records will reduce the probability of this happening. For example, if we were using max.poll.records=501, the third poll would already be able to parallelize work. And since the poll loop is detached from processing, we don't risk being removed from the group by increasing the amount of work until the next poll. However, this requires a better understanding of actual message rates and more fine-tuning, and a max.poll.records setting that works well today might not work well in the future if the rates change.

I think applying backpressure per partition is a more robust way to handle this scenario. It may not be a common one, but I don't see this affecting other scenarios. The only disadvantage is a more complex implementation, since we need to track load per partition instead of global load as it is today.
I've focused mainly on partition ordering in my analysis, and I'm not sure whether the proposal may have a negative impact on the other two ordering modes. My initial thought is that it should be OK (the backpressure won't have anything to do with the ordering since it can only be applied per partition, but it will mostly benefit partition ordering).