Skip to content

Commit

Permalink
DataNode: Added missing steps to Migration STM (#18079)
Browse files Browse the repository at this point in the history
* added additional steps

* added additional state

* refactoring state machine states/actions

* fixed test

* added second compatibility check state

* Fix migration states for rolling upgrade path

* add link to render migration state machine

---------

Co-authored-by: Tomas Dvorak <[email protected]>
  • Loading branch information
janheise and todvora authored Jan 31, 2024
1 parent aed9479 commit 7118110
Show file tree
Hide file tree
Showing 7 changed files with 215 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,25 @@
* Set of callbacks used during the migration in different states.
*/
public interface MigrationActions extends WithArgs {
boolean runDirectoryCompatibilityCheck();

void resetMigration();
boolean isOldClusterStopped();

void migrateIndexTemplates();
void rollingUpgradeSelected();

void migrateWithoutDowntime();
boolean directoryCompatibilityCheckOk();

void migrateWithDowntime();
void reindexUpgradeSelected();

boolean isOldClusterStopped();
boolean reindexingFinished();

void rollingUpgradeSelected();
void reindexOldData();

void stopMessageProcessing();

void startMessageProcessing();
boolean caDoesNotExist();
boolean removalPolicyDoesNotExist();
boolean caAndRemovalPolicyExist();
void resetMigration();
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,28 +34,64 @@ public void resetMigration() {
clusterConfigService.remove(DatanodeMigrationConfiguration.class);
}


@Override
public void migrateIndexTemplates() {
// TODO!
public boolean runDirectoryCompatibilityCheck() {
return false;
}

@Override
public void migrateWithoutDowntime() {
// TODO!
public boolean isOldClusterStopped() {
return false;
}

@Override
public void migrateWithDowntime() {
// TODO!
public void rollingUpgradeSelected() {

}

@Override
public boolean isOldClusterStopped() {
public boolean directoryCompatibilityCheckOk() {
return false;
}

@Override
public void rollingUpgradeSelected() {
public void reindexUpgradeSelected() {

}

@Override
public boolean reindexingFinished() {
return false;
}

@Override
public void reindexOldData() {

}

@Override
public void stopMessageProcessing() {

}

@Override
public void startMessageProcessing() {

}

@Override
public boolean caDoesNotExist() {
return false;
}

@Override
public boolean removalPolicyDoesNotExist() {
return false;
}

@Override
public boolean caAndRemovalPolicyExist() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,22 @@

public enum MigrationState {
NEW,
ROLLING_UPGRADE_MIGRATION_WELCOME,
PROVISION_DATANODE_CERTIFICATES,
EXISTING_DATA_MIGRATION_QUESTION,
MIGRATE_WITH_DOWNTIME_QUESTION,
MIGRATION_WELCOME_PAGE,
SHOW_DIRECTORY_COMPATIBILITY_CHECK,
CA_CREATION_PAGE,
RENEWAL_POLICY_CREATION_PAGE,
MIGRATION_SELECTION_PAGE,
ROLLING_UPGRADE_MIGRATION_WELCOME_PAGE,
REMOTE_REINDEX_WELCOME_PAGE,
PROVISION_DATANODE_CERTIFICATES_PAGE,
EXISTING_DATA_MIGRATION_QUESTION_PAGE,
MIGRATE_EXISTING_DATA,
ASK_TO_SHUTDOWN_OLD_CLUSTER,
MANUALLY_REMOVE_OLD_CONNECTION_STRING_FROM_CONFIG,
REMOTE_REINDEX_WELCOME,
DIRECTORY_COMPATIBILITY_CHECK_PAGE,
PROVISION_ROLLING_UPGRADE_NODES_WITH_CERTIFICATES,
JOURNAL_SIZE_DOWNTIME_WARNING,
MESSAGE_PROCESSING_STOP_REPLACE_CLUSTER_AND_MP_RESTART,
MANUALLY_REMOVE_OLD_CONNECTION_STRING_FROM_CONFIG,
FAILED,
FINISHED
DIRECTORY_COMPATIBILITY_CHECK_PAGE2, FINISHED
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,48 +34,71 @@ public class MigrationStateMachineBuilder {
private static StateMachineConfig<MigrationState, MigrationStep> configureStates(MigrationActions migrationActions) {
StateMachineConfig<MigrationState, MigrationStep> config = new StateMachineConfig<>();

// Major decision - remote reindexing or rolling upgrade(in-place)?
config.configure(MigrationState.NEW)
.permit(MigrationStep.SELECT_ROLLING_UPGRADE_MIGRATION, MigrationState.ROLLING_UPGRADE_MIGRATION_WELCOME, migrationActions::rollingUpgradeSelected)
.permit(MigrationStep.SELECT_REMOTE_REINDEX_MIGRATION, MigrationState.REMOTE_REINDEX_WELCOME, () -> {
LOG.info("Selected remote reindex migration");
});
.permit(MigrationStep.SELECT_MIGRATION, MigrationState.MIGRATION_WELCOME_PAGE, () -> LOG.info("Migration selected in menu, show welcome page"));

config.configure(MigrationState.MIGRATION_WELCOME_PAGE)
.permit(MigrationStep.SHOW_DIRECTORY_COMPATIBILITY_CHECK, MigrationState.DIRECTORY_COMPATIBILITY_CHECK_PAGE, () -> LOG.info("Showing directory compatibility check page"));

config.configure(MigrationState.DIRECTORY_COMPATIBILITY_CHECK_PAGE)
.onEntry(migrationActions::runDirectoryCompatibilityCheck)
.permitIf(MigrationStep.SHOW_CA_CREATION, MigrationState.CA_CREATION_PAGE, migrationActions::directoryCompatibilityCheckOk)
.permitIf(MigrationStep.SHOW_RENEWAL_POLICY_CREATION, MigrationState.RENEWAL_POLICY_CREATION_PAGE, migrationActions::directoryCompatibilityCheckOk);

config.configure(MigrationState.CA_CREATION_PAGE)
.permitIf(MigrationStep.SHOW_RENEWAL_POLICY_CREATION, MigrationState.RENEWAL_POLICY_CREATION_PAGE, migrationActions::caDoesNotExist)
.permitIf(MigrationStep.SHOW_MIGRATION_SELECTION, MigrationState.MIGRATION_SELECTION_PAGE, migrationActions::caAndRemovalPolicyExist);

config.configure(MigrationState.RENEWAL_POLICY_CREATION_PAGE)
.permitIf(MigrationStep.SHOW_CA_CREATION, MigrationState.CA_CREATION_PAGE, migrationActions::removalPolicyDoesNotExist)
.permitIf(MigrationStep.SHOW_MIGRATION_SELECTION, MigrationState.MIGRATION_SELECTION_PAGE, migrationActions::caAndRemovalPolicyExist);

// Major decision - remote reindexing or rolling upgrade(in-place)?
config.configure(MigrationState.MIGRATION_SELECTION_PAGE)
.permit(MigrationStep.SELECT_ROLLING_UPGRADE_MIGRATION, MigrationState.ROLLING_UPGRADE_MIGRATION_WELCOME_PAGE, migrationActions::rollingUpgradeSelected)
.permit(MigrationStep.SELECT_REMOTE_REINDEX_MIGRATION, MigrationState.REMOTE_REINDEX_WELCOME_PAGE, migrationActions::reindexUpgradeSelected);

// remote reindexing branch of the migration
config.configure(MigrationState.REMOTE_REINDEX_WELCOME)
.permit(MigrationStep.DISCOVER_NEW_DATANODES, MigrationState.PROVISION_DATANODE_CERTIFICATES, () -> {
LOG.info("Compatibility check succeeded");
config.configure(MigrationState.REMOTE_REINDEX_WELCOME_PAGE)
.permit(MigrationStep.DISCOVER_NEW_DATANODES, MigrationState.PROVISION_DATANODE_CERTIFICATES_PAGE, () -> {
LOG.info("Remote Reindexing selected");
});

config.configure(MigrationState.PROVISION_DATANODE_CERTIFICATES)
.permit(MigrationStep.MIGRATE_INDEX_TEMPLATES, MigrationState.EXISTING_DATA_MIGRATION_QUESTION, migrationActions::migrateIndexTemplates);
config.configure(MigrationState.PROVISION_DATANODE_CERTIFICATES_PAGE)
.permit(MigrationStep.SHOW_DATA_MIGRATION_QUESTION, MigrationState.EXISTING_DATA_MIGRATION_QUESTION_PAGE);

config.configure(MigrationState.EXISTING_DATA_MIGRATION_QUESTION)
.permit(MigrationStep.MIGRATE_EXISTING_DATA, MigrationState.MIGRATE_WITH_DOWNTIME_QUESTION)
config.configure(MigrationState.EXISTING_DATA_MIGRATION_QUESTION_PAGE)
.permit(MigrationStep.SHOW_MIGRATE_EXISTING_DATA, MigrationState.MIGRATE_EXISTING_DATA)
.permit(MigrationStep.SKIP_EXISTING_DATA_MIGRATION, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER);

config.configure(MigrationState.MIGRATE_WITH_DOWNTIME_QUESTION)
.permit(MigrationStep.MIGRATE_CLUSTER_WITH_DOWNTIME, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER, migrationActions::migrateWithDowntime)
.permit(MigrationStep.MIGRATE_CLUSTER_WITHOUT_DOWNTIME, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER, migrationActions::migrateWithoutDowntime);
config.configure(MigrationState.MIGRATE_EXISTING_DATA)
.onEntry(migrationActions::reindexOldData)
.permitIf(MigrationStep.SHOW_ASK_TO_SHUTDOWN_OLD_CLUSTER, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER, migrationActions::reindexingFinished);

config.configure(MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER)
.permitIf(MigrationStep.CONFIRM_OLD_CLUSTER_STOPPED, MigrationState.MANUALLY_REMOVE_OLD_CONNECTION_STRING_FROM_CONFIG, migrationActions::isOldClusterStopped);


// inplace / rolling upgrade branch of the migration
config.configure(MigrationState.ROLLING_UPGRADE_MIGRATION_WELCOME)
.permit(MigrationStep.INSTALL_DATANODES_ON_EVERY_NODE, MigrationState.DIRECTORY_COMPATIBILITY_CHECK_PAGE);

config.configure(MigrationState.DIRECTORY_COMPATIBILITY_CHECK_PAGE)
.permit(MigrationStep.DIRECTORY_COMPATIBILITY_CHECK_OK, MigrationState.PROVISION_ROLLING_UPGRADE_NODES_WITH_CERTIFICATES);
// in place / rolling upgrade branch of the migration
config.configure(MigrationState.ROLLING_UPGRADE_MIGRATION_WELCOME_PAGE)
.permit(MigrationStep.INSTALL_DATANODES_ON_EVERY_NODE, MigrationState.DIRECTORY_COMPATIBILITY_CHECK_PAGE2, () -> LOG.info("Showing directory compatibility check page"));

config.configure(MigrationState.DIRECTORY_COMPATIBILITY_CHECK_PAGE2)
.permit(MigrationStep.SHOW_PROVISION_ROLLING_UPGRADE_NODES_WITH_CERTIFICATES, MigrationState.PROVISION_ROLLING_UPGRADE_NODES_WITH_CERTIFICATES, migrationActions::directoryCompatibilityCheckOk);

config.configure(MigrationState.PROVISION_ROLLING_UPGRADE_NODES_WITH_CERTIFICATES)
.permit(MigrationStep.CALCULATE_JOURNAL_SIZE, MigrationState.JOURNAL_SIZE_DOWNTIME_WARNING);

config.configure(MigrationState.JOURNAL_SIZE_DOWNTIME_WARNING)
.permit(MigrationStep.CONFIRM_OLD_CLUSTER_STOPPED, MigrationState.MANUALLY_REMOVE_OLD_CONNECTION_STRING_FROM_CONFIG);
.permit(MigrationStep.SHOW_STOP_PROCESSING_PAGE, MigrationState.MESSAGE_PROCESSING_STOP_REPLACE_CLUSTER_AND_MP_RESTART);

config.configure(MigrationState.MESSAGE_PROCESSING_STOP_REPLACE_CLUSTER_AND_MP_RESTART)
.onEntry(migrationActions::stopMessageProcessing)
.onExit(migrationActions::startMessageProcessing)
.permit(MigrationStep.SHOW_ASK_TO_SHUTDOWN_OLD_CLUSTER, MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER);

config.configure(MigrationState.ASK_TO_SHUTDOWN_OLD_CLUSTER)
.permitIf(MigrationStep.CONFIRM_OLD_CLUSTER_STOPPED, MigrationState.MANUALLY_REMOVE_OLD_CONNECTION_STRING_FROM_CONFIG, migrationActions::isOldClusterStopped);

// common cleanup steps
config.configure(MigrationState.MANUALLY_REMOVE_OLD_CONNECTION_STRING_FROM_CONFIG)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,33 @@
package org.graylog.plugins.views.storage.migration.state.machine;

public enum MigrationStep {

SELECT_MIGRATION,
SHOW_DIRECTORY_COMPATIBILITY_CHECK,
SHOW_CA_CREATION,
SHOW_RENEWAL_POLICY_CREATION,
SHOW_MIGRATION_SELECTION,
SHOW_DATA_MIGRATION_QUESTION,
SHOW_MIGRATE_EXISTING_DATA,
SHOW_ASK_TO_SHUTDOWN_OLD_CLUSTER,
SHOW_PROVISION_ROLLING_UPGRADE_NODES_WITH_CERTIFICATES,
SHOW_STOP_PROCESSING_PAGE,
RUN_DIRECTORY_COMPATIBILITY_CHECK,
DIRECTORY_COMPATIBILITY_CHECK_OK,
SELECT_ROLLING_UPGRADE_MIGRATION,
SELECT_REMOTE_REINDEX_MIGRATION,
DISCOVER_NEW_DATANODES,
START_DATANODE_CLUSTER,
CONFIGURE_DATANODE_CLUSTER,
MIGRATE_INDEX_TEMPLATES,
MIGRATE_EXISTING_DATA,
SKIP_EXISTING_DATA_MIGRATION,
MIGRATE_CLUSTER_WITH_DOWNTIME,
MIGRATE_CLUSTER_WITHOUT_DOWNTIME,
INSTALL_DATANODES_ON_EVERY_NODE,
DIRECTORY_COMPATIBILITY_CHECK_OK,
CONFIRM_OLD_CLUSTER_STOPPED,
CALCULATE_JOURNAL_SIZE,
STOP_MESSAGE_PROCESSING,
CONFIRM_OLD_CLUSTER_STOPPED,
START_DATANODE_CLUSTER,
START_MESSAGE_PROCESSING,
CONFIRM_OLD_CONNECTION_STRING_FROM_CONFIG_REMOVED,

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,39 +25,73 @@
import org.graylog.plugins.views.storage.migration.state.machine.MigrationStep;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;


class MigrationStateMachineTest {

private static final Logger LOG = LoggerFactory.getLogger(MigrationStateMachineTest.class);

@Test
void testPersistence() {
final InMemoryStateMachinePersistence persistence = new InMemoryStateMachinePersistence();


final AtomicReference<Map<String, Object>> providedArgs = new AtomicReference<>();
AtomicBoolean directoryCheckTriggered = new AtomicBoolean(false);
AtomicReference<Map<String, Object>> capturedArgs = new AtomicReference<>();

final MigrationActionsAdapter migrationActions = new MigrationActionsAdapter() {

@Override
public void rollingUpgradeSelected() {
providedArgs.set(args());
public boolean runDirectoryCompatibilityCheck() {
capturedArgs.set(args());
return true;
}

@Override
public boolean directoryCompatibilityCheckOk() {
directoryCheckTriggered.set(true);
return true;
}
};

final MigrationStateMachine migrationStateMachine = new MigrationStateMachineProvider(persistence, migrationActions).get();
migrationStateMachine.trigger(MigrationStep.SELECT_ROLLING_UPGRADE_MIGRATION, Collections.singletonMap("foo", "bar"));
migrationStateMachine.trigger(MigrationStep.SELECT_MIGRATION, Collections.emptyMap());
migrationStateMachine.trigger(MigrationStep.SHOW_DIRECTORY_COMPATIBILITY_CHECK, Collections.singletonMap("foo", "bar"));
migrationStateMachine.trigger(MigrationStep.SHOW_CA_CREATION, Collections.emptyMap());


Assertions.assertThat(persistence.getConfiguration())
.isPresent()
.hasValueSatisfying(configuration -> {
Assertions.assertThat(configuration.currentState()).isEqualTo(MigrationState.ROLLING_UPGRADE_MIGRATION_WELCOME);
Assertions.assertThat(configuration.currentState()).isEqualTo(MigrationState.CA_CREATION_PAGE);
});

Assertions.assertThat(providedArgs.get())
Assertions.assertThat(directoryCheckTriggered.get())
.as("Directory check should be triggered during the migration process")
.isEqualTo(true);

System.out.println(migrationStateMachine.serialize());


Assertions.assertThat(capturedArgs.get())
.isNotNull()
.containsEntry("foo", "bar");
}

@Test
void testSerialization() {
final MigrationStateMachine migrationStateMachine = new MigrationStateMachineProvider(new InMemoryStateMachinePersistence(), new MigrationActionsAdapter()).get();
final String serialized = migrationStateMachine.serialize();
Assertions.assertThat(serialized).isNotEmpty().startsWith("digraph G {");
final String fragment = URLEncoder.encode(serialized, StandardCharsets.UTF_8).replace("+", "%20");
LOG.info("Render state machine on " + "https://dreampuf.github.io/GraphvizOnline/#" + fragment);
}
}
Loading

0 comments on commit 7118110

Please sign in to comment.