Skip to content

Commit

Permalink
[Workload Management] QueryGroup Stats API Logic (#15777)
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
* changelog
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* add tests
Signed-off-by: Ruirui Zhang <[email protected]>

* modify uri
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* modify based on comments
Signed-off-by: Ruirui Zhang <[email protected]>

* changelog
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* add tests
Signed-off-by: Ruirui Zhang <[email protected]>

* modify uri
Signed-off-by: Ruirui Zhang <[email protected]>

* modify based on comments
Signed-off-by: Ruirui Zhang <[email protected]>

* modify based on comments
Signed-off-by: Ruirui Zhang <[email protected]>

* revise
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* changelog
Signed-off-by: Ruirui Zhang <[email protected]>

* address comments
Signed-off-by: Ruirui Zhang <[email protected]>

* add tests
Signed-off-by: Ruirui Zhang <[email protected]>

* modify uri
Signed-off-by: Ruirui Zhang <[email protected]>

* modify based on comments
Signed-off-by: Ruirui Zhang <[email protected]>

* modify based on comments
Signed-off-by: Ruirui Zhang <[email protected]>

* git pull
Signed-off-by: Ruirui Zhang <[email protected]>

* rebase
Signed-off-by: Ruirui Zhang <[email protected]>

* encapsulate querygroupstats in wlmstats
Signed-off-by: Ruirui Zhang <[email protected]>

* fix UT
Signed-off-by: Ruirui Zhang <[email protected]>
(cherry picked from commit b2253f1)
  • Loading branch information
ruai0511 committed Oct 10, 2024
1 parent f795fdb commit 7ce85e2
Show file tree
Hide file tree
Showing 19 changed files with 731 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- [Offline Nodes] Adds offline-tasks library containing various interfaces to be used for Offline Background Tasks. ([#13574](https://github.com/opensearch-project/OpenSearch/pull/13574))
- Add support for async deletion in S3BlobContainer ([#15621](https://github.com/opensearch-project/OpenSearch/pull/15621))
- [Workload Management] QueryGroup resource cancellation framework changes ([#15651](https://github.com/opensearch-project/OpenSearch/pull/15651))
- [Workload Management] Add QueryGroup Stats API Logic ([15777](https://github.com/opensearch-project/OpenSearch/pull/15777))
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967), [#16110](https://github.com/opensearch-project/OpenSearch/pull/16110))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import org.opensearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksAction;
import org.opensearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.opensearch.action.admin.cluster.wlm.TransportWlmStatsAction;
import org.opensearch.action.admin.cluster.wlm.WlmStatsAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.admin.indices.alias.TransportIndicesAliasesAction;
Expand Down Expand Up @@ -369,6 +371,7 @@
import org.opensearch.rest.action.admin.cluster.RestRestoreSnapshotAction;
import org.opensearch.rest.action.admin.cluster.RestSnapshotsStatusAction;
import org.opensearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
import org.opensearch.rest.action.admin.cluster.RestWlmStatsAction;
import org.opensearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction;
import org.opensearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction;
import org.opensearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction;
Expand Down Expand Up @@ -614,6 +617,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
actions.register(WlmStatsAction.INSTANCE, TransportWlmStatsAction.class);
actions.register(RemoteStoreStatsAction.INSTANCE, TransportRemoteStoreStatsAction.class);
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
Expand Down Expand Up @@ -812,6 +816,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestClearVotingConfigExclusionsAction());
registerHandler.accept(new RestMainAction());
registerHandler.accept(new RestNodesInfoAction(settingsFilter));
registerHandler.accept(new RestWlmStatsAction());
registerHandler.accept(new RestRemoteClusterInfoAction());
registerHandler.accept(new RestNodesStatsAction());
registerHandler.accept(new RestNodesUsageAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.wlm;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.stats.WlmStats;

import java.io.IOException;
import java.util.List;

/**
* Transport action for obtaining WlmStats
*
* @opensearch.experimental
*/
public class TransportWlmStatsAction extends TransportNodesAction<WlmStatsRequest, WlmStatsResponse, WlmStatsRequest, WlmStats> {

final QueryGroupService queryGroupService;

@Inject
public TransportWlmStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
QueryGroupService queryGroupService,
ActionFilters actionFilters
) {
super(
WlmStatsAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
WlmStatsRequest::new,
WlmStatsRequest::new,
ThreadPool.Names.MANAGEMENT,
WlmStats.class
);
this.queryGroupService = queryGroupService;
}

@Override
protected WlmStatsResponse newResponse(WlmStatsRequest request, List<WlmStats> wlmStats, List<FailedNodeException> failures) {
return new WlmStatsResponse(clusterService.getClusterName(), wlmStats, failures);
}

@Override
protected WlmStatsRequest newNodeRequest(WlmStatsRequest request) {
return request;
}

@Override
protected WlmStats newNodeResponse(StreamInput in) throws IOException {
return new WlmStats(in);
}

@Override
protected WlmStats nodeOperation(WlmStatsRequest wlmStatsRequest) {
assert transportService.getLocalNode() != null;
return new WlmStats(
transportService.getLocalNode(),
queryGroupService.nodeStats(wlmStatsRequest.getQueryGroupIds(), wlmStatsRequest.isBreach())
);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.wlm;

import org.opensearch.action.ActionType;

/**
* Transport action for obtaining Workload Management Stats.
*
* @opensearch.experimental
*/
public class WlmStatsAction extends ActionType<WlmStatsResponse> {
public static final WlmStatsAction INSTANCE = new WlmStatsAction();
public static final String NAME = "cluster:monitor/wlm/stats";

private WlmStatsAction() {
super(NAME, WlmStatsResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.wlm;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
* A request to get Workload Management Stats
*/
@ExperimentalApi
public class WlmStatsRequest extends BaseNodesRequest<WlmStatsRequest> {

private final Set<String> queryGroupIds;
private final Boolean breach;

public WlmStatsRequest(StreamInput in) throws IOException {
super(in);
this.queryGroupIds = new HashSet<>(Set.of(in.readStringArray()));
this.breach = in.readOptionalBoolean();
}

/**
* Get QueryGroup stats from nodes based on the nodes ids specified. If none are passed, stats
* for all nodes will be returned.
*/
public WlmStatsRequest(String[] nodesIds, Set<String> queryGroupIds, Boolean breach) {
super(false, nodesIds);
this.queryGroupIds = queryGroupIds;
this.breach = breach;
}

public WlmStatsRequest() {
super(false, (String[]) null);
queryGroupIds = new HashSet<>();
this.breach = false;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeStringArray(queryGroupIds.toArray(new String[0]));
out.writeOptionalBoolean(breach);
}

public Set<String> getQueryGroupIds() {
return queryGroupIds;
}

public Boolean isBreach() {
return breach;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.wlm;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.wlm.stats.QueryGroupStats;
import org.opensearch.wlm.stats.WlmStats;

import java.io.IOException;
import java.util.List;

/**
* A response for obtaining Workload Management Stats
*/
@ExperimentalApi
public class WlmStatsResponse extends BaseNodesResponse<WlmStats> implements ToXContentFragment {

WlmStatsResponse(StreamInput in) throws IOException {
super(in);
}

WlmStatsResponse(ClusterName clusterName, List<WlmStats> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

@Override
protected List<WlmStats> readNodesFrom(StreamInput in) throws IOException {
return in.readList(WlmStats::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<WlmStats> nodes) throws IOException {
out.writeList(nodes);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (WlmStats wlmStats : getNodes()) {
builder.startObject(wlmStats.getNode().getId());
QueryGroupStats queryGroupStats = wlmStats.getQueryGroupStats();
queryGroupStats.toXContent(builder, params);
builder.endObject();
}
return builder;
}

@Override
public String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.toString();
} catch (IOException e) {
return "{ \"error\" : \"" + e.getMessage() + "\"}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** WlmStats transport handlers. */
package org.opensearch.action.admin.cluster.wlm;
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest;
import org.opensearch.action.admin.cluster.wlm.WlmStatsResponse;
import org.opensearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.opensearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.opensearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
Expand Down Expand Up @@ -320,6 +322,13 @@ public interface ClusterAdminClient extends OpenSearchClient {
*/
NodesStatsRequestBuilder prepareNodesStats(String... nodesIds);

/**
* QueryGroup stats of the cluster.
* @param request The wlmStatsRequest
* @param listener A listener to be notified with a result
*/
void wlmStats(WlmStatsRequest request, ActionListener<WlmStatsResponse> listener);

void remoteStoreStats(RemoteStoreStatsRequest request, ActionListener<RemoteStoreStatsResponse> listener);

RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.opensearch.action.admin.cluster.wlm.WlmStatsAction;
import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest;
import org.opensearch.action.admin.cluster.wlm.WlmStatsResponse;
import org.opensearch.action.admin.indices.alias.IndicesAliasesAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
Expand Down Expand Up @@ -892,6 +895,11 @@ public NodesStatsRequestBuilder prepareNodesStats(String... nodesIds) {
return new NodesStatsRequestBuilder(this, NodesStatsAction.INSTANCE).setNodesIds(nodesIds);
}

@Override
public void wlmStats(final WlmStatsRequest request, final ActionListener<WlmStatsResponse> listener) {
execute(WlmStatsAction.INSTANCE, request, listener);
}

@Override
public void remoteStoreStats(final RemoteStoreStatsRequest request, final ActionListener<RemoteStoreStatsResponse> listener) {
execute(RemoteStoreStatsAction.INSTANCE, request, listener);
Expand Down
12 changes: 6 additions & 6 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -353,12 +353,12 @@ public class Node implements Closeable {
);

/**
* controls whether the node is allowed to persist things like metadata to disk
* Note that this does not control whether the node stores actual indices (see
* {@link #NODE_DATA_SETTING}). However, if this is false, {@link #NODE_DATA_SETTING}
* and {@link #NODE_MASTER_SETTING} must also be false.
*
*/
* controls whether the node is allowed to persist things like metadata to disk
* Note that this does not control whether the node stores actual indices (see
* {@link #NODE_DATA_SETTING}). However, if this is false, {@link #NODE_DATA_SETTING}
* and {@link #NODE_MASTER_SETTING} must also be false.
*
*/
public static final Setting<Boolean> NODE_LOCAL_STORAGE_SETTING = Setting.boolSetting(
"node.local_storage",
true,
Expand Down
Loading

0 comments on commit 7ce85e2

Please sign in to comment.