Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix wlm stats output #16278

Merged
merged 3 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.ResourceNotFoundException;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.cluster.ClusterChangedEvent;
import org.opensearch.cluster.ClusterStateListener;
import org.opensearch.cluster.metadata.Metadata;
Expand Down Expand Up @@ -354,10 +353,6 @@ public void onTaskCompleted(Task task) {
queryGroupId = QueryGroupTask.DEFAULT_QUERY_GROUP_ID_SUPPLIER.get();
}

if (task instanceof SearchShardTask) {
queryGroupsStateAccessor.getQueryGroupState(queryGroupId).shardCompletions.inc();
} else {
queryGroupsStateAccessor.getQueryGroupState(queryGroupId).completions.inc();
}
queryGroupsStateAccessor.getQueryGroupState(queryGroupId).totalCompletions.inc();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,7 @@ public class QueryGroupState {
/**
* co-ordinator level completions at the query group level, this is a cumulative counter since the Opensearch start time
*/
public final CounterMetric completions = new CounterMetric();

/**
* shard level completions at the query group level, this is a cumulative counter since the Opensearch start time
*/
public final CounterMetric shardCompletions = new CounterMetric();
public final CounterMetric totalCompletions = new CounterMetric();

/**
* rejections at the query group level, this is a cumulative counter since the OpenSearch start time
Expand Down Expand Up @@ -61,16 +56,8 @@ public QueryGroupState() {
*
* @return co-ordinator completions in the query group
*/
public long getCompletions() {
return completions.count();
}

/**
*
* @return shard completions in the query group
*/
public long getShardCompletions() {
return shardCompletions.count();
public long getTotalCompletions() {
return totalCompletions.count();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,16 +91,14 @@ public Map<String, QueryGroupStatsHolder> getStats() {
* the instance will only be created on demand through stats api
*/
public static class QueryGroupStatsHolder implements ToXContentObject, Writeable {
public static final String COMPLETIONS = "completions";
public static final String REJECTIONS = "rejections";
public static final String COMPLETIONS = "total_completions";
public static final String REJECTIONS = "total_rejections";
public static final String TOTAL_CANCELLATIONS = "total_cancellations";
public static final String FAILURES = "failures";
public static final String SHARD_COMPLETIONS = "shard_completions";
private long completions;
private long shardCompletions;
private long rejections;
private long failures;
private long totalCancellations;
private long cancellations;
private Map<ResourceType, ResourceStats> resourceStats;

// this is needed to support the factory method
Expand All @@ -110,24 +108,21 @@ public QueryGroupStatsHolder(
long completions,
long rejections,
long failures,
long totalCancellations,
long shardCompletions,
long cancellations,
Map<ResourceType, ResourceStats> resourceStats
) {
this.completions = completions;
this.rejections = rejections;
this.failures = failures;
this.shardCompletions = shardCompletions;
this.totalCancellations = totalCancellations;
this.cancellations = cancellations;
this.resourceStats = resourceStats;
}

public QueryGroupStatsHolder(StreamInput in) throws IOException {
this.completions = in.readVLong();
this.rejections = in.readVLong();
this.failures = in.readVLong();
this.totalCancellations = in.readVLong();
this.shardCompletions = in.readVLong();
this.cancellations = in.readVLong();
this.resourceStats = in.readMap((i) -> ResourceType.fromName(i.readString()), ResourceStats::new);
}

Expand All @@ -145,11 +140,10 @@ public static QueryGroupStatsHolder from(QueryGroupState queryGroupState) {
resourceStatsMap.put(resourceTypeStateEntry.getKey(), ResourceStats.from(resourceTypeStateEntry.getValue()));
}

statsHolder.completions = queryGroupState.getCompletions();
statsHolder.completions = queryGroupState.getTotalCompletions();
statsHolder.rejections = queryGroupState.getTotalRejections();
statsHolder.failures = queryGroupState.getFailures();
statsHolder.totalCancellations = queryGroupState.getTotalCancellations();
statsHolder.shardCompletions = queryGroupState.getShardCompletions();
statsHolder.cancellations = queryGroupState.getTotalCancellations();
statsHolder.resourceStats = resourceStatsMap;
return statsHolder;
}
Expand All @@ -164,8 +158,7 @@ public static void writeTo(StreamOutput out, QueryGroupStatsHolder statsHolder)
out.writeVLong(statsHolder.completions);
out.writeVLong(statsHolder.rejections);
out.writeVLong(statsHolder.failures);
out.writeVLong(statsHolder.totalCancellations);
out.writeVLong(statsHolder.shardCompletions);
out.writeVLong(statsHolder.cancellations);
out.writeMap(statsHolder.resourceStats, (o, val) -> o.writeString(val.getName()), ResourceStats::writeTo);
}

Expand All @@ -177,10 +170,10 @@ public void writeTo(StreamOutput out) throws IOException {
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(COMPLETIONS, completions);
builder.field(SHARD_COMPLETIONS, shardCompletions);
// builder.field(SHARD_COMPLETIONS, shardCompletions);
builder.field(REJECTIONS, rejections);
builder.field(FAILURES, failures);
builder.field(TOTAL_CANCELLATIONS, totalCancellations);
// builder.field(FAILURES, failures);
builder.field(TOTAL_CANCELLATIONS, cancellations);

for (ResourceType resourceType : ResourceType.getSortedValues()) {
ResourceStats resourceStats1 = resourceStats.get(resourceType);
Expand All @@ -199,15 +192,14 @@ public boolean equals(Object o) {
QueryGroupStatsHolder that = (QueryGroupStatsHolder) o;
return completions == that.completions
&& rejections == that.rejections
&& shardCompletions == that.shardCompletions
&& Objects.equals(resourceStats, that.resourceStats)
&& failures == that.failures
&& totalCancellations == that.totalCancellations;
&& cancellations == that.cancellations;
}

@Override
public int hashCode() {
return Objects.hash(completions, shardCompletions, rejections, totalCancellations, failures, resourceStats);
return Objects.hash(completions, rejections, cancellations, failures, resourceStats);
}
}

Expand All @@ -217,6 +209,7 @@ public int hashCode() {
public static class ResourceStats implements ToXContentObject, Writeable {
public static final String CURRENT_USAGE = "current_usage";
public static final String CANCELLATIONS = "cancellations";
public static final String REJECTIONS = "rejections";
public static final double PRECISION = 1e-9;
private final double currentUsage;
private final long cancellations;
Expand Down Expand Up @@ -268,7 +261,7 @@ public void writeTo(StreamOutput out) throws IOException {
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.field(CURRENT_USAGE, currentUsage);
builder.field(CANCELLATIONS, cancellations);
builder.field(QueryGroupStatsHolder.REJECTIONS, rejections);
builder.field(REJECTIONS, rejections);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ public class WlmStatsResponseTests extends OpenSearchTestCase {
0,
1,
0,
0,
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(0, 0, 0),
Expand Down Expand Up @@ -78,10 +77,8 @@ public void testToString() {
+ " \"node-1\" : {\n"
+ " \"query_groups\" : {\n"
+ " \"safjgagnaeekg-3r3fads\" : {\n"
+ " \"completions\" : 0,\n"
+ " \"shard_completions\" : 0,\n"
+ " \"rejections\" : 0,\n"
+ " \"failures\" : 1,\n"
+ " \"total_completions\" : 0,\n"
+ " \"total_rejections\" : 0,\n"
+ " \"total_cancellations\" : 0,\n"
+ " \"cpu\" : {\n"
+ " \"current_usage\" : 0.0,\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,14 @@ public void testOnTaskCompleted() {
((QueryGroupTask) task).setQueryGroupId(mockThreadPool.getThreadContext());
queryGroupService.onTaskCompleted(task);

assertEquals(1, queryGroupState.completions.count());
assertEquals(1, queryGroupState.totalCompletions.count());

// test non QueryGroupTask
task = new Task(1, "simple", "test", "mock task", null, null);
queryGroupService.onTaskCompleted(task);

// It should still be 1
assertEquals(1, queryGroupState.completions.count());
assertEquals(1, queryGroupState.totalCompletions.count());

mockThreadPool.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,6 @@ public void testValidQueryGroupRequestFailure() throws IOException {
0,
1,
0,
0,
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(0, 0, 0),
Expand All @@ -109,7 +108,6 @@ public void testValidQueryGroupRequestFailure() throws IOException {
0,
0,
0,
0,
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(0, 0, 0),
Expand Down Expand Up @@ -172,7 +170,6 @@ public void testMultiThreadedValidQueryGroupRequestFailures() {
0,
ITERATIONS,
0,
0,
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(0, 0, 0),
Expand All @@ -186,7 +183,6 @@ public void testMultiThreadedValidQueryGroupRequestFailures() {
0,
0,
0,
0,
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(0, 0, 0),
Expand All @@ -209,7 +205,6 @@ public void testInvalidQueryGroupFailure() throws IOException {
0,
0,
0,
0,
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(0, 0, 0),
Expand All @@ -223,7 +218,6 @@ public void testInvalidQueryGroupFailure() throws IOException {
0,
1,
0,
0,
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(0, 0, 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,7 @@ public void testRandomQueryGroupsStateUpdates() {

for (int i = 0; i < 25; i++) {
if (i % 5 == 0) {
updaterThreads.add(new Thread(() -> {
if (randomBoolean()) {
queryGroupState.completions.inc();
} else {
queryGroupState.shardCompletions.inc();
}
}));
updaterThreads.add(new Thread(() -> { queryGroupState.totalCompletions.inc(); }));
} else if (i % 5 == 1) {
updaterThreads.add(new Thread(() -> {
queryGroupState.totalRejections.inc();
Expand Down Expand Up @@ -63,7 +57,7 @@ public void testRandomQueryGroupsStateUpdates() {
}
});

assertEquals(5, queryGroupState.getCompletions() + queryGroupState.getShardCompletions());
assertEquals(5, queryGroupState.getTotalCompletions());
assertEquals(5, queryGroupState.getTotalRejections());

final long sumOfRejectionsDueToResourceTypes = queryGroupState.getResourceState().get(ResourceType.CPU).rejections.count()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public void testToXContent() throws IOException {
13,
2,
0,
1213718,
Map.of(ResourceType.CPU, new QueryGroupStats.ResourceStats(0.3, 13, 2))
)
);
Expand All @@ -48,7 +47,7 @@ public void testToXContent() throws IOException {
queryGroupStats.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
assertEquals(
"{\"query_groups\":{\"afakjklaj304041-afaka\":{\"completions\":123456789,\"shard_completions\":1213718,\"rejections\":13,\"failures\":2,\"total_cancellations\":0,\"cpu\":{\"current_usage\":0.3,\"cancellations\":13,\"rejections\":2}}}}",
"{\"query_groups\":{\"afakjklaj304041-afaka\":{\"total_completions\":123456789,\"total_rejections\":13,\"total_cancellations\":0,\"cpu\":{\"current_usage\":0.3,\"cancellations\":13,\"rejections\":2}}}}",
builder.toString()
);
}
Expand All @@ -68,7 +67,6 @@ protected QueryGroupStats createTestInstance() {
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
randomNonNegativeLong(),
Map.of(
ResourceType.CPU,
new QueryGroupStats.ResourceStats(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ public void testToXContent() throws IOException {
13,
2,
0,
1213718,
Map.of(ResourceType.CPU, new QueryGroupStats.ResourceStats(0.3, 13, 2))
)
);
Expand All @@ -50,7 +49,7 @@ public void testToXContent() throws IOException {
wlmStats.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
assertEquals(
"{\"query_groups\":{\"afakjklaj304041-afaka\":{\"completions\":123456789,\"shard_completions\":1213718,\"rejections\":13,\"failures\":2,\"total_cancellations\":0,\"cpu\":{\"current_usage\":0.3,\"cancellations\":13,\"rejections\":2}}}}",
"{\"query_groups\":{\"afakjklaj304041-afaka\":{\"total_completions\":123456789,\"total_rejections\":13,\"total_cancellations\":0,\"cpu\":{\"current_usage\":0.3,\"cancellations\":13,\"rejections\":2}}}}",
builder.toString()
);
}
Expand Down
Loading