Skip to content

Commit

Permalink
Add template snippets support for field and target_field in KV ingest…
Browse files Browse the repository at this point in the history
… processor (opensearch-project#10040)

* Add template snippets support for field and target_field in KV ingest processor

Signed-off-by: Gao Binlong <[email protected]>

* modify change log

Signed-off-by: Gao Binlong <[email protected]>

* Revert replacing assertThat with assertEquals

Signed-off-by: Gao Binlong <[email protected]>

* Revert some code

Signed-off-by: Gao Binlong <[email protected]>

* Revert some code

Signed-off-by: Gao Binlong <[email protected]>

* Fix typo and skip some yml test by version

Signed-off-by: Gao Binlong <[email protected]>

---------

Signed-off-by: Gao Binlong <[email protected]>
  • Loading branch information
gaobinlong authored Nov 21, 2023
1 parent e058858 commit 7f4537c
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 47 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add cluster state stats ([#10670](https://github.com/opensearch-project/OpenSearch/pull/10670))
- Adding slf4j license header to LoggerMessageFormat.java ([#11069](https://github.com/opensearch-project/OpenSearch/pull/11069))
- [Streaming Indexing] Introduce new experimental server HTTP transport based on Netty 4 and Project Reactor (Reactor Netty) ([#9672](https://github.com/opensearch-project/OpenSearch/pull/9672))
- Add template snippets support for field and target_field in KV ingest processor ([#10040](https://github.com/opensearch-project/OpenSearch/pull/10040))
- Allowing pipeline processors to access index mapping info by passing ingest service ref as part of the processor factory parameters ([#10307](https://github.com/opensearch-project/OpenSearch/pull/10307))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
processors.put(ScriptProcessor.TYPE, new ScriptProcessor.Factory(parameters.scriptService));
processors.put(DotExpanderProcessor.TYPE, new DotExpanderProcessor.Factory());
processors.put(JsonProcessor.TYPE, new JsonProcessor.Factory());
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory());
processors.put(KeyValueProcessor.TYPE, new KeyValueProcessor.Factory(parameters.scriptService));
processors.put(URLDecodeProcessor.TYPE, new URLDecodeProcessor.Factory());
processors.put(BytesProcessor.TYPE, new BytesProcessor.Factory());
processors.put(PipelineProcessor.TYPE, new PipelineProcessor.Factory(parameters.ingestService));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,13 @@
package org.opensearch.ingest.common;

import org.opensearch.common.util.set.Sets;
import org.opensearch.core.common.Strings;
import org.opensearch.ingest.AbstractProcessor;
import org.opensearch.ingest.ConfigurationUtils;
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.script.ScriptService;
import org.opensearch.script.TemplateScript;

import java.util.Collections;
import java.util.List;
Expand All @@ -56,24 +59,24 @@ public final class KeyValueProcessor extends AbstractProcessor {

private static final Pattern STRIP_BRACKETS = Pattern.compile("(^[\\(\\[<\"'])|([\\]\\)>\"']$)");

private final String field;
private final TemplateScript.Factory field;
private final String fieldSplit;
private final String valueSplit;
private final Set<String> includeKeys;
private final Set<String> excludeKeys;
private final String targetField;
private final TemplateScript.Factory targetField;
private final boolean ignoreMissing;
private final Consumer<IngestDocument> execution;

KeyValueProcessor(
String tag,
String description,
String field,
TemplateScript.Factory field,
String fieldSplit,
String valueSplit,
Set<String> includeKeys,
Set<String> excludeKeys,
String targetField,
TemplateScript.Factory targetField,
boolean ignoreMissing,
String trimKey,
String trimValue,
Expand Down Expand Up @@ -106,10 +109,10 @@ public final class KeyValueProcessor extends AbstractProcessor {
private static Consumer<IngestDocument> buildExecution(
String fieldSplit,
String valueSplit,
String field,
TemplateScript.Factory field,
Set<String> includeKeys,
Set<String> excludeKeys,
String targetField,
TemplateScript.Factory targetField,
boolean ignoreMissing,
String trimKey,
String trimValue,
Expand All @@ -130,41 +133,62 @@ private static Consumer<IngestDocument> buildExecution(
keyFilter = key -> includeKeys.contains(key) && excludeKeys.contains(key) == false;
}
}
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField == null) {
fieldPathPrefix = keyPrefix;
} else {
fieldPathPrefix = targetField + "." + keyPrefix;
}
final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
final Function<String, String> bracketStrip;
if (stripBrackets) {
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
} else {
bracketStrip = val -> val;
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);

return document -> {
String value = document.getFieldValue(field, String.class, ignoreMissing);
final String fieldPathPrefix;
String keyPrefix = prefix == null ? "" : prefix;
if (targetField != null) {
String targetFieldPath = document.renderTemplate(targetField);
if (!Strings.isNullOrEmpty((targetFieldPath))) {
fieldPathPrefix = targetFieldPath + "." + keyPrefix;
} else {
fieldPathPrefix = keyPrefix;
}
} else {
fieldPathPrefix = keyPrefix;
}

final Function<String, String> keyPrefixer;
if (fieldPathPrefix.isEmpty()) {
keyPrefixer = val -> val;
} else {
keyPrefixer = val -> fieldPathPrefix + val;
}
final Function<String, String[]> fieldSplitter = buildSplitter(fieldSplit, true);
Function<String, String[]> valueSplitter = buildSplitter(valueSplit, false);
final Function<String, String> keyTrimmer = buildTrimmer(trimKey);
final Function<String, String> bracketStrip;
if (stripBrackets) {
bracketStrip = val -> STRIP_BRACKETS.matcher(val).replaceAll("");
} else {
bracketStrip = val -> val;
}
final Function<String, String> valueTrimmer = buildTrimmer(trimValue);

String path = document.renderTemplate(field);
final boolean fieldPathNullOrEmpty = Strings.isNullOrEmpty(path);
if (fieldPathNullOrEmpty || document.hasField(path, true) == false) {
if (ignoreMissing) {
return;
} else if (fieldPathNullOrEmpty) {
throw new IllegalArgumentException("field path cannot be null nor empty");
} else {
throw new IllegalArgumentException("field [" + path + "] doesn't exist");
}
}

String value = document.getFieldValue(path, String.class, ignoreMissing);
if (value == null) {
if (ignoreMissing) {
return;
}
throw new IllegalArgumentException("field [" + field + "] is null, cannot extract key-value pairs.");
throw new IllegalArgumentException("field [" + path + "] is null, cannot extract key-value pairs. ");
}

for (String part : fieldSplitter.apply(value)) {
String[] kv = valueSplitter.apply(part);
if (kv.length != 2) {
throw new IllegalArgumentException("field [" + field + "] does not contain value_split [" + valueSplit + "]");
throw new IllegalArgumentException("field [" + path + "] does not contain value_split [" + valueSplit + "]");
}
String key = keyTrimmer.apply(kv[0]);
if (keyFilter.test(key)) {
Expand Down Expand Up @@ -193,7 +217,7 @@ private static Function<String, String[]> buildSplitter(String split, boolean fi
}
}

String getField() {
TemplateScript.Factory getField() {
return field;
}

Expand All @@ -213,7 +237,7 @@ Set<String> getExcludeKeys() {
return excludeKeys;
}

String getTargetField() {
TemplateScript.Factory getTargetField() {
return targetField;
}

Expand Down Expand Up @@ -241,6 +265,12 @@ public String getType() {
}

public static class Factory implements Processor.Factory {
private final ScriptService scriptService;

/**
 * Creates a factory that keeps the given script service for use when processors are created.
 *
 * @param scriptService script service passed to {@code ConfigurationUtils.compileTemplate} when
 *                      compiling the {@code field} and {@code target_field} settings in {@code create}
 */
public Factory(ScriptService scriptService) {
this.scriptService = scriptService;
}

@Override
public KeyValueProcessor create(
Map<String, Processor.Factory> registry,
Expand All @@ -249,7 +279,13 @@ public KeyValueProcessor create(
Map<String, Object> config
) throws Exception {
String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
TemplateScript.Factory fieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "field", field, scriptService);
String targetField = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "target_field");
TemplateScript.Factory targetFieldTemplate = null;
if (!Strings.isNullOrEmpty(targetField)) {
targetFieldTemplate = ConfigurationUtils.compileTemplate(TYPE, processorTag, "target_field", targetField, scriptService);
}

String fieldSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field_split");
String valueSplit = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "value_split");
String trimKey = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag, config, "trim_key");
Expand All @@ -270,12 +306,12 @@ public KeyValueProcessor create(
return new KeyValueProcessor(
processorTag,
description,
field,
fieldTemplate,
fieldSplit,
valueSplit,
includeKeys,
excludeKeys,
targetField,
targetFieldTemplate,
ignoreMissing,
trimKey,
trimValue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@
import org.opensearch.OpenSearchException;
import org.opensearch.OpenSearchParseException;
import org.opensearch.common.util.set.Sets;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -48,16 +50,22 @@

public class KeyValueProcessorFactoryTests extends OpenSearchTestCase {

private KeyValueProcessor.Factory factory;

/**
 * Assigns the shared {@code factory} field a new {@link KeyValueProcessor.Factory} built from
 * {@code TestTemplateService.instance()}, so individual tests do not construct their own factory.
 */
@Before
public void init() {
factory = new KeyValueProcessor.Factory(TestTemplateService.instance());
}

public void testCreateWithDefaults() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
config.put("value_split", "=");
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), is(nullValue()));
Expand All @@ -66,7 +74,6 @@ public void testCreateWithDefaults() throws Exception {
}

public void testCreateWithAllFieldsSet() throws Exception {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
Expand All @@ -78,17 +85,16 @@ public void testCreateWithAllFieldsSet() throws Exception {
String processorTag = randomAlphaOfLength(10);
KeyValueProcessor processor = factory.create(null, processorTag, null, config);
assertThat(processor.getTag(), equalTo(processorTag));
assertThat(processor.getField(), equalTo("field1"));
assertThat(processor.getField().newInstance(Collections.emptyMap()).execute(), equalTo("field1"));
assertThat(processor.getFieldSplit(), equalTo("&"));
assertThat(processor.getValueSplit(), equalTo("="));
assertThat(processor.getIncludeKeys(), equalTo(Sets.newHashSet("a", "b")));
assertThat(processor.getExcludeKeys(), equalTo(Collections.emptySet()));
assertThat(processor.getTargetField(), equalTo("target"));
assertThat(processor.getTargetField().newInstance(Collections.emptyMap()).execute(), equalTo("target"));
assertTrue(processor.isIgnoreMissing());
}

public void testCreateWithMissingField() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
String processorTag = randomAlphaOfLength(10);
OpenSearchException exception = expectThrows(
Expand All @@ -99,7 +105,6 @@ public void testCreateWithMissingField() {
}

public void testCreateWithMissingFieldSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
String processorTag = randomAlphaOfLength(10);
Expand All @@ -111,7 +116,6 @@ public void testCreateWithMissingFieldSplit() {
}

public void testCreateWithMissingValueSplit() {
KeyValueProcessor.Factory factory = new KeyValueProcessor.Factory();
Map<String, Object> config = new HashMap<>();
config.put("field", "field1");
config.put("field_split", "&");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.opensearch.ingest.IngestDocument;
import org.opensearch.ingest.Processor;
import org.opensearch.ingest.RandomDocumentPicks;
import org.opensearch.ingest.TestTemplateService;
import org.opensearch.test.OpenSearchTestCase;

import java.util.ArrayList;
Expand All @@ -51,7 +52,7 @@

public class KeyValueProcessorTests extends OpenSearchTestCase {

private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory();
private static final KeyValueProcessor.Factory FACTORY = new KeyValueProcessor.Factory(TestTemplateService.instance());

public void test() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random());
Expand Down Expand Up @@ -123,7 +124,12 @@ public void testMissingField() throws Exception {
IngestDocument ingestDocument = RandomDocumentPicks.randomIngestDocument(random(), Collections.emptyMap());
Processor processor = createKvProcessor("unknown", "&", "=", null, null, "target", false);
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () -> processor.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field [unknown] not present as part of path [unknown]"));
assertThat(exception.getMessage(), equalTo("field [unknown] doesn't exist"));

// when using a template snippet, the resolved field path may be empty
Processor processorWithEmptyFieldPath = createKvProcessor("", "&", "=", null, null, "target", false);
exception = expectThrows(IllegalArgumentException.class, () -> processorWithEmptyFieldPath.execute(ingestDocument));
assertThat(exception.getMessage(), equalTo("field path cannot be null nor empty"));
}

public void testNullValueWithIgnoreMissing() throws Exception {
Expand Down
Loading

0 comments on commit 7f4537c

Please sign in to comment.