fixes #62: Off-by-one error when restoring offsets when no offsets are encoded in metadata

The off-by-one error would, in some situations, cause the first message consumed after a restart to be skipped.
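For context: a Kafka committed offset is the offset of the *next* record to consume, not of the last record processed, so on restore the highest offset actually seen is the committed offset minus one. A minimal sketch of that arithmetic, with illustrative names (not code from this commit):

```java
import org.apache.kafka.clients.consumer.OffsetAndMetadata;

public class CommittedOffsetSemantics {
    public static void main(String[] args) {
        // After processing records 0..4, a consumer commits offset 5 --
        // the offset of the NEXT record it expects to read.
        long lastProcessed = 4L;
        OffsetAndMetadata committed = new OffsetAndMetadata(lastProcessed + 1);

        // The highest offset actually seen is committed.offset() - 1. Treating
        // committed.offset() itself as "already seen" skips the record at that
        // offset after a restart -- the off-by-one this commit fixes.
        long highestSeen = committed.offset() - 1; // 4, not 5
        System.out.println("committed=" + committed.offset() + ", highestSeen=" + highestSeen);
    }
}
```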

Added a simple test reproducing issue 62, where an offset is skipped after a restart
Added a test for the full example as described in the issue
Added a test for the first error
astubbs committed Jan 21, 2021
1 parent 710316f commit 5fa5f59
Showing 3 changed files with 137 additions and 17 deletions.
@@ -114,10 +114,10 @@ void loadOffsetMapForPartition(final Set<TopicPartition> assignment) {

         committed.forEach((tp, offsetAndMeta) -> {
             if (offsetAndMeta != null) {
-                long offset = offsetAndMeta.offset();
+                long nextExpectedOffset = offsetAndMeta.offset();
                 String metadata = offsetAndMeta.metadata();
                 try {
-                    loadOffsetMetadataPayload(offset, tp, metadata);
+                    loadOffsetMetadataPayload(nextExpectedOffset, tp, metadata);
                 } catch (OffsetDecodingError offsetDecodingError) {
                     log.error("Error decoding offsets from assigned partition, dropping offset map (will replay previously completed messages - partition: {}, data: {})",
                             tp, offsetAndMeta, offsetDecodingError);
@@ -126,18 +126,18 @@ void loadOffsetMapForPartition(final Set<TopicPartition> assignment) {
         });
     }

-    static HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase64(long finalBaseComittedOffsetForPartition, String base64EncodedOffsetPayload) throws OffsetDecodingError {
+    static HighestOffsetAndIncompletes deserialiseIncompleteOffsetMapFromBase64(long committedOffsetForPartition, String base64EncodedOffsetPayload) throws OffsetDecodingError {
         byte[] decodedBytes;
         try {
             decodedBytes = OffsetSimpleSerialisation.decodeBase64(base64EncodedOffsetPayload);
         } catch (IllegalArgumentException a) {
             throw new OffsetDecodingError(msg("Error decoding offset metadata, input was: {}", base64EncodedOffsetPayload), a);
         }
-        return decodeCompressedOffsets(finalBaseComittedOffsetForPartition, decodedBytes);
+        return decodeCompressedOffsets(committedOffsetForPartition, decodedBytes);
     }

-    void loadOffsetMetadataPayload(long startOffset, TopicPartition tp, String offsetMetadataPayload) throws OffsetDecodingError {
-        HighestOffsetAndIncompletes incompletes = deserialiseIncompleteOffsetMapFromBase64(startOffset, offsetMetadataPayload);
+    void loadOffsetMetadataPayload(long nextExpectedOffset, TopicPartition tp, String offsetMetadataPayload) throws OffsetDecodingError {
+        HighestOffsetAndIncompletes incompletes = deserialiseIncompleteOffsetMapFromBase64(nextExpectedOffset, offsetMetadataPayload);
         wm.raisePartitionHighWaterMark(incompletes.getHighestSeenOffset(), tp);
         Set<Long> incompleteOffsets = incompletes.getIncompleteOffsets();
         wm.partitionIncompleteOffsets.put(tp, incompleteOffsets);
@@ -185,20 +185,24 @@ byte[] encodeOffsetsCompressed(long finalOffsetForPartition, TopicPartition tp,
      *
      * @return Set of offsets which are not complete, and the highest offset encoded.
      */
-    static HighestOffsetAndIncompletes decodeCompressedOffsets(long finalOffsetForPartition, byte[] decodedBytes) {
-        if (decodedBytes.length == 0) {
-            // no offset bitmap data
-            return HighestOffsetAndIncompletes.of(finalOffsetForPartition, UniSets.of());
-        }
+    static HighestOffsetAndIncompletes decodeCompressedOffsets(long nextExpectedOffset, byte[] decodedBytes) {

-        EncodedOffsetPair result = EncodedOffsetPair.unwrap(decodedBytes);
+        // if no offset bitmap data
+        if (decodedBytes.length == 0) {
+            // in this case, as there is no encoded offset data in the metadata, the highest we previously saw must be
+            // the offset before the committed offset
+            long highestSeenOffsetIsThen = nextExpectedOffset - 1;
+            return HighestOffsetAndIncompletes.of(highestSeenOffsetIsThen, UniSets.of());
+        } else {
+            EncodedOffsetPair result = EncodedOffsetPair.unwrap(decodedBytes);

-        HighestOffsetAndIncompletes incompletesTuple = result.getDecodedIncompletes(finalOffsetForPartition);
+            HighestOffsetAndIncompletes incompletesTuple = result.getDecodedIncompletes(nextExpectedOffset);

-        Set<Long> incompletes = incompletesTuple.getIncompleteOffsets();
-        long highWater = incompletesTuple.getHighestSeenOffset();
+            Set<Long> incompletes = incompletesTuple.getIncompleteOffsets();
+            long highWater = incompletesTuple.getHighestSeenOffset();

-        return HighestOffsetAndIncompletes.of(highWater, incompletes);
+            return HighestOffsetAndIncompletes.of(highWater, incompletes);
+        }
     }

     String incompletesToBitmapString(long finalOffsetForPartition, TopicPartition tp, Set<Long> incompleteOffsets) {
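The new empty-payload branch above is the heart of the fix. A worked example with illustrative values (a sketch, not code from the commit):

```java
public class DecodeEmptyMetadataSketch {
    public static void main(String[] args) {
        // Scenario: offset 0 was processed and committed cleanly, so the broker
        // holds committed offset 1 with no encoded incomplete-offset bitmap.
        long nextExpectedOffset = 1;       // the committed offset
        byte[] decodedBytes = new byte[0]; // empty metadata payload

        if (decodedBytes.length == 0) {
            // Before the fix: the committed offset itself was reported as the
            // highest seen, so the record at offset 1 looked already processed
            // and was skipped after a restart.
            long highestSeenBefore = nextExpectedOffset;     // 1 -- off by one

            // After the fix: the highest previously seen offset is the one
            // *before* the committed offset, so offset 1 is replayed correctly.
            long highestSeenAfter = nextExpectedOffset - 1;  // 0

            System.out.println("before=" + highestSeenBefore + ", after=" + highestSeenAfter);
        }
    }
}
```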
@@ -0,0 +1,113 @@
package io.confluent.parallelconsumer.integrationTests;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelEoSStreamProcessor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import pl.tlinkowski.unij.api.UniLists;
import pl.tlinkowski.unij.api.UniSets;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import static java.time.Duration.ofSeconds;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.waitAtMost;

/**
 * Test offset restoring from boundary conditions, i.e. when no offset data is encoded in metadata
 *
 * Reproduces issue 62: https://github.com/confluentinc/parallel-consumer/issues/62
 *
 * @see io.confluent.parallelconsumer.ParallelEoSStreamProcessorTest#closeOpenBoundaryCommits
 */
@Slf4j
class OffsetCommittingSanityTest extends BrokerIntegrationTest<String, String> {

    @Test
    void shouldNotSkipAnyMessagesOnRestartRoot() throws Exception {
        setupTopic("foo");
        List<Long> producedOffsets = new ArrayList<>();
        List<Long> consumedOffsets = new ArrayList<>();

        var kafkaProducer = kcu.createNewProducer(false);

        // offset 0
        sendCheckClose(producedOffsets, consumedOffsets, kafkaProducer, "key-0", "value-0", true);

        assertCommittedOffset(1);

        // offset 1
        sendCheckClose(producedOffsets, consumedOffsets, kafkaProducer, "key-1", "value-1", true);

        assertCommittedOffset(2);

        // sanity
        assertThat(producedOffsets).containsExactly(0L, 1L);
        assertThat(consumedOffsets).containsExactly(0L, 1L);
    }

    @Test
    void shouldNotSkipAnyMessagesOnRestartAsDescribed() throws Exception {
        setupTopic("foo");
        List<Long> producedOffsets = new ArrayList<>();
        List<Long> consumedOffsets = new ArrayList<>();

        var kafkaProducer = kcu.createNewProducer(false);

        // offset 0
        sendCheckClose(producedOffsets, consumedOffsets, kafkaProducer, "key-0", "value-0", true);

        assertCommittedOffset(1);

        // offset 1
        sendCheckClose(producedOffsets, consumedOffsets, kafkaProducer, "key-1", "value-1", false);

        // offset 2
        sendCheckClose(producedOffsets, consumedOffsets, kafkaProducer, "key-2", "value-2", true);
    }

    private void sendCheckClose(List<Long> producedOffsets,
                                List<Long> consumedOffsets,
                                KafkaProducer<Object, Object> kafkaProducer,
                                String key, String val,
                                boolean check) throws Exception {
        producedOffsets.add(kafkaProducer.send(new ProducerRecord<>(topic, key, val)).get().offset());
        var newConsumer = kcu.createNewConsumer(false);
        var pc = createParallelConsumer(topic, newConsumer);
        pc.poll(consumerRecord -> consumedOffsets.add(consumerRecord.offset()));
        if (check) {
            waitAtMost(ofSeconds(1)).alias("all produced messages consumed")
                    .untilAsserted(() -> assertThat(consumedOffsets).isEqualTo(producedOffsets));
        } else {
            Thread.sleep(2000);
        }
        pc.closeDrainFirst();
    }

    private void assertCommittedOffset(long expectedOffset) {
        // assert committed offset
        var newConsumer = kcu.createNewConsumer(false);
        newConsumer.subscribe(UniSets.of(topic));
        newConsumer.poll(ofSeconds(1));
        Map<TopicPartition, OffsetAndMetadata> committed = newConsumer.committed(newConsumer.assignment());
        assertThat(committed.get(new TopicPartition(topic, 0)).offset()).isEqualTo(expectedOffset);
        newConsumer.close();
    }

    private ParallelEoSStreamProcessor<String, String> createParallelConsumer(String topicName, Consumer consumer) {
        ParallelEoSStreamProcessor<String, String> pc = new ParallelEoSStreamProcessor<>(ParallelConsumerOptions.builder()
                .consumer(consumer)
                .build()
        );
        pc.subscribe(UniLists.of(topicName));
        return pc;
    }

}
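The failure mode that `shouldNotSkipAnyMessagesOnRestartRoot` reproduces can be condensed as follows (an illustrative sketch mirroring the test above, not code from the commit):

```java
public class Issue62TimelineSketch {
    public static void main(String[] args) {
        // Run 1: produce "key-0" (offset 0), consume it, commit, close.
        // The broker now stores committed offset 1 with empty offset metadata.
        long committedOffset = 1;

        // Run 2: produce "key-1" (offset 1), then restart the consumer.
        long nextRecordOffset = 1;

        // Records at or below the restored "highest seen" offset are treated
        // as already processed and skipped.
        long highestSeenBeforeFix = committedOffset;     // off by one
        long highestSeenAfterFix = committedOffset - 1;  // the fix

        System.out.println("skipped before fix: " + (nextRecordOffset <= highestSeenBeforeFix)); // true
        System.out.println("skipped after fix:  " + (nextRecordOffset <= highestSeenAfterFix));  // false
    }
}
```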
@@ -20,6 +20,7 @@
 import org.junit.jupiter.params.provider.EnumSource;
 import org.mockito.ArgumentMatchers;
 import org.mockito.Mockito;
+import pl.tlinkowski.unij.api.UniLists;

 import java.time.Duration;
 import java.util.*;
@@ -37,6 +38,7 @@
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.assertj.core.api.Assertions.*;
 import static org.awaitility.Awaitility.await;
+import static org.awaitility.Awaitility.waitAtMost;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.*;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
@@ -297,7 +299,7 @@ public void offsetCommitsAreIsolatedPerPartition(CommitMode commitMode) {
         parallelConsumer.requestCommitAsap();

         waitForOneLoopCycle();
-        if(isUsingAsyncCommits())
+        if (isUsingAsyncCommits())
             waitForSomeLoopCycles(3); // async commit can be slow - todo change this to event based

         // make sure offset 0 and 1 is committed
@@ -801,5 +803,6 @@ private void requestCommitAndPause() {
         parallelConsumer.requestCommitAsap();
         waitForSomeLoopCycles(2);
     }
+
 }