Skip to content

Commit

Permalink
Fix red index on close for remote translog
Browse files Browse the repository at this point in the history
Signed-off-by: Ashish Singh <[email protected]>
  • Loading branch information
ashking94 committed Sep 23, 2024
1 parent b984b9f commit 89789f4
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.SearchPhaseExecutionException;
import org.opensearch.client.Requests;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
Expand Down Expand Up @@ -202,7 +203,7 @@ public void testRemoteTranslogCleanup() throws Exception {

public void testStaleCommitDeletionWithInvokeFlush() throws Exception {
String dataNode = internalCluster().startNode();
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000l, -1));
createIndex(INDEX_NAME, remoteStoreIndexSettings(1, 10000L, -1));
int numberOfIterations = randomIntBetween(5, 15);
indexData(numberOfIterations, true, INDEX_NAME);
String segmentsPathFixedPrefix = RemoteStoreSettings.CLUSTER_REMOTE_STORE_SEGMENTS_PATH_PREFIX.get(getNodeSettings());
Expand Down Expand Up @@ -1011,4 +1012,73 @@ public void testAsyncTranslogDurabilityRestrictionsThroughIdxTemplates() throws
.get()
);
}

public void testCloseIndexWithNoOpSyncAndFlushForSyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
client().admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), "5s"))
.get();
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
if (randomBoolean()) {
for (int i = 0; i < randomIntBetween(1, 5); i++) {
indexSingleDoc(INDEX_NAME);
}
flushAndRefresh(INDEX_NAME);
}
// Index single doc to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
// Index another doc and in this case the flush would have happened before the sync.
indexSingleDoc(INDEX_NAME);
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}

public void testCloseIndexWithNoOpSyncAndFlushForAsyncTranslog() throws InterruptedException {
internalCluster().startNodes(3);
Settings.Builder settings = Settings.builder()
.put(remoteStoreIndexSettings(0, 10000L, -1))
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "1s")
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Durability.ASYNC)
.put(IndexSettings.INDEX_TRANSLOG_SYNC_INTERVAL_SETTING.getKey(), "10s");
createIndex(INDEX_NAME, settings.build());
CountDownLatch latch = new CountDownLatch(1);
new Thread(() -> {
// Index some docs to start the asyn io processor to run which will lead to 10s wait time before the next sync.
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
indexSingleDoc(INDEX_NAME);
// Reduce the latch for the main thread to flush after some sleep.
latch.countDown();
}).start();
// Wait for atleast one doc to be ingested.
latch.await();
// Sleep for some time for the next doc to be present in lucene buffer. If flush happens first before the doc #2
// gets indexed, then it goes into the happy case where the close index happens succefully.
Thread.sleep(1000);
// Flush so that the subsequent sync or flushes are no-op.
flush(INDEX_NAME);
// Closing the index involves translog.sync and shard.flush which are now no-op.
client().admin().indices().close(Requests.closeIndexRequest(INDEX_NAME)).actionGet();
Thread.sleep(10000);
ensureGreen(INDEX_NAME);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ public class RemoteFsTranslog extends Translog {
// min generation referred by last uploaded translog
protected volatile long minRemoteGenReferenced;

// the max global checkpoint that has been synced
protected volatile long globalCheckpointSynced;

// clean up translog folder uploaded by previous primaries once
protected final SetOnce<Boolean> olderPrimaryCleaned = new SetOnce<>();

Expand Down Expand Up @@ -437,9 +440,10 @@ private boolean upload(long primaryTerm, long generation, long maxSeqNo) throws
config.getNodeId()
).build()
) {
Checkpoint checkpoint = current.getLastSyncedCheckpoint();
return translogTransferManager.transferSnapshot(
transferSnapshotProvider,
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo)
new RemoteFsTranslogTransferListener(generation, primaryTerm, maxSeqNo, checkpoint.globalCheckpoint)
);
} finally {
syncPermit.release(SYNC_PERMIT);
Expand Down Expand Up @@ -474,7 +478,10 @@ public void sync() throws IOException {
public boolean syncNeeded() {
try (ReleasableLock lock = readLock.acquire()) {
return current.syncNeeded()
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0);
|| (maxRemoteTranslogGenerationUploaded + 1 < this.currentFileGeneration() && current.totalOperations() == 0)
// The below condition on GCP exists to handle global checkpoint updates during close index.
// Refer issue - https://github.com/opensearch-project/OpenSearch/issues/15989
|| (globalCheckpointSupplier.getAsLong() > globalCheckpointSynced);
}
}

Expand Down Expand Up @@ -674,16 +681,24 @@ private class RemoteFsTranslogTransferListener implements TranslogTransferListen

private final long maxSeqNo;

RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo) {
private final long globalCheckpoint;

RemoteFsTranslogTransferListener(long generation, long primaryTerm, long maxSeqNo, long globalCheckpoint) {
this.generation = generation;
this.primaryTerm = primaryTerm;
this.maxSeqNo = maxSeqNo;
this.globalCheckpoint = globalCheckpoint;
}

@Override
public void onUploadComplete(TransferSnapshot transferSnapshot) throws IOException {
maxRemoteTranslogGenerationUploaded = generation;
minRemoteGenReferenced = getMinFileGeneration();
// Update the global checkpoint only if the supplied global checkpoint is greater than it
// When a new writer is created the
if (globalCheckpoint > globalCheckpointSynced) {
globalCheckpointSynced = globalCheckpoint;
}
logger.debug(
"Successfully uploaded translog for primary term = {}, generation = {}, maxSeqNo = {}",
primaryTerm,
Expand Down

0 comments on commit 89789f4

Please sign in to comment.