
Added version-specific OpenSearchWorkCoordinators #1226

Merged: 4 commits, Jan 10, 2025
Changes from 2 commits
=== File 1 ===
@@ -29,8 +29,8 @@
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.IWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
- import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
import org.opensearch.migrations.bulkload.workcoordination.ScopedWorkCoordinator;
+ import org.opensearch.migrations.bulkload.workcoordination.WorkCoordinatorFactory;
import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.ShardWorkPreparer;
@@ -181,6 +181,12 @@ public static class Args {
description = ("Version of the source cluster."))
public Version sourceVersion = Version.fromString("ES 7.10");

+ @Parameter(required = true,
Member:
> Target version detection is built into the OpenSearch client; can we use that instead of requiring a parameter?

Member (Author):
> Easy to do, thanks for the idea! I'd forgotten about this.

+ names = { "--target-version" },
+ converter = VersionConverter.class,
+ description = ("Version of the target cluster."))
+ public Version targetVersion = Version.fromString("OS 2.11");

@ParametersDelegate
private DocParams docTransformationParams = new DocParams();
}
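Following up on the review thread above, a version auto-detection helper might look like the sketch below. This is hypothetical, not code from this PR: the `TargetVersionDetector` class, the root-path request, and the `Version` import location are assumptions. `makeJsonRequest` and `getPayloadBytes` appear elsewhere in this diff, and `version.number`/`version.distribution` are the standard fields of the Elasticsearch/OpenSearch root endpoint.

```java
import org.opensearch.migrations.Version; // package assumed from usage in this diff
import org.opensearch.migrations.bulkload.workcoordination.AbstractedHttpClient;

import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical helper: ask the target cluster for its version instead of
// requiring --target-version on the command line.
public final class TargetVersionDetector {
    private TargetVersionDetector() {}

    public static Version detectTargetVersion(AbstractedHttpClient httpClient) throws Exception {
        // GET / returns cluster metadata, including "version.number" and,
        // on OpenSearch, "version.distribution".
        var response = httpClient.makeJsonRequest(AbstractedHttpClient.GET_METHOD, "", null, null);
        var root = new ObjectMapper().readTree(response.getPayloadBytes());
        var number = root.path("version").path("number").asText();               // e.g. "2.11.0"
        var distribution = root.path("version").path("distribution").asText(""); // "" on Elasticsearch
        return Version.fromString(("opensearch".equals(distribution) ? "OS " : "ES ") + number);
    }
}
```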
@@ -288,7 +294,8 @@ public static void main(String[] args) throws Exception {
var progressCursor = new AtomicReference<WorkItemCursor>();
var cancellationRunnableRef = new AtomicReference<Runnable>();
var workItemTimeProvider = new WorkItemTimeProvider();
- try (var workCoordinator = new OpenSearchWorkCoordinator(
+ var coordinatorFactory = new WorkCoordinatorFactory(arguments.targetVersion);
+ try (var workCoordinator = coordinatorFactory.get(
new CoordinateWorkHttpClient(connectionContext),
TOLERABLE_CLIENT_SERVER_CLOCK_DIFFERENCE_SECONDS,
workerId,
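`WorkCoordinatorFactory` itself is not among the files shown in these two commits. A minimal sketch of the shape such a factory could take, assuming it simply stores the target `Version` and hands back the matching coordinator subclass (the class body, dispatch logic, and this exact overload are assumptions, not the PR's actual implementation):

```java
package org.opensearch.migrations.bulkload.workcoordination;

import org.opensearch.migrations.Version; // package assumed
import org.opensearch.migrations.bulkload.version_os_2_11.OpenSearchWorkCoordinator_OS_2_11;

// Hypothetical sketch of the factory used above; the real implementation is not in this diff.
public class WorkCoordinatorFactory {
    private final Version targetVersion;

    public WorkCoordinatorFactory(Version targetVersion) {
        this.targetVersion = targetVersion;
    }

    public IWorkCoordinator get(
        AbstractedHttpClient httpClient,
        long tolerableClientServerClockDifferenceSeconds,
        String workerId
    ) {
        // Only one version-specific subclass exists at this point, so every target maps
        // to it; the factory is the seam where future version families would branch.
        return new OpenSearchWorkCoordinator_OS_2_11(
            httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
    }
}
```

The call sites above and in the tests also pass a `Clock` and a work-item consumer, so the real factory presumably mirrors all three constructor overloads of the coordinator.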
=== File 2 ===
@@ -140,6 +140,7 @@ private void migrationDocumentsWithClusters(
clockJitter,
testDocMigrationContext,
sourceCluster.getContainerVersion().getVersion(),
+ targetCluster.getContainerVersion().getVersion(),
false
)
)
@@ -178,5 +179,5 @@ private void checkDocsWithRouting(
Assertions.assertEquals("1", routing);
}
}

}
=== File 3 ===
@@ -103,6 +103,7 @@ public void testDocumentMigration(
clockJitter,
testDocMigrationContext,
sourceVersion.getVersion(),
+ targetVersion.getVersion(),
compressionEnabled
),
executorService
=== File 4 ===
@@ -38,7 +38,7 @@
import org.opensearch.migrations.bulkload.http.SearchClusterRequests;
import org.opensearch.migrations.bulkload.workcoordination.CoordinateWorkHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.LeaseExpireTrigger;
- import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;
+ import org.opensearch.migrations.bulkload.workcoordination.WorkCoordinatorFactory;
import org.opensearch.migrations.bulkload.workcoordination.WorkItemTimeProvider;
import org.opensearch.migrations.bulkload.worker.DocumentsRunner;
import org.opensearch.migrations.bulkload.worker.WorkItemCursor;
@@ -144,7 +144,8 @@ public static int migrateDocumentsSequentially(
AtomicInteger runCounter,
Random clockJitter,
DocumentMigrationTestContext testContext,
- Version version,
+ Version sourceVersion,
+ Version targetVersion,
boolean compressionEnabled
) {
for (int runNumber = 1; ; ++runNumber) {
@@ -156,7 +157,8 @@
targetAddress,
clockJitter,
testContext,
- version,
+ sourceVersion,
+ targetVersion,
compressionEnabled
);
if (workResult == DocumentsRunner.CompletionStatus.NOTHING_DONE) {
@@ -202,7 +204,8 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
String targetAddress,
Random clockJitter,
DocumentMigrationTestContext context,
- Version version,
+ Version sourceVersion,
+ Version targetVersion,
boolean compressionEnabled
) throws RfsMigrateDocuments.NoWorkLeftException {
var tempDir = Files.createTempDirectory("opensearchMigrationReindexFromSnapshot_test_lucene");
@@ -219,7 +222,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
return d;
};

- var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(version, sourceRepo);
+ var sourceResourceProvider = ClusterProviderRegistry.getSnapshotReader(sourceVersion, sourceRepo);

DefaultSourceRepoAccessor repoAccessor = new DefaultSourceRepoAccessor(sourceRepo);
SnapshotShardUnpacker.Factory unpackerFactory = new SnapshotShardUnpacker.Factory(
@@ -238,7 +241,8 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
var defaultDocTransformer = new TransformationLoader().getTransformerFactoryLoader(RfsMigrateDocuments.DEFAULT_DOCUMENT_TRANSFORMATION_CONFIG);

AtomicReference<WorkItemCursor> progressCursor = new AtomicReference<>();
- try (var workCoordinator = new OpenSearchWorkCoordinator(
+ var coordinatorFactory = new WorkCoordinatorFactory(targetVersion);
+ try (var workCoordinator = coordinatorFactory.get(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
.build()
@@ -255,7 +259,7 @@ public static DocumentsRunner.CompletionStatus migrateDocumentsWithOneWorker(
.build()
.toConnectionContext()), 1000, Long.MAX_VALUE, 1, defaultDocTransformer),
progressCursor,
- new OpenSearchWorkCoordinator(
+ coordinatorFactory.get(
new CoordinateWorkHttpClient(ConnectionContextTestParams.builder()
.host(targetAddress)
.build()
=== File 5 (new file: OpenSearchWorkCoordinator_OS_2_11.java) ===
@@ -0,0 +1,90 @@
package org.opensearch.migrations.bulkload.version_os_2_11;

import java.time.Clock;
import java.util.function.Consumer;

import org.opensearch.migrations.bulkload.workcoordination.AbstractedHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;

import com.fasterxml.jackson.databind.JsonNode;

public class OpenSearchWorkCoordinator_OS_2_11 extends OpenSearchWorkCoordinator {
public OpenSearchWorkCoordinator_OS_2_11(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
}

public OpenSearchWorkCoordinator_OS_2_11(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId,
Clock clock
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock);
}


public OpenSearchWorkCoordinator_OS_2_11(
AbstractedHttpClient httpClient,
long tolerableClientServerClockDifferenceSeconds,
String workerId,
Clock clock,
Consumer<WorkItemAndDuration> workItemConsumer
) {
super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId, clock, workItemConsumer);
}

protected String getCoordinationIndexSettingsBody(){
return "{\n"
+ " \"settings\": {\n"
+ " \"index\": {"
+ " \"number_of_shards\": 1,\n"
+ " \"number_of_replicas\": 1\n"
+ " }\n"
+ " },\n"
+ " \"mappings\": {\n"
+ " \"properties\": {\n"
+ " \"" + EXPIRATION_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"" + COMPLETED_AT_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"leaseHolderId\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"status\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n";
}

protected String getPathForUpdates(String workItemId) {
return INDEX_NAME + "/_update/" + workItemId;
}

protected String getPathForGets(String workItemId) {
return INDEX_NAME + "/_doc/" + workItemId;
}

protected String getPathForSearches() {
return INDEX_NAME + "/_search";
}

protected int getTotalHitsFromSearchResponse(JsonNode searchResponse) {
return searchResponse.path("hits").path("total").path("value").asInt();
}


}
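For orientation, the resulting call-site shape, mirroring the `main` and test changes above (argument values here are placeholders, and `connectionContext` is assumed to be in scope as at the call site in `main`):

```java
// Placeholder values; mirrors the try-with-resources usage shown in the diff above.
var coordinatorFactory = new WorkCoordinatorFactory(Version.fromString("OS 2.11"));
try (var workCoordinator = coordinatorFactory.get(
        new CoordinateWorkHttpClient(connectionContext),
        5,            // tolerable client/server clock difference, in seconds
        "worker-1"    // workerId
)) {
    // ... set up the coordination index, acquire leases, complete work items ...
}
```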
=== File 6 (OpenSearchWorkCoordinator.java) ===
@@ -27,10 +27,13 @@
import lombok.NonNull;
import lombok.SneakyThrows;
import lombok.ToString;
- import lombok.extern.slf4j.Slf4j;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;

+ public abstract class OpenSearchWorkCoordinator implements IWorkCoordinator {
+ // Create a stable logger that descendants can use, and we can predictably read from in tests
+ protected static final Logger log = LoggerFactory.getLogger(OpenSearchWorkCoordinator.class);

- @Slf4j
- public class OpenSearchWorkCoordinator implements IWorkCoordinator {
public static final String INDEX_NAME = ".migrations_working_state";
public static final int MAX_REFRESH_RETRIES = 6;
public static final int MAX_SETUP_RETRIES = 6;
@@ -194,38 +197,23 @@ private static void retryWithExponentialBackoff(
}
}

+ public String getLoggerName() {
+ return log.getName();
+ }

+ protected abstract String getCoordinationIndexSettingsBody();

+ protected abstract String getPathForUpdates(String workItemId);

+ protected abstract String getPathForGets(String workItemId);

+ protected abstract String getPathForSearches();

+ protected abstract int getTotalHitsFromSearchResponse(JsonNode searchResponse);

public void setup(Supplier<IWorkCoordinationContexts.IInitializeCoordinatorStateContext> contextSupplier)
throws IOException, InterruptedException {
var body = "{\n"
+ " \"settings\": {\n"
+ " \"index\": {"
+ " \"number_of_shards\": 1,\n"
+ " \"number_of_replicas\": 1\n"
+ " }\n"
+ " },\n"
+ " \"mappings\": {\n"
+ " \"properties\": {\n"
+ " \"" + EXPIRATION_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"" + COMPLETED_AT_FIELD_NAME + "\": {\n"
+ " \"type\": \"long\"\n"
+ " },\n"
+ " \"leaseHolderId\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"status\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " },\n"
+ " \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": {\n"
+ " \"type\": \"keyword\",\n"
+ " \"norms\": false\n"
+ " }\n"
+ " }\n"
+ " }\n"
+ "}\n";
var body = getCoordinationIndexSettingsBody();

try {
doUntil("setup-" + INDEX_NAME, 100, MAX_SETUP_RETRIES, contextSupplier::get, () -> {
@@ -345,7 +333,7 @@ AbstractedHttpClient.AbstractHttpResponse createOrUpdateLeaseForDocument(

return httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
- INDEX_NAME + "/_update/" + workItemId,
+ getPathForUpdates(workItemId),
null,
body
);
@@ -419,7 +407,7 @@ public WorkAcquisitionOutcome createOrUpdateLeaseForWorkItem(
} else {
final var httpResponse = httpClient.makeJsonRequest(
AbstractedHttpClient.GET_METHOD,
- INDEX_NAME + "/_doc/" + workItemId,
+ getPathForGets(workItemId),
null,
null
);
@@ -471,7 +459,7 @@ public void completeWorkItem(

var response = httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
- INDEX_NAME + "/_update/" + workItemId,
+ getPathForUpdates(workItemId),
null,
body
);
@@ -513,7 +501,7 @@ private int numWorkItemsNotYetCompleteInternal(
+ "\"size\": 0" // This sets the number of items to include in the `hits.hits` array, but doesn't affect
+ "}"; // the integer value in `hits.total.value`

- var path = INDEX_NAME + "/_search";
+ var path = getPathForSearches();
var response = httpClient.makeJsonRequest(AbstractedHttpClient.POST_METHOD, path, null, queryBody);
var statusCode = response.getStatusCode();
if (statusCode != 200) {
@@ -525,7 +513,7 @@
);
}
var payload = objectMapper.readTree(response.getPayloadBytes());
- var totalHits = payload.path("hits").path("total").path("value").asInt();
+ var totalHits = getTotalHitsFromSearchResponse(payload);
// In the case where totalHits is 0, we need to be particularly sure that we're not missing data. The `relation`
// for the total must be `eq` or we need to throw an error because it's not safe to rely on this data.
if (totalHits == 0 && !payload.path("hits").path("total").path("relation").textValue().equals("eq")) {
@@ -662,7 +650,7 @@ private WorkItemWithPotentialSuccessors getAssignedWorkItemUnsafe()
final var body = queryWorkersAssignedItemsTemplate.replace(WORKER_ID_TEMPLATE, workerId);
var response = httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
- INDEX_NAME + "/_search",
+ getPathForSearches(),
null,
body
);
@@ -770,7 +758,7 @@ private void updateWorkItemWithSuccessors(String workItemId, List<String> successors
.addArgument(workItemId).log();
var response = httpClient.makeJsonRequest(
AbstractedHttpClient.POST_METHOD,
- INDEX_NAME + "/_update/" + workItemId,
+ getPathForUpdates(workItemId),
null,
body
);
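The five hooks introduced above exist so that targets with a different REST surface can get their own subclass. As an illustration only (no such class is added in this PR; the package and class name are hypothetical), a pre-7.x Elasticsearch variant would differ in exactly the abstracted spots: 6.x mappings are nested under a type name, the update endpoint is typed (`{index}/{type}/{id}/_update`), and `hits.total` is a bare integer rather than an object:

```java
package org.opensearch.migrations.bulkload.version_es_6_8; // hypothetical package

import org.opensearch.migrations.bulkload.workcoordination.AbstractedHttpClient;
import org.opensearch.migrations.bulkload.workcoordination.OpenSearchWorkCoordinator;

import com.fasterxml.jackson.databind.JsonNode;

// Illustrative sketch only -- no such class is added by this PR.
public class OpenSearchWorkCoordinator_ES_6_8 extends OpenSearchWorkCoordinator {
    public OpenSearchWorkCoordinator_ES_6_8(
        AbstractedHttpClient httpClient,
        long tolerableClientServerClockDifferenceSeconds,
        String workerId
    ) {
        super(httpClient, tolerableClientServerClockDifferenceSeconds, workerId);
    }

    @Override
    protected String getCoordinationIndexSettingsBody() {
        // ES 6.x mappings must be nested under a type name ("_doc" by convention).
        return "{ \"settings\": { \"index\": { \"number_of_shards\": 1, \"number_of_replicas\": 1 } },"
            + "  \"mappings\": { \"_doc\": { \"properties\": {"
            + "    \"" + EXPIRATION_FIELD_NAME + "\": { \"type\": \"long\" },"
            + "    \"" + COMPLETED_AT_FIELD_NAME + "\": { \"type\": \"long\" },"
            + "    \"leaseHolderId\": { \"type\": \"keyword\", \"norms\": false },"
            + "    \"status\": { \"type\": \"keyword\", \"norms\": false },"
            + "    \"" + SUCCESSOR_ITEMS_FIELD_NAME + "\": { \"type\": \"keyword\", \"norms\": false }"
            + "  } } } }";
    }

    @Override
    protected String getPathForUpdates(String workItemId) {
        // Typed update endpoint: {index}/{type}/{id}/_update
        return INDEX_NAME + "/_doc/" + workItemId + "/_update";
    }

    @Override
    protected String getPathForGets(String workItemId) {
        return INDEX_NAME + "/_doc/" + workItemId;
    }

    @Override
    protected String getPathForSearches() {
        return INDEX_NAME + "/_search";
    }

    @Override
    protected int getTotalHitsFromSearchResponse(JsonNode searchResponse) {
        // 6.x returns hits.total as a bare integer, not an object with "value"/"relation".
        return searchResponse.path("hits").path("total").asInt();
    }
}
```

One caveat the sketch surfaces: `numWorkItemsNotYetCompleteInternal` above still reads `hits.total.relation` directly when the count is zero, and that field only exists in the 7.x-style response, so a real 6.x subclass would need that check folded into a hook as well.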