Skip to content

Commit

Permalink
Add segment replication polling for search replicas.
Browse files Browse the repository at this point in the history
This PR introduces polling for search replicas at the existing refresh interval.
If any index has a search replica will trigger segrep on an interval.

Signed-off-by: Marc Handalian <[email protected]>
  • Loading branch information
mch2 committed Sep 4, 2024
1 parent 3fc0139 commit dc95d38
Show file tree
Hide file tree
Showing 11 changed files with 397 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.indices.replication;

import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.After;
import org.junit.Before;

import java.nio.file.Path;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";
protected Path absolutePath;

private Boolean useRemoteStore;

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
}

@Override
protected Settings nodeSettings(int nodeOrdinal) {
if (useRemoteStore) {
if (absolutePath == null) {
absolutePath = randomRepoPath().toAbsolutePath();
}
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
.build();
}
return super.nodeSettings(nodeOrdinal);
}

@After
public void teardown() {
if (useRemoteStore) {
clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
}
}

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put(IndexMetadata.SETTING_NUMBER_OF_SEARCH_REPLICAS, 1)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder().put(super.featureFlagSettings()).put(FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL, true).build();
}

public void testReplication() throws Exception {
internalCluster().startClusterManagerOnlyNode();
final String primary = internalCluster().startDataOnlyNode();
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replica = internalCluster().startDataOnlyNode();
ensureGreen(INDEX_NAME);

final int docCount = 10;
for (int i = 0; i < docCount; i++) {
client().prepareIndex(INDEX_NAME).setId(Integer.toString(i)).setSource("field", "value" + i).execute().get();
}
refresh(INDEX_NAME);
waitForSearchableDocs(docCount, primary, replica);
}

}
54 changes: 53 additions & 1 deletion server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.opensearch.index.engine.EngineFactory;
import org.opensearch.index.mapper.MapperService;
import org.opensearch.index.shard.IndexEventListener;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.shard.IndexingOperationListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.index.similarity.SimilarityService;
Expand Down Expand Up @@ -629,6 +630,56 @@ public IndexService newIndexService(
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings
) throws IOException {
return newIndexService(
indexCreationContext,
environment,
xContentRegistry,
shardStoreDeleter,
circuitBreakerService,
bigArrays,
threadPool,
scriptService,
clusterService,
client,
indicesQueryCache,
mapperRegistry,
indicesFieldDataCache,
namedWriteableRegistry,
idFieldDataEnabled,
valuesSourceRegistry,
remoteDirectoryFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings,
(s) -> {}
);
}

public IndexService newIndexService(
IndexService.IndexCreationContext indexCreationContext,
NodeEnvironment environment,
NamedXContentRegistry xContentRegistry,
IndexService.ShardStoreDeleter shardStoreDeleter,
CircuitBreakerService circuitBreakerService,
BigArrays bigArrays,
ThreadPool threadPool,
ScriptService scriptService,
ClusterService clusterService,
Client client,
IndicesQueryCache indicesQueryCache,
MapperRegistry mapperRegistry,
IndicesFieldDataCache indicesFieldDataCache,
NamedWriteableRegistry namedWriteableRegistry,
BooleanSupplier idFieldDataEnabled,
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -689,7 +740,8 @@ public IndexService newIndexService(
recoverySettings,
remoteStoreSettings,
fileCache,
compositeIndexSettings
compositeIndexSettings,
replicator
);
success = true;
return indexService;
Expand Down
70 changes: 68 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@
import static java.util.Collections.emptyMap;
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.common.collect.MapBuilder.newMapBuilder;
import static org.opensearch.common.util.FeatureFlags.READER_WRITER_SPLIT_EXPERIMENTAL_SETTING;
import static org.opensearch.index.remote.RemoteMigrationIndexMetadataUpdater.indexHasRemoteStoreSettings;

/**
Expand Down Expand Up @@ -174,6 +175,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private volatile AsyncTranslogFSync fsyncTask;
private volatile AsyncGlobalCheckpointTask globalCheckpointTask;
private volatile AsyncRetentionLeaseSyncTask retentionLeaseSyncTask;
private volatile AsyncReplicationTask asyncReplicationTask;

// don't convert to Setting<> and register... we only set this in tests and register via a plugin
private final String INDEX_TRANSLOG_RETENTION_CHECK_INTERVAL_SETTING = "index.translog.retention.check_interval";
Expand All @@ -194,6 +196,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final RemoteStoreSettings remoteStoreSettings;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -231,7 +234,8 @@ public IndexService(
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -306,11 +310,15 @@ public IndexService(
this.trimTranslogTask = new AsyncTrimTranslogTask(this);
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
this.asyncReplicationTask = new AsyncReplicationTask(this);
}
this.translogFactorySupplier = translogFactorySupplier;
this.recoverySettings = recoverySettings;
this.remoteStoreSettings = remoteStoreSettings;
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -386,7 +394,8 @@ public IndexService(
recoverySettings,
remoteStoreSettings,
null,
null
null,
s -> {}
);
}

Expand All @@ -395,6 +404,11 @@ static boolean needsMapperService(IndexSettings indexSettings, IndexCreationCont
&& indexCreationContext == IndexCreationContext.CREATE_INDEX); // metadata verification needs a mapper service
}

// visible for tests
AsyncReplicationTask getReplicationTask() {
return asyncReplicationTask;
}

/**
* Context for index creation
*
Expand Down Expand Up @@ -1065,11 +1079,22 @@ public synchronized void updateMetadata(final IndexMetadata currentIndexMetadata
}
onRefreshIntervalChange();
updateFsyncTaskIfNecessary();
if (READER_WRITER_SPLIT_EXPERIMENTAL_SETTING.get(indexSettings.getNodeSettings())) {
updateReplicationTask();
}
}

metadataListeners.forEach(c -> c.accept(newIndexMetadata));
}

private void updateReplicationTask() {
try {
asyncReplicationTask.close();
} finally {
asyncReplicationTask = new AsyncReplicationTask(this);
}
}

/**
* Called whenever the refresh interval changes. This can happen in 2 cases -
* 1. {@code cluster.default.index.refresh_interval} cluster setting changes. The change would only happen for
Expand Down Expand Up @@ -1334,6 +1359,47 @@ public String toString() {
}
}

final class AsyncReplicationTask extends BaseAsyncTask {

AsyncReplicationTask(IndexService indexService) {
super(indexService, indexService.getRefreshInterval());
}

@Override
protected void runInternal() {
indexService.maybeSyncSegments(false);
}

@Override
protected String getThreadPool() {
return ThreadPool.Names.GENERIC;
}

@Override
public String toString() {
return "replication";
}

@Override
protected boolean mustReschedule() {
return indexSettings.isSegRepEnabledOrRemoteNode() && super.mustReschedule();
}
}

private void maybeSyncSegments(boolean force) {
if (getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
if (shard.routingEntry().isSearchOnly() && shard.routingEntry().active()) {
replicator.accept(shard);
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// do nothing
}
}
}
}

final class AsyncTrimTranslogTask extends BaseAsyncTask {

AsyncTrimTranslogTask(IndexService indexService) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1251,12 +1251,13 @@ public ReplicationCheckpoint getLatestReplicationCheckpoint() {
return this.latestReplicationCheckpoint;
}

private boolean isPrimaryRelocation(String allocationId) {
// skip any shard that is a relocating primary or search only replica (not tracked by primary)
private boolean shouldSkipReplicationTimer(String allocationId) {
Optional<ShardRouting> shardRouting = routingTable.shards()
.stream()
.filter(routing -> routing.allocationId().getId().equals(allocationId))
.findAny();
return shardRouting.isPresent() && shardRouting.get().primary();
return shardRouting.isPresent() && (shardRouting.get().primary() || shardRouting.get().isSearchOnly());
}

private void createReplicationLagTimers() {
Expand All @@ -1268,7 +1269,7 @@ private void createReplicationLagTimers() {
// it is possible for a shard to be in-sync but not yet removed from the checkpoints collection after a failover event.
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(allocationId) == false
&& shouldSkipReplicationTimer(allocationId) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& (indexSettings.isSegRepLocalEnabled() == true
|| isShardOnRemoteEnabledNode.apply(routingTable.getByAllocationId(allocationId).currentNodeId()))) {
Expand Down Expand Up @@ -1302,7 +1303,7 @@ public synchronized void startReplicationLagTimers(ReplicationCheckpoint checkpo
final CheckpointState cps = e.getValue();
if (cps.inSync
&& replicationGroup.getUnavailableInSyncShards().contains(allocationId) == false
&& isPrimaryRelocation(e.getKey()) == false
&& shouldSkipReplicationTimer(e.getKey()) == false
&& latestReplicationCheckpoint.isAheadOf(cps.visibleReplicationCheckpoint)
&& cps.checkpointTimers.containsKey(latestReplicationCheckpoint)) {
cps.checkpointTimers.get(latestReplicationCheckpoint).start();
Expand Down Expand Up @@ -1330,7 +1331,7 @@ public synchronized Set<SegmentReplicationShardStats> getSegmentReplicationStats
entry -> entry.getKey().equals(this.shardAllocationId) == false
&& entry.getValue().inSync
&& replicationGroup.getUnavailableInSyncShards().contains(entry.getKey()) == false
&& isPrimaryRelocation(entry.getKey()) == false
&& shouldSkipReplicationTimer(entry.getKey()) == false
/*Check if the current primary shard is migrating to remote and
all the other shard copies of the same index still hasn't completely moved over
to the remote enabled nodes. Ensures that:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final SearchRequestStats searchRequestStats;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;

@Override
protected void doStart() {
Expand Down Expand Up @@ -395,7 +396,8 @@ public IndicesService(
CacheService cacheService,
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down Expand Up @@ -504,6 +506,7 @@ protected void closeInternal() {
this.remoteStoreSettings = remoteStoreSettings;
this.compositeIndexSettings = compositeIndexSettings;
this.fileCache = fileCache;
this.replicator = replicator;
}

public IndicesService(
Expand Down Expand Up @@ -564,6 +567,7 @@ public IndicesService(
cacheService,
remoteStoreSettings,
null,
null,
null
);
}
Expand Down Expand Up @@ -980,7 +984,8 @@ private synchronized IndexService createIndexService(
translogFactorySupplier,
this::getClusterDefaultRefreshInterval,
this.recoverySettings,
this.remoteStoreSettings
this.remoteStoreSettings,
replicator
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ public SegmentReplicationTargetService(
ForceSyncRequest::new,
new ForceSyncTransportRequestHandler()
);
replicator.setSourceFactory(sourceFactory);
}

@Override
Expand Down
Loading

0 comments on commit dc95d38

Please sign in to comment.