From b9b543b70c57b81af6319ea5d028557b3c5c76ac Mon Sep 17 00:00:00 2001
From: Andre Kurait
Date: Thu, 12 Dec 2024 14:24:10 -0600
Subject: [PATCH] Refactor based on PR comments

Signed-off-by: Andre Kurait
---
 .../migrations/data/WorkloadGenerator.java    |  12 +-
 .../migrations/data/WorkloadOptions.java      |  14 ++-
 .../migrations/RfsMigrateDocuments.java       |   2 +-
 .../migrations/RfsMigrateDocumentsTest.java   | 115 ++++--------------
 .../bulkload/LeaseExpirationTest.java         |  10 +-
 .../bulkload/PerformanceVerificationTest.java |   5 +-
 .../common/LuceneDocumentsReader.java         |  44 +++++--
 .../bulkload/common/RfsDocument.java          |   4 +-
 .../workcoordination/IWorkCoordinator.java    |   4 +-
 .../OpenSearchWorkCoordinator.java            |  18 +--
 10 files changed, 96 insertions(+), 132 deletions(-)

diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java
index c3d1ec6ea..0d47f18aa 100644
--- a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java
+++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadGenerator.java
@@ -25,7 +25,7 @@ public void generate(WorkloadOptions options) {
         // This workload creates ALL documents in memory, schedules them and waits for completion.
         // If larger scale is needed remove the toList() calls and stream all data.
         var allDocs = new ArrayList<CompletableFuture<?>>();
-        for (var workload : options.workloads) {
+        for (var workload : options.getWorkloads()) {
             var workloadInstance = workload.getNewInstance().get();
             var docs = workloadInstance
                 .indexNames()
@@ -43,12 +43,12 @@ public void generate(WorkloadOptions options) {

     private List<CompletableFuture<?>> generateDocs(String indexName, Workload workload, WorkloadOptions options) {
         // This happens inline to be sure the index exists before docs are indexed on it
-        var indexRequestDoc = workload.createIndex(options.index.indexSettings.deepCopy());
+        var indexRequestDoc = workload.createIndex(options.getIndex().indexSettings.deepCopy());
         log.atInfo().setMessage("Creating index {} with {}").addArgument(indexName).addArgument(indexRequestDoc).log();
         client.createIndex(indexName, indexRequestDoc, null);

         var docIdCounter = new AtomicInteger(0);
-        var allDocs = workload.createDocs(options.totalDocs)
+        var allDocs = workload.createDocs(options.getTotalDocs())
             .map(doc -> {
                 log.atTrace().setMessage("Created doc for index {}: {}")
                     .addArgument(indexName)
@@ -59,14 +59,14 @@ private List<CompletableFuture<?>> generateDocs(String indexName, Workload workl
             .collect(Collectors.toList());

         var bulkDocGroups = new ArrayList<List<ObjectNode>>();
-        for (int i = 0; i < allDocs.size(); i += options.maxBulkBatchSize) {
-            bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.maxBulkBatchSize, allDocs.size())));
+        for (int i = 0; i < allDocs.size(); i += options.getMaxBulkBatchSize()) {
+            bulkDocGroups.add(allDocs.subList(i, Math.min(i + options.getMaxBulkBatchSize(), allDocs.size())));
         }

         return bulkDocGroups.stream()
             .map(docs -> {
                 var sendFuture = client.sendBulkRequest(indexName, docs, null).toFuture();
-                if (options.refreshAfterEachWrite) {
+                if (options.isRefreshAfterEachWrite()) {
                     sendFuture.thenRun(() -> client.refresh(null));
                     // Requests will be sent in parallel unless we wait for completion
                     // This allows more segments to be created
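
A note on the batching loop above: generateDocs partitions allDocs into windows of getMaxBulkBatchSize() and hands each window to sendBulkRequest, optionally refreshing after each write to force extra segment creation. The windowing idea in isolation, as a sketch with an invented helper name that is not part of this patch:

    import java.util.ArrayList;
    import java.util.List;

    class BatchPartitioner {
        // Mirrors the subList windowing in generateDocs: fixed-size windows, shorter final window.
        static <T> List<List<T>> partition(List<T> docs, int batchSize) {
            var groups = new ArrayList<List<T>>();
            for (int i = 0; i < docs.size(); i += batchSize) {
                groups.add(docs.subList(i, Math.min(i + batchSize, docs.size()))); // subList is a view; no copying
            }
            return groups;
        }
    }
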
diff --git a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java
index 74bab146c..09f28d872 100644
--- a/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java
+++ b/DataGenerator/src/main/java/org/opensearch/migrations/data/WorkloadOptions.java
@@ -6,18 +6,22 @@
 import org.opensearch.migrations.data.workloads.Workloads;

 import com.beust.jcommander.Parameter;
+import lombok.Getter;
+import lombok.Setter;

+@Getter
+@Setter
 public class WorkloadOptions {
     @Parameter(names = { "--workloads", "-w" }, description = "The list of workloads to run, defaults to all available workloads.", required = false)
-    public List<Workloads> workloads = Arrays.asList(Workloads.values());
+    private List<Workloads> workloads = Arrays.asList(Workloads.values());

     @Parameter(names = { "--docs-per-workload-count" }, description = "The number of documents per workload")
-    public int totalDocs = 1000;
+    private int totalDocs = 1000;

     @Parameter(names = { "--max-bulk-request-batch-count" }, description = "The maximum batch count for bulk requests")
-    public int maxBulkBatchSize = 50;
+    private int maxBulkBatchSize = 50;

-    public final IndexOptions index = new IndexOptions();
+    private final IndexOptions index = new IndexOptions();

-    public boolean refreshAfterEachWrite = false;
+    private boolean refreshAfterEachWrite = false;
 }
diff --git a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java
index 4dac7a5fb..667a1d8ce 100644
--- a/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java
+++ b/DocumentsFromSnapshotMigration/src/main/java/org/opensearch/migrations/RfsMigrateDocuments.java
@@ -410,7 +410,7 @@ private static void exitOnLeaseTimeout(
         System.exit(PROCESS_TIMED_OUT_EXIT_CODE);
     }

-    protected static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
+    public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
                                                                Instant leaseExpirationTime) {
         if (workItemTimeProvider.getLeaseAcquisitionTimeRef().get() == null ||
             workItemTimeProvider.getDocumentMigraionStartTimeRef().get() == null) {
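
The method above was made public so the rewritten test class below can call it directly instead of going through a subclass shim. For readers following the lease logic: based on the thresholds the tests pin down (2.5% and 10% of the total lease window), the decision rule can be sketched as follows. This is an illustration of the tested contract, not the method's actual body:

    // Sketch of the contract encoded by RfsMigrateDocumentsTest (assumed thresholds).
    // shardPrepFraction = shardPrepTime / totalLeaseDuration, where
    // totalLeaseDuration = initialLeaseDuration * 2^existingLeaseExponent.
    static int successorLeaseExponentSketch(double shardPrepFraction, int existingLeaseExponent) {
        if (shardPrepFraction < 0.025 && existingLeaseExponent > 0) {
            return existingLeaseExponent - 1; // shard setup was cheap; shorten successor leases
        }
        if (shardPrepFraction > 0.1) {
            return existingLeaseExponent + 1; // shard setup dominated the lease; lengthen successor leases
        }
        return existingLeaseExponent; // between the thresholds; keep the current lease length
    }
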
diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/RfsMigrateDocumentsTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/RfsMigrateDocumentsTest.java
index 0e29c86a2..ed38677b7 100644
--- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/RfsMigrateDocumentsTest.java
+++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/RfsMigrateDocumentsTest.java
@@ -2,114 +2,47 @@

 import java.time.Duration;
 import java.time.Instant;
+import java.util.stream.Stream;

 import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;

 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
-
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.*;

 class RfsMigrateDocumentsTest {
+    private static final Duration TEST_INITIAL_LEASE_DURATION = Duration.ofMinutes(1);
+    private static final double DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = .025d;
+    private static final double INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD = .1d;

-    private static class TestClass extends RfsMigrateDocuments {
-        public static int getSuccessorNextAcquisitionLeaseExponent(WorkItemTimeProvider workItemTimeProvider, Duration initialLeaseDuration,
-                                                                    Instant leaseExpirationTime) {
-            return RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
-        }
-    }
-
-    @Test
-    public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThreshold() {
-        WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();
-
-        // Lease at 40 minutes, shard prep 59 seconds, successor lease should be decreased since shard prep is < 2.5%
-        // and exponent is > 0
-        var existingLeaseExponent = 2;
-        var shardPrepTime = Duration.ofSeconds(59);
-        Duration initialLeaseDuration = Duration.ofMinutes(10);
-        var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);
-
-        workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
-        workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
-        Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));
-
-        int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
-
-        Assertions.assertEquals(existingLeaseExponent - 1, successorNextAcquisitionLeaseExponent, "Should decrease successorExponent");
-    }
-
-
-    @Test
-    public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanLowerThresholdWith0Exponent() {
-        WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();
-
-        var shardPrepTime = Duration.ofSeconds(1);
-        var existingLeaseExponent = 0;
-        var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);
-
-        workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
-        workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
-        Duration initialLeaseDuration = Duration.ofMinutes(10);
-        Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));
-
-        int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
-
-        Assertions.assertEquals(0, successorNextAcquisitionLeaseExponent, "Should return 0 for successorExponent");
-    }
-
-
-    @Test
-    public void testGetSuccessorNextAcquisitionLeaseExponent_LessThanUpperThreshold() {
+    @ParameterizedTest
+    @MethodSource("provideTestParameters")
+    void testGetSuccessorNextAcquisitionLeaseExponent(int existingLeaseExponent, int expectedSuccessorExponent, double shardPrepFraction, String message) {
         WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();

-        var shardPrepTime = Duration.ofSeconds(59);
-        var existingLeaseExponent = 0;
-        var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);
-
-        workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
-        workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
-        Duration initialLeaseDuration = Duration.ofMinutes(10);
-        Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));
+        int initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);
+        Duration leaseDuration = TEST_INITIAL_LEASE_DURATION.multipliedBy(initialLeaseMultiple);

-        int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
-
-        Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is less than 10% of lease duration");
-    }
-
-    @Test
-    public void testGetSuccessorNextAcquisitionLeaseExponent_EqualToUpperThreshold() {
-        WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();
-
-        var shardPrepTime = Duration.ofSeconds(60);
-        var existingLeaseExponent = 0;
-        var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);
+        Duration shardPrepTime = Duration.ofNanos((long)(leaseDuration.toNanos() * shardPrepFraction));

         workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
         workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
-        Duration initialLeaseDuration = Duration.ofMinutes(10);
-        Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));
+        Instant leaseExpirationTime = Instant.EPOCH.plus(leaseDuration);

-        int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
+        int successorNextAcquisitionLeaseExponent = RfsMigrateDocuments.getSuccessorNextAcquisitionLeaseExponent(
+            workItemTimeProvider, TEST_INITIAL_LEASE_DURATION, leaseExpirationTime);

-        Assertions.assertEquals(existingLeaseExponent, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent when shard prep time is equal to 10% of lease duration");
+        Assertions.assertEquals(expectedSuccessorExponent, successorNextAcquisitionLeaseExponent, message);
     }

-    @Test
-    public void testGetSuccessorNextAcquisitionLeaseExponent_ExceedsUpperThreshold() {
-        WorkItemTimeProvider workItemTimeProvider = new WorkItemTimeProvider();
-
-        var shardPrepTime = Duration.ofSeconds(61);
-        var existingLeaseExponent = 0;
-        var initialLeaseMultiple = (int) Math.pow(2, existingLeaseExponent);
-
-        workItemTimeProvider.getLeaseAcquisitionTimeRef().set(Instant.EPOCH);
-        workItemTimeProvider.getDocumentMigraionStartTimeRef().set(Instant.EPOCH.plus(shardPrepTime));
-        Duration initialLeaseDuration = Duration.ofMinutes(10);
-        Instant leaseExpirationTime = Instant.EPOCH.plus(initialLeaseDuration.multipliedBy(initialLeaseMultiple));
-
-        int successorNextAcquisitionLeaseExponent = TestClass.getSuccessorNextAcquisitionLeaseExponent(workItemTimeProvider, initialLeaseDuration, leaseExpirationTime);
-
-        Assertions.assertEquals(existingLeaseExponent + 1, successorNextAcquisitionLeaseExponent, "Should return existingLeaseExponent + 1 when shard prep time is greater than to 10% of lease duration");
+    static Stream<Arguments> provideTestParameters() {
+        return Stream.of(
+            Arguments.of(2, 1, DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should decrease successorExponent when shard prep time is less than decrease threshold for lease duration"),
+            Arguments.of(0, 0, DECREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should return 0 for successorExponent when shard prep time is less than decrease threshold for lease duration and existingLeaseExponent is 0"),
+            Arguments.of(1, 1, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD - 0.001, "Should return existingLeaseExponent when shard prep time is less than increase threshold for lease duration"),
+            Arguments.of(1, 1, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD, "Should return existingLeaseExponent when shard prep time is equal to increase threshold for lease duration"),
+            Arguments.of(1, 2, INCREASE_LEASE_DURATION_SHARD_SETUP_THRESHOLD + 0.001, "Should return existingLeaseExponent + 1 when shard prep time is greater than increase threshold for lease duration")
+        );
     }
 }
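
The five removed @Test methods collapse into the single parameterized test above via JUnit 5's @MethodSource, which unpacks each Arguments row positionally into the test method's parameters. A minimal self-contained example of the same pattern, with invented values:

    import java.util.stream.Stream;
    import org.junit.jupiter.api.Assertions;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.Arguments;
    import org.junit.jupiter.params.provider.MethodSource;

    class MethodSourcePatternTest {
        @ParameterizedTest
        @MethodSource("cases")
        void doubling(int input, int expected, String message) {
            Assertions.assertEquals(expected, input * 2, message);
        }

        static Stream<Arguments> cases() {
            return Stream.of(
                Arguments.of(1, 2, "1 doubled is 2"),
                Arguments.of(3, 6, "3 doubled is 6")
            );
        }
    }
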
diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java
index 466fe738e..610997aec 100644
--- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java
+++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/LeaseExpirationTest.java
@@ -109,12 +109,12 @@ private void runTestProcessWithCheckpoint(int expectedInitialExitCode, int expec
         );
         sourceClusterOperations.createIndex("geonames", body);

-        workloadOptions.totalDocs = indexDocCount;
-        workloadOptions.workloads = List.of(Workloads.GEONAMES);
-        workloadOptions.index.indexSettings.put(IndexOptions.PROP_NUMBER_OF_SHARDS, shards);
+        workloadOptions.setTotalDocs(indexDocCount);
+        workloadOptions.setWorkloads(List.of(Workloads.GEONAMES));
+        workloadOptions.getIndex().indexSettings.put(IndexOptions.PROP_NUMBER_OF_SHARDS, shards);
         // Segments will be created on each refresh which tests segment ordering logic
-        workloadOptions.refreshAfterEachWrite = forceMoreSegments;
-        workloadOptions.maxBulkBatchSize = forceMoreSegments ? 10 : 1000;
+        workloadOptions.setRefreshAfterEachWrite(forceMoreSegments);
+        workloadOptions.setMaxBulkBatchSize(forceMoreSegments ? 10 : 1000);
         generator.generate(workloadOptions);

         // Create the snapshot from the source cluster
diff --git a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/PerformanceVerificationTest.java b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/PerformanceVerificationTest.java
index b5a9b7db5..03108fbd2 100644
--- a/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/PerformanceVerificationTest.java
+++ b/DocumentsFromSnapshotMigration/src/test/java/org/opensearch/migrations/bulkload/PerformanceVerificationTest.java
@@ -4,6 +4,7 @@
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Supplier;

 import org.opensearch.migrations.bulkload.common.BulkDocSection;
 import org.opensearch.migrations.bulkload.common.DocumentReindexer;
@@ -73,9 +74,9 @@ protected DirectoryReader getReader() {
             }

             @Override
-            protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase) {
+            protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase, Supplier<String> getSegmentReaderDebugInfo) {
                 ingestedDocuments.incrementAndGet();
-                return super.getDocument(reader, luceneDocId, isLive, segmentDocBase);
+                return super.getDocument(reader, luceneDocId, isLive, segmentDocBase, () -> "TestReaderWrapper(" + getSegmentReaderDebugInfo.get() + ")");
             }
         };
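
The extra Supplier<String> parameter threaded through getDocument above (and introduced in LuceneDocumentsReader below) exists so the segment's debug string is only built when an error or skip path actually asks for it, and at most once per segment. The caching idiom used in the reader can be written generically as the following sketch ('memoize' is an invented name):

    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Supplier;

    class LazyDebugInfo {
        // Compute the value lazily and cache it; updateAndGet may re-run the lambda
        // under contention, which is acceptable here because the computation is idempotent.
        static Supplier<String> memoize(Supplier<String> expensive) {
            AtomicReference<String> cache = new AtomicReference<>();
            return () -> cache.updateAndGet(s -> s == null ? expensive.get() : s);
        }
    }
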
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java
index 17dea3b23..80df9e5b4 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/LuceneDocumentsReader.java
@@ -5,7 +5,9 @@
 import java.nio.file.Path;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
+import java.util.function.Supplier;

 import org.opensearch.migrations.cluster.ClusterSnapshotReader;
@@ -130,7 +132,7 @@ public int compare(LeafReader leafReader1, LeafReader leafReader2) {
                 }
                 return leafDetails.toString();
             };
-            log.atError().setMessage("Unexpected equality during leafReader sorting, expected sort to yield no equality " +
+            log.atWarn().setMessage("Unexpected equality during leafReader sorting, expected sort to yield no equality " +
                     "to ensure consistent segment ordering. This may cause missing documents if both segments" +
                     "contains docs. LeafReader1DebugInfo: {} \nLeafReader2DebugInfo: {}")
                 .addArgument(getLeafReaderDebugInfo.apply(leafReader1))
@@ -242,6 +244,14 @@ Flux<RfsLuceneDocument> readDocsFromSegment(LeafReaderContext leafReaderContext,
         int startDocIdInSegment = Math.max(docStartingId - segmentDocBase, 0);
         int numDocsToProcessInSegment = segmentReader.maxDoc() - startDocIdInSegment;

+        // For any errors, we want to log the segment reader debug info so we can see which segment is causing the issue.
+        // Passing a supplier to getDocument lets that info be computed lazily and cached, so it is not
+        // recomputed each time it is requested.
+        var segmentReaderDebugInfoCache = new AtomicReference<String>();
+        final Supplier<String> getSegmentReaderDebugInfo = () -> segmentReaderDebugInfoCache.updateAndGet(s ->
+            s == null ? segmentReader.toString() : s
+        );
+
         log.atInfo().setMessage("For segment: {}, migrating from doc: {}. Will process {} docs in segment.")
             .addArgument(leafReaderContext)
             .addArgument(startDocIdInSegment)
@@ -253,14 +263,20 @@ Flux<RfsLuceneDocument> readDocsFromSegment(LeafReaderContext leafReaderContext,
                 try {
                     if (liveDocs == null || liveDocs.get(docIdx)) {
                         // Get document, returns null to skip malformed docs
-                        RfsLuceneDocument document = getDocument(segmentReader, docIdx, true, segmentDocBase);
+                        RfsLuceneDocument document = getDocument(segmentReader, docIdx, true, segmentDocBase, getSegmentReaderDebugInfo);
                         return Mono.justOrEmpty(document); // Emit only non-null documents
                     } else {
                         return Mono.empty(); // Skip non-live documents
                     }
                 } catch (Exception e) {
                     // Handle individual document read failures gracefully
-                    return Mono.error(new RuntimeException("Error reading document at index: " + docIdx, e));
+                    log.atError().setMessage("Error reading document from reader {} with index: {}")
+                        .addArgument(getSegmentReaderDebugInfo)
+                        .addArgument(docIdx)
+                        .setCause(e)
+                        .log();
+                    return Mono.error(new RuntimeException("Error reading document from reader with index " + docIdx +
+                        " from segment " + getSegmentReaderDebugInfo.get(), e));
                 }
             }).subscribeOn(scheduler),
             concurrency, 1)
@@ -273,7 +289,7 @@ protected DirectoryReader wrapReader(DirectoryReader reader, boolean softDeletes
         return reader;
     }

-    protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase) {
+    protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boolean isLive, int segmentDocBase, final Supplier<String> getSegmentReaderDebugInfo) {
         Document document;
         try {
             document = reader.document(luceneDocId);
@@ -319,21 +335,31 @@ protected RfsLuceneDocument getDocument(IndexReader reader, int luceneDocId, boo
             }
         }
         if (openSearchDocId == null) {
-            log.atError().setMessage("Document with index {} does not have an id. Skipping")
-                .addArgument(luceneDocId).log();
+            log.atWarn().setMessage("Skipping document with index {} from segment {} from source {}; it does not have a referenceable id.")
+                .addArgument(luceneDocId)
+                .addArgument(getSegmentReaderDebugInfo)
+                .addArgument(indexDirectoryPath)
+                .log();
             return null; // Skip documents with missing id
         }
         if (sourceBytes == null || sourceBytes.bytes.length == 0) {
-            log.atWarn().setMessage("Document {} doesn't have the _source field enabled")
-                .addArgument(openSearchDocId).log();
+            log.atWarn().setMessage("Skipping document with index {} from segment {} from source {}; it does not have the _source field enabled.")
+                .addArgument(luceneDocId)
+                .addArgument(getSegmentReaderDebugInfo)
+                .addArgument(indexDirectoryPath)
+                .log();
             return null; // Skip these
         }
         log.atDebug().setMessage("Reading document {}").addArgument(openSearchDocId).log();
     } catch (RuntimeException e) {
         StringBuilder errorMessage = new StringBuilder();
-        errorMessage.append("Unable to parse Document id from Document. The Document's Fields: ");
+        errorMessage.append("Unable to parse Document id from Document with index ")
+            .append(luceneDocId)
+            .append(" from segment ")
+            .append(getSegmentReaderDebugInfo.get())
+            .append(". The Document's Fields: ");
         document.getFields().forEach(f -> errorMessage.append(f.name()).append(", "));
         log.atError().setCause(e).setMessage("{}").addArgument(errorMessage).log();
         return null; // Skip documents with invalid id
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java
index e6a7344e1..cf775823e 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/common/RfsDocument.java
@@ -1,7 +1,7 @@
 package org.opensearch.migrations.bulkload.common;

 import java.util.Map;
-import java.util.function.Function;
+import java.util.function.UnaryOperator;

 import lombok.AllArgsConstructor;

@@ -32,7 +32,7 @@ public static RfsDocument fromLuceneDocument(RfsLuceneDocument doc, String index
         );
     }

-    public static RfsDocument transform(Function<Map<String, Object>, Map<String, Object>> transformer, RfsDocument doc) {
+    public static RfsDocument transform(UnaryOperator<Map<String, Object>> transformer, RfsDocument doc) {
         return new RfsDocument(
             doc.luceneDocNumber,
             BulkDocSection.fromMap(transformer.apply(doc.document.toMap()))
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java
index 1b4c19331..8c0dc5ea0 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/IWorkCoordinator.java
@@ -5,7 +5,7 @@
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.ArrayList;
+import java.util.List;
 import java.util.function.Supplier;

 import org.opensearch.migrations.bulkload.tracing.IWorkCoordinationContexts;
@@ -111,7 +111,7 @@ void completeWorkItem(
      */
     void createSuccessorWorkItemsAndMarkComplete(
         String workItemId,
-        ArrayList<String> successorWorkItemIds,
+        List<String> successorWorkItemIds,
         int initialNextAcquisitionLeaseExponent,
         Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
     ) throws IOException, InterruptedException;
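
A note on the IWorkCoordinator change above: widening the parameter from ArrayList<String> to List<String> lets callers pass any List implementation, including immutable ones. A hypothetical call site (coordinator, workItemId, and contextSupplier are assumed to be in scope; the ids are invented):

    // Any List<String> is now accepted; the old signature forced a mutable ArrayList.
    List<String> successors = List.of("shard0__0", "shard0__500");
    coordinator.createSuccessorWorkItemsAndMarkComplete(workItemId, successors, 0, contextSupplier);
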
diff --git a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java
index 11c83f3e6..3c37a5029 100644
--- a/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java
+++ b/RFS/src/main/java/org/opensearch/migrations/bulkload/workcoordination/OpenSearchWorkCoordinator.java
@@ -114,7 +114,7 @@ private boolean waitExtendsPastLease(Duration nextRetryAtDuration) {
     static class WorkItemWithPotentialSuccessors {
         final String workItemId;
         final Instant leaseExpirationTime;
-        final ArrayList<String> successorWorkItemIds;
+        final List<String> successorWorkItemIds;
     }

     private final long tolerableClientServerClockDifferenceSeconds;
@@ -394,11 +394,11 @@ public boolean createUnassignedWorkItem(
         }
     }

-    private ArrayList<String> getSuccessorItemsIfPresent(JsonNode responseDoc) {
+    private List<String> getSuccessorItemsIfPresent(JsonNode responseDoc) {
         if (responseDoc.has(SUCCESSOR_ITEMS_FIELD_NAME)) {
             return new ArrayList<>(Arrays.asList(responseDoc.get(SUCCESSOR_ITEMS_FIELD_NAME).asText().split(SUCCESSOR_ITEM_DELIMITER)));
         }
-        return null;
+        return List.of();
     }

     @Override
@@ -673,8 +673,8 @@ private WorkItemWithPotentialSuccessors getAssignedWorkItemUnsafe()
         final var resultHitsUpper = objectMapper.readTree(response.getPayloadBytes()).path("hits");
         if (resultHitsUpper.isMissingNode()) {
-            log.warn("Couldn't find the top level 'hits' field; no assigned work item found");
-            return null;
+            log.warn("Couldn't find the top level 'hits' field; no assigned work item found");
+            throw new AssignedWorkDocumentNotFoundException(response);
         }
         final var numDocs = resultHitsUpper.path("total").path("value").longValue();
         if (numDocs == 0) {
@@ -737,7 +737,7 @@ private WorkItemWithPotentialSuccessors getAssignedWorkItem(LeaseChecker leaseCh
         }
     }

-    private void updateWorkItemWithSuccessors(String workItemId, ArrayList<String> successorWorkItemIds) throws IOException, NonRetryableException {
+    private void updateWorkItemWithSuccessors(String workItemId, List<String> successorWorkItemIds) throws IOException, NonRetryableException {
         final var updateSuccessorWorkItemsTemplate = "{\n"
             + "  \"script\": {\n"
             + "    \"lang\": \"painless\",\n"
@@ -805,7 +805,7 @@ private void updateWorkItemWithSuccessors(String workItemId, ArrayList<String> s
     // API which creates a document only if the specified ID doesn't yet exist. It is distinct from createUnassignedWorkItem
     // because it is an expected outcome of this function that sometimes the work item is already created. That function
     // uses `createOrUpdateLease`, whereas this function deliberately never modifies an already-existing work item.
-    private void createUnassignedWorkItemsIfNonexistent(ArrayList<String> workItemIds, int nextAcquisitionLeaseExponent) throws IOException, IllegalStateException {
+    private void createUnassignedWorkItemsIfNonexistent(List<String> workItemIds, int nextAcquisitionLeaseExponent) throws IOException, IllegalStateException {
         String workItemBodyTemplate = "{\"nextAcquisitionLeaseExponent\":" + nextAcquisitionLeaseExponent + ", \"scriptVersion\":\"" + SCRIPT_VERSION_TEMPLATE + "\", " +
             "\"creatorId\":\"" + WORKER_ID_TEMPLATE + "\", \"" + EXPIRATION_FIELD_NAME + "\":0 }";
         String workItemBody = workItemBodyTemplate.replace(SCRIPT_VERSION_TEMPLATE, "2.0").replace(WORKER_ID_TEMPLATE, workerId);
@@ -863,7 +863,7 @@ private void createUnassignedWorkItemsIfNonexistent(ArrayList<String> workItemId
     @Override
     public void createSuccessorWorkItemsAndMarkComplete(
         String workItemId,
-        ArrayList<String> successorWorkItemIds,
+        List<String> successorWorkItemIds,
         int successorNextAcquisitionLeaseExponent,
         Supplier<IWorkCoordinationContexts.ICreateSuccessorWorkItemsContext> contextSupplier
     ) throws IOException, InterruptedException, IllegalStateException {
@@ -1038,7 +1038,7 @@ private void refresh(Supplier<IWorkCoordinationContexts.IRefreshContext> context
             case SUCCESSFUL_ACQUISITION:
                 ctx.recordAssigned();
                 var workItem = getAssignedWorkItem(leaseChecker, ctx);
-                if (workItem.successorWorkItemIds != null) {
+                if (!workItem.successorWorkItemIds.isEmpty()) {
                     // continue the previous work of creating the successors and marking this item as completed.
                     createSuccessorWorkItemsAndMarkComplete(workItem.workItemId, workItem.successorWorkItemIds,
                         // in cases of partial successor creation, create with 0 nextAcquisitionLeaseExponent to use default
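
One closing observation on the work-coordination changes: getSuccessorItemsIfPresent now returns List.of() instead of null, which is what allows the SUCCESSFUL_ACQUISITION branch above to test isEmpty() rather than null-check. In sketch form (names reused from the diff purely for illustration):

    List<String> successors = getSuccessorItemsIfPresent(responseDoc); // never null after this patch
    if (!successors.isEmpty()) {
        // resume creating successor work items and mark the current one complete
    }
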