Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support indexing internal data via the OutputBuffer #18442

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.graylog2.plugin.GlobalMetricNames;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.ServerStatus;
import org.graylog2.plugin.SystemMessage;
import org.graylog2.plugin.buffers.MessageEvent;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.shared.buffers.WorkHandler;
Expand All @@ -57,6 +58,7 @@ public class OutputBufferProcessor implements WorkHandler<MessageEvent> {
private static final Logger LOG = LoggerFactory.getLogger(OutputBufferProcessor.class);

private static final String INCOMING_MESSAGES_METRICNAME = name(OutputBufferProcessor.class, "incomingMessages");
private static final String INCOMING_SYSTEM_MESSAGES_METRICNAME = name(OutputBufferProcessor.class, "incomingSystemMessages");
private static final String PROCESS_TIME_METRICNAME = name(OutputBufferProcessor.class, "processTime");

private final ExecutorService executor;
Expand All @@ -65,6 +67,7 @@ public class OutputBufferProcessor implements WorkHandler<MessageEvent> {
private final ServerStatus serverStatus;

private final Meter incomingMessages;
private final Meter incomingSystemMessages;
private final Counter outputThroughput;
private final Timer processTime;

Expand All @@ -89,6 +92,7 @@ public OutputBufferProcessor(Configuration configuration,
this.executor = executorService(globalMetricRegistry, corePoolSize);

this.incomingMessages = globalMetricRegistry.meter(INCOMING_MESSAGES_METRICNAME);
this.incomingSystemMessages = globalMetricRegistry.meter(INCOMING_SYSTEM_MESSAGES_METRICNAME);
this.outputThroughput = globalMetricRegistry.counter(GlobalMetricNames.OUTPUT_THROUGHPUT);
this.processTime = globalMetricRegistry.timer(PROCESS_TIME_METRICNAME);
}
Expand Down Expand Up @@ -149,15 +153,19 @@ private ExecutorService executorService(final MetricRegistry globalRegistry, fin
*/
@Override
public void onEvent(MessageEvent event) throws Exception {
incomingMessages.mark();

final Message msg = event.getMessage();
if (msg == null) {
LOG.debug("Skipping null message.");
return;
}
LOG.trace("Processing message <{}> from OutputBuffer.", msg.getId());

if (msg instanceof SystemMessage) {
incomingSystemMessages.mark();
} else {
incomingMessages.mark();
}

final Set<MessageOutput> messageOutputs = outputRouter.getStreamOutputsForMessage(msg);
msg.recordCounter(serverStatus, "matched-outputs", messageOutputs.size());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
package org.graylog2.indexer.messages;

public interface IndexingResult {
public Indexable message();
Indexable message();

public String index();
String index();

default boolean isSuccess() {
return this instanceof IndexingSuccess;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.indexer.messages;

import java.util.function.Consumer;

public interface IndexingResultCallback extends Consumer<IndexingResult> {
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.github.rholder.retry.WaitStrategy;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import jakarta.inject.Inject;
import jakarta.inject.Singleton;
import org.graylog.failure.FailureSubmissionService;
import org.graylog2.indexer.InvalidWriteTargetException;
import org.graylog2.indexer.MasterNotDiscoveredException;
Expand All @@ -36,10 +38,6 @@
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,18 @@
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import jakarta.inject.Inject;
import jakarta.inject.Named;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.cluster.Cluster;
import org.graylog2.indexer.messages.IndexingResults;
import org.graylog2.indexer.messages.MessageWithIndex;
import org.graylog2.indexer.messages.Messages;
import org.graylog2.plugin.Message;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;
import jakarta.inject.Named;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
Expand All @@ -46,7 +44,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

import static com.codahale.metrics.MetricRegistry.name;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -75,11 +72,10 @@ public class BlockingBatchedESOutput extends ElasticSearchOutput {
public BlockingBatchedESOutput(MetricRegistry metricRegistry,
Messages messages,
org.graylog2.Configuration serverConfiguration,
Journal journal,
MessageQueueAcknowledger acknowledger,
Cluster cluster,
@Named("daemonScheduler") ScheduledExecutorService daemonScheduler) {
super(metricRegistry, messages, journal, acknowledger);
super(metricRegistry, messages, acknowledger);
this.maxBufferSize = serverConfiguration.getOutputBatchSize();
outputFlushInterval = serverConfiguration.getOutputFlushInterval();
this.processTime = metricRegistry.timer(name(this.getClass(), "processTime"));
Expand Down Expand Up @@ -133,9 +129,9 @@ private void flush(List<MessageWithIndex> messages) {
}

try {
indexMessageBatch(messages);
var indexingResults = indexMessageBatch(messages);
// This does not exclude failedMessageIds, because we don't know if ES is ever gonna accept these messages.
acknowledger.acknowledge(messages.stream().map(MessageWithIndex::message).collect(Collectors.toList()));
acknowledger.acknowledge(indexingResults);
} catch (Exception e) {
log.error("Unable to flush message buffer", e);
bufferFlushFailures.mark();
Expand All @@ -145,13 +141,15 @@ private void flush(List<MessageWithIndex> messages) {
}

protected IndexingResults indexMessageBatch(List<MessageWithIndex> messages) throws Exception {
IndexingResults indexingResults;
try (Timer.Context ignored = processTime.time()) {
lastFlushTime.set(System.nanoTime());
final IndexingResults indexingResults = writeMessageEntries(messages);
indexingResults = writeMessageEntries(messages);
batchSize.update(messages.size());
bufferFlushes.mark();
return indexingResults;
}

return indexingResults;
}

public void forceFlushIfTimedout() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import jakarta.inject.Inject;
import org.graylog2.indexer.messages.IndexingResults;
import org.graylog2.indexer.messages.MessageWithIndex;
import org.graylog2.indexer.messages.Messages;
Expand All @@ -27,13 +28,10 @@
import org.graylog2.plugin.configuration.ConfigurationRequest;
import org.graylog2.plugin.outputs.MessageOutput;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.shared.journal.Journal;
import org.graylog2.shared.messageq.MessageQueueAcknowledger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand All @@ -54,17 +52,14 @@ public class ElasticSearchOutput implements MessageOutput {
private final Meter failures;
private final Timer processTime;
private final Messages messages;
private final Journal journal;
private final AtomicBoolean isRunning = new AtomicBoolean(false);
protected final MessageQueueAcknowledger acknowledger;

@Inject
public ElasticSearchOutput(MetricRegistry metricRegistry,
Messages messages,
Journal journal,
MessageQueueAcknowledger acknowledger) {
this.messages = messages;
this.journal = journal;
this.acknowledger = acknowledger;
// Only constructing metrics here. write() get's another Core reference. (because this technically is a plugin)
this.writes = metricRegistry.meter(WRITES_METRICNAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@
package org.graylog2.plugin;

import jakarta.inject.Singleton;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.messages.IndexingResultCallback;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.util.Map;

// Intentionally package-private to enforce usage of injected MessageFactory.
Expand All @@ -38,4 +41,9 @@ public Message createMessage(final Map<String, Object> fields) {
public Message createMessage(final String id, Map<String, Object> newFields) {
return new Message(id, newFields);
}

@Override
public SystemMessage createSystemMessage(IndexSet indexSet, Map<String, Object> fields, @Nullable IndexingResultCallback resultCallback) {
return new SystemMessage(indexSet, fields, resultCallback);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@

public final class GlobalMetricNames {

private GlobalMetricNames() {}
private GlobalMetricNames() {
}

public static final String OLDEST_SEGMENT_SUFFIX = "oldest-segment";
public static final String RATE_SUFFIX = "1-sec-rate";
Expand All @@ -43,6 +44,7 @@ private GlobalMetricNames() {}

public static final String OUTPUT_BUFFER_USAGE = "org.graylog2.buffers.output.usage";
public static final String OUTPUT_BUFFER_SIZE = "org.graylog2.buffers.output.size";
public static final String OUTPUT_BUFFER_RATELIMIT = "org.graylog2.buffers.output.ratelimit";

public static final String JOURNAL_APPEND_RATE = name("org.graylog2.journal.append", RATE_SUFFIX);
public static final String JOURNAL_READ_RATE = name("org.graylog2.journal.read", RATE_SUFFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
*/
package org.graylog2.plugin;

import jakarta.annotation.Nullable;
import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.messages.IndexingResultCallback;
import org.joda.time.DateTime;

import java.util.Map;
Expand Down Expand Up @@ -48,6 +51,22 @@ public interface MessageFactory {
*/
Message createMessage(String id, Map<String, Object> newFields);

/**
* Return a new {@link SystemMessage} object which can be used for System purposes like restoring Archives.
* The message has the following properties:
* <ul>
* <li>A size of 0, so its traffic is not accounted</li>
* <li>A single predetermined IndexSet</li>
* <li>No streams, so it will only be routed to the {@link org.graylog2.outputs.DefaultMessageOutput}</li>
* </ul>
*
* @param indexSet the predetermined indexSet where the message will be indexed
* @param fields the map of fields
* @param resultCallback an optional {@link IndexingResultCallback} that will be called once the Message is indexed
* @return the new SystemMessage object
*/
public SystemMessage createSystemMessage(IndexSet indexSet, Map<String, Object> fields, @Nullable IndexingResultCallback resultCallback);

/**
* Returns a fake {@link Message}. This message must not be used for real message processing!
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog2.plugin;

import org.graylog2.indexer.IndexSet;
import org.graylog2.indexer.messages.IndexingResult;
import org.graylog2.indexer.messages.IndexingResultCallback;
import org.graylog2.plugin.streams.Stream;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Set;

/**
* A Message that is used for System purposes like restoring Archives.
* The message has the following properties:
* <ul>
* <li>A size of 0, so its traffic is not accounted</li>
* <li>A single predetermined IndexSet</li>
* <li>No streams, so it will only be routed to the {@link org.graylog2.outputs.DefaultMessageOutput}</li>
* </ul>
*/
public class SystemMessage extends Message {
private final IndexSet indexSet;
private final IndexingResultCallback resultCallback;

// Intentionally package-private to enforce MessageFactory usage.
SystemMessage(IndexSet indexSet, Map<String, Object> fields, @Nullable IndexingResultCallback resultCallback) {
super(fields);
this.indexSet = indexSet;
this.resultCallback = resultCallback;
}

public void runIndexingResultCallback(IndexingResult result) {
if (resultCallback != null) {
resultCallback.accept(result);
}
}

@Override
public Set<IndexSet> getIndexSets() {
return Set.of(indexSet);
}

@Override
public long getSize() {
return 0;
}

@Override
public Set<Stream> getStreams() {
return Set.of();
}

@Override
public Object getMessageQueueId() {
return null;
}

}
Loading
Loading