Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix aggs result of NestedAggregator with sub NestedAggregator (opense…
Browse files Browse the repository at this point in the history
…arch-project#13324)

Signed-off-by: kkewwei <[email protected]>
kkewwei authored and wangdongyu.danny committed Aug 22, 2024
1 parent ac67f99 commit b296cd1
Showing 4 changed files with 399 additions and 34 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -39,6 +39,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Fix the computed max shards of cluster to avoid int overflow ([#14155](https://github.com/opensearch-project/OpenSearch/pull/14155))
- Fixed rest-high-level client searchTemplate & mtermVectors endpoints to have a leading slash ([#14465](https://github.com/opensearch-project/OpenSearch/pull/14465))
- Write shard level metadata blob when snapshotting searchable snapshot indexes ([#13190](https://github.com/opensearch-project/OpenSearch/pull/13190))
- Fix aggs result of NestedAggregator with sub NestedAggregator ([#13324](https://github.com/opensearch-project/OpenSearch/pull/13324))

### Security

Original file line number Diff line number Diff line change
@@ -43,8 +43,10 @@
import org.apache.lucene.search.Weight;
import org.apache.lucene.search.join.BitSetProducer;
import org.apache.lucene.util.BitSet;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.core.ParseField;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.search.aggregations.Aggregator;
import org.opensearch.search.aggregations.AggregatorFactories;
@@ -88,12 +90,25 @@ public class NestedAggregator extends BucketsAggregator implements SingleBucketA
) throws IOException {
super(name, factories, context, parent, cardinality, metadata);

Query parentFilter = parentObjectMapper != null ? parentObjectMapper.nestedTypeFilter() : Queries.newNonNestedFilter();
Query parentFilter = isParent(parentObjectMapper, childObjectMapper, context.mapperService())
? parentObjectMapper.nestedTypeFilter()
: Queries.newNonNestedFilter();
this.parentFilter = context.bitsetFilterCache().getBitSetProducer(parentFilter);
this.childFilter = childObjectMapper.nestedTypeFilter();
this.collectsFromSingleBucket = cardinality.map(estimate -> estimate < 2);
}

/**
 * Checks whether {@code parentObjectMapper} is an ancestor of {@code childObjectMapper}
 * in the nested-object hierarchy, i.e. whether this aggregator's parent filter should be
 * the parent mapper's nested-type filter rather than the generic non-nested filter.
 *
 * @param parentObjectMapper the candidate ancestor mapper; may be {@code null}
 * @param childObjectMapper  the mapper whose ancestor chain is walked upwards
 * @param mapperService      used to resolve each mapper's enclosing object mapper
 * @return {@code true} if {@code parentObjectMapper} is found on the ancestor chain
 */
private boolean isParent(ObjectMapper parentObjectMapper, ObjectMapper childObjectMapper, MapperService mapperService) {
    if (parentObjectMapper == null) {
        return false;
    }
    ObjectMapper parent;
    do {
        parent = childObjectMapper.getParentObjectMapper(mapperService);
        // Step one level up the hierarchy; without this reassignment the loop
        // re-evaluates the same mapper and never terminates when the immediate
        // parent is neither null nor the target ancestor.
        childObjectMapper = parent;
    } while (parent != null && parent != parentObjectMapper);
    return parentObjectMapper == parent;
}

@Override
public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final LeafBucketCollector sub) throws IOException {
IndexReaderContext topLevelContext = ReaderUtil.getTopLevelContext(ctx);
@@ -107,20 +122,17 @@ public LeafBucketCollector getLeafCollector(final LeafReaderContext ctx, final L
if (collectsFromSingleBucket) {
return new LeafBucketCollectorBase(sub, null) {
@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
public void collect(int parentAggDoc, long bucket) throws IOException {
// parentAggDoc can be 0 when aggregation:
if (parentDocs == null || childDocs == null) {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(parentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, parentAggDoc);
int currentParentDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < parentDoc; childDocId = childDocs.nextDoc()) {
for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
collectBucket(sub, childDocId, bucket);
}
}
@@ -130,6 +142,43 @@ public void collect(int parentDoc, long bucket) throws IOException {
}
}

/**
 * Resolves the block-join parent doc of {@code parentAggDoc} and positions the child
 * iterator at the first candidate child of that parent's block.
 *
 * The doc id handed down by the enclosing aggregation is ambiguous: it may itself be a
 * block-join parent doc, or it may be a nested doc of a sibling nested field (still a
 * "child" from the block-join standpoint). In the latter case we must first join up to
 * the owning block parent before iterating this aggregation's own children.
 *
 * @param parentDocs   bit set marking the block-join parent docs of this segment
 * @param childDocs    iterator over this aggregation's child docs
 * @param parentAggDoc the doc id emitted by the parent aggregation (a block parent, or a
 *                     sibling child doc from the block-join standpoint)
 * @return a tuple of (block-join parent doc id owning {@code parentAggDoc}, first child
 *         doc id after the previous parent according to {@code childDocs}; may be
 *         {@code NO_MORE_DOCS} when the child iterator is exhausted)
 * @throws IOException if advancing the child iterator fails
 */
static Tuple<Integer, Integer> getParentAndChildId(BitSet parentDocs, DocIdSetIterator childDocs, int parentAggDoc) throws IOException {
    int currentParentAggDoc;
    // Last block-join parent at or before parentAggDoc; -1 if there is none.
    int prevParentDoc = parentDocs.prevSetBit(parentAggDoc);
    if (prevParentDoc == -1) {
        // No parent at or before parentAggDoc: it belongs to the very first block.
        currentParentAggDoc = parentDocs.nextSetBit(0);
    } else if (prevParentDoc == parentAggDoc) {
        // parentAggDoc is itself a block-join parent; recompute the previous parent
        // so the child scan below starts at this block's first doc.
        currentParentAggDoc = parentAggDoc;
        if (currentParentAggDoc == 0) {
            prevParentDoc = -1;
        } else {
            prevParentDoc = parentDocs.prevSetBit(currentParentAggDoc - 1);
        }
    } else {
        // parentAggDoc is a sibling child doc; its block-join parent is the next
        // parent doc after it.
        currentParentAggDoc = parentDocs.nextSetBit(prevParentDoc + 1);
    }

    // Advance the child iterator into the current block if it is still behind it.
    int childDocId = childDocs.docID();
    if (childDocId <= prevParentDoc) {
        childDocId = childDocs.advance(prevParentDoc + 1);
    }
    return Tuple.tuple(currentParentAggDoc, childDocId);
}

@Override
protected void preGetSubLeafCollectors(LeafReaderContext ctx) throws IOException {
super.preGetSubLeafCollectors(ctx);
@@ -191,9 +240,8 @@ public void setScorer(Scorable scorer) throws IOException {

@Override
public void collect(int parentDoc, long bucket) throws IOException {
// if parentDoc is 0 then this means that this parent doesn't have child docs (b/c these appear always before the parent
// doc), so we can skip:
if (parentDoc == 0 || parentDocs == null || childDocs == null) {
// parentAggDoc can be 0 when aggregation:
if (parentDocs == null || childDocs == null) {
return;
}

@@ -214,11 +262,9 @@ void processBufferedChildBuckets() throws IOException {
return;
}

final int prevParentDoc = parentDocs.prevSetBit(currentParentDoc - 1);
int childDocId = childDocs.docID();
if (childDocId <= prevParentDoc) {
childDocId = childDocs.advance(prevParentDoc + 1);
}
Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, currentParentDoc);
int currentParentDoc = res.v1();
int childDocId = res.v2();

for (; childDocId < currentParentDoc; childDocId = childDocs.nextDoc()) {
cachedScorer.doc = childDocId;
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.LongPoint;
import org.apache.lucene.document.SortedDocValuesField;
import org.apache.lucene.document.SortedNumericDocValuesField;
import org.apache.lucene.document.SortedSetDocValuesField;
@@ -45,23 +46,36 @@
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreQuery;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.join.ScoreMode;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.BitSet;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.FixedBitSet;
import org.opensearch.common.CheckedConsumer;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.cache.bitset.BitsetFilterCache;
import org.opensearch.index.mapper.ContentPath;
import org.opensearch.index.mapper.IdFieldMapper;
import org.opensearch.index.mapper.KeywordFieldMapper;
import org.opensearch.index.mapper.MappedFieldType;
import org.opensearch.index.mapper.Mapper;
import org.opensearch.index.mapper.NestedPathFieldMapper;
import org.opensearch.index.mapper.NumberFieldMapper;
import org.opensearch.index.mapper.ObjectMapper;
import org.opensearch.index.mapper.SeqNoFieldMapper;
import org.opensearch.index.mapper.Uid;
import org.opensearch.index.query.MatchAllQueryBuilder;
import org.opensearch.index.query.NestedQueryBuilder;
import org.opensearch.index.query.QueryShardContext;
import org.opensearch.index.query.TermsQueryBuilder;
import org.opensearch.index.query.support.NestedScope;
import org.opensearch.script.MockScriptEngine;
import org.opensearch.script.Script;
import org.opensearch.script.ScriptEngine;
@@ -104,20 +118,34 @@
import java.util.stream.DoubleStream;
import java.util.stream.LongStream;

import org.mockito.Mockito;

import static java.util.stream.Collectors.toList;
import static org.opensearch.search.aggregations.AggregationBuilders.max;
import static org.opensearch.search.aggregations.AggregationBuilders.nested;
import static org.opensearch.search.aggregations.bucket.nested.NestedAggregator.getParentAndChildId;
import static org.opensearch.test.InternalAggregationTestCase.DEFAULT_MAX_BUCKETS;
import static org.hamcrest.Matchers.equalTo;
import static org.apache.lucene.search.DocIdSetIterator.NO_MORE_DOCS;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class NestedAggregatorTests extends AggregatorTestCase {

private static final String VALUE_FIELD_NAME = "number";
private static final String VALUE_FIELD_NAME2 = "number2";
private static final String NESTED_OBJECT = "nested_object";
private static final String NESTED_OBJECT2 = "nested_object2";
private static final String NESTED_AGG = "nestedAgg";
private static final String MAX_AGG_NAME = "maxAgg";
private static final String SUM_AGG_NAME = "sumAgg";
private static final String INVERSE_SCRIPT = "inverse";
private static final String OUT_NESTED = "outNested";
private static final String OUT_TERMS = "outTerms";
private static final String INNER_NESTED = "innerNested";
private static final String INNER_TERMS = "innerTerms";

private static final SeqNoFieldMapper.SequenceIDFields sequenceIDFields = SeqNoFieldMapper.SequenceIDFields.emptySeqID();

@@ -201,17 +229,22 @@ public void testSingleNestingMax() throws IOException {
}
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT);
MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME).field(VALUE_FIELD_NAME);
MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME).field(NESTED_OBJECT + "." + VALUE_FIELD_NAME);
nestedBuilder.subAggregation(maxAgg);
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD_NAME, NumberFieldMapper.NumberType.LONG);
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(
NESTED_OBJECT + "." + VALUE_FIELD_NAME,
NumberFieldMapper.NumberType.LONG
);

InternalNested nested = searchAndReduce(
createIndexSettings(),
newSearcher(indexReader, false, true),
new MatchAllDocsQuery(),
nestedBuilder,
DEFAULT_MAX_BUCKETS,
true,
fieldType
);
assertEquals(expectedNestedDocs, nested.getDocCount());

assertEquals(NESTED_AGG, nested.getName());
assertEquals(expectedNestedDocs, nested.getDocCount());
@@ -240,7 +273,7 @@ public void testDoubleNestingMax() throws IOException {
int numNestedDocs = randomIntBetween(0, 20);
expectedMaxValue = Math.max(
expectedMaxValue,
generateMaxDocs(documents, numNestedDocs, i, NESTED_OBJECT + "." + NESTED_OBJECT2, VALUE_FIELD_NAME)
generateMaxDocs(documents, numNestedDocs, i, NESTED_OBJECT, VALUE_FIELD_NAME)
);
expectedNestedDocs += numNestedDocs;

@@ -253,19 +286,24 @@ public void testDoubleNestingMax() throws IOException {
iw.commit();
}
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT + "." + NESTED_OBJECT2);
MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME).field(VALUE_FIELD_NAME);
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT);
MaxAggregationBuilder maxAgg = new MaxAggregationBuilder(MAX_AGG_NAME).field(NESTED_OBJECT + "." + VALUE_FIELD_NAME);
nestedBuilder.subAggregation(maxAgg);

MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD_NAME, NumberFieldMapper.NumberType.LONG);
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(
NESTED_OBJECT + "." + VALUE_FIELD_NAME,
NumberFieldMapper.NumberType.LONG
);

InternalNested nested = searchAndReduce(
createIndexSettings(),
newSearcher(indexReader, false, true),
new MatchAllDocsQuery(),
nestedBuilder,
DEFAULT_MAX_BUCKETS,
true,
fieldType
);
assertEquals(expectedNestedDocs, nested.getDocCount());

assertEquals(NESTED_AGG, nested.getName());
assertEquals(expectedNestedDocs, nested.getDocCount());
@@ -310,17 +348,22 @@ public void testOrphanedDocs() throws IOException {
}
try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
NestedAggregationBuilder nestedBuilder = new NestedAggregationBuilder(NESTED_AGG, NESTED_OBJECT);
SumAggregationBuilder sumAgg = new SumAggregationBuilder(SUM_AGG_NAME).field(VALUE_FIELD_NAME);
SumAggregationBuilder sumAgg = new SumAggregationBuilder(SUM_AGG_NAME).field(NESTED_OBJECT + "." + VALUE_FIELD_NAME);
nestedBuilder.subAggregation(sumAgg);
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD_NAME, NumberFieldMapper.NumberType.LONG);
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(
NESTED_OBJECT + "." + VALUE_FIELD_NAME,
NumberFieldMapper.NumberType.LONG
);

InternalNested nested = searchAndReduce(
createIndexSettings(),
newSearcher(indexReader, false, true),
new MatchAllDocsQuery(),
nestedBuilder,
DEFAULT_MAX_BUCKETS,
true,
fieldType
);
assertEquals(expectedNestedDocs, nested.getDocCount());

assertEquals(NESTED_AGG, nested.getName());
assertEquals(expectedNestedDocs, nested.getDocCount());
@@ -747,8 +790,24 @@ public void testFieldAlias() throws IOException {
max(MAX_AGG_NAME).field(VALUE_FIELD_NAME + "-alias")
);

InternalNested nested = searchAndReduce(newSearcher(indexReader, false, true), new MatchAllDocsQuery(), agg, fieldType);
Nested aliasNested = searchAndReduce(newSearcher(indexReader, false, true), new MatchAllDocsQuery(), aliasAgg, fieldType);
InternalNested nested = searchAndReduce(
createIndexSettings(),
newSearcher(indexReader, false, true),
new MatchAllDocsQuery(),
agg,
DEFAULT_MAX_BUCKETS,
true,
fieldType
);
Nested aliasNested = searchAndReduce(
createIndexSettings(),
newSearcher(indexReader, false, true),
new MatchAllDocsQuery(),
aliasAgg,
DEFAULT_MAX_BUCKETS,
true,
fieldType
);

assertEquals(nested, aliasNested);
assertEquals(expectedNestedDocs, nested.getDocCount());
@@ -796,13 +855,15 @@ public void testNestedWithPipeline() throws IOException {
MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(VALUE_FIELD_NAME, NumberFieldMapper.NumberType.LONG);

InternalNested nested = searchAndReduce(
createIndexSettings(),
newSearcher(indexReader, false, true),
new MatchAllDocsQuery(),
nestedBuilder,
DEFAULT_MAX_BUCKETS,
true,
fieldType
);

assertEquals(expectedNestedDocs, nested.getDocCount());
assertEquals(NESTED_AGG, nested.getName());
assertEquals(expectedNestedDocs, nested.getDocCount());

@@ -853,6 +914,238 @@ public void testNestedUnderTerms() throws IOException {
}, resellersMappedFields());
}

/**
 * Regression test for the buffering nested leaf bucket collector: a nested agg on one
 * nested field, with a terms sub-agg whose own sub-agg is a nested agg on a SIBLING
 * nested field. Each root doc carries one child under each nested field (in random
 * order within the block), and the query matches only the even-numbered roots.
 */
public void testBufferingNestedLeafBucketCollector() throws IOException {
    int numRootDocs = scaledRandomIntBetween(2, 200);
    int expectedNestedDocs;
    String[] bucketKeys;
    try (Directory directory = newDirectory()) {
        try (RandomIndexWriter iw = new RandomIndexWriter(random(), directory)) {
            for (int i = 0; i < numRootDocs; i++) {

                List<Document> documents = new ArrayList<>();
                // Randomize the order of the two sibling nested docs inside the
                // block so both orderings are exercised.
                if (randomBoolean()) {
                    generateDocument(documents, i, NESTED_OBJECT, VALUE_FIELD_NAME, 1);
                    generateDocument(documents, i, NESTED_OBJECT2, VALUE_FIELD_NAME2, i);
                } else {
                    generateDocument(documents, i, NESTED_OBJECT2, VALUE_FIELD_NAME2, i);
                    generateDocument(documents, i, NESTED_OBJECT, VALUE_FIELD_NAME, 1);
                }
                // Root (block parent) doc is added last, per block-join convention.
                Document document = new Document();
                document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(i)), IdFieldMapper.Defaults.FIELD_TYPE));
                document.add(sequenceIDFields.primaryTerm);
                documents.add(document);
                iw.addDocuments(documents);
            }
            iw.commit();
        }
        try (IndexReader indexReader = wrapInMockESDirectoryReader(DirectoryReader.open(directory))) {
            IndexSettings indexSettings = createIndexSettings();
            MappedFieldType fieldType = new NumberFieldMapper.NumberFieldType(
                NESTED_OBJECT + "." + VALUE_FIELD_NAME,
                NumberFieldMapper.NumberType.LONG
            );
            MappedFieldType fieldType1 = new NumberFieldMapper.NumberFieldType(
                NESTED_OBJECT2 + "." + VALUE_FIELD_NAME2,
                NumberFieldMapper.NumberType.LONG
            );
            QueryShardContext queryShardContext = createQueryShardContext(NESTED_OBJECT2, indexSettings, fieldType1);
            // Query: nested query on NESTED_OBJECT2 matching only even root ids.
            expectedNestedDocs = numRootDocs / 2;
            bucketKeys = new String[expectedNestedDocs];
            BytesRef[] values = new BytesRef[numRootDocs / 2];
            for (int i = 0; i < numRootDocs / 2; i++) {
                bucketKeys[i] = "" + (i * 2);
                values[i] = new BytesRef(bucketKeys[i]);
            }
            TermsQueryBuilder termsQueryBuilder = new TermsQueryBuilder(NESTED_OBJECT2 + "." + VALUE_FIELD_NAME2, (Object[]) values);
            NestedQueryBuilder nestedQueryBuilder = new NestedQueryBuilder(NESTED_OBJECT2, termsQueryBuilder, ScoreMode.None);

            // Outer nested agg on NESTED_OBJECT with a terms sub-agg over its value
            // (which is constant 1, so a single outer bucket is expected).
            NestedAggregationBuilder outNestedBuilder = new NestedAggregationBuilder(OUT_NESTED, NESTED_OBJECT);
            TermsAggregationBuilder outTermsAggregator = new TermsAggregationBuilder(OUT_TERMS).field(
                NESTED_OBJECT + "." + VALUE_FIELD_NAME
            ).size(100);
            outNestedBuilder.subAggregation(outTermsAggregator);

            // Inner nested agg on the SIBLING nested field NESTED_OBJECT2 — this is
            // the nested-under-nested shape the fix targets.
            NestedAggregationBuilder innerNestedBuilder = new NestedAggregationBuilder(INNER_NESTED, NESTED_OBJECT2);
            TermsAggregationBuilder innerTermsAggregator = new TermsAggregationBuilder(INNER_TERMS).field(
                NESTED_OBJECT2 + "." + VALUE_FIELD_NAME2
            ).size(100);
            innerNestedBuilder.subAggregation(innerTermsAggregator);
            outTermsAggregator.subAggregation(innerNestedBuilder);

            InternalNested nested = searchAndReduce(
                indexSettings,
                newSearcher(indexReader, false, true),
                nestedQueryBuilder.toQuery(queryShardContext),
                outNestedBuilder,
                DEFAULT_MAX_BUCKETS,
                true,
                fieldType,
                fieldType1
            );

            assertEquals(OUT_NESTED, nested.getName());
            assertEquals(expectedNestedDocs, nested.getDocCount());

            // All NESTED_OBJECT children share value 1 -> exactly one outer bucket.
            LongTerms outTerms = (LongTerms) nested.getProperty(OUT_TERMS);
            assertEquals(1, outTerms.getBuckets().size());

            InternalNested internalNested = (InternalNested) (((Object[]) outTerms.getProperty(INNER_NESTED))[0]);
            assertEquals(expectedNestedDocs, internalNested.getDocCount());

            // One inner bucket per matching (even) root id, each with doc count 1.
            LongTerms innerTerms = (LongTerms) internalNested.getProperty(INNER_TERMS);
            assertEquals(bucketKeys.length, innerTerms.getBuckets().size());
            for (int i = 0; i < expectedNestedDocs; i++) {
                LongTerms.Bucket bucket = innerTerms.getBuckets().get(i);
                assertEquals(bucketKeys[i], bucket.getKeyAsString());
                assertEquals(1, bucket.getDocCount());
            }
        }
    }
}

/**
 * Builds a minimal {@link DocIdSetIterator} over the given ascending doc ids, used to
 * drive {@code getParentAndChildId} in tests.
 *
 * @param value ascending, non-empty array of doc ids the iterator will emit
 * @return an iterator initially positioned before the first doc ({@code docID() == -1})
 */
private DocIdSetIterator getDocIdSetIterator(int[] value) {
    // Bitmap representation: bits[d] == 1 iff d is one of the requested doc ids.
    int[] bits = new int[value[value.length - 1] + 1];
    for (int i : value) {
        bits[i] = 1;
    }
    return new DocIdSetIterator() {
        // Current position: -1 before the first advance, bits.length once exhausted.
        int index = -1;

        @Override
        public int docID() {
            if (index == -1) {
                return -1;
            }
            if (index >= bits.length) {
                // Exhausted. The previous `index > bits.length` check let
                // bits[bits.length] be read, throwing ArrayIndexOutOfBoundsException.
                return NO_MORE_DOCS;
            }
            return bits[index] == 1 ? index : -1;
        }

        @Override
        public int nextDoc() {
            // Scan strictly past the current position: starting at `index` (as the
            // previous version did) read bits[-1] on the first call and re-returned
            // the current doc afterwards, violating the DocIdSetIterator contract.
            for (int i = index + 1; i < bits.length; i++) {
                if (bits[i] == 1) {
                    index = i;
                    return index;
                }
            }
            index = bits.length;
            return NO_MORE_DOCS;
        }

        @Override
        public int advance(int target) {
            // First set doc id at or after target (callers pass target > docID()).
            for (int i = target; i < bits.length; i++) {
                if (bits[i] == 1) {
                    index = i;
                    return index;
                }
            }
            index = bits.length;
            return NO_MORE_DOCS;
        }

        @Override
        public long cost() {
            return bits.length;
        }
    };
}

/**
 * Unit test for {@code NestedAggregator.getParentAndChildId}: verifies the mapping
 * from an ambiguous "parent aggregation doc" (which may be a block parent or a
 * sibling child doc) to the owning block parent plus the first candidate child.
 * Note that each iterator advances monotonically across the calls within a scenario.
 */
public void testGetParentAndChildId() throws IOException {
    {
        // p: parent c: child
        // [p0], [p1], [c2,p3], [c4,x5,p6], [p7], [p8]
        BitSet parentDocs = new FixedBitSet(20);
        parentDocs.set(0);
        parentDocs.set(1);
        parentDocs.set(3);
        parentDocs.set(6);
        parentDocs.set(7);
        parentDocs.set(8);
        DocIdSetIterator childDocs = getDocIdSetIterator(new int[] { 2, 4 });

        // Doc 0 is a parent with no children; iterator lands on the first child.
        Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, 0);
        assertEquals(0, res.v1().intValue());
        assertEquals(2, res.v2().intValue());

        // Doc 3 is a parent; its block starts after p1, first child is 2.
        res = getParentAndChildId(parentDocs, childDocs, 3);
        assertEquals(3, res.v1().intValue());
        assertEquals(2, res.v2().intValue());

        // Doc 4 is a child; its block parent is the next parent, 6.
        res = getParentAndChildId(parentDocs, childDocs, 4);
        assertEquals(6, res.v1().intValue());
        assertEquals(4, res.v2().intValue());

        // Doc 8 is a parent with no children and the child iterator is exhausted.
        res = getParentAndChildId(parentDocs, childDocs, 8);
        assertEquals(8, res.v1().intValue());
        assertEquals(NO_MORE_DOCS, res.v2().intValue());
    }

    {
        // p: parent c: child1 d: child2
        // [p0], [c1,d2,p3], [d4,c5,p6], [c7,d8,p9], [c10,p11]
        BitSet parentDocs = new FixedBitSet(20);
        parentDocs.set(0);
        parentDocs.set(3);
        parentDocs.set(6);
        parentDocs.set(9);
        parentDocs.set(11);
        {
            // Aggregating the c-docs while being handed the sibling d-docs.
            DocIdSetIterator childDocs = getDocIdSetIterator(new int[] { 1, 5, 7, 10 });
            Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, 2);
            assertEquals(3, res.v1().intValue());
            assertEquals(1, res.v2().intValue());

            res = getParentAndChildId(parentDocs, childDocs, 4);
            assertEquals(6, res.v1().intValue());
            assertEquals(5, res.v2().intValue());

            res = getParentAndChildId(parentDocs, childDocs, 8);
            assertEquals(9, res.v1().intValue());
            assertEquals(7, res.v2().intValue());
        }

        {
            // Mirror case: aggregating the d-docs while being handed the c-docs.
            DocIdSetIterator childDocs = getDocIdSetIterator(new int[] { 2, 4, 8 });
            Tuple<Integer, Integer> res = getParentAndChildId(parentDocs, childDocs, 1);
            assertEquals(3, res.v1().intValue());
            assertEquals(2, res.v2().intValue());

            res = getParentAndChildId(parentDocs, childDocs, 5);
            assertEquals(6, res.v1().intValue());
            assertEquals(4, res.v2().intValue());

            res = getParentAndChildId(parentDocs, childDocs, 7);
            assertEquals(9, res.v1().intValue());
            assertEquals(8, res.v2().intValue());

            // p11's block has no d-doc and the iterator is exhausted.
            res = getParentAndChildId(parentDocs, childDocs, 10);
            assertEquals(11, res.v1().intValue());
            assertEquals(NO_MORE_DOCS, res.v2().intValue());
        }
    }
}

/**
 * Builds a mocked {@link QueryShardContext} sufficient for converting a
 * {@link NestedQueryBuilder} to a Lucene query in these tests: every field lookup
 * resolves to {@code fieldType} and every object-mapper lookup resolves to a freshly
 * built nested object mapper named {@code fieldName}.
 *
 * @param fieldName     name used for the nested object mapper returned by getObjectMapper
 * @param indexSettings index settings backing the nested scope and bitset cache
 * @param fieldType     field type returned for any field-name lookup
 * @return a mock context wired for nested query parsing
 */
protected QueryShardContext createQueryShardContext(String fieldName, IndexSettings indexSettings, MappedFieldType fieldType) {
    QueryShardContext queryShardContext = mock(QueryShardContext.class);
    when(queryShardContext.nestedScope()).thenReturn(new NestedScope(indexSettings));

    // Parent filter for the nested query: all non-nested (root-level) docs.
    BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(indexSettings, Mockito.mock(BitsetFilterCache.Listener.class));
    when(queryShardContext.bitsetFilter(any())).thenReturn(bitsetFilterCache.getBitSetProducer(Queries.newNonNestedFilter()));
    when(queryShardContext.fieldMapper(anyString())).thenReturn(fieldType);
    when(queryShardContext.getSearchQuoteAnalyzer(any())).thenCallRealMethod();
    when(queryShardContext.getSearchAnalyzer(any())).thenCallRealMethod();
    when(queryShardContext.getIndexSettings()).thenReturn(indexSettings);
    when(queryShardContext.getObjectMapper(anyString())).thenAnswer(invocation -> {
        // Any object path resolves to a nested object mapper built on the fly.
        Mapper.BuilderContext context = new Mapper.BuilderContext(indexSettings.getSettings(), new ContentPath());
        return new ObjectMapper.Builder<>(fieldName).nested(ObjectMapper.Nested.newNested()).build(context);
    });
    when(queryShardContext.allowExpensiveQueries()).thenReturn(true);
    return queryShardContext;
}

public static CheckedConsumer<RandomIndexWriter, IOException> buildResellerData(int numProducts, int numResellers) {
return iw -> {
for (int p = 0; p < numProducts; p++) {
@@ -893,13 +1186,22 @@ private static double[] generateDocuments(List<Document> documents, int numNeste
document.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(id)), IdFieldMapper.Defaults.NESTED_FIELD_TYPE));
document.add(new Field(NestedPathFieldMapper.NAME, path, NestedPathFieldMapper.Defaults.FIELD_TYPE));
long value = randomNonNegativeLong() % 10000;
document.add(new SortedNumericDocValuesField(fieldName, value));
document.add(new SortedNumericDocValuesField(path + "." + fieldName, value));
documents.add(document);
values[nested] = value;
}
return values;
}

/**
 * Appends a single nested child document for root {@code id} under nested path
 * {@code path}, storing {@code value} both as doc values and as a point field
 * (so it is usable by aggregations and by point-based queries alike).
 */
private static void generateDocument(List<Document> documents, int id, String path, String fieldName, long value) {
    final String fullFieldName = path + "." + fieldName;
    Document nestedDoc = new Document();
    nestedDoc.add(new Field(IdFieldMapper.NAME, Uid.encodeId(Integer.toString(id)), IdFieldMapper.Defaults.NESTED_FIELD_TYPE));
    nestedDoc.add(new Field(NestedPathFieldMapper.NAME, path, NestedPathFieldMapper.Defaults.FIELD_TYPE));
    nestedDoc.add(new SortedNumericDocValuesField(fullFieldName, value));
    nestedDoc.add(new LongPoint(fullFieldName, value));
    documents.add(nestedDoc);
}

private List<Document> generateBook(String id, String[] authors, int[] numPages) {
List<Document> documents = new ArrayList<>();

Original file line number Diff line number Diff line change
@@ -69,6 +69,7 @@
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.lease.Releasables;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.lucene.search.Queries;
import org.opensearch.common.network.NetworkAddress;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.BigArrays;
@@ -533,6 +534,17 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
return searchAndReduce(createIndexSettings(), searcher, query, builder, maxBucket, fieldTypes);
}

/**
 * Collects all documents matching {@code query} with an aggregator built from
 * {@code builder} and returns the reduced {@link InternalAggregation}. Convenience
 * overload that delegates with {@code hasNested=false}, i.e. the query is NOT
 * additionally filtered to top-level (non-nested) documents.
 */
protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduce(
    IndexSettings indexSettings,
    IndexSearcher searcher,
    Query query,
    AggregationBuilder builder,
    int maxBucket,
    MappedFieldType... fieldTypes
) throws IOException {
    return searchAndReduce(indexSettings, searcher, query, builder, maxBucket, false, fieldTypes);
}

/**
* Collects all documents that match the provided query {@link Query} and
* returns the reduced {@link InternalAggregation}.
@@ -547,11 +559,15 @@ protected <A extends InternalAggregation, C extends Aggregator> A searchAndReduc
Query query,
AggregationBuilder builder,
int maxBucket,
boolean hasNested,
MappedFieldType... fieldTypes
) throws IOException {
final IndexReaderContext ctx = searcher.getTopReaderContext();
final PipelineTree pipelines = builder.buildPipelineTree();
List<InternalAggregation> aggs = new ArrayList<>();
if (hasNested) {
query = Queries.filtered(query, Queries.newNonNestedFilter());
}
Query rewritten = searcher.rewrite(query);
MultiBucketConsumer bucketConsumer = new MultiBucketConsumer(
maxBucket,

0 comments on commit b296cd1

Please sign in to comment.