From 824b72bfaf04bbca1c575290c7e6adccd87f750f Mon Sep 17 00:00:00 2001 From: kkondaka <41027584+kkondaka@users.noreply.github.com> Date: Tue, 5 Dec 2023 16:54:55 -0800 Subject: [PATCH] Fix kafka buffer metrics (#3805) Signed-off-by: Krishna Kondaka Co-authored-by: Krishna Kondaka --- .../model/buffer/AbstractBuffer.java | 22 +++++++++++++------ .../plugins/kafka/buffer/KafkaBuffer.java | 2 +- .../plugins/kafka/buffer/KafkaBufferTest.java | 2 +- 3 files changed, 17 insertions(+), 9 deletions(-) diff --git a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java index 598fbbf218..f8ad3479a8 100644 --- a/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java +++ b/data-prepper-api/src/main/java/org/opensearch/dataprepper/model/buffer/AbstractBuffer.java @@ -71,8 +71,10 @@ public void write(T record, int timeoutInMillis) throws TimeoutException { try { doWrite(record, timeoutInMillis); - recordsWrittenCounter.increment(); - recordsInBuffer.incrementAndGet(); + if (!isByteBuffer()) { + recordsWrittenCounter.increment(); + recordsInBuffer.incrementAndGet(); + } postProcess(recordsInBuffer.get()); } catch (TimeoutException e) { recordsWriteFailed.increment(); @@ -98,8 +100,11 @@ public void writeAll(Collection<T> records, int timeoutInMillis) throws Exceptio final int size = records.size(); try { doWriteAll(records, timeoutInMillis); - recordsWrittenCounter.increment(size); - recordsInBuffer.addAndGet(size); + // we do not know how many records when the buffer is bytebuffer + if (!isByteBuffer()) { + recordsWrittenCounter.increment(size); + recordsInBuffer.addAndGet(size); + } postProcess(recordsInBuffer.get()); } catch (Exception e) { recordsWriteFailed.increment(size); @@ -127,9 +132,12 @@ public void writeBytes(final byte[] bytes, final String key, int timeoutInMillis @Override public Map.Entry<Collection<T>, 
CheckpointState> read(int timeoutInMillis) { final Map.Entry<Collection<T>, CheckpointState> readResult = readTimer.record(() -> doRead(timeoutInMillis)); - recordsReadCounter.increment(readResult.getKey().size() * 1.0); - recordsInFlight.addAndGet(readResult.getValue().getNumRecordsToBeChecked()); - recordsInBuffer.addAndGet(-1 * readResult.getValue().getNumRecordsToBeChecked()); + // we do not know how many records when the buffer is bytebuffer + if (!isByteBuffer()) { + recordsReadCounter.increment(readResult.getKey().size() * 1.0); + recordsInFlight.addAndGet(readResult.getValue().getNumRecordsToBeChecked()); + recordsInBuffer.addAndGet(-1 * readResult.getValue().getNumRecordsToBeChecked()); + } postProcess(recordsInBuffer.get()); return readResult; } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java index ef86b558d8..9e1ce4b9b7 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBuffer.java @@ -117,7 +117,7 @@ public void doWriteAll(Collection<Record<Event>> records, int timeoutInMillis) t @Override public Map.Entry<Collection<Record<Event>>, CheckpointState> doRead(int timeoutInMillis) { - return innerBuffer.doRead(timeoutInMillis); + return innerBuffer.read(timeoutInMillis); } @Override diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java index 7b7f910bdb..d805512cd4 100644 --- a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java +++ 
b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/buffer/KafkaBufferTest.java @@ -205,7 +205,7 @@ void test_kafkaBuffer_basicFunctionality() throws TimeoutException { kafkaBuffer.doWrite(record, 10000); kafkaBuffer.doRead(10000); verify(producer).produceRecords(record); - verify(blockingBuffer).doRead(anyInt()); + verify(blockingBuffer).read(anyInt()); } @Test