Skip to content

Commit

Permalink
fix integration tests to reuse deprecatd routing field and document i…
Browse files Browse the repository at this point in the history
…d field

Signed-off-by: Maxwell Brown <[email protected]>
  • Loading branch information
Galactus22625 committed Jan 3, 2025
1 parent a0de738 commit 6d7181f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -828,11 +828,10 @@ public void testOutputCustomIndex() throws IOException, InterruptedException {
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);
sink.output(testRecords);
Expand All @@ -856,11 +855,10 @@ public void testOpenSearchBulkActionsCreate() throws IOException, InterruptedExc
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
metadata.put(IndexConfiguration.ACTION, OpenSearchBulkActions.CREATE.toString());
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);
Expand All @@ -885,11 +883,10 @@ public void testOpenSearchBulkActionsCreateWithExpression() throws IOException,
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
Event event = (Event) testRecords.get(0).getData();
event.getMetadata().setAttribute("action", "create");metadata.put(IndexConfiguration.ACTION, "create");
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
Expand All @@ -915,11 +912,10 @@ public void testOpenSearchBulkActionsCreateWithInvalidExpression() throws IOExce
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
metadata.put(IndexConfiguration.ACTION, "unknown");
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
assertThrows(IllegalArgumentException.class, () -> createObjectUnderTest(openSearchSinkConfig, true));
Expand All @@ -931,12 +927,11 @@ public void testBulkActionCreateWithActions() throws IOException, InterruptedExc
final String testTemplateFile = Objects.requireNonNull(
getClass().getClassLoader().getResource(TEST_TEMPLATE_V1_FILE)).getFile();
final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));

Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
List<Map<String, Object>> aList = new ArrayList<>();
Map<String, Object> aMap = new HashMap<>();
aMap.put("type", OpenSearchBulkActions.CREATE.toString());
Expand Down Expand Up @@ -966,12 +961,11 @@ public void testBulkActionUpdateWithActions() throws IOException, InterruptedExc
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value1")));

Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
List<Map<String, Object>> aList = new ArrayList<>();
Map<String, Object> aMap = new HashMap<>();
aMap.put("type", OpenSearchBulkActions.CREATE.toString());
Expand Down Expand Up @@ -1017,7 +1011,6 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
final String documentRootKey = "root_key";

Expand All @@ -1032,7 +1025,7 @@ public void testBulkActionUpdateWithDocumentRootKey() throws IOException, Interr
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);

metadata.put(IndexConfiguration.DOCUMENT_ROOT_KEY, documentRootKey);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
List<Map<String, Object>> aList = new ArrayList<>();
Map<String, Object> actionMap = new HashMap<>();
actionMap.put("type", OpenSearchBulkActions.CREATE.toString());
Expand Down Expand Up @@ -1086,7 +1079,6 @@ public void testBulkActionUpsertWithActionsAndNoCreate() throws IOException, Int
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "key", "value")));

Expand All @@ -1096,7 +1088,7 @@ public void testBulkActionUpsertWithActionsAndNoCreate() throws IOException, Int
aList.add(actionMap);

Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
metadata.put(IndexConfiguration.ACTIONS, aList);
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);
Expand All @@ -1119,12 +1111,11 @@ public void testBulkActionUpsertWithActions() throws IOException, InterruptedExc
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson2(testIdField, testId, "name", "value1")));

Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
List<Map<String, Object>> aList = new ArrayList<>();
Map<String, Object> aMap = new HashMap<>();
aMap.put("type", OpenSearchBulkActions.CREATE.toString());
Expand Down Expand Up @@ -1172,11 +1163,10 @@ public void testBulkActionUpsertWithoutCreate() throws IOException, InterruptedE
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson3(testIdField, testId, "name", "value1", "newKey", "newValue")));
Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
List<Map<String, Object>> aList = new ArrayList<>();
Map<String, Object> aMap = new HashMap<>();
aMap.put("type", OpenSearchBulkActions.UPSERT.toString());
Expand Down Expand Up @@ -1209,12 +1199,11 @@ public void testBulkActionDeleteWithActions() throws IOException, InterruptedExc
getClass().getClassLoader().getResource(TEST_TEMPLATE_BULK_FILE)).getFile();

final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";
List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));

Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, testTemplateFile);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
List<Map<String, Object>> aList = new ArrayList<>();
Map<String, Object> aMap = new HashMap<>();
aMap.put("type", OpenSearchBulkActions.DELETE.toString());
Expand Down Expand Up @@ -1321,12 +1310,11 @@ public void testOpenSearchDocumentId(final String testDocumentIdField) throws IO
.withEventType("event")
.build();
testEvent.put(testDocumentIdField, expectedId);
String testDocumentId = "${/" + testDocumentIdField + "}";

final List<Record<Event>> testRecords = Collections.singletonList(new Record<>(testEvent));

Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, null);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentId);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentIdField);
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);
sink.output(testRecords);
Expand All @@ -1338,6 +1326,32 @@ public void testOpenSearchDocumentId(final String testDocumentIdField) throws IO
sink.shutdown();
}

@ParameterizedTest
@ValueSource(strings = {"info/ids/rid", "rid"})
public void testOpenSearchRoutingField(final String testRoutingField) throws IOException, InterruptedException {
final String expectedRoutingField = UUID.randomUUID().toString();
final String testIndexAlias = "test_index";
final Event testEvent = JacksonEvent.builder()
.withData(Map.of("arbitrary_data", UUID.randomUUID().toString()))
.withEventType("event")
.build();
testEvent.put(testRoutingField, expectedRoutingField);

final List<Record<Event>> testRecords = Collections.singletonList(new Record<>(testEvent));

Map<String, Object> metadata = initializeConfigurationMetadata(null, testIndexAlias, null);
metadata.put(IndexConfiguration.ROUTING_FIELD, testRoutingField);
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);
sink.output(testRecords);

final List<String> routingFields = getSearchResponseRoutingFields(testIndexAlias);
for (String routingField : routingFields) {
assertThat(routingField, equalTo(expectedRoutingField));
}
sink.shutdown();
}

@ParameterizedTest
@ValueSource(strings = {"", "info/ids/rid", "rid"})
public void testOpenSearchRouting(final String testRouting) throws IOException, InterruptedException {
Expand Down Expand Up @@ -1612,7 +1626,6 @@ public void testOutputManagementDisabled() throws IOException, InterruptedExcept
securityAccessor.createUser(username, password, roleName);

final String testIdField = "someId";
final String testDocumentID = "${/someId}";
final String testId = "foo";

final List<Record<Event>> testRecords = Collections.singletonList(jsonStringToRecord(generateCustomRecordJson(testIdField, testId)));
Expand All @@ -1621,7 +1634,7 @@ public void testOutputManagementDisabled() throws IOException, InterruptedExcept
metadata.put(IndexConfiguration.INDEX_TYPE, IndexType.MANAGEMENT_DISABLED.getValue());
metadata.put(USERNAME, username);
metadata.put(PASSWORD, password);
metadata.put(IndexConfiguration.DOCUMENT_ID, testDocumentID);
metadata.put(IndexConfiguration.DOCUMENT_ID_FIELD, testId);
final OpenSearchSinkConfig openSearchSinkConfig = generateOpenSearchSinkConfigByMetadata(metadata);
final OpenSearchSink sink = createObjectUnderTest(openSearchSinkConfig, true);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public class IndexConfiguration {
public static final String FLUSH_TIMEOUT = "flush_timeout";
public static final String DOCUMENT_ID_FIELD = "document_id_field";
public static final String DOCUMENT_ID = "document_id";
public static final String ROUTING_FIELD = "routing_field";
public static final String ROUTING = "routing";
public static final String PIPELINE = "pipeline";
public static final String ISM_POLICY_FILE = "ism_policy_file";
Expand Down

0 comments on commit 6d7181f

Please sign in to comment.