diff --git a/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java b/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java index 503a7047ffe7..124a532685ce 100644 --- a/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java +++ b/graylog2-server/src/main/java/org/graylog2/buffers/processors/OutputBufferProcessor.java @@ -34,6 +34,7 @@ import org.graylog2.plugin.GlobalMetricNames; import org.graylog2.plugin.Message; import org.graylog2.plugin.ServerStatus; +import org.graylog2.plugin.SystemMessage; import org.graylog2.plugin.buffers.MessageEvent; import org.graylog2.plugin.outputs.MessageOutput; import org.graylog2.shared.buffers.WorkHandler; @@ -57,6 +58,7 @@ public class OutputBufferProcessor implements WorkHandler { private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class); private static final String INCOMING_MESSAGES_METRICNAME = name(OutputBufferProcessor.class, "incomingMessages"); + private static final String INCOMING_SYSTEM_MESSAGES_METRICNAME = name(OutputBufferProcessor.class, "incomingSystemMessages"); private static final String PROCESS_TIME_METRICNAME = name(OutputBufferProcessor.class, "processTime"); private final ExecutorService executor; @@ -65,6 +67,7 @@ public class OutputBufferProcessor implements WorkHandler { private final ServerStatus serverStatus; private final Meter incomingMessages; + private final Meter incomingSystemMessages; private final Counter outputThroughput; private final Timer processTime; @@ -89,6 +92,7 @@ public OutputBufferProcessor(Configuration configuration, this.executor = executorService(globalMetricRegistry, corePoolSize); this.incomingMessages = globalMetricRegistry.meter(INCOMING_MESSAGES_METRICNAME); + this.incomingSystemMessages = globalMetricRegistry.meter(INCOMING_SYSTEM_MESSAGES_METRICNAME); this.outputThroughput = globalMetricRegistry.counter(GlobalMetricNames.OUTPUT_THROUGHPUT); this.processTime = globalMetricRegistry.timer(PROCESS_TIME_METRICNAME); } @@ -149,8 +153,6 @@ private ExecutorService executorService(final MetricRegistry globalRegistry, fin */ @Override public void onEvent(MessageEvent event) throws Exception { - incomingMessages.mark(); - final Message msg = event.getMessage(); if (msg == null) { LOG.debug("Skipping null message."); @@ -158,6 +160,12 @@ public void onEvent(MessageEvent event) throws Exception { } LOG.trace("Processing message <{}> from OutputBuffer.", msg.getId()); + if (msg instanceof SystemMessage) { + incomingSystemMessages.mark(); + } else { + incomingMessages.mark(); + } + final Set messageOutputs = outputRouter.getStreamOutputsForMessage(msg); msg.recordCounter(serverStatus, "matched-outputs", messageOutputs.size()); diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResult.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResult.java index 36515fd92324..96b760ab67d9 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResult.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResult.java @@ -17,7 +17,11 @@ package org.graylog2.indexer.messages; public interface IndexingResult { - public Indexable message(); + Indexable message(); - public String index(); + String index(); + + default boolean isSuccess() { + return this instanceof IndexingSuccess; + } } diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResultCallback.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResultCallback.java new file mode 100644 index 000000000000..738181ddd27b --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/IndexingResultCallback.java @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.indexer.messages; + +import java.util.function.Consumer; + +public interface IndexingResultCallback extends Consumer { +} diff --git a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java index 47fa4316b2aa..54722f3343a9 100644 --- a/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java +++ b/graylog2-server/src/main/java/org/graylog2/indexer/messages/Messages.java @@ -26,6 +26,8 @@ import com.github.rholder.retry.WaitStrategy; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Sets; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; import org.graylog.failure.FailureSubmissionService; import org.graylog2.indexer.InvalidWriteTargetException; import org.graylog2.indexer.MasterNotDiscoveredException; @@ -36,10 +38,6 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nullable; - -import jakarta.inject.Inject; -import jakarta.inject.Singleton; - import java.io.IOException; import java.util.HashSet; import java.util.List; diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java index 6e84abb63533..83e2beb913c7 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/BlockingBatchedESOutput.java @@ -21,20 +21,18 @@ import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import jakarta.inject.Inject; +import jakarta.inject.Named; import org.graylog2.indexer.IndexSet; import org.graylog2.indexer.cluster.Cluster; import org.graylog2.indexer.messages.IndexingResults; import org.graylog2.indexer.messages.MessageWithIndex; import org.graylog2.indexer.messages.Messages; import org.graylog2.plugin.Message; -import org.graylog2.shared.journal.Journal; import org.graylog2.shared.messageq.MessageQueueAcknowledger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Inject; -import jakarta.inject.Named; - import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; @@ -46,7 +44,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.stream.Collectors; import static com.codahale.metrics.MetricRegistry.name; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -75,11 +72,10 @@ public class BlockingBatchedESOutput extends ElasticSearchOutput { public BlockingBatchedESOutput(MetricRegistry metricRegistry, Messages messages, org.graylog2.Configuration serverConfiguration, - Journal journal, MessageQueueAcknowledger acknowledger, Cluster cluster, @Named("daemonScheduler") ScheduledExecutorService daemonScheduler) { - super(metricRegistry, messages, journal, acknowledger); + super(metricRegistry, messages, acknowledger); this.maxBufferSize = serverConfiguration.getOutputBatchSize(); outputFlushInterval = serverConfiguration.getOutputFlushInterval(); this.processTime = metricRegistry.timer(name(this.getClass(), "processTime")); @@ -133,9 +129,9 @@ private void flush(List messages) { } try { - indexMessageBatch(messages); + var indexingResults = indexMessageBatch(messages); // This does not exclude failedMessageIds, because we don't know if ES is ever gonna accept these messages. - acknowledger.acknowledge(messages.stream().map(MessageWithIndex::message).collect(Collectors.toList())); + acknowledger.acknowledge(indexingResults); } catch (Exception e) { log.error("Unable to flush message buffer", e); bufferFlushFailures.mark(); @@ -145,13 +141,15 @@ private void flush(List messages) { } protected IndexingResults indexMessageBatch(List messages) throws Exception { + IndexingResults indexingResults; try (Timer.Context ignored = processTime.time()) { lastFlushTime.set(System.nanoTime()); - final IndexingResults indexingResults = writeMessageEntries(messages); + indexingResults = writeMessageEntries(messages); batchSize.update(messages.size()); bufferFlushes.mark(); - return indexingResults; } + + return indexingResults; } public void forceFlushIfTimedout() { diff --git a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java index e8b877bbe814..e7bee6d9bee4 100644 --- a/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java +++ b/graylog2-server/src/main/java/org/graylog2/outputs/ElasticSearchOutput.java @@ -19,6 +19,7 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.codahale.metrics.Timer; +import jakarta.inject.Inject; import org.graylog2.indexer.messages.IndexingResults; import org.graylog2.indexer.messages.MessageWithIndex; import org.graylog2.indexer.messages.Messages; @@ -27,13 +28,10 @@ import org.graylog2.plugin.configuration.ConfigurationRequest; import org.graylog2.plugin.outputs.MessageOutput; import org.graylog2.plugin.streams.Stream; -import org.graylog2.shared.journal.Journal; import org.graylog2.shared.messageq.MessageQueueAcknowledger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import jakarta.inject.Inject; - import java.util.Collections; import java.util.Comparator; import java.util.List; @@ -54,17 +52,14 @@ public class ElasticSearchOutput implements MessageOutput { private final Meter failures; private final Timer processTime; private final Messages messages; - private final Journal journal; private final AtomicBoolean isRunning = new AtomicBoolean(false); protected final MessageQueueAcknowledger acknowledger; @Inject public ElasticSearchOutput(MetricRegistry metricRegistry, Messages messages, - Journal journal, MessageQueueAcknowledger acknowledger) { this.messages = messages; - this.journal = journal; this.acknowledger = acknowledger; // Only constructing metrics here. write() get's another Core reference. (because this technically is a plugin) this.writes = metricRegistry.meter(WRITES_METRICNAME); diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/DefaultMessageFactory.java b/graylog2-server/src/main/java/org/graylog2/plugin/DefaultMessageFactory.java index def7ceb23573..d9798e16aeb7 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/DefaultMessageFactory.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/DefaultMessageFactory.java @@ -17,8 +17,11 @@ package org.graylog2.plugin; import jakarta.inject.Singleton; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.messages.IndexingResultCallback; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.Map; // Intentionally package-private to enforce usage of injected MessageFactory. @@ -38,4 +41,9 @@ public Message createMessage(final Map fields) { public Message createMessage(final String id, Map newFields) { return new Message(id, newFields); } + + @Override + public SystemMessage createSystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback) { + return new SystemMessage(indexSet, fields, resultCallback); + } } diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/GlobalMetricNames.java b/graylog2-server/src/main/java/org/graylog2/plugin/GlobalMetricNames.java index a0c4006fd318..36f8c2ac9954 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/GlobalMetricNames.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/GlobalMetricNames.java @@ -20,7 +20,8 @@ public final class GlobalMetricNames { - private GlobalMetricNames() {} + private GlobalMetricNames() { + } public static final String OLDEST_SEGMENT_SUFFIX = "oldest-segment"; public static final String RATE_SUFFIX = "1-sec-rate"; @@ -43,6 +44,7 @@ private GlobalMetricNames() {} public static final String OUTPUT_BUFFER_USAGE = "org.graylog2.buffers.output.usage"; public static final String OUTPUT_BUFFER_SIZE = "org.graylog2.buffers.output.size"; + public static final String OUTPUT_BUFFER_RATELIMIT = "org.graylog2.buffers.output.ratelimit"; public static final String JOURNAL_APPEND_RATE = name("org.graylog2.journal.append", RATE_SUFFIX); public static final String JOURNAL_READ_RATE = name("org.graylog2.journal.read", RATE_SUFFIX); diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/MessageFactory.java b/graylog2-server/src/main/java/org/graylog2/plugin/MessageFactory.java index f5d5fc416195..e049ec4a1686 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/MessageFactory.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/MessageFactory.java @@ -16,6 +16,9 @@ */ package org.graylog2.plugin; +import jakarta.annotation.Nullable; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.messages.IndexingResultCallback; import org.joda.time.DateTime; import java.util.Map; @@ -48,6 +51,22 @@ public interface MessageFactory { */ Message createMessage(String id, Map newFields); + /** + * Return a new {@link SystemMessage} object which can be used for System purposes like restoring Archives. + * The message has the following properties: + *
    + *
  • A size of 0, so its traffic is not accounted
  • + *
  • A single predetermined IndexSet
  • + *
  • No streams, so it will only be routed to the {@link org.graylog2.outputs.DefaultMessageOutput}
  • + *
+ * + * @param indexSet the predetermined indexSet where the message will be indexed + * @param fields the map of fields + * @param resultCallback an optional {@link IndexingResultCallback} that will be called once the Message is indexed + * @return the new SystemMessage object + */ + public SystemMessage createSystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback); + /** * Returns a fake {@link Message}. This message must not be used for real message processing! * diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java b/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java new file mode 100644 index 000000000000..8605a88566aa --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/plugin/SystemMessage.java @@ -0,0 +1,74 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.plugin; + +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.messages.IndexingResult; +import org.graylog2.indexer.messages.IndexingResultCallback; +import org.graylog2.plugin.streams.Stream; + +import javax.annotation.Nullable; +import java.util.Map; +import java.util.Set; + +/** + * A Message that is used for System purposes like restoring Archives. + * The message has the following properties: + *
    + *
  • A size of 0, so its traffic is not accounted
  • + *
  • A single predetermined IndexSet
  • + *
  • No streams, so it will only be routed to the {@link org.graylog2.outputs.DefaultMessageOutput}
  • + *
+ */ +public class SystemMessage extends Message { + private final IndexSet indexSet; + private final IndexingResultCallback resultCallback; + + // Intentionally package-private to enforce MessageFactory usage. + SystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback) { + super(fields); + this.indexSet = indexSet; + this.resultCallback = resultCallback; + } + + public void runIndexingResultCallback(IndexingResult result) { + if (resultCallback != null) { + resultCallback.accept(result); + } + } + + @Override + public Set getIndexSets() { + return Set.of(indexSet); + } + + @Override + public long getSize() { + return 0; + } + + @Override + public Set getStreams() { + return Set.of(); + } + + @Override + public Object getMessageQueueId() { + return null; + } + +} diff --git a/graylog2-server/src/main/java/org/graylog2/plugin/buffers/Buffer.java b/graylog2-server/src/main/java/org/graylog2/plugin/buffers/Buffer.java index 76565c058bda..8e5c13a05196 100644 --- a/graylog2-server/src/main/java/org/graylog2/plugin/buffers/Buffer.java +++ b/graylog2-server/src/main/java/org/graylog2/plugin/buffers/Buffer.java @@ -56,6 +56,10 @@ public long getUsage() { return (long) ringBuffer.getBufferSize() - ringBuffer.remainingCapacity(); } + public int getUsagePercent() { + return (int) (getUsage() * 100 / getRingBufferSize()); + } + protected void insert(Message message) { long sequence = ringBuffer.next(); MessageEvent event = ringBuffer.get(sequence); @@ -78,7 +82,7 @@ protected WaitStrategy getWaitStrategy(String waitStrategyName, String configOpt return new BusySpinWaitStrategy(); default: log.warn("Invalid setting for [{}]:" - + " Falling back to default: BlockingWaitStrategy.", configOptionName); + + " Falling back to default: BlockingWaitStrategy.", configOptionName); return new BlockingWaitStrategy(); } } @@ -91,7 +95,7 @@ protected void insert(Message[] messages) { long lo = hi - (length - 1); for (long sequence = lo; sequence <= hi; sequence++) { MessageEvent event = ringBuffer.get(sequence); - event.setMessage(messages[(int)(sequence - lo)]); + event.setMessage(messages[(int) (sequence - lo)]); } ringBuffer.publish(lo, hi); afterInsert(length); diff --git a/graylog2-server/src/main/java/org/graylog2/shared/messageq/MessageQueueAcknowledger.java b/graylog2-server/src/main/java/org/graylog2/shared/messageq/MessageQueueAcknowledger.java index 4b5dc41d9c95..e7a596f87fe2 100644 --- a/graylog2-server/src/main/java/org/graylog2/shared/messageq/MessageQueueAcknowledger.java +++ b/graylog2-server/src/main/java/org/graylog2/shared/messageq/MessageQueueAcknowledger.java @@ -19,12 +19,13 @@ import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import com.google.auto.value.AutoValue; +import jakarta.inject.Inject; +import org.graylog2.indexer.messages.IndexingResult; +import org.graylog2.indexer.messages.IndexingResults; import org.graylog2.plugin.Message; +import org.graylog2.plugin.SystemMessage; import javax.annotation.Nullable; - -import jakarta.inject.Inject; - import java.util.List; import static com.codahale.metrics.MetricRegistry.name; @@ -37,6 +38,17 @@ public interface MessageQueueAcknowledger { void acknowledge(List messages); + default void acknowledge(IndexingResults indexingResults) { + var messages = indexingResults.allResults().stream().map(IndexingResult::message).filter(Message.class::isInstance).map(Message.class::cast).toList(); + acknowledge(messages); + + // SystemMessages + indexingResults.allResults().stream().filter(ir -> ir.message() instanceof SystemMessage).forEach(ir -> { + var systemMessage = (SystemMessage) ir.message(); + systemMessage.runIndexingResultCallback(ir); + }); + } + @AutoValue abstract class Metrics { public static class Provider implements jakarta.inject.Provider { diff --git a/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java b/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java index 861308fb180b..bf4eab7ad3b9 100644 --- a/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java +++ b/graylog2-server/src/test/java/org/graylog2/outputs/BlockingBatchedESOutputTest.java @@ -27,7 +27,6 @@ import org.graylog2.plugin.MessageFactory; import org.graylog2.plugin.Tools; import org.graylog2.shared.SuppressForbidden; -import org.graylog2.shared.journal.NoopJournal; import org.graylog2.shared.messageq.MessageQueueAcknowledger; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; @@ -73,7 +72,6 @@ public void setUp(MessageFactory messageFactory) throws Exception { this.messageFactory = messageFactory; MetricRegistry metricRegistry = new MetricRegistry(); - NoopJournal journal = new NoopJournal(); this.config = new Configuration() { @Override public int getOutputBatchSize() { @@ -86,7 +84,7 @@ public int getShutdownTimeout() { } }; - output = new BlockingBatchedESOutput(metricRegistry, messages, config, journal, acknowledger, cluster, Executors.newSingleThreadScheduledExecutor()); + output = new BlockingBatchedESOutput(metricRegistry, messages, config, acknowledger, cluster, Executors.newSingleThreadScheduledExecutor()); output.initialize(); } diff --git a/graylog2-server/src/test/java/org/graylog2/plugin/TestMessageFactory.java b/graylog2-server/src/test/java/org/graylog2/plugin/TestMessageFactory.java index 4a053bc0fb75..15e1ed127556 100644 --- a/graylog2-server/src/test/java/org/graylog2/plugin/TestMessageFactory.java +++ b/graylog2-server/src/test/java/org/graylog2/plugin/TestMessageFactory.java @@ -16,6 +16,9 @@ */ package org.graylog2.plugin; +import org.graylog2.indexer.IndexSet; +import org.graylog2.indexer.messages.IndexingResultCallback; +import org.jetbrains.annotations.Nullable; import org.joda.time.DateTime; import java.util.Map; @@ -39,4 +42,9 @@ public Message createMessage(Map fields) { public Message createMessage(String id, Map newFields) { return new Message(id, newFields); } + + @Override + public SystemMessage createSystemMessage(IndexSet indexSet, Map fields, @Nullable IndexingResultCallback resultCallback) { + return new SystemMessage(indexSet, fields, resultCallback); + } }