Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Truncating state #546

Open
lennehendrickx opened this issue Feb 9, 2023 · 22 comments
Open

Truncating state #546

lennehendrickx opened this issue Feb 9, 2023 · 22 comments

Comments

@lennehendrickx
Copy link
Contributor

lennehendrickx commented Feb 9, 2023

Hi @astubbs ,

We started using the parallel consumer recently for some of our services. Thanks for creating this great new consumer.
We sometimes see the following warnings in our logs and wonder what could be causing them.

Kafka Configuration

  • no compact topics
  • retention.ms=604800000 (7 days)
  • auto.offset.reset=latest
  • broker version=2.8.1
  • we do not use Kafka transactions
  • we use parallel-consumer version 0.5.2.4

PC Configuration

ParallelConsumerOptions.<K, V>builder()
            .ordering(ProcessingOrder.KEY)
            .maxConcurrency(20)
            .commitInterval(Duration.ofSeconds(2))
            .consumer(consumer)
            .commitMode(CommitMode.PERIODIC_CONSUMER_ASYNCHRONOUS)
            .build();

Warning message

Truncating state - removing records lower than 208437. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled 208437 but expected 1 from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST.

The expected value is also sometimes 0 instead of 1

We also see the following warning, which might be related

Bootstrap polled offset has been reset to an earlier offset (1081798) - truncating state - all records above (including this) will be replayed. Was expecting 1081799 but bootstrap poll was 1081798. Could be caused by record retention or compaction and offset reset policy EARLIEST.

I have also created a PR to add the partition and topic to the truncate logging.

Do you know how we could explain these warnings?

@rkolesnev
Copy link
Contributor

Hi,

I have found a bug in offset encoding / decoding that can lead to those warnings in certain conditions.

The warning is only shown on partition assignment - and first poll - subsequent polls dont trigger this logic.
The logic is to load commit data (including any metadata about incomplete offsets below highest succeeded) and subsequently compare offset read from commit data and offset of first polled record - the warning is shown is record's offset is above the expected offset.

The bug that i have found is in offset encoding / decoding when RunLength encoding is used and there are no incomplete offsets below highest seen / polled offset when partition is revoked and offsets are committed.
On decoding such offset metadata - highest succeeded offset is incorrectly returned as 1 (actually is decoded as 0 - but its calculated as last committed + 1).

The warning then reads Truncating state .... Bootstrap polled [OFFSET] but expected 1 from loaded commit data....

I am not sure why the other warning could be shown - that particular bug wouldn't cause that - Truncating state ... Was expecting 1081799 but bootstrap poll was 1081798....

I will raise a PR to fix that bug shortly and will incorporate the PR raised for the logging change - to include the Topic and Partition - into it.

@lennehendrickx
Copy link
Contributor Author

Hi @rkolesnev ,

Thanks for looking into this. With regards to the warnings I see that the following PR was included in the latest release:
#489

However this only adds the partition, not the topic. And it only does so for one of the warnings, not for the "Bootstrap polled offset has been reset to an earlier offset" warning. This is fixed by:
#545

Did you have time to look into a fix for the bug in offset encoding / decoding?

Thanks!

@rkolesnev
Copy link
Contributor

Hi @lennehendrickx,

Yes - I plan to raise a PR for it shortly (today - tomorrow).

@rkolesnev
Copy link
Contributor

@lennehendrickx - could you try the latest snapshot build - with this PR merged to see if that fixes your issue?

@lennehendrickx
Copy link
Contributor Author

@rkolesnev Thanks. I have configured some of our services to use the 0.5.2.6-SNAPSHOT version. I'll leave them running for a while and get back to you with feedback.

@lennehendrickx
Copy link
Contributor Author

@rkolesnev I have been monitoring the services that are using the SNAPSHOT version. We do not see the warning that starts with Truncating state ... anymore. So that seems to be fixed now.
Ex.

Truncating state - removing records lower than 208437. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled 208437 but expected 1 from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST.

We still regularly see the warning below:

Bootstrap polled offset has been reset to an earlier offset (533072) for partition 1 of topic hs.app.social_care.conversation_context.integration_event - truncating state - all records above (including this) will be replayed. Was expecting 533078 but bootstrap poll was 533072. Could be caused by record retention or compaction and offset reset policy EARLIEST.

Consumers are configured to reset to LATEST and these are topics that are used very frequently.

Any idea what could be causing this?

@rkolesnev
Copy link
Contributor

@lennehendrickx - nothing that i can point a finger at and say - ah that might be the cause - so needs further investigation.

Can you see from logs - is it on application startup or on existing running application rebalancing? - that warning is only possible on partition assignment - but it can be on app instance that is being started or on the already running instance that gets rebalanced due to new app instance joining / leaving the group.

Anything that stands out in the logs?

Is processing generally fast or slow? do you have a rough idea how long does it take to process an event in processing function?

What flow rate / throughput approximately on the topic ?

How many partitions on the topic?
How many instances of PC?
How often rebalances happen ? are there some scheduled restarts or auto scaling, etc - what is cause of rebalances?

Is there any pattern to those warnings - i.e. only happen at high load / low load / when there is slowdown in processing or anything like that?

I will see if i can setup a long running soak test to reproduce this behaviour - but ideally i want to set it up with similar load / characteristics to your setup.

@lennehendrickx
Copy link
Contributor Author

Hi @rkolesnev ,

The warnings that I see are logged by consumers that are already running and are now assigned new partitions.
There is nothing that immediately stands out from the logs.
Processing is fast (in the milliseconds range). We are not handling long running tasks with the parallel consumer.

Throughput varies. I have now deployed the SNAPSHOT version to our develop environment. Throughput is typically low here (some messages per minute). The changes are being promoted to staging and in one week to production. So I will monitor the behavior on the different environments.

Some topics for which we see the warning have 10 partitions, some only 2.
When the warnings happened on development, there were 2 pods running, so 2 instances of the PC.
Rebalances are triggered by

  • Kubernetes HPA when scaling up/down
  • When deploying a new version

I do see that there often is only a difference of a 1 item in the polled offset and the expected offset:

Bootstrap polled offset has been reset to an earlier offset (14357) for partition 4 of topic dev_dashboard-push_messages - truncating state - all records above (including this) will be replayed. Was expecting 14358 but bootstrap poll was 14357. Could be caused by record retention or compaction and offset reset policy EARLIEST.

@lennehendrickx
Copy link
Contributor Author

Hi @rkolesnev ,

The changes you have made are definitely already an improvement. This already eliminates the Truncating state warning. Is there any chance that this can be merged to master?

@rkolesnev
Copy link
Contributor

Hi @lennehendrickx - it is merged - #563 or do you mean a different PR?

@lennehendrickx
Copy link
Contributor Author

Hi @rkolesnev ,

My mistake, thanks for looking into this. Any idea when release 0.5.2.6 will be released?

@rkolesnev
Copy link
Contributor

I don't think we have a concrete date in mind yet. We plan to release it once metrics feature is finished - that will be main driver for the release, at least per current plans.

@colinkuo
Copy link

colinkuo commented Sep 21, 2023

@rkolesnev We have seen the same error with version 0.5.2.7.
Truncating state - removing records lower than 55674910 from partition 34 of topic data.input.topic Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled 55674910 but expected 0 from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST.

Here is what we observed and the deployment info

  1. Single pod deployment with 40 PC threads
  2. The Kafka broker version is 2.8, and the client version is 3.5
  3. the partition number of input topics is about 100
  4. The above messages occurred on some partitions
  5. throughput was low or close to zero on most partitions
  6. It happened on both new consumer group and existing consumer group
  7. It was working fine on my local setup (single instance with 10 PC threads, ten partitions)
  8. We observed that rebalance had happened (in a single client?). It triggered partition revocation and partition assigning in a short time.

We didn't see this issue in the previous version 0.5.2.4. Do you have any idea how to proceed? Thanks!

@colinkuo
Copy link

colinkuo commented Sep 25, 2023

more clues to the above issue

When we checked the offset status from Kafk CLI, the values of "CURRENT-OFFSET" on some partitions were empty

kafka-cli-describe-consumer-group-1

@rkolesnev We have seen the same error with version 0.5.2.7. Truncating state - removing records lower than 55674910 from partition 34 of topic data.input.topic Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled 55674910 but expected 0 from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST.

Here is what we observed and the deployment info

1. Single pod deployment with 40 PC threads

2. The Kafka broker version is 2.8, and the client version is 3.5

3. the partition number of input topics is about 100

4. The above messages occurred on some partitions

5. throughput was low or close to zero on most partitions

6. It happened on both new consumer group and existing consumer group

7. It was working fine on my local setup (single instance with 10 PC threads, ten partitions)

8. **We observed that rebalance had happened (in a single client?). It triggered partition revocation and partition assigning in a short time.**

We didn't see this issue in the previous version 0.5.2.4. Do you have any idea how to proceed? Thanks!

@JorgenRingen
Copy link
Contributor

We're experiencing this as well on 0.5.2.5. Sometimes happens on restarts where a partition is shuffled from one pod to another. Kafka client version 3.5.0. Don't know if it's the bump in kafka version or PC version that has caused this (or both) 🤔

@rkolesnev
Copy link
Contributor

Ok, this looks weirder and weirder - I don't see how the committed offsets could get reset / removed from offsets topic / log - as it looks like offset marker is not present for those partitions at all. I will investigate but it does not look like an easy issue to reproduce in synthetic environment / test.

This warning can ever only be logged on partition assignment - first poll after new partition is assigned - so i am not 100% sure what you mean by "6. It happened on both new consumer group and existing consumer group".

@colinkuo
Copy link

colinkuo commented Oct 9, 2023

@rkolesnev My observation was that the offset polling from the broker sometimes was not synced to the internal Offset Map. So, it ended up truncating the state and resetting the offset. Interestingly, the offset polling from the broker was usually one less than the offset from the internal Offset Map in my case.

Furthermore, I've done two experiments to see if it's relevant to the legacy consumer group.
One is with a brand newly created consumer group, and the other is for the existing consumer group, which Kafka Streams used. It turned out the issue was reproducible in both cases. This is to rule out whether it's related to the following commit.
8417498

@LOG-INFO
Copy link

LOG-INFO commented Nov 1, 2023

Hi :) We use parallel-consumer 0.5.2.5.1. I have same warning logs after rebalancing.

i.c.p.state.PartitionState - Truncating state - removing records lower than 51234 from partition 53. Offsets have been removed from the partition by the broker or committed offset has been raised. Bootstrap polled 51234 but expected 1 from loaded commit data. Could be caused by record retention or compaction and offset reset policy LATEST.

I think this can cause message loss, right?

Below is parallel consumer's code.
It looks failed to get offsetHighestSequentialSucceeded and initialized to 0.

image

image

@rkolesnev
Copy link
Contributor

@LOG-INFO
It cannot cause message loss - but it can cause message processing duplication.
Basically it says that on first poll on partition assignment - first event has offset 51234, but commit data (offset received on partition assignment) had offset 1.
This is a defensive logic in case when partition was revoked and assigned back and / or event compaction and such happened.
Generally that warning should not happen and there is an issue either on loading offsets from commit data, commit data format mismatch or posting commit data back to Kafka during commit phase.
Hence this issue is still open and i am still investigating / collecting information / trying to find the possible bug.

@rkolesnev
Copy link
Contributor

@colinkuo with addition of metrics in last release - the internal offset data is reported through metrics - could you try to run it with metrics enabled and collected and see if there is anything useful?

@LOG-INFO
Copy link

LOG-INFO commented Nov 4, 2023

@rkolesnev Thanks for clarification :)
OK I understand.

sangreal added a commit to sangreal/parallel-consumer that referenced this issue Nov 4, 2023
@sangreal
Copy link
Contributor

sangreal commented Dec 6, 2023

@colinkuo @rkolesnev from my perspective, the most possible guess is the offset from TopicPartition is 0.
Since for both bitSet and Runlength encoding, if there is no incompleteoffsets, you should be current offset from Tp-1. So if the guess is correct, the offset from TopicPartition is 0.

See for bitSet.
https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetBitSet.java#L66
And for Runlength
https://github.com/confluentinc/parallel-consumer/blob/master/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/offsets/OffsetRunLength.java#L93

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants