Added version-specific OpenSearchWorkCoordinators
Signed-off-by: Chris Helma <[email protected]>
chelma committed Jan 9, 2025
1 parent 9010fcf commit 2262d1d
Showing 11 changed files with 302 additions and 88 deletions.
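
The hunks below route all work-coordinator construction through a new WorkCoordinatorFactory keyed on the target cluster's version. The factory's own source file is not expanded in this view, so the following is only a rough sketch, inferred from the call sites in this diff, of the shape it presumably has; the dispatch logic, return type, and any additional get(...) overloads are assumptions, and imports are elided.

// Rough sketch of the factory used by the call sites below; not the actual file from this commit.
public class WorkCoordinatorFactory {
    private final Version targetVersion;

    public WorkCoordinatorFactory(Version targetVersion) {
        this.targetVersion = targetVersion;
    }

    public IWorkCoordinator get(
            AbstractedHttpClient httpClient,
            long tolerableClientServerClockDifferenceSeconds,
            String workerId) {
        // Assumed behavior: dispatch on targetVersion to the coordinator subclass whose REST
        // paths and index mappings match the target; only the OS 2.11 variant added in this
        // commit is sketched here.
        return new OpenSearchWorkCoordinator_OS_2_11(
            httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
    }
}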
@@ -29,8 +29,8 @@
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.WorkCoordinatorFactory;
import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.ShardWorkPreparer;
@@ -181,6 +181,12 @@ public static class Args {
description = ("Version of the source cluster."))
public Version sourceVersion = Version.fromString("ES 7.10");

@Parameter(required = true,
names = { "--target-version" },
converter = VersionConverter.class,
description = ("Version of the target cluster."))
public Version targetVersion = Version.fromString("OS 2.11");

@ParametersDelegate
private DocParams docTransformationParams = new DocParams();
}
@@ -288,7 +294,8 @@ public static void main(String[] args) throws Exception {
var progressCursor = new AtomicReference<WorkItemCursor>();
var cancellationRunnableRef = new AtomicReference<Runnable>();
var workItemTimeProvider = new WorkItemTimeProvider();
try (var workCoordinator = new OpenSearchWorkCoordinator(
var coordinatorFactory = new WorkCoordinatorFactory(arguments.targetVersion);
try (var workCoordinator = coordinatorFactory.get(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId,
@@ -140,6 +140,7 @@ private void migrationDocumentsWithClusters(
clockJitter,
testDocMigrationContext,
sourceCluster.getContainerVersion().getVersion(),
targetCluster.getContainerVersion().getVersion(),
false
)
)
@@ -179,4 +180,29 @@ private void checkDocsWithRouting(
}
}

// private static Stream<Arguments> scenarios_experimental() {
// var scenarios = Stream.<Arguments>builder();

// for (var sourceCluster : SupportedClusters.sources()) {
// for (var targetCluster : SupportedClusters.targets_experimental()) {
// scenarios.add(Arguments.of(sourceCluster, targetCluster));
// }
// }

// return scenarios.build();
// }

// @ParameterizedTest(name = "Source {0} to Target {1}")
// @MethodSource(value = "scenarios_experimental")
// public void migrationDocumentsExperimental(
// final SearchClusterContainer.ContainerVersion sourceVersion,
// final SearchClusterContainer.ContainerVersion targetVersion) throws Exception {
// try (
// final var sourceCluster = new SearchClusterContainer(sourceVersion);
// final var targetCluster = new SearchClusterContainer(targetVersion)
// ) {
// migrationDocumentsWithClusters(sourceCluster, targetCluster);
// }
// }

}
@@ -103,6 +103,7 @@ public void testDocumentMigration(
clockJitter,
testDocMigrationContext,
sourceVersion.getVersion(),
targetVersion.getVersion(),
compressionEnabled
),
executorService
@@ -38,7 +38,7 @@
import org.opensearch.migrations.bulkload.http.SearchClusterRequests;
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.WorkCoordinatorFactory;
import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
@@ -144,7 +144,8 @@ public static int migrateDocumentsSequentially(
AtomicInteger runCounter,
Random clockJitter,
DocumentMigrationTestContext testContext,
Version version,
Version sourceVersion,
Version targetVersion,
boolean compressionEnabled
) {
for (int runNumber = 1; ; ++runNumber) {
@@ -156,7 +157,8 @@ public static int migrateDocumentsSequentially(
targetAddress,
clockJitter,
testContext,
version,
sourceVersion,
targetVersion,
compressionEnabled
);
if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
@@ -202,7 +204,8 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
String targetAddress,
Random clockJitter,
DocumentMigrationTestContext context,
Version version,
Version sourceVersion,
Version targetVersion,
boolean compressionEnabled
) throws RfsMigrateDocuments.NoWorkLeftException {
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
@@ -219,7 +222,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
return d;
};

var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(version, sourceRepo);
var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(sourceVersion, sourceRepo);

DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
@@ -238,7 +241,8 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
var defaultDocTransformer = new TransformationLoader().getTransformerFactoryLoader(RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG);

AtomicReference<WorkItemCursor> progressCursor = new AtomicReference<>();
try (var workCoordinator = new OpenSearchWorkCoordinator(
var coordinatorFactory = new WorkCoordinatorFactory(targetVersion);
try (var workCoordinator = coordinatorFactory.get(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
.build()
@@ -255,7 +259,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
.build()
.toConnectionContext()), 1000, Long.MAX_VALUE, 1, defaultDocTransformer),
progressCursor,
new OpenSearchWorkCoordinator(
coordinatorFactory.get(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
.build()
@@ -0,0 +1,90 @@
package org.opensearch.migrations.bulkload.version_os_2_11;

import java.time.Clock;
import java.util.function.Consumer;

import org.opensearch.migrations.bulkload.workcoordination.AbstractedHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;

import com.fasterxml.jackson.databind.JsonNode;

public class OpenSearchWorkCoordinator_OS_2_11 extends OpenSearchWorkCoordinator {
public OpenSearchWorkCoordinator_OS_2_11(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
}

public OpenSearchWorkCoordinator_OS_2_11(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId,
Clock clock
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock);
}


public OpenSearchWorkCoordinator_OS_2_11(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId,
Clock clock,
Consumer<WorkItemAndDuration> workItemConsumer
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer);
}

protected String getCoordinationIndexSettingsBody(){
return "{\n"
+ " \"settings\": {\n"
+ " \"index\": {"
+ " \"number_of_shards\": 1,\n"
+ " \"number_of_replicas\": 1\n"
+ " }\n"
+ " },\n"
+ " \"mappings\": {\n"
+ " \"properties\": {\n"
+ " \"" + EXPIRATION_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"" + COMPLETED_AT_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"leaseHolderId\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"status\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n";
}

protected String getPathForUpdates(String workItemId) {
return INDEX_NAME + "/_update/" + workItemId;
}

protected String getPathForGets(String workItemId) {
return INDEX_NAME + "/_doc/" + workItemId;
}

protected String getPathForSearches() {
return INDEX_NAME + "/_search";
}

protected int getTotalHitsFromSearchResponse(JsonNode searchResponse) {
return searchResponse.path("hits").path("total").path("value").asInt();
}


}
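
For orientation, a caller wiring this coordinator up through the new factory (mirroring what the first file in this diff does) might look roughly like the sketch below. The clock-skew tolerance, worker id, and context supplier are placeholders, the three-argument get(...) overload is assumed from the call sites in this diff, and imports are elided.

// Hypothetical usage sketch; none of these literal values come from this commit.
static void setUpCoordinationIndex(
        AbstractedHttpClient httpClient,
        Supplier<IWorkCoordinationContexts.IInitializeCoordinatorStateContext> initContextSupplier
) throws Exception {
    var factory = new WorkCoordinatorFactory(Version.fromString("OS 2.11"));
    try (var workCoordinator = factory.get(httpClient, 5, "example-worker")) {
        // setup() creates the .migrations_working_state index using the settings body defined above.
        workCoordinator.setup(initContextSupplier);
    }
}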
@@ -27,10 +27,13 @@
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class OpenSearchWorkCoordinator implements IWorkCoordinator {
// Create a stable logger that descendants can use and that we can predictably read from in tests
protected static final Logger log = LoggerFactory.getLogger(OpenSearchWorkCoordinator.class);

@Slf4j
public class OpenSearchWorkCoordinator implements IWorkCoordinator {
public static final String INDEX_NAME = ".migrations_working_state";
public static final int MAX_REFRESH_RETRIES = 6;
public static final int MAX_SETUP_RETRIES = 6;
@@ -194,38 +197,23 @@ private static void retryWithExponentialBackoff(
}
}

public String getLoggerName() {
return log.getName();
}

protected abstract String getCoordinationIndexSettingsBody();

protected abstract String getPathForUpdates(String workItemId);

protected abstract String getPathForGets(String workItemId);

protected abstract String getPathForSearches();

protected abstract int getTotalHitsFromSearchResponse(JsonNode searchResponse);

public void setup(Supplier<IWorkCoordinationContexts.IInitializeCoordinatorStateContext> contextSupplier)
throws IOException, InterruptedException {
var body = "{\n"
+ " \"settings\": {\n"
+ " \"index\": {"
+ " \"number_of_shards\": 1,\n"
+ " \"number_of_replicas\": 1\n"
+ " }\n"
+ " },\n"
+ " \"mappings\": {\n"
+ " \"properties\": {\n"
+ " \"" + EXPIRATION_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"" + COMPLETED_AT_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"leaseHolderId\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"status\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n";
var body = getCoordinationIndexSettingsBody();

try {
doUntil("setup-" + INDEX_NAME, 100, MAX_SETUP_RETRIES, contextSupplier::get, () -> {
@@ -345,7 +333,7 @@ AbstractedHttpClient.AbstractHttpResponse createOrUpdateLeaseForDocument(

return httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
INDEX_NAME + "/_update/" + workItemId,
getPathForUpdates(workItemId),
null,
body
);
@@ -419,7 +407,7 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
} else {
final var httpResponse = httpClient.makeJsonRequest(
AbstractedHttpClient.GET_METHOD,
INDEX_NAME + "/_doc/" + workItemId,
getPathForGets(workItemId),
null,
null
);
@@ -471,7 +459,7 @@ public void completeWorkItem(

var response = httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
INDEX_NAME + "/_update/" + workItemId,
getPathForUpdates(workItemId),
null,
body
);
@@ -513,7 +501,7 @@ private int numWorkItemsNotYetCompleteInternal(
+ "\"size\": 0" // This sets the number of items to include in the `hits.hits` array, but doesn't affect
+ "}"; // the integer value in `hits.total.value`

var path = INDEX_NAME + "/_search";
var path = getPathForSearches();
var response = httpClient.makeJsonRequest(AbstractedHttpClient.POST_METHOD, path, null, queryBody);
var statusCode = response.getStatusCode();
if (statusCode != 200) {
@@ -525,7 +513,7 @@ private int numWorkItemsNotYetCompleteInternal(
);
}
var payload = objectMapper.readTree(response.getPayloadBytes());
var totalHits = payload.path("hits").path("total").path("value").asInt();
var totalHits = getTotalHitsFromSearchResponse(payload);
// In the case where totalHits is 0, we need to be particularly sure that we're not missing data. The `relation`
// for the total must be `eq` or we need to throw an error because it's not safe to rely on this data.
if (totalHits == 0 && !payload.path("hits").path("total").path("relation").textValue().equals("eq")) {
@@ -662,7 +650,7 @@ private WorkItemWithPotentialSuccessors getAssignedWorkItemUnsafe()
final var body = queryWorkersAssignedItemsTemplate.replace(WORKER_ID_TEMPLATE, workerId);
var response = httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
INDEX_NAME + "/_search",
getPathForSearches(),
null,
body
);
@@ -770,7 +758,7 @@ private void updateWorkItemWithSuccessors(String workItemId, List<String> succes
.addArgument(workItemId).log();
var response = httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
INDEX_NAME + "/_update/" + workItemId,
getPathForUpdates(workItemId),
null,
body
);
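
Nothing below is part of this commit, but as an illustration of why the path, mapping, and hit-count lookups were turned into overridable hooks: a hypothetical coordinator aimed at a pre-OpenSearch target (say Elasticsearch 6.x) could supply the older conventions (typed document endpoints, hits.total as a bare integer) by overriding the same five methods. The class name is invented, and whether the rest of the coordinator would work unchanged against such a target is a separate question.

import org.opensearch.migrations.bulkload.workcoordination.AbstractedHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;

import com.fasterxml.jackson.databind.JsonNode;

// Hypothetical illustration only; no such class is added by this commit.
public class Es68WorkCoordinatorSketch extends OpenSearchWorkCoordinator {
    public Es68WorkCoordinatorSketch(AbstractedHttpClient httpClient,
                                     long tolerableClientServerClockDifferenceSeconds,
                                     String workerId) {
        super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
    }

    @Override
    protected String getCoordinationIndexSettingsBody() {
        // ES 6.x wraps "properties" in a mapping type ("_doc"); field definitions elided for brevity.
        return "{ \"settings\": { \"index\": { \"number_of_shards\": 1, \"number_of_replicas\": 1 } },"
             + "  \"mappings\": { \"_doc\": { \"properties\": { } } } }";
    }

    @Override
    protected String getPathForUpdates(String workItemId) {
        return INDEX_NAME + "/_doc/" + workItemId + "/_update";  // typed update endpoint before ES 7
    }

    @Override
    protected String getPathForGets(String workItemId) {
        return INDEX_NAME + "/_doc/" + workItemId;
    }

    @Override
    protected String getPathForSearches() {
        return INDEX_NAME + "/_search";
    }

    @Override
    protected int getTotalHitsFromSearchResponse(JsonNode searchResponse) {
        return searchResponse.path("hits").path("total").asInt();  // bare integer before ES 7
    }
}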