[Segment Replication] Add PIT/Scroll compatibility with Segment Replication #6644 #6765

Merged: 8 commits, Apr 5, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -90,6 +90,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Remove 'cluster_manager' role attachment when using 'node.master' deprecated setting ([#6331](https://github.com/opensearch-project/OpenSearch/pull/6331))
- Add new cluster settings to ignore weighted round-robin routing and fallback to default behaviour. ([#6834](https://github.com/opensearch-project/OpenSearch/pull/6834))
- Add experimental support for ZSTD compression. ([#3577](https://github.com/opensearch-project/OpenSearch/pull/3577))
- [Segment Replication] Add point in time and scroll query compatibility. ([#6644](https://github.com/opensearch-project/OpenSearch/pull/6644))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.18.0 to 2.20.0 ([#6490](https://github.com/opensearch-project/OpenSearch/pull/6490))
Large diffs are not rendered by default.

@@ -68,7 +68,7 @@ public NRTReplicationEngine(EngineConfig engineConfig) {
WriteOnlyTranslogManager translogManagerRef = null;
try {
lastCommittedSegmentInfos = store.readLastCommittedSegmentsInfo();
readerManager = new NRTReplicationReaderManager(OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId));
readerManager = buildReaderManager();
final SequenceNumbers.CommitInfo commitInfo = SequenceNumbers.loadSeqNoInfoFromLuceneCommit(
this.lastCommittedSegmentInfos.getUserData().entrySet()
);
@@ -119,6 +119,28 @@ public void onAfterTranslogSync() {
}
}

private NRTReplicationReaderManager buildReaderManager() throws IOException {
return new NRTReplicationReaderManager(
OpenSearchDirectoryReader.wrap(getDirectoryReader(), shardId),
store::incRefFileDeleter,
(files) -> {
store.decRefFileDeleter(files);
try {
store.cleanupAndPreserveLatestCommitPoint(
"On reader closed",
getLatestSegmentInfos(),
getLastCommittedSegmentInfos(),
false
);
} catch (IOException e) {
// Log but do not rethrow - we can try cleaning up again after the next replication cycle.
// If that also fails, the shard will fail as well.
logger.error("Unable to clean store after reader closed", e);
}
}
);
}

@Override
public TranslogManager translogManager() {
return translogManager;
@@ -22,8 +22,10 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

/**
* This is an extension of {@link OpenSearchReaderManager} for use with {@link NRTReplicationEngine}.
@@ -35,17 +37,27 @@ public class NRTReplicationReaderManager extends OpenSearchReaderManager {

private final static Logger logger = LogManager.getLogger(NRTReplicationReaderManager.class);
private volatile SegmentInfos currentInfos;
private Consumer<Collection<String>> onReaderClosed;
private Consumer<Collection<String>> onNewReader;

/**
* Creates and returns a new NRTReplicationReaderManager from the given
* already-opened {@link OpenSearchDirectoryReader}, stealing
* the incoming reference.
*
* @param reader the SegmentReplicationReaderManager to use for future reopens
* @param reader - The OpenSearchDirectoryReader to use for future reopens.
* @param onNewReader - Called when a new reader is created.
* @param onReaderClosed - Called when a reader is closed.
*/
NRTReplicationReaderManager(OpenSearchDirectoryReader reader) {
NRTReplicationReaderManager(
OpenSearchDirectoryReader reader,
Consumer<Collection<String>> onNewReader,
Consumer<Collection<String>> onReaderClosed
) {
super(reader);
currentInfos = unwrapStandardReader(reader).getSegmentInfos();
this.onNewReader = onNewReader;
this.onReaderClosed = onReaderClosed;
}
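// Usage sketch (illustrative, not part of this change): the NRT engine wires
// store-backed callbacks, e.g.
// new NRTReplicationReaderManager(reader, store::incRefFileDeleter, store::decRefFileDeleter),
// so each reader generation pins its segment files until the reader closes
// (in this PR the close callback additionally triggers store cleanup).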

@Override
@@ -60,6 +72,7 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
for (LeafReaderContext ctx : standardDirectoryReader.leaves()) {
subs.add(ctx.reader());
}
final Collection<String> files = currentInfos.files(false);
DirectoryReader innerReader = StandardDirectoryReader.open(referenceToRefresh.directory(), currentInfos, subs, null);
final DirectoryReader softDeletesDirectoryReaderWrapper = new SoftDeletesDirectoryReaderWrapper(
innerReader,
@@ -68,7 +81,13 @@ protected OpenSearchDirectoryReader refreshIfNeeded(OpenSearchDirectoryReader re
logger.trace(
() -> new ParameterizedMessage("updated to SegmentInfosVersion=" + currentInfos.getVersion() + " reader=" + innerReader)
);
return OpenSearchDirectoryReader.wrap(softDeletesDirectoryReaderWrapper, referenceToRefresh.shardId());
final OpenSearchDirectoryReader reader = OpenSearchDirectoryReader.wrap(
softDeletesDirectoryReaderWrapper,
referenceToRefresh.shardId()
);
onNewReader.accept(files);
OpenSearchDirectoryReader.addReaderCloseListener(reader, key -> onReaderClosed.accept(files));
return reader;
}

/**
@@ -89,7 +108,7 @@ public SegmentInfos getSegmentInfos() {
return currentInfos;
}

private StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
public static StandardDirectoryReader unwrapStandardReader(OpenSearchDirectoryReader reader) {
final DirectoryReader delegate = reader.getDelegate();
if (delegate instanceof SoftDeletesDirectoryReaderWrapper) {
return (StandardDirectoryReader) ((SoftDeletesDirectoryReaderWrapper) delegate).getDelegate();
@@ -0,0 +1,51 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.index.store;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
* This class is a version of Lucene's ReplicaFileDeleter class used to keep track of
* segment files that should be preserved on replicas between replication events.
* The difference is that this component does not perform any deletions itself; it only tracks refcounts.
* Our deletions are made through Store.java.
*
* https://github.com/apache/lucene/blob/main/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/ReplicaFileDeleter.java
*
* @opensearch.internal
*/
final class ReplicaFileTracker {

private final Map<String, Integer> refCounts = new HashMap<>();

public synchronized void incRef(Collection<String> fileNames) {
for (String fileName : fileNames) {
refCounts.merge(fileName, 1, Integer::sum);
}
}

public synchronized void decRef(Collection<String> fileNames) {
for (String fileName : fileNames) {
Integer curCount = refCounts.get(fileName);
assert curCount != null : "fileName=" + fileName;
assert curCount > 0;
if (curCount == 1) {
refCounts.remove(fileName);
} else {
refCounts.put(fileName, curCount - 1);
}
}
}

public synchronized boolean canDelete(String fileName) {
return refCounts.containsKey(fileName) == false;
}
}
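
The tracker's contract is simple: a file may be deleted only while no open reader references it. Below is a minimal usage sketch under that contract; the file names are hypothetical, and in this change the real callers are Store#incRefFileDeleter and Store#decRefFileDeleter, driven by the reader manager callbacks.

ReplicaFileTracker tracker = new ReplicaFileTracker();

// A newly refreshed reader generation pins its segment files.
tracker.incRef(java.util.List.of("_0.cfs", "_0.si"));

// A PIT/scroll reader over the same generation pins _0.cfs a second time.
tracker.incRef(java.util.List.of("_0.cfs"));

// The newer reader closes: _0.si drops to zero refs, _0.cfs stays pinned.
tracker.decRef(java.util.List.of("_0.cfs", "_0.si"));
assert tracker.canDelete("_0.si");
assert tracker.canDelete("_0.cfs") == false;

// The PIT expires and its reader closes; the file is now safe to delete.
tracker.decRef(java.util.List.of("_0.cfs"));
assert tracker.canDelete("_0.cfs");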
64 changes: 59 additions & 5 deletions server/src/main/java/org/opensearch/index/store/Store.java
@@ -124,6 +124,7 @@
import static java.util.Collections.unmodifiableMap;
import static org.opensearch.index.seqno.SequenceNumbers.LOCAL_CHECKPOINT_KEY;
import static org.opensearch.index.store.Store.MetadataSnapshot.loadMetadata;
import static org.opensearch.indices.replication.SegmentReplicationTarget.REPLICATION_PREFIX;

/**
* A Store provides plain access to files written by an opensearch index shard. Each shard
@@ -182,6 +183,10 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
private final ShardLock shardLock;
private final OnClose onClose;

// used to ref count files when a new reader is opened for PIT/scroll queries
// prevents deletion of segment files until the PIT/scroll expires or is discarded
private final ReplicaFileTracker replicaFileTracker;

private final AbstractRefCounted refCounter = new AbstractRefCounted("store") {
@Override
protected void closeInternal() {
@@ -202,6 +207,7 @@ public Store(ShardId shardId, IndexSettings indexSettings, Directory directory,
this.directory = new StoreDirectory(sizeCachingDir, Loggers.getLogger("index.store.deletes", shardId));
this.shardLock = shardLock;
this.onClose = onClose;
this.replicaFileTracker = indexSettings.isSegRepEnabled() ? new ReplicaFileTracker() : null;

assert onClose != null;
assert shardLock != null;
@@ -782,34 +788,70 @@ public void cleanupAndVerify(String reason, MetadataSnapshot sourceMetadata) thr
}

/**
* Segment Replication method -
* Segment Replication method
* This method deletes every file in this store that is not referenced by the passed in SegmentInfos or
* part of the latest on-disk commit point.
*
* This method is used for segment replication when the in memory SegmentInfos can be ahead of the on disk segment file.
* In this case files from both snapshots must be preserved. Verification has been done that all files are present on disk.
* @param reason the reason for this cleanup operation logged for each deleted file
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(String reason, SegmentInfos infos) throws IOException {
this.cleanupAndPreserveLatestCommitPoint(reason, infos, readLastCommittedSegmentsInfo(), true);
}

/**
* Segment Replication method
*
* Similar to {@link Store#cleanupAndPreserveLatestCommitPoint(String, SegmentInfos)} with extra parameters for cleanup
*
* This method deletes every file in this store, except:
* 1. Files referenced by the passed-in SegmentInfos, usually the in-memory segment infos copied from the primary
* 2. Files that are part of the passed-in last committed segment infos
* 3. Files ref-counted by an active reader serving PIT/scroll queries
* 4. Temporary replication files, when the passed-in deleteTempFiles is false
*
* @param reason the reason for this cleanup operation logged for each deleted file
* @param infos {@link SegmentInfos} Files from this infos will be preserved on disk if present.
* @param lastCommittedSegmentInfos {@link SegmentInfos} Last committed segment infos
* @param deleteTempFiles Whether this cleanup should delete temporary replication files
*
* @throws IllegalStateException if the latest snapshot in this store differs from the given one after the cleanup.
*/
public void cleanupAndPreserveLatestCommitPoint(
String reason,
SegmentInfos infos,
SegmentInfos lastCommittedSegmentInfos,
boolean deleteTempFiles
) throws IOException {
assert indexSettings.isSegRepEnabled();
// fetch a snapshot from the latest on disk Segments_N file. This can be behind
// the passed in local in memory snapshot, so we want to ensure files it references are not removed.
metadataLock.writeLock().lock();
try (Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
cleanupFiles(reason, getMetadata(readLastCommittedSegmentsInfo()), infos.files(true));
cleanupFiles(reason, getMetadata(lastCommittedSegmentInfos), infos.files(true), deleteTempFiles);
} finally {
metadataLock.writeLock().unlock();
}
}
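
// Illustrative walk-through (a sketch, not part of this change): with on-disk files
// {_0.si, _1.si, segments_2, replication.abc.tmp}, where segments_2 is the last commit
// and references _0.si, the passed-in infos reference _1.si, and no reader holds extra
// refs, a call with deleteTempFiles == false deletes nothing, while deleteTempFiles == true
// removes only the hypothetical replication.abc.tmp.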

private void cleanupFiles(String reason, MetadataSnapshot localSnapshot, @Nullable Collection<String> additionalFiles)
throws IOException {
private void cleanupFiles(
String reason,
MetadataSnapshot localSnapshot,
@Nullable Collection<String> additionalFiles,
boolean deleteTempFiles
) throws IOException {
assert metadataLock.isWriteLockedByCurrentThread();
for (String existingFile : directory.listAll()) {
if (Store.isAutogenerated(existingFile)
|| localSnapshot.contains(existingFile)
|| (additionalFiles != null && additionalFiles.contains(existingFile))) {
|| (additionalFiles != null && additionalFiles.contains(existingFile))
// also ensure we are not deleting a file referenced by an active reader.
|| replicaFileTracker != null && replicaFileTracker.canDelete(existingFile) == false
// prevent temporary file deletion during reader cleanup
|| deleteTempFiles == false && existingFile.startsWith(REPLICATION_PREFIX)) {
// don't delete snapshot file, or the checksums file (note, this is extra protection since the Store won't delete
// checksum)
continue;
@@ -1909,4 +1951,16 @@ private static IndexWriterConfig newIndexWriterConfig() {
// we also don't specify a codec here and merges should use the engines for this index
.setMergePolicy(NoMergePolicy.INSTANCE);
}

public void incRefFileDeleter(Collection<String> files) {
if (this.indexSettings.isSegRepEnabled()) {
this.replicaFileTracker.incRef(files);
}
}

public void decRefFileDeleter(Collection<String> files) {
if (this.indexSettings.isSegRepEnabled()) {
this.replicaFileTracker.decRef(files);
}
}
}
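
Taken together with the reader manager changes above, the retention flow is roughly the sketch below. The engine, store, and currentInfos references are stand-ins; in the PR this wiring lives in NRTReplicationEngine#buildReaderManager and NRTReplicationReaderManager#refreshIfNeeded.

// 1. On refresh, the new reader generation's files are pinned via the onNewReader callback.
Collection<String> files = currentInfos.files(false);
store.incRefFileDeleter(files);

// 2. While a PIT/scroll holds that reader open, cleanupAndPreserveLatestCommitPoint
//    skips every file whose ref count is non-zero (canDelete == false).

// 3. The reader's close listener releases the refs and retries cleanup.
store.decRefFileDeleter(files);
store.cleanupAndPreserveLatestCommitPoint(
    "On reader closed",
    engine.getLatestSegmentInfos(),        // in-memory infos, may be ahead of disk
    engine.getLastCommittedSegmentInfos(),
    false                                  // preserve in-flight "replication." temp files
);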
@@ -51,6 +51,8 @@ public class SegmentReplicationTarget extends ReplicationTarget {
private final SegmentReplicationState state;
protected final MultiFileWriter multiFileWriter;

public final static String REPLICATION_PREFIX = "replication.";

public ReplicationCheckpoint getCheckpoint() {
return this.checkpoint;
}
@@ -85,7 +87,7 @@ protected void closeInternal() {

@Override
protected String getPrefix() {
return "replication." + UUIDs.randomBase64UUID() + ".";
return REPLICATION_PREFIX + UUIDs.randomBase64UUID() + ".";
}

@Override
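
For reference, REPLICATION_PREFIX ties the two sides of this change together: replication temp files are named with it via getPrefix(), and the reader-close cleanup path in Store (deleteTempFiles == false) preserves anything carrying it. A small sketch with a hypothetical segment file name:

// Temporary replication files are named "<prefix><uuid>.<suffix>".
String tempName = SegmentReplicationTarget.REPLICATION_PREFIX + UUIDs.randomBase64UUID() + ".segments_3";

// The reader-close cleanup guard in Store#cleanupFiles keeps such files:
boolean preserved = tempName.startsWith(SegmentReplicationTarget.REPLICATION_PREFIX); // true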