From 86591225b3b978189a0c43034d562a731d273292 Mon Sep 17 00:00:00 2001 From: Martyn Ye Date: Sat, 4 Nov 2023 19:50:49 +0900 Subject: [PATCH] try fix Truncating state #546 --- .../state/PartitionState.java | 32 +++++++++++-------- .../state/PartitionStateManager.java | 3 ++ 2 files changed, 22 insertions(+), 13 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java index a0f8c8cd8..0cc8871be 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionState.java @@ -100,6 +100,7 @@ public class PartitionState { * storage */ @NonNull + @Getter(PACKAGE) private ConcurrentSkipListMap>> incompleteOffsets; /** @@ -341,19 +342,24 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) { boolean pollBelowExpected = bootstrapPolledOffset < expectedBootstrapRecordOffset; if (pollAboveExpected) { - // previously committed offset record has been removed from the topic, so we need to truncate up to it - log.warn("Truncating state - removing records lower than {} from partition {} of topic {}. Offsets have been removed from the partition " + - "by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. " + - "Could be caused by record retention or compaction and offset reset policy LATEST.", - bootstrapPolledOffset, - this.tp.partition(), - this.tp.topic(), - bootstrapPolledOffset, - expectedBootstrapRecordOffset); - - // truncate - final NavigableSet incompletesToPrune = incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false); - incompletesToPrune.forEach(incompleteOffsets::remove); + // only truncate : + // 1. is old consumer which has been assigned to this partition + // 2. the incompleteOffsets not clear yet. + if (!incompleteOffsets.isEmpty()) { + // previously committed offset record has been removed from the topic, so we need to truncate up to it + log.warn("Truncating state - removing records lower than {} from partition {} of topic {}. Offsets have been removed from the partition " + + "by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. " + + "Could be caused by record retention or compaction and offset reset policy LATEST.", + bootstrapPolledOffset, + this.tp.partition(), + this.tp.topic(), + bootstrapPolledOffset, + expectedBootstrapRecordOffset); + + // truncate + final NavigableSet incompletesToPrune = incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false); + incompletesToPrune.forEach(incompleteOffsets::remove); + } } else if (pollBelowExpected) { // reset to lower offset detected, so we need to reset our state to match log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) for partition {} of topic {} - truncating state - all records " + diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java index 4895cc84b..a9f8e874e 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/state/PartitionStateManager.java @@ -110,6 +110,9 @@ public void onPartitionsAssigned(Collection assignedPartitions) log.warn("New assignment of partition which already exists and isn't recorded as removed in " + "partition state. Could be a state bug - was the partition revocation somehow missed, " + "or is this a race? Please file a GH issue. Partition: {}, state: {}", partitionAssignment, previouslyAssignedState); + + // remove the previouslyAssignedState IncompleteOffsets since previous stale workers already removed +// previouslyAssignedState.getIncompleteOffsets().clear(); } } }