Skip to content

Commit

Permalink
fix backward compatibility issue
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Oct 10, 2024
1 parent 7ce85e2 commit c169973
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.BaseNodeRequest;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupService;
Expand All @@ -27,7 +30,11 @@
*
* @opensearch.experimental
*/
public class TransportWlmStatsAction extends TransportNodesAction<WlmStatsRequest, WlmStatsResponse, WlmStatsRequest, WlmStats> {
public class TransportWlmStatsAction extends TransportNodesAction<
WlmStatsRequest,
WlmStatsResponse,
TransportWlmStatsAction.NodeWlmStatsRequest,
WlmStats> {

final QueryGroupService queryGroupService;

Expand All @@ -46,7 +53,7 @@ public TransportWlmStatsAction(
transportService,
actionFilters,
WlmStatsRequest::new,
WlmStatsRequest::new,
NodeWlmStatsRequest::new,
ThreadPool.Names.MANAGEMENT,
WlmStats.class
);
Expand All @@ -59,8 +66,8 @@ protected WlmStatsResponse newResponse(WlmStatsRequest request, List<WlmStats> w
}

@Override
protected WlmStatsRequest newNodeRequest(WlmStatsRequest request) {
return request;
protected NodeWlmStatsRequest newNodeRequest(WlmStatsRequest request) {
return new NodeWlmStatsRequest(request);
}

@Override
Expand All @@ -69,11 +76,38 @@ protected WlmStats newNodeResponse(StreamInput in) throws IOException {
}

@Override
protected WlmStats nodeOperation(WlmStatsRequest wlmStatsRequest) {
protected WlmStats nodeOperation(NodeWlmStatsRequest nodeWlmStatsRequest) {
assert transportService.getLocalNode() != null;
return new WlmStats(
transportService.getLocalNode(),
queryGroupService.nodeStats(wlmStatsRequest.getQueryGroupIds(), wlmStatsRequest.isBreach())
);
WlmStatsRequest request = nodeWlmStatsRequest.request;
return new WlmStats(transportService.getLocalNode(), queryGroupService.nodeStats(request.getQueryGroupIds(), request.isBreach()));
}

/**
* Inner WlmStatsRequest
*
* @opensearch.experimental
*/
public static class NodeWlmStatsRequest extends BaseNodeRequest {

protected WlmStatsRequest request;

public NodeWlmStatsRequest(StreamInput in) throws IOException {
super(in);
request = new WlmStatsRequest(in);
}

NodeWlmStatsRequest(WlmStatsRequest request) {
this.request = request;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}

public DiscoveryNode[] getDiscoveryNodes() {
return this.request.concreteNodes();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.action.support.nodes;

import org.opensearch.action.admin.cluster.wlm.TransportWlmStatsAction;
import org.opensearch.action.admin.cluster.wlm.TransportWlmStatsAction.NodeWlmStatsRequest;
import org.opensearch.action.admin.cluster.wlm.WlmStatsRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.PlainActionFuture;
Expand All @@ -34,16 +35,16 @@ public class TransportWlmStatsActionTests extends TransportNodesActionTests {
*/
public void testWlmStatsActionWithRetentionOfDiscoveryNodesList() {
WlmStatsRequest request = new WlmStatsRequest();
Map<String, List<WlmStatsRequest>> combinedSentRequest = performWlmStatsAction(request);
Map<String, List<NodeWlmStatsRequest>> combinedSentRequest = performWlmStatsAction(request);

assertNotNull(combinedSentRequest);
combinedSentRequest.forEach((node, capturedRequestList) -> {
assertNotNull(capturedRequestList);
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.concreteNodes()); });
capturedRequestList.forEach(sentRequest -> { assertNull(sentRequest.getDiscoveryNodes()); });
});
}

private Map<String, List<WlmStatsRequest>> performWlmStatsAction(WlmStatsRequest request) {
private Map<String, List<NodeWlmStatsRequest>> performWlmStatsAction(WlmStatsRequest request) {
TransportNodesAction action = new TransportWlmStatsAction(
THREAD_POOL,
clusterService,
Expand All @@ -54,18 +55,19 @@ private Map<String, List<WlmStatsRequest>> performWlmStatsAction(WlmStatsRequest
PlainActionFuture<WlmStatsRequest> listener = new PlainActionFuture<>();
action.new AsyncAction(null, request, listener).start();
Map<String, List<CapturingTransport.CapturedRequest>> capturedRequests = transport.getCapturedRequestsByTargetNodeAndClear();
Map<String, List<WlmStatsRequest>> combinedSentRequest = new HashMap<>();
Map<String, List<NodeWlmStatsRequest>> combinedSentRequest = new HashMap<>();

capturedRequests.forEach((node, capturedRequestList) -> {
List<WlmStatsRequest> sentRequestList = new ArrayList<>();
List<NodeWlmStatsRequest> sentRequestList = new ArrayList<>();

capturedRequestList.forEach(preSentRequest -> {
BytesStreamOutput out = new BytesStreamOutput();
try {
WlmStatsRequest wlmStatsRequestFromCoordinator = (WlmStatsRequest) preSentRequest.request;
NodeWlmStatsRequest wlmStatsRequestFromCoordinator =
(NodeWlmStatsRequest) preSentRequest.request;
wlmStatsRequestFromCoordinator.writeTo(out);
StreamInput in = out.bytes().streamInput();
WlmStatsRequest wlmStatsRequest = new WlmStatsRequest(in);
NodeWlmStatsRequest wlmStatsRequest = new NodeWlmStatsRequest(in);
sentRequestList.add(wlmStatsRequest);
} catch (IOException e) {
throw new RuntimeException(e);
Expand Down

0 comments on commit c169973

Please sign in to comment.