diff --git a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java index 8836031412..831bd6ef43 100644 --- a/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java +++ b/data-prepper-plugins/opensearch/src/integrationTest/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkIT.java @@ -828,11 +828,10 @@ public void testOutputCustomIndex() throws IOException, InterruptedException { final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); @@ -856,11 +855,10 @@ public void testOpenSearchBulkActionsCreate() throws IOException, InterruptedExc final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); metadata.put(IndexConfiguration.ACTION, OpenSearchBulkActions.CREATE.toString()); final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); @@ -885,11 +883,10 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException, final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); Event event = (Event) testRecords.get(0).getData(); event.getMetadata().setAttribute("action", "create");metadata.put(IndexConfiguration.ACTION, "create"); final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); @@ -915,11 +912,10 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); metadata.put(IndexConfiguration.ACTION, "unknown"); final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest(openSearchSinkConfig, true)); @@ -931,12 +927,11 @@ public void testBulkActionCreateWithActions() throws IOException, InterruptedExc final String testTemplateFile = Objects.requireNonNull( getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.CREATE.toString()); @@ -966,12 +961,11 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value1"))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.CREATE.toString()); @@ -1017,7 +1011,6 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; final String documentRootKey = "root_key"; @@ -1032,7 +1025,7 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); metadata.put(IndexConfiguration.DOCUMENT_ROOT_KEY, documentRootKey); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); List> aList = new ArrayList<>(); Map actionMap = new HashMap<>(); actionMap.put("type", OpenSearchBulkActions.CREATE.toString()); @@ -1086,7 +1079,6 @@ public void testBulkActionUpsertWithActionsAndNoCreate() throws IOException, Int getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "key", "value"))); @@ -1096,7 +1088,7 @@ public void testBulkActionUpsertWithActionsAndNoCreate() throws IOException, Int aList.add(actionMap); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); metadata.put(IndexConfiguration.ACTIONS, aList); final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); @@ -1119,12 +1111,11 @@ public void testBulkActionUpsertWithActions() throws IOException, InterruptedExc getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value1"))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.CREATE.toString()); @@ -1172,11 +1163,10 @@ public void testBulkActionUpsertWithoutCreate() throws IOException, InterruptedE getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson3(testIdField, testId, "name", "value1", "newKey", "newValue"))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.UPSERT.toString()); @@ -1209,12 +1199,11 @@ public void testBulkActionDeleteWithActions() throws IOException, InterruptedExc getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile(); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); List> aList = new ArrayList<>(); Map aMap = new HashMap<>(); aMap.put("type", OpenSearchBulkActions.DELETE.toString()); @@ -1321,12 +1310,11 @@ public void testOpenSearchDocumentId(final String testDocumentIdField) throws IO .withEventType("event") .build(); testEvent.put(testDocumentIdField, expectedId); - String testDocumentId = "${/" + testDocumentIdField + "}"; final List> testRecords = Collections.singletonList(new Record<>(testEvent)); Map metadata = initializeConfigurationMetadata(null, testIndexAlias, null); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentId); + metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentIdField); final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); sink.output(testRecords); @@ -1338,6 +1326,32 @@ public void testOpenSearchDocumentId(final String testDocumentIdField) throws IO sink.shutdown(); } + @ParameterizedTest + @ValueSource(strings = {"info/ids/rid", "rid"}) + public void testOpenSearchRoutingField(final String testRoutingField) throws IOException, InterruptedException { + final String expectedRoutingField = UUID.randomUUID().toString(); + final String testIndexAlias = "test_index"; + final Event testEvent = JacksonEvent.builder() + .withData(Map.of("arbitrary_data", UUID.randomUUID().toString())) + .withEventType("event") + .build(); + testEvent.put(testRoutingField, expectedRoutingField); + + final List> testRecords = Collections.singletonList(new Record<>(testEvent)); + + Map metadata = initializeConfigurationMetadata(null, testIndexAlias, null); + metadata.put(IndexConfiguration.ROUTING_FIELD, testRoutingField); + final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); + final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); + sink.output(testRecords); + + final List routingFields = getSearchResponseRoutingFields(testIndexAlias); + for (String routingField : routingFields) { + assertThat(routingField, equalTo(expectedRoutingField)); + } + sink.shutdown(); + } + @ParameterizedTest @ValueSource(strings = {"", "info/ids/rid", "rid"}) public void testOpenSearchRouting(final String testRouting) throws IOException, InterruptedException { @@ -1612,7 +1626,6 @@ public void testOutputManagementDisabled() throws IOException, InterruptedExcept securityAccessor.createUser(username, password, roleName); final String testIdField = "someId"; - final String testDocumentID = "${/someId}"; final String testId = "foo"; final List> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId))); @@ -1621,7 +1634,7 @@ public void testOutputManagementDisabled() throws IOException, InterruptedExcept metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.MANAGEMENT_DISABLED.getValue()); metadata.put(USERNAME, username); metadata.put(PASSWORD, password); - metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID); + metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId); final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata); final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true); diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java index e946323439..7aaa779181 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/index/IndexConfiguration.java @@ -60,6 +60,7 @@ public class IndexConfiguration { public static final String FLUSH_TIMEOUT = "flush_timeout"; public static final String DOCUMENT_ID_FIELD = "document_id_field"; public static final String DOCUMENT_ID = "document_id"; + public static final String ROUTING_FIELD = "routing_field"; public static final String ROUTING = "routing"; public static final String PIPELINE = "pipeline"; public static final String ISM_POLICY_FILE = "ism_policy_file";