From c01b3a890e31762bb2a2d6c24c7e68e2fefcdb01 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Fri, 16 Feb 2024 17:58:48 +0000 Subject: [PATCH 1/6] Support indexing internal data via the OutputBuffer This is needed to allow us to restore Archives by using the common OutputBuffer. Sharing the Output buffer with the regluar Messages gives us backpressure and the possibility for a self regulating rate limit for Archiving. --- .../processors/OutputBufferProcessor.java | 11 ++++- .../indexer/messages/IndexingResult.java | 8 +++- .../messages/IndexingResultCallback.java | 22 +++++++++ .../graylog2/indexer/messages/Messages.java | 6 +-- .../outputs/BlockingBatchedESOutput.java | 22 +++++++-- .../graylog2/plugin/GlobalMetricNames.java | 4 +- .../java/org/graylog2/plugin/Message.java | 48 +++++++++++++++++++ .../org/graylog2/plugin/buffers/Buffer.java | 8 +++- 8 files changed, 113 insertions(+), 16 deletions(-) create mode 100644 graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResultCallback.java diff --git a/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java b/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java index 503a7047ffe7..419d56d3bb4e 100644 --- a/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java +++ b/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java @@ -57,6 +57,7 @@ public class OutputBufferProcessor implements WorkHandler { private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class); private static final String INCOMING_MESSAGES_METRICNAME = name(OutputBufferProcessor.class, "incomingMessages"); + private static final String INCOMING_SYSTEM_MESSAGES_METRICNAME = name(OutputBufferProcessor.class, "incomingSystemMessages"); private static final String PROCESS_TIME_METRICNAME = name(OutputBufferProcessor.class, "processTime"); private final ExecutorService executor; @@ -65,6 +66,7 @@ public class OutputBufferProcessor implements WorkHandler { private final ServerStatus serverStatus; private final Meter incomingMessages; + private final Meter incomingSystemMessages; private final Counter outputThroughput; private final Timer processTime; @@ -89,6 +91,7 @@ public OutputBufferProcessor(Configuration configuration, this.executor = executorService(globalMetricRegistry, corePoolSize); this.incomingMessages = globalMetricRegistry.meter(INCOMING_MESSAGES_METRICNAME); + this.incomingSystemMessages = globalMetricRegistry.meter(INCOMING_SYSTEM_MESSAGES_METRICNAME); this.outputThroughput = globalMetricRegistry.counter(GlobalMetricNames.OUTPUT_THROUGHPUT); this.processTime = globalMetricRegistry.timer(PROCESS_TIME_METRICNAME); } @@ -149,8 +152,6 @@ private ExecutorService executorService(final MetricRegistry globalRegistry, fin */ @Override public void onEvent(MessageEvent event) throws Exception { - incomingMessages.mark(); - final Message msg = event.getMessage(); if (msg == null) { LOG.debug("Skipping null message."); @@ -158,6 +159,12 @@ public void onEvent(MessageEvent event) throws Exception { } LOG.trace("Processing message <{}> from OutputBuffer.", msg.getId()); + if (msg.isSystemMessage()) { + incomingSystemMessages.mark(); + } else { + incomingMessages.mark(); + } + final Set messageOutputs = outputRouter.getStreamOutputsForMessage(msg); msg.recordCounter(serverStatus, "matched-outputs", messageOutputs.size()); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResult.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResult.java index 36515fd92324..96b760ab67d9 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResult.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResult.java @@ -17,7 +17,11 @@ package org.graylog2.indexer.messages; public interface IndexingResult { - public Indexable message(); + Indexable message(); - public String index(); + String index(); + + default boolean isSuccess() { + return this instanceof IndexingSuccess; + } } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResultCallback.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResultCallback.java new file mode 100644 index 000000000000..738181ddd27b --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResultCallback.java @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.indexer.messages; + +import java.util.function.Consumer; + +public interface IndexingResultCallback extends Consumer { +} diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java index 47fa4316b2aa..54722f3343a9 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java @@ -26,6 +26,8 @@ import com.github.rholder.retry.WaitStrategy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.graylog.failure.FailureSubmissionService; import org.graylog2.indexer.InvalidWriteTargetException; import org.graylog2.indexer.MasterNotDiscoveredException; @@ -36,10 +38,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - -import jakarta.inject.Inject; -import jakarta.inject.Singleton; - import java.io.IOException; import java.util.HashSet; import java.util.List; diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java index 6e84abb63533..38268c8ecf04 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java @@ -21,6 +21,8 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import jakarta.inject.Inject; +import jakarta.inject.Named; import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.cluster.Cluster; import org.graylog2.indexer.messages.IndexingResults; @@ -32,9 +34,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Inject; -import jakarta.inject.Named; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -145,12 +144,25 @@ private void flush(List messages) { } protected IndexingResults indexMessageBatch(List messages) throws Exception { + IndexingResults indexingResults; try (Timer.Context ignored = processTime.time()) { lastFlushTime.set(System.nanoTime()); - final IndexingResults indexingResults = writeMessageEntries(messages); + indexingResults = writeMessageEntries(messages); batchSize.update(messages.size()); bufferFlushes.mark(); - return indexingResults; + } + + runIndexingResultCallbacks(messages, indexingResults); + + return indexingResults; + } + + // TODO not the ideal place for this + private static void runIndexingResultCallbacks(List messages, IndexingResults indexingResults) { + if (messages.stream().anyMatch(m -> m.message().hasIndexingResultCallback())) { + final var resultMap = indexingResults.allResults().stream().collect(Collectors.toMap(r -> r.message().getMessageId(), r -> r, (a, b) -> a)); + + messages.forEach(m -> m.message().runIndexingResultCallback(resultMap.get(m.message().getMessageId()))); } } diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/GlobalMetricNames.java b/graylog2-server/src/main/java/org/graylog2/plugin/GlobalMetricNames.java index a0c4006fd318..36f8c2ac9954 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/GlobalMetricNames.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/GlobalMetricNames.java @@ -20,7 +20,8 @@ public final class GlobalMetricNames { - private GlobalMetricNames() {} + private GlobalMetricNames() { + } public static final String OLDEST_SEGMENT_SUFFIX = "oldest-segment"; public static final String RATE_SUFFIX = "1-sec-rate"; @@ -43,6 +44,7 @@ private GlobalMetricNames() {} public static final String OUTPUT_BUFFER_USAGE = "org.graylog2.buffers.output.usage"; public static final String OUTPUT_BUFFER_SIZE = "org.graylog2.buffers.output.size"; + public static final String OUTPUT_BUFFER_RATELIMIT = "org.graylog2.buffers.output.ratelimit"; public static final String JOURNAL_APPEND_RATE = name("org.graylog2.journal.append", RATE_SUFFIX); public static final String JOURNAL_READ_RATE = name("org.graylog2.journal.read", RATE_SUFFIX); diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/Message.java b/graylog2-server/src/main/java/org/graylog2/plugin/Message.java index 7bea4ce56073..83100caf14b3 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/Message.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/Message.java @@ -37,6 +37,8 @@ import org.graylog.failure.ProcessingFailureCause; import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.messages.Indexable; +import org.graylog2.indexer.messages.IndexingResult; +import org.graylog2.indexer.messages.IndexingResultCallback; import org.graylog2.plugin.streams.Stream; import org.graylog2.plugin.utilities.date.DateTimeConverter; import org.graylog2.plugin.utilities.ratelimitedlog.RateLimitedLogFactory; @@ -322,6 +324,9 @@ public class Message implements Messages, Indexable { */ private Map metadata; + private final static String SYSTEM_MESSAGE_METADATA_FLAG = "is-system-message"; + private final static String SYSTEM_MESSAGE_INDEXING_RESULT_CALLBACK = "indexing-result-callback"; + private com.codahale.metrics.Counter sizeCounter = new com.codahale.metrics.Counter(); private List processingErrors; @@ -369,6 +374,26 @@ public Message(final Map fields) { this((String) fields.get(FIELD_ID), Maps.filterKeys(fields, not(equalTo(FIELD_ID)))); } + /** + * Creates a Message that is used for System purposes like restoring Archives. + * The message has the following properties: + *
    + *
  • A size of 0, so its traffic is not accounted
  • + *
  • A single predetermined IndexSet
  • + *
  • No streams, so it will only be routed to the {@link org.graylog2.outputs.DefaultMessageOutput}
  • + *
  • Returns true to {@link #isSystemMessage()}
  • + *
+ */ + public static Message createSystemMessage(@Nonnull String id, @Nonnull IndexSet indexSet, Map fields) { + var message = new Message(id, fields); + message.sizeCounter = new com.codahale.metrics.Counter(); + message.indexSets = Set.of(indexSet); + message.streams = Set.of(); + message.markAsSystemMessage(); + + return message; + } + private Message(String id, Map newFields) { Preconditions.checkArgument(id != null, "message id cannot be null"); fields.put(FIELD_ID, id); @@ -929,6 +954,29 @@ private boolean shouldNotRecord(ServerStatus serverStatus) { return !serverStatus.getDetailedMessageRecordingStrategy().shouldRecord(this); } + private void markAsSystemMessage() { + setMetadata(SYSTEM_MESSAGE_METADATA_FLAG, true); + } + + public boolean isSystemMessage() { + return getMetadataValue(SYSTEM_MESSAGE_METADATA_FLAG) != null; + } + + public void registerIndexingResultCallback(IndexingResultCallback callback) { + setMetadata(SYSTEM_MESSAGE_INDEXING_RESULT_CALLBACK, callback); + } + + public boolean hasIndexingResultCallback() { + return getMetadataValue(SYSTEM_MESSAGE_INDEXING_RESULT_CALLBACK) != null; + } + + public void runIndexingResultCallback(IndexingResult result) { + var callBack = getMetadataValue(SYSTEM_MESSAGE_INDEXING_RESULT_CALLBACK); + if (callBack instanceof IndexingResultCallback indexingResultCallback) { + indexingResultCallback.accept(result); + } + } + /** * Appends another processing error. * diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/buffers/Buffer.java b/graylog2-server/src/main/java/org/graylog2/plugin/buffers/Buffer.java index 76565c058bda..8e5c13a05196 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/buffers/Buffer.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/buffers/Buffer.java @@ -56,6 +56,10 @@ public long getUsage() { return (long) ringBuffer.getBufferSize() - ringBuffer.remainingCapacity(); } + public int getUsagePercent() { + return (int) (getUsage() * 100 / getRingBufferSize()); + } + protected void insert(Message message) { long sequence = ringBuffer.next(); MessageEvent event = ringBuffer.get(sequence); @@ -78,7 +82,7 @@ protected WaitStrategy getWaitStrategy(String waitStrategyName, String configOpt return new BusySpinWaitStrategy(); default: log.warn("Invalid setting for [{}]:" - + " Falling back to default: BlockingWaitStrategy.", configOptionName); + + " Falling back to default: BlockingWaitStrategy.", configOptionName); return new BlockingWaitStrategy(); } } @@ -91,7 +95,7 @@ protected void insert(Message[] messages) { long lo = hi - (length - 1); for (long sequence = lo; sequence <= hi; sequence++) { MessageEvent event = ringBuffer.get(sequence); - event.setMessage(messages[(int)(sequence - lo)]); + event.setMessage(messages[(int) (sequence - lo)]); } ringBuffer.publish(lo, hi); afterInsert(length); From 827f89d62b2daeec32a42950e400bf0bdcb0c568 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 4 Mar 2024 15:09:37 +0000 Subject: [PATCH 2/6] Refactor to use SystemMessage --- .../processors/OutputBufferProcessor.java | 3 +- .../outputs/BlockingBatchedESOutput.java | 16 +----- .../java/org/graylog2/plugin/Message.java | 48 ---------------- .../org/graylog2/plugin/SystemMessage.java | 57 +++++++++++++++++++ .../messageq/MessageQueueAcknowledger.java | 18 +++++- 5 files changed, 76 insertions(+), 66 deletions(-) create mode 100644 graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java diff --git a/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java b/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java index 419d56d3bb4e..124a532685ce 100644 --- a/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java +++ b/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java @@ -34,6 +34,7 @@ import org.graylog2.plugin.GlobalMetricNames; import org.graylog2.plugin.Message; import org.graylog2.plugin.ServerStatus; +import org.graylog2.plugin.SystemMessage; import org.graylog2.plugin.buffers.MessageEvent; import org.graylog2.plugin.outputs.MessageOutput; import org.graylog2.shared.buffers.WorkHandler; @@ -159,7 +160,7 @@ public void onEvent(MessageEvent event) throws Exception { } LOG.trace("Processing message <{}> from OutputBuffer.", msg.getId()); - if (msg.isSystemMessage()) { + if (msg instanceof SystemMessage) { incomingSystemMessages.mark(); } else { incomingMessages.mark(); diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java index 38268c8ecf04..aa68baee7cc5 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java @@ -45,7 +45,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import static com.codahale.metrics.MetricRegistry.name; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -132,9 +131,9 @@ private void flush(List messages) { } try { - indexMessageBatch(messages); + var indexingResults = indexMessageBatch(messages); // This does not exclude failedMessageIds, because we don't know if ES is ever gonna accept these messages. - acknowledger.acknowledge(messages.stream().map(MessageWithIndex::message).collect(Collectors.toList())); + acknowledger.acknowledge(indexingResults); } catch (Exception e) { log.error("Unable to flush message buffer", e); bufferFlushFailures.mark(); @@ -152,20 +151,9 @@ protected IndexingResults indexMessageBatch(List messages) thr bufferFlushes.mark(); } - runIndexingResultCallbacks(messages, indexingResults); - return indexingResults; } - // TODO not the ideal place for this - private static void runIndexingResultCallbacks(List messages, IndexingResults indexingResults) { - if (messages.stream().anyMatch(m -> m.message().hasIndexingResultCallback())) { - final var resultMap = indexingResults.allResults().stream().collect(Collectors.toMap(r -> r.message().getMessageId(), r -> r, (a, b) -> a)); - - messages.forEach(m -> m.message().runIndexingResultCallback(resultMap.get(m.message().getMessageId()))); - } - } - public void forceFlushIfTimedout() { // if we shouldn't flush at all based on the last flush time, no need to synchronize on this. if (lastFlushTime.get() != 0 && diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/Message.java b/graylog2-server/src/main/java/org/graylog2/plugin/Message.java index 83100caf14b3..7bea4ce56073 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/Message.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/Message.java @@ -37,8 +37,6 @@ import org.graylog.failure.ProcessingFailureCause; import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.messages.Indexable; -import org.graylog2.indexer.messages.IndexingResult; -import org.graylog2.indexer.messages.IndexingResultCallback; import org.graylog2.plugin.streams.Stream; import org.graylog2.plugin.utilities.date.DateTimeConverter; import org.graylog2.plugin.utilities.ratelimitedlog.RateLimitedLogFactory; @@ -324,9 +322,6 @@ public class Message implements Messages, Indexable { */ private Map metadata; - private final static String SYSTEM_MESSAGE_METADATA_FLAG = "is-system-message"; - private final static String SYSTEM_MESSAGE_INDEXING_RESULT_CALLBACK = "indexing-result-callback"; - private com.codahale.metrics.Counter sizeCounter = new com.codahale.metrics.Counter(); private List processingErrors; @@ -374,26 +369,6 @@ public Message(final Map fields) { this((String) fields.get(FIELD_ID), Maps.filterKeys(fields, not(equalTo(FIELD_ID)))); } - /** - * Creates a Message that is used for System purposes like restoring Archives. - * The message has the following properties: - *
    - *
  • A size of 0, so its traffic is not accounted
  • - *
  • A single predetermined IndexSet
  • - *
  • No streams, so it will only be routed to the {@link org.graylog2.outputs.DefaultMessageOutput}
  • - *
  • Returns true to {@link #isSystemMessage()}
  • - *
- */ - public static Message createSystemMessage(@Nonnull String id, @Nonnull IndexSet indexSet, Map fields) { - var message = new Message(id, fields); - message.sizeCounter = new com.codahale.metrics.Counter(); - message.indexSets = Set.of(indexSet); - message.streams = Set.of(); - message.markAsSystemMessage(); - - return message; - } - private Message(String id, Map newFields) { Preconditions.checkArgument(id != null, "message id cannot be null"); fields.put(FIELD_ID, id); @@ -954,29 +929,6 @@ private boolean shouldNotRecord(ServerStatus serverStatus) { return !serverStatus.getDetailedMessageRecordingStrategy().shouldRecord(this); } - private void markAsSystemMessage() { - setMetadata(SYSTEM_MESSAGE_METADATA_FLAG, true); - } - - public boolean isSystemMessage() { - return getMetadataValue(SYSTEM_MESSAGE_METADATA_FLAG) != null; - } - - public void registerIndexingResultCallback(IndexingResultCallback callback) { - setMetadata(SYSTEM_MESSAGE_INDEXING_RESULT_CALLBACK, callback); - } - - public boolean hasIndexingResultCallback() { - return getMetadataValue(SYSTEM_MESSAGE_INDEXING_RESULT_CALLBACK) != null; - } - - public void runIndexingResultCallback(IndexingResult result) { - var callBack = getMetadataValue(SYSTEM_MESSAGE_INDEXING_RESULT_CALLBACK); - if (callBack instanceof IndexingResultCallback indexingResultCallback) { - indexingResultCallback.accept(result); - } - } - /** * Appends another processing error. * diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java b/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java new file mode 100644 index 000000000000..9ab438a792c0 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java @@ -0,0 +1,57 @@ +package org.graylog2.plugin; + +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.messages.IndexingResult; +import org.graylog2.indexer.messages.IndexingResultCallback; +import org.graylog2.plugin.streams.Stream; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.Set; + +/** + * A Message that is used for System purposes like restoring Archives. + * The message has the following properties: + *
    + *
  • A size of 0, so its traffic is not accounted
  • + *
  • A single predetermined IndexSet
  • + *
  • No streams, so it will only be routed to the {@link org.graylog2.outputs.DefaultMessageOutput}
  • + *
+ */ +public class SystemMessage extends Message { + private final IndexSet indexSet; + private final IndexingResultCallback resultCallback; + + public SystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback) { + super(fields); + this.indexSet = indexSet; + this.resultCallback = resultCallback; + } + + public void runIndexingResultCallback(IndexingResult result) { + if (resultCallback != null) { + resultCallback.accept(result); + } + } + + @Override + public Set getIndexSets() { + return Set.of(indexSet); + } + + @Override + public long getSize() { + return 0; + } + + @Override + public Set getStreams() { + return Set.of(); + } + + @Override + public Object getMessageQueueId() { + return null; + } + +} diff --git a/graylog2-server/src/main/java/org/graylog2/shared/messageq/MessageQueueAcknowledger.java b/graylog2-server/src/main/java/org/graylog2/shared/messageq/MessageQueueAcknowledger.java index 4b5dc41d9c95..e7a596f87fe2 100644 --- a/graylog2-server/src/main/java/org/graylog2/shared/messageq/MessageQueueAcknowledger.java +++ b/graylog2-server/src/main/java/org/graylog2/shared/messageq/MessageQueueAcknowledger.java @@ -19,12 +19,13 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.google.auto.value.AutoValue; +import jakarta.inject.Inject; +import org.graylog2.indexer.messages.IndexingResult; +import org.graylog2.indexer.messages.IndexingResults; import org.graylog2.plugin.Message; +import org.graylog2.plugin.SystemMessage; import javax.annotation.Nullable; - -import jakarta.inject.Inject; - import java.util.List; import static com.codahale.metrics.MetricRegistry.name; @@ -37,6 +38,17 @@ public interface MessageQueueAcknowledger { void acknowledge(List messages); + default void acknowledge(IndexingResults indexingResults) { + var messages = indexingResults.allResults().stream().map(IndexingResult::message).filter(Message.class::isInstance).map(Message.class::cast).toList(); + acknowledge(messages); + + // SystemMessages + indexingResults.allResults().stream().filter(ir -> ir.message() instanceof SystemMessage).forEach(ir -> { + var systemMessage = (SystemMessage) ir.message(); + systemMessage.runIndexingResultCallback(ir); + }); + } + @AutoValue abstract class Metrics { public static class Provider implements jakarta.inject.Provider { From 89c74f1f12da6ffec2b2312baa71e178d1d53cb1 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Mon, 4 Mar 2024 15:15:34 +0000 Subject: [PATCH 3/6] fix license --- .../java/org/graylog2/plugin/SystemMessage.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java b/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java index 9ab438a792c0..c96f798c7545 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java @@ -1,3 +1,19 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ package org.graylog2.plugin; import org.graylog2.indexer.IndexSet; From 6660c9a72c451561edd1eac06c12832788660cd8 Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 5 Mar 2024 11:08:53 +0000 Subject: [PATCH 4/6] remove unneeded Journal field from ESOutput --- .../java/org/graylog2/outputs/BlockingBatchedESOutput.java | 4 +--- .../java/org/graylog2/outputs/ElasticSearchOutput.java | 7 +------ .../org/graylog2/outputs/BlockingBatchedESOutputTest.java | 4 +--- 3 files changed, 3 insertions(+), 12 deletions(-) diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java index aa68baee7cc5..83e2beb913c7 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java @@ -29,7 +29,6 @@ import org.graylog2.indexer.messages.MessageWithIndex; import org.graylog2.indexer.messages.Messages; import org.graylog2.plugin.Message; -import org.graylog2.shared.journal.Journal; import org.graylog2.shared.messageq.MessageQueueAcknowledger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,11 +72,10 @@ public class BlockingBatchedESOutput extends ElasticSearchOutput { public BlockingBatchedESOutput(MetricRegistry metricRegistry, Messages messages, org.graylog2.Configuration serverConfiguration, - Journal journal, MessageQueueAcknowledger acknowledger, Cluster cluster, @Named("daemonScheduler") ScheduledExecutorService daemonScheduler) { - super(metricRegistry, messages, journal, acknowledger); + super(metricRegistry, messages, acknowledger); this.maxBufferSize = serverConfiguration.getOutputBatchSize(); outputFlushInterval = serverConfiguration.getOutputFlushInterval(); this.processTime = metricRegistry.timer(name(this.getClass(), "processTime")); diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java index e8b877bbe814..e7bee6d9bee4 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java @@ -19,6 +19,7 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import jakarta.inject.Inject; import org.graylog2.indexer.messages.IndexingResults; import org.graylog2.indexer.messages.MessageWithIndex; import org.graylog2.indexer.messages.Messages; @@ -27,13 +28,10 @@ import org.graylog2.plugin.configuration.ConfigurationRequest; import org.graylog2.plugin.outputs.MessageOutput; import org.graylog2.plugin.streams.Stream; -import org.graylog2.shared.journal.Journal; import org.graylog2.shared.messageq.MessageQueueAcknowledger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Inject; - import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -54,17 +52,14 @@ public class ElasticSearchOutput implements MessageOutput { private final Meter failures; private final Timer processTime; private final Messages messages; - private final Journal journal; private final AtomicBoolean isRunning = new AtomicBoolean(false); protected final MessageQueueAcknowledger acknowledger; @Inject public ElasticSearchOutput(MetricRegistry metricRegistry, Messages messages, - Journal journal, MessageQueueAcknowledger acknowledger) { this.messages = messages; - this.journal = journal; this.acknowledger = acknowledger; // Only constructing metrics here. write() get's another Core reference. (because this technically is a plugin) this.writes = metricRegistry.meter(WRITES_METRICNAME); diff --git a/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java b/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java index ef7e1b7e45ad..2e552c92418c 100644 --- a/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java +++ b/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java @@ -26,7 +26,6 @@ import org.graylog2.plugin.Message; import org.graylog2.plugin.Tools; import org.graylog2.shared.SuppressForbidden; -import org.graylog2.shared.journal.NoopJournal; import org.graylog2.shared.messageq.MessageQueueAcknowledger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -68,7 +67,6 @@ public class BlockingBatchedESOutputTest { @SuppressForbidden("Using Executors.newSingleThreadExecutor() is okay in tests") public void setUp() throws Exception { MetricRegistry metricRegistry = new MetricRegistry(); - NoopJournal journal = new NoopJournal(); this.config = new Configuration() { @Override public int getOutputBatchSize() { @@ -81,7 +79,7 @@ public int getShutdownTimeout() { } }; - output = new BlockingBatchedESOutput(metricRegistry, messages, config, journal, acknowledger, cluster, Executors.newSingleThreadScheduledExecutor()); + output = new BlockingBatchedESOutput(metricRegistry, messages, config, acknowledger, cluster, Executors.newSingleThreadScheduledExecutor()); output.initialize(); } From 1e31d329066d0386e01692992363158a6824b54c Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 12 Mar 2024 14:28:07 +0000 Subject: [PATCH 5/6] Add createSystemMessage to MessageFactory --- .../plugin/DefaultMessageFactory.java | 8 ++++++++ .../org/graylog2/plugin/MessageFactory.java | 19 +++++++++++++++++++ .../org/graylog2/plugin/SystemMessage.java | 3 ++- 3 files changed, 29 insertions(+), 1 deletion(-) diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/DefaultMessageFactory.java b/graylog2-server/src/main/java/org/graylog2/plugin/DefaultMessageFactory.java index def7ceb23573..d9798e16aeb7 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/DefaultMessageFactory.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/DefaultMessageFactory.java @@ -17,8 +17,11 @@ package org.graylog2.plugin; import jakarta.inject.Singleton; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.messages.IndexingResultCallback; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.Map; // Intentionally package-private to enforce usage of injected MessageFactory. @@ -38,4 +41,9 @@ public Message createMessage(final Map fields) { public Message createMessage(final String id, Map newFields) { return new Message(id, newFields); } + + @Override + public SystemMessage createSystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback) { + return new SystemMessage(indexSet, fields, resultCallback); + } } diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/MessageFactory.java b/graylog2-server/src/main/java/org/graylog2/plugin/MessageFactory.java index f5d5fc416195..e049ec4a1686 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/MessageFactory.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/MessageFactory.java @@ -16,6 +16,9 @@ */ package org.graylog2.plugin; +import jakarta.annotation.Nullable; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.messages.IndexingResultCallback; import org.joda.time.DateTime; import java.util.Map; @@ -48,6 +51,22 @@ public interface MessageFactory { */ Message createMessage(String id, Map newFields); + /** + * Return a new {@link SystemMessage} object which can be used for System purposes like restoring Archives. + * The message has the following properties: + *
    + *
  • A size of 0, so its traffic is not accounted
  • + *
  • A single predetermined IndexSet
  • + *
  • No streams, so it will only be routed to the {@link org.graylog2.outputs.DefaultMessageOutput}
  • + *
+ * + * @param indexSet the predetermined indexSet where the message will be indexed + * @param fields the map of fields + * @param resultCallback an optional {@link IndexingResultCallback} that will be called once the Message is indexed + * @return the new SystemMessage object + */ + public SystemMessage createSystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback); + /** * Returns a fake {@link Message}. This message must not be used for real message processing! * diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java b/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java index c96f798c7545..8605a88566aa 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java @@ -38,7 +38,8 @@ public class SystemMessage extends Message { private final IndexSet indexSet; private final IndexingResultCallback resultCallback; - public SystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback) { + // Intentionally package-private to enforce MessageFactory usage. + SystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback) { super(fields); this.indexSet = indexSet; this.resultCallback = resultCallback; From 96d4829e68f865d63310c14326343031ae0b64ea Mon Sep 17 00:00:00 2001 From: Marco Pfatschbacher Date: Tue, 12 Mar 2024 15:00:29 +0000 Subject: [PATCH 6/6] Also update TestMessageFactory --- .../test/java/org/graylog2/plugin/TestMessageFactory.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/graylog2-server/src/test/java/org/graylog2/plugin/TestMessageFactory.java b/graylog2-server/src/test/java/org/graylog2/plugin/TestMessageFactory.java index 4a053bc0fb75..15e1ed127556 100644 --- a/graylog2-server/src/test/java/org/graylog2/plugin/TestMessageFactory.java +++ b/graylog2-server/src/test/java/org/graylog2/plugin/TestMessageFactory.java @@ -16,6 +16,9 @@ */ package org.graylog2.plugin; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.messages.IndexingResultCallback; +import org.jetbrains.annotations.Nullable; import org.joda.time.DateTime; import java.util.Map; @@ -39,4 +42,9 @@ public Message createMessage(Map fields) { public Message createMessage(String id, Map newFields) { return new Message(id, newFields); } + + @Override + public SystemMessage createSystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback) { + return new SystemMessage(indexSet, fields, resultCallback); + } }