diff --git a/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/actions/MigrationActionsImpl.java b/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/actions/MigrationActionsImpl.java index 0a6716030b36..0e005d0445be 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/actions/MigrationActionsImpl.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/actions/MigrationActionsImpl.java @@ -29,25 +29,35 @@ import org.graylog2.cluster.preflight.DataNodeProvisioningService; import org.graylog2.plugin.certificates.RenewalPolicy; import org.graylog2.plugin.cluster.ClusterConfigService; +import org.graylog2.system.processing.control.ClusterProcessingControl; +import org.graylog2.system.processing.control.ClusterProcessingControlFactory; +import org.graylog2.system.processing.control.RemoteProcessingControlResource; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; @Singleton public class MigrationActionsImpl implements MigrationActions { + private static final Logger LOG = LoggerFactory.getLogger(MigrationActionsImpl.class); private final ClusterConfigService clusterConfigService; + private final ClusterProcessingControlFactory clusterProcessingControlFactory; private final NodeService nodeService; private final CaService caService; - private MigrationStateMachineContext stateMachineContext; private final DataNodeProvisioningService dataNodeProvisioningService; + private MigrationStateMachineContext stateMachineContext; + @Inject public MigrationActionsImpl(final ClusterConfigService clusterConfigService, NodeService nodeService, - final CaService caService, DataNodeProvisioningService dataNodeProvisioningService) { + final CaService caService, DataNodeProvisioningService dataNodeProvisioningService, + final ClusterProcessingControlFactory clusterProcessingControlFactory) { this.clusterConfigService = clusterConfigService; this.nodeService = nodeService; this.caService = caService; this.dataNodeProvisioningService = dataNodeProvisioningService; + this.clusterProcessingControlFactory = clusterProcessingControlFactory; } @Override @@ -97,12 +107,22 @@ public void reindexOldData() { @Override public void stopMessageProcessing() { - + final String authToken = (String)stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY); + final ClusterProcessingControl control = clusterProcessingControlFactory.create(authToken); + LOG.info("Attempting to pause processing on all nodes..."); + control.pauseProcessing(); + LOG.info("Done pausing processing on all nodes."); + LOG.info("Waiting for output buffer to drain on all nodes..."); + control.waitForEmptyBuffers(); + LOG.info("Done waiting for output buffer to drain on all nodes."); } @Override public void startMessageProcessing() { - + final String authToken = (String)stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY); + final ClusterProcessingControl control = clusterProcessingControlFactory.create(authToken); + LOG.info("Resuming message processing."); + control.resumeGraylogMessageProcessing(); } @Override @@ -135,7 +155,6 @@ public boolean provisioningFinished() { return nodeService.allActive().values().stream().allMatch(node -> node.getDataNodeStatus() == DataNodeStatus.AVAILABLE); } - @Override public void setStateMachineContext(MigrationStateMachineContext context) { this.stateMachineContext = context; } diff --git a/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationStateMachineContext.java b/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationStateMachineContext.java index 018ac258ef7d..eb7a36785fa1 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationStateMachineContext.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationStateMachineContext.java @@ -26,6 +26,8 @@ @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.PROTECTED_AND_PUBLIC) public class MigrationStateMachineContext { + public static final String AUTH_TOKEN_KEY = "authToken"; + @JsonProperty protected MigrationStep currentStep; @JsonProperty diff --git a/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/rest/MigrationStateResource.java b/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/rest/MigrationStateResource.java index 0459235ff778..f131f70d1f28 100644 --- a/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/rest/MigrationStateResource.java +++ b/graylog2-server/src/main/java/org/graylog/plugins/views/storage/migration/state/rest/MigrationStateResource.java @@ -26,14 +26,19 @@ import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import org.apache.shiro.authz.annotation.RequiresAuthentication; import org.apache.shiro.authz.annotation.RequiresPermissions; import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachine; import org.graylog2.audit.jersey.NoAuditEvent; +import org.graylog2.shared.rest.resources.ProxiedResource; import org.graylog2.shared.security.RestPermissions; +import static org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineContext.AUTH_TOKEN_KEY; + @Path("/migration") @RequiresAuthentication @Consumes(MediaType.APPLICATION_JSON) @@ -44,8 +49,10 @@ public class MigrationStateResource { private final MigrationStateMachine stateMachine; @Inject - public MigrationStateResource(MigrationStateMachine stateMachine) { + public MigrationStateResource(MigrationStateMachine stateMachine, + @Context HttpHeaders httpHeaders) { this.stateMachine = stateMachine; + this.stateMachine.getContext().addExtendedState(AUTH_TOKEN_KEY, ProxiedResource.authenticationToken(httpHeaders)); } @POST @@ -77,7 +84,7 @@ public CurrentStateInformation status() { @Produces(MediaType.TEXT_PLAIN) @ApiOperation(value = "Serialize", notes = "Serialize migration graph as graphviz source") public String serialize() { - // you can use https://dreampuf.github.io/GraphvizOnline/ to vizualize the result + // you can use https://dreampuf.github.io/GraphvizOnline/ to visualize the result return stateMachine.serialize(); } } diff --git a/graylog2-server/src/main/java/org/graylog2/Configuration.java b/graylog2-server/src/main/java/org/graylog2/Configuration.java index 9857429e5bc2..1fedb3a9aae9 100644 --- a/graylog2-server/src/main/java/org/graylog2/Configuration.java +++ b/graylog2-server/src/main/java/org/graylog2/Configuration.java @@ -247,6 +247,24 @@ public class Configuration extends CaConfiguration { @Parameter(value = "field_value_suggestion_mode", required = true, converter = FieldValueSuggestionModeConverter.class) private FieldValueSuggestionMode fieldValueSuggestionMode = FieldValueSuggestionMode.ON; + public static final String INSTALL_HTTP_CONNECTION_TIMEOUT = "install_http_connection_timeout"; + public static final String INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL = "install_output_buffer_drain_interval"; + public static final String INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES = "install_output_buffer_max_retries"; + + private static final int DEFAULT_INSTALL_RETRIES = 150; + private static final Duration DEFAULT_INSTALL_SECONDS = Duration.seconds(2); + + @Parameter(value = INSTALL_HTTP_CONNECTION_TIMEOUT, validators = PositiveDurationValidator.class) + private Duration installHttpConnectionTimeout = Duration.seconds(10L); + + @Parameter(value = INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL, validators = PositiveDurationValidator.class) + private Duration installOutputBufferDrainingInterval = DEFAULT_INSTALL_SECONDS; + + // The maximum number of times to check if buffers have drained during Illuminate restarts on all + // nodes before giving up + @Parameter(value = INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES, validators = PositiveIntegerValidator.class) + private int installOutputBufferDrainingMaxRetries = DEFAULT_INSTALL_RETRIES; + public boolean maintainsStreamAwareFieldTypes() { return streamAwareFieldTypes; } diff --git a/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControl.java b/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControl.java new file mode 100644 index 000000000000..a41f3c5bf4fe --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControl.java @@ -0,0 +1,221 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.system.processing.control; + +import com.github.joschi.jadconfig.util.Duration; +import com.github.rholder.retry.Attempt; +import com.github.rholder.retry.RetryException; +import com.github.rholder.retry.RetryListener; +import com.github.rholder.retry.Retryer; +import com.github.rholder.retry.RetryerBuilder; +import com.github.rholder.retry.StopStrategies; +import com.github.rholder.retry.WaitStrategies; +import org.graylog2.cluster.Node; +import org.graylog2.cluster.nodes.NodeService; +import org.graylog2.cluster.nodes.ServerNodeDto; +import org.graylog2.rest.RemoteInterfaceProvider; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import retrofit2.Call; +import retrofit2.Response; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; +import java.util.stream.Collectors; + +import static org.graylog2.Configuration.INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL; +import static org.graylog2.Configuration.INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES; +import static org.graylog2.shared.utilities.StringUtils.f; + +public class ClusterProcessingControl { + private final Logger LOG = LoggerFactory.getLogger(ClusterProcessingControl.class); + + private static final String OUTPUT_RATE_METRIC_NAME = "org.graylog2.throughput.output.1-sec-rate"; + + protected final String authorizationToken; + protected final RemoteInterfaceProvider remoteInterfaceProvider; + protected final NodeService nodeService; + protected final Duration connectionTimeout; + private final Duration bufferDrainInterval; + private final int maxBufferDrainRetries; + + public ClusterProcessingControl(String authorizationToken, RemoteInterfaceProvider remoteInterfaceProvider, NodeService nodeService, Duration connectionTimeout, Duration bufferDrainInterval, int maxBufferDrainRetries) { + this.authorizationToken = authorizationToken; + this.remoteInterfaceProvider = remoteInterfaceProvider; + this.nodeService = nodeService; + this.connectionTimeout = connectionTimeout; + this.bufferDrainInterval = bufferDrainInterval; + this.maxBufferDrainRetries = maxBufferDrainRetries; + } + + public void pauseProcessing() { + runOnAllActiveNodes("pause processing", RemoteProcessingControlResource::pauseProcessing, true); + } + + protected Map runOnAllActiveNodes( + String operationName, + Function> callRemoteResource, + boolean stopOnFirstException + ) { + final Map result = new HashMap<>(); + final List exceptions = new ArrayList<>(); + printNodeDebugInfo(); + nodeService.allActive().entrySet().forEach(entry -> { + final Node nodeValue = entry.getValue(); + try { + LOG.info("Attempting to call '{}' on node [{}].", operationName, nodeValue.getNodeId()); + final Response response = getrResponse(callRemoteResource, entry); + if (!response.isSuccessful()) { + final String message = f("Unable to call '%s' on node [%s] code [%s] body [%s]", + operationName, nodeValue.getNodeId(), + response.code(), response.body()); + LOG.error("Unable to call '{}' on node [{}] code [{}] body [{}].", + operationName, nodeValue.getNodeId(), + response.code(), response.body()); + throw new ClusterProcessingControlException(message); + } + result.put(entry.getKey(), response.body()); + LOG.info("Successfully called '{}' on node [{}].", operationName, nodeValue.getNodeId()); + } catch (Exception e) { + if (e instanceof ClusterProcessingControlException) { + exceptions.add((ClusterProcessingControlException) e); + } else { + final String message = f("Unable to call '%s' on node [%s]", operationName, nodeValue.getNodeId()); + LOG.error(message, e); + exceptions.add(new ClusterProcessingControlException(message, e)); + } + + if (stopOnFirstException) { + throw exceptions.get(0); + } + } + }); + + if (!exceptions.isEmpty()) { + throw exceptions.get(0); + } + + return result; + } + + protected Response getrResponse(Function> callRemoteResource, Map.Entry entry) throws IOException { + var remoteProcessingControlResource = remoteInterfaceProvider.get(entry.getValue(), + this.authorizationToken, RemoteProcessingControlResource.class, + java.time.Duration.ofSeconds(connectionTimeout.toSeconds())); + return callRemoteResource.apply((F) remoteProcessingControlResource).execute(); + } + + public void waitForEmptyBuffers() throws OutputBufferDrainFailureException { + printNodeDebugInfo(); + final Retryer retryer = RetryerBuilder.newBuilder() + .retryIfResult(value -> !value.success) + .withWaitStrategy(WaitStrategies.fixedWait(bufferDrainInterval.toSeconds(), TimeUnit.SECONDS)) + .withStopStrategy(StopStrategies.stopAfterAttempt(maxBufferDrainRetries)) + .withRetryListener(new RetryListener() { + @Override + public void onRetry(Attempt attempt) { + if (attempt.getAttemptNumber() > 1) { + LOG.info("Checking again for empty output buffers (attempt #{}).", attempt.getAttemptNumber()); + } + } + }) + .build(); + try { + retryer.call(() -> { + final Map nodeOutputRateMap = runOnAllActiveNodes("fetching output rate metric value", + res -> res.getMetric(OUTPUT_RATE_METRIC_NAME), true) + .entrySet() + .stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> (Double) entry.getValue().get("value"))); + final boolean allZero = new HashSet<>(nodeOutputRateMap.values()).stream() + .allMatch(this::isOutputRateCloseToZero); + final Set nonZeroNodes = nodeOutputRateMap + .entrySet() + .stream() + .filter(e -> !isOutputRateCloseToZero(e.getValue())) + .map(Map.Entry::getKey) + .collect(Collectors.toSet()); + if (allZero) { + LOG.info("Output buffer is now empty on all nodes."); + } else { + LOG.info("Output rate has not yet reached zero on nodes [{}].", nonZeroNodes); + } + return new NodeOperationResult(allZero, nonZeroNodes); + }); + } catch (RetryException e) { + final String message = f("The [%s] rate failed to reach zero on all nodes in [%s] with [%s] retries. Giving up. " + + "This is configurable with the [%s] and [%s] configuration properties", OUTPUT_RATE_METRIC_NAME, + bufferDrainInterval.toSeconds(), maxBufferDrainRetries, INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL, + INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES); + LOG.error(message); + throw new OutputBufferDrainFailureException(bufferDrainInterval.toSeconds(), maxBufferDrainRetries, + tryGetExceptionNodes(e)); + } catch (Exception e) { + throw new ClusterProcessingControlException("Failed to request node output rate on all nodes.", e); + } + } + + /** + * Try to retrieve the nodes that have a non-zero output rate from the RetryException. + * This should succeed with the current implementation. + */ + protected static Set tryGetExceptionNodes(RetryException e) { + try { + return ((NodeOperationResult) e.getLastFailedAttempt().get()).nonZeroOutputRateNodeIds(); + } catch (ExecutionException ex) { + return Collections.emptySet(); + } + } + + public record NodeOperationResult(boolean success, Set nonZeroOutputRateNodeIds) { + } + + /** + * The output rate is the number of messages per second that are being written to OpenSearch (usually a + * whole number followed by some meaningless decimals - e.g. 100.01 messages/second). + * A value < 1 is effectively zero. The rate might become very small, but not zero in some cases, + * so this method accounts for that condition. + */ + protected boolean isOutputRateCloseToZero(double outputRate) { + return outputRate < 0.0001; + } + + public void resumeGraylogMessageProcessing() { + LOG.info("Attempting to resume processing on all nodes..."); + runOnAllActiveNodes("resume processing", RemoteProcessingControlResource::resumeProcessing, false); + LOG.info("Done resuming processing on all nodes."); + } + + protected void printNodeDebugInfo() { + if (LOG.isDebugEnabled()) { + LOG.debug("The Graylog cluster contains the following nodes:"); + nodeService.allActive().entrySet().forEach((entry) -> { + final Node node = entry.getValue(); + LOG.debug("Node ID [{}] Transport Address [{}] Last Seen [{}]", node.getNodeId(), node.getTransportAddress(), node.getLastSeen()); + }); + } + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControlException.java b/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControlException.java new file mode 100644 index 000000000000..e8a4d263b640 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControlException.java @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.system.processing.control; + +public class ClusterProcessingControlException extends RuntimeException { + + public ClusterProcessingControlException(String message) { + super(message); + } + + public ClusterProcessingControlException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControlFactory.java b/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControlFactory.java new file mode 100644 index 000000000000..266a17467885 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/system/processing/control/ClusterProcessingControlFactory.java @@ -0,0 +1,53 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.system.processing.control; + +import com.github.joschi.jadconfig.util.Duration; +import jakarta.inject.Inject; +import jakarta.inject.Named; +import org.graylog2.cluster.nodes.NodeService; +import org.graylog2.cluster.nodes.ServerNodeDto; +import org.graylog2.rest.RemoteInterfaceProvider; + +import static org.graylog2.Configuration.INSTALL_HTTP_CONNECTION_TIMEOUT; +import static org.graylog2.Configuration.INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL; +import static org.graylog2.Configuration.INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES; + +public class ClusterProcessingControlFactory { + protected final RemoteInterfaceProvider remoteInterfaceProvider; + protected final NodeService nodeService; + protected final Duration connectionTimeout; + private final Duration bufferDrainInterval; + private final int maxBufferDrainRetries; + + @Inject + public ClusterProcessingControlFactory(final RemoteInterfaceProvider remoteInterfaceProvider, + final NodeService nodeService, + @Named(INSTALL_HTTP_CONNECTION_TIMEOUT) final Duration connectionTimeout, + @Named(INSTALL_OUTPUT_BUFFER_DRAINING_INTERVAL) final Duration bufferDrainInterval, + @Named(INSTALL_OUTPUT_BUFFER_DRAINING_MAX_RETRIES) final int maxBufferDrainRetries) { + this.remoteInterfaceProvider = remoteInterfaceProvider; + this.nodeService = nodeService; + this.connectionTimeout = connectionTimeout; + this.bufferDrainInterval = bufferDrainInterval; + this.maxBufferDrainRetries = maxBufferDrainRetries; + } + + public ClusterProcessingControl create(String authorizationToken) { + return new ClusterProcessingControl<>(authorizationToken, remoteInterfaceProvider, nodeService, connectionTimeout, bufferDrainInterval, maxBufferDrainRetries); + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/system/processing/control/OutputBufferDrainFailureException.java b/graylog2-server/src/main/java/org/graylog2/system/processing/control/OutputBufferDrainFailureException.java new file mode 100644 index 000000000000..1c236abfeab8 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/system/processing/control/OutputBufferDrainFailureException.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.system.processing.control; + +import java.util.Set; + +import static org.graylog2.shared.utilities.StringUtils.f; + +public class OutputBufferDrainFailureException extends RuntimeException { + + public OutputBufferDrainFailureException(long retryIntervalSeconds, int maxRetries, Set nodes) { + super(f("Failed to drain the Output buffer on nodes [%s] within [%s] seconds. " + + "This probably means that your Graylog nodes are processing a very high rate of messages. " + + "You can try to pause processing manually in System > Nodes, and attempt the installation " + + "again once the output buffers have fully drained.", + String.join("", nodes), retryIntervalSeconds * maxRetries)); + } +} diff --git a/graylog2-server/src/main/java/org/graylog2/system/processing/control/RemoteProcessingControlResource.java b/graylog2-server/src/main/java/org/graylog2/system/processing/control/RemoteProcessingControlResource.java new file mode 100644 index 000000000000..e9bf09152535 --- /dev/null +++ b/graylog2-server/src/main/java/org/graylog2/system/processing/control/RemoteProcessingControlResource.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2020 Graylog, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + */ +package org.graylog2.system.processing.control; + +import retrofit2.Call; +import retrofit2.http.GET; +import retrofit2.http.PUT; +import retrofit2.http.Path; + +import java.util.HashMap; + +public interface RemoteProcessingControlResource { + @PUT("system/processing/pause") + Call pauseProcessing(); + + @PUT("system/processing/resume") + Call resumeProcessing(); + + @GET("system/metrics/{metricName}") + Call getMetric(@Path("metricName") String metricName); +} diff --git a/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationActionsAdapter.java b/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationActionsAdapter.java index b028e6ec8eb5..410a053488f1 100644 --- a/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationActionsAdapter.java +++ b/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationActionsAdapter.java @@ -31,7 +31,6 @@ public void resetMigration() { } - @Override public void setStateMachineContext(MigrationStateMachineContext context) { this.context = context; } diff --git a/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationStateMachineImplTest.java b/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationStateMachineImplTest.java index b551baf92777..d81173524157 100644 --- a/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationStateMachineImplTest.java +++ b/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/machine/MigrationStateMachineImplTest.java @@ -150,7 +150,7 @@ private StateMachine testStateMachineWithAction(C private static class TestMigrationActions extends MigrationActionsImpl { public TestMigrationActions() { - super(null, null, null, null); + super(null, null, null, null, null); } public void runTestFunction(Consumer testFunction) { diff --git a/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/rest/MigrationStateResourceTest.java b/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/rest/MigrationStateResourceTest.java index e2f0e412a744..22a630266af0 100644 --- a/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/rest/MigrationStateResourceTest.java +++ b/graylog2-server/src/test/java/org/graylog/plugins/views/storage/migration/state/rest/MigrationStateResourceTest.java @@ -16,13 +16,15 @@ */ package org.graylog.plugins.views.storage.migration.state.rest; +import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.Response; import org.graylog.plugins.views.storage.migration.state.machine.MigrationState; import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachine; +import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineContext; import org.graylog.plugins.views.storage.migration.state.machine.MigrationStep; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -37,11 +39,27 @@ @ExtendWith(MockitoExtension.class) public class MigrationStateResourceTest { + public static final String AUTHORIZATION = "MyAuthorization"; @Mock MigrationStateMachine stateMachine; - @InjectMocks + @Mock + HttpHeaders httpHeaders; + MigrationStateMachineContext stateMachineContext; MigrationStateResource migrationStateResource; + @BeforeEach + public void setUp() { + stateMachineContext = new MigrationStateMachineContext(); + when(stateMachine.getContext()).thenReturn(stateMachineContext); + when(httpHeaders.getRequestHeader(HttpHeaders.AUTHORIZATION)).thenReturn(List.of(AUTHORIZATION)); + migrationStateResource = new MigrationStateResource(stateMachine, httpHeaders); + } + + @Test + public void authenticationTokenSetToStateMachineContext() { + assertThat(stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY)).isEqualTo(AUTHORIZATION); + } + @Test public void requestReturnsSuccessfulResult() { CurrentStateInformation state = new CurrentStateInformation(MigrationState.NEW, List.of(MigrationStep.SELECT_MIGRATION));