Skip to content

Commit

Permalink
Bulk actions for data node (#18121)
Browse files Browse the repository at this point in the history
* add action queue for data nodes and queued removal

* implement wait for completed bus event

* add change log

* add queue status to frontend

* add bulk start/stop

* adjust test for removing multiple nodes

* add queue field to entity test

* integrated bulk action endpoints

* action_queue status styling

* cleanup mocks

* add tests for removing multiple

* fixed requested wordings and typos

---------

Co-authored-by: Mohamed Ould Hocine <[email protected]>
Co-authored-by: Mohamed OULD HOCINE <[email protected]>
Co-authored-by: Laura <[email protected]>
  • Loading branch information
4 people authored Feb 5, 2024
1 parent 64a3b0d commit 76ee116
Show file tree
Hide file tree
Showing 22 changed files with 508 additions and 59 deletions.
6 changes: 6 additions & 0 deletions changelog/unreleased/pr-18121.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
type = "a"
message = "Add bulk management capabilities for data nodes."

issues = ["17732"]
pulls = ["18121"]

Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Copyright (C) 2020 Graylog, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*/
package org.graylog.datanode.management;

import org.graylog.datanode.process.ProcessEvent;
import org.graylog.datanode.process.ProcessState;
import org.graylog.datanode.process.StateMachineTracer;
import org.graylog2.cluster.NodeNotFoundException;
import org.graylog2.cluster.nodes.DataNodeDto;
import org.graylog2.cluster.nodes.NodeService;
import org.graylog2.datanode.DataNodeLifecycleTrigger;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StateMachineTracer} that writes the data node status of this node to the
 * {@link NodeService} whenever the process state machine moves to a different state.
 * Every such update also sets the action queue to {@link DataNodeLifecycleTrigger#CLEAR}.
 */
public class ClusterNodeStateTracer implements StateMachineTracer {

    private static final Logger LOG = LoggerFactory.getLogger(ClusterNodeStateTracer.class);

    private final NodeService<DataNodeDto> nodeService;
    private final NodeId nodeId;

    /**
     * @param nodeService service used to look up and update the node entry
     * @param nodeId      id of the node whose entry is updated on transitions
     */
    public ClusterNodeStateTracer(NodeService<DataNodeDto> nodeService, NodeId nodeId) {
        this.nodeService = nodeService;
        this.nodeId = nodeId;
    }

    @Override
    public void trigger(ProcessEvent processEvent) {
        // Intentionally empty: only completed transitions are written to the node entry.
    }

    /**
     * Updates the stored node entry with the status of {@code destination} when it differs
     * from {@code source}; transitions that stay in the same state are ignored.
     *
     * @param processEvent event that caused the transition, logged as the reason
     * @param source       state the machine is leaving
     * @param destination  state the machine is entering
     * @throws RuntimeException if the node cannot be found; the original
     *                          {@link NodeNotFoundException} is attached as the cause
     */
    @Override
    public void transition(ProcessEvent processEvent, ProcessState source, ProcessState destination) {
        if (source.equals(destination)) {
            return;
        }
        try {
            LOG.info("Updating cluster node {} from {} to {} (reason: {})", nodeId.getNodeId(),
                    source.getDataNodeStatus(), destination.getDataNodeStatus(), processEvent.name());
            final DataNodeDto node = nodeService.byNodeId(nodeId);
            nodeService.update(node.toBuilder()
                    .setDataNodeStatus(destination.getDataNodeStatus())
                    .setActionQueue(DataNodeLifecycleTrigger.CLEAR)
                    .build());
        } catch (NodeNotFoundException e) {
            // Keep the original exception as the cause so the lookup failure stays diagnosable.
            throw new RuntimeException("Node not registered, this should not happen.", e);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.graylog2.cluster.preflight.DataNodeProvisioningStateChangeEvent;
import org.graylog2.datanode.DataNodeLifecycleEvent;
import org.graylog2.datanode.RemoteReindexAllowlistEvent;
import org.graylog2.events.ClusterEventBus;
import org.graylog2.indexer.fieldtypes.IndexFieldTypesService;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.security.CustomCAX509TrustManager;
Expand All @@ -53,11 +54,14 @@ public class OpensearchProcessService extends AbstractIdleService implements Pro
private final OpensearchProcess process;
private final Provider<OpensearchConfiguration> configurationProvider;
private final EventBus eventBus;
private final NodeService<DataNodeDto> nodeService;
private final NodeId nodeId;
private final DataNodeProvisioningService dataNodeProvisioningService;
private final IndexFieldTypesService indexFieldTypesService;
private final ObjectMapper objectMapper;
private final ProcessStateMachine processStateMachine;
private final ClusterEventBus clusterEventBus;


@Inject
public OpensearchProcessService(final DatanodeConfiguration datanodeConfiguration,
Expand All @@ -70,14 +74,17 @@ public OpensearchProcessService(final DatanodeConfiguration datanodeConfiguratio
final NodeId nodeId,
final IndexFieldTypesService indexFieldTypesService,
final ObjectMapper objectMapper,
final ProcessStateMachine processStateMachine) {
final ProcessStateMachine processStateMachine,
final ClusterEventBus clusterEventBus) {
this.configurationProvider = configurationProvider;
this.eventBus = eventBus;
this.nodeService = nodeService;
this.nodeId = nodeId;
this.dataNodeProvisioningService = dataNodeProvisioningService;
this.objectMapper = objectMapper;
this.indexFieldTypesService = indexFieldTypesService;
this.processStateMachine = processStateMachine;
this.clusterEventBus = clusterEventBus;
this.process = createOpensearchProcess(datanodeConfiguration, trustManager, configuration, nodeService, objectMapper, processStateMachine);
eventBus.register(this);
}
Expand All @@ -88,8 +95,9 @@ private OpensearchProcess createOpensearchProcess(final DatanodeConfiguration da
final ProcessWatchdog watchdog = new ProcessWatchdog(process, WATCHDOG_RESTART_ATTEMPTS);
process.addStateMachineTracer(watchdog);
process.addStateMachineTracer(new StateMachineTransitionLogger());
process.addStateMachineTracer(new OpensearchRemovalTracer(process, configuration.getDatanodeNodeName()));
process.addStateMachineTracer(new OpensearchRemovalTracer(process, configuration.getDatanodeNodeName(), nodeId, clusterEventBus));
process.addStateMachineTracer(new ConfigureMetricsIndexSettings(process, configuration, indexFieldTypesService, objectMapper));
process.addStateMachineTracer(new ClusterNodeStateTracer(nodeService, nodeId));
return process;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.graylog.datanode.management;

import com.google.common.eventbus.EventBus;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.graylog.datanode.process.ProcessEvent;
import org.graylog.datanode.process.ProcessState;
Expand All @@ -31,6 +32,9 @@
import org.graylog.shaded.opensearch2.org.opensearch.client.RequestOptions;
import org.graylog.shaded.opensearch2.org.opensearch.client.RestHighLevelClient;
import org.graylog.shaded.opensearch2.org.opensearch.common.settings.Settings;
import org.graylog2.datanode.DataNodeLifecycleEvent;
import org.graylog2.datanode.DataNodeLifecycleTrigger;
import org.graylog2.plugin.system.NodeId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -52,12 +56,16 @@ public class OpensearchRemovalTracer implements StateMachineTracer {

private final OpensearchProcess process;
private final String nodeName;
private final NodeId nodeId;
private final EventBus eventBus;
boolean allocationExcludeChecked = false;
ScheduledExecutorService executorService;

/**
 * Creates a removal tracer for the given process.
 *
 * @param process  the opensearch process this tracer acts on
 * @param nodeName name of the node
 * @param nodeId   id of this node, used as the id of the lifecycle event posted on removal
 * @param eventBus bus on which the {@code REMOVED} lifecycle event is posted
 */
public OpensearchRemovalTracer(OpensearchProcess process, String nodeName, NodeId nodeId, EventBus eventBus) {
    this.process = process;
    this.nodeName = nodeName;
    this.nodeId = nodeId;
    this.eventBus = eventBus;
}


Expand Down Expand Up @@ -130,6 +138,7 @@ void checkRemovalStatus() {
if (health.getRelocatingShards() == 0) {
process.stop();
executorService.shutdown();
eventBus.post(DataNodeLifecycleEvent.create(nodeId.getNodeId(), DataNodeLifecycleTrigger.REMOVED));
}
} catch (IOException | OpenSearchStatusException e) {
process.onEvent(ProcessEvent.HEALTH_CHECK_FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.graylog.datanode.management;

import com.google.common.eventbus.EventBus;
import org.graylog.datanode.process.ProcessEvent;
import org.graylog.datanode.process.ProcessState;
import org.graylog.shaded.opensearch2.org.opensearch.action.admin.cluster.health.ClusterHealthResponse;
Expand All @@ -26,6 +27,8 @@
import org.graylog.shaded.opensearch2.org.opensearch.client.RequestOptions;
import org.graylog.shaded.opensearch2.org.opensearch.client.RestHighLevelClient;
import org.graylog.shaded.opensearch2.org.opensearch.common.settings.Settings;
import org.graylog2.plugin.system.NodeId;
import org.graylog2.plugin.system.SimpleNodeId;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -53,18 +56,21 @@ public class OpensearchRemovalTracerTest {

private OpensearchRemovalTracer classUnderTest;
private final String NODENAME = "datanode1";
private final NodeId nodeId = new SimpleNodeId(NODENAME);
@Mock
private OpensearchProcess process;
@Mock
RestHighLevelClient restClient;
@Mock
ClusterClient clusterClient;
@Mock
EventBus eventBus;

/**
 * Wires the mocked process to the mocked REST and cluster clients and creates the tracer
 * under test with the node id and the mocked event bus.
 */
@Before
public void setUp() {
    when(process.restClient()).thenReturn(Optional.of(restClient));
    when(restClient.cluster()).thenReturn(clusterClient);
    this.classUnderTest = new OpensearchRemovalTracer(process, NODENAME, nodeId, eventBus);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.WriteResult;
import jakarta.inject.Inject;
import org.bson.types.ObjectId;
import org.graylog2.Configuration;
import org.graylog2.cluster.Node;
Expand All @@ -31,8 +32,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import jakarta.inject.Inject;

import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Iterator;
Expand Down Expand Up @@ -228,4 +227,12 @@ public void ping(NodeDto dto) {
LOG.warn("Caught exception during node ping.", e);
}
}

/**
 * Applies a {@code $set} update built from {@link NodeDto#toEntityParameters()} to the
 * stored node entry matching the dto's {@code node_id}.
 *
 * @param dto Dto carrying the node id and the values to write
 */
@Override
public void update(NodeDto dto) {
    final BasicDBObject byNodeId = new BasicDBObject("node_id", dto.getNodeId());
    final BasicDBObject setFields = new BasicDBObject("$set", dto.toEntityParameters());
    super.collection(nodeClass).update(byNodeId, setFields);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.google.auto.value.AutoValue;
import org.graylog.security.certutil.CertRenewalService;
import org.graylog2.datanode.DataNodeLifecycleTrigger;

import javax.annotation.Nullable;
import java.util.Map;
import java.util.Objects;

@AutoValue
@JsonIgnoreProperties(ignoreUnknown = true)
Expand All @@ -45,16 +47,33 @@ public abstract class DataNodeDto extends NodeDto {
@JsonProperty("data_node_status")
public abstract DataNodeStatus getDataNodeStatus();

/**
 * Value of the {@code action_queue} property of this data node.
 *
 * @return the lifecycle trigger held in the action queue, or {@code null} if none is set
 */
@Nullable
@JsonProperty("action_queue")
public abstract DataNodeLifecycleTrigger getActionQueue();

@Nullable
@JsonUnwrapped
public abstract CertRenewalService.ProvisioningInformation getProvisioningInformation();

/**
 * Extends the base node parameters with the data node specific fields.
 * <p>
 * Fields whose value is {@code null} are left out of the map, so a partial update built
 * from it does not overwrite stored values. The one exception is the action queue:
 * {@link DataNodeLifecycleTrigger#CLEAR} is written as an explicit {@code null} entry.
 *
 * @return the parameter map from the superclass plus the data node fields that are set
 */
@Override
public Map<String, Object> toEntityParameters() {
    final Map<String, Object> params = super.toEntityParameters();
    if (Objects.nonNull(getClusterAddress())) {
        params.put("cluster_address", getClusterAddress());
    }
    if (Objects.nonNull(getRestApiAddress())) {
        params.put("rest_api_address", getRestApiAddress());
    }
    if (Objects.nonNull(getDataNodeStatus())) {
        params.put("datanode_status", getDataNodeStatus());
    }
    final DataNodeLifecycleTrigger queued = getActionQueue();
    if (Objects.nonNull(queued)) {
        // CLEAR is a marker that is never stored itself; it resets the entry to null.
        params.put("action_queue", queued == DataNodeLifecycleTrigger.CLEAR ? null : queued);
    }
    return params;
}

Expand All @@ -78,8 +97,12 @@ public static Builder builder() {
@JsonProperty("datanode_status")
public abstract Builder setDataNodeStatus(DataNodeStatus dataNodeStatus);

/**
 * Sets the value exposed as the {@code action_queue} property.
 *
 * @param trigger the lifecycle trigger to place in the action queue
 * @return this builder
 */
@JsonProperty("action_queue")
public abstract Builder setActionQueue(DataNodeLifecycleTrigger trigger);

public abstract Builder setProvisioningInformation(CertRenewalService.ProvisioningInformation provisioningInformation);


public abstract DataNodeDto build();


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import org.bson.types.ObjectId;
import org.graylog2.database.DbEntity;
import org.graylog2.datanode.DataNodeLifecycleTrigger;

import java.util.Map;
import java.util.Objects;

@DbEntity(collection = "datanodes", titleField = "node_id")
public class DataNodeEntity extends AbstractNode<DataNodeDto> {
Expand All @@ -42,6 +44,13 @@ public String getRestApiAddress() {
return (String) fields.get("rest_api_address");
}

/**
 * Reads the {@code action_queue} field of this entity.
 *
 * @return the trigger named by the stored value, or {@code null} when the field is
 *         missing or holds {@code null}
 */
public DataNodeLifecycleTrigger getActionQueue() {
    final Object queued = fields.get("action_queue");
    if (queued == null) {
        return null;
    }
    return DataNodeLifecycleTrigger.valueOf(queued.toString());
}

public DataNodeStatus getDataNodeStatus() {
if (!fields.containsKey("datanode_status")) {
return null;
Expand All @@ -61,6 +70,7 @@ public DataNodeDto toDto() {
.setClusterAddress(this.getClusterAddress())
.setDataNodeStatus(this.getDataNodeStatus())
.setRestApiAddress(this.getRestApiAddress())
.setActionQueue(this.getActionQueue())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public abstract class NodeDto implements Node {

Expand Down Expand Up @@ -64,7 +65,9 @@ public Map<String, Object> toEntityParameters() {
params.put("node_id", getNodeId());
params.put("transport_address", getTransportAddress());
params.put("is_leader", isLeader());
params.put("hostname", getHostname());
if (Objects.nonNull(getHostname())) {
params.put("hostname", getHostname());
}
return params;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,4 +48,6 @@ public interface NodeService<T extends NodeDto> {
* @param dto Dto of the node to be marked as alive
*/
void ping(NodeDto dto);

/**
 * Updates the stored entry of the given node with the values carried by the dto.
 *
 * @param dto Dto of the node to be updated
 */
void update(NodeDto dto);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
package org.graylog2.datanode;

/**
 * Triggers carried by data node lifecycle events and held in a data node's action queue.
 * <p>
 * {@code CLEAR} is a marker rather than a stored value: a node dto whose action queue is
 * {@code CLEAR} writes {@code null} for the queue field.
 * NOTE(review): {@code REMOVED}, {@code STOPPED} and {@code STARTED} look like the
 * completion counterparts of {@code REMOVE}, {@code STOP} and {@code START} - confirm
 * against the code that posts them.
 */
public enum DataNodeLifecycleTrigger {
    REMOVE, RESET, STOP, START, REMOVED, STOPPED, STARTED, CLEAR
}
Loading

0 comments on commit 76ee116

Please sign in to comment.