Skip to content

Commit

Permalink
try fix Truncating state confluentinc#546
Browse files Browse the repository at this point in the history
  • Loading branch information
sangreal committed Nov 4, 2023
1 parent df2271a commit 8659122
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class PartitionState<K, V> {
* storage
*/
@NonNull
@Getter(PACKAGE)
private ConcurrentSkipListMap<Long, Optional<ConsumerRecord<K, V>>> incompleteOffsets;

/**
Expand Down Expand Up @@ -341,19 +342,24 @@ private void maybeTruncateBelowOrAbove(long bootstrapPolledOffset) {
boolean pollBelowExpected = bootstrapPolledOffset < expectedBootstrapRecordOffset;

if (pollAboveExpected) {
// previously committed offset record has been removed from the topic, so we need to truncate up to it
log.warn("Truncating state - removing records lower than {} from partition {} of topic {}. Offsets have been removed from the partition " +
"by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. " +
"Could be caused by record retention or compaction and offset reset policy LATEST.",
bootstrapPolledOffset,
this.tp.partition(),
this.tp.topic(),
bootstrapPolledOffset,
expectedBootstrapRecordOffset);

// truncate
final NavigableSet<Long> incompletesToPrune = incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false);
incompletesToPrune.forEach(incompleteOffsets::remove);
// only truncate :
// 1. is old consumer which has been assigned to this partition
// 2. the incompleteOffsets not clear yet.
if (!incompleteOffsets.isEmpty()) {
// previously committed offset record has been removed from the topic, so we need to truncate up to it
log.warn("Truncating state - removing records lower than {} from partition {} of topic {}. Offsets have been removed from the partition " +
"by the broker or committed offset has been raised. Bootstrap polled {} but expected {} from loaded commit data. " +
"Could be caused by record retention or compaction and offset reset policy LATEST.",
bootstrapPolledOffset,
this.tp.partition(),
this.tp.topic(),
bootstrapPolledOffset,
expectedBootstrapRecordOffset);

// truncate
final NavigableSet<Long> incompletesToPrune = incompleteOffsets.keySet().headSet(bootstrapPolledOffset, false);
incompletesToPrune.forEach(incompleteOffsets::remove);
}
} else if (pollBelowExpected) {
// reset to lower offset detected, so we need to reset our state to match
log.warn("Bootstrap polled offset has been reset to an earlier offset ({}) for partition {} of topic {} - truncating state - all records " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ public void onPartitionsAssigned(Collection<TopicPartition> assignedPartitions)
log.warn("New assignment of partition which already exists and isn't recorded as removed in " +
"partition state. Could be a state bug - was the partition revocation somehow missed, " +
"or is this a race? Please file a GH issue. Partition: {}, state: {}", partitionAssignment, previouslyAssignedState);

// remove the previouslyAssignedState IncompleteOffsets since previous stale workers already removed
// previouslyAssignedState.getIncompleteOffsets().clear();
}
}
}
Expand Down

0 comments on commit 8659122

Please sign in to comment.