Skip to content

Commit

Permalink
Proper cancellation for multi-query search (#21183)
Browse files Browse the repository at this point in the history
  • Loading branch information
luk-kaminski authored Dec 17, 2024
1 parent 6820272 commit 63e03d9
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ public QueryResult doRun(SearchJob job, Query query, ESGeneratedQueryContext que

//ES does not support per-request cancel_after_time_interval. We have to use simplified solution - the whole multi-search will be cancelled if it takes more than configured max. exec. time.
final PlainActionFuture<MultiSearchResponse> mSearchFuture = client.cancellableMsearch(searches);
job.setSearchEngineTaskFuture(mSearchFuture);
job.setQueryExecutionFuture(query.id(), mSearchFuture);
final List<MultiSearchResponse.Item> results = getResults(mSearchFuture, job.getCancelAfterSeconds(), searches.size());

for (SearchType searchType : query.searchTypes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ public QueryResult doRun(SearchJob job, Query query, OSGeneratedQueryContext que
.toList();

final PlainActionFuture<MultiSearchResponse> mSearchFuture = client.cancellableMsearch(searches);
job.setSearchEngineTaskFuture(mSearchFuture);
job.setQueryExecutionFuture(query.id(), mSearchFuture);
final List<MultiSearchResponse.Item> results = getResults(mSearchFuture, searches.size());

for (SearchType searchType : query.searchTypes()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@
import org.graylog.plugins.views.search.errors.SearchError;
import org.graylog.plugins.views.search.rest.ExecutionInfo;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand All @@ -46,7 +48,7 @@ public class SearchJob implements ParameterProvider {

private final Search search;

private Future<?> searchEngineTaskFuture;
private Map<String, Future<?>> queryExecutionFutures;

private CompletableFuture<Void> resultFuture;

Expand All @@ -72,6 +74,7 @@ public SearchJob(String id,
this.search = search;
this.searchJobIdentifier = new SearchJobIdentifier(id, search.id(), owner, executingNodeId);
this.cancelAfterSeconds = cancelAfterSeconds != null ? cancelAfterSeconds : NO_CANCELLATION;
this.queryExecutionFutures = new HashMap<>();
}

@JsonIgnore //covered by @JsonUnwrapped
Expand Down Expand Up @@ -120,14 +123,14 @@ public void addQueryResultFuture(String queryId, CompletableFuture<QueryResult>
}

@JsonIgnore
public void setSearchEngineTaskFuture(final Future<?> searchEngineTaskFuture) {
this.searchEngineTaskFuture = searchEngineTaskFuture;
public void setQueryExecutionFuture(final String queryId, final Future<?> future) {
this.queryExecutionFutures.put(queryId, future);
}

public void cancel() {
if (this.searchEngineTaskFuture != null) {
this.searchEngineTaskFuture.cancel(true);
}
this.queryExecutionFutures.values().stream()
.filter(Objects::nonNull)
.forEach(f -> f.cancel(true));
}

@JsonProperty("results")
Expand All @@ -141,8 +144,8 @@ public Map<String, QueryResult> results() {

@JsonProperty("execution")
public ExecutionInfo execution() {
final boolean isDone = (resultFuture == null || resultFuture.isDone()) && (searchEngineTaskFuture == null || searchEngineTaskFuture.isDone());
final boolean isCancelled = (searchEngineTaskFuture != null && searchEngineTaskFuture.isCancelled()) || (resultFuture != null && resultFuture.isCancelled());
final boolean isDone = (resultFuture == null || resultFuture.isDone()) && (queryExecutionFutures.values().stream().allMatch(f -> f == null || f.isDone()));
final boolean isCancelled = (queryExecutionFutures.values().stream().allMatch(f -> f != null && f.isCancelled()) || (resultFuture != null && resultFuture.isCancelled()));
return new ExecutionInfo(isDone, isCancelled, !errors.isEmpty());
}

Expand Down

0 comments on commit 63e03d9

Please sign in to comment.