Skip to content

Commit

Permalink
Perform buildAggregation concurrently
Browse files Browse the repository at this point in the history
Signed-off-by: Jay Deng <[email protected]>
  • Loading branch information
jed326 authored and Jay Deng committed Mar 18, 2024
1 parent 1f5df54 commit f5f34f2
Show file tree
Hide file tree
Showing 10 changed files with 150 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import com.carrotsearch.randomizedtesting.annotations.ParametersFactory;

import org.opensearch.action.admin.indices.segments.IndicesSegmentResponse;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
Expand All @@ -26,6 +27,7 @@
import java.util.Collection;
import java.util.List;

import static org.opensearch.indices.IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING;
import static org.opensearch.search.SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse;
Expand All @@ -50,26 +52,31 @@ public void setupSuiteScopeCluster() throws Exception {
assertAcked(
prepareCreate(
"idx",
Settings.builder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), false)
).setMapping("type", "type=keyword", "num", "type=integer", "score", "type=integer")
);
waitForRelocation(ClusterHealthStatus.GREEN);

client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15").get();
refresh("idx");
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1").get();
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100").get();
refresh("idx");
indexRandom(
true,
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "5"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "11", "score", "50"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "1", "score", "2"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "12", "score", "20"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "10"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "15"),
client().prepareIndex("idx").setId("1").setSource("type", "type1", "num", "3", "score", "1"),
client().prepareIndex("idx").setId("1").setSource("type", "type2", "num", "13", "score", "100")
);

waitForRelocation(ClusterHealthStatus.GREEN);
refresh();

IndicesSegmentResponse segmentResponse = client().admin().indices().prepareSegments("idx").get();
System.out.println("Segments: " + segmentResponse.getIndices().get("idx").getShards().get(0).getShards()[0].getSegments().size());
}

public void testCompositeAggWithNoSubAgg() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
import org.opensearch.search.query.ReduceableSearchResult;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;

/**
* Common {@link CollectorManager} used by both concurrent and non-concurrent aggregation path and also for global and non-global
Expand Down Expand Up @@ -56,17 +56,9 @@ public String getCollectorReason() {

@Override
public ReduceableSearchResult reduce(Collection<Collector> collectors) throws IOException {
final List<Aggregator> aggregators = context.bucketCollectorProcessor().toAggregators(collectors);
final List<InternalAggregation> internals = new ArrayList<>(aggregators.size());
List<InternalAggregation> internals = context.bucketCollectorProcessor().toInternalAggregations(collectors);
assert !internals.stream().anyMatch(Objects::isNull);
context.aggregations().resetBucketMultiConsumer();
for (Aggregator aggregator : aggregators) {
try {
// post collection is called in ContextIndexSearcher after search on leaves are completed
internals.add(aggregator.buildTopLevel());
} catch (IOException e) {
throw new AggregationExecutionException("Failed to build aggregation [" + aggregator.name() + "]", e);
}
}

final InternalAggregations internalAggregations = InternalAggregations.from(internals);
return buildAggregationResult(internalAggregations);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
package org.opensearch.search.aggregations;

import org.opensearch.OpenSearchParseException;
import org.opensearch.common.SetOnce;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lease.Releasable;
import org.opensearch.core.ParseField;
Expand Down Expand Up @@ -61,6 +62,8 @@
@PublicApi(since = "1.0.0")
public abstract class Aggregator extends BucketCollector implements Releasable {

final SetOnce<InternalAggregation> internalAggregation = new SetOnce<>();

/**
* Parses the aggregation request and creates the appropriate aggregator factory for it.
*
Expand All @@ -83,6 +86,14 @@ public interface Parser {
AggregationBuilder parse(String aggregationName, XContentParser parser) throws IOException;
}

public void setInternalAggregation(InternalAggregation internalAggregation) {
this.internalAggregation.set(internalAggregation);
}

public InternalAggregation getInternalAggregation() {
return internalAggregation.get();
}

/**
* Return the name of this aggregator.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,13 @@ public void postCollection() throws IOException {
collectableSubAggregators.postCollection();
}

public void buildAndSetInternalAggregation() throws IOException {
// Only call buildTopLevel for top level aggregators. This will subsequently build aggregations for child aggs.
if (parent == null) {
setInternalAggregation(buildTopLevel());
}
}

/** Called upon release of the aggregator. */
@Override
public void close() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lucene.MinimumScoreCollector;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.search.profile.aggregation.ProfilingAggregator;
import org.opensearch.search.profile.query.InternalProfileCollector;

import java.io.IOException;
Expand All @@ -22,6 +23,7 @@
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;

/**
Expand Down Expand Up @@ -63,6 +65,7 @@ public void processPostCollection(Collector collectorTree) throws IOException {
while (!collectors.isEmpty()) {
Collector currentCollector = collectors.poll();
if (currentCollector instanceof InternalProfileCollector) {
// Profile collector should be the top level one so we should be able to call buildAggregation on it here
collectors.offer(((InternalProfileCollector) currentCollector).getCollector());
} else if (currentCollector instanceof MinimumScoreCollector) {
collectors.offer(((MinimumScoreCollector) currentCollector).getCollector());
Expand All @@ -72,6 +75,19 @@ public void processPostCollection(Collector collectorTree) throws IOException {
}
} else if (currentCollector instanceof BucketCollector) {
((BucketCollector) currentCollector).postCollection();

// Perform build aggregation during post collection
if (currentCollector instanceof AggregatorBase) {
((AggregatorBase) currentCollector).buildAndSetInternalAggregation();
} else if (currentCollector instanceof ProfilingAggregator) {
((ProfilingAggregator) currentCollector).setInternalAggregation(
((ProfilingAggregator) currentCollector).buildTopLevel()
);
} else if (currentCollector instanceof MultiBucketCollector) {
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
collectors.offer(innerCollector);
}
}
}
}
}
Expand Down Expand Up @@ -106,4 +122,64 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
}
return aggregators;
}

/**
* Unwraps the input collection of {@link Collector} to get the list of the {@link InternalAggregation}. The
* input is expected to contain the collectors related to Aggregations only as that is passed to {@link AggregationCollectorManager}
* during the reduce phase. This list of {@link InternalAggregation} is used to optionally perform reduce at shard level before
* returning response to coordinator
* @param collectors collection of aggregation collectors to reduce
* @return list of unwrapped {@link InternalAggregation}
*/
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) throws IOException {
List<InternalAggregation> internalAggregations = new ArrayList<>();

final Deque<Collector> allCollectors = new LinkedList<>(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof AggregatorBase) {
internalAggregations.add(((AggregatorBase) currentCollector).getInternalAggregation());
} else if (currentCollector instanceof ProfilingAggregator) {
internalAggregations.add(((ProfilingAggregator) currentCollector).getInternalAggregation());
} else if (currentCollector instanceof InternalProfileCollector) {
if (((InternalProfileCollector) currentCollector).getCollector() instanceof AggregatorBase) {
internalAggregations.add(
((AggregatorBase) ((InternalProfileCollector) currentCollector).getCollector()).getInternalAggregation()
);
} else if (((InternalProfileCollector) currentCollector).getCollector() instanceof MultiBucketCollector) {
allCollectors.addAll(
Arrays.asList(((MultiBucketCollector) ((InternalProfileCollector) currentCollector).getCollector()).getCollectors())
);
}
} else if (currentCollector instanceof MultiBucketCollector) {
allCollectors.addAll(Arrays.asList(((MultiBucketCollector) currentCollector).getCollectors()));
}
}

// Check that internalAggregations does not contain any null objects as that means postCollection was not called for a given
// collector. This can happen as collect will not get called whenever there are no leaves on a shard. Since we build the
// InternalAggregation in postCollection that will not get called in such cases either. Therefore we need to manually call it again
// here to build empty InternalAggregation objects for this collector tree.
if (internalAggregations.stream().anyMatch(Objects::isNull)) {
allCollectors.addAll(collectors);
while (!allCollectors.isEmpty()) {
final Collector currentCollector = allCollectors.pop();
if (currentCollector instanceof AggregatorBase) {
((AggregatorBase) currentCollector).buildAndSetInternalAggregation();
} else if (currentCollector instanceof ProfilingAggregator) {
((ProfilingAggregator) currentCollector).setInternalAggregation(
((ProfilingAggregator) currentCollector).buildTopLevel()
);
} else if (currentCollector instanceof MultiBucketCollector) {
for (Collector innerCollector : ((MultiBucketCollector) currentCollector).getCollectors()) {
allCollectors.offer(innerCollector);
}
}
}
// Iterate through collector tree again to get InternalAggregations object
return toInternalAggregations(collectors);
} else {
return internalAggregations;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ public static class MultiBucketConsumer implements IntConsumer {

// aggregations execute in a single thread for both sequential
// and concurrent search, so no atomic here
private int count;
private final LongAdder count;

// will be updated by multiple threads in concurrent search
// hence making it as LongAdder
Expand All @@ -145,6 +145,7 @@ public MultiBucketConsumer(int limit, CircuitBreaker breaker) {
this.limit = limit;
this.breaker = breaker;
callCount = new LongAdder();
count = new LongAdder();
availProcessors = Runtime.getRuntime().availableProcessors();
}

Expand All @@ -158,6 +159,7 @@ protected MultiBucketConsumer(
) {
this.limit = limit;
this.breaker = breaker;
this.count = new LongAdder();
this.callCount = callCount;
this.circuitBreakerTripped = circuitBreakerTripped;
this.availProcessors = availProcessors;
Expand All @@ -166,8 +168,9 @@ protected MultiBucketConsumer(
@Override
public void accept(int value) {
if (value != 0) {
count += value;
if (count > limit) {
count.add(value);
;
if (count.intValue() > limit) {
throw new TooManyBucketsException(
"Trying to create too many buckets. Must be less than or equal to: ["
+ limit
Expand Down Expand Up @@ -205,11 +208,11 @@ public void accept(int value) {
}

public void reset() {
this.count = 0;
this.count.reset();
}

public int getCount() {
return count;
return count.intValue();
}

public int getLimit() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,6 @@ protected Aggregator createInternal(

@Override
protected boolean supportsConcurrentSegmentSearch() {
// See https://github.com/opensearch-project/OpenSearch/issues/12331 for details
return false;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class GlobalOrdinalsStringTermsAggregator extends AbstractStringTermsAggr
private final long valueCount;
private final String fieldName;
private Weight weight;
private final GlobalOrdLookupFunction lookupGlobalOrd;
protected final CollectionStrategy collectionStrategy;
protected int segmentsWithSingleValuedOrds = 0;
protected int segmentsWithMultiValuedOrds = 0;
Expand Down Expand Up @@ -129,11 +128,10 @@ public GlobalOrdinalsStringTermsAggregator(
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
this.valuesSource = valuesSource;
final IndexReader reader = context.searcher().getIndexReader();
final SortedSetDocValues values = reader.leaves().size() > 0
final SortedSetDocValues values = !reader.leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
this.valueCount = values.getValueCount();
this.lookupGlobalOrd = values::lookupOrd;
this.acceptedGlobalOrdinals = includeExclude == null ? ALWAYS_TRUE : includeExclude.acceptedGlobalOrdinals(values)::get;
if (remapGlobalOrds) {
this.collectionStrategy = new RemapGlobalOrds(cardinality);
Expand Down Expand Up @@ -885,7 +883,12 @@ PriorityQueue<OrdBucket> buildPriorityQueue(int size) {
}

StringTerms.Bucket convertTempBucketToRealBucket(OrdBucket temp) throws IOException {
BytesRef term = BytesRef.deepCopyOf(lookupGlobalOrd.apply(temp.globalOrd));
// Recreate DocValues as needed for concurrent segment search
SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
BytesRef term = BytesRef.deepCopyOf(values.lookupOrd(temp.globalOrd));

StringTerms.Bucket result = new StringTerms.Bucket(term, temp.docCount, null, showTermDocCountError, 0, format);
result.bucketOrd = temp.bucketOrd;
result.docCountError = 0;
Expand Down Expand Up @@ -1001,7 +1004,11 @@ BucketUpdater<SignificantStringTerms.Bucket> bucketUpdater(long owningBucketOrd)
long subsetSize = subsetSize(owningBucketOrd);
return (spare, globalOrd, bucketOrd, docCount) -> {
spare.bucketOrd = bucketOrd;
oversizedCopy(lookupGlobalOrd.apply(globalOrd), spare.termBytes);
// Recreate DocValues as needed for concurrent segment search
SortedSetDocValues values = !context.searcher().getIndexReader().leaves().isEmpty()
? valuesSource.globalOrdinalsValues(context.searcher().getIndexReader().leaves().get(0))
: DocValues.emptySortedSet();
oversizedCopy(values.lookupOrd(globalOrd), spare.termBytes);
spare.subsetDf = docCount;
spare.subsetSize = subsetSize;
spare.supersetDf = backgroundFrequencies.freq(spare.termBytes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,12 @@ public List<Aggregator> toAggregators(Collection<Collector> collectors) {
// should not be called when there is no aggregation collector
throw new IllegalStateException("Unexpected toAggregators call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
}

@Override
public List<InternalAggregation> toInternalAggregations(Collection<Collector> collectors) {
// should not be called when there is no aggregation collector
throw new IllegalStateException("Unexpected toInternalAggregations call on NO_OP_BUCKET_COLLECTOR_PROCESSOR");
}
};

private final List<Releasable> releasables = new CopyOnWriteArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,12 @@
"LuceneFixedGap",
"LuceneVarGapFixedInterval",
"LuceneVarGapDocFreqInterval",
"Lucene50" })
"Lucene50",
"Lucene90",
"Lucene94",
"Lucene90",
"Lucene95",
"Lucene99" })
@LuceneTestCase.SuppressReproduceLine
public abstract class OpenSearchTestCase extends LuceneTestCase {

Expand Down

0 comments on commit f5f34f2

Please sign in to comment.