Skip to content

Commit

Permalink
Reimplement remote reindexing migration in the migration state machine (
Browse files Browse the repository at this point in the history
#18262)

* Reimplement remote reindexing migration in the migration state machine

* fix response format for migration triggers

* Allow return one step back to start remote reindex migration again

* fixed tests

* added dummy RemoteReindexingMigrationAdapterES7

* simplified test
  • Loading branch information
todvora authored Feb 16, 2024
1 parent 0cb4ae7 commit 57d7cee
Show file tree
Hide file tree
Showing 16 changed files with 229 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.graylog2.indexer.cluster.NodeAdapter;
import org.graylog2.indexer.counts.CountsAdapter;
import org.graylog2.indexer.datanode.ProxyRequestAdapter;
import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter;
import org.graylog2.indexer.datastream.DataStreamAdapter;
import org.graylog2.indexer.fieldtypes.IndexFieldTypePollerAdapter;
import org.graylog2.indexer.fieldtypes.streamfiltered.esadapters.StreamsForFieldRetriever;
Expand Down Expand Up @@ -84,6 +85,8 @@ protected void configure() {

bind(RestHighLevelClient.class).toProvider(RestHighLevelClientProvider.class);
bind(CredentialsProvider.class).toProvider(ESCredentialsProvider.class);

bindForSupportedVersion(RemoteReindexingMigrationAdapter.class).to(RemoteReindexingMigrationAdapterES7.class);
}

private <T> LinkedBindingBuilder<T> bindForSupportedVersion(Class<T> interfaceClass) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.storage.elasticsearch7;

import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter;
import org.graylog2.indexer.migration.RemoteReindexMigration;

import java.net.URI;
import java.util.List;

public class RemoteReindexingMigrationAdapterES7 implements RemoteReindexingMigrationAdapter {
@Override
public RemoteReindexMigration start(URI uri, String username, String password, List<String> indices, boolean synchronous) {
throw new UnsupportedOperationException("Remote reindexing migrations are not supported for elasticsearch");
}

@Override
public RemoteReindexMigration status(String migrationID) {
throw new UnsupportedOperationException("Remote reindexing migrations are not supported for elasticsearch");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,7 @@ public interface MigrationActions {

void reindexUpgradeSelected();

boolean reindexingFinished();

void reindexOldData();
boolean isRemoteReindexingFinished();

void stopMessageProcessing();

Expand All @@ -59,4 +57,8 @@ public interface MigrationActions {

MigrationStateMachineContext getStateMachineContext();

void startRemoteReindex();

void requestMigrationStatus();

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.graylog2.cluster.nodes.NodeService;
import org.graylog2.cluster.preflight.DataNodeProvisioningConfig;
import org.graylog2.cluster.preflight.DataNodeProvisioningService;
import org.graylog2.indexer.datanode.RemoteReindexingMigrationAdapter;
import org.graylog2.indexer.migration.RemoteReindexMigration;
import org.graylog2.plugin.certificates.RenewalPolicy;
import org.graylog2.plugin.cluster.ClusterConfigService;
import org.graylog2.system.processing.control.ClusterProcessingControl;
Expand All @@ -37,7 +39,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Singleton
public class MigrationActionsImpl implements MigrationActions {
Expand All @@ -47,21 +53,25 @@ public class MigrationActionsImpl implements MigrationActions {
private final ClusterProcessingControlFactory clusterProcessingControlFactory;
private final NodeService<DataNodeDto> nodeService;
private final CaService caService;
private final DataNodeProvisioningService dataNodeProvisioningService;
private final PreflightConfigService preflightConfigService;

private MigrationStateMachineContext stateMachineContext;
private final DataNodeProvisioningService dataNodeProvisioningService;

private final RemoteReindexingMigrationAdapter migrationService;

@Inject
public MigrationActionsImpl(final ClusterConfigService clusterConfigService, NodeService<DataNodeDto> nodeService,
final CaService caService, DataNodeProvisioningService dataNodeProvisioningService,
RemoteReindexingMigrationAdapter migrationService,
final ClusterProcessingControlFactory clusterProcessingControlFactory,
final PreflightConfigService preflightConfigService) {
this.clusterConfigService = clusterConfigService;
this.nodeService = nodeService;
this.caService = caService;
this.dataNodeProvisioningService = dataNodeProvisioningService;
this.clusterProcessingControlFactory = clusterProcessingControlFactory;
this.migrationService = migrationService;
this.preflightConfigService = preflightConfigService;
}

Expand Down Expand Up @@ -99,20 +109,9 @@ public void reindexUpgradeSelected() {

}

@Override
public boolean reindexingFinished() {
// TODO: add real test
return true;
}

@Override
public void reindexOldData() {

}

@Override
public void stopMessageProcessing() {
final String authToken = (String)stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY);
final String authToken = (String) stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY);
final ClusterProcessingControl<RemoteProcessingControlResource> control = clusterProcessingControlFactory.create(authToken);
LOG.info("Attempting to pause processing on all nodes...");
control.pauseProcessing();
Expand All @@ -124,7 +123,7 @@ public void stopMessageProcessing() {

@Override
public void startMessageProcessing() {
final String authToken = (String)stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY);
final String authToken = (String) stateMachineContext.getExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY);
final ClusterProcessingControl<RemoteProcessingControlResource> control = clusterProcessingControlFactory.create(authToken);
LOG.info("Resuming message processing.");
control.resumeGraylogMessageProcessing();
Expand Down Expand Up @@ -153,7 +152,7 @@ public boolean caAndRemovalPolicyExist() {
public void provisionDataNodes() {
// if we start provisioning DataNodes via the migration, Preflight is definitely done/no option anymore
var preflight = preflightConfigService.getPreflightConfigResult();
if(preflight == null || !preflight.equals(PreflightConfigResult.FINISHED)) {
if (preflight == null || !preflight.equals(PreflightConfigResult.FINISHED)) {
preflightConfigService.setConfigResult(PreflightConfigResult.FINISHED);
}
final Map<String, DataNodeDto> activeDataNodes = nodeService.allActive();
Expand Down Expand Up @@ -182,6 +181,33 @@ public boolean dataNodeStartupFinished() {
return nodeService.allActive().values().stream().allMatch(node -> node.getDataNodeStatus() == DataNodeStatus.AVAILABLE);
}

@Override
public void startRemoteReindex() {
final URI hostname = Objects.requireNonNull(URI.create(getStateMachineContext().getActionArgument("hostname", String.class)), "hostname has to be provided");
final String user = getStateMachineContext().getActionArgumentOpt("user", String.class).orElse(null);
final String password = getStateMachineContext().getActionArgumentOpt("password", String.class).orElse(null);
final List<String> indices = getStateMachineContext().getActionArgumentOpt("indices", List.class).orElse(Collections.emptyList()); // todo: generics!
final RemoteReindexMigration migration = migrationService.start(hostname, user, password, indices, false);
final String migrationID = migration.id();
getStateMachineContext().addExtendedState(MigrationStateMachineContext.KEY_MIGRATION_ID, migrationID);
}

@Override
public void requestMigrationStatus() {
getStateMachineContext().getExtendedState(MigrationStateMachineContext.KEY_MIGRATION_ID, String.class)
.map(migrationService::status)
.ifPresent(status -> getStateMachineContext().setResponse(status));
}

@Override
public boolean isRemoteReindexingFinished() {
return getStateMachineContext().getExtendedState(MigrationStateMachineContext.KEY_MIGRATION_ID, String.class)
.map(migrationService::status)
.filter(m -> m.status() == RemoteReindexingMigrationAdapter.Status.FINISHED)
.isPresent();
}

@Override
public void setStateMachineContext(MigrationStateMachineContext context) {
this.stateMachineContext = context;
}
Expand All @@ -190,5 +216,4 @@ public void setStateMachineContext(MigrationStateMachineContext context) {
public MigrationStateMachineContext getStateMachineContext() {
return stateMachineContext;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public enum MigrationState {
PROVISION_DATANODE_CERTIFICATES_RUNNING,
EXISTING_DATA_MIGRATION_QUESTION_PAGE,
MIGRATE_EXISTING_DATA,
REMOTE_REINDEX_RUNNING,
ASK_TO_SHUTDOWN_OLD_CLUSTER,
DIRECTORY_COMPATIBILITY_CHECK_PAGE,
PROVISION_ROLLING_UPGRADE_NODES_WITH_CERTIFICATES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,17 @@ private static StateMachineConfig<MigrationState, MigrationStep> configureStates
.permit(MigrationStep.SHOW_MIGRATE_EXISTING_DATA, MigrationState.MIGRATE_EXISTING_DATA)
.permit(MigrationStep.SKIP_EXISTING_DATA_MIGRATION, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER);

config.configure(MigrationState.MIGRATE_EXISTING_DATA)
.onEntry(migrationActions::reindexOldData)
.permitIf(MigrationStep.SHOW_ASK_TO_SHUTDOWN_OLD_CLUSTER, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER, migrationActions::reindexingFinished);
// we now have enough information in the context to start the remote reindex migration. This will move us to the
// next state that will be active as long as the migration is running and will provide status information to the FE
config.configure(MigrationState.MIGRATE_EXISTING_DATA) // this state and screen has to request username, password and url of the old cluster
.permit(MigrationStep.START_REMOTE_REINDEX_MIGRATION, MigrationState.REMOTE_REINDEX_RUNNING, migrationActions::startRemoteReindex);

// the state machine will stay in this state till the migration is finished or fails. It should provide
// current migration status every time we trigger MigrationStep.REQUEST_MIGRATION_STATUS.
config.configure(MigrationState.REMOTE_REINDEX_RUNNING)
.permitReentry(MigrationStep.REQUEST_MIGRATION_STATUS, migrationActions::requestMigrationStatus)
.permit(MigrationStep.RETRY_MIGRATE_EXISTING_DATA, MigrationState.MIGRATE_EXISTING_DATA) // allow one step back in case the migration fails
.permitIf(MigrationStep.SHOW_ASK_TO_SHUTDOWN_OLD_CLUSTER, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER, migrationActions::isRemoteReindexingFinished);

config.configure(MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER)
.permitIf(MigrationStep.CONFIRM_OLD_CLUSTER_STOPPED, MigrationState.MANUALLY_REMOVE_OLD_CONNECTION_STRING_FROM_CONFIG, migrationActions::isOldClusterStopped);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,31 @@
package org.graylog.plugins.views.storage.migration.state.machine;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.PROTECTED_AND_PUBLIC)
public class MigrationStateMachineContext {

public static final String AUTH_TOKEN_KEY = "authToken";

public static final String KEY_MIGRATION_ID = "migrationID";

@JsonProperty
protected MigrationStep currentStep;
@JsonProperty
protected Map<MigrationStep, Map<String, Object>> actionArguments;
@JsonProperty
protected Map<String, Object> extendedState;

@JsonIgnore
protected Object response;

public MigrationStateMachineContext() {
this.actionArguments = new HashMap<>();
this.extendedState = new HashMap<>();
Expand All @@ -59,6 +66,18 @@ public <T> T getActionArgument(String name, Class<T> type) {
return (T) arg;
}

public <T> Optional<T> getActionArgumentOpt(String name, Class<T> type) {
Map<String, Object> args = this.actionArguments.get(currentStep);
return Optional.ofNullable(args)
.map(arg -> arg.get(name))
.map(arg -> {
if (!type.isInstance(arg)) {
throw new IllegalArgumentException("Argument " + name + " must be of type " + type);
}
return (T) arg;
});
}

public void addActionArguments(MigrationStep step, Map<String, Object> args) {
this.actionArguments.put(step, args);
}
Expand All @@ -71,4 +90,22 @@ public Object getExtendedState(String key) {
return this.extendedState.get(key);
}

public <T> Optional<T> getExtendedState(String name, Class<T> type) {
if (!this.extendedState.containsKey(name)) {
return Optional.empty();
}
Object value = this.extendedState.get(name);
if (!type.isInstance(value)) {
throw new IllegalArgumentException("Argument " + name + " must be of type " + type);
}
return Optional.of((T) value);
}

public void setResponse(Object response) {
this.response = response;
}

public Object getResponse() {
return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public CurrentStateInformation trigger(MigrationStep step, Map<String, Object> a
}
context = migrationActions.getStateMachineContext();
persistenceService.saveStateMachineContext(context);
return new CurrentStateInformation(getState(), nextSteps(), errorMessage);
return new CurrentStateInformation(getState(), nextSteps(), errorMessage, context.getResponse());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ public enum MigrationStep {
PROVISION_DATANODE_CERTIFICATES,
SHOW_DATA_MIGRATION_QUESTION,
SHOW_MIGRATE_EXISTING_DATA,
START_REMOTE_REINDEX_MIGRATION,

RETRY_MIGRATE_EXISTING_DATA,
REQUEST_MIGRATION_STATUS,

SHOW_ASK_TO_SHUTDOWN_OLD_CLUSTER,
SHOW_PROVISION_ROLLING_UPGRADE_NODES_WITH_CERTIFICATES,
SHOW_STOP_PROCESSING_PAGE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@

import java.util.List;

public record CurrentStateInformation(MigrationState state, List<MigrationStep> nextSteps, String errorMessage) {
public record CurrentStateInformation(MigrationState state, List<MigrationStep> nextSteps, String errorMessage, Object response) {

public CurrentStateInformation(MigrationState state, List<MigrationStep> nextSteps) {
this(state, nextSteps, null);
this(state, nextSteps, null, null);
}

public boolean hasErrors() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,11 @@
import org.apache.shiro.authz.annotation.RequiresAuthentication;
import org.apache.shiro.authz.annotation.RequiresPermissions;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachine;
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineContext;
import org.graylog2.audit.jersey.NoAuditEvent;
import org.graylog2.shared.rest.resources.ProxiedResource;
import org.graylog2.shared.security.RestPermissions;

import static org.graylog.plugins.views.storage.migration.state.machine.MigrationStateMachineContext.AUTH_TOKEN_KEY;

@Path("/migration")
@RequiresAuthentication
@Consumes(MediaType.APPLICATION_JSON)
Expand All @@ -49,22 +48,20 @@ public class MigrationStateResource {
private final MigrationStateMachine stateMachine;

@Inject
public MigrationStateResource(MigrationStateMachine stateMachine,
@Context HttpHeaders httpHeaders) {
public MigrationStateResource(MigrationStateMachine stateMachine, @Context HttpHeaders httpHeaders) {
this.stateMachine = stateMachine;
this.stateMachine.getContext().addExtendedState(AUTH_TOKEN_KEY, ProxiedResource.authenticationToken(httpHeaders));
this.stateMachine.getContext().addExtendedState(MigrationStateMachineContext.AUTH_TOKEN_KEY, ProxiedResource.authenticationToken(httpHeaders));
}

@POST
@Path("/trigger")
@NoAuditEvent("No Audit Event needed") // TODO: do we need audit log here?
@RequiresPermissions(RestPermissions.DATANODE_MIGRATION)
@ApiOperation(value = "trigger migration step")
public Response migrate(@ApiParam(name = "request") @NotNull MigrationStepRequest request) {
public Response trigger(@ApiParam(name = "request") @NotNull MigrationStepRequest request) {
final CurrentStateInformation newState = stateMachine.trigger(request.step(), request.args());
return Response
.status(newState.hasErrors() ? Response.Status.INTERNAL_SERVER_ERROR : Response.Status.OK)
.entity(newState)
Response.ResponseBuilder response = newState.hasErrors() ? Response.serverError() : Response.ok();
return response.entity(newState)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ public RemoteReindexMigration setFinishCallback(Runnable finishCallback) {
return this;
}

public Status status() {
return status;
}

public void finish() {
if(finishCallback != null) {
Expand Down
Loading

0 comments on commit 57d7cee

Please sign in to comment.